Using the Logstash JDBC input plugin to periodically read new database records

MySQL table structure:

mysql> desc employee;
+-----------+-------------+------+-----+---------+----------------+
| Field     | Type        | Null | Key | Default | Extra          |
+-----------+-------------+------+-----+---------+----------------+
| id        | int(11)     | NO   | PRI | NULL    | auto_increment |
| last_name | varchar(50) | NO   |     | NULL    |                |
| email     | varchar(50) | NO   |     | NULL    |                |
| gender    | varchar(20) | NO   |     | NULL    |                |
+-----------+-------------+------+-----+---------+----------------+
4 rows in set (0.00 sec)

MySQL seed data:

mysql> select * from employee;
+----+-----------+------------+--------+
| id | last_name | email      | gender |
+----+-----------+------------+--------+
|  1 | A1        | [email protected] | Male   |
|  2 | A2        | [email protected] | Male   |
|  3 | A3        | [email protected] | Female |
+----+-----------+------------+--------+
3 rows in set (0.00 sec)

Logstash configuration:

input {
  jdbc {
    jdbc_driver_library => "drivers/mysql-connector-java-5.1.45.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mybatis"
    jdbc_user => "root"
    jdbc_password => "root123.."
    # Named parameter referenced in the statement as :gender
    parameters => { "gender" => "Male" }
    # Cron-style schedule: run the query once every minute
    schedule => "* * * * *"
    statement => "SELECT id,last_name,email,gender from employee where id > :sql_last_value and gender = :gender"
    # Track the id column so that :sql_last_value carries the last id read
    use_column_value => true
    tracking_column => "id"
  }
}

output {
    stdout { codec => rubydebug }
}

Starting Logstash:

[[email protected] logstash-5.6.3]$ ./bin/logstash -f file/jdbc_input.conf
Sending Logstash's logs to /usr/local/logstash/logstash-5.6.3/logs which is now configured via log4j2.properties
[2018-04-25T15:37:19,037][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/local/logstash/logstash-5.6.3/modules/fb_apache/configuration"}
[2018-04-25T15:37:19,040][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/local/logstash/logstash-5.6.3/modules/netflow/configuration"}
[2018-04-25T15:37:19,862][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>250}
[2018-04-25T15:37:19,961][INFO ][logstash.pipeline        ] Pipeline main started
[2018-04-25T15:37:20,048][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Output after one minute:

[2018-04-25T15:38:00,779][INFO ][logstash.inputs.jdbc     ] (0.007000s) SELECT id,last_name,email,gender from employee where id > 0 and gender = 'Male'
{
    "@timestamp" => 2018-04-25T07:38:00.794Z,
        "gender" => "Male",
      "@version" => "1",
     "last_name" => "A1",
            "id" => 1,
         "email" => "[email protected]"
}
{
    "@timestamp" => 2018-04-25T07:38:00.863Z,
        "gender" => "Male",
      "@version" => "1",
     "last_name" => "A2",
            "id" => 2,
         "email" => "[email protected]"
}

Insert one row into MySQL:

mysql> insert into employee(last_name,email,gender) values('A4','[email protected]','Male');
Query OK, 1 row affected (0.02 sec)

mysql> select * from employee;
+----+-----------+------------+--------+
| id | last_name | email      | gender |
+----+-----------+------------+--------+
|  1 | A1        | [email protected] | Male   |
|  2 | A2        | [email protected] | Male   |
|  3 | A3        | [email protected] | Female |
|  4 | A4        | [email protected]  | Male   |
+----+-----------+------------+--------+
4 rows in set (0.00 sec)

Latest Logstash output:

[2018-04-25T15:41:00,103][INFO ][logstash.inputs.jdbc     ] (0.004000s) SELECT id,last_name,email,gender from employee where id > 2 and gender = 'Male'
{
    "@timestamp" => 2018-04-25T07:41:00.107Z,
        "gender" => "Male",
      "@version" => "1",
     "last_name" => "A4",
            "id" => 4,
         "email" => "[email protected]"
}

Key points:

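  • The two settings that matter are use_column_value and tracking_column. When use_column_value is true, the :sql_last_value variable holds the most recent value of the column named by tracking_column (when it is false, :sql_last_value is the time of the previous query instead). On the very first run it defaults to 0. In my example tracking_column is id, so Logstash records the largest id from each query result and uses it in the next query. That is why the logged statements above go from id > 0 to id > 2.
  • Logstash writes the latest tracking_column value to the .logstash_jdbc_last_run file set by the last_run_metadata_path setting, which defaults to /home/${user}/.logstash_jdbc_last_run. After a restart it therefore does not reload everything from the start; it continues from the last recorded value. You can also set clean_run to true so that the previous run state is discarded on restart and reading begins again from the earliest data.
  • Documentation: https://www.elastic.co/guide/en/logstash/6.2/plugins-inputs-jdbc.html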
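
The state-file settings from the second point can be sketched as a configuration. This is only an illustrative sketch, not a tested setup: it reuses the jdbc block from this post, and the last_run_metadata_path value is a hypothetical path picked for the example (any location writable by the Logstash user should do).

```
input {
  jdbc {
    jdbc_driver_library => "drivers/mysql-connector-java-5.1.45.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mybatis"
    jdbc_user => "root"
    jdbc_password => "root123.."
    parameters => { "gender" => "Male" }
    schedule => "* * * * *"
    statement => "SELECT id,last_name,email,gender from employee where id > :sql_last_value and gender = :gender"
    use_column_value => true
    tracking_column => "id"
    # Hypothetical path (an assumption for this example): keep the
    # tracking state in an explicit file instead of the default location
    last_run_metadata_path => "/usr/local/logstash/.employee_jdbc_last_run"
    # false keeps the saved state across restarts; switch to true only
    # when a full re-read starting from id > 0 is wanted
    clean_run => false
  }
}
```

Giving each jdbc input its own state file is probably worthwhile once a pipeline has more than one of them, since inputs that share the default file would overwrite each other's last value.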