
Syncing Data to Elasticsearch with Logstash, and Integrating Elasticsearch into Spring Boot for Search


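Installing Logstash and syncing data to Elasticsearch

Why use Logstash for synchronization? An article on CSDN briefly compares the strengths and weaknesses of several sync tools: https://blog.csdn.net/laoyang360/article/details/51694519.

Let's get started:

  1. Download the Logstash package. Its version must match the Elasticsearch version. On Windows, simply unzip it.

  2. Add the configuration for syncing the MySQL database, and put the MySQL connector driver jar in the configuration directory you choose.

    Note: current Logstash releases already bundle the logstash-input-jdbc plugin, so it does not need to be installed separately.

       The configuration file must be UTF-8 encoded. My new file defaulted to GBK, and Logstash reported an encoding error when it read the configuration at startup.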

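My configuration directory is C:/logstash-6.4.0/mysqletc. It holds the driver jar and blog.sql referenced in the configuration, alongside mysql.conf itself.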

Configuration file mysql.conf:

input {
    stdin {
    }
    jdbc {
      # MySQL connection string
      jdbc_connection_string => "jdbc:mysql://localhost/blog?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
      # MySQL username and password
      jdbc_user => "root"
      jdbc_password => "123456"
      # Driver jar
      jdbc_driver_library => "C:/logstash-6.4.0/mysqletc/mysql-connector-java-6.0.5.jar"
      # Driver class name
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      # Run the SQL contained in this file
      statement_filepath => "C:/logstash-6.4.0/mysqletc/blog.sql"
      # Cron-style schedule. Fields: minute, hour, day of month, month, day of week.
      # All "*" means the statement runs every minute.
      schedule => "* * * * *"
      # Index type
      type => "blog"
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
    elasticsearch {
        # ES server
        hosts => ["localhost:9200"]
        # ES index name
        index => "sl_blog"
        # Document ID, taken from the row's auto-increment id column
        document_id => "%{id}"
    }

    stdout {
        codec => json_lines
    }
}

The blog.sql file:

SELECT * FROM blog

To sync several MySQL tables, edit mysql.conf and add the other tables to both the input and the output sections.
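As a rough sketch of what a two-table mysql.conf might look like: each table gets its own jdbc input with a distinct type, and the output routes on that type. The second table (comment), its SQL file (comment.sql) and its index name (sl_comment) are hypothetical names used only for illustration. The connection settings simply repeat those already used for the blog table.

```
input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://localhost/blog?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
        jdbc_user => "root"
        jdbc_password => "123456"
        jdbc_driver_library => "C:/logstash-6.4.0/mysqletc/mysql-connector-java-6.0.5.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        statement_filepath => "C:/logstash-6.4.0/mysqletc/blog.sql"
        schedule => "* * * * *"
        type => "blog"
    }
    jdbc {
        jdbc_connection_string => "jdbc:mysql://localhost/blog?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
        jdbc_user => "root"
        jdbc_password => "123456"
        jdbc_driver_library => "C:/logstash-6.4.0/mysqletc/mysql-connector-java-6.0.5.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        # Hypothetical second table and SQL file
        statement_filepath => "C:/logstash-6.4.0/mysqletc/comment.sql"
        schedule => "* * * * *"
        type => "comment"
    }
}

output {
    # Route each event to the index that matches the type set by its input
    if [type] == "blog" {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "sl_blog"
            document_id => "%{id}"
        }
    }
    if [type] == "comment" {
        elasticsearch {
            hosts => ["localhost:9200"]
            # Hypothetical index name
            index => "sl_comment"
            document_id => "%{id}"
        }
    }
}
```

Keeping one index per table avoids ID collisions, since rows from different tables can share the same id value.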

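  3. Start Logstash. If everything is set up correctly, the data is synced to Elasticsearch; with the configuration above, Logstash reads the latest data from the database every minute.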

  logstash -f ../mysqletc/mysql.conf
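
Since the statement runs every minute and selects the whole table, it is worth seeing why the index does not fill up with duplicates: the output sets document_id => "%{id}", so a row that is indexed again replaces the document with the same ID. The following is only an in-memory illustration of that overwrite behaviour, not Elasticsearch code; the class IndexSketch and its methods are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy in-memory index keyed by document ID; it only illustrates overwrite-on-same-ID. */
class IndexSketch {
    private final Map<String, Map<String, Object>> docs = new LinkedHashMap<>();

    /** Index one row; the row's "id" column becomes the document ID. */
    void index(Map<String, Object> row) {
        String docId = String.valueOf(row.get("id"));
        // Same ID: the previous version is replaced instead of a second copy being added.
        docs.put(docId, row);
    }

    /** Number of distinct documents currently held. */
    int size() {
        return docs.size();
    }

    /** Look up a document by its ID, or null if it is absent. */
    Map<String, Object> get(String docId) {
        return docs.get(docId);
    }
}
```

Indexing the same row twice leaves one document, and indexing an edited row with the same id leaves one document carrying the new values.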


The synced index can then be inspected in the elasticsearch-head plugin.

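Integrating Elasticsearch into Spring Boot

The open-source blog system new-star-blog (https://github.com/waylau/new-star-blog) serves as the example for a simple implementation of this feature.

Environment: Spring Boot 2.0.4 + ES 6.4.0

  1. Add the Spring Data Elasticsearch dependency: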

<!-- Spring Boot uses the Spring Data Elasticsearch module by default -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

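Spring Data Elasticsearch is the official Spring way of accessing Elasticsearch. Compared with calling the REST API directly, it offers a thorough layer of encapsulation and follows the Spring Data conventions. It also shields you from the complexity of the Elasticsearch REST interface, because Spring wraps the Elasticsearch API internally.

  2. Add the Elasticsearch server configuration: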

# Enable Elasticsearch repositories (default: true)
spring.data.elasticsearch.repositories.enabled=true
# 9300 is the default port for the Java client; 9200 is the RESTful HTTP port
spring.data.elasticsearch.cluster-nodes=127.0.0.1:9300
# ES connection timeout
#spring.data.elasticsearch.properties.transport.tcp.connect_timeout=120s

  3. Implementation code:

Entity class EsBlog:

package com.sl.blog.domain;

import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.annotation.Id;
import java.io.Serializable;
import java.util.Date;

// Maps to the Elasticsearch index and document type
@Document(indexName = "sl_blog", type = "blog")
public class EsBlog implements Serializable {

    private static final long serialVersionUID = 1L;

    @Id  // Primary key
    private String id;

    private String title;

    //@Field(index = false)
    private Date create_time;

    //@Field(index = false,type = FieldType.Long)
    private Long user_id;

    private String tags;

    //@Field(index = false,type = FieldType.Long)
    private Long read_size;

    private Long catalog_id;

    private String summary;

    //@Field(index = false,type = FieldType.Long)
    private Long comment_size;

    //@Field(index = false,type = FieldType.Long)
    private Long like_size;

    private String content;

    private String username;

    protected EsBlog() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public Date getCreate_time() {
        return create_time;
    }

    public void setCreate_time(Date create_time) {
        this.create_time = create_time;
    }

    public Long getUser_id() {
        return user_id;
    }

    public void setUser_id(Long user_id) {
        this.user_id = user_id;
    }

    public String getTags() {
        return tags;
    }

    public void setTags(String tags) {
        this.tags = tags;
    }

    public Long getRead_size() {
        return read_size;
    }

    public void setRead_size(Long read_size) {
        this.read_size = read_size;
    }

    public Long getCatalog_id() {
        return catalog_id;
    }

    public void setCatalog_id(Long catalog_id) {
        this.catalog_id = catalog_id;
    }

    public String getSummary() {
        return summary;
    }

    public void setSummary(String summary) {
        this.summary = summary;
    }

    public Long getComment_size() {
        return comment_size;
    }

    public void setComment_size(Long comment_size) {
        this.comment_size = comment_size;
    }

    public Long getLike_size() {
        return like_size;
    }

    public void setLike_size(Long like_size) {
        this.like_size = like_size;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

}

Repository interface IEsBlogRepository:

package com.sl.blog.repository;

import com.sl.blog.domain.EsBlog;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchCrudRepository;
import org.springframework.stereotype.Component;

@Component
public interface IEsBlogRepository extends ElasticsearchCrudRepository<EsBlog, String> {

    // Derived query: matches blogs whose title, summary, content or tags contain the given text
    Page<EsBlog> findDistinctEsBlogByTitleContainingOrSummaryContainingOrContentContainingOrTagsContaining(String title, String summary, String content, String tags, Pageable pageable);

}

Service interface IEsBlogService:

package com.sl.blog.service;

import com.sl.blog.domain.EsBlog;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

public interface IEsBlogService {

    Page<EsBlog> getEsBlogByKeys(String keyword, Pageable pageable);
}

Service implementation EsBlogServiceImpl:

package com.sl.blog.service.impl;

import com.sl.blog.domain.EsBlog;
import com.sl.blog.repository.IEsBlogRepository;
import com.sl.blog.service.IEsBlogService;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

@Service
public class EsBlogServiceImpl implements IEsBlogService {

    @Autowired
    private ElasticsearchTemplate elasticsearchTemplate;

    @Autowired
    private IEsBlogRepository esBlogRepository;

    /**
     * Search by keyword.
     *
     * @param keyword  search text; when empty, all blogs are returned
     * @param pageable requested page and optional sort
     * @return the matching page of blogs
     */
    @Override
    public Page<EsBlog> getEsBlogByKeys(String keyword, Pageable pageable) {
        // Apply a default sort when the caller did not ask for one.
        // In Spring Data 2.x getSort() never returns null, so check isUnsorted() instead.
        if (pageable.getSort().isUnsorted()) {
            Sort sort = Sort.by(Sort.Direction.DESC, "read_size", "comment_size", "like_size");
            pageable = PageRequest.of(pageable.getPageNumber(), pageable.getPageSize(), sort);
        }
        if (StringUtils.isEmpty(keyword)) {
            return esBlogRepository.findAll(pageable);
        }
        // The derived query throws an exception when keyword contains spaces
        //return esBlogRepository.findDistinctEsBlogByTitleContainingOrSummaryContainingOrContentContainingOrTagsContaining(keyword, keyword, keyword, keyword, pageable);

        // Use the Elasticsearch QueryBuilder API instead
        NativeSearchQueryBuilder aNativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        aNativeSearchQueryBuilder.withIndices("sl_blog").withTypes("blog");
        final BoolQueryBuilder aQuery = new BoolQueryBuilder();
        // must, should and mustNot on the builder correspond to logical AND, OR and NOT
        aQuery.should(QueryBuilders.queryStringQuery(keyword).defaultField("title"));
        aQuery.should(QueryBuilders.queryStringQuery(keyword).defaultField("summary"));
        aQuery.should(QueryBuilders.queryStringQuery(keyword).defaultField("tags"));
        aQuery.should(QueryBuilders.queryStringQuery(keyword).defaultField("content"));

        NativeSearchQuery nativeSearchQuery = aNativeSearchQueryBuilder
                .withQuery(aQuery)
                .withPageable(pageable) // pass the requested page and sort on to the query
                .build();
        return elasticsearchTemplate.queryForPage(nativeSearchQuery, EsBlog.class);
    }

}
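
The comment in the service says that must, should and mustNot behave like AND, OR and NOT. A small in-memory model can make that concrete. This is a sketch only: it is not how Elasticsearch evaluates queries, plain substring matching stands in for analyzed full-text matching, and scoring is ignored. The class BoolSketch and the helper contains are hypothetical names. It does follow one real rule of the bool query: when there are no must clauses, at least one should clause has to match.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Toy model of bool-query matching over a document represented as a field-to-text map. */
class BoolSketch {
    private final List<Predicate<Map<String, String>>> must = new ArrayList<>();
    private final List<Predicate<Map<String, String>>> should = new ArrayList<>();
    private final List<Predicate<Map<String, String>>> mustNot = new ArrayList<>();

    BoolSketch must(Predicate<Map<String, String>> clause) { must.add(clause); return this; }
    BoolSketch should(Predicate<Map<String, String>> clause) { should.add(clause); return this; }
    BoolSketch mustNot(Predicate<Map<String, String>> clause) { mustNot.add(clause); return this; }

    /** Hypothetical helper: true when the named field contains the keyword, ignoring case. */
    static Predicate<Map<String, String>> contains(String field, String keyword) {
        String needle = keyword.toLowerCase();
        return doc -> doc.getOrDefault(field, "").toLowerCase().contains(needle);
    }

    boolean matches(Map<String, String> doc) {
        boolean allMust = must.stream().allMatch(c -> c.test(doc));         // AND
        boolean noneExcluded = mustNot.stream().noneMatch(c -> c.test(doc)); // NOT
        // OR: with no must clauses, at least one should clause has to match.
        // When must clauses exist, should clauses only affect scoring, which this sketch ignores.
        boolean shouldOk = should.isEmpty() || !must.isEmpty()
                || should.stream().anyMatch(c -> c.test(doc));
        return allMust && noneExcluded && shouldOk;
    }
}
```

With four should clauses on title, summary, tags and content, as in the service above, a document matches as soon as the keyword appears in any one of those fields.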
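
Controller BlogController:

import com.sl.blog.domain.EsBlog;
import com.sl.blog.service.IEsBlogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;

import java.util.List;

@Controller
@RequestMapping("/blogs")
public class BlogController {

    @Autowired
    private IEsBlogService esBlogService;

    @GetMapping
    public String listBlogs(@RequestParam(value="order",required=false,defaultValue="new") String order,
                            @RequestParam(value="keyword",required=false,defaultValue="" ) String keyword,
                            @RequestParam(value="async",required=false) boolean async,
                            @RequestParam(value="pageIndex",required=false,defaultValue="0") int pageIndex,
                            @RequestParam(value="pageSize",required=false,defaultValue="5") int pageSize,
                            Model model) {
        // pageIndex is zero-based
        Pageable pageable = PageRequest.of(pageIndex, pageSize);
        Page<EsBlog> page = esBlogService.getEsBlogByKeys(keyword, pageable);
        List<EsBlog> list = page.getContent();
        model.addAttribute("order", order);
        model.addAttribute("keyword", keyword);
        model.addAttribute("page", page);
        model.addAttribute("blogList", list);
        // Async requests get only the list fragment; normal requests get the full page
        return (async ? "/index :: #mainContainerRepleace" : "/index");
    }
}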
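
The controller's pageIndex defaults to 0, so pages are zero-based, and pageSize defaults to 5. As a quick sketch of the arithmetic behind such paging (how a page index maps to the offset of its first hit, and how many pages a result set needs), assuming a positive page size; PageMath is a hypothetical helper, not part of Spring Data:

```java
/** Page arithmetic for a zero-based page index and a positive page size. */
class PageMath {

    /** Offset of the first hit on the given page (the "from" value of a search request). */
    static long offset(int pageIndex, int pageSize) {
        return (long) pageIndex * pageSize;
    }

    /** Number of pages needed to show totalHits results, rounding up. */
    static long totalPages(long totalHits, int pageSize) {
        return (totalHits + pageSize - 1) / pageSize;
    }
}
```

For example, with the default page size of 5, page index 2 starts at offset 10, and 11 hits need 3 pages.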

Demo result: [screenshot]
