1. 程式人生 > >Flink 框架下scala與java混合程式設計問題

Flink 框架下scala與java混合程式設計問題

最近在應用Flink做相關業務設計,使用scala與java的混合程式設計,遇到一些問題,在這裡做個記錄.
問題1:

在這裡插入圖片描述

Error:(85, 23) value foreach is not a member of java.util.ArrayList[com.icsoc.report.model.Message[_]]
          for (msg <- messages) {

這是由於在scala中沒有ArrayList這個類,所以在scala中需要引用java的ArrayList物件的的話,需要在scala程式碼中引入一個隱式轉換.
import scala.collection.JavaConversions._
這樣就可以在scala中直接是用java.lang.ArrayList了.
問題2:
在這裡插入圖片描述


Error:(52, 65) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.icsoc.report.model.Message[Object]]
val inputStream: DataStream[Message[Object]] = env.addSource(flinkKafkaConsumer)
這個問題主要是在程式裡需要一個隱式引數,我們可以看大上面的addSource在Flink的原始碼實現如下

/**
   * Create a DataStream using a user defined source function for arbitrary
   * source functionality. By default sources have a parallelism of 1. 
   * To enable parallel execution, the user defined source should implement 
   * ParallelSourceFunction or extend RichParallelSourceFunction. 
   * In these cases the resulting source will have the parallelism of the environment. 
   * To change this afterwards call DataStreamSource.setParallelism(int)
   *
   */
  def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
    require(function != null, "Function must not be null.")
    
    val cleanFun = scalaClean(function)
    val typeInfo = implicitly[TypeInformation[T]]
    asScalaStream(javaEnv.addSource(cleanFun).returns(typeInfo))
  }

在addSource定義中有一個[T: TypeInformation],但是我們的程式中並沒有指定任何有關隱式引數的定義,這時候無法建立TypeInformantion,所以出現上線的錯誤資訊.
解決方法一:我們可以直接在程式碼中加入下列程式碼:
implicit val typeInfo = TypeInformation.of(new TypeHint[Message[Object]] {})
在這裡插入圖片描述
接著我們執行程式發現,這個錯誤又出現了只是這次出現的是FlatMap上,如下圖:

Error:(59, 17) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.icsoc.report.proxy.EventMap]
        .flatMap((message: Message[Object], collector) => {

經過發現,該問題與上面的問題一樣,所以處理可以按照上面的方式一樣,在程式中國加入程式碼如下:

    implicit val typeInfo1 = TypeInformation.of(classOf[EventMap])

這個方法能夠決問題,但是太繁瑣了,如果有像許隱式型別需要轉換,就需要在程式碼中加入很多這種變數.
方法二:
我們只需要在程式碼中引入

import org.apache.flink.streaming.api.scala._

如果資料是有限的(靜態資料集),我們可以引入以下包:

import org.apache.flink.api.scala._

只需要引入這個一個包,在不需要加任何程式碼,無論有多少該型別的隱式轉換,都能夠處理.
問題三:在Flink中從kafka中消費資料使用反序列化,將資料轉成我們實際時常用的資料型別,程式碼如下:

   val flinkKafkaConsumer = new FlinkKafkaConsumer010[Message[Object]](
      parameterTool.getRequired("input-topic"),
      new MessageSchema,
      parameterTool.getProperties
    ).assignTimestampsAndWatermarks(new CustomWatermarkExtractor)

其中最重要的是MessageSchema,其實現如下:

/*******************************************************************************
 * 版權資訊:北京中通天鴻武漢分公司
 * @author xuchang
 * Copyright: Copyright (c) 2007北京中通天鴻武漢分公司,Inc.All Rights Reserved.
 * Description:
 ******************************************************************************/
public class MessageSchema implements DeserializationSchema<Message<Object>>, SerializationSchema<Message> {
    @Override
    public Message<Object> deserialize(byte[] bytes) {
        return Message.fromString(new String(bytes));
    }

    @Override
    public boolean isEndOfStream(Message<Object> message) {
        return false;
    }

    @Override
    public byte[] serialize(Message message) {
        return message.toString().getBytes();
    }

    @Override
    public TypeInformation<Message<Object>> getProducedType() {
        return TypeInformation.of(new TypeHint<Message<Object>>() {
        });
    }
}

這個MessageSchema在Java編寫的Flink程式中國是沒有問題的,但是在scala編寫的Flink程式中一直報錯,如下:

Error:(41, 30) overloaded method constructor FlinkKafkaConsumer010 with alternatives:
  (x$1: java.util.regex.Pattern,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] <and>
  (x$1: java.util.regex.Pattern,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] <and>
  (x$1: java.util.List[String],x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] <and>
  (x$1: java.util.List[String],x$2: org.apache.flink.api.common.serialization.DeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] <and>
  (x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] <and>
  (x$1: String,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]]
 cannot be applied to (String, com.icsoc.report.flink.MessageSchema, java.util.Properties)
    val flinkKafkaConsumer = new FlinkKafkaConsumer010[Message[_]](

檢視Flink的TypeInformation官方文件,在某些API中,手動建立一個TypeInformation類可能是必須的,因為Java泛型的型別擦除特性會使得Flink無法推斷資料型別.所以需要自己手動建立一個TypeInformation,修改後的程式碼如下:

/*******************************************************************************
 * 版權資訊:北京中通天鴻武漢分公司
 * @author xuchang
 * Copyright: Copyright (c) 2007北京中通天鴻武漢分公司,Inc.All Rights Reserved.
 * Description:
 ******************************************************************************/
public class MessageSchema implements DeserializationSchema<Message<Object>>, SerializationSchema<Message> {
    @Override
    public Message<Object> deserialize(byte[] bytes) {
        return Message.fromString(new String(bytes));
    }

    @Override
    public boolean isEndOfStream(Message<Object> message) {
        return false;
    }

    @Override
    public byte[] serialize(Message message) {
        return message.toString().getBytes();
    }

    @Override
    public TypeInformation<Message<Object>> getProducedType() {
        return TypeInformation.of(new TypeHint<Message<Object>>() {
        });
    }
}