1. 程式人生 > >Flink和Kafka整合Demo以及DeserializationSchema.class找不到的解決方法

Flink和Kafka整合Demo以及DeserializationSchema.class找不到的解決方法

這裡用的是用官網提供的maven命令構建的flink1.4.0的flink-quick-start工程,具體構建工程命令如下

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.4.0

然後會獲取到一個java的初始工程。

下面是一個flink連線kafka的demo

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache
.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import java.util.Properties
; public class connectkafka { private static String ZOOKEEPER_HOST="your-zookeeper-cluster"; private static String KAFKA_BROKER="your-kafka-cluster"; private static String GROUP="your-comsumer-group-name"; public static void main(String[] args) throws Exception { StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment
(); connectkafka conn=new connectkafka(); //test message:String. Real message :Tuple3<uid,itemid,rating> Properties kafkaprops=new Properties(); kafkaprops.setProperty("zookeeper.connect",ZOOKEEPER_HOST); kafkaprops.setProperty("bootstrap.servers",KAFKA_BROKER); kafkaprops.setProperty("group.id",GROUP); DataStream<String> messagestream=env.addSource(new FlinkKafkaConsumer010<String>( "test",//這裡是你的topic name new SimpleStringSchema(), kafkaprops)); //show the message messagestream.print(); env.execute();

這樣一來簡單的demo就完成了,但是跑起來的時候會報錯,具體如下:

這裡寫圖片描述

這個錯誤怎麼解決呢?具體做法就是修改你的pom.xml檔案,將這一段的provide給註釋掉。這樣就不會報錯了,並且能完美執行。

這裡寫圖片描述

執行結果圖如下:

這裡寫圖片描述