1. 程式人生 > >storm從kafka中讀資料

storm從kafka中讀資料

==========================================

定義從kafka中讀出的資料

import java.io.UnsupportedEncodingException;
import java.util.List;


import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;


public class MessageScheme implements Scheme {

private static final long serialVersionUID = 8423372426211017613L;


@Override
public List<Object> deserialize(byte[] bytes) {
try {
String msg = new String(bytes, "UTF-8");
return new Values(msg); 
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}


@Override
public Fields getOutputFields() {
return new Fields("msg");
}


}

==========================================

//storm的拓撲圖

import storm.kafka.BrokerHosts;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import cn.itcast.storm.bolt.WordSpliter;
import cn.itcast.storm.bolt.WriterBolt;
import cn.itcast.storm.spout.MessageScheme;


public class KafkaTopo {


public static void main(String[] args) throws Exception {

String topic = "wordcount";
String zkRoot = "/kafka-storm";
String spoutId = "KafkaSpout";
BrokerHosts brokerHosts = new ZkHosts("weekend01:2181,weekend02:2181,weekend03:2181"); 
SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "wordcount", zkRoot, spoutId);
spoutConfig.forceFromStart = true;
spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
TopologyBuilder builder = new TopologyBuilder();
//設定一個spout用來從kaflka訊息佇列中讀取資料併發送給下一級的bolt元件,此處用的spout元件並非自定義的,而是storm中已經開發好的KafkaSpout
builder.setSpout("KafkaSpout", new KafkaSpout(spoutConfig));
builder.setBolt("word-spilter", new WordSpliter()).shuffleGrouping(spoutId);
builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("word-spilter", new Fields("word"));
Config conf = new Config();
conf.setNumWorkers(4);
conf.setNumAckers(0);
conf.setDebug(false);

//LocalCluster用來將topology提交到本地模擬器執行,方便開發除錯
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("WordCount", conf, builder.createTopology());

//提交topology到storm叢集中執行
//StormSubmitter.submitTopology("sufei-topo", conf, builder.createTopology());
}

============================================

切分欄位的bolt

import org.apache.commons.lang.StringUtils;


import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class WordSpliter extends BaseBasicBolt {


private static final long serialVersionUID = -5653803832498574866L;


@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String line = input.getString(0);
String[] words = line.split(" ");
for (String word : words) {
word = word.trim();
if (StringUtils.isNotBlank(word)) {
word = word.toLowerCase();
collector.emit(new Values(word));
}
}
}


@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));


}


}

=========================================

寫資料到磁碟的bolt

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;


import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
/**
 * 將資料寫入檔案
 * @author [email protected]
 *
 */
public class WriterBolt extends BaseBasicBolt {


private static final long serialVersionUID = -6586283337287975719L;

private FileWriter writer = null;

@Override
public void prepare(Map stormConf, TopologyContext context) {
try {
writer = new FileWriter("c:\\storm-kafka\\" + "wordcount"+UUID.randomUUID().toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
}



@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}


@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String s = input.getString(0);
try {
writer.write(s);
writer.write("\n");
writer.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}



}

相關推薦

stormkafka資料

========================================== 定義從kafka中讀出的資料 import java.io.UnsupportedEncodingException; import java.util.List; import bac

java的apihbase資料

package hbase.com.cn.hbase; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.Lis

Kafka系列(四)Kafka消費者:Kafka讀取資料

本系列文章為對《Kafka:The Definitive Guide》的學習整理,希望能夠幫助到大家應用從Kafka中讀取資料需要使用KafkaConsumer訂閱主題,然後接收這些主題的訊息。在我們深入這些API之前,先來看下幾個比較重要的概念。Kafka消費者相關的概念消

已解決問題:Excel資料到資料庫,本地VS執行都成功,網站釋出後不能讀取資料

問題如標題:先上程式碼 String path = Server.MapPath("~/UploadExcel/"); string FileName = path + DateTime.Now.ToString("yyyyMMddHHmmss") + fuload.

十三週OJ2檔案資料

問題的及程式碼 /* 煙臺大學計算機與控制工程學院 檔名稱:從檔案中讀資料 作 者:展一 完成時間:2016年11月28日 題目描述  文字檔案score.dic 中儲存了n名學生的資訊(班級編號,姓名,成績),每個學生資訊佔一行,每行的資料之間使用製表符分割,如下所

Java Excel資料

今天工作時遇到一個需求,將Excel表格中的資料匯入到資料庫中,下面我們來看怎麼實現。 我們要用POI這個技術來實現這項功能,我們需要做的準備是下載以下jar包: 接下來我們來看怎麼將Excel中的資料傳到一個集合中 注意:這裡展示的Excel格式是xlsx也就是E

Spark StreamingKafka獲取資料,並進行實時單詞統計,統計URL出現的次數

1、建立Maven專案 2、啟動Kafka 3、編寫Pom檔案 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.or

csv資料,並寫到csv

import pymysql import numpy as np import csv def connectViogidata(ebayno): db = pymysql.connect(host='', user='s', passwd='2', db='vi

spark streaming direct 直連方式kafka怎麼拉取資料

我們知道 SparkStreaming 用 Direct 的方式拉取 Kafka 資料時,是根據 kafka 中的 fromOffsets 和 untilOffsets 來進行獲取資料的,而 fromOffsets 一般都是需要我們自己管理的,而每批次的 untilOffse

python檔案寫(file1讀出資料並計算,然後將結果寫入到file2

要求新建兩個檔案,file1、file2,要求開啟file1檔案,分別對每一行數字進行求和,並將每一行的結果寫在file2中。 file1: 20 30 40 20 52 63 52 52 85 52 8 456 522 25 36 85 96 74 程式原始碼: 定義一個求和函式

使用flumekafka的topic取得資料,然後存入hbase和es

接上一篇部落格,將資料進行處理!!!!!!!!!!!!#HBASEtier2.sources  = HbaseAuditSource HbaseRunSource HdfsAuditSources HdfsRunSources HiveAuditSources HiveRun

Spark整合Kafka原始碼分析——SparkStreamingkafak接收資料

整體概括:要實現SparkStreaming從kafak中接收資料分為以下幾步(其中涉及的類在包org.apache.spark.streaming.kafka中): 1.建立createStream()函式,返回型別為ReceiverInputDStream物件,在cre

Spark StreamingKafka獲取數據,並進行實時單詞統計,統計URL出現的次數

scrip 發送消息 rip mark 3.2 umt 過程 bject ttr 1、創建Maven項目 創建的過程參考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2、啟動Kafka A:安裝ka

Pig指令碼Hiveload資料並存入到Hbase

1、我們先建一個Hive表test01: create table test01(name String, age int, phone String,province String, city String) ROW FORMAT DELIMITED FIELDS TERMINATED B

python 檔案讀取資料,同時去除掉空格和換行

從檔案中讀取資料,同時去除掉空格和換行,程式碼如下 import numpy as np def sort(path): w = open(path,'r') l = w.readlines() col=[] for k in l: k = k.strip('\n')

hive獲取資料

MySQL中獲取資料 public RestMsg<Object> getZhen( HttpServletRequest request) { RestMsg<Object> rm = new RestMsg<Object>();

SparkStreaming消費Kafka資料 使用zookeeper和MySQL儲存偏移量的兩種方式

Spark讀取Kafka資料的方式有兩種,一種是receiver方式,另一種是直連方式。今天分享的SparkStreaming消費Kafka中的資料儲存偏移量的兩種方式都是基於直連方式上的 話不多說 直接上程式碼 ! 第一種是使用zookeeper儲存偏移量 object Kafka

Java基礎-----Excel獲取資料生成shell指令碼

前言 java讀取Excel的驅動包: 連結:https://pan.baidu.com/s/1ejCR9sS2OUmttFYpQnJkKQ 提取碼:58rm 實現1: 從Excel中讀取表名,由於每個欄位會對應一個表名,故讀取的某列會有若干個連續的表名出現,所以用set集合

Flinkkafka消費資料--解析

一、情況介紹: 基於scala語言的Flink從kafka中消費資料,然後使用protobuf解析,當然預設是使用string解析的,此處需要指定接收的資料格式 package cetc.kakfa2flink import java.io.IOException import j

HiveHDFS載入資料

建表         以手機流量資訊為例插入30w行資料   create table flow(id string,phonenum string,mac string,ip string,num1 int,num2 int,up in