Flink 框架下scala與java混合程式設計問題
最近在應用Flink做相關業務設計,使用scala與java的混合程式設計,遇到一些問題,在這裡做個記錄.
問題1:
Error:(85, 23) value foreach is not a member of java.util.ArrayList[com.icsoc.report.model.Message[_]]
for (msg <- messages) {
這是由於在scala中沒有ArrayList這個類,所以在scala中需要引用java的ArrayList物件的的話,需要在scala程式碼中引入一個隱式轉換.
import scala.collection.JavaConversions._
這樣就可以在scala中直接是用java.lang.ArrayList了.
問題2:
Error:(52, 65) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.icsoc.report.model.Message[Object]]
val inputStream: DataStream[Message[Object]] = env.addSource(flinkKafkaConsumer)
這個問題主要是在程式裡需要一個隱式引數,我們可以看大上面的addSource在Flink的原始碼實現如下
/** * Create a DataStream using a user defined source function for arbitrary * source functionality. By default sources have a parallelism of 1. * To enable parallel execution, the user defined source should implement * ParallelSourceFunction or extend RichParallelSourceFunction. * In these cases the resulting source will have the parallelism of the environment. * To change this afterwards call DataStreamSource.setParallelism(int) * */ def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = { require(function != null, "Function must not be null.") val cleanFun = scalaClean(function) val typeInfo = implicitly[TypeInformation[T]] asScalaStream(javaEnv.addSource(cleanFun).returns(typeInfo)) }
在addSource定義中有一個[T: TypeInformation],但是我們的程式中並沒有指定任何有關隱式引數的定義,這時候無法建立TypeInformantion,所以出現上線的錯誤資訊.
解決方法一:我們可以直接在程式碼中加入下列程式碼:
implicit val typeInfo = TypeInformation.of(new TypeHint[Message[Object]] {})
接著我們執行程式發現,這個錯誤又出現了只是這次出現的是FlatMap上,如下圖:
Error:(59, 17) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.icsoc.report.proxy.EventMap]
.flatMap((message: Message[Object], collector) => {
經過發現,該問題與上面的問題一樣,所以處理可以按照上面的方式一樣,在程式中國加入程式碼如下:
implicit val typeInfo1 = TypeInformation.of(classOf[EventMap])
這個方法能夠決問題,但是太繁瑣了,如果有像許隱式型別需要轉換,就需要在程式碼中加入很多這種變數.
方法二:
我們只需要在程式碼中引入
import org.apache.flink.streaming.api.scala._
如果資料是有限的(靜態資料集),我們可以引入以下包:
import org.apache.flink.api.scala._
只需要引入這個一個包,在不需要加任何程式碼,無論有多少該型別的隱式轉換,都能夠處理.
問題三:在Flink中從kafka中消費資料使用反序列化,將資料轉成我們實際時常用的資料型別,程式碼如下:
val flinkKafkaConsumer = new FlinkKafkaConsumer010[Message[Object]](
parameterTool.getRequired("input-topic"),
new MessageSchema,
parameterTool.getProperties
).assignTimestampsAndWatermarks(new CustomWatermarkExtractor)
其中最重要的是MessageSchema,其實現如下:
/*******************************************************************************
* 版權資訊:北京中通天鴻武漢分公司
* @author xuchang
* Copyright: Copyright (c) 2007北京中通天鴻武漢分公司,Inc.All Rights Reserved.
* Description:
******************************************************************************/
public class MessageSchema implements DeserializationSchema<Message<Object>>, SerializationSchema<Message> {
@Override
public Message<Object> deserialize(byte[] bytes) {
return Message.fromString(new String(bytes));
}
@Override
public boolean isEndOfStream(Message<Object> message) {
return false;
}
@Override
public byte[] serialize(Message message) {
return message.toString().getBytes();
}
@Override
public TypeInformation<Message<Object>> getProducedType() {
return TypeInformation.of(new TypeHint<Message<Object>>() {
});
}
}
這個MessageSchema在Java編寫的Flink程式中國是沒有問題的,但是在scala編寫的Flink程式中一直報錯,如下:
Error:(41, 30) overloaded method constructor FlinkKafkaConsumer010 with alternatives:
(x$1: java.util.regex.Pattern,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] <and>
(x$1: java.util.regex.Pattern,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] <and>
(x$1: java.util.List[String],x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] <and>
(x$1: java.util.List[String],x$2: org.apache.flink.api.common.serialization.DeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] <and>
(x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] <and>
(x$1: String,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]]
cannot be applied to (String, com.icsoc.report.flink.MessageSchema, java.util.Properties)
val flinkKafkaConsumer = new FlinkKafkaConsumer010[Message[_]](
檢視Flink的TypeInformation官方文件,在某些API中,手動建立一個TypeInformation類可能是必須的,因為Java泛型的型別擦除特性會使得Flink無法推斷資料型別.所以需要自己手動建立一個TypeInformation,修改後的程式碼如下:
/*******************************************************************************
* 版權資訊:北京中通天鴻武漢分公司
* @author xuchang
* Copyright: Copyright (c) 2007北京中通天鴻武漢分公司,Inc.All Rights Reserved.
* Description:
******************************************************************************/
public class MessageSchema implements DeserializationSchema<Message<Object>>, SerializationSchema<Message> {
@Override
public Message<Object> deserialize(byte[] bytes) {
return Message.fromString(new String(bytes));
}
@Override
public boolean isEndOfStream(Message<Object> message) {
return false;
}
@Override
public byte[] serialize(Message message) {
return message.toString().getBytes();
}
@Override
public TypeInformation<Message<Object>> getProducedType() {
return TypeInformation.of(new TypeHint<Message<Object>>() {
});
}
}