
hadoop (14) Kafka message queue

Kafka notes


1/ Kafka is a distributed message caching system
2/ The servers in a Kafka cluster are called brokers
3/ Kafka has two kinds of clients: producers (message producers) and consumers (message consumers); clients connect to the broker servers over TCP
4/ Messages from different business systems are distinguished by topic, and every topic is partitioned to spread the message read/write load
5/ Each partition can have multiple replicas to guard against data loss
6/ Any update to the data of a partition must go through the leader among that partition's replicas
7/ Consumers can be organized into groups; within a single group the consumers share the messages of a topic without duplication.
For example, if topic order_info holds 100 messages with ids numbered 0-99 and the group has two consumers A and B, A might consume messages 0-49 while B consumes 50-99.
8/ When consuming the messages of a particular topic, a consumer can specify the starting offset (see the sketch just below)
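
A minimal sketch of point 8, using the same 0.8-era kafka.javaapi client that the code later in this post relies on: it fetches the messages of one partition starting from an explicitly chosen offset. The topic name, broker host/port and fetch sizes here are assumptions for illustration; real code would also have to locate the partition leader and handle fetch errors.

import java.nio.ByteBuffer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class OffsetReadDemo {
	public static void main(String[] args) throws Exception {
		String topic = "order";   // hypothetical topic
		int partition = 0;
		long startOffset = 0L;    // begin reading at offset 0 of this partition

		// connect to one broker that hosts the partition (host/port are assumptions)
		SimpleConsumer consumer = new SimpleConsumer("weekend05", 9092, 100000, 64 * 1024, "offset-demo");

		// fetch messages of this topic/partition beginning at startOffset
		FetchRequest request = new FetchRequestBuilder()
				.clientId("offset-demo")
				.addFetch(topic, partition, startOffset, 100000)
				.build();
		FetchResponse response = consumer.fetch(request);

		// print every message together with the offset it was stored at
		for (MessageAndOffset mo : response.messageSet(topic, partition)) {
			ByteBuffer payload = mo.message().payload();
			byte[] bytes = new byte[payload.limit()];
			payload.get(bytes);
			System.out.println(mo.offset() + " : " + new String(bytes, "UTF-8"));
		}
		consumer.close();
	}
}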


Cluster installation


1、Unpack the release
2、Modify server.properties
broker.id=1
zookeeper.connect=weekend05:2181,weekend06:2181,weekend07:2181

3、Start the ZooKeeper cluster

4、Start the broker on every node
bin/kafka-server-start.sh config/server.properties

5、Create a topic in the Kafka cluster
bin/kafka-topics.sh --create --zookeeper weekend05:2181 --replication-factor 3 --partitions 1 --topic order

6、Use a producer to write messages to a topic
bin/kafka-console-producer.sh --broker-list weekend05:9092 --topic order

7、Use a consumer to read messages from a topic
bin/kafka-console-consumer.sh --zookeeper weekend05:2181 --from-beginning --topic order

8、View the partition and replica status of a topic
bin/kafka-topics.sh --describe --zookeeper weekend05:2181 --topic order

Java client programming

Required jars

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerDemo {
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		// ZooKeeper ensemble (used by the old 0.8 client) and broker list for metadata discovery
		props.put("zk.connect", "weekend01:2181,weekend02:2181,weekend03:2181");
		props.put("metadata.broker.list", "weekend01:9092,weekend02:9092,weekend03:9092");
		// message values are serialized as strings
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		ProducerConfig config = new ProducerConfig(props);
		Producer<String, String> producer = new Producer<String, String>(config);

		// Send business messages; in a real application they might come from
		// a file, an in-memory database or a socket port
		for (int i = 1; i <= 100; i++) {
			Thread.sleep(500);
			producer.send(new KeyedMessage<String, String>("wordcount",
					"i said i love you baby for" + i + "times,will you have a nice day with me tomorrow"));
		}

		producer.close();
	}
}

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo {
	private static final String topic = "mysons";
	private static final Integer threads = 1;

	public static void main(String[] args) {
		
		Properties props = new Properties();
		// ZooKeeper ensemble used by the old high-level consumer
		props.put("zookeeper.connect", "weekend01:2181,weekend02:2181,weekend03:2181");
		// consumers sharing a group.id split the partitions of a topic among themselves
		props.put("group.id", "1111");
		// start from the earliest available offset when the group has no committed offset yet
		props.put("auto.offset.reset", "smallest");

		ConsumerConfig config = new ConsumerConfig(props);
		ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
		// number of streams (consumer threads) to open for each topic
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, 1);
		topicCountMap.put("mygirls", 1);
		topicCountMap.put("myboys", 1);
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("mygirls");
		
		// start one reader thread per stream and print every message it receives
		for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){
						String msg = new String(mm.message());
						System.out.println(msg);
					}
				}
			
			}).start();
		
		}
	}
}

Integrating with Storm

 

Typically, Kafka delivers the messages to Storm's spout component.

Required jars: storm-kafka-0.9.2-incubating.jar, etc.

 

 

config.properties

zkConnect=master:2181
zkSessionTimeoutMs=30000
zkConnectionTimeoutMs=30000
zkSyncTimeMs=5000

scheme=date,id,content
separator=,
target=date
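
The scheme, separator and target keys are not actually referenced by the MessageScheme shown further down; they evidently describe delimited records (fields date,id,content separated by a comma). Below is a minimal sketch of how a Scheme could pick these keys up through the PropertyUtil class from the utils section; the class name and the parsing logic are assumptions, not part of the original code.

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class DelimitedMessageScheme implements Scheme {

	private static final long serialVersionUID = 1L;

	// field names and separator come from config.properties via PropertyUtil (see the utils section)
	private final String[] fields = PropertyUtil.getProperty("scheme").split(",");
	private final String separator = PropertyUtil.getProperty("separator");

	@Override
	public List<Object> deserialize(byte[] bytes) {
		try {
			// split the delimited record into one tuple value per configured field
			String msg = new String(bytes, "UTF-8");
			return new Values((Object[]) msg.split(separator, fields.length));
		} catch (UnsupportedEncodingException e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public Fields getOutputFields() {
		return new Fields(fields);
	}
}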

bolt

import org.apache.commons.lang.StringUtils;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordSpliter extends BaseBasicBolt {

	private static final long serialVersionUID = -5653803832498574866L;

	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		String line = input.getString(0);
		String[] words = line.split(" ");
		for (String word : words) {
			word = word.trim();
			if (StringUtils.isNotBlank(word)) {
				word = word.toLowerCase();
				collector.emit(new Values(word));
			}
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));

	}

}
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
/**
 * Writes the received data to a file
 * @author [email protected]
 *
 */
public class WriterBolt extends BaseBasicBolt {

	private static final long serialVersionUID = -6586283337287975719L;
	
	private FileWriter writer = null;
	
	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		try {
			writer = new FileWriter("c:\\storm-kafka\\" + "wordcount"+UUID.randomUUID().toString());
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}

	
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
	}
	
	
	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		String s = input.getString(0);
		try {
			writer.write(s);
			writer.write("\n");
			writer.flush();
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}
}

spout

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MessageScheme implements Scheme {
	
	private static final long serialVersionUID = 8423372426211017613L;

	@Override
	public List<Object> deserialize(byte[] bytes) {
			try {
				String msg = new String(bytes, "UTF-8");
				return new Values(msg); 
			} catch (UnsupportedEncodingException e) {
				e.printStackTrace();
			}
			return null;
	}

	@Override
	public Fields getOutputFields() {
		return new Fields("msg");
	}

}

topology

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import cn.itcast.storm.bolt.WordSpliter;
import cn.itcast.storm.bolt.WriterBolt;
import cn.itcast.storm.spout.MessageScheme;

public class KafkaTopo {

	public static void main(String[] args) throws Exception {
		
		String topic = "wordcount";
		String zkRoot = "/kafka-storm";
		String spoutId = "KafkaSpout";
		BrokerHosts brokerHosts = new ZkHosts("weekend01:2181,weekend02:2181,weekend03:2181"); 
		SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
		spoutConfig.forceFromStart = true;
		spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
		TopologyBuilder builder = new TopologyBuilder();
		// Set up a spout that reads data from the Kafka message queue and passes it to the downstream
		// bolt; this is not a custom spout but the ready-made KafkaSpout shipped with storm-kafka
		builder.setSpout(spoutId, new KafkaSpout(spoutConfig));
		builder.setBolt("word-spilter", new WordSpliter()).shuffleGrouping(spoutId);
		builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("word-spilter", new Fields("word"));
		Config conf = new Config();
		conf.setNumWorkers(4);
		conf.setNumAckers(0);
		conf.setDebug(false);
		
		// LocalCluster submits the topology to a local in-process simulator, which is handy for development and debugging
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("WordCount", conf, builder.createTopology());
		
		// Submit the topology to a real Storm cluster instead
//		StormSubmitter.submitTopology("sufei-topo", conf, builder.createTopology());
	}

}

utils

import java.io.InputStream;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Utility for reading configuration properties
 */
public class PropertyUtil {

	private static final Log log = LogFactory.getLog(PropertyUtil.class);
	private static Properties pros = new Properties();

	// Load the properties file from the classpath
	static {
		try {
			InputStream in = PropertyUtil.class.getClassLoader().getResourceAsStream("config.properties");
			pros.load(in);
		} catch (Exception e) {
			log.error("load configuration error", e);
		}
	}

	/**
	 * Read a property value from the configuration file
	 * @param key
	 * @return
	 */
	public static String getProperty(String key) {
		return pros.getProperty(key);
	}

}
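
For reference, a minimal usage sketch of PropertyUtil: it simply reads two of the keys defined in config.properties above. The class and variable names here are only for illustration.

public class PropertyUtilDemo {
	public static void main(String[] args) {
		// config.properties must be on the classpath for the static initializer to find it
		String zkConnect = PropertyUtil.getProperty("zkConnect");
		int sessionTimeoutMs = Integer.parseInt(PropertyUtil.getProperty("zkSessionTimeoutMs"));
		System.out.println(zkConnect + " (session timeout " + sessionTimeoutMs + " ms)");
	}
}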