1. 程式人生 > elasticsearch 解決同步刪除資料庫中不存在的資料

elasticsearch解決同步刪除資料庫中不存在的資料

jdbc-input-plugin 只能實現資料的追加寫入,即對 elasticsearch 做增量同步;但 jdbc 源一端的資料庫經常會發生刪除或更新操作。這樣一來,資料庫與搜尋引擎中的資料就出現了不一致的情況。

當然你如果有開發團隊可以寫程式在刪除或者更新的時候同步對搜尋引擎操作。如果你沒有這個能力,可以嘗試下面的方法。

這裡有一個資料表 article,其 mtime 欄位定義了 ON UPDATE CURRENT_TIMESTAMP,所以每次更新資料列時 mtime 的時間都會自動變化。

  1. mysql> desc article;

  2. +-------------+--------------+------+-----+--------------------------------+-------+

  3. | Field | Type | Null | Key | Default | Extra |

  4. +-------------+--------------+------+-----+--------------------------------+-------+

  5. | id | int(11) | NO | | 0 | |

  6. | title | mediumtext | NO | | NULL | |

  7. | description | mediumtext | YES | | NULL | |

  8. | author | varchar(100) | YES | | NULL | |

  9. | source | varchar(100) | YES | | NULL | |

  10. | content | longtext | YES | | NULL | |

  11. | status | enum('Y','N')| NO | | 'N' | |

  12. | ctime | timestamp | NO | | CURRENT_TIMESTAMP | |

  13. | mtime | timestamp | YES | | ON UPDATE CURRENT_TIMESTAMP | |

  14. +-------------+--------------+------+-----+--------------------------------+-------+

  15. 7 rows in set (0.00 sec)

logstash 增加 mtime 的查詢規則

  1. jdbc {

  2. jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"

  3. jdbc_driver_class => "com.mysql.jdbc.Driver"

  4. jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"

  5. jdbc_user => "cms"

  6. jdbc_password => "password"

  7. schedule => "* * * * *" #定時cron的表示式,這裡是每分鐘執行一次

  8. statement => "select * from article where mtime > :sql_last_value"

  9. use_column_value => true

  10. tracking_column => "mtime"

  11. tracking_column_type => "timestamp"

  12. record_last_run => true

  13. last_run_metadata_path => "/var/tmp/article-mtime.last"

  14. }

建立回收站表,這個是用於解決資料庫刪除,或者禁用 status = 'N' 這種情況的。

  1. CREATE TABLE `elasticsearch_trash` (

  2. `id` int(11) NOT NULL,

  3. `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,

  4. PRIMARY KEY (`id`)

  5. ) ENGINE=InnoDB DEFAULT CHARSET=utf8

為 article 表建立觸發器

  1. CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW

  2. BEGIN

  3. -- 此處的邏輯是解決文章狀態變為 N 的時候,需要將搜尋引擎中對應的資料刪除。

  4. IF NEW.status = 'N' THEN

  5. insert into elasticsearch_trash(id) values(OLD.id);

  6. END IF;

  7. -- 此處邏輯是修改狀態到 Y 的時候,方式elasticsearch_trash仍然存在該文章ID,導致誤刪除。所以需要刪除回收站中得回收記錄。

  8. IF NEW.status = 'Y' THEN

  9. delete from elasticsearch_trash where id = OLD.id;

  10. END IF;

  11. END

  12. CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW

  13. BEGIN

  14. -- 此處邏輯是文章被刪除同事將改文章放入搜尋引擎回收站。

  15. insert into elasticsearch_trash(id) values(OLD.id);

  16. END

接下來我們需要寫一個簡單的 Shell 指令碼,每分鐘執行一次,從 elasticsearch_trash 資料表中取出資料,然後使用 curl 命令呼叫 elasticsearch restful 介面,刪除被回收的資料。

你還可以開發相關的程式,這裡提供一個 Spring boot 定時任務例子。

實體

  1. package cn.netkiller.api.domain.elasticsearch;

  2. import java.util.Date;

  3. import javax.persistence.Column;

  4. import javax.persistence.Entity;

  5. import javax.persistence.Id;

  6. import javax.persistence.Table;

  7. @Entity

  8. @Table

  9. public class ElasticsearchTrash {

  10. @Id

  11. private int id;

  12. @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")

  13. private Date ctime;

  14. public int getId() {

  15. return id;

  16. }

  17. public void setId(int id) {

  18. this.id = id;

  19. }

  20. public Date getCtime() {

  21. return ctime;

  22. }

  23. public void setCtime(Date ctime) {

  24. this.ctime = ctime;

  25. }

  26. }

倉庫 

  1. package cn.netkiller.api.repository.elasticsearch;

  2. import org.springframework.data.repository.CrudRepository;

  3. import com.example.api.domain.elasticsearch.ElasticsearchTrash;

  4. public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{

  5. }

定時任務 

  1. package cn.netkiller.api.schedule;

  2. import org.elasticsearch.action.delete.DeleteResponse;

  3. import org.elasticsearch.client.transport.TransportClient;

  4. import org.elasticsearch.rest.RestStatus;

  5. import org.slf4j.Logger;

  6. import org.slf4j.LoggerFactory;

  7. import org.springframework.beans.factory.annotation.Autowired;

  8. import org.springframework.scheduling.annotation.Scheduled;

  9. import org.springframework.stereotype.Component;

  10. import com.example.api.domain.elasticsearch.ElasticsearchTrash;

  11. import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;

  12. @Component

  13. public class ScheduledTasks {

  14. private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);

  15. @Autowired

  16. private TransportClient client;

  17. @Autowired

  18. private ElasticsearchTrashRepository alasticsearchTrashRepository;

  19. public ScheduledTasks() {

  20. }

  21. @Scheduled(fixedRate = 1000 * 60) // 60秒執行一次排程任務

  22. public void cleanTrash() {

  23. for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {

  24. DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();

  25. RestStatus status = response.status();

  26. logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());

  27. if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {

  28. alasticsearchTrashRepository.delete(elasticsearchTrash);

  29. }

  30. }

  31. }

  32. }

Spring boot 啟動主程式。 

  1. package cn.netkiller.api;

  2. import org.springframework.boot.SpringApplication;

  3. import org.springframework.boot.autoconfigure.SpringBootApplication;

  4. import org.springframework.scheduling.annotation.EnableScheduling;

  5. @SpringBootApplication

  6. @EnableScheduling

  7. public class Application {

  8. public static void main(String[] args) {

  9. SpringApplication.run(Application.class, args);

  10. }

  11. }