1. 程式人生 > >該死的Kafka,遠程連接Kafka超時以及解決辦法

該死的Kafka,遠程連接Kafka超時以及解決辦法

truct nis 消息 ole struct () 重試 time out 控制臺

關於消息的發布與訂閱,之前一直使用的是activeMQ基於JMS的消息隊列進行操作的,最近聽說有一個更高效的消息的發布與訂閱技術,就是Kafka。

關於kafka的介紹,在這裏就不做過多講解了,因為我自己也不是很了解,大概就知道它與activeMQ一樣,都是具有生產者和消費者的發布與訂閱消息的機制。

具體請參見百度百科Apache Kafka。

今天我想說的就是,初遇kafka所踩的坑,非常大的坑!!

今天第一次學習Kafka,參考的是ORCHome網上的資料。

具體使用,我這裏不過多介紹,具體講我遇到的問題。因為是自學,我采用的是在centOS6.5的虛擬機上安裝的Kafka,由於新版的Kafka自帶有zookeeper,所以就直接使用了。

當我按照教程啟動玩Kafka後,並且在虛擬機服務器裏面是可以正常操作,可是使用JavaAPI遠程進行操作的時候,便一直報連接異常!

Java代碼:

package site.wangxin520.kafkatest;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerTest { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "http://192.168.211.129:9092"); //The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
//“所有”設置將導致記錄的完整提交阻塞,最慢的,但最持久的設置。 props.put("acks", "all"); //如果請求失敗,生產者也會自動重試,即使設置成0 the producer can automatically retry. props.put("retries", 0); //The producer maintains buffers of unsent records for each partition. props.put("batch.size", 16384); //默認立即發送,這裏這是延時毫秒數 props.put("linger.ms", 1); //生產者緩沖大小,當緩沖區耗盡後,額外的發送調用將被阻塞。時間超過max.block.ms將拋出TimeoutException props.put("buffer.memory", 33554432); //The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //創建kafka的生產者類 Producer<String, String> producer = new KafkaProducer<String, String>(props); //生產者的主要方法 producer.send(new ProducerRecord<String, String>("show", "測試Kafka")); producer.close(); } }

代碼沒問題,但是每次運行就會拋一個time out 異常,總是連接失敗。

java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:148)
    at java.lang.Thread.run(Thread.java:745)

解決辦法

  • 這裏需要註意的是,因為是遠程連接服務器,所以要看服務器的防火墻是否針對端口9092(默認端口)打開的,剛開始弄了很長時間,我一直沒弄好的原因是因為中午我重啟了服務器,導致防火墻又打開了。
  • 如果防火墻是正常的,就需要改變Kafka的配置:在/config/service.properties中,添加上一句host.name=192.168.211.129

技術分享

這主要是因為,kafka默認是監聽localhost的端口,如果不配置新端口名的話,就解析監聽不到消息。

現在重新啟動一下,看看是不是已經解決了。

在kafka安裝目錄,啟動自帶的zookeeper服務:

bin/zookeeper-server-start.sh config/zookeeper.properties

在同一個地方,啟動kafka服務

bin/kafka-server-start.sh config/server.properties

使用消費者客戶端,監聽show的topic,驗證是否已經啟動了Kafka

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic show --from-beginning

沒有報錯,並且現在服務器端已經在監聽狀態

技術分享

啟動Java客戶端,控制臺沒有報錯

並且在服務器端顯示了剛剛在Java客戶端發送的消息。

技術分享

解決成功!

該死的Kafka,遠程連接Kafka超時以及解決辦法