1. 程式人生 > >Apache Kafka系列(四) 多執行緒Consumer方案

Apache Kafka系列(四) 多執行緒Consumer方案

本文的圖片是通過PPT截圖出的,讀者如果修改意見請聯絡我

一、Consumer為何需要實現多執行緒

  假設我們正在開發一個訊息通知模組,該模組允許使用者訂閱其他使用者傳送的通知/訊息。該訊息通知模組採用Apache Kafka,那麼整個架構應該是訊息的釋出者通過Producer呼叫API寫入訊息到Kafka Cluster中,然後訊息的訂閱者通過Consumer讀取訊息,剛開始的時候系統架構圖如下:

          但是,隨著使用者數量的增多,通知的資料也會對應的增長。總會達到一個閾值,在這個點上,Producer產生的數量大於Consumer能夠消費的數量。那麼Broker中未消費的訊息就會逐漸增多。即使Kafka使用了優秀的訊息持久化機制來儲存未被消費的訊息,但是Kafka的訊息保留機制限制

(時間,分割槽大小,訊息Key)也會使得始終未被消費的Message被永久性的刪除。另一方面從業務上講,一個訊息通知系統的高延遲幾乎算作是廢物了。所以多執行緒的Consumer模型是非常有必要的。

二、多執行緒的Kafka Consumer 模型類別

  基於Consumer的多執行緒模型有兩種型別:

  • 模型一:多個Consumer且每一個Consumer有自己的執行緒,對應的架構圖如下:

                         

  • 模型二:一個Consumer且有多個Worker執行緒

                         

     兩種實現方式的優點/缺點比較如下:

名稱優點缺點
模型一

1.Consumer Group容易實現

2.各個Partition的順序實現更容易

1.Consumer的數量不能超過Partition的數量,否則多出的Consumer永遠不會被使用到

2.因沒個Consumer都需要一個TCP連結,會造成大量的系統性能損耗

模型二 1.由於通過執行緒池實現了Consumer,橫向擴充套件更方便

1.在每個Partition上實現順序處理更困難。

例如:同一個Partition上有兩個待處理的Message需要被執行緒池中的2個執行緒消費掉,那這兩個執行緒必須實現同步

三、程式碼實現

3.1 前提

    • Kafka Broker 0.11.0
    • JDK1.8
    • IDEA
    • Maven3
    • Kafka環境搭建及Topic建立修改等請參照本系列的前幾篇文章。

 3.2 原始碼結構

                 

       其中,consumergroup包下面對應的是模型一的程式碼,consumerthread包下是模型二的程式碼。ProducerThread是生產者程式碼。

 3.3 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.randy</groupId>
  <artifactId>kafka_multithread_consumer_model</artifactId>
  <packaging>war</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>kafka_multithread_consumer_model Maven Webapp</name>
  <url>http://maven.apache.org</url>


  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.0</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>kafka_multithread_consumer_model</finalName>
  </build>
</project>

 3.4 方案一:Consumer Group

  ProducerThread.java是一個生產者執行緒,傳送訊息到Broker

  ConsumerThread.java是一個消費者執行緒,由於消費訊息

  ConsumerGroup.java用於產生一組消費者執行緒

  ConsumerGroupMain.java是入口類     

3.4.1 ProducerThread.java 

package com.randy;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  11:41
 * Comment :
 */
public class ProducerThread implements Runnable {
    private final Producer<String,String> kafkaProducer;
    private final String topic;

    public ProducerThread(String brokers,String topic){
        Properties properties = buildKafkaProperty(brokers);
        this.topic = topic;
        this.kafkaProducer = new KafkaProducer<String,String>(properties);

    }

    private static Properties buildKafkaProperty(String brokers){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    @Override
    public void run() {
        System.out.println("start sending message to kafka");
        int i = 0;
        while (true){
            String sendMsg = "Producer message number:"+String.valueOf(++i);
            kafkaProducer.send(new ProducerRecord<String, String>(topic,sendMsg),new Callback(){

                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(e != null){
                        e.printStackTrace();
                    }
                    System.out.println("Producer Message: Partition:"+recordMetadata.partition()+",Offset:"+recordMetadata.offset());
                }
            });
            // thread sleep 3 seconds every time
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end sending message to kafka");
        }
    }
}
View Code

 3.4.2 ConsumerThread.java

package com.randy.consumergroup;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  12:03
 * Comment :
 */
public class ConsumerThread implements Runnable {
    private static KafkaConsumer<String,String> kafkaConsumer;
    private final String topic;

    public ConsumerThread(String brokers,String groupId,String topic){
        Properties properties = buildKafkaProperty(brokers,groupId);
        this.topic = topic;
        this.kafkaConsumer = new KafkaConsumer<String, String>(properties);
        this.kafkaConsumer.subscribe(Arrays.asList(this.topic));
    }

    private static Properties buildKafkaProperty(String brokers,String groupId){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    @Override
    public void run() {
        while (true){
            ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(100);
            for(ConsumerRecord<String,String> item : consumerRecords){
                System.out.println("Consumer Message:"+item.value()+",Partition:"+item.partition()+"Offset:"+item.offset());
            }
        }
    }
}
View Code

3.4.3 ConsumerGroup.java

package com.randy.consumergroup;

import java.util.ArrayList;
import java.util.List;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  14:09
 * Comment :
 */
public class ConsumerGroup {
    private final String brokers;
    private final String groupId;
    private final String topic;
    private final int consumerNumber;
    private List<ConsumerThread> consumerThreadList = new ArrayList<ConsumerThread>();

    public ConsumerGroup(String brokers,String groupId,String topic,int consumerNumber){
        this.groupId = groupId;
        this.topic = topic;
        this.brokers = brokers;
        this.consumerNumber = consumerNumber;
        for(int i = 0; i< consumerNumber;i++){
            ConsumerThread consumerThread = new ConsumerThread(brokers,groupId,topic);
            consumerThreadList.add(consumerThread);
        }
    }

    public void start(){
        for (ConsumerThread item : consumerThreadList){
            Thread thread = new Thread(item);
            thread.start();
        }
    }
}
View Code

3.4.4 ConsumerGroupMain.java  

package com.randy.consumergroup;

import com.randy.ProducerThread;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  14:18
 * Comment :
 */
public class ConsumerGroupMain {

    public static void main(String[] args){
        String brokers = "Server2:9092";
        String groupId = "group01";
        String topic = "HelloWorld";
        int consumerNumber = 3;

        Thread producerThread = new Thread(new ProducerThread(brokers,topic));
        producerThread.start();

        ConsumerGroup consumerGroup = new ConsumerGroup(brokers,groupId,topic,consumerNumber);
        consumerGroup.start();
    }
}
View Code

3.5 方案二:多執行緒的Consumer

  ConsumerThreadHandler.java用於處理髮送到消費者的訊息

  ConsumerThread.java是消費者使用執行緒池的方式初始化消費者執行緒

  ConsumerThreadMain.java是入口類

3.5.1 ConsumerThreadHandler.java

package com.randy.consumerthread;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  16:29
 * Comment :
 */
public class ConsumerThreadHandler implements Runnable {
    private ConsumerRecord consumerRecord;

    public ConsumerThreadHandler(ConsumerRecord consumerRecord){
        this.consumerRecord = consumerRecord;
    }

    @Override
    public void run() {
        System.out.println("Consumer Message:"+consumerRecord.value()+",Partition:"+consumerRecord.partition()+"Offset:"+consumerRecord.offset());
    }
}
View Code

3.5.2 ConsumerThread.java

package com.randy.consumerthread;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  16:42
 * Comment :
 */
public class ConsumerThread {

    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    // Threadpool of consumers
    private ExecutorService executor;


    public ConsumerThread(String brokers, String groupId, String topic){
        Properties properties = buildKafkaProperty(brokers,groupId);
        this.consumer = new KafkaConsumer<>(properties);
        this.topic = topic;
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    public void start(int threadNumber){
        executor = new ThreadPoolExecutor(threadNumber,threadNumber,0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        while (true){
            ConsumerRecords<String,String> consumerRecords = consumer.poll(100);
            for (ConsumerRecord<String,String> item : consumerRecords){
                executor.submit(new ConsumerThreadHandler(item));
            }
        }
    }

    private static Properties buildKafkaProperty(String brokers, String groupId){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }


}
View Code

3.5.3 ConsumerThreadMain.java

package com.randy.consumerthread;

import com.randy.ProducerThread;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  16:49
 * Comment :
 */
public class ConsumerThreadMain {

    public static void main(String[] args){
        String brokers = "Server2:9092";
        String groupId = "group01";
        String topic = "HelloWorld";
        int consumerNumber = 3;


        Thread producerThread = new Thread(new ProducerThread(brokers,topic));
        producerThread.start();

        ConsumerThread consumerThread = new ConsumerThread(brokers,groupId,topic);
        consumerThread.start(3);


    }
}
View Code

四. 總結

  本篇文章列舉了兩種不同的消費者模式。兩者各有利弊。所有程式碼都上傳到了https://github.com/qizhelongdeyang/kafka_multithread_consumer_model.git ,如有疑問或者錯誤請指正

相關推薦

Apache Kafka系列() 執行Consumer方案

本文的圖片是通過PPT截圖出的,讀者如果修改意見請聯絡我 一、Consumer為何需要實現多執行緒   假設我們正在開發一個訊息通知模組,該模組允許使用者訂閱其他使用者傳送的通知/訊息。該訊息通知模組採用Apache Kafka,那麼整個架構應該是訊息的釋出者通過Producer呼叫API寫入訊息到Kafk

【紮實基本功】Java基礎教程系列執行

1. 多執行緒的概念 1.1 程序、執行緒、多程序的概念 程序:正在進行中的程式(直譯)。 執行緒是程式執行的一條路徑, 一個程序中可以包含多條執行緒。 一個應用程式可以理解成就是一個程序。 多執行緒併發執行可以提高程式的效率, 可以同時完成多項工作。 1.

Python併發程式設計系列執行

1引言 2 建立執行緒   2.1 函式的方式建立執行緒   2.2 類的方式建立執行緒 3 Thread類的常用屬性和方法   3.1 守護執行緒:Deamon   3.2 join()方法 4 執行緒間的同步機制   4.1 互斥鎖:Lock   4.2 遞迴鎖:RLock   4.3

Java入門系列-21-執行

什麼是執行緒 在作業系統中,一個應用程式的執行例項就是程序,程序有獨立的記憶體空間和系統資源,在工作管理員中可以看到程序。 執行緒是CPU排程和分派的基本單位,也是程序中執行運算的最小單位,可完成一個獨立的順序控制流程,當然一個程序中可以有多個執行緒。 多執行緒:一個程序中同時運行了多個執行緒,每個執行緒用來

C#基礎系列執行的常見用法詳解

前言:此篇就主要從博主使用過的幾種多執行緒的用法從應用層面大概介紹下。文中觀點都是博主個人的理解,如果有不對的地方望大家指正~~ 1、多執行緒:使用多個處理控制代碼同時對多個任務進行控制處理的一種技術。據博主的理解,多執行緒就是該應用的主執行緒任命其他多個執行緒去協

JAVA筆試面試題系列之----①執行

1.      程序和執行緒: 程序:正在進行的程式。每一個程序執行都有一個執行順序,該順序是一個執行路徑,或者叫一個控制單元。 執行緒:程序內部的一條執行路徑或者一個控制單元。 兩者的區別: 一個程序至少有一個執行緒 程序在執行過程中擁有獨立的記憶體單元,而多個執行緒共享

C#執行學習() 執行的自動管理(執行池)

在多執行緒的程式中,經常會出現兩種情況: 一種情況: 應用程式中,執行緒把大部分的時間花費在等待狀態,等待某個事件發生,然後才能給予響應 這一般使用ThreadPool(執行緒池)來解決; 另一種情況:執行緒平時都處於休眠狀態,只是週期性地被喚醒 這一般使用Tim

Thread執行系列執行下載

瞭解了這麼多與執行緒相關的知識,那麼我們也要實戰一下了(在學習本篇知識之前,如果對java中的網路基礎連結不太熟悉的,建議先去學一下java網路程式設計,再來看本文章。)因為本篇是多執行緒下載的demo,所以就直接附上程式碼,裡面都寫好了註釋,不影響對本篇的學習。packag

java執行系列01——執行基礎

import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class NewThreadDemo {

[C#] C#執行學習() 執行的自動管理(執行池)

Thread Pool Sample: Queuing  10  items to Thread Pool Queue to Thread Pool  0 Queue to Thread Pool  1 Queue to Thread Pool  2 Queue to Thread Pool  3 Q

Flex和Bison的C++可重進入—執行解決方案

這樣user可以在解析程式中使用driver or parm這兩個變數.這兩個變數都是從外面通過引數的形式傳給解析程式,那此時bison如何跟flex通訊呢?這時候就不會透過全域性變數yylval等進行通訊了,加入%pure_parser新生成的程式碼中已經沒有yylval這個全域性變量了,在解析器的內部實際

java網路程式設計(一):java傳統的阻塞IO以及執行解決方案

最近在看一些IO模型相關的東西,被同步IO、非同步IO、阻塞IO、非阻塞IO概念弄的有點暈,後面再慢慢學習和領悟。我們以socket IO程式設計為例子,我用的是JDK1.7.0_80,測試工具用的是

C# 執行學習系列之取消、超時子執行操作

1、簡介 雖然ThreadPool、Thread能開啟子執行緒將一些任務交給子執行緒去承擔,但是很多時候,因為某種原因,比如子執行緒發生異常、或者子執行緒的業務邏輯不符合我們的預期,那麼這個時候我們必須關閉它,而不是讓它繼續執行,消耗資源.讓CPU不在把時間和資源花在沒有意義的程式碼上.  

Java執行系列--“JUC執行池”05之 執行池原理()

概要 本章介紹執行緒池的拒絕策略。內容包括: 拒絕策略介紹 拒絕策略對比和示例 拒絕策略介紹 執行緒池的拒絕策略,是指當任務新增到執行緒池中被拒絕,而採取的處理措施。 當任務新增到執行緒池中之所以被拒絕,可能是由於:第一,執行緒池異常關閉。第二,任務數量

java執行系列之模式|第三篇: Producer-Consumer pattern

生產者-消費者模式 含義:顧名思義,生產者用來生產資料,可能有一到多個,消費者用來消費資料,也可能有多個,中間會有一個“橋樑參與者”,作為資料的存放以及執行緒之間的同步和協調。 範例程式行為: 廚師(MakerThread)做蛋糕,做好後放在桌子(Table)上 桌子

C++執行系列(C++11)-uniqu_lock(

Data 2018/11/12 Add By  WJB 在多執行緒中,有時候會出現一個方法中又一斷或者多段程式碼需要加鎖,但是並非整個方法程式碼加鎖,那麼我們就需要一個靈活的鎖-unique_lock;說明:unique_lock會降低程式碼執行效率,不推薦使用。 我們接

Java總結篇系列:Java執行

多個執行緒同步執行ping ip示例package com.ebao.pojo;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import j

Java執行乾貨系列—()volatile關鍵字

今天介紹下volatile關鍵字,volatile這個關鍵字可能很多朋友都聽說過,或許也都用過。在Java 5之前,它是一個備受爭議的關鍵字,因為在程式中使用它往往會導致出人意料的結果。在Java 5之後,volatile關鍵字才得以重獲生機。 正文 volatile關鍵字雖然從字面上理解起來比較簡單,但是

執行學習-----執行同步()

兩個執行緒要執行的程式碼片段要實現同步互斥的效果,它們必須用同一個Lock物件,鎖是上在代表要操作的資源類的內部方法中的,而不是線上程程式碼中, 問題:子執行緒迴圈10次,回到主執行緒執行100次,接著又回到子執行緒執行10次,再回到主執行緒執行100次,如此迴圈50次,該如何實現? p

Java 執行)—— 單例模式

這篇部落格介紹執行緒安全的應用——單例模式。 單例模式   單例模式,是一種常用的軟體設計模式。在它的核心結構中只包含一個被稱為單例的特殊類。通過單例模式可以保證系統中,應用該模式的類一個類只有一個例項。即一個類只有一個物件例項。 例項: /** * @author