1. 程式人生 > >kafka+storm整合程式碼

kafka+storm整合程式碼

package com.ljt.stormandkafka.kafkaAndStorm;

import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;

public class KafkaAndStormTopologyMain {
public static void main(String[] args) throws Exception{
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout(“kafkaSpout”,
(IRichSpout) new KafkaSpout(new SpoutConfig(
new ZkHosts(“zk01:2181,zk02:2181,zk03:2181”),
“orderMq”,
“/myKafka”,
“kafkaSpout”)),1);
topologyBuilder.setBolt(“mybolt1”,new ParserOrderMqBolt(),1).shuffleGrouping(“kafkaSpout”);

    Config config = new Config();
    config.setNumWorkers(1);

    //3、提交任務  -----兩種模式 本地模式和叢集模式
    if (args.length>0) {
        StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
    }else {
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("storm2kafka", config, topologyBuilder.createTopology());
    }
}

}

package com.ljt.stormandkafka.kafkaAndStorm;

import java.util.HashMap;
import java.util.Map;

import com.google.gson.Gson;
import com.ljt.stormandkafka.order.OrderInfo;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
*
*

Title: ParserOrderMqBolt


*

功能描述::


*

Company: adteach


* @author 劉建濤 *
* @date 2017年8月23日下午7:44:04
* @version 1.0
*/
public class ParserOrderMqBolt extends BaseRichBolt {
private JedisPool pool;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
//change “maxActive” -> “maxTotal” and “maxWait” -> “maxWaitMillis” in all examples
JedisPoolConfig config = new JedisPoolConfig();
//控制一個pool最多有多少個狀態為idle(空閒的)的jedis例項。
config.setMaxIdle(5);
//控制一個pool可分配多少個jedis例項,通過pool.getResource()來獲取;
//如果賦值為-1,則表示不限制;如果pool已經分配了maxActive個jedis例項,則此時pool的狀態為exhausted(耗盡)。
//在borrow一個jedis例項時,是否提前進行validate操作;如果為true,則得到的jedis例項均是可用的;
config.setMaxTotal(1000 * 100);
//表示當borrow(引入)一個jedis例項時,最大的等待時間,如果超過等待時間,則直接丟擲JedisConnectionException;
config.setMaxWaitMillis(30);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
/**
*如果你遇到 java.net.SocketTimeoutException: Read timed out exception的異常資訊
*請嘗試在構造JedisPool的時候設定自己的超時值. JedisPool預設的超時時間是2秒(單位毫秒)
*/
pool = new JedisPool(config, “127.0.0.1”, 6379);
}
@Override
public void execute(Tuple input) {
    Jedis jedis = pool.getResource();
    //獲取kafkaSpout傳送過來的資料,是一個json
    String string = new String((byte[]) input.getValue(0));
    //解析json
    OrderInfo orderInfo = (OrderInfo) new  Gson().fromJson(string, OrderInfo.class);
    //整個網站,各個業務線,各個品類,各個店鋪,各個品牌,每個商品
    //獲取整個網站的金額統計指標

// String totalAmount = jedis.get(“totalAmount”);
jedis.incrBy(“totalAmount”,orderInfo.getProductPrice());
//獲取商品所屬業務線的指標資訊
String bid = getBubyProductId(orderInfo.getProductId(),”b”);
// String bAmout = jedis.get(bid+”Amout”);
jedis.incrBy(bid+”Amount”,orderInfo.getProductPrice());
jedis.close();
}

private String getBubyProductId(String productId,String type) {

// key:value
//index:productID:info—->Map
// productId—–<各個業務線,各個品類,各個店鋪,各個品牌,每個商品>
Map

相關推薦

kafka+storm整合程式碼

package com.ljt.stormandkafka.kafkaAndStorm; import org.apache.storm.kafka.KafkaSpout; import org.apache.storm.kafka.SpoutConfig;

flume讀取日誌資料寫入kafka 然後kafka+storm整合

一、flume配置 flume要求1.6以上版本 flume-conf.properties檔案配置內容,sinks的輸出作為kafka的product a1.sources = r1 a1.sinks = k1 a1.cha

kafka+storm整合並執行demo-單機

1,安裝環境jdk1.7,kafka_2.9.2-0.8.1.1.tgz,zookeeper-3.3.6.tar.gz,apache-storm-0.9.2-incubating.tar.gz 2,安裝kafka: I,配置kafka/conf/server.propert

flume+kafka+storm整合實現實時計算小案例

    我們做資料分析的時候常常會遇到這樣兩個場景,一個是統計歷史資料,這個就是要分析歷史儲存的日誌。我們會使用hadoop,具體框架可以設計為:1.flume收集日誌;2.HDFS輸入路徑儲存日誌;3.MapReduce計算,將結果輸出到HDFS輸出路徑;4.hive+sq

flume+kafka+storm整合00

一、安裝 flume,kafka, storm 的安裝在下面三篇文章: flume:1.6.0 kafka:注意這裡最好下載scala2.10版本的kafka,因為scala2.10版本的相容性比較好和2.11版本差別太大 二、各個部分除錯

storm 整合 kafka之保存MySQL數據庫

ons fin 整合 連接 shu date pri 對數 data 整合Kafka+Storm,消息通過各種方式進入到Kafka消息中間件,比如通過使用Flume來收集的日誌數據,然後暫由Kafka中的路由暫存,然後在由實時計算程序Storm做實時分析,這時候我們需要講S

Storm整合Kafka應用的開發

https://www.cnblogs.com/freeweb/p/5292961.html Storm整合Kafka應用的開發   我們知道storm的作用主要是進行流式計算,對於源源不斷的均勻資料流流入處理是非常有效的,而現實生活中大部分場景並不是均勻的資料流,而是時而多時而少的資料流入

kafkastorm整合

kafka與storm的整合步驟 採用官方storm-kafka-client方式進行整合 一 引入pom依賴 <properties> <project.build.sourceEncoding>UTF-8</

Storm(七)Storm整合kafka

使用kafka-client jar進行Storm Apache Kafka整合 這包括新的Apache Kafka消費者API。相容性 Apache Kafka版本0.10起 引入jar包 &

storm整合kafka舊版API(offset In Zk)示例

編寫主函式啟動類的Topo package com.simon.storm.kafka; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.kafka.

storm整合kafka新版API(offset In Kafka)示例

本例storm版本為1.1.0  kafka版本為2.11.0.10.0.1 匯入maven依賴 <!--引入storm --> <dependency> <groupId>org.apache.storm&

storm整合kafka新版API(0.8版本之後)

本例storm版本為1.1.0  kafka版本為2.11.0.10.0.1 匯入maven依賴 <!--引入storm --> <dependency> <groupId>org.apache.storm&l

flume+kafka+storm整合使用

Flume-ng Flume是一個分散式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。        不過這裡寫寫自己的見解 這個是flume的架構圖  從上圖可以看到幾個名詞: Agent: 一個Agent包含Source、Channel、Sink和其他的元件

storm整合kafka,spout作為kafka的消費者

在之前的部落格中記錄,如何在專案storm中把每條記錄作為訊息傳送到kafka訊息佇列中的。這裡講述如何在storm中消費kafka佇列中的訊息。為何在專案中兩個拓撲檔案校驗和預處理之間要用kafka訊息佇列進行資料的暫存仍需要去落實。 專案中直接使用st

StormKafka整合安裝和測試

1、先保證Storm叢集已經安裝 Storm叢集的安裝可以參考文章:Storm叢集的安裝,Kafka的安裝和測試也可以參考apache kafka官網上的Quick Start 2、安裝Kafka叢集: 2.1 下載Kafka包,我們這裡選擇kafka_2.9.2

storm整合kafka報錯org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for ...

寫了一個storm整合kfaka的程式,kafkaSpout消費的資料作為storm的資料來源。執行報錯如下: java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.Keep

2017-08-14 flume+kafka+storm+hdfs整合

基礎環境: Redhat 5.5 64位(我這裡是三臺虛擬機器h40,h41,h42) myeclipse 8.5 jdk1.7.0_25 zookeeper-3.4.5叢集 apache-storm-0.9.5叢集 kafka_2.1

storm整合kafka簡單使用示例2

StormKafkaTopo.java package stormUse.stormUse; import java.util.Properties; import org.apache.storm.Config; import org.apache.

Apache Kafka -7 與Storm整合

Apache Kafka教程 之 與Storm整合 Apache Kafka - 與Storm整合 關於Storm Storm最初是由Nathan Marz和BackType建立的。在短時間內,Apache Storm成為分散式實時

flume-ng+Kafka+Storm+HDFS+jdbc 實時系統搭建的完美整合

一直以來都想接觸Storm實時計算這塊的東西,最近在群裡看到上海一哥們羅寶寫的Flume+Kafka+Storm的實時日誌流系統的搭建文件,自己也跟著整了一遍,之前羅寶的文章中有一些要注意點沒提到的,以後一些寫錯的點,在這邊我會做修正;內容應該說絕大部分引用羅寶的文章的,這裡要謝謝羅寶兄弟,還有寫這篇文章@