kafka+windows+java+springboot configuration

1. Search Baidu for kafka+zookeeper+windows configuration

1.1 zookeeper configuration

dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

1.2 kafka server.properties configuration

advertised.host.name=IP

log.dirs=D:/kafka_2.11-1.0.0/log

zookeeper.connect=IP:2181
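
The broker publishes `advertised.host.name` for clients to connect to, so a client on another machine needs that value to be an address it can actually reach. Below is a minimal sketch, assuming the settings are available as plain text, that loads them with `java.util.Properties` and flags a missing or loopback value. The class name `ServerPropertiesCheck`, the method `advertisedHostProblem`, and the sample address 192.168.1.10 are hypothetical and only serve the illustration; they are not part of Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ServerPropertiesCheck {

    /**
     * Returns a short description of the problem, or null when the value
     * looks usable from another machine. Hypothetical helper, not a Kafka API.
     */
    static String advertisedHostProblem(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String host = props.getProperty("advertised.host.name");
        if (host == null || host.trim().isEmpty()) {
            return "advertised.host.name is not set";
        }
        host = host.trim();
        // A loopback value is only reachable from the broker machine itself.
        if (host.equals("localhost") || host.startsWith("127.") || host.equals("::1")) {
            return "advertised.host.name points at the loopback interface: " + host;
        }
        return null;
    }

    public static void main(String[] args) {
        // Sample text mirroring the three settings above; the address is made up.
        String sample = "advertised.host.name=192.168.1.10\n"
                + "log.dirs=D:/kafka_2.11-1.0.0/log\n"
                + "zookeeper.connect=192.168.1.10:2181\n";
        String problem = advertisedHostProblem(sample);
        System.out.println(problem == null ? "advertised.host.name looks usable" : problem);
    }
}
```

This check only inspects the text of the setting. It cannot tell whether the address is really reachable from the development machine.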

1.3 windows hosts configuration

IP localhost

2. Build the springboot project with maven

2.1 Create a new kafka project in intellij idea

2.2 pom.xml for the kafka project

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tangxin.kafka</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0</version>
    <name>kafka</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.1.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.29</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.2.0.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <!--<excludes>-->
                <!--<exclude>*</exclude>-->
                <!--</excludes>-->
            </resource>
        </resources>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                    <compilerArguments>
                        <extdirs>lib</extdirs>
                    </compilerArguments>
                </configuration>
                <version>2.3.2</version>
            </plugin>
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <configuration>
                    <encoding>utf-8</encoding>
                </configuration>
                <version>2.4.3</version>
            </plugin>
        </plugins>
    </build>
</project>

2.3 Create the springboot startup class Application

package com.tangxin.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

2.4 Create the configuration files in the springboot project's resources directory

application.yml

server:
  display-name: kafka
  port: 8888
  contextPath: /kafka
  
spring:
    profiles:
        active: dev

application-dev.properties

kafka.bootstrap-servers=x.x.x.x:9092

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="trace" dest="/data/logs/work/log.log">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout>
                <charset>UTF-8</charset>
                <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
            </PatternLayout>
            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
        </Console>
        <RollingFile name="RollingFile" fileName="/data/logs/work/work.log"
                     filePattern="/data/logs/work/work-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout>
                <charset>UTF-8</charset>
                <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
            </PatternLayout>
            <Policies>
                <TimeBasedTriggeringPolicy/>
                <SizeBasedTriggeringPolicy size="1000 MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
            <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
        </RollingFile>

        <RollingFile name="ErrorFile" fileName="/data/logs/work/error.log"  filePattern="/data/logs/work/error.%d{yyyy-MM-dd}.%i.log">
            <PatternLayout>
                <charset>UTF-8</charset>
                <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%M%n</Pattern>
            </PatternLayout>
            <Filters>
                <ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="DENY"/>
            </Filters>
            <Policies>
                <TimeBasedTriggeringPolicy />
                <SizeBasedTriggeringPolicy size="50 MB"/>
            </Policies>
            <DefaultRolloverStrategy fileIndex="min" max="100"/>
        </RollingFile>
    </appenders>
    <loggers>
        <Root level="info">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile"  level="info"/>
        </Root>
        <Logger name="com.tangxin.kafka">
            <appender-ref ref="ErrorFile" />
        </Logger>
    </loggers>
</Configuration>

2.5 kafka configuration class

package com.tangxin.kafka.service;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

2.6 Call kafka from the controller layer to send a message

package com.tangxin.kafka.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


@RestController
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping(value = "/send", method = { RequestMethod.GET, RequestMethod.POST })
    public String callFeedInfo() throws Exception {
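        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> {
            try {
                kafkaTemplate.send("feed-info","1000");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        // Let the submitted task finish, then release the worker thread;
        // without this, every request leaves an idle thread behind.
        executorService.shutdown();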
        return "send done!";
    }

}

3. Start zookeeper and kafka on Windows

4. Problems encountered

2017-11-27 17:55:38.484 [kafka-producer-network-thread | producer-1] ERROR org.springframework.kafka.support.LoggingProducerListener - Exception thrown when sending a message with key='null' and payload='1' to topic mytopic:
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for mytopic-1

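This problem is the reason I wrote this note. Accessing kafka from the same machine works, presumably because the mapping between localhost and the IP is fine locally. If you have two computers, one acting as the server and one used for development, you may run into this error. At first, neither the official documentation nor repeated Baidu searches may solve it. The problem comes down to the hostname. To be honest, I am not very familiar with networking, and I struggled for a long time with similar problems when getting started with hadoop and spark. Here is a summary of the possible causes.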

1. Check whether the firewall is turned off.

2. Check whether VMware is installed on Windows. If it is, disable the vmware network adapter under Control Panel\Network and Internet\Network Connections.

3. kafka configuration

advertised.host.name=IP

log.dirs=D:/kafka_2.11-1.0.0/log

zookeeper.connect=IP:2181

windows hosts configuration
IP localhost
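
Most of the causes above (a firewall, an interfering virtual adapter, a wrong hosts mapping) show up the same way: the development machine cannot resolve the server name or cannot open a TCP connection to it. Below is a minimal sketch, assuming the ports used in this post (2181 for zookeeper, 9092 for kafka), that can be run on the development machine to check both. The names `BrokerReachability`, `canConnect`, and `resolve` are hypothetical helpers for illustration.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

public class BrokerReachability {

    /** Tries a plain TCP connect; true means something accepted the connection on host:port. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and names that do not resolve.
            return false;
        }
    }

    /** Returns the address the local resolver gives for a name, or null if it does not resolve. */
    static String resolve(String hostName) {
        try {
            return InetAddress.getByName(hostName).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Pass the server's IP or hostname as the first argument; defaults to localhost.
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println(host + " resolves to " + resolve(host));
        System.out.println("zookeeper 2181 reachable: " + canConnect(host, 2181, 3000));
        System.out.println("kafka 9092 reachable: " + canConnect(host, 9092, 3000));
    }
}
```

A successful connect only shows that the port is open from this machine. The producer can still time out if the broker advertises a hostname the client cannot resolve, so check the `advertised.host.name` value as well.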
