1. 程式人生 > >利用Flume將MySQL表資料準實時抽取到HDFS

利用Flume將MySQL表資料準實時抽取到HDFS

一、為什麼要用到Flume

        在以前搭建HAWQ資料倉庫實驗環境時,我使用Sqoop抽取從MySQL資料庫增量抽取資料到HDFS,然後用HAWQ的外部表進行訪問。這種方式只需要很少量的配置即可完成資料抽取任務,但缺點同樣明顯,那就是實時性。Sqoop使用MapReduce讀寫資料,而MapReduce是為了批處理場景設計的,目標是大吞吐量,並不太關心低延時問題。就像實驗中所做的,每天定時增量抽取資料一次。
        Flume是一個海量日誌採集、聚合和傳輸的系統,支援在日誌系統中定製各類資料傳送方,用於收集資料。同時,Flume提供對資料進行簡單處理,並寫到各種資料接受方的能力。Flume以流方式處理資料,可作為代理持續執行。當新的資料可用時,Flume能夠立即獲取資料並輸出至目標,這樣就可以在很大程度上解決實時性問題。
        Flume是最初只是一個日誌收集器,但隨著flume-ng-sql-source外掛的出現,使得Flume從關係資料庫採集資料成為可能。下面簡單介紹Flume,並詳細說明如何配置Flume將MySQL表資料準實時抽取到HDFS。

二、Flume簡介

1. Flume的概念

        Flume是分散式的日誌收集系統,它將各個伺服器中的資料收集起來並送到指定的地方去,比如說送到HDFS,簡單來說flume就是收集日誌的,其架構如圖1所示。
圖1

2. Event的概念 

        在這裡有必要先介紹一下Flume中event的相關概念:Flume的核心是把資料從資料來源(source)收集過來,在將收集到的資料送到指定的目的地(sink)。為了保證輸送的過程一定成功,在送到目的地(sink)之前,會先快取資料(channel),待資料真正到達目的地(sink)後,Flume再刪除自己快取的資料。 
       在整個資料的傳輸的過程中,流動的是event,即事務保證是在event級別進行的。那麼什麼是event呢?Event將傳輸的資料進行封裝,是Flume傳輸資料的基本單位,如果是文字檔案,通常是一行記錄。Event也是事務的基本單位。Event從source,流向channel,再到sink,本身為一個位元組陣列,並可攜帶headers(頭資訊)資訊。Event代表著一個數據的最小完整單元,從外部資料來源來,向外部的目的地去。

3. Flume架構介紹 

        Flume之所以這麼神奇,是源於它自身的一個設計,這個設計就是agent。Agent本身是一個Java程序,執行在日誌收集節點——所謂日誌收集節點就是伺服器節點。 Agent裡面包含3個核心的元件:source、channel和sink,類似生產者、倉庫、消費者的架構。 
  • Source:source元件是專門用來收集資料的,可以處理各種型別、各種格式的日誌資料,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。 
  • Channel:source元件把資料收集來以後,臨時存放在channel中,即channel元件在agent中是專門用來存放臨時資料的——對採集到的資料進行簡單的快取,可以存放在memory、jdbc、file等等。 
  • Sink:sink元件是用於把資料傳送到目的地的元件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定義。 

4. Flume的執行機制 

        Flume的核心就是一個agent,這個agent對外有兩個進行互動的地方,一個是接受資料輸入的source,一個是資料輸出的sink,sink負責將資料傳送到外部指定的目的地。source接收到資料之後,將資料傳送給channel,chanel作為一個數據緩衝區會臨時存放這些資料,隨後sink會將channel中的資料傳送到指定的地方,例如HDFS等。注意:只有在sink將channel中的資料成功傳送出去之後,channel才會將臨時資料進行刪除,這種機制保證了資料傳輸的可靠性與安全性。 

三、安裝Hadoop和Flume

        我的實驗在HDP 2.5.0上進行,HDP安裝中包含Flume,只要配置Flume服務即可。HDP的安裝步驟參見“HAWQ技術解析(二) —— 安裝部署

四、配置與測試

1. 建立MySQL資料庫表

        建立測試表並新增資料。
use test;

create table  wlslog  
(id         int not null,
 time_stamp varchar(40),
 category   varchar(40),
 type       varchar(40),
 servername varchar(40),
 code       varchar(40),
 msg        varchar(40),
 primary key ( id )
);

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(1,'apr-8-2014-7:06:16-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to standby');
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(2,'apr-8-2014-7:06:17-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to starting');
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(3,'apr-8-2014-7:06:18-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to admin');
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(4,'apr-8-2014-7:06:19-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to resuming');
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(5,'apr-8-2014-7:06:20-pm-pdt','notice','weblogicserver','adminserver','bea-000361','started weblogic adminserver');
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(6,'apr-8-2014-7:06:21-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to running');
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(7,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');
commit;

2. 建立相關目錄與檔案

(1)建立本地狀態檔案
mkdir -p /var/lib/flume
cd /var/lib/flume
touch sql-source.status
chmod -R 777 /var/lib/flume

(2)建立HDFS目標目錄
hdfs dfs -mkdir -p /flume/mysql
hdfs dfs -chmod -R 777 /flume/mysql

3. 準備JAR包

        從http://book2s.com/java/jar/f/flume-ng-sql-source/download-flume-ng-sql-source-1.3.7.html下載flume-ng-sql-source-1.3.7.jar檔案,並複製到Flume庫目錄。
cp flume-ng-sql-source-1.3.7.jar /usr/hdp/current/flume-server/lib/
        將MySQL JDBC驅動JAR包也複製到Flume庫目錄。
cp mysql-connector-java-5.1.17.jar /usr/hdp/current/flume-server/lib/mysql-connector-java.jar

4. 建立HAWQ外部表

create external table ext_wlslog
(id         int,
 time_stamp varchar(40),
 category   varchar(40),
 type       varchar(40),
 servername varchar(40),
 code       varchar(40),
 msg        varchar(40)
) location ('pxf://mycluster/flume/mysql?profile=hdfstextmulti') format 'csv' (quote=e'"'); 

5. 配置Flume

        在Ambari -> Flume -> Configs -> flume.conf中配置如下屬性:
agent.channels.ch1.type = memory
agent.sources.sql-source.channels = ch1
agent.channels = ch1
agent.sinks = HDFS

agent.sources = sql-source
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource

agent.sources.sql-source.connection.url = jdbc:mysql://172.16.1.127:3306/test
agent.sources.sql-source.user = root
agent.sources.sql-source.password = 123456
agent.sources.sql-source.table = wlslog
agent.sources.sql-source.columns.to.select = *

agent.sources.sql-source.incremental.column.name = id
agent.sources.sql-source.incremental.value = 0

agent.sources.sql-source.run.query.delay=5000

agent.sources.sql-source.status.file.path = /var/lib/flume
agent.sources.sql-source.status.file.name = sql-source.status

agent.sinks.HDFS.channel = ch1
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.path = hdfs://mycluster/flume/mysql
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
agent.sinks.HDFS.hdfs.rollSize = 268435456
agent.sinks.HDFS.hdfs.rollInterval = 0
agent.sinks.HDFS.hdfs.rollCount = 0
        Flume在flume.conf檔案中指定Source、Channel和Sink相關的配置,各屬性描述如表1所示。

屬性

描述

agent.channels.ch1.type

Agent的channel型別

agent.sources.sql-source.channels

Source對應的channel名稱

agent.channels

Channel名稱

agent.sinks

Sink名稱

agent.sources

Source名稱

agent.sources.sql-source.type

Source型別

agent.sources.sql-source.connection.url

資料庫URL

agent.sources.sql-source.user

資料庫使用者名稱

agent.sources.sql-source.password

資料庫密碼

agent.sources.sql-source.table

資料庫表名

agent.sources.sql-source.columns.to.select

查詢的列

agent.sources.sql-source.incremental.column.name

增量列名

agent.sources.sql-source.incremental.value

增量初始值

agent.sources.sql-source.run.query.delay

發起查詢的時間間隔,單位是毫秒

agent.sources.sql-source.status.file.path

狀態檔案路徑

agent.sources.sql-source.status.file.name

狀態檔名稱

agent.sinks.HDFS.channel

Sink對應的channel名稱

agent.sinks.HDFS.type

Sink型別

agent.sinks.HDFS.hdfs.path

Sink路徑

agent.sinks.HDFS.hdfs.fileType

流資料的檔案型別

agent.sinks.HDFS.hdfs.writeFormat

資料寫入格式

agent.sinks.HDFS.hdfs.rollSize

目標檔案輪轉大小,單位是位元組

agent.sinks.HDFS.hdfs.rollInterval

hdfs sink間隔多長將臨時檔案滾動成最終目標檔案,單位是秒;如果設定成0,則表示不根據時間來滾動檔案

agent.sinks.HDFS.hdfs.rollCount

當events資料達到該數量時候,將臨時檔案滾動成目標檔案;如果設定成0,則表示不根據events資料來滾動檔案

表1


6. 執行Flume代理

        儲存上一步的設定,然後重啟Flume服務,如圖2所示。
圖2
        重啟後,狀態檔案已經記錄了將最新的id值7,如圖3所示。
圖3
        檢視目標路徑,生成了一個臨時檔案,其中有7條記錄,如圖4所示。
圖4
        查詢HAWQ外部表,結果也有全部7條資料,如圖5所示。
圖5
        至此,初始資料抽取已經完成。

7. 測試準實時增量抽取

        在源表中新增id為8、9、10的三條記錄。
use test;
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(8,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(9,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(10,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');
commit;
        5秒之後查詢HAWQ外部表,從圖6可以看到,已經查詢出全部10條資料,準實時增量抽取成功。
圖6

五、方案優缺點

        利用Flume採集關係資料庫表資料最大的優點是配置簡單,不用程式設計。相比tungsten-replicator的複雜性,Flume只要在flume.conf檔案中配置source、channel及sink的相關屬性,已經沒什麼難度了。而與現在很火的canal比較,雖然不夠靈活,但畢竟一行程式碼也不用寫。再有該方案採用普通SQL輪詢的方式實現,具有通用性,適用於所有關係庫資料來源。
        這種方案的缺點與其優點一樣突出,主要體現在以下幾方面。
  • 在源庫上執行了查詢,具有入侵性。
  • 通過輪詢的方式實現增量,只能做到準實時,而且輪詢間隔越短,對源庫的影響越大。
  • 只能識別新增資料,檢測不到刪除與更新。
  • 要求源庫必須有用於表示增量的欄位。
        即便有諸多侷限,但用Flume抽取關係庫資料的方案還是有一定的價值,特別是在要求快速部署、簡化程式設計,又能滿足需求的應用場景,對傳統的Sqoop方式也不失為一種有效的補充。

參考:

Flume架構以及應用介紹
Streaming MySQL Database Table Data to HDFS with Flume
how to read data from oracle using FLUME to kafka broker
https://github.com/keedio/flume-ng-sql-source

相關推薦

利用FlumeMySQL資料實時抽取HDFSMySQL、Kafka

軟體版本號 jdk1.8、apache-flume-1.6.0-bin、kafka_2.8.0-0.8.0、zookeeper-3.4.5叢集環境安裝請先測試; 參考以下作者資訊,特此感謝;http://blog.csdn.net/wzy0623/article/detail

利用FlumeMySQL資料實時抽取HDFS

一、為什麼要用到Flume        在以前搭建HAWQ資料倉庫實驗環境時,我使用Sqoop抽取從MySQL資料庫增量抽取資料到HDFS,然後用HAWQ的外部表進行訪問。這種方式只需要很少量的配置即可完成資料抽取任務,但缺點同樣明顯,那就是實時性。Sqoop使用MapRe

使用FlumeMySQL資料實時抽取到hadoop

一、為什麼要用到Flume 在以前搭建HAWQ資料倉庫實驗環境時,我使用Sqoop抽取從MySQL資料庫增量抽取資料到HDFS,然後用HAWQ的外部表進行訪問。這種方式只需要很少量的配置即可完成資料抽取任務,但缺點同樣明顯,那就是實時性。Sqoop使用MapReduce

FlumeMySQL資料存入到HDFS

浪費了“黃金五年”的Java程式設計師,還有救嗎? >>>   

FlumeMySQL資料存入到HBase

浪費了“黃金五年”的Java程式設計師,還有救嗎? >>>   

利用shellmysql資料匯出到檔案和執行mysql語句

利用mysqldump匯出mysql資料 匯出指定條件的資料庫 命令格式 mysqldump -u使用者名稱 -p密碼 -h主機 -P埠 資料庫名 表名 --where "sql語句" > 路徑 示例程式碼 #!/bin/bash #變數定義 host="127.0.

Windows64環境下 使用FlumeMysql增量資料同步到Kafka

一.軟體準備1.jdk1.74.maven 下載地址 二.安裝並啟動Kafka1.安裝kafka此步驟看文章,比較詳細,相信不會有問題2.按順序啟動kafka(windows cmd下)    2.1

利用PythonExcel中的資料匯入Mysql資料庫

 python操作Excel 需要匯入xlrd包,可以通過pip install xlrd 一鍵安裝。 #coding=utf-8 ''' python 將指定目錄下的excel檔案匯入到資料庫中 ''' # 引入資料庫包 import pymysql # 引入操作excel包

利用poiexcel資料讀取存入mysql資料庫(資料量比較大)

最近被老大安排了一個任務,利用程式將excle表中的資料讀取到,做處理,然後存進資料庫。接到任務的時候人是懵逼的。但是安排的任務也得硬著頭皮完成。現將做的東西記錄如下,方便以後查詢。 這個小demo的原型是在網上找的,demo連結如下 http://www.cnblogs.

利用python mysql 資料進行抽取並清理成標準格式後 存入MSSql 資料

from pymongo import MongoClientfrom pymysql import connectimport pymssqlfrom datetime import datetime, timedeltaimport time Nagios 資料庫的IP 地址 NagiosDB_IP

Storm之——Storm+Kafka+Flume+Zookeeper+MySQL實現資料實時分析(環境搭建篇)

Storm之——Storm+Kafka+Flume+Zookeeper+MySQL實現資料實時分析(環境搭建篇) 2018年03月04日 23:05:29 冰 河 閱讀數:1602更多 所屬專欄: Hadoop生態 版權宣告:本文為博主原創文章,未經博主允許不得轉載。 https:/

用sqoopmysql資料匯入到hive中,原理分析

Sqoop 將 Mysql 的資料匯入到 Hive 中 準備Mysql 資料 如圖所示,準備一張表,資料隨便造一些,當然我這裡的資料很簡單。 編寫命令 編寫引數檔案 個人習慣問題,我喜歡把引數寫到檔案裡,然後再命令列引用。 vim mysql-info, #

pythonmysql資料抽取到另一個mysql庫中,持續更新抽取到oracle中

import MySQLdb import ConfigParser class Mysql2Mysql(object):     def getConn(self,filename,dbname):         cf = ConfigParser.ConfigPars

用sqoopmysql資料匯入到hive

用sqoop將mysql的資料匯入到hive表中 1:先將mysql一張表的資料用sqoop匯入到hdfs中 準備一張表    需求 將 bbs_product 表中的前100條資料導 匯出來  只要id  brand_id和 na

應對sharding-jdbc結合mybatis實現分庫分表功能 分的聯合查詢採用mysql資料同步到elasticsearch進行篩選

應對sharding-jdbc結合mybatis實現分庫分表功能  分表的聯合查詢採用將mysql的資料同步到elasticsearch進行篩選 安裝操作指南:(1)、(2) 其中windows目錄展示如下: 版本控制:1. 需要jdk:1.8(1.8.0_60)

利用SqoopMySQL海量測試資料匯入HDFS和HBase

宣告:作者原創,轉載註明出處。 一、安裝Sqoop 1、下載sqoop,解壓、資料夾重新命名 wget http://mirror.bit.edu.cn/apache/sqoop/1.4.6/sqoop-1.4.6.bin_

利用pyhivehive查詢資料匯入到mysql

在大資料工作中經常碰到需要將hive查詢資料匯入到mysql的需求,常見的方法主要有兩種,一是sqoop,另一種則是pyhive。本文主要講的就是python的pyhive庫的安裝與使用。 pyhive作用 遠端連線hive資料庫,執行hive sql,而

Storm之——Storm+Kafka+Flume+Zookeeper+MySQL實現資料實時分析(程式案例篇)

一、前言二、簡單介紹為了方便,這裡我們只是簡單的向/home/flume/log.log中追加單詞,每行一個單詞,利用Storm接收每個單詞,將單詞計數更新到資料庫,具體的邏輯為,如果資料庫中沒有相關單詞,則將資料插入資料庫,如果存在相關單詞,則更新資料庫中的計數。具體SQL

利用SqoopMySQL數據導入Hive中

sqoop參考http://www.cnblogs.com/iPeng0564/p/3215055.htmlhttp://www.tuicool.com/articles/j2yayyjhttp://blog.csdn.net/jxlhc09/article/details/168568731.list da

利用pandasmysql查詢出得結果寫入到excel文件

pandas excel#!/usr/bin/env python3import pandas as pdimport pymysql#返回SQL結果的函數def getrel(sql): conn = pymysql.connect(host='localhost',user=