
Kafka Tutorial, Part 2: Connecting to Kafka from Java (Producer)

1. Check the server configuration file

Set the parameter advertised.listeners=PLAINTEXT://tjtestrac1:9092.
Make sure the firewall is not blocking the port.
[user@tjtestrac1 config]$ vi server.properties
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
#listeners=PLAINTEXT://tjtestrac1:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://tjtestrac1:9092
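The distinction between the two settings matters when clients connect from other machines. A minimal fragment (the hostname is the one used throughout this tutorial):

```properties
# listeners: the interface/port the broker binds to locally
listeners=PLAINTEXT://0.0.0.0:9092

# advertised.listeners: the address the broker tells clients to connect back to;
# it must be resolvable and reachable from the client machine (mind DNS/hosts
# entries and the firewall), otherwise producers will time out after the
# initial metadata request.
advertised.listeners=PLAINTEXT://tjtestrac1:9092
```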

2. Restart the Kafka service

Here the services are killed in the crudest possible way. Never do this in production.
[user@tjtestrac1 config]$ jps
12048 Jps
30323 QuorumPeerMain
4739 Kafka
[user@tjtestrac1 config]$ kill -9 30323
[user@tjtestrac1 config]$ kill -9 4739
[user@tjtestrac1 config]$ jps
12727 Jps
After confirming everything has shut down cleanly, restart:
[user@tjtestrac1 config]$ zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties &
[1] 16757
[user@tjtestrac1 config]$ jps
17093 Jps
16757 QuorumPeerMain

[user@tjtestrac1 config]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties &
[user@tjtestrac1 config]$ jps
19858 Kafka
16757 QuorumPeerMain
20600 Jps

3. Kafka producer architecture and basic concepts

(Figure: Kafka producer component architecture)

The Producer provides APIs for very different delivery-guarantee scenarios:
1. A traditional credit card transaction system can tolerate neither lost messages nor erroneous duplicates.
2. Data collection for internet user-behavior tracking can tolerate some data loss and the occasional bad record.
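The two scenarios above map to different producer settings. A minimal sketch using plain java.util.Properties (the parameter names are standard Kafka producer configs, visible in the config dump later in this article; the grouping into "profiles" is just for illustration):

```java
import java.util.Properties;

public class ProducerProfiles {

    // Profile for the "no loss, no duplicates" case (credit card transactions):
    // wait for all in-sync replicas, retry on transient failures, and enable
    // idempotence so that retries cannot create duplicate records.
    static Properties reliableProfile() {
        Properties p = new Properties();
        p.setProperty("acks", "all");
        p.setProperty("retries", Integer.toString(Integer.MAX_VALUE));
        p.setProperty("enable.idempotence", "true");
        return p;
    }

    // Profile for the "loss is acceptable" case (user-behavior tracking):
    // do not wait for any acknowledgment; favor throughput over durability.
    static Properties bestEffortProfile() {
        Properties p = new Properties();
        p.setProperty("acks", "0");
        p.setProperty("retries", "0");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(reliableProfile().getProperty("acks"));   // all
        System.out.println(bestEffortProfile().getProperty("acks")); // 0
    }
}
```

Either set of properties would be passed to the KafkaProducer constructor alongside bootstrap.servers and the serializers, exactly as done in the MsgSender class later in this article.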

4. Building a Kafka project in Eclipse (with Maven)

a) Create a project named kafkaClient

(Figure: the kafkaClient project in Eclipse)

b) Configure the pom.xml file

https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.12/2.1.0

(Figure: pom.xml settings)

Configure the slf4j logging dependencies:

https://mvnrepository.com/artifact/org.slf4j/slf4j-api/1.8.0-beta2
https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12/1.8.0-beta2
https://mvnrepository.com/artifact/org.slf4j/slf4j-nop/1.8.0-beta2
https://mvnrepository.com/artifact/commons-logging/commons-logging
https://mvnrepository.com/artifact/org.slf4j/slf4j-simple/1.8.0-beta2
https://mvnrepository.com/artifact/org.slf4j/slf4j-jdk14/1.8.0-beta2

Add the following to the pom file:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.12 -->
  <modelVersion>4.0.0</modelVersion>
  <groupId>KafkaClient</groupId>
  <artifactId>KafkaClient</artifactId>
  <version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.8.0-beta2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.8.0-beta2</version>
    <scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
<dependency>
    <groupId>commons-logging</groupId>
    <artifactId>commons-logging</artifactId>
    <version>1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-nop -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.8.0-beta2</version>
    <scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.8.0-beta2</version>
    <scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-jdk14 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-jdk14</artifactId>
    <version>1.8.0-beta2</version>
    <scope>test</scope>
</dependency>
</dependencies>
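Note that slf4j uses only one binding at a time: declaring slf4j-log4j12, slf4j-nop, slf4j-simple, and slf4j-jdk14 together puts multiple bindings on the test classpath, which produces a "Class path contains multiple SLF4J bindings" warning, and slf4j then picks one arbitrarily. In practice, keep a single binding, for example:

```xml
<!-- Keep only ONE slf4j binding; log4j12 matches the log4j.xml config below -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.8.0-beta2</version>
</dependency>
```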
 

log4j.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">

<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
  <appender name="console" class="org.apache.log4j.ConsoleAppender"> 
    <param name="Target" value="System.out"/> 
    <layout class="org.apache.log4j.PatternLayout"> 
      <param name="ConversionPattern" value="%-5p %c{1} - %m%n"/> 
    </layout> 
  </appender> 

  <root> 
    <priority value ="debug" /> 
    <appender-ref ref="console" /> 
  </root>

</log4j:configuration>

c) Create a producer class MsgSender


The essential producer configuration parameters are:

parameter name      parameter description
bootstrap.servers   Connection string listing the Kafka broker(s) to connect to
key.serializer      Class used to serialize the record key; must implement the org.apache.kafka.common.serialization.Serializer interface
value.serializer    Class used to serialize the record value the producer sends to the broker
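Conceptually, the StringSerializer configured below just turns a string into UTF-8 bytes before the record goes on the wire. A stand-alone sketch of that idea (not the real Kafka class, which additionally supports a configurable encoding):

```java
import java.nio.charset.StandardCharsets;

public class MiniStringSerializer {

    // Mirrors what org.apache.kafka.common.serialization.StringSerializer does
    // in its default configuration: encode the value as UTF-8 bytes,
    // passing null through unchanged.
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        MiniStringSerializer s = new MiniStringSerializer();
        byte[] bytes = s.serialize("TestMsg", "abc");
        System.out.println(bytes.length); // 3
    }
}
```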
Java code:
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MsgSender {

	public static final String TOPIC = "TestMsg";

	public static void main(String[] args) throws Exception {
		Properties kafkaProps = new Properties();
		kafkaProps.setProperty("bootstrap.servers", "tjtestrac1:9092");
		kafkaProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		kafkaProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
		ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "123123123",
				"Welcome to my home!!! ");
		try {
			System.out.println("sending start..............");
			// send() is asynchronous; calling get() on the returned Future blocks
			// until the broker acknowledges the record (a synchronous send).
			Future<RecordMetadata> future = producer.send(record);
			future.get();
			System.out.println("sending end..............");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			producer.close();
		}
	}
}
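The record above carries a key ("123123123"), and the console output that follows shows partitioner.class = DefaultPartitioner: Kafka hashes the key to choose a partition, so records with the same key always land on the same partition. A simplified illustration of that idea (the real DefaultPartitioner hashes the serialized key with murmur2, not String.hashCode; this is only a sketch):

```java
public class KeyPartitionDemo {

    // Simplified key -> partition mapping. The mask keeps the hash
    // non-negative before taking the modulus, as Kafka's partitioner does.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("123123123", 3);
        int p2 = partitionFor("123123123", 3);
        // The same key always maps to the same partition.
        System.out.println(p1 == p2); // true
    }
}
```

This is why keyed messages preserve per-key ordering: ordering in Kafka is guaranteed only within a single partition.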

Console output:

INFO  ProducerConfig - ProducerConfig values: 
   acks = 1
   batch.size = 16384
   bootstrap.servers = [tjtestrac1:9092]
   buffer.memory = 33554432
   client.dns.lookup = default
   client.id = 
   compression.type = none
   connections.max.idle.ms = 540000
   delivery.timeout.ms = 120000
   enable.idempotence = false
   interceptor.classes = []
   key.serializer = class org.apache.kafka.common.serialization.StringSerializer
   linger.ms = 0
   max.block.ms = 60000
   max.in.flight.requests.per.connection = 5
   max.request.size = 1048576
   metadata.max.age.ms = 300000
   metric.reporters = []
   metrics.num.samples = 2
   metrics.recording.level = INFO
   metrics.sample.window.ms = 30000
   partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
   receive.buffer.bytes = 32768
   reconnect.backoff.max.ms = 1000
   reconnect.backoff.ms = 50
   request.timeout.ms = 30000
   retries = 2147483647
   retry.backoff.ms = 100
   sasl.client.callback.handler.class = null
   sasl.jaas.config = null
   sasl.kerberos.kinit.cmd = /usr/bin/kinit
   sasl.kerberos.min.time.before.relogin = 60000
   sasl.kerberos.service.name = null
   sasl.kerberos.ticket.renew.jitter = 0.05
   sasl.kerberos.ticket.renew.window.factor = 0.8
   sasl.login.callback.handler.class = null
   sasl.login.class = null
   sasl.login.refresh.buffer.seconds = 300
   sasl.login.refresh.min.period.seconds = 60
   sasl.login.refresh.window.factor = 0.8
   sasl.login.refresh.window.jitter = 0.05
   sasl.mechanism = GSSAPI
   security.protocol = PLAINTEXT
   send.buffer.bytes = 131072
   ssl.cipher.suites = null
   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
   ssl.endpoint.identification.algorithm = https
   ssl.key.password = null
   ssl.keymanager.algorithm = SunX509
   ssl.keystore.location = null
   ssl.keystore.password = null
   ssl.keystore.type = JKS
   ssl.protocol = TLS
   ssl.provider = null
   ssl.secure.random.implementation = null
   ssl.trustmanager.algorithm = PKIX
   ssl.truststore.location = null
   ssl.truststore.password = null
   ssl.truststore.type = JKS
   transaction.timeout.ms = 60000
   transactional.id = null
   value.serializer = class org.apache.kafka.common.serialization.StringSerializer

Log in to the server and verify that the messages were delivered:

[user@tjtestrac1 config]$ kafka-topics.sh --list --zookeeper localhost:2181
TestMsg
__consumer_offsets
test
[user@tjtestrac1 config]$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestMsg --from-beginning
Welcome to my home!!! 
Welcome to my home!!! 
Welcome to my home!!!