[大數據]-Logstash-5.3.1的安裝導入數據到Elasticsearch5.3.1並配置同義詞過濾
阿新 • • 發佈:2017-05-23
cat 3.1 send text 開啟 gui 插件 work message
閱讀此文請先閱讀上文:[大數據]-Elasticsearch5.3.1 IK分詞,同義詞/聯想搜索設置,前面介紹了ES,Kibana5.3.1的安裝配置,以及IK分詞的安裝和同義詞設置,這裏主要記錄Logstash導入mysql數據到Elasticsearch5.3.1並設置IK分詞和同義詞。由於logstash配置好JDBC,ES連接之後運行腳本一站式創建index,mapping,導入數據。但是如果我們要配置IK分詞器就需要修改創建index,mapping的配置,下面詳細介紹。
一、Logstash-5.3.1下載安裝:
- 下載:https://www.elastic.co/cn/downloads/logstash
- 解壓:tar -zxf logstash-5.3.1.tar.gz
- 啟動:bin/logstash -e ‘input { stdin { } } output { stdout {} }‘ (參數表示終端輸入輸出)如下則成功。
-
Sending Logstash‘s logs to /home/rzxes/logstash-5.3.1/logs which is now configured via log4j2.properties [2017-05-16T10:27:36,957][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/home/rzxes/logstash-5.3.1/data/queue"} [2017-05-16T10:27:37,041][INFO ][logstash.agent ] No persistent UUID file found. Generating new UUID {:uuid=>"c987803c-9b18-4395-bbee-a83a90e6ea60", :path=>"/home/rzxes/logstash-5.3.1/data/uuid"} [2017-05-16T10:27:37,581][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>125} [2017-05-16T10:27:37,682][INFO ][logstash.pipeline ] Pipeline main started The stdin plugin is now waiting for input: [2017-05-16T10:27:37,886][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
二、Logstash-5.3.1連接mysql作為數據源,ES作為數據輸出端:
- 由於此版本的logstash已經集成了jdbc插件,我們只需要添加一個配置文件xxx.conf。內容如下test.conf:
-
input { stdin { } jdbc { # 數據庫地址 端口 數據庫名 jdbc_connection_string => "jdbc:mysql://IP:3306/dbname" # 數據庫用戶名 jdbc_user => "user" # 數據庫密碼 jdbc_password => "pass" # mysql java驅動地址 jdbc_driver_library => "/home/rzxes/logstash-5.3.1/mysql-connector-java-5.1.17.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "100000" # sql 語句文件,也可以直接寫SQL,如statement => "select * from table1" statement_filepath => "/home/rzxes/logstash-5.3.1/test.sql" schedule => "* * * * *" type => "jdbc" } } output { stdout { codec => json_lines } elasticsearch { hosts => "192.168.230.150:9200" index => "test-1" #索引名稱 document_type => "form" #type名稱 document_id => "%{id}"
} }
- 創建一個SQL文件:如上配置test.sql內容: select * from table1
- test.conf,test.sql文件都在logstash的根目錄下。
- 運行logstash腳本導入數據: bin/logstash -f test.conf 啟動如下;
- 等待數據導入完成。開啟Es-head,訪問9100端口如下:
-
可以看到已經導入了11597條數據。
- 更多詳細的配置參考官方文檔:plugins-inputs-jdbc-jdbc_driver_library
三、logstash是如何創建index,mapping,並導入數據?
ES導入數據必須先創建index,mapping,但是在logstash中並沒有直接創建,我們只傳入了index,type等參數,logstash是通過es的mapping template來創建的,這個模板文件不需要指定字段,就可以根據輸入自動生成。在logstash啟動的時候這個模板已經輸出了如下log:
[2017-05-23T15:58:45,801][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x68f0d43b URL:http://192.168.230.150:9200/>} [2017-05-23T15:58:45,805][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil} [2017-05-23T15:58:45,979][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword"}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
- 添加IK分詞,只需要創建一個json文件: vim /home/rzxes/logstash-5.3.1/template/logstash.json 添加如下內容:
-
{ "template": "*", "version": 50001, "settings": { "index.refresh_interval": "5s" }, "mappings": { "_default_": { "_all": { "enabled": true, "norms": false }, "dynamic_templates": [ { "message_field": { "path_match": "message", "match_mapping_type": "string", "mapping": { "type": "text", "norms": false } } }, { "string_fields": { "match": "*", "match_mapping_type": "string", "mapping": { "type": "text", "norms": false, "analyzer": "ik_max_word",#只需要添加這一行即可設置分詞器為ik_max_word "fields": { "keyword": { "type": "keyword" } } } } } ], "properties": { "@timestamp": { "type": "date", "include_in_all": false }, "@version": { "type": "keyword", "include_in_all": false } } } } }
-
如需配置同義詞,需自定義分詞器,配置同義詞過濾<IK分詞同義詞詳見上一篇文章>。修改模板logstash.json如下:
-
{ "template" : "*", "version" : 50001, "settings" : { "index.refresh_interval" : "5s", #分詞,同義詞配置:自定義分詞器,過濾器,如不配同義詞則沒有index這一部分 "index": { "analysis": { "analyzer": { "by_smart": { "type": "custom", "tokenizer": "ik_smart", "filter": ["by_tfr","by_sfr"], "char_filter": ["by_cfr"] }, "by_max_word": { "type": "custom", "tokenizer": "ik_max_word", "filter": ["by_tfr","by_sfr"], "char_filter": ["by_cfr"] } }, "filter": { "by_tfr": { "type": "stop", "stopwords": [" "] }, "by_sfr": { "type": "synonym", "synonyms_path": "analysis/synonyms.txt" #同義詞路徑 } }, "char_filter": { "by_cfr": { "type": "mapping", "mappings": ["| => |"] } } } } # index --end-- }, "mappings" : { "_default_" : { "_all" : { "enabled" : true, "norms" : false }, "dynamic_templates" : [ { "message_field" : { "path_match" : "message", "match_mapping_type" : "string", "mapping" : { "type" : "text", "norms" : false }} }, { "string_fields" : { "match" : "*", "match_mapping_type" : "string", "mapping" : { "type" : "text", "norms" : false, #選擇分詞器:自定義分詞器,或者ik_mmax_word "analyzer" : "by_max_word", "fields" : { "keyword" : { "type" : "keyword" } } } } } ], "properties" : { "@timestamp" : { "type" : "date", "include_in_all" : false }, "@version" : { "type" : "keyword", "include_in_all" : false } } } } }
- 有了自定義模板文件,test.conf中配置模板覆蓋使模板生效。test.conf最終配置如下:
-
input { stdin { } jdbc { # 數據庫地址 端口 數據庫名 jdbc_connection_string => "jdbc:mysql://IP:3306/dbname" # 數據庫用戶名 jdbc_user => "user" # 數據庫密碼 jdbc_password => "pass" # mysql java驅動地址 jdbc_driver_library => "/home/rzxes/logstash-5.3.1/mysql-connector-java-5.1.17.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "100000" # sql 語句文件 statement_filepath => "/home/rzxes/logstash-5.3.1/mytest.sql" schedule => "* * * * *" type => "jdbc" } } output { stdout { codec => json_lines } elasticsearch { hosts => "192.168.230.150:9200" index => "test-1" document_type => "form" document_id => "%{id}" #id必須是待查詢的數據表的序列字段 template_overwrite => true template => "/home/rzxes/logstash-5.3.1/template/logstash.json" } }
- 刪除上次創建的index(由於數據導入時會根據原有數據的index,mapping進行索引創建),重新啟動logstash。
- 最終在Kibana中檢索關鍵詞 番茄,就會發現西紅柿也會被檢索到。如下圖:
- 致此logstash數據導入的template重寫就完成了。
- 另一種方式配置IK分詞:全局配置,不需要自定義模板。
-
curl -XPUT "http://192.168.230.150:9200/_template/rtf" -H ‘Content-Type: application/json‘ -d‘ { "template" : "*", "version" : 50001, "settings" : { "index.refresh_interval" : "5s", "index": { "analysis": { "analyzer": { "by_smart": { "type": "custom", "tokenizer": "ik_smart", "filter": ["by_tfr","by_sfr"], "char_filter": ["by_cfr"] }, "by_max_word": { "type": "custom", "tokenizer": "ik_max_word", "filter": ["by_tfr","by_sfr"], "char_filter": ["by_cfr"] } }, "filter": { "by_tfr": { "type": "stop", "stopwords": [" "] }, "by_sfr": { "type": "synonym", "synonyms_path": "analysis/synonyms.txt" } }, "char_filter": { "by_cfr": { "type": "mapping", "mappings": ["| => |"] } } } } }, "mappings" : { "_default_" : { "_all" : { "enabled" : true, "norms" : false }, "dynamic_templates" : [ { "message_field" : { "path_match" : "message", "match_mapping_type" : "string", "mapping" : { "type" : "text", "norms" : false }} }, { "string_fields" : { "match" : "*", "match_mapping_type" : "string", "mapping" : { "type" : "text", "norms" : false, "analyzer" : "by_max_word", "fields" : { "keyword" : { "type" : "keyword" } } } } } ], "properties" : { "@timestamp" : { "type" : "date", "include_in_all" : false }, "@version" : { "type" : "keyword", "include_in_all" : false } } } } }‘
- 可以使用curl查看模板: curl -XGET "http://192.168.230.150:9200/_template"
[大數據]-Logstash-5.3.1的安裝導入數據到Elasticsearch5.3.1並配置同義詞過濾