1. 程式人生 > >elasticsearch -- Logstash實現mysql同步數據到elasticsearch

elasticsearch -- Logstash實現mysql同步數據到elasticsearch

code nload pre att www 我們 net lib 技術

配置

  1. 安裝插件
    由於這裏是從mysql同步數據到elasticsearch,所以需要安裝jdbc的入插件和elasticsearch的出插件:logstash-input-jdbc、logstash-output-elasticsearch
    安裝效果圖如下所示:
    技術分享圖片
    技術分享圖片
  2. 下載mysql連接庫
    由於logstash是ruby開發的,所以這裏要下載mysql的連接庫jar包,從官網下載,我這裏下載的是:mysql-connector-java-5.1.46.jar
    將下載好的mysql-connector-java-5.1.46.jar,放至/usr/local/logstash/config/目錄下。
  3. 修改配置文件
    在config目錄下,創建配置文件(logstash-mysql-es.conf):
    input {
      jdbc {
        # mysql相關jdbc配置
        jdbc_connection_string => "jdbc:mysql://10.112.76.30:3306/jack_test?useUnicode=true&characterEncoding=utf-8&useSSL=false"
        jdbc_user => "root"
        jdbc_password => "123456"
    
        # jdbc連接mysql驅動的文件目錄,可去官網下載:https://dev.mysql.com/downloads/connector/j/
        jdbc_driver_library => "./config/mysql-connector-java-5.1.46.jar"
        # the name of the driver class for mysql
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => true
        jdbc_page_size => "50000"
    
        jdbc_default_timezone =>"Asia/Shanghai"
    
        # mysql文件, 也可以直接寫SQL語句在此處,如下:
        # statement => "select * from t_order where update_time >= :sql_last_value"
        statement_filepath => "./config/jdbc.sql"
    
        # 這裏類似crontab,可以定制定時操作,比如每分鐘執行一次同步(分 時 天 月 年)
        schedule => "* * * * *"
        #type => "jdbc"
    
        # 是否記錄上次執行結果, 如果為真,將會把上次執行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中
        #record_last_run => true
    
        # 是否需要記錄某個column 的值,如果record_last_run為真,可以自定義我們需要 track 的 column 名稱,此時該參數就要為 true. 否則默認 track 的是 timestamp 的值.
        use_column_value => true
    
        # 如果 use_column_value 為真,需配置此參數. track 的數據庫 column 名,該 column 必須是遞增的. 一般是mysql主鍵
        tracking_column => "update_time"
        
        tracking_column_type => "timestamp"
    
        last_run_metadata_path => "./logstash_capital_bill_last_id"
    
        # 是否清除 last_run_metadata_path 的記錄,如果為真那麽每次都相當於從頭開始查詢所有的數據庫記錄
        clean_run => false
    
        #是否將 字段(column) 名稱轉小寫
        lowercase_column_names => false
      }
    }
    
    output {
      elasticsearch {
        hosts => "10.112.76.31:9200"
        index => "mysql_order"
        document_id => "%{id}"
        template_overwrite => true
      }
    
      # 這裏輸出調試,正式運行時可以註釋掉
      stdout {
          codec => json_lines
      } 
    }
    這裏有幾個註意點:
    (1)jdbc_driver_library
    mysql-connector-java-5.1.46.jar的存放目錄,這個一定要配置正確,支持全路徑和相對路徑。如果配置不對,將會報“can ”錯誤。
    (2)sql_last_value
    標誌目前logstash同步的位置信息(類似offset)。比如id、updatetime。logstash通過這個標誌,可以判斷目前同步到哪一條數據。
    (3)statement、statement_filepath
    statement:執行同步的sql語句,可以同步部分數據。
    statement_filepath:存儲執行同步的sql語句。不和statement同時使用。
    (4)schedule
    定時器,表示每隔多長時間同步一次數據。格式類似crontab。
    (5)tracking_column、tracking_column_type
    tracking_column:表示表中哪一列用於判斷logstash同步的位置信息。與sql_last_value比較判斷是否需要同步這條數據。
    tracking_column_type:racking_column指定列的類型。支持兩種類型:numeric(默認)、timestamp。註意:如果列是時間字段(比如updateTime),一定要指定這個類型為timestamp。我就踩了這個大坑。。。一直同步不成功!!!
    (6)last_run_metadata_path
    存儲sql_last_value值的文件名稱及位置。
    (7)document_id
    生成elasticsearch的文檔值,盡量使用同步的數據中已有的唯一標識。比如同步訂單數據,可以使用訂單號。

啟動

在根目錄下,執行命令:

nohup bin/logstash -f config/logstash-mysql-es.conf > logs/logstash.out &

效果圖如下:

技術分享圖片

同步

技術分享圖片

完成了一條數據的同步

elasticsearch -- Logstash實現mysql同步數據到elasticsearch