1. 程式人生 > >33.如何將不同型別資料匯入Elaticsearch中(ES同步小結)

33.如何將不同型別資料匯入Elaticsearch中(ES同步小結)

題記

Elaticsearch的原理明白了以後,手頭有很多不同型別的資料,如: 
1)單條資料,如程式中自己構造的JSON格式資料; 
2)符合Elasticsearch索引規範的批量資料; 
3)日誌檔案,格式*.log; 
4)結構化資料,儲存在mysql、oracle等關係型資料庫中; 
5)非結構化資料,儲存在mongo中; 
如何將這些資料匯入到Elasticsearch中呢?接下來,本文將逐個介紹。

這裡寫圖片描述

1、單條索引匯入elasticsearch

該方法類似mysql的insert 語句,用於插入一條資料。

[root@yang json_input]# curl -XPUT 'http://192.168.1.1:9200/blog/article/1' -d '
> { > "title":"New version of Elasticsearch released!", > "content":"Version 1.0 released today!", > "tags":["announce","elasticsearch","release"] > }'

結果檢視如下所示:

[root@yang json_input]# curl -XGET 'http://192.168.1.1:9200/blog/article/1?pretty'
{
  "_index" : "blog",
  "_type" : "article",
  "_id"
: "1", "_version" : 1, "found" : true, "_source" : { "title" : "New version of Elasticsearch released!", "content" : "Version 1.0 released today!", "tags" : [ "announce", "elasticsearch", "release" ] } }

圖形化顯示如下: 
這裡寫圖片描述

2、批量索引匯入到 elasticsearch。

(1)索引結構對映

類似於SQL建立模式描述資料,Mapping控制並定義結構。

[root@yang json_input]# cat mapping.json
{ "book" : { "_all": { "enabled": false }, "properties" : { "author" : { "type" : "string" }, "characters" : { "type" : "string" }, "copies" : { "type" : "long", "ignore_malformed" : false }, "otitle" : { "type" : "string" }, "tags" : { "type" : "string" }, "title" : { "type" : "string" }, "year" : { "type" : "long", "ignore_malformed" : false, "index" : "analyzed" }, "available" : { "type" : "boolean" } } } } [root@yang json_input]# curl -XPUT 'http://110.0.11.120:9200/library/book/_mapping' -d @mapping.json {"acknowledged":true}

(2)批量索引,將構造好的JSON資訊和資料匯入elasticsearch

Elasticsearch可以合併多個請求至單個包中,而這些包可以單個請求的形式傳送。如此,可以將多個操作結合起來:

1)在索引中增加或更換現有文件(index); 
2)從索引中移除文件(delete); 
3)當索引中不存在其他文件定義時,在索引中增加新文件(create)。

為了獲得較高的處理效率,選擇這樣的請求格式。它假定,請求的每一行包含描述操作說明的JSON物件,第二行為JSON物件本身。

可以把第一行看做資訊行,第二行行為資料行。唯一的例外是Delete操作,它只包含資訊行。

舉例如下:

[root@yang json_input]# cat documents_03.json
{ "index": {"_index": "library", "_type": "book", "_id": "1"}}
{ "title": "All Quiet on the Western Front","otitle": "Im Westen nichts Neues","author": "Erich Maria Remarque","year": 1929,"characters": ["Paul Bäumer", "Albert Kropp", "Haie Westhus", "Fredrich Müller", "Stanislaus Katczinsky", "Tjaden"],"tags": ["novel"],"copies": 1, "available": true, "section" : 3}
{ "index": {"_index": "library", "_type": "book", "_id": "2"}}
{ "title": "Catch-22","author": "Joseph Heller","year": 1961,"characters": ["John Yossarian", "Captain Aardvark", "Chaplain Tappman", "Colonel Cathcart", "Doctor Daneeka"],"tags": ["novel"],"copies": 6, "available" : false, "section" : 1}
{ "index": {"_index": "library", "_type": "book", "_id": "3"}}
{ "title": "The Complete Sherlock Holmes","author": "Arthur Conan Doyle","year": 1936,"characters": ["Sherlock Holmes","Dr. Watson", "G. Lestrade"],"tags": [],"copies": 0, "available" : false, "section" : 12}
{ "index": {"_index": "library", "_type": "book", "_id": "4"}}
{ "title": "Crime and Punishment","otitle": "Преступлéние и наказáние","author": "Fyodor Dostoevsky","year": 1886,"characters": ["Raskolnikov", "Sofia Semyonovna Marmeladova"],"tags": [],"copies": 0, "available" : true}

為了執行批量請求,Elasticsearch提供了_bulk端點,形式是/_bulk,或者是/index_name/_bulk, 甚至是/index_name/type_name/_bulk。

Elasticsearch會返回每個操作的具體的資訊,因此對於大批量的請求,響應也是巨大的。

3)執行結果如下所示:

[root@yang json_input]# curl -s -XPOST '10.0.1.30:9200/_bulk' --data-binary @documents_03.json
{"took":150,"errors":false,"items":[{"index":{"_index":"library","_type":"book","_id":"1","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"status":201}},{"index":{"_index":"library","_type":"book","_id":"2","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"status":201}},{"index":{"_index":"library","_type":"book","_id":"3","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"status":201}},{"index":{"_index":"library","_type":"book","_id":"4","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"status":201}}]}

執行結果如下圖所示: 
這裡寫圖片描述

3、使用Logstash將 log檔案匯入elasticsearch

以下以專案實戰的 MprobeDebug.log匯入到ES中。

[[email protected] logstash_conf]# tail -f MrobeDebug.log
[DEBUG][2015-07-23 23:59:58,138] : After CurProbe.Update()....lineNo:233, function:main
[DEBUG][2015-07-23 23:59:58,594] : lineNo:960, function:MNetworker::MessageTranslator, revoke nMsgRes = m_MsgPool.PeekMessage(CurMsg);
[DEBUG][2015-07-23 23:59:58,608] : ProbeTaskType_FTP lineNo:148, function:TempProbe::Update
........

核心配置檔案要求如下:

[[email protected] logstash_conf]# cat three.conf
input {
file {
path=> "/opt/logstash/bin/logstash_conf/MrobeDebug.log"
type=>"ttlog"
}
}

output {
elasticsearch {
hosts => "110.10.11.120:9200"
index => "tt_index"
}
stdout { codec => json_lines }
}

匯入結果如下: 
合計匯入資料:200414條。 
這裡寫圖片描述

4、從Mysql/Oracle關係型資料庫向Elasticsearch匯入資料

5、從MongoDB非關係型資料庫向Elasticsearch匯入資料

使用外掛:mongo-connector 
1)mongo與副本整合員連線 
2)初始化副本集配置 
3)Mongo與ES同步操作

相關推薦

33.如何不同型別資料匯入Elaticsearch(ES同步小結)

題記Elaticsearch的原理明白了以後,手頭有很多不同型別的資料,如: 1)單條資料,如程式中自己構造的JSON格式資料; 2)符合Elasticsearch索引規範的批量資料; 3)日誌檔案,格式*.log; 4)結構化資料,儲存在mysql、oracle等關係型資料

如何不同型別資料匯入Elaticsearch

題記 Elaticsearch的原理明白了以後,手頭有很多不同型別的資料,如: 1)單條資料,如程式中自己構造的JSON格式資料; 2)符合Elasticsearch索引規範的批量資料; 3)日誌檔案,格式*.log; 4)結構化資料,儲存在mysql

sqlserver的資料匯入hbase

將sqlserver的資料匯入hbase中 1.解壓sqoop-sqlserver-1.0.tar.gz,並改名(可以不改)          tar  -zxvf  sqoop- sql

MapReduceHDFS文字資料匯入HBase

HBase本身提供了很多種資料匯入的方式,通常有兩種常用方式: 使用HBase提供的TableOutputFormat,原理是通過一個Mapreduce作業將資料匯入HBase 另一種方式就是使用HBase原生Client API 本文就是示範如何通過M

利用sqoophive資料匯入Oracle(踩的坑)

教程很多,這裡只說踩過的坑 1.下載sqoop時,還得下一個bin的包,拿到sqoop-1.4.6.jar 的包,放到hadoop的lib目錄下 2.匯入oracle,執行程式碼時,使用者名稱和表名必須大寫!且資料庫建表時表名必須大寫!  示例程式碼: sqoop expo

mysql匯入資料load data infile用法(txt檔案資料匯入)

我們常常匯入資料!mysql有一個高效匯入方法,那就是load data infile 下面來看案例說明   基本語法: load data  [low_priority] [local] infile 'file_name txt' [replace | ignor

flumekafkatopic資料匯入hive

一、首先更加資料的表結構在hive中進行表的建立。          create table AREA1(unid string,area_punid string,area_no string,area_name s

oracle通過load data 資料匯入通過儲存過程進行批量處理

說明:雖然沒圖,但文字表述很清楚,自己做過的專案留著備用(這只是初版,比較繁瑣,但很明確) 準備工作做完之後,後期可直接使用。如後期excel資料有變更,只需改動對應的部分即可,不涉及改動的可直接使用。 實際操作步驟 依照excel資料模版格式準備好建表語句,將中間過渡

利用sqoophive資料匯入Oracle

首先: 如oracle則執行sqoop list-databases --connect jdbc:oracle:thin:@//192.168.27.235:1521/ORCL --username DATACENTER -P 來測試是否能正確連線資料庫  如mysql則執行sq

Hive 實戰練習(一)—按照日期每天的資料匯入Hive表

需求:         每天會產生很多的日誌檔案資料,有這麼一種需求:需要將每天產生的日誌資料在晚上12點鐘過後定時執行操作,匯入到Hive表中供第二天資料分析使用。要求建立分割槽表,並按照日期分割槽。資料檔案命名是以當天日期命名的,如2015-01-09.txt一、建立分割

模板word的特定欄位替換(資料匯入word

一、 將模板word中的特定欄位替換(將資料匯入word中) 所用jar包 一、 將模板word中的特定欄位替換(將資料匯入word中) 所用jar包 開發程式碼 /** * @Title createContract * @description 生成合

Excel的資料匯入SqlServer的表

記錄一下最近從Excel匯入大量資料到SqlServer表中的步驟。 在將Excel資料準備好以後。 1、右鍵SQL Server中需要匯入資料的庫名,選擇【任務】—【匯入資料】如圖: 2、彈

用sqoopmysql的資料匯入到hive表,原理分析

Sqoop 將 Mysql 的資料匯入到 Hive 中 準備Mysql 資料 如圖所示,準備一張表,資料隨便造一些,當然我這裡的資料很簡單。 編寫命令 編寫引數檔案 個人習慣問題,我喜歡把引數寫到檔案裡,然後再命令列引用。 vim mysql-info, #

用sqoopmysql的資料匯入到hive表

用sqoop將mysql的資料匯入到hive表中 1:先將mysql一張表的資料用sqoop匯入到hdfs中 準備一張表    需求 將 bbs_product 表中的前100條資料導 匯出來  只要id  brand_id和 na

excel資料匯入mysql

1.開啟存放資料段的Excel檔案,處理好列的關係,將不必要的列都刪除掉,將Excel另存為CSV格式。2.在MySQL中建表,列的名字和資料格式都要和Excel中的一致。建表格式參考:CREATE TABLE IF NOT EXISTS data1( id INT UN

c語言不同型別資料間的混合運算

      在程式中經常會遇到不同型別的資料進行運算,如果一個運算子兩側的資料型別不同,則會先自動進行資料型別轉換,使運算子兩側的資料型別相同,然後再進行運算,因此整型、實型、字元型資料間都可以進行混合運算     規律為:       (1)+、-、*、/、運算子兩側中有一

float型別資料寫入暫存器

在FreeBSD中針對暫存器在特殊情況下可能需要寫入float型別的值,具體實現步驟如下: //變數 typedef struct{ float   under_limit;         float   over_limit;  }VFEPIO_SetRawLimit

sqoopMysql資料匯入Hive

注:筆者用的是sqoop1.4.6版本 一、將Mysql資料匯入Hive中 命令: sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true --con

C語言不同型別資料所佔位元組數彙總

16位編譯器         char:1個位元組         char*:2個位元組         short int:2個位元組         int:2個位元組         unsigned int:2個位元組         long:4個位元組         long long:8個位

使用命令列Excel資料匯入Mysql的方法小結

從Excel資料表匯入Mysql,已經做過好幾次了,但每次都會碰到各種問題:invalid utf8 character string, data too long, ...,浪費了不少時間 為了提高效率,是時候指定一個數據匯入的SOP了: 1.準備.txt檔案