1. 程式人生 > >storm trident讀取kafka中資料

storm trident讀取kafka中資料

1. 建立kafka spout

public TransactionalTridentKafkaSpout kafkaSpout(String topic) {
    StormConfig stormConfig = StormConfig.getInstance();

    BrokerHosts hosts = new ZkHosts(stormConfig.getZookeeper());
    TridentKafkaConfig config = new TridentKafkaConfig(hosts, topic);
    config.scheme = new SchemeAsMultiScheme(new
StringScheme()); return new TransactionalTridentKafkaSpout(config); }

說明:建立TridentKafkaConfig時,上例中傳遞的是zookeeper的地址。實際傳遞kafka broker地址也可以。KafkaUtils中相容了兩種配置,相關原始碼:

public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
    if (conf.hosts instanceof StaticHosts) {
        return
new StaticBrokerReader(conf.topic, ((StaticHosts) conf.hosts).getPartitionInformation()); } else { return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts); } }

2. 建立Trideng topology

TridentTopology topology = new TridentTopology();

Stream stream1 = topology.newStream("kafkaspout"
, kafkaSpout(topic)); stream1.each(new Fields("str"), new APacketParser(), new Fields("a-value", "a-description")).each(new Fields("a-value", "a-description"), new ProcFunction(), new Fields("whatever")); StormTopology stormTopology = topology.build(); StormSubmitter.submitTopology("trident", new Config(), stormTopology);

說明:從kafka中讀取資訊,submitTopology時不用給任何kafka相關的配置。

相關推薦

storm trident讀取kafka資料

1. 建立kafka spout public TransactionalTridentKafkaSpout kafkaSpout(String topic) { StormConfig stormConfig = StormConfig.getIns

access vba 用recordset讀取資料的簡單方法

'strQuery是表名,查詢名等 Public Function Getrs(Byval strQuery as string) as ADODB.Recordset Dim objRs As New ADODB.Recordset on Error GoTo Er

使用storm trident消費kafka訊息

一、前言   storm通過保證資料至少被處理一次來保證資料的完整性,由於元祖可以重發,對於一些需要資料精確的場景,可以考慮用storm trident實現。    傳統的事物型拓撲中存在幾種bolt: 1.1 BasicBolt  這是最基本的Bolt,BasicBolt每次只能處理一個tuple,而且必

學習筆記-註解+反射讀取Bean資料

我們經常有從資料來源(即javabean中拿資料)的需要,但不同的人對bean中內容的命名五花八門,但利用註解+反射可以寫出通用的提取資料的程式碼。 假設需求是:從一個bean中取出NodeId, NodePId, NodeName三個成員。 假如一個bean是這樣寫的:g

小例子:java利用poi讀取excel資料並匯入資料庫

問題描述: 資料夾下有若干excel檔案,檔名為10.教育局.xls   11.衛生院.xls     ................有很多;中間的漢字為單位名稱,需要匯入資料庫,每個單位名稱要有一個單位id匹配;每個excel中有若干個sheet頁的名字,每個名字即為科

spark 讀取elasticsearch資料不完整問題

使用spark讀取elasticsearch中的資料,使用es提供的api來進行, sc.esRDD("logstash").values 官方網站也是這種方式讀取資料的,但是我測試的時候有時候會出現讀取資料不完整的情況,比如本來讀取的資料是這樣的 Map(msg ->

spark流式讀取hdfs資料

名詞解釋: spark streaming: 定義:一個對實時資料進行高容通量、容錯處理的流式處理系統,可以對多種資料來源進行Map、reduce和join等複雜操作,並將結果儲存到外部檔案系統、

java利用poi讀取excel資料

所需的jar包: 程式碼: /** * * @param cell * 一個單元格的物件 * @return 返回該單元格相應的型別的值 */ public static Object getRightTypeCell(Ce

Kafka系列(四)Kafka消費者:從Kafka讀取資料

本系列文章為對《Kafka:The Definitive Guide》的學習整理,希望能夠幫助到大家應用從Kafka中讀取資料需要使用KafkaConsumer訂閱主題,然後接收這些主題的訊息。在我們深入這些API之前,先來看下幾個比較重要的概念。Kafka消費者相關的概念消

stormkafka資料

========================================== 定義從kafka中讀出的資料 import java.io.UnsupportedEncodingException; import java.util.List; import bac

c++txt檔案的讀取以及在MFC讀取txt座標資料並完成圖形繪製

主要介紹如何讀取txt檔案中的座標資料,並在MFC視窗中繪製出來,工程建立方法和繪圖方法與上一篇博文基本一致,這裡就不再詳贅述,可參考上一篇博文vs2010、MFC視窗中繪製點、線、面。 C++中讀取檔案的方法有兩種,一種是來自於C語言的“檔案指標”方法,另一種是C++中的“檔案流”思想。

python3讀取excel資料

import xlrd import os paths = [r'C:/Users/'] for path in paths: for filename in os.listdir(path): exname = filename.split('.')

讀取資料庫日期為指定月份的資料

比如資料表table中列為date的資料格式為2017-12,那麼我要獲取每年12月份的資料,就要擷取月為12的資料。方法如下: SELECT * FROM TABLE WHERE SUBSTR(date,6,2)='12' SUBSTR(str,pos,len): 從po

springboot兩種讀取application資料的方法

pom.xml配置檔案 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XML

POI實現Excel資料讀取

所需依賴包:poi-3.17.jar、poi-ooxml-3.17.jar、poi-ooxml-schemas-3.17.jar、xmlbeans-2.6.0.jar、commons-collections4-4.1.jar。 依賴包下載地址:http://mvnrepository.com/a

Unity讀取Text的每一行資料存放到字典

//宣告一個存放的字典 public Dictionary<string, string> DressUpNameAndIntroduce = new Dictionary<string, string>(); private void Awake()

SparkStreaming消費Kafka資料 使用zookeeper和MySQL儲存偏移量的兩種方式

Spark讀取Kafka資料的方式有兩種,一種是receiver方式,另一種是直連方式。今天分享的SparkStreaming消費Kafka中的資料儲存偏移量的兩種方式都是基於直連方式上的 話不多說 直接上程式碼 ! 第一種是使用zookeeper儲存偏移量 object Kafka

sparkStreaming讀取kafka資料的2種方式

方式一  Receiver           使用Kafka的高層次Consumer API來實現。receiver從Kafka中獲取的資料都儲存在Spark Executor的記憶體中,然後Spa

python操作txt檔案資料教程[3]-python讀取資料所有txt檔案並將資料轉為csv檔案

python操作txt檔案中資料教程[3]-python讀取資料夾中所有txt檔案並將資料轉為csv檔案 覺得有用的話,歡迎一起討論相互學習~Follow Me 參考文獻 python操作txt檔案中資料教程[1]-使用python讀寫txt檔案 python操作txt檔案中資料教程[2]-pyth

flume將kafkatopic資料匯入hive

一、首先更加資料的表結構在hive中進行表的建立。          create table AREA1(unid string,area_punid string,area_no string,area_name s