1. 程式人生 > >ElasticSearch Logstash資料採集工具---從mysql自動採集

ElasticSearch Logstash資料採集工具---從mysql自動採集

Logstash是ES下的一款開源軟體,它能夠同時從多個來源採集資料、轉換資料,然後將資料傳送到Eleasticsearch中建立索引。

一、下載Logstash

注意下載的logstash版本最好要和你使用的ElasticSearch版本一致。

下載網址:https://www.elastic.co/downloads/logstash

解壓之後:

二、安裝logstash-input-jdbc

logstash-input-jdbc是ruby開發的,先下載ruby並安裝

下載地址:https://rubyinstaller.org/downloads/

下載2.5版本即可。

安裝之後在命令提示符中鍵入 ruby -v 檢視ruby版本

Logstash5.x以上版本本身自帶有logstash-input-jdbc,6.x版本本身不帶logstash-input-jdbc外掛,需要手動安裝

安裝成功後我們可以在logstash根目錄下的以下目錄檢視對應的外掛版本

三、建立模板檔案

我們使用Logstash從MySQL中讀取資料,向ES中建立索引,這裡需要提前建立mapping的模板檔案以便logstash使用。

在logstach的config目錄建立XXX.json,內容和在建立ElasticSearch對映欄位時一樣。

例如,

{
	"mappings" : {
		"doc" : {
			"properties" : {
				"charge" : {
					"type" : "keyword"
				},
				"description" : {
					"analyzer" : "ik_max_word",
					"search_analyzer" : "ik_smart",
					"type" : "text"
				},
				"end_time" : {
					"format" : "yyyy‐MM‐dd HH:mm:ss",
					"type" : "date"
				},
				"expires" : {
					"format" : "yyyy‐MM‐dd HH:mm:ss",
					"type" : "date"
				},
				"grade" : {
					"type" : "keyword"
				},
				"id" : {
					"type" : "keyword"
				},
				"mt" : {
					"type" : "keyword"
				},
				"name" : {
					"analyzer" : "ik_max_word",
					"search_analyzer" : "ik_smart",
					"type" : "text"
				},
				"pic" : {
					"index" : false,
					"type" : "keyword"
				},
				"price" : {
					"type" : "float"
				},
				"price_old" : {
					"type" : "float"
				},
				"pub_time" : {
					"format" : "yyyy‐MM‐dd HH:mm:ss",
					"type" : "date"
				},
				"qq" : {
					"index" : false,
					"type" : "keyword"
				},
				"st" : {
					"type" : "keyword"
				},
				"start_time" : {
					"format" : "yyyy‐MM‐dd HH:mm:ss",
					"type" : "date"
				},
				"status" : {
					"type" : "keyword"
				},
				"studymodel" : {
					"type" : "keyword"
				},
				"teachmode" : {
					"type" : "keyword"
				},
				"teachplan" : {
					"analyzer" : "ik_max_word",
					"search_analyzer" : "ik_smart",
					"type" : "text"
				},
				"users" : {
					"index" : false,
					"type" : "text"
				},
				"valid" : {
					"type" : "keyword"
				}
			}
		}
	},
	"template" : "xc_course"
}

四、配置mysql.conf

在logstash的config目錄下配置mysql.conf檔案供logstash使用,logstash會根據mysql.conf檔案的配置的地址從MySQL中讀取資料向ES中寫入索引。

參考https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

配置輸入資料來源和輸出資料來源。

input {
  stdin {
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://localhost:3306/xc_course?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC"
  # the user we wish to excute our statement as
  jdbc_user => "root"
  jdbc_password => root
  # the path to our downloaded jdbc driver  
  jdbc_driver_library => "D:/software/ElasticSearch01/logstash-6.2.1/config/mysql-connector-java-5.1.32.jar"
  # the name of the driver class for mysql
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  
  #要執行的sql檔案
  #statement_filepath => "/conf/course.sql"
  statement => "select * from course_pub where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)"
  #定時配置
  schedule => "* * * * *"
  record_last_run => true
  last_run_metadata_path => "D:/software/ElasticSearch01/logstash-6.2.1/config/logstash_metadata"
  }
}


output {
  elasticsearch {
  #ES的ip地址和埠
  hosts => "localhost:9200"
  #hosts => ["localhost:9200","localhost:9202","localhost:9203"]
  #ES索引庫名稱
  index => "xc_course"
  document_id => "%{id}"
  document_type => "doc"
  template =>"D:/software/ElasticSearch01/logstash-6.2.1/config/xc_course_template.json"
  template_name =>"xc_course"
  template_overwrite =>"true"
  }
  stdout {
 #日誌輸出
  codec => json_lines
  }
}

說明:

1、ES採用UTC時區問題

ES採用UTC 時區,比北京時間早8小時,所以ES讀取資料時讓最後更新時間加8小時
where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)

2、logstash每個執行完成會在D:/ElasticSearch/logstash-6.2.1/config/logstash_metadata記錄執行時間下次以此時間為基準進行增量同步資料到索引庫。

六、測試

啟動logstash.bat:

.\logstash.bat ‐f ..\config\mysql.conf

通過在資料庫中更新對應欄位時間戳可以完成Elastic