Kafka cluster configuration and a Java producer/consumer example

  • kafka
    • Installation
    • Editing the configuration files
    • Using Kafka from Java

kafka

Kafka is comparatively simple to operate.

Installation

Download Kafka from http://kafka.apache.org/downloads

tar -zxvf kafka_2.12-2.1.0.tgz
rm kafka_2.12-2.1.0.tgz
mv kafka_2.12-2.1.0 kafka

sudo vim /etc/profile
    export KAFKA_HOME=/usr/local/kafka
    export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

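Prepare three machines: worker1, worker2 and worker3.

First make sure your ZooKeeper cluster is running properly; worker1, worker2 and worker3 form the zk cluster.
For the detailed setup, see my blog post https://www.cnblogs.com/ye-hcj/p/9889585.html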

Editing the configuration files

  1. server.properties

    sudo vim server.properties
    Set the following properties
    # broker.id must be unique per broker: use 0, 1 and 2 on the three machines
    broker.id=0
    log.dirs=/usr/local/kafka/logs
    zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
  2. Running

    Start the broker on each machine
        bin/kafka-server-start.sh config/server.properties
    Create a topic
        bin/kafka-topics.sh --create --zookeeper worker1:2181 --replication-factor 2 --partitions 2 --topic test
    List the topics
        bin/kafka-topics.sh --list --zookeeper worker1:2181
    Subscribe to the topic, using worker2 as the subscriber
        bin/kafka-console-consumer.sh --bootstrap-server worker1:9092 --topic test --from-beginning
    Send messages to the topic
        bin/kafka-console-producer.sh --broker-list worker1:9092 --topic test
        Type any message and worker2 receives it
    Show the topic details
        bin/kafka-topics.sh --describe --zookeeper worker1:2181 --topic test
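
The per-broker settings in server.properties can be sanity-checked before starting the brokers. The file uses the Java properties format, where `#` starts a comment only at the beginning of a line, so a note placed after a value ends up inside the value. The following is a minimal sketch using only `java.util.Properties`; the class `BrokerConfigSketch`, its `render` and `parse` helpers, and the mapping worker1 -> 0, worker2 -> 1, worker3 -> 2 are assumptions made for illustration and are not part of Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class BrokerConfigSketch {

    // Hypothetical helper: renders the three settings from this post for one broker.
    static String render(int brokerId) {
        return "# unique per broker: 0, 1 or 2\n"
             + "broker.id=" + brokerId + "\n"
             + "log.dirs=/usr/local/kafka/logs\n"
             + "zookeeper.connect=worker1:2181,worker2:2181,worker3:2181\n";
    }

    // Parses text written in the Java properties format.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Assumed mapping: worker1 -> 0, worker2 -> 1, worker3 -> 2.
        for (int id = 0; id < 3; id++) {
            Properties props = parse(render(id));
            // Integer.parseInt throws if the value is not a clean integer.
            int parsed = Integer.parseInt(props.getProperty("broker.id"));
            System.out.println("worker" + (id + 1) + " -> broker.id=" + parsed);
        }

        // A '#' after a value is not a comment; it stays part of the value.
        Properties bad = parse("broker.id=0 # note\n");
        System.out.println("[" + bad.getProperty("broker.id") + "]");
    }
}
```

The last line prints `[0 # note]`, which is why any note about broker.id belongs on its own line starting with `#`.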

Using Kafka from Java

  1. Dependency

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.1.0</version>
    </dependency>
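  2. Producer

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class Producer 
    {
        public static void main( String[] args ){
            Properties props = new Properties();
            // Broker addresses used for the initial connection
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker1:9092,worker2:9092,worker3:9092");
            // Serialize both keys and values as strings
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // Create a producer and send one message to the "test" topic
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<String, String>("test", "message sent by the producer"));
            // close() waits for pending sends to complete before returning
            producer.close();
        }
    }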
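  3. Consumer

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class Consumer 
    {
        public static void main( String[] args ){
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker1:9092,worker2:9092,worker3:9092");
            // Consumers sharing a group id split the topic's partitions among themselves
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
            // Commit consumed offsets automatically, once per second
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // Create the consumer and subscribe to the "test" topic
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            kafkaConsumer.subscribe(Arrays.asList("test"));
            while (true) {
                // Block for at most 100 ms waiting for new records
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
                }
            }
        }
    }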
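
The consumer prints each record's offset and relies on auto-commit to remember how far it has read. To make those two ideas concrete, here is a toy in-memory model, a sketch only: `PartitionLog` and `ToyConsumer` are hypothetical names that are not part of the Kafka client API, and real Kafka keeps committed offsets per consumer group on the brokers rather than in the consumer object.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetSketch {

    // Toy stand-in for one topic partition: an append-only list whose index is the offset.
    static final class PartitionLog {
        private final List<String> records = new ArrayList<>();

        // Appends a value and returns the offset it was stored at.
        long append(String value) {
            records.add(value);
            return records.size() - 1;
        }

        // Returns up to max values starting at the given offset.
        List<String> read(long fromOffset, int max) {
            int start = (int) Math.min(fromOffset, records.size());
            int end = Math.min(start + max, records.size());
            return new ArrayList<>(records.subList(start, end));
        }
    }

    // Toy consumer: tracks the next offset to read and the last committed offset.
    static final class ToyConsumer {
        private final PartitionLog log;
        private long position;
        private long committed;

        ToyConsumer(PartitionLog log, long startOffset) {
            this.log = log;
            this.position = startOffset;
            this.committed = startOffset;
        }

        // Reads the next batch and advances the position past it.
        List<String> poll(int max) {
            List<String> batch = log.read(position, max);
            position += batch.size();
            return batch;
        }

        // Records the current position as the point to resume from.
        void commit() {
            committed = position;
        }

        long committed() {
            return committed;
        }
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        log.append("a");
        log.append("b");
        log.append("c");

        // Starting at offset 0 mirrors reading a topic from the beginning.
        ToyConsumer consumer = new ToyConsumer(log, 0);
        System.out.println(consumer.poll(2));
        consumer.commit();

        // A restarted consumer resumes from the committed offset, not from 0.
        ToyConsumer restarted = new ToyConsumer(log, consumer.committed());
        System.out.println(restarted.poll(10));
    }
}
```

The first consumer reads `a` and `b` and commits offset 2, so the restarted one sees only `c`. With auto-commit enabled, the commit step happens in the background at the configured interval instead of being called explicitly.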
