1. 程式人生 > >Kafka 0.8 Producer (0.9以前版本適用)

Kafka 0.8 Producer (0.9以前版本適用)

Kafka舊版本producer由scala編寫,0.9以後已經廢除

示例程式碼如下:

import kafka.producer.KeyedMessage;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import java.util.Properties;

public class ProducerDemo {

	public static void main(String[] args) {

		Properties properties = new Properties();
		properties.put("metadata.broker.list", "kafka01:9092,kafka02:9092");
		properties.put("serializer.class", "kafka.serializer.StringEncoder");
		properties.put("request.requird.acks", "1");
		ProducerConfig config = new ProducerConfig(properties);
		Producer<String, String> producer = new Producer<String, String>(config);
		KeyedMessage<String,String> msg = new KeyedMessage<String,String>("topic","key","hello");
		producer.send(msg);
	}
	
}

自定義partition示例程式碼如下:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
    public SimplePartitioner (VerifiableProperties props) {

    }

    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
        }
        return partition;
    }

}

更多實時計算,Kafka等相關技術博文,歡迎關注實時流式計算

相關推薦

Kafka 0.8 Producer (0.9以前版本適用)

Kafka舊版本producer由scala編寫,0.9以後已經廢除 示例程式碼如下: import kafka.producer

Kafka 2.3 Producer (0.9以後版本適用)

kafka0.9版本以後用java重新編寫了producer,廢除了原來scala編寫的版本。 這裡直接使用最新2.3版本,0.9

Spark Streaming 和kafka 整合指導(kafka 0.8.2.1 或以上版本

本節介紹一下如何配置Spark Streaming 來接收kafka的資料。有兩個方法: 1、老的方法 -使用Receivers 和kafka的高階API 2、新的方法( Spark 1.3 開始引入)-不適用Receivers。這兩個方式擁有不同的程式設計模型,效能特徵

Dynamics 365 on-premises9.0版本開放下載,附上8.2升級9.0過程

     官網終於在11月2號放出了9.0 on-premises的下載連結,有人關心說這次會不會像8.2那樣是一個升級補丁,顯然不是,是有獨立安裝包的,從下載頁面看到需要的作業系統是windows server2016,但沒提到sql的版本,一開始去查了官網資料,

storm整合kafka新版API(0.8版本之後)

本例storm版本為1.1.0  kafka版本為2.11.0.10.0.1 匯入maven依賴 <!--引入storm --> <dependency> <groupId>org.apache.storm&l

Dynamics 365 on-premises9.0版本開放下載,附上8.2升級9.0過程

     官網終於在11月2號放出了9.0 on-premises的下載連結,有人關心說這次會不會像8.2那樣是一個升級補丁,顯然不是,是有獨立安裝包的,從下載頁面看到需要的作業系統是windows server2016,但沒提到sql的版本,一開始去查了官網資料,但應該是還

MySQL資料庫驅動mysql-connector-java的8.0.9-rc版本連線MySQL資料庫

  之前我的MySQL資料庫驅動mysql-connector-java版本號為5.1.34,在升級成8.0.9-rc版本後,發現原來的連線方式報錯了。故在這裡記錄一下新版本的MySQL資料庫驅動的連線使用方式。   先貼出來以前舊版本(5.1.34)的連線方

碩思閃客精靈6.0破解版(含7.0破解補丁)6.0/7.0/8.0/9.0版本

碩思閃客精靈6.0破解版(含7.0破解補丁)6.0/7.0/8.0/9.0版本,他可以幫助你把網上下載的FLASH進行反編。 碩思閃客精靈破解版下載 碩思閃客精靈註冊碼版下載 然後通過flash專業進行編輯,改成自己想要的畫面或者文字以及連結等。他能夠輕鬆反編譯一個或是多個SWF格式檔案為FLA/FLE

Kafka 0.9版本consumer客戶端使用介紹

kafka最初時開發時, 所帶的producer和consumer client都是Scala所寫. 我們逐漸發現這些API具有一些限制. high-level的api支援consumer groups和故障轉移, 但是不支援許多複雜的使用場景, 同時還有一

kafka文件(3)----0.8.2-kafka API(java版本

    Apache Kafka包含新的java客戶端,這些新的的客戶端將取代現存的Scala客戶端,但是為了相容性,它們仍將存在一段時間。可以通過一些單獨的jar包呼叫這些客戶端,這些包的依賴性都比較小,同時老的Scala客戶端仍會存在。 一、Producer

kafka0.8版本)刪除主題(沒有在配置文件中配置的情況下)

per ble 配置文件 top 標記 屬性 con conf zkcli 在沒有配置kafka 刪除屬性的情況下 使用刪除主題命令 ./bin/kafka-topics.sh --delete --zookeeper 192.168.28.131:2181,192.

Ubuntu 18.04上CUDA 9.0、cuDNN7.0及Tensorflow 1.8的安裝

http amd64 時間 com ++ dnn 7 清華 配置 示例 配置 筆者使用Dell Inspiron 7559筆記本電腦,顯卡為NVIDIA GTX 960M。 目標 由於本機顯卡僅有nvidia-384驅動包能夠良好支持(nvidia-387、nvidia-3

Windows 7 JDK 1.8.171 + Tomcat 9.0.8 環境配置

windows tomcat環境 windows tomcat 安裝服務 Windows環境 C:\>systeminfo|find "OS" OS 名稱: Microsoft Windows 7 專業版 OS 版本: 6.1.7601 Service Pac

Windows基礎環境_安裝配置教程(Windows7 64、JDK1.8、Android SDK23.0、TortoiseSVN 1.9.5)

tools 直接 x86_64 ase php JD network not using Windows基礎環境_安裝配置教程(Windows7 64、JDK1.8、Android SDK23.0、TortoiseSVN 1.9.5) 安裝包版本 1) JDK版

12-帶內管理、帶外管理 //0.8.6(GNS3版本

console 清除 關閉 nag gin 運行 tex password size 帶內管理、帶外管理區別:流量在一根線上走就是帶內管理,有Console線就是帶外管理![]一、實驗拓撲:二、實驗要求:1、R1、R2、R3運行EIGRP 90協議,並配置各自的Loopba

個人筆記本安裝多個jdk(jdk1.7,jdk1.8,jdk1.9,jdk10.0)出現的問題

oracle java 個人筆記 選擇 bubuko 1.9 style 輸入 data 1、個人筆記本已經安裝jdk1.7,jdk1.8,(之前沒有在意這個問題)。最近想學習jdk10.0,安裝以後,環境變量變成了jdk10.0,就是cmd輸入命令java -versio

ijkplayer修改動態庫名稱(目前最新版本0.8.8

       瞭解ijkplayer的朋友都知道,底層有三個動態庫,分別是libijkffmpeg.so、libijksdl.so、libijkplayer.so。一般出於避免重名衝突和便於區分的考慮都會修改它們的名稱。接下來我就依次介紹如何修改三個動態庫

Android5.0 6.0 7.0 8.0 9.0 新特性總結

**5.0** Material Design 支援多種裝置 全新通知中心 支援 64 位 ART 虛擬機器 電池續航改進 全新“最近應用程式” 安全性改進 不同資料獨立儲存 改進搜尋 支援藍芽 4.1、USB Audio

Kafka0.10 Producer 新增timestamp 以及使用配置

轉自:https://segmentfault.com/a/1190000008674900 本文目錄結構: Producer API入門 非同步傳送流程 Producer設計說明 Producer Configuration 1. Produ

1.8 字典 1.9 字典練習 2.0/2.1 流程控制-if條件判斷

字典 字典是python中的唯一的對映型別(雜湊表) 字典物件是可變的,但是字典的鍵必須使用不可變物件,一個字典中可以使用不同型別的鍵值。 字典是無序的 字典的方法: - keys() //以list的形式返回所有key - values() - items() - get() //返回