
The Complete Guide: Real-Time Log Display with Flume, ElasticSearch, and Kibana

It took a full day, but I finally got Flume to fan log data into ElasticSearch and ran Kibana for an initial display of the logs. This post records the process.

1: Setting up the ES cluster itself is not covered here; see: How to set up an ES cluster.

2: Integrating Flume with ES

This part is the crux of the whole exercise; most of the time went into it.

Flume actually ships with an ElasticSearchSink, and my original plan was simply to use it. It turned out that my ES version is too new for it, and I did not want to roll ES back to 1.x as most online guides suggest. Instead, I modified the flume-ng-elasticsearch-sink jar that ships with Flume, rebuilt it, and got it running.

Enough preamble; on to the details.

JDK version: 1.8.0_111

Flume version: 1.6.0. If you are unsure which Flume version you have, look at the jar names under the lib directory (or run `bin/flume-ng version`); the version is easy to spot.

ElasticSearch version: 5.6.2

The detailed steps follow:

1:flume-ng-elasticsearch-sink-1.6.0.jar

This jar lives under Flume's lib directory and implements the ElasticSearchSink. Find the release matching your Flume version on the official site and download the full source.

I downloaded it from GitHub: matching my version, I checked out the flume-1.6 branch, copied all the code, and imported it as a Maven project into my IDE (Eclipse in my case).


Once imported, the project shows the usual Maven module layout (the screenshot is omitted here); just change the build path to JDK 1.8.

Right after the import nothing complains, but once we switch the dependencies to our own versions it will start to report errors, and we fix them one by one from wherever they appear.

First, edit the pom.xml file.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
	license agreements. See the NOTICE file distributed with this work for additional 
	information regarding copyright ownership. The ASF licenses this file to 
	You under the Apache License, Version 2.0 (the "License"); you may not use 
	this file except in compliance with the License. You may obtain a copy of 
	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
	by applicable law or agreed to in writing, software distributed under the 
	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
	OF ANY KIND, either express or implied. See the License for the specific 
	language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<artifactId>flume-ng-sinks</artifactId>
		<groupId>org.apache.flume</groupId>
		<version>1.6.0</version>
	</parent>
	<groupId>org.apache.flume.flume-ng-sinks</groupId>
	<artifactId>flume-ng-elasticsearch-sink</artifactId>
	<name>Flume NG ElasticSearch Sink</name>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.rat</groupId>
				<artifactId>apache-rat-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
	<dependencies>
		<dependency>
			<groupId>org.apache.flume</groupId>
			<artifactId>flume-ng-sdk</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.flume</groupId>
			<artifactId>flume-ng-core</artifactId>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
		</dependency>
		<!-- Here I changed the original dependency to the 5.6.2 version I need -->
		<dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
			<version>5.6.2</version><!--$NO-MVN-MAN-VER$ -->
		</dependency>
		<!-- This dependency must be added, otherwise the build fails -->
		<dependency>
			<groupId>org.elasticsearch.client</groupId>
			<artifactId>transport</artifactId>
			<version>5.6.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpclient</artifactId>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>commons-lang</groupId>
			<artifactId>commons-lang</artifactId>
		</dependency>
		<dependency>
			<groupId>com.google.guava</groupId>
			<artifactId>guava</artifactId>
		</dependency>
		<dependency>
			<groupId>org.mockito</groupId>
			<artifactId>mockito-all</artifactId>
			<scope>test</scope>
		</dependency>
		<!-- Added for my own needs; omit it if you do not use it -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.44</version>
		</dependency>
	</dependencies>
</project>

Every change I made is marked with a comment. Since the ES cluster I need to connect to is 5.6.2, all ES-related dependency versions were switched to 5.6.2.

2: After these changes, errors appear all over the place. Errors in the test sources can be ignored; just skip tests when packaging (e.g. `mvn clean package -DskipTests`).

The order of the following steps is not fixed, but after working through all of them everything should compile cleanly.

1: The ContentBuilderUtil class reports errors. I no longer remember every individual change, so here is the full modified source:

/* Licensed to the Apache Software Foundation (ASF) under the Apache License, Version 2.0. */
package org.apache.flume.sink.elasticsearch;

import java.io.IOException;
import java.nio.charset.Charset;

import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;

/**
 * Utility methods for using ElasticSearch {@link XContentBuilder}
 */
public class ContentBuilderUtil {

	private static final Charset charset = Charset.defaultCharset();

	private ContentBuilderUtil() {
	}

	public static void appendField(XContentBuilder builder, String field,
			byte[] data) throws IOException {
		XContentType contentType = XContentFactory.xContentType(data);
		if (contentType == null) {
			addSimpleField(builder, field, data);
		} else {
			addComplexField(builder, field, contentType, data);
		}
	}

	public static void addSimpleField(XContentBuilder builder,
			String fieldName, byte[] data) throws IOException {
		builder.field(fieldName, new String(data, charset));
	}

	public static void addComplexField(XContentBuilder builder,
			String fieldName, XContentType contentType, byte[] data)
			throws IOException {
		XContentParser parser = XContentFactory.xContent(contentType)
				.createParser(NamedXContentRegistry.EMPTY, data);
		parser.nextToken();
		// Add the field name, but not the value.
		builder.field(fieldName);
		try {
			// This will add the whole parsed content as the value of the field.
			builder.copyCurrentStructure(parser);
		} catch (Exception ex) {
			// If we get an exception here the most likely cause is nested JSON
			// that
			// can't be figured out in the body. At this point just push it
			// through
			// as is, we have already added the field so don't do it again
			builder.endObject();
			builder.field(fieldName, new String(data, charset));
		} finally {
			if (parser != null) {
				parser.close();
			}
		}
	}
}
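For reference, here is a minimal sketch (not part of the sink; the demo class name and sample values are mine) of how appendField behaves: JSON bytes are embedded as a nested structure, while anything else falls back to a plain string field.

import java.nio.charset.StandardCharsets;

import org.apache.flume.sink.elasticsearch.ContentBuilderUtil;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

public class ContentBuilderUtilDemo {
	public static void main(String[] args) throws Exception {
		XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
		// JSON content is detected by XContentFactory.xContentType() and copied in as structure.
		ContentBuilderUtil.appendField(builder, "@fields",
				"{\"user\":\"jordan\",\"command\":\"shutdown -r\"}".getBytes(StandardCharsets.UTF_8));
		// Plain text is not recognised as any XContentType, so it becomes a simple string field.
		ContentBuilderUtil.appendField(builder, "@message",
				"the original plain-text message".getBytes(StandardCharsets.UTF_8));
		builder.endObject();
		// string() renders the JSON document in the ES 5.x API.
		System.out.println(builder.string());
	}
}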

2: ElasticSearchEventSerializer reports errors as well; the modified source:

/* Licensed to the Apache Software Foundation (ASF) under the Apache License, Version 2.0. */
package org.apache.flume.sink.elasticsearch;

import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurableComponent;
import org.elasticsearch.common.xcontent.XContentBuilder;

/**
 * Interface for an event serializer which serializes the headers and body of an
 * event to write them to ElasticSearch. This is configurable, so any config
 * params required should be taken through this.
 */
public interface ElasticSearchEventSerializer extends Configurable,
		ConfigurableComponent {

	public static final Charset charset = Charset.defaultCharset();

	/**
	 * Return an {@link XContentBuilder} made up of the serialized flume event
	 * 
	 * @param event
	 *            The flume event to serialize
	 * @return An {@link XContentBuilder} used to write to ElasticSearch
	 * @throws IOException
	 *             If an error occurs during serialization
	 */
	abstract XContentBuilder getContentBuilder(Event event) throws IOException;
}

3:ElasticSearchLogStashEventSerializer

/* Licensed to the Apache Software Foundation (ASF) under the Apache License, Version 2.0. */
package org.apache.flume.sink.elasticsearch;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.elasticsearch.common.xcontent.XContentBuilder;

/**
 * Serialize flume events into the same format LogStash uses</p>
 *
 * This can be used to send events to ElasticSearch and use clients such as
 * Kibana which expect Logstash formatted indexes
 *
 * <pre>
 * {
 *    "@timestamp": "2010-12-21T21:48:33.309258Z",
 *    "@tags": [ "array", "of", "tags" ],
 *    "@type": "string",
 *    "@source": "source of the event, usually a URL."
 *    "@source_host": ""
 *    "@source_path": ""
 *    "@fields":{
 *       # a set of fields for this event
 *       "user": "jordan",
 *       "command": "shutdown -r":
 *     }
 *     "@message": "the original plain-text message"
 *   }
 * </pre>
 *
 * If the following headers are present, they will map to the above logstash
 * output as long as the logstash fields are not already present.</p>
 *
 * <pre>
 *  timestamp: long -> @timestamp:Date
 *  host: String -> @source_host: String
 *  src_path: String -> @source_path: String
 *  type: String -> @type: String
 *  source: String -> @source: String
 * </pre>
 *
 * @see https
 *      ://github.com/logstash/logstash/wiki/logstash%27s-internal-message-
 *      format
 */
public class ElasticSearchLogStashEventSerializer implements
		ElasticSearchEventSerializer {

	@Override
	public XContentBuilder getContentBuilder(Event event) throws IOException {
		XContentBuilder builder = jsonBuilder().startObject();
		appendBody(builder, event);
		appendHeaders(builder, event);
		return builder;
	}

	private void appendBody(XContentBuilder builder, Event event)
			throws IOException, UnsupportedEncodingException {
		byte[] body = event.getBody();
		ContentBuilderUtil.appendField(builder, "@message", body);
	}

	private void appendHeaders(XContentBuilder builder, Event event)
			throws IOException {
		Map<String, String> headers = new HashMap<String, String>(
				event.getHeaders());

		String timestamp = headers.get("timestamp");
		if (!StringUtils.isBlank(timestamp)
				&& StringUtils.isBlank(headers.get("@timestamp"))) {
			long timestampMs = Long.parseLong(timestamp);
			builder.field("@timestamp", new Date(timestampMs));
		}

		String source = headers.get("source");
		if (!StringUtils.isBlank(source)
				&& StringUtils.isBlank(headers.get("@source"))) {
			ContentBuilderUtil.appendField(builder, "@source",
					source.getBytes(charset));
		}

		String type = headers.get("type");
		if (!StringUtils.isBlank(type)
				&& StringUtils.isBlank(headers.get("@type"))) {
			ContentBuilderUtil.appendField(builder, "@type",
					type.getBytes(charset));
		}

		String host = headers.get("host");
		if (!StringUtils.isBlank(host)
				&& StringUtils.isBlank(headers.get("@source_host"))) {
			ContentBuilderUtil.appendField(builder, "@source_host",
					host.getBytes(charset));
		}

		String srcPath = headers.get("src_path");
		if (!StringUtils.isBlank(srcPath)
				&& StringUtils.isBlank(headers.get("@source_path"))) {
			ContentBuilderUtil.appendField(builder, "@source_path",
					srcPath.getBytes(charset));
		}

		builder.startObject("@fields");
		for (String key : headers.keySet()) {
			byte[] val = headers.get(key).getBytes(charset);
			ContentBuilderUtil.appendField(builder, key, val);
		}
		builder.endObject();
	}

	@Override
	public void configure(Context context) {
		// NO-OP...
	}

	@Override
	public void configure(ComponentConfiguration conf) {
		// NO-OP...
	}
}
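A quick way to sanity-check this serializer outside of Flume is to feed it a hand-built event; a minimal sketch (the demo class name and header values below are mine, invented for illustration):

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer;
import org.elasticsearch.common.xcontent.XContentBuilder;

public class SerializerDemo {
	public static void main(String[] args) throws Exception {
		Map<String, String> headers = new HashMap<String, String>();
		headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
		headers.put("host", "app-server-01");
		headers.put("type", "applog");

		Event event = EventBuilder.withBody(
				"{\"level\":\"INFO\",\"msg\":\"user login\"}".getBytes(StandardCharsets.UTF_8), headers);

		ElasticSearchLogStashEventSerializer serializer = new ElasticSearchLogStashEventSerializer();
		XContentBuilder builder = serializer.getContentBuilder(event);
		// getContentBuilder() leaves the outer object open; close it, then render with the ES 5.x string() call.
		System.out.println(builder.endObject().string());
	}
}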

4:ElasticSearchTransportClient

/* Licensed to the Apache Software Foundation (ASF) under the Apache License, Version 2.0. */
package org.apache.flume.sink.elasticsearch.client;

import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
import org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory;
import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

public class ElasticSearchTransportClient implements ElasticSearchClient {

	public static final Logger logger = LoggerFactory
			.getLogger(ElasticSearchTransportClient.class);

	private InetSocketTransportAddress[] serverAddresses;
	private ElasticSearchEventSerializer serializer;
	private ElasticSearchIndexRequestBuilderFactory indexRequestBuilderFactory;
	private BulkRequestBuilder bulkRequestBuilder;

	private Client client;

	@VisibleForTesting
	InetSocketTransportAddress[] getServerAddresses() {
		return serverAddresses;
	}

	@VisibleForTesting
	void setBulkRequestBuilder(BulkRequestBuilder bulkRequestBuilder) {
		this.bulkRequestBuilder = bulkRequestBuilder;
	}

	/**
	 * Transport client for external cluster
	 * 
	 * @param hostNames
	 * @param clusterName
	 * @param serializer
	 */
	public ElasticSearchTransportClient(String[] hostNames, String clusterName,
			ElasticSearchEventSerializer serializer) {
		configureHostnames(hostNames);
		this.serializer = serializer;
		openClient(clusterName);
	}

	public ElasticSearchTransportClient(String[] hostNames, String clusterName,
			ElasticSearchIndexRequestBuilderFactory indexBuilder) {
		configureHostnames(hostNames);
		this.indexRequestBuilderFactory = indexBuilder;
		openClient(clusterName);
	}

	/**
	 * Local transport client only for testing
	 * 
	 * @param indexBuilderFactory
	 */
	// public ElasticSearchTransportClient(
	// ElasticSearchIndexRequestBuilderFactory indexBuilderFactory) {
	// this.indexRequestBuilderFactory = indexBuilderFactory;
	// openLocalDiscoveryClient();
	// }

	/**
	 * Local transport client only for testing
	 *
	 * @param serializer
	 */
	// public ElasticSearchTransportClient(ElasticSearchEventSerializer
	// serializer) {
	// this.serializer = serializer;
	// openLocalDiscoveryClient();
	// }

	/**
	 * Used for testing
	 *
	 * @param client
	 *            ElasticSearch Client
	 * @param serializer
	 *            Event Serializer
	 */
	public ElasticSearchTransportClient(Client client,
			ElasticSearchEventSerializer serializer) {
		this.client = client;
		this.serializer = serializer;
	}

	/**
	 * Used for testing
	 *
	 * @param client
	 *            ElasticSearch Client
	 * @param serializer
	 *            Event Serializer
	 */
	public ElasticSearchTransportClient(Client client,
			ElasticSearchIndexRequestBuilderFactory requestBuilderFactory)
			throws IOException {
		this.client = client;
		requestBuilderFactory.createIndexRequest(client, null, null, null);
	}

	private void configureHostnames(String[] hostNames) {
		logger.warn(Arrays.toString(hostNames));
		serverAddresses = new InetSocketTransportAddress[hostNames.length];
		for (int i = 0; i < hostNames.length; i++) {
			String[] hostPort = hostNames[i].trim().split(":");
			String host = hostPort[0].trim();
			int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1]
					.trim()) : DEFAULT_PORT;
			// modified here: the ES 5.x InetSocketTransportAddress takes an InetAddress, hence the explicit resolution
			try {
				serverAddresses[i] = new InetSocketTransportAddress(
						InetAddress.getByName(host), port);
			} catch (UnknownHostException e) {
				e.printStackTrace();
			}
		}
	}

	@Override
	public void close() {
		if (client != null) {
			client.close();
		}
		client = null;
	}

	/**
	 * @description: converts an exception's stack trace into a string
	 * @author: yuzhao.yang
	 * @time: 2017-06-07 10:27 AM
	 */
	public String transfer(Exception e) throws Exception {
		// e.printStackTrace();
		ByteArrayOutputStream buf = new ByteArrayOutputStream();
		e.printStackTrace(new PrintWriter(buf, true));
		String expMessage = buf.toString();
		buf.close();
		if (null != expMessage) {
			return expMessage;
		} else {
			return null;
		}
	}

	@Override
	public void addEvent(Event event, IndexNameBuilder indexNameBuilder,
			String indexType, long ttlMs) throws Exception {
		if (bulkRequestBuilder == null) {
			bulkRequestBuilder = client.prepareBulk();
		}

		IndexRequestBuilder indexRequestBuilder = null;
		if (indexRequestBuilderFactory == null) {
			Map<String, ?> map = null;
			try {
				String body = new String(event.getBody());
				logger.error("資料結果:" + body);
				map = (Map<String, ?>) JSON.parse(body);
			} catch (Exception e) {
				logger.error("getContentBuilder異常:" + transfer(e));
			}

			indexRequestBuilder = client.prepareIndex(
					indexNameBuilder.getIndexName(event), indexType).setSource(
					map);
		} else {
			indexRequestBuilder = indexRequestBuilderFactory
					.createIndexRequest(client,
							indexNameBuilder.getIndexPrefix(event), indexType,
							event);
		}

		if (ttlMs > 0) {
			indexRequestBuilder.setTTL(ttlMs);
		}
		bulkRequestBuilder.add(indexRequestBuilder);
	}

	@Override
	public void execute() throws Exception {
		try {
			BulkResponse bulkResponse = bulkRequestBuilder.execute()
					.actionGet();
			if (bulkResponse.hasFailures()) {
				throw new EventDeliveryException(
						bulkResponse.buildFailureMessage());
			}
		} finally {
			bulkRequestBuilder = client.prepareBulk();
		}
	}

	/**
	 * Open client to elasticsearch cluster
	 * 
	 * @param clusterName
	 */
	private void openClient(String clusterName) {
		Settings settings = Settings.builder().put("cluster.name", clusterName)
				.build();

		// TransportClient transportClient = new TransportClient(settings);
		// for (InetSocketTransportAddress host : serverAddresses) {
		// transportClient.addTransportAddress(host);
		// }

		TransportClient transportClient = null;
		for (InetSocketTransportAddress host : serverAddresses) {
			if (null == transportClient) {
				transportClient = new PreBuiltTransportClient(settings)
						.addTransportAddress(host);
			} else {
				transportClient = transportClient.addTransportAddress(host);
			}
		}
		if (client != null) {
			client.close();
		}
		client = transportClient;
	}

	/*
	 * FOR TESTING ONLY...
	 * 
	 * Opens a local discovery node for talking to an elasticsearch server
	 * running in the same JVM
	 */
	// private void openLocalDiscoveryClient() {
	// logger.info("Using ElasticSearch AutoDiscovery mode");
	// Node node = NodeBuilder.nodeBuilder().client(true).local(true).node();
	// if (client != null) {
	// client.close();
	// }
	// client = node.client();
	// }

	@Override
	public void configure(Context context) {
		// To change body of implemented methods use File | Settings | File
		// Templates.
	}
}

In this class, my main change is inside addEvent: feeding the XContentBuilder straight into the index request kept throwing errors, so I parse the event body into a Map and pass that to setSource instead.

Beyond that, little else needs to change.

I may have forgotten some of the smaller changes, so I have put the modified code on GitHub: modified code repository.
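If the rebuilt sink cannot reach the cluster, it can help to sanity-check the transport connection outside of Flume first, using the same PreBuiltTransportClient call as openClient above. A minimal sketch (cluster name and address are placeholders, not necessarily my real values):

import java.net.InetAddress;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class TransportClientSmokeTest {
	public static void main(String[] args) throws Exception {
		Settings settings = Settings.builder()
				.put("cluster.name", "my-es-cluster") // must match your cluster's name
				.build();
		TransportClient client = new PreBuiltTransportClient(settings)
				.addTransportAddress(new InetSocketTransportAddress(
						InetAddress.getByName("192.168.100.34"), 9300));
		// An empty list here usually means a wrong cluster.name or an unreachable 9300 port.
		System.out.println("connected nodes: " + client.connectedNodes());
		client.close();
	}
}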

Note: my event bodies are JSON objects (i.e. they parse into a Map), which is why the transport client was modified as shown above; if your data comes in a different format, work out your own conversion.
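To make that assumption concrete, here is a minimal sketch (values invented) of the kind of event body the modified addEvent expects, parsed with the same Jetty JSON helper the sink uses:

import java.util.Map;

import org.mortbay.util.ajax.JSON;

public class BodyFormatDemo {
	public static void main(String[] args) {
		// A body that is already a JSON object parses cleanly into a Map,
		// which is what the modified addEvent hands to prepareIndex(...).setSource(map).
		String body = "{\"level\":\"INFO\",\"user\":\"jordan\",\"ts\":1510000000000}";
		Map<String, ?> map = (Map<String, ?>) JSON.parse(body);
		System.out.println(map);
	}
}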

Once the code changes are done, just package the project. Even though the pom.xml declares no special packaging plugin, the default Maven packaging is enough; Maven will download whatever plugins it needs.

The resulting jar is small because none of the dependencies are bundled into it, so the next step is to adjust and replace jars under Flume's lib directory. For convenience, here is the full list of jars in my Flume lib directory:

apache-log4j-extras-1.1.jar            flume-ng-kafka-sink-1.6.0.jar             kite-data-core-1.0.0.jar             parquet-avro-1.4.1.jar
async-1.4.0.jar                        flume-ng-log4jappender-1.6.0.jar          kite-data-hbase-1.0.0.jar            parquet-column-1.4.1.jar
asynchbase-1.5.0.jar                   flume-ng-morphline-solr-sink-1.6.0.jar    kite-data-hive-1.0.0.jar             parquet-common-1.4.1.jar
avro-1.7.4.jar                         flume-ng-node-1.6.0.jar                   kite-hadoop-compatibility-1.0.0.jar  parquet-encoding-1.4.1.jar
avro-ipc-1.7.4.jar                     flume-ng-sdk-1.6.0.jar                    lang-mustache-client-5.6.2.jar       parquet-format-2.0.0.jar
commons-cli-1.2.jar                    flume-scribe-source-1.6.0.jar             libthrift-0.9.0.jar                  parquet-generator-1.4.1.jar
commons-codec-1.8.jar                  flume-spillable-memory-channel-1.6.0.jar  log4j-1.2.17.jar                     parquet-hadoop-1.4.1.jar
commons-collections-3.2.1.jar          flume-thrift-source-1.6.0.jar             log4j-api-2.9.1.jar                  parquet-hive-bundle-1.4.1.jar
commons-compress-1.4.1.jar             flume-tools-1.6.0.jar                     lucene-analyzers-common-6.6.1.jar    parquet-jackson-1.4.1.jar
commons-dbcp-1.4.jar                   flume-twitter-source-1.6.0.jar            lucene-backward-codecs-6.6.1.jar     percolator-client-5.6.2.jar
commons-io-2.1.jar                     gson-2.2.2.jar                            lucene-core-6.6.1.jar                plugin-cli-5.6.2.jar
commons-jexl-2.1.1.jar                 guava-11.0.2.jar                          lucene-grouping-6.6.1.jar            protobuf-java-2.5.0.jar
commons-lang-2.5.jar                   HdrHistogram-2.1.9.jar                    lucene-highlighter-6.6.1.jar         reindex-client-5.6.2.jar
commons-logging-1.1.1.jar              hppc-0.7.1.jar                            lucene-join-6.6.1.jar                scala-library-2.10.1.jar
commons-pool-1.5.4.jar                 httpclient-4.2.1.jar                      lucene-memory-6.6.1.jar              securesm-1.1.jar
curator-client-2.6.0.jar               httpcore-4.1.3.jar                        lucene-misc-6.6.1.jar                serializer-2.7.2.jar
curator-framework-2.6.0.jar            irclib-1.10.jar                           lucene-queries-6.6.1.jar             servlet-api-2.5-20110124.jar
curator-recipes-2.6.0.jar              jackson-annotations-2.3.0.jar             lucene-queryparser-6.6.1.jar         slf4j-api-1.6.1.jar
derby-10.8.2.2.jar                     jackson-core-2.8.6.jar                    lucene-sandbox-6.6.1.jar             slf4j-log4j12-1.6.1.jar
elasticsearch-5.6.2.jar                jackson-core-asl-1.9.3.jar                lucene-spatial3d-6.6.1.jar           snakeyaml-1.15.jar
flume-avro-source-1.6.0.jar            jackson-databind-2.3.1.jar                lucene-spatial-6.6.1.jar             snappy-java-1.1.0.jar
flume-dataset-sink-1.6.0.jar           jackson-dataformat-cbor-2.8.6.jar         lucene-spatial-extras-6.6.1.jar      spatial4j-0.6.jar
flume-file-channel-1.6.0.jar           jackson-dataformat-smile-2.8.6.jar        lucene-suggest-6.6.1.jar             t-digest-3.0.jar
flume-hdfs-sink-1.6.0.jar              jackson-dataformat-yaml-2.8.6.jar         mapdb-0.9.9.jar                      transport-5.6.2.jar
flume-hive-sink-1.6.0.jar              jackson-mapper-asl-1.9.3.jar              metrics-core-2.2.0.jar               transport-netty3-client-5.6.2.jar
flume-irc-sink-1.6.0.jar               java-version-checker-5.6.2.jar            mina-core-2.0.4.jar                  transport-netty4-client-5.6.2.jar
flume-jdbc-channel-1.6.0.jar           jetty-6.1.26.jar                          netty-3.5.12.Final.jar               twitter4j-core-3.0.3.jar
flume-jms-source-1.6.0.jar             jetty-util-6.1.26.jar                     netty-buffer-4.1.13.Final.jar        twitter4j-media-support-3.0.3.jar
flume-kafka-channel-1.6.0.jar          jna-4.4.0-1.jar                           netty-codec-4.1.13.Final.jar         twitter4j-stream-3.0.3.jar
flume-kafka-source-1.6.0.jar           joda-time-2.1.jar                         netty-common-4.1.13.Final.jar        velocity-1.7.jar
flume-ng-auth-1.6.0.jar                joda-time-2.9.5.jar                       netty-handler-4.1.13.Final.jar       xalan-2.7.2.jar
flume-ng-configuration-1.6.0.jar       jopt-simple-3.2.jar                       netty-resolver-4.1.13.Final.jar      xercesImpl-2.9.1.jar
flume-ng-core-1.6.0.jar                jopt-simple-5.0.2.jar                     netty-transport-4.1.13.Final.jar     xml-apis-1.3.04.jar
flume-ng-elasticsearch-sink-1.6.0.jar  jsr305-1.3.9.jar                          opencsv-2.3.jar                      xz-1.0.jar
flume-ng-embedded-agent-1.6.0.jar      jts-1.13.jar                              paranamer-2.3.jar                    zkclient-0.3.jar
flume-ng-hbase-sink-1.6.0.jar          kafka_2.10-0.8.1.1.jar                    parent-join-client-5.6.2.jar

I first copied every jar from ElasticSearch's lib directory over, then added the non-duplicate jars required by my own project, and it finally ran normally.

Checking the logs confirmed that the data was indeed delivered into ES. Problem solved.
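For completeness, here is a minimal sketch of the sink side of a Flume agent configuration wired to the rebuilt sink. The agent/channel names, cluster name, index names, and address are placeholders of mine; the property names are the standard Flume 1.6 ElasticSearchSink settings.

# Hypothetical agent "a1" with an already-defined source and channel "c1".
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
a1.sinks.k1.channel = c1
# ES transport addresses use port 9300, not the HTTP port 9200.
a1.sinks.k1.hostNames = 192.168.100.34:9300
a1.sinks.k1.clusterName = my-es-cluster
a1.sinks.k1.indexName = flume_log
a1.sinks.k1.indexType = log
a1.sinks.k1.batchSize = 500
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer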

3: Basic Kibana usage

I am using kibana-5.6.2-linux-x86_64, which can be downloaded from the official Elastic site. Unpack it and edit the configuration file (config/kibana.yml); mine is attached below:

# Kibana is served by a back end server. This setting specifies the port to use.
server.port: 5601

# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "0.0.0.0"

# Enables you to specify a path to mount Kibana at if you are running behind a proxy. This only affects
# the URLs generated by Kibana, your proxy is expected to remove the basePath value before forwarding requests
# to Kibana. This setting cannot end in a slash.
#server.basePath: ""

# The maximum payload size in bytes for incoming server requests.
#server.maxPayloadBytes: 1048576

# The Kibana server's name.  This is used for display purposes.
#server.name: "your-hostname"

# The URL of the Elasticsearch instance to use for all your queries.
elasticsearch.url: "http://192.168.100.34:9200"

# When this setting's value is true Kibana uses the hostname specified in the server.host
# setting. When the value of this setting is false, Kibana uses the hostname of the host
# that connects to this Kibana instance.
#elasticsearch.preserveHost: true

# Kibana uses an index in Elasticsearch to store saved searches, visualizations and
# dashboards. Kibana creates a new index if the index doesn't already exist.
kibana.index: ".kibana"

# The default application to load.
#kibana.defaultAppId: "discover"

# If your Elasticsearch is protected with basic authentication, these settings provide
# the username and password that the Kibana server uses to perform maintenance on the Kibana
# index at startup. Your Kibana users still need to authenticate with Elasticsearch, which
# is proxied through the Kibana server.
#elasticsearch.username: "user"
#elasticsearch.password: "pass"

# Enables SSL and paths to the PEM-format SSL certificate and SSL key files, respectively.
# These settings enable SSL for outgoing requests from the Kibana server to the browser.
#server.ssl.enabled: false
#server.ssl.certificate: /path/to/your/server.crt
#server.ssl.key: /path/to/your/server.key

# Optional settings that provide the paths to the PEM-format SSL certificate and key files.
# These files validate that your Elasticsearch backend uses the same key files.
#elasticsearch.ssl.certificate: /path/to/your/client.crt
#elasticsearch.ssl.key: /path/to/your/client.key

# Optional setting that enables you to specify a path to the PEM file for the certificate
# authority for your Elasticsearch instance.
#elasticsearch.ssl.certificateAuthorities: [ "/path/to/your/CA.pem" ]

# To disregard the validity of SSL certificates, change this setting's value to 'none'.
#elasticsearch.ssl.verificationMode: full

# Time in milliseconds to wait for Elasticsearch to respond to pings. Defaults to the value of
# the elasticsearch.requestTimeout setting.
#elasticsearch.pingTimeout: 1500

# Time in milliseconds to wait for responses from the back end or Elasticsearch. This value
# must be a positive integer.
#elasticsearch.requestTimeout: 30000

# List of Kibana client-side headers to send to Elasticsearch. To send *no* client-side
# headers, set this value to [] (an empty list).
#elasticsearch.requestHeadersWhitelist: [ authorization ]

# Header names and values that are sent to Elasticsearch. Any custom headers cannot be overwritten
# by client-side headers, regardless of the elasticsearch.requestHeadersWhitelist configuration.
#elasticsearch.customHeaders: {}

# Time in milliseconds for Elasticsearch to wait for responses from shards. Set to 0 to disable.
#elasticsearch.shardTimeout: 0

# Time in milliseconds to wait for Elasticsearch at Kibana startup before retrying.
#elasticsearch.startupTimeout: 5000

# Specifies the path where Kibana creates the process ID file.
#pid.file: /var/run/kibana.pid

# Enables you specify a file where Kibana stores log output.
#logging.dest: stdout

# Set the value of this setting to true to suppress all logging output.
#logging.silent: false

# Set the value of this setting to true to suppress all logging output other than error messages.
#logging.quiet: false

# Set the value of this setting to true to log all events, including system usage information
# and all requests.
#logging.verbose: false

# Set the interval in milliseconds to sample system and process performance
# metrics. Minimum is 100ms. Defaults to 5000.
#ops.interval: 5000

# The default locale. This locale can be used in certain circumstances to substitute any missing
# translations.
#i18n.defaultLocale: "en"

Only server.host and elasticsearch.url were changed from the defaults; both are self-explanatory.

Start Kibana (`bin/kibana`), and it comes up successfully.

Open it in a browser at hostIp:5601.

Congratulations: you should now see the Kibana landing page. How to actually use Kibana is a topic for another post.