java操作kafka傳送訊息和接收訊息
<!-- java程式的Kakfa-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.0.0</version>
</dependency>
public class KafKaConsumer {
private final ConsumerConnector consumer;
private KafKaConsumer() {
Properties properties = new Properties();
//zooKeeper配置
properties.put("zookeeper.connect", "127.0.0.1:2181");
//group代表一個消費組
properties.put("group.id", "lingroup");
//properties.put("zookeeper.sync.time.ms", "2000");
properties.put("rebalance.max.retries", "5");
properties.put("rebalance.backoff.ms", "1200");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "smallest");
//序列化類
properties.put("serializer.class", "kafka.serializer.StringEncoder");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
}
void consume(){
Map<String,Integer> topicCountMap=new HashMap<String, Integer>();
topicCountMap.put(KafKaProducer.TOPIC,new Integer(1));
StringDecoder keyDecoder=new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder=new StringDecoder(new VerifiableProperties());
Map<String,List<KafkaStream<String,String>>> consumerMap=consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
KafkaStream stream=consumerMap.get(KafKaProducer.TOPIC).get(0);
ConsumerIterator<String,String> iterator=stream.iterator();
while (iterator.hasNext()){
System.out.println("接受訊息>>>"+iterator.next().message());
}
}
public static void main(String[] args){
new KafKaConsumer().consume();
}
}
生產者:
public class KafKaProducer {
private final Producer<String,String> producer;
public final static String TOPIC="linlin";
private KafKaProducer(){
Properties properties=new Properties();
//此處設定kafka的埠
properties.put("metadata.broker.list","127.0.0.1:9092");
// properties.put("zk.connect","127.0.0.1:2181");
//配置value的序列化類
properties.put("serializer.class","kafka.serializer.StringEncoder");
properties.put("key.serializer.class","kafka.serializer.StringEncoder");
properties.put("request.required.acks", "-1");
producer=new Producer<String, String>(new ProducerConfig(properties));
}
void produce(){
int messageNo=1000;
final int count=10000;
while (messageNo<count){
String key=String.valueOf(messageNo);
String data ="hello kafka message"+key;
producer.send(new KeyedMessage<String, String>(TOPIC,key,data));
System.out.println("傳送訊息:"+data);
messageNo++;
}
}
public static void main(String[] args){
new KafKaProducer().produce();
}
}
相關推薦
java操作kafka傳送訊息和接收訊息
<!-- java程式的Kakfa--> <dependency> <groupId>org.apache.kafka</groupId>
RabbitMQ與SpringMVC整合並實現傳送訊息和接收訊息(持久化)方案二
RabbitMQ的大約的介紹,上一篇已經有介紹了,這篇不介紹,直接描述RabbitMQ與SpringMVC整合並實現傳送訊息和接收訊息(持久化)。 使用了Spring-rabbit 傳送訊息和接收訊息,我們使用的Maven來管理Jar包,在Maven的pom.xml
java呼叫rabbitmq訊息佇列傳送和接收訊息例項
消費者Consumer.java程式碼 package com.rabbitMQ.pro; import com.rabbitmq.client.ConnectionFactory; impor
kafka單機不能傳送資訊和消費訊息(轉)
zookeeper和kafaka服務0.8版本以後預設是不需要配置的,但是本文遇到了一種要修改配置的情形。 終端A開啟zookeeper和kafaka服務後 傳送訊息 bin/kafka-console-producer.sh --broker-list localho
利用API傳送和接收訊息,本程式碼測試全通過
~_~一個程式設計師的淘寶店: 點選開啟連結 using System; using System.Collections.Generic; using System.Text; using System.Runtime.InteropServices; namespace Set
C#傳送和接收訊息
using System; using System.Collections.Generic; using System.Linq; using System.Runtime.InteropServices; using System.Text; using S
Spring整合JMS、IBM MQ傳送和接收訊息
最近才接觸到MQ,由於之前完全不知道是幹嘛用的,還是很花了一點時間研究的~先來簡單解釋一下名詞啦 一、名詞解釋 MQ MQ(message queue)指訊息佇列,是應用程式對應用程式的通訊方法。可以利用訊息佇列暫存資料報文。 MQ的原理其實就是生產者
spring的jms:用spring的JmsTemplate傳送和接收訊息
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE beans PUBLIC "-//SPRING/DTD BEAN/EN" "http://www.springframework.org/dtd/spring-beans.dtd"><
rabbitmq訊息傳送確認和消費訊息手動刪除訊息
0.application.properties新增如下配置 # 訊息傳送至exchange callback spring.rabbitmq.publisher-confirms=true # 訊息傳送至queue 失敗才callback spring.rabbitmq.publi
python操作redis進行釋出和訂閱訊息
在遠端伺服器安裝redis,並啟動 訂閱端程式碼 import redis pool=redis.ConnectionPool(host='192.168.100.30',
Java之UDP傳輸小Demo(無執行緒即傳送端和接收端為兩個獨立程序):傳送端
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import j
ROS---傳送自定義訊息,接收訊息
上一篇已經建好了hello包及其內部的檔案,本篇講訊息 接收訊息 hello_node.cpp 作一些修改 #include "iostream" #include "../include/hello/add.h" #include "ros/ros.h
PHP7.0微信公眾平臺開發4: 例項一:接收普通訊息和接收事件推送
<?php define("TOKEN", "peng"); $wechatObj = new wechatCallbackapiTest(); if (!isset($_GET['echostr'])) { $wechatObj->responseMsg(); //呼叫respon
用 Java 模擬 UDP 傳輸的傳送端和接收端
一、建立 UDP 傳輸的傳送端 建立 UDP 的 Socket 服務; 將要傳送的資料封裝到資料包中; 通過 UDP 的 Socket 服務將資料包傳送出去; 關閉 Socket 服務。 import java.io.IOException; impor
java-TCP協議發送和接收數據
amr throws while user class get client put println 服務器: public class FileServerDemo { public static void main(String[] args) throws IO
robotframework 學習(2) :使用RIDE進行介面測試之傳送請求和接收資料斷言
一、RIDE的介紹: RIDE是robotframework圖形操作前端,也可以理解為一種編輯器,它以cell的形式來進行定義資料和方法,返回結果等,我們可以使用它進行建立測試用例和編寫測試指令碼,並且執行自動化測試。
springboot框架開發微信公眾號(三)之訊息的使用以文字訊息和圖文訊息為例
流程圖 程式碼實現 封裝各種訊息的屬性 響應訊息基類 package com.gzc.weixin.message.response; /** * * @Description: 響應訊息基類(公眾賬號→普通使用者) * @Parameters: * @Return: *
整合EaseUI3.0 線上訊息和離線訊息
整合環信的過程中,遇到各種坑,各種折磨。至於環信文件神馬的我就不吐槽了。 EaseUI3.0作為Moduel匯入, 一些不要的so檔案該去掉,還是去掉吧,apk 會很大的有木有, 1.線上訊息 1.1 簡單的會話列表 在你新建的Activ
JAVA學習---QQ傳送郵件與接收
QQ郵箱的設定 PS:菜鳥一個,只是為了防止犯下同一個錯誤,所以,寫了這個部落格,歡迎大家多提意見,雖然我不一定會聽,但我一定認真考慮。 第一步:QQ郵箱的設定 話不多說,直接配置: 登入QQ郵箱後,進入設定頁面, 在最上面一行, 賬戶選單下,最下邊有四個
Java操作MongoDB模糊查詢和分頁查詢
模糊查詢條件:1、完全匹配Pattern pattern = Pattern.compile("^name$", Pattern.CASE_INSENSITIVE);2、右匹配Pattern pattern = Pattern.compile("^.*name$", P