1. 程式人生 > >Kafka 生產者消費者 Java API 程式設計

Kafka 生產者消費者 Java API 程式設計

我們先建立一個topic,然後啟動生產者和消費者,進行訊息通訊,然後在使用Kafka API程式設計的方式實現,筆者使用的ZK和Kafka都是單節點,你也可以使用叢集方式。

啟動Zookeeper

zkServer.sh start

啟動Kafka

kafka-server-start.sh $KAFKA_HOME/config/server.properties

建立topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_api

檢視topic詳細資訊

[[email protected] ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka_api
Topic:kafka_api PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: kafka_api    Partition: 0    Leader: 0   Replicas: 0 Isr: 0

啟動生產者和消費者,測試訊息通訊

# 生產者
kafka-console-producer.sh
--broker-list localhost:9092 --topic kafka_api

這裡寫圖片描述

# 消費者
kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_api

這裡寫圖片描述

Java API 程式設計實現
1.建立maven專案,pom.xml中引入kafka依賴
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId
>
kafka_2.11</artifactId> <version>0.11.0.0</version> </dependency> </dependencies>
2.建立KafkaProperties類,配置Kafka相關屬性
package com.bigdata.kafka;

/**
 * Kafka 相關屬性配置類
 */
public interface KafkaProperties {

    // zookeeper連線,與server.properties中的zookeeper.connect屬性一致,多個用逗號隔開,例如:zk01:2181,zk02:2181
    public static final String ZK = "Master:2181";

    // 如果是多個blocker,用逗號分隔即可,例如:kafka01::9092,kafka02:9093
    public static final String BLOCK_LIST = "Master:9092";

    // 主題
    public static final String TOPIC = "kafka_api";
}
3.Kafka Producer API 開發

生產者API中常用的類如下
Producer:生產者
ProducerConfig:生產者對應的配置
KeyedMessage:封裝的訊息物件

建立KafkaProducer類,程式碼如下

package com.bigdata.kafka;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Kafka 生產者
 */
public class KafkaProducer extends Thread {

    private String topic;

    private Producer<Integer, String> producer;

    public KafkaProducer(String topic) {
        this.topic = topic;

        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaProperties.BLOCK_LIST);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");

        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }

    @Override
    public void run() {
        int messageNo = 1;

        while(true) {
            String message = "message_" + messageNo;

            System.out.println("Send:" + message);

            producer.send(new KeyedMessage<Integer, String>(topic, message));

            messageNo ++;

            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new KafkaProducer(KafkaProperties.TOPIC).start();
    }
}

執行上述程式碼,在控制檯中使用命令啟動一個消費者,觀察控制檯是否能接收到訊息

這裡寫圖片描述

4.Kafka Consumer API 開發

消費者API中常用的類如下
Consumer:消費者
ConsumerConnector:消費者聯結器
ConsumerConfig:消費者對應的配置
KafkaStream:資料流

建立KafkaConsumer類,程式碼如下

package com.bigdata.kafka;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Kafuka 消費者
 */
public class KafkaConsumer extends Thread{

    private String topic;

    public KafkaConsumer(String topic) {
        this.topic = topic;
    }

    private ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", KafkaProperties.ZK);
        properties.setProperty("group.id", "testGroup");

        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    @Override
    public void run() {
        // 建立Consumer
        ConsumerConnector consumer = createConsumer();

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);

        // 獲取每次接受到的資料
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);

        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

        // 不停地從stream中讀取最新接收到的資料
        while(iterator.hasNext()){
            String message = String.valueOf(iterator.next().message());

            System.out.println("message:" + message);
        }
    }

    public static void main(String[] args) {
        new KafkaConsumer(KafkaProperties.TOPIC).start();
    }
}

執行生產者及消費者程式碼,觀察控制檯

生產者控制檯(部分結果):

Send:message_5
Send:message_6
Send:message_7
Send:message_8
Send:message_9
Send:message_10

消費者控制檯(部分結果):只接收最新的資料

message:message_7
message:message_8
message:message_9
message:message_10

相關推薦

Kafka 生產者消費者 Java API 程式設計

我們先建立一個topic,然後啟動生產者和消費者,進行訊息通訊,然後在使用Kafka API程式設計的方式實現,筆者使用的ZK和Kafka都是單節點,你也可以使用叢集方式。 啟動Zookeeper zkServer.sh start 啟動Kafka ka

kafka 和storm Java api程式設計中 pom檔案範例

要注意的是執行的時候可能會遇到日誌檔案jar包重複的情況,這裡要用到<exclusions>排除如下 <exclusion> <groupId>org.slf4j</groupId>

Kafka生產者消費者java示例(包含Avro序列化)

文章內容包含Kafka未進行序列化生產消費java示例,和使用Avro序列化資料進行生產和消費的示例,掌握這些之後就對Kafka的生產消費有基本開發基礎。 1.未序列化 生產者示例: import java.util.Properties; import kafka.ja

Kafka消費者Java API

廢話不多說,直接上程式碼 consumer.java package cn.ysjh; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.cons

kafka叢集搭建和使用Javakafka生產者消費者

http://czj4451.iteye.com/blog/2041096 server.properties 需要配置 broker.id=110 host.name=192.168.1.108 zookeeper.connect=192.168.1.108:2181 log.dirs=/

Kafka生產者消費者Java操作示例

本文提供Java對Kafka生產者、消費者操作的簡單示例: 1.首先看下pom依賴: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>ka

kafka生產者消費者API 與sparkStreaming 整合(scala版)

maven配置檔案 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> <dependency>

Spark-利用SparkLauncher 類以JAVA API 程式設計的方式提交spark job

一.環境說明和使用軟體的版本說明: hadoop-version:hadoop-2.9.0.tar.gz  spark-version:spark-2.2.0-bin-hadoop2.7.tgz java-version:jdk1.8.0_151 叢集環境:單機偽分散式環

Spring Boot、kafka、spring-kafka 生產者消費者實踐(從搭建kafka叢集開始)

一、搭建kafka叢集 參考文件:http://kafka.apache.org/quickstart 官方文件講的很詳細,而且沒坑,照著做很快就可以搭好 注意點 or 建議: 1、在Linux下,啟動的kafka叢集經常無故退出,看日誌也沒有報錯,就是啟動了關閉流程,正常關閉

使用ZooKeeper Java API程式設計

https://www.cnblogs.com/IcanFixIt/p/7882107.htmlhttps://blog.csdn.net/qiushisoftware/article/details/79043379https://blog.csdn.net/wo54107

Apache Kafka系列(三) Java API使用

摘要:   Apache Kafka Java Client API 一、基本概念   Kafka集成了Producer/Consumer連線Broker的客戶端工具,但是在訊息處理方面,這兩者主要用於服務端(Broker)的簡單操作,如:     1.建立Topic     2.羅列出已存在的Topic

Kafka生產者消費者例項

主要實現Kafka消費者和生產者最基礎功能。 消費者例項: public class MyKafkaConsumer implements Runnable {private String topic;public MyKafkaConsumer(String topic)

Kafka java api-消費者程式碼與消費分析、生產者消費者配置檔案詳解

1、消費者程式碼 用到消費者,所以也必須先把前面寫過的生產者程式碼也貼一下吧 生產者程式碼與自定義partition 使用maven導包 <dependencies> <dependency> &

Hadoop生態圈-KafkaAPI生產者-消費者

HA size ron 作品 消費 消費者 hadoop ado 原創                     Hadoop生態圈-Kafka的API之生產者-消費者                                             作者:尹正傑 版權

Java併發程式設計(4)--生產者消費者模式介紹

一、前言   這種模式在生活是最常見的,那麼它的場景是什麼樣的呢? 下面是我假象的,假設有一個倉庫,倉庫有一個生產者和一個消費者,消費者過來消費的時候會檢測倉庫中是否有庫存,如果沒有了則等待生產,如果有就先消費直至消費完成;而生產者每天的工作就是先檢測倉庫是否有庫存,如果沒有就開始生產,滿倉了就停止生產等待

(十六)java併發程式設計--執行緒的死鎖解決方案(生產者消費者幾種實現方式)

上一篇中,主要了解了什麼時候死鎖,並且提出死鎖的一個解決方案,多個鎖要按照一定的順序來。 本片主要是利用生產者消費者模式解決執行緒的死鎖。 多執行緒生產者和消費者一個典型的多執行緒程式。一個生產者生產提供消費的東西,但是生產速度和消費速度是不同的。這就需要讓

kafka消費者生產者Java實現

參考:https://www.cnblogs.com/zlslch/p/5966004.html1、KafkaProducerOps.java//執行成功 import org.apache.kafka.clients.producer.KafkaProducer; impo

kafka叢集配置和java編寫生產者消費者操作例子

kafka 安裝 修改配置檔案 java操作kafka kafka kafka的操作相對來說簡單很多 安裝 下載kafka http://kafka.apache.org/downloads tar -zxvf kafka_2.12-2.1

kafka集群配置和java編寫生產者消費者操作例子

tor http dep org create comm getname fig exp kafka 安裝 修改配置文件 java操作kafka kafka kafka的操作相對來說簡單很多 安裝 下載kafka http://kafka.apache.org

Java多執行緒程式設計生產者-消費者模式的詳解

生產者-消費者模式是一個經典的多執行緒設計模式,它為多執行緒的協作提供了良好的解決方案。在生產者-消費者模式中,通常有兩類執行緒,即若干個生產者執行緒和若干個消費者執行緒。生產者執行緒負責提交使用者請求,消費者執行緒負責處理使用者請求。生產者和消費者之間通過共享記憶體緩衝區