Configuring Kafka on Windows with Java and Spring Boot
阿新 • Published: 2017-11-28
1. Kafka + ZooKeeper configuration on Windows
1.1 ZooKeeper configuration
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
1.2 Kafka server.properties configuration
Replace IP below with the server machine's LAN address:

advertised.host.name=IP
log.dirs=D:/kafka_2.11-1.0.0/log
zookeeper.connect=IP:2181
1.3 Windows hosts file configuration
IP localhost
2. Building the Spring Boot project with Maven
2.1 Create a new kafka project in IntelliJ IDEA
2.2 Kafka dependencies in pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.tangxin.kafka</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0</version>
    <name>kafka</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.1.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.29</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.2.0.RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <!--<excludes>-->
                <!--<exclude>*</exclude>-->
                <!--</excludes>-->
            </resource>
        </resources>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                    <compilerArguments>
                        <extdirs>lib</extdirs>
                    </compilerArguments>
                </configuration>
                <version>2.3.2</version>
            </plugin>
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <configuration>
                    <encoding>utf-8</encoding>
                </configuration>
                <version>2.4.3</version>
            </plugin>
        </plugins>
    </build>
</project>
2.3 Create the Spring Boot entry class Application
package com.tangxin.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
2.4 Create the configuration files in the project's resources directory
application.yml
server:
  display-name: kafka
  port: 8888
  contextPath: /kafka
spring:
  profiles:
    active: dev
application-dev.properties
kafka.bootstrap-servers=x.x.x.x:9092
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="trace" dest="/data/logs/work/log.log">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout>
                <charset>UTF-8</charset>
                <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
            </PatternLayout>
            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
        </Console>
        <RollingFile name="RollingFile" fileName="/data/logs/work/work.log"
                     filePattern="/data/logs/work/work-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout>
                <charset>UTF-8</charset>
                <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
            </PatternLayout>
            <Policies>
                <TimeBasedTriggeringPolicy/>
                <SizeBasedTriggeringPolicy size="1000 MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
            <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
        </RollingFile>
        <RollingFile name="ErrorFile" fileName="/data/logs/work/error.log"
                     filePattern="/data/logs/work/error.%d{yyyy-MM-dd}.%i.log">
            <PatternLayout>
                <charset>UTF-8</charset>
                <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%M%n</Pattern>
            </PatternLayout>
            <Filters>
                <ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="DENY"/>
            </Filters>
            <Policies>
                <TimeBasedTriggeringPolicy/>
                <SizeBasedTriggeringPolicy size="50 MB"/>
            </Policies>
            <DefaultRolloverStrategy fileIndex="min" max="100"/>
        </RollingFile>
    </appenders>
    <loggers>
        <Root level="info">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile" level="info"/>
        </Root>
        <Logger name="com.tangxin.kafka">
            <appender-ref ref="ErrorFile"/>
        </Logger>
    </loggers>
</Configuration>
2.5 Kafka configuration class
package com.tangxin.kafka.service;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
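The configuration above leaves every producer timeout at its default, which matters for the metadata-timeout error discussed in section 4. As a minimal sketch of the kind of tuning that can help, here is a plain-Java map built with Kafka's documented string config keys; the specific values are illustrative assumptions, not the post's original settings:

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerTuning {

    // Standard Kafka producer config keys in string form; values are examples only.
    public static Map<String, Object> tunedProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "1");                 // wait for the partition leader only
        props.put("retries", 3);                // retry transient send failures
        props.put("request.timeout.ms", 30000); // max wait for a broker response
        props.put("max.block.ms", 60000);       // max time send() may block on metadata
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = tunedProps("x.x.x.x:9092");
        System.out.println(props.get("acks"));
    }
}
```

A map like this could be merged into `producerConfigs()`; longer `max.block.ms` values only postpone the metadata timeout, so they help diagnosis more than they fix the underlying hostname problem.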
2.6 Calling the Kafka producer from the controller layer
package com.tangxin.kafka.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Share one executor across requests rather than creating a new
    // (never shut down) executor inside every call.
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    @RequestMapping(value = "/send", method = { RequestMethod.GET, RequestMethod.POST })
    public String callFeedInfo() throws Exception {
        executorService.submit(() -> {
            try {
                // Send the payload "1000" to the "feed-info" topic asynchronously.
                kafkaTemplate.send("feed-info", "1000");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        return "send done!";
    }
}
3. Starting ZooKeeper and Kafka on Windows
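Kafka's Windows distribution ships start scripts under bin\windows. Assuming the install directory D:\kafka_2.11-1.0.0 used in the log.dirs setting above, the usual sequence is to start ZooKeeper first, then the broker, each in its own command prompt:

```
cd D:\kafka_2.11-1.0.0
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
```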
4. Problems encountered
2017-11-27 17:55:38.484 [kafka-producer-network-thread | producer-1] ERROR org.springframework.kafka.support.LoggingProducerListener - Exception thrown when sending a message with key='null' and payload='1' to topic mytopic: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for mytopic-1
This error is the reason I wrote this post. Sending worked fine locally, presumably because the localhost/IP mapping on my own machine was correct. But with two computers, one acting as the server and one used for development, you can hit this error, and at first neither the official docs nor web searches may resolve it. The root cause is hostname resolution; to be honest, networking is not my strong point, and I struggled with similar problems long ago when getting started with Hadoop and Spark. A summary of the likely causes:
1. Check whether the firewall is disabled.
2. If VMware is installed on Windows, disable the VMware network adapter under Control Panel\Network and Internet\Network Connections.
3. Check the Kafka configuration:

advertised.host.name=IP
log.dirs=D:/kafka_2.11-1.0.0/log
zookeeper.connect=IP:2181

and the Windows hosts file entry:

IP localhost
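The metadata timeout typically means the client resolved the broker's advertised hostname to an address it cannot reach (for example, a loopback address on a different machine). A quick pure-JDK check of what a name resolves to on the client machine can confirm this; the names queried here are illustrative:

```java
import java.net.InetAddress;

public class ResolveCheck {
    public static void main(String[] args) throws Exception {
        // Ask the local resolver (which consults the hosts file) what a name maps to.
        InetAddress addr = InetAddress.getByName("localhost");
        System.out.println(addr.getHostAddress());

        // If the broker's advertised.host.name resolves to a loopback address on
        // the client machine, a second computer cannot reach the broker through it.
        System.out.println(InetAddress.getByName("127.0.0.1").isLoopbackAddress());
    }
}
```

Running this on the development machine with the broker's advertised hostname substituted in shows exactly which address the producer will try to connect to.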