1. 程式人生 > >Kafka 2.3 Producer (0.9以後版本適用)

Kafka 2.3 Producer (0.9以後版本適用)

kafka0.9版本以後用java重新編寫了producer,廢除了原來scala編寫的版本。

這裡直接使用最新2.3版本,0.9以後的版本都適用。

注意引用的包為:org.apache.kafka.clients.producer

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {

	public static void main(String[] args) {

		Properties properties = new Properties();
		properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
		properties.put("acks", "all");
		properties.put("retries", 0);
		properties.put("batch.size", 16384);
		properties.put("linger.ms", 1);
		properties.put("buffer.memory", 33554432);
		properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
		kafkaProducer.send(new ProducerRecord<>("topic", "value"));
		kafkaProducer.close();

	}
	
}

0.11.0以後增加了事務,事務producer的示例程式碼如下,需要適用於0.11.0以後的版本:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TransactionsProducerDemo {

	public static void main(String[] args) {

		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");
		props.put("transactional.id", "my-transactional-id");
		Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

		producer.initTransactions();

		try {
			producer.beginTransaction();
			for (int i = 0; i < 100; i++)
				producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
			producer.commitTransaction();
		} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
			// We can't recover from these exceptions, so our only option is to close the producer and exit.
			producer.close();
		} catch (KafkaException e) {
			// For all other exceptions, just abort the transaction and try again.
			producer.abortTransaction();
		}
		producer.close();

	}
	
}

更多實時計算,Kafka等相關技術博文,歡迎關注實時流式計算

相關推薦

Kafka 2.3 Producer (0.9以後版本適用)

kafka0.9版本以後用java重新編寫了producer,廢除了原來scala編寫的版本。 這裡直接使用最新2.3版本,0.9

Kafka 0.8 Producer (0.9以前版本適用)

Kafka舊版本producer由scala編寫,0.9以後已經廢除 示例程式碼如下: import kafka.producer

凝思4.2 安裝thrift 0.9.3

bison version >= 2.5 bison 版本需大於等於2.5 刪除當前版本bison rm /usr/bin/bison rm /usr/bin/yacc rm /usr/lib/liby.a 安裝bison 3.2.1 https://ft

Kafka 0.9版本consumer客戶端使用介紹

kafka最初時開發時, 所帶的producer和consumer client都是Scala所寫. 我們逐漸發現這些API具有一些限制. high-level的api支援consumer groups和故障轉移, 但是不支援許多複雜的使用場景, 同時還有一

js中sum(2)(3)(4)返回9和sum(2,3)和sum(2)(3)都返回5並要求擴展性

lang itl ron var 就會 bsp tle 關於 網上 網上有很多關於sum(1)(2)(3),sum(1,2,3)之類的面試題要求輸出相同的結果6並要求可以滿足擴展,即有多個參數時也能符合題設的要求,所以自己寫了部分例子可以大概滿足這些面試題的要求 <!

ThinkPHP 3.2.3~5.0.10 快取函式設計缺陷後臺GetShell實戰

0×00 前言 ThinkPHP是為了簡化企業級應用開發和敏捷WEB應用開發而誕生的,由於其簡單易用,很多cms都基於該框架改寫。然而 Thinkphp在快取使用卻存在缺陷,生成快取時,Thinkphp會將資料序列化存進一個php檔案,這就產生了很大的安全問題。 0×01 環境搭建 工具

webstorm 2018.2.3(64位)版本啟用碼-----有效期至2019.7.31

AWAC5NN6E4-eyJsaWNlbnNlSWQiOiJBV0FDNU5ONkU0IiwibGljZW5zZWVOYW1lIjoibGIgb2QiLCJhc3NpZ25lZU5hbWUiOiIiLCJhc3NpZ25lZUVtYWlsIjoiIiwibGlj

PEACHPIE 0.9.11 版本釋出,可以上生產了

0.9.11是第一個非預覽版本,也就是說可以用於生產了,編譯本身快速且使用者友好(更好的錯誤訊息),有一個重大改進的文件(https://docs.peachpie.io/)和新的.NET Core 的 PeachPie Project ,可以和 Visual Studio(> = 2017更新6)

MySQL資料庫驅動mysql-connector-java的8.0.9-rc版本連線MySQL資料庫

  之前我的MySQL資料庫驅動mysql-connector-java版本號為5.1.34,在升級成8.0.9-rc版本後,發現原來的連線方式報錯了。故在這裡記錄一下新版本的MySQL資料庫驅動的連線使用方式。   先貼出來以前舊版本(5.1.34)的連線方

hbase1.2.3+zookeeper3.4.9+hadoop2.7.3完全分散式部署遇到的問題

啟動start-hbase.sh 後hbase沒有啟動 檢視日誌如下: ERROR [main] master.HMasterCommandLine: Master exiting java.io.IOException: Could not start ZK with 3

記一發Hive on tez的配置(Hive 3.1.1, Hadoop 3.0.3, Tez 0.9.1)

sta 麻煩 參考 手動 需要 version test log all 直接下載Tez的binary包部署安裝是有問題的,因為默認支持hadoop版本為2.7,2.7以上的就需要手動編譯了。 下載Tez源碼 CD到源碼文件夾,mvn install -Dhadoop.v

94、tensorflow實現語音識別0,1,2,3,4,5,6,7,8,9

結果 test amp building pre cti fun ner edi ‘‘‘ Created on 2017年7月23日 @author: weizhen ‘‘‘ #導入庫 from __future__ import division,print_func

scala spark-streaming整合kafka (spark 2.3 kafka 0.10)

obj required word 錯誤 prope apache rop sta move Maven組件如下: <dependency> <groupId>org.apache.spark</groupId> <

java8下spark-streaming結合kafka程式設計(spark 2.3 kafka 0.10)

前面有說道spark-streaming的簡單demo,也有說到kafka成功跑通的例子,這裡就結合二者,也是常用的使用之一。 1.相關元件版本 首先確認版本,因為跟之前的版本有些不一樣,所以才有必要記錄下,另外仍然沒有使用scala,使用java8,spark 2.0.0,kafk

QTrace 0.2.3 版本釋出

下載地址: http://www.pc6.com/softview/SoftView_614309.html   主要修改: 1.對話方塊顯示優化 2.本地GNU搜尋優化 3.遠端搜尋,本地搜尋採用列表顯示,結果可讀性更好; 4.遠端搜尋介面優化,當搜尋錯誤時提

Redis Desktop Manager 0.9.3 版本下載(官方最新版需要訂閱,好像要給錢才行)

下載地址:https://pan.baidu.com/s/1P856NPusJLUSFwQjjPdltA 密碼: 12d3   版本是兩三個月前,我從官網下載的,然後順便存到了我的行動硬碟上。0.9.3.817.exe github 上有 redis destop

python相關軟體安裝流程圖解——Windows下安裝Redis以及視覺化工具——Redis-x64-3.2.100——redis-desktop-manager-0.9.3.817

  https://www.2cto.com/database/201708/666191.html https://github.com/MicrosoftArchive/redis/releases          

ACMNO.21 C語言-逆序輸出 輸入10個數字,然後逆序輸出。 輸入 十個整數 輸出 逆序輸出,空格分開 樣例輸入 1 2 3 4 5 6 7 8 9 0

題目描述 輸入10個數字,然後逆序輸出。 輸入 十個整數 輸出 逆序輸出,空格分開 樣例輸入 1 2 3 4 5 6 7 8 9 0 樣例輸出 0 9 8 7 6 5 4 3 2 1 提示 陣列?堆疊? 來源/分類 C語言

Kafka 0.11新版本釋出:主要的功能變更介紹:支援 EOS, 事務和冪等producer

Apache Kafka近日推出0.11版本。這是一個里程碑式的大版本,特別是Kafka從這個版本開始支援“exactly-once”語義(下稱EOS, exactly-once semantics