
Kafka client class

My kafka client test class


A cross-platform kafka client class actually used in a project, tested and working on both VS2013 and CentOS 7.

It wraps a few simple functions; I keep it here as a backup for myself, for reference only.
It is a client for a kafka cluster: the id is hashed and the message is written to the corresponding kafka partition. In this example the topic defaults to 5 partitions, so adjust the code yourself if you copy and paste it.
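
As a minimal, self-contained illustration of that idea (this is not the hash actually used by the class; the real one is my_Hash in kafka_control.cpp below, and pick_partition here is a made-up name):

#include <stddef.h>

/* Illustration only: map an id string to one of `partitions` partitions.
 * Any stable string hash works; the class below uses its own my_Hash(). */
static int pick_partition(const char *id, int partitions)
{
	size_t hash = 0;
	for (const char *p = id; *p != '\0'; ++p)
		hash = hash * 131 + (unsigned char)*p;
	return (int)(hash % (size_t)partitions);
}

/* pick_partition("HadoopId0001", 5) always yields the same value in [0, 4],
 * so data for the same id always goes to the same kafka partition. */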

Header file "kafka_control.h"


#ifndef KAFKA_CONTROL_H
#define KAFKA_CONTROL_H

#ifdef _WIN32
#include ".\librdkafka\include\rdkafka.h"
#else
#include "/home/xby/librdkafka-master/src/rdkafka.h"
#endif

#include <string.h>

class kafka_control
{
private:
	rd_kafka_t *rk;         /* Producer instance handle */
	rd_kafka_topic_t *rkt;  /* Topic object */
	rd_kafka_conf_t *conf;  /* Temporary configuration object */

	// Send data to kafka. Parameters: 1) Hadoop id; 2) data to send; 3) partition count of the topic
	int SendData(const char* pcWtgHadoopId, const char *pcData, int nPartition);
	size_t my_Hash(const char* pcPara);
	static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque);

public:
	// Initialize kafka; pcBrokers is the broker list of the kafka cluster
	int InitBrokers(const char *topic, const char *pcBrokers);
	// Send wind turbine SCADA data
	int SendWtgScadaData(const char* pcWtgHadoopId, const char *pcData);
	// Send raw (waveform) data. Parameters: 1) Hadoop id; 2) data to send
	int SendWaveFormData(const char* pcWtgHadoopId, const char *pcData);
	// Send early-warning / alarm data
	int SendAlarmData(const char *pcData);
	// Send metric (feature) data
	int SendCharacterData(const char *pcData);
};

#endif

Source file "kafka_control.cpp"

#ifdef _WIN32
#ifdef _WIN64
#pragma comment(lib, ".\\librdkafka\\dll\\x64\\librdkafka.lib")
#else
#pragma comment(lib, ".\\librdkafka\\dll\\x86\\librdkafka.lib")
#endif

#pragma comment(lib, "wsock32.lib")
#endif


#include "kafka_control.h"



void kafka_control::dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
{
	if (rkmessage->err)
		fprintf(stderr, "%% Message delivery failed: %s\n",
		rd_kafka_err2str(rkmessage->err));
	else
	{
/*fprintf(stderr,
		"%% Message delivered (%zd bytes, "
		"partition %"PRId32")\n",
		rkmessage->len, rkmessage->partition);*/
		
	/* rkmessage is destroyed automatically by librdkafka */

	//printf("send to kafka succeeded, partition is %d, data len is %d.\r\n", rkmessage->partition, (int)rkmessage->len);
	}
		
}


int kafka_control::InitBrokers(const char *topic, const char *pcBrokers)
{
	if (topic == NULL || pcBrokers == NULL){
		printf("InitBrokers: invalid configuration, topic or brokers is NULL\r\n");
		return -1;
	}
#ifdef _WIN32

	WORD wVersionRequested;
	WSADATA wsaData;
	int err;
	wVersionRequested = MAKEWORD(1, 1);
	err = WSAStartup(wVersionRequested, &wsaData);
	if (err != 0){
		return -1;
	}
#endif
	char errstr[512];
	char brokers[512];

	// CMS data ingestion, test environment topics:
	//   raw data:    --partitions 5 --topic cmsdata_test
	//   metric data: --partitions 1 --topic cmsmetrics_test
	//   alarm data:  --partitions 1 --topic cmsalarm_test
	//
	// CMS data ingestion, production environment topics:
	//   raw data:    --partitions 5 --topic cmsdata
	//   metric data: --partitions 1 --topic cmsmetrics
	//   alarm data:  --partitions 1 --topic cmsalarm

	// e.g. pcBrokers = "192.168.190.152:9092,192.168.190.153:9092,192.168.190.154:9092"
	//      or         "hadoop2:9092,hadoop3:9092,hadoop4:9092"
	strncpy(brokers, pcBrokers, sizeof(brokers) - 1);
	brokers[sizeof(brokers) - 1] = '\0';

	/* Create a temporary kafka configuration object */
	conf = rd_kafka_conf_new();

	/* Configure the broker list of the cluster */
	if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
		sizeof(errstr)) != RD_KAFKA_CONF_OK){
		fprintf(stderr, "%s\n", errstr);
		return 1;
	}

	/* Set the delivery report callback. It is invoked once for every message
	 * accepted by rd_kafka_produce(); the application must call rd_kafka_poll()
	 * at regular intervals to serve the queued delivery reports. */
	rd_kafka_conf_set_dr_msg_cb(conf, this->dr_msg_cb);

	/* Create the producer instance.
	 * rd_kafka_new() takes ownership of conf; the application must not
	 * reference it again after this call. */
	rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
	if (!rk){
		fprintf(stderr, "Failed to create new producer: %s\r\n", errstr);
		return 1;
	}

	/* Instantiate a topic object (`rd_kafka_topic_t`) for producing.
	 * The topic object holds topic-specific configuration and is internally
	 * populated with all available partitions and leader brokers. */
	rkt = rd_kafka_topic_new(rk, topic, NULL);
	if (!rkt)
	{
		fprintf(stderr, "%% Failed to create topic object: %s\n",
			rd_kafka_err2str(rd_kafka_last_error()));
		rd_kafka_destroy(rk);
		return 1;
	}

	return 0;
}

int kafka_control::SendAlarmData(const char *pcData)
{
	this->SendData(NULL, pcData, 0);
	return 0;
}
int kafka_control::SendCharacterData(const char *pcData)
{
	this->SendData(NULL, pcData, 0);
	return 0;
}
int kafka_control::SendWaveFormData(const char* pcWtgHadoopId, const char *pcData)
{
	this->SendData(pcWtgHadoopId, pcData, 5);
	return 0;
}
int kafka_control::SendWtgScadaData(const char* pcWtgHadoopId, const char *pcData)
{
	this->SendData(pcWtgHadoopId, pcData, 5);
	return 0;
}
int kafka_control::SendData(const char* pcWtgHadoopId, const char *pcData, int nPartition)
{
	int nPartition1 = 0;
	if (pcWtgHadoopId == NULL){
		nPartition1 = 0;
	}
	else{
		if (nPartition <= 0){
			printf("kafka nPartition error, falling back to partition 0\r\n");
			nPartition1 = 0;
		}
		else{
			size_t nHashValue = this->my_Hash(pcWtgHadoopId);
			nPartition1 = (int)(nHashValue % (size_t)nPartition);
		}
	}

	int len = (int)strlen(pcData);
	if (-1 == rd_kafka_produce(
		/* Topic object */
		rkt,
		/* Target partition (selected above via the id hash) */
		nPartition1,
		/* Make a copy of the payload */
		RD_KAFKA_MSG_F_COPY,
		/* Message payload and length */
		(void*)pcData, len,
		/* Optional key and its length */
		NULL, 0,
		/* Message opaque, passed to the delivery report callback */
		NULL))
	{
		fprintf(stderr,
			"%% Failed to produce to topic %s: %s\n",
			rd_kafka_topic_name(rkt),
			rd_kafka_err2str(rd_kafka_last_error()));

		if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){
			/* The internal queue is full: wait for deliveries to complete and retry.
			 * The internal queue holds both messages waiting to be sent and messages
			 * that have been sent or have failed; its size is bounded by the
			 * queue.buffering.max.messages configuration property. */
			rd_kafka_poll(rk, 1000);
		}
	}
	else{
		//fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n",
		//len, rd_kafka_topic_name(rkt));

	}

	/* The producer application should keep calling rd_kafka_poll() at frequent
	 * intervals to serve the delivery report queue. Make sure rd_kafka_poll()
	 * is still called even while no new messages are being produced, so that
	 * previously produced messages get their delivery report callbacks
	 * (and any other registered callbacks) invoked. */
	rd_kafka_poll(rk, 0);

	return 0;
}

size_t kafka_control::my_Hash(const char* pcPara)
{
	/* Alternative: std::hash<std::string>()(std::string(pcPara)) would also work;
	 * std::hash<const char*> as in the original comment would hash the pointer,
	 * not the string contents. */
	int i, l;
	unsigned long ret = 0;
	unsigned short *s;
	if (pcPara == NULL) return(0);
	/* XOR the string into the result 16 bits at a time, shifted by position */
	l = (strlen(pcPara) + 1) / 2;
	s = (unsigned short *)pcPara;
	for (i = 0; i<l; i++)
		ret ^= (s[i] << (i & 0x0f));
	return(ret);
}
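
The class as posted never releases the producer handle or the topic object. A possible shutdown helper is sketched below; it is not part of the original class (the name Close is my own, and it would also need a declaration in kafka_control.h). It flushes outstanding messages, then frees both librdkafka handles; rd_kafka_flush() is available in reasonably recent librdkafka versions.

// Hypothetical addition, not in the original class: flush pending messages,
// then release the topic object and the producer instance.
void kafka_control::Close()
{
	if (rk != NULL)
		rd_kafka_flush(rk, 10 * 1000);    /* wait up to 10s for outstanding delivery reports */
	if (rkt != NULL)
		rd_kafka_topic_destroy(rkt);
	if (rk != NULL)
		rd_kafka_destroy(rk);
	rkt = NULL;
	rk = NULL;
}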

Example:

	//kafka
	kafka_control* pkafka_class = new kafka_control();
#ifdef CONVERTER
	pkafka_class->InitBrokers("cvtdata", "192.168.190.152:9092,192.168.190.153:9092,192.168.190.154:9092");
	pkafka_class->SendWtgScadaData("HadoopId", "data");
#endif
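
If a shutdown helper like the Close() sketch above is added to the class, it should be called (followed by deleting pkafka_class) once producing is finished, so that queued messages are flushed before the process exits.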

Parts of this were based on the article below; his version is more concise, while mine matches my own requirements:
https://blog.csdn.net/lijinqi1987/article/details/76582067