1. 程式人生 > >java操作kafka傳送訊息和接收訊息

java操作kafka傳送訊息和接收訊息

<!-- java程式的Kakfa-->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.10.0.0</version>
    </dependency>




/*消費者*/
public class KafKaConsumer {

  private final ConsumerConnector consumer;

    private KafKaConsumer() {
        Properties properties = new Properties();
        //zooKeeper配置
       properties.put("zookeeper.connect", "127.0.0.1:2181");

        //group代表一個消費組
        properties.put("group.id", "lingroup");
        //properties.put("zookeeper.sync.time.ms", "2000");
        properties.put("rebalance.max.retries", "5");
        properties.put("rebalance.backoff.ms", "1200");


        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "smallest");

        //序列化類
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
    }

    void consume(){
        Map<String,Integer> topicCountMap=new HashMap<String, Integer>();
        topicCountMap.put(KafKaProducer.TOPIC,new Integer(1));
        StringDecoder keyDecoder=new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder=new StringDecoder(new VerifiableProperties());
        Map<String,List<KafkaStream<String,String>>> consumerMap=consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);

         KafkaStream stream=consumerMap.get(KafKaProducer.TOPIC).get(0);
        ConsumerIterator<String,String> iterator=stream.iterator();
         while (iterator.hasNext()){


             System.out.println("接受訊息>>>"+iterator.next().message());

    }

    }

  public static void main(String[] args){
      new KafKaConsumer().consume();
  }

}

生產者:

public class KafKaProducer {
    private final Producer<String,String> producer;
    public final static String TOPIC="linlin";

    private KafKaProducer(){
        Properties properties=new Properties();
        //此處設定kafka的埠
        properties.put("metadata.broker.list","127.0.0.1:9092");
       // properties.put("zk.connect","127.0.0.1:2181");
        //配置value的序列化類
        properties.put("serializer.class","kafka.serializer.StringEncoder");
        properties.put("key.serializer.class","kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "-1");

        producer=new Producer<String, String>(new ProducerConfig(properties));
    }


    void produce(){
        int messageNo=1000;
        final int count=10000;
        while (messageNo<count){
            String key=String.valueOf(messageNo);
            String data ="hello kafka message"+key;
            producer.send(new KeyedMessage<String, String>(TOPIC,key,data));
            System.out.println("傳送訊息:"+data);
            messageNo++;
        }


    }

  public static void main(String[] args){
       new KafKaProducer().produce();
  }

}

相關推薦

java操作kafka傳送訊息接收訊息

<!-- java程式的Kakfa-->    <dependency>      <groupId>org.apache.kafka</groupId>

RabbitMQ與SpringMVC整合並實現傳送訊息接收訊息(持久化)方案二

   RabbitMQ的大約的介紹,上一篇已經有介紹了,這篇不介紹,直接描述RabbitMQ與SpringMVC整合並實現傳送訊息和接收訊息(持久化)。  使用了Spring-rabbit 傳送訊息和接收訊息,我們使用的Maven來管理Jar包,在Maven的pom.xml

java呼叫rabbitmq訊息佇列傳送接收訊息例項

消費者Consumer.java程式碼 package com.rabbitMQ.pro; import com.rabbitmq.client.ConnectionFactory; impor

kafka單機不能傳送資訊消費訊息(轉)

zookeeper和kafaka服務0.8版本以後預設是不需要配置的,但是本文遇到了一種要修改配置的情形。   終端A開啟zookeeper和kafaka服務後 傳送訊息 bin/kafka-console-producer.sh --broker-list localho

利用API傳送接收訊息,本程式碼測試全通過

~_~一個程式設計師的淘寶店: 點選開啟連結 using System; using System.Collections.Generic; using System.Text; using System.Runtime.InteropServices; namespace Set

C#傳送接收訊息

using System; using System.Collections.Generic; using System.Linq; using System.Runtime.InteropServices; using System.Text; using S

Spring整合JMS、IBM MQ傳送接收訊息

最近才接觸到MQ,由於之前完全不知道是幹嘛用的,還是很花了一點時間研究的~先來簡單解釋一下名詞啦 一、名詞解釋 MQ MQ(message queue)指訊息佇列,是應用程式對應用程式的通訊方法。可以利用訊息佇列暫存資料報文。 MQ的原理其實就是生產者

spring的jms:用spring的JmsTemplate傳送接收訊息

<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE beans PUBLIC "-//SPRING/DTD BEAN/EN" "http://www.springframework.org/dtd/spring-beans.dtd"><

rabbitmq訊息傳送確認消費訊息手動刪除訊息

0.application.properties新增如下配置 # 訊息傳送至exchange callback spring.rabbitmq.publisher-confirms=true # 訊息傳送至queue 失敗才callback spring.rabbitmq.publi

python操作redis進行釋出訂閱訊息

在遠端伺服器安裝redis,並啟動 訂閱端程式碼 import redis pool=redis.ConnectionPool(host='192.168.100.30',

Java之UDP傳輸小Demo(無執行緒即傳送接收端為兩個獨立程序):傳送

import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import j

ROS---傳送自定義訊息接收訊息

上一篇已經建好了hello包及其內部的檔案,本篇講訊息 接收訊息 hello_node.cpp 作一些修改 #include "iostream" #include "../include/hello/add.h" #include "ros/ros.h

PHP7.0微信公眾平臺開發4: 例項一:接收普通訊息接收事件推送

<?php define("TOKEN", "peng"); $wechatObj = new wechatCallbackapiTest(); if (!isset($_GET['echostr'])) { $wechatObj->responseMsg(); //呼叫respon

Java 模擬 UDP 傳輸的傳送接收

一、建立 UDP 傳輸的傳送端 建立 UDP 的 Socket 服務; 將要傳送的資料封裝到資料包中; 通過 UDP 的 Socket 服務將資料包傳送出去; 關閉 Socket 服務。 import java.io.IOException; impor

java-TCP協議發送接收數據

amr throws while user class get client put println 服務器: public class FileServerDemo { public static void main(String[] args) throws IO

robotframework 學習(2) :使用RIDE進行介面測試之傳送請求接收資料斷言

一、RIDE的介紹:         RIDE是robotframework圖形操作前端,也可以理解為一種編輯器,它以cell的形式來進行定義資料和方法,返回結果等,我們可以使用它進行建立測試用例和編寫測試指令碼,並且執行自動化測試。  

springboot框架開發微信公眾號(三)之訊息的使用以文字訊息圖文訊息為例

流程圖 程式碼實現 封裝各種訊息的屬性 響應訊息基類 package com.gzc.weixin.message.response; /** * * @Description: 響應訊息基類(公眾賬號→普通使用者) * @Parameters: * @Return: *

整合EaseUI3.0 線上訊息離線訊息

整合環信的過程中,遇到各種坑,各種折磨。至於環信文件神馬的我就不吐槽了。 EaseUI3.0作為Moduel匯入,     一些不要的so檔案該去掉,還是去掉吧,apk 會很大的有木有, 1.線上訊息 1.1 簡單的會話列表 在你新建的Activ

JAVA學習---QQ傳送郵件與接收

QQ郵箱的設定 PS:菜鳥一個,只是為了防止犯下同一個錯誤,所以,寫了這個部落格,歡迎大家多提意見,雖然我不一定會聽,但我一定認真考慮。 第一步:QQ郵箱的設定 話不多說,直接配置: 登入QQ郵箱後,進入設定頁面, 在最上面一行, 賬戶選單下,最下邊有四個

Java操作MongoDB模糊查詢分頁查詢

模糊查詢條件:1、完全匹配Pattern pattern = Pattern.compile("^name$", Pattern.CASE_INSENSITIVE);2、右匹配Pattern pattern = Pattern.compile("^.*name$", P