kafka客戶端類
阿新 • • 發佈:2018-11-01
我的kafka客戶端測試類
一個專案中實際用到的跨平臺的kafka客戶端類,vs2013和sentos7實際測試可用
客戶端
封裝幾個簡單的函式,給自己留備份,僅供參考。
是針對kafka叢集的客戶端,而且根據id,做雜湊計算後,寫入對應的kafka分割槽,本例子中kafka分割槽預設為5,複製貼上的同學自己修改程式碼。
標頭檔案"kafka_control.h"
#ifndef KAFKA_CONTROL_H
#define KAFKA_CONTROL_H
#ifdef _WIN32
#include ".\librdkafka\include\rdkafka.h"
#else
#include "/home/xby/librdkafka-master/src/rdkafka.h"
#endif
#include <string.h>
class kafka_control
{
private:
rd_kafka_t *rk; /*Producer instance handle*/
rd_kafka_topic_t *rkt; /*topic物件*/
rd_kafka_conf_t * conf; /*臨時配置物件*/
//向kafka傳送資料,引數為:① Hadoopid;②要傳送的資料
int SendData(const char* pcWtgHadoopId, const char *pcData, int nPartition);
size_t my_Hash(const char* pcPara);
static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque);
public:
//初始化kafka,pcBrobers是kafka的ip叢集
int InitBrokers(const char *topic, const char *pcBrokers);
//傳送風機scada資料
int SendWtgScadaData(const char* pcWtgHadoopId, const char *pcData);
//傳送原始資料(或叫波形資料),引數為:① Hadoopid;②要傳送的資料
int SendWaveFormData(const char* pcWtgHadoopId, const char *pcData);
//傳送預警報警資料
int SendAlarmData( const char *pcData);
//傳送指標資料(或叫特徵資料)
int SendCharacterData(const char *pcData);
};
#endif
原始檔"kafka_control.cpp"
// An highlighted block
#ifdef _WIN64
#pragma comment(lib, ".\\librdkafka\\dll\\x64\\librdkafka.lib")
#else
#pragma comment(lib, ".\\librdkafka\\dll\\x86\\librdkafka.lib")
#endif
#pragma comment(lib,"wsock32.lib")
#include "kafka_control.h"
void kafka_control::dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
{
if (rkmessage->err)
fprintf(stderr, "%% Message delivery failed: %s\n",
rd_kafka_err2str(rkmessage->err));
else
{
/*fprintf(stderr,
"%% Message delivered (%zd bytes, "
"partition %"PRId32")\n",
rkmessage->len, rkmessage->partition);*/
/* rkmessage被librdkafka自動銷燬*/
//printf("send kafka successed ,partition is %d,data len is %d. \r\n", rkmessage->partition, rkmessage->len);
}
}
int kafka_control::InitBrokers(const char *topic, const char *pcBrokers)
{
if (topic == NULL || pcBrokers == NULL){
printf("config info have an error :topic name is '%s' ,brokers is '%s'", topic, pcBrokers);
return -1;
}
#ifdef _WIN32
WORD wVerisonRequested;
WSADATA wsaData;
int err;
wVerisonRequested = MAKEWORD(1, 1);
err = WSAStartup(wVerisonRequested, &wsaData);
if (err != 0){
return -1;
}
#endif
//rd_kafka_t *rk; /*Producer instance handle*/
//rd_kafka_topic_t *rkt; /*topic物件*/
//rd_kafka_conf_t *conf; /*臨時配置物件*/
char errstr[512];
char brokers[512];
//const char *topic;
//CMS資料接入測試環境 TOPIC
//原始資料: --partitions 5 --topic cmsdata_test
// 指標資料: --partitions 1 --topic cmsmetrics_test
// 預警資料: --partitions 1 --topic cmsalarm_test
// CMS資料接入生產環境 TOPIC
// 原始資料: --partitions 5 --topic cmsdata
// 指標資料: --partitions 1 --topic cmsmetrics
// 預警資料: --partitions 1 --topic cmsalarm
//192.168.190.152:9092,192.168.190.152 : 9092,192.168.190.152 : 9092
"192.168.190.152:9092,192.168.190.153:9092,192.168.190.154:9092";
//brokers = "192.168.190.152:9092,192.168.190.153:9092,192.168.190.154:9092";
strcpy(brokers,pcBrokers);
//brokers = "hadoop2:9092,hadoop3:9092,hadoop4:9092";
// topic = "scada_test";
//const char *topic = "cmsalarm_test";//cmsdata_test //cmsmetrics_test
/* 建立一個kafka配置佔位 */
conf = rd_kafka_conf_new();
/*建立broker叢集*/
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK){
fprintf(stderr, "%s\n", errstr);
return 1;
}
/*設定傳送報告回撥函式,rd_kafka_produce()接收的每條訊息都會呼叫一次該回調函式
*應用程式需要定期呼叫rd_kafka_poll()來服務排隊的傳送報告回撥函式*/
rd_kafka_conf_set_dr_msg_cb(conf, this->dr_msg_cb);
/*建立producer例項
rd_kafka_new()獲取conf物件的所有權,應用程式在此呼叫之後不得再次引用它*/
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk){
fprintf(stderr, " Failed to create new producer:%s\r\n", errstr);
return 1;
}
/*例項化一個或多個topics(`rd_kafka_topic_t`)來提供生產或消費,topic
物件儲存topic特定的配置,並在內部填充所有可用分割槽和leader brokers,*/
rkt = rd_kafka_topic_new(rk, topic, NULL);
if (!rkt)
{
fprintf(stderr, "%% Failed to create topic object: %s\n",
rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_destroy(rk);
return 1;
}
return 0;
}
int kafka_control::SendAlarmData(const char *pcData)
{
this->SendData(NULL, pcData, 0);
return 0;
}
int kafka_control::SendCharacterData(const char *pcData)
{
this->SendData(NULL, pcData, 0);
return 0;
}
int kafka_control::SendWaveFormData(const char* pcWtgHadoopId, const char *pcData)
{
this->SendData(pcWtgHadoopId, pcData, 5);
return 0;
}
int kafka_control::SendWtgScadaData(const char* pcWtgHadoopId, const char *pcData)
{
this->SendData(pcWtgHadoopId, pcData, 5);
return 0;
}
int kafka_control::SendData(const char* pcWtgHadoopId, const char *pcData, int nPartition)
{
const char* pcTopic = rd_kafka_topic_name(this->rkt);
int nPartition1 = 0;
if (pcWtgHadoopId == NULL){
nPartition1 = 0;
}
else{
if (nPartition <= 0){
printf("kafka nPartition have error\r\n");
}
size_t nHashValue = this->my_Hash(pcWtgHadoopId);
nPartition1 = nHashValue % nPartition;
}
int len = (int)strlen(pcData);
if (-1 == rd_kafka_produce(
/* Topic object */
rkt,
/*使用內建的分割槽來選擇分割槽*/
nPartition1,
//1,
/*生成payload的副本*/
RD_KAFKA_MSG_F_COPY,
/*訊息體和長度*/
(void*)pcData, len,
/*可選鍵及其長度*/
NULL, 0,
NULL))
{
fprintf(stderr,
"%% Failed to produce to topic %s: %s\n",
rd_kafka_topic_name(rkt),
rd_kafka_err2str(rd_kafka_last_error()));
if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){
/*如果內部佇列滿,等待訊息傳輸完成並retry,
內部隊列表示要傳送的訊息和已傳送或失敗的訊息,
內部佇列受限於queue.buffering.max.messages配置項*/
rd_kafka_poll(rk, 1000);
}
}
else{
//fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n",
//len, rd_kafka_topic_name(rkt));
}
/*producer應用程式應不斷地通過以頻繁的間隔呼叫rd_kafka_poll()來為
傳送報告佇列提供服務。在沒有生成訊息以確定先前生成的訊息已傳送了其
傳送報告回撥函式(和其他註冊過的回撥函式)期間,要確保rd_kafka_poll()
仍然被呼叫*/
rd_kafka_poll(rk, 0);
return 0;
}
size_t kafka_control::my_Hash(const char* pcPara)
{
/*std::hash<const char*> h1;
size_t nR = h1(pcPara);
return nR;*/
int i, l;
unsigned long ret = 0;
unsigned short *s;
if (pcPara == NULL) return(0);
l = (strlen(pcPara) + 1) / 2;
s = (unsigned short *)pcPara;
for (i = 0; i<l; i++)
ret ^= (s[i] << (i & 0x0f));
return(ret);
}
例子:
// An highlighted block
//kafka
kafka_control* pkafka_class = new kafka_control();
printf("");
#ifdef CONVERTER
pkafka_class->InitBrokers("cvtdata","192.168.190.152:9092,192.168.190.153:9092,192.168.190.154:9092");
pkafka_class->SendWtgScadaData("HadoopId", "data");
部分內容參考了下面文章:
他的更簡練,我的符合自己的需求
https://blog.csdn.net/lijinqi1987/article/details/76582067