1. 程式人生 > >flume+RabbitMQ+Storm實時日誌運算處理

flume+RabbitMQ+Storm實時日誌運算處理

一.說明
      通常來說,大資料處理hadoop能夠處理絕大部分場景,但是有些需求,比如實時的道路交通情況.實時的訂單情況等等,這類需要實時運算處理的,hadoop處理起來就相對麻煩.
      處理實時資料的框架很多,這裡採用apache storm,佇列的話通常採用kafka,但是因為現有的佇列是RabbitMq叢集,為了這個在單獨搭建一套kafka叢集有點浪費資源,因此採用RabbitMQ作為佇列.而日誌的採集依然採用flume.

二.Storm環境搭建
Storm的工作原理請自行百度,
Strom需要zookeeper,首先安裝zookeeper,前文有安裝描述.此處略
解壓Strom的包.
cd storm/conf
vim storm.yaml

單機配置如下:
storm.zookeeper.servers:      - "192.168.130.132" nimbus.host: "192.168.130.132" storm.local.dir: "/opt/stormDir" supervisor.slots.ports:     - 6700     - 6701     - 6702     - 6703 ui.port: 8089 

儲存配置

按順序啟動
./bin/storm nimbus &./bin/storm supervisor & 
./bin/
storm ui &
jps檢視是否啟動成功.

拓撲的部署方式參考

2.建立scheme


public class RabbitMqScheme implements Scheme{ @Override public List<Object> deserialize(byte[] bytes) { List objs = new ArrayList();    //直接反序列化為string    String str = new String(bytes);    //依次返回UUID,String,Number //    objs.add(UUID.randomUUID().toString());    objs.add(str); //    String numStr = Math.round(Math.random()*8999+1000)+"";  //    objs.add(numStr);    return objs; } @Override public Fields getOutputFields() { return new Fields("str"); }

在拓撲的main方法裡面,建立拓撲配置之前加入
RabbitMqScheme scheme = new RabbitMqScheme(); IRichSpout  spout = new RabbitMQSpout(scheme); ConnectionConfig connectionConfig = new ConnectionConfig("192.168.7.79", 5672, "guest", "guest", "/", 10); // host, port, username, password, virtualHost, heartBeat  ConsumerConfig spoutConfig = new ConsumerConfigBuilder().connection(connectionConfig)                                                        .queue("sdk_pay_trans_queue_key")                                                        .prefetch(200)                                                        .requeueOnFail()                                                        .build(); 好了,接下來在自己的bolt裡面寫邏輯就行了.




四.從flume-RabbitMQ
1.Flume本身不支援RabbitMQ,同樣在GIT上找到前輩們的傑作,不過由於支援的rabbitmq的版本比較老,所以在我除錯後,也支援了雲盤提供的版本.但是目前只支援exchange型別為direct的佇列,即routing-key=queueName的情況.

2.Flume配置如下

a1.sources =  r2 a1.sinks =  k3 a1.channels = c3
a1.sources.r2.type = spooldir a1.sources.r2.spoolDir = /var/log/flume_spoolDir_for_rabbitmq a1.sources.r2.deletePolicy=immediate a1.sources.r2.basenameHeader=true a1.sources.r2.channels=c3 a1.channels.c3.type = memory a1.channels.c3.capacity = 1000 a1.channels.c3.transactionCapacity = 200 a1.sinks.k3.type = com.aweber.flume.sink.rabbitmq.RabbitMQSink a1.sinks.k3.host = 192.168.7.79 a1.sinks.k3.port = 5672 a1.sinks.k3.virtual-host = / a1.sinks.k3.username = guest a1.sinks.k3.password = guest a1.sinks.k3.exchange = mq-exchange a1.sinks.k3.routing-key = test_aaa #a1.sinks.k3.publisher-confirms = true a1.sinks.k3.channel = c3 

配置完畢後儲存,往flume監控的資料夾下丟日誌檔案,可以看到被一行行寫入rabbitmq中.


至此,日誌從flume->rabbitMQ->storm已經打通.具體示例以後有空會發出來  


需要的包:
http://pan.baidu.com/s/1nvzUxi5