
Apache Flink: Reading Various Key Data Formats / SQL Support

Event merging currently comes in two forms: real-time merging, which merges data from Kafka and generates events, and periodic merging, which merges data held in Hive and generates events.

The input/output data types used for SQL-based merging with Spark Streaming, and Flink's support for each of them, are summarized below:

- Kafka: a schema has to be defined for the incoming records.
- HDFS (parquet/csv/textfile): reading parquet requires AvroParquetInputFormat; csv and textfile can be read with readCsvFile and the text input format.
- Hive: 1. the Hive metastore service has to be started to expose a thrift metastore interface; 2. reading relies on the flink-hcatalog dependency.
- JDBC (PostgreSQL): JDBCInputFormat.

The sections below test whether Apache Flink supports each of the formats above.

1. Kafka

First, define a POJO class that represents the records in the DataStream read from Kafka:

package com.flinklearn.models;
 
/**
 * Created by dell on 2018/10/23.
 */
public class TamAlert {
    private String msg;
 
    public TamAlert(){}
 
    public String getMsg() {
        return msg;
    }
 
    public void setMsg(String msg) {
        this.msg = msg;
    }
}

Second, executing SQL on a Flink DataStream is quite different from Spark. In Spark the workflow is simply a series of stream transformations plus registerTempTable calls, whereas in Flink the DataStream first has to be converted into a Table before any SQL can be run against it, and if further transformations are needed the Table has to be converted back into a DataStream.
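
The core of that round-trip, as a minimal sketch (the names mirror the full program below, and the env, stream and Table API imports are assumed to be in place):

val tableEnv = TableEnvironment.getTableEnvironment(env)     // StreamTableEnvironment for a streaming environment
tableEnv.registerDataStream("temp_alert", stream, 'msg)      // DataStream -> Table, registered under a name
val table = tableEnv.sqlQuery("select msg from temp_alert")  // run SQL against the registered table
val resultStream = tableEnv.toAppendStream[String](table)    // Table -> DataStream for further transformations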

The full code is as follows:

package com.flinklearn.main
 
import java.util.Properties
 
import com.alibaba.fastjson.{JSON}
import com.flinklearn.models.TamAlert
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
 
import scala.collection.mutable.ArrayBuffer
 
/**
 * Created by dell on 2018/10/22.
 */
class Main {
  def startApp(): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "brokerserver")
    properties.setProperty("group.id", "com.flinklearn.main.Main")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
 
    // Read data from Kafka to obtain the stream
    val stream:DataStream[TamAlert] = env
      .addSource(new FlinkKafkaConsumer010[String]("mytopic", new SimpleStringSchema(), properties))
      .map(line => {
        var rtn:TamAlert = null
        try{
          val temp = JSON.parseObject(line).getJSONObject("payload")
          rtn = new TamAlert()
          rtn.setMsg(temp.getString("msg"))
        }catch{
          case ex:Exception => {
            ex.printStackTrace()
          }
        }
        rtn
      }).filter(line=>line!=null)
 
    // Register the stream as the temp_alert table and print the msg field
    val tableEnv:StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerDataStream("temp_alert", stream,
      'msg)
    val httpTable  = tableEnv.sqlQuery("select msg from temp_alert")
 
    val httpStream = tableEnv.toAppendStream[String](httpTable)
    httpStream.print()
 
    env.execute("Kafka sql test.")
  }
}
object Main {
  def main(args:Array[String]):Unit = {
    new Main().startApp()
  }
}
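
The map function above assumes that each Kafka message wraps the alert in a payload object; a message shaped like the following (values are illustrative) is parsed into a TamAlert:

{"payload": {"msg": "some alert text"}}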

2. HDFS

2.1 Parquet

For Parquet data on HDFS, Apache Flink offers no convenient read.parquet() style interface the way Spark does; the files have to be read through AvroParquetInputFormat. The concrete steps are as follows:

1. Add the required dependencies to pom.xml:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hadoop-compatibility_2.11</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.10.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>3.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>3.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
</dependency>

2. Define the schema in an .avsc file:

{"namespace": "com.flinklearn.models",
 "type": "record",
 "name": "AvroTamAlert",
 "fields": [
    {"name": "msg", "type": ["string","null"]}
 ]
}

3. Use avro-tools to generate the corresponding Java class and copy the generated .java file into the project; in this example it is AvroTamAlert.java:
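
With a downloaded avro-tools jar the generation step looks roughly like this (the jar version and the paths are assumptions, adjust them to the actual project layout):

java -jar avro-tools-1.8.2.jar compile schema AvroTamAlert.avsc src/main/java/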

4. Read the parquet files with AvroParquetInputFormat:

package com.flinklearn.main
 
import java.util.Arrays
 
import com.flinklearn.models.{AvroTamAlert}
import org.apache.avro.Schema
import org.apache.avro.util.Utf8
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.scala.{ExecutionEnvironment}
import org.apache.flink.table.api.TableEnvironment
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat}
import org.apache.flink.api.scala._
import org.apache.parquet.avro.AvroParquetInputFormat
import org.apache.flink.table.api.scala._
 
/**
 * Created by dell on 2018/10/23.
 */
class Main {
  def startApp(): Unit ={
    val env = ExecutionEnvironment.getExecutionEnvironment
 
    val job = Job.getInstance()
    val dIf = new HadoopInputFormat[Void, AvroTamAlert](new AvroParquetInputFormat(), classOf[Void], classOf[AvroTamAlert], job)
    FileInputFormat.addInputPath(job, new Path("/user/hive/warehouse/xx.db/yy"))
 
    AvroParquetInputFormat.setAvroReadSchema(job, AvroTamAlert.getClassSchema)
 
    val dataset = env.createInput(dIf).map(line => line.f1).map(alert => alert.getMsg.toString)
 
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerDataSet("tmp_table", dataset, 'msg)
 
    val table = tableEnv.sqlQuery("select msg from tmp_table")
    tableEnv.toDataSet[String](table).print()
 
    env.execute("start hdfs parquet test")
  }
}
object Main {
  def main(args:Array[String]):Unit = {
    new Main().startApp()
  }
}
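
As a side note, parquet-avro can also push a column projection down to the reader so that unused columns are never deserialized; a one-line sketch against the same job object as above (here the full AvroTamAlert schema doubles as the projection because it only contains msg; with a wider schema a reduced one would be passed instead):

AvroParquetInputFormat.setRequestedProjection(job, AvroTamAlert.getClassSchema)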

2.2 CSV

The configuration parameters that need to be added are covered in section 2.3.

package com.flinklearn.main
 
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
/**
 * Created by dell on 2018/10/25.
 */
object Main {
  def main(args:Array[String]):Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataset:DataSet[(String,Integer)] = env.readCsvFile("hdfs://ip:8020/mytest")
 
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerDataSet("tmp_table", dataset, 'name, 'num)
    val table = tableEnv.sqlQuery("select name,num from tmp_table")
 
    val rtnDataset = tableEnv.toDataSet[(String,Integer)](table)
    rtnDataset.print()
 
    env.execute("test hdfs csvfile")
  }
}
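
readCsvFile also takes named parameters for common CSV variations; a small sketch (the delimiter and the header line here are assumptions about the input file):

val dataset: DataSet[(String, Integer)] = env.readCsvFile[(String, Integer)](
  "hdfs://ip:8020/mytest",
  fieldDelimiter = ",",    // field separator used in the file (assumption)
  ignoreFirstLine = true)  // skip a header row if the file has one (assumption)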

2.3 TextFile

A few key parameters must be added to the flink-conf.yaml file; a sketch of what the entries might look like follows this list:

The first parameter points at the Hadoop configuration files.

The second parameter sets the mode to the legacy mode, because Flink 1.6.1 is built with Scala 2.11 and the Scala interface otherwise runs into problems (job graph generation fails; the exact cause is still unclear).

The third parameter sets the class loading order (without it, reading from HDFS fails with an unreadable-block error).
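
On Flink 1.6.1 the entries behind those three descriptions would plausibly look like the following (the key names and values are assumptions, not copied from a verified setup):

fs.hdfs.hadoopconf: /etc/hadoop/conf       # first parameter: directory containing the Hadoop configuration files
mode: legacy                               # second parameter: run in the legacy execution mode
classloader.resolve-order: parent-first    # third parameter: class loading order, avoids the HDFS block read error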

At the same time, flink-hadoop-compatibility_2.11-1.6.1.jar has to be placed into the flink/lib folder; bundling it through the pom does not work.

Once all of the above is done, files on HDFS can be read correctly:

package com.flinklearn.main
 
import org.apache.flink.api.java.io.RowCsvInputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.flink.table.api.TableEnvironment
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{CombineTextInputFormat, TextInputFormat, FileInputFormat}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
/**
 * Created by dell on 2018/10/25.
 */
object Main {
  def main(args:Array[String]):Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataset:DataSet[(LongWritable,Text)] = env.createInput(HadoopInputs.readHadoopFile[LongWritable,Text](
      new CombineTextInputFormat,
      classOf[LongWritable],
      classOf[Text],
      "/mytest"
    ))
    val transDataset = dataset.map(line=>{
      val lines = line._2.toString.split(",")
      if(lines.length == 2){
        (lines(0).toString,lines(1).toInt)
      }else{
        null
      }
    }).filter(line=>line!=null)
 
    print(transDataset.count())
 
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerDataSet("tmp_table", transDataset, 'name, 'num)
    val table = tableEnv.sqlQuery("select name,num from tmp_table")
 
    val rtnDataset = tableEnv.toDataSet[(String,Integer)](table)
    rtnDataset.print()
 
    env.execute("test hdfs textfile")
  }
}

3. Hive

1. As the summary table at the top notes, reading Hive relies on flink-hcatalog, which provides the HCatInputFormat used in step 5 (a sketch of that dependency follows the list below).

2. Add the following dependencies to the pom:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.7.3</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hadoop-fs</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>com.jolbox</groupId>
  <artifactId>bonecp</artifactId>
  <version>0.8.0.RELEASE</version>
</dependency>
<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>parquet-hive-bundle</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-metastore</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-cli</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-common</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-service</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-shims</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hive.hcatalog</groupId>
  <artifactId>hive-hcatalog-core</artifactId>
  <version>1.2.2</version>
</dependency>
<dependency>
  <groupId>org.apache.thrift</groupId>
  <artifactId>libfb303</artifactId>
  <version>0.9.3</version>
  <type>pom</type>
</dependency>
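
The dependency list above does not yet show flink-hcatalog itself; a sketch of that entry (the artifactId may carry a Scala suffix such as flink-hcatalog_2.11 depending on the Flink build):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hcatalog</artifactId>
  <version>1.6.1</version>
</dependency>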

3. Add all of the following jars to flink/lib:

accumulo-core-1.6.0.jar derby-10.11.1.1.jar hive-serde-1.2.0.jar mail-1.4.1.jar
accumulo-fate-1.6.0.jar derbyclient-10.14.2.0.jar hive-service-1.2.0.jar maven-scm-api-1.4.jar
accumulo-start-1.6.0.jar eigenbase-properties-1.1.5.jar hive-shims-0.20S-1.2.0.jar maven-scm-provider-svn-commons-1.4.jar
accumulo-trace-1.6.0.jar flink-dist_2.11-1.6.1.jar hive-shims-0.23-1.2.0.jar maven-scm-provider-svnexe-1.4.jar
activation-1.1.jar flink-hadoop-compatibility_2.11-1.6.1.jar hive-shims-1.2.0.jar netty-3.7.0.Final.jar
ant-1.9.1.jar flink-python_2.11-1.6.1.jar hive-shims-common-1.2.0.jar opencsv-2.3.jar
ant-launcher-1.9.1.jar flink-shaded-hadoop2-uber-1.6.1.jar hive-shims-scheduler-1.2.0.jar oro-2.0.8.jar
antlr-2.7.7.jar geronimo-annotation_1.0_spec-1.1.1.jar hive-testutils-1.2.0.jar paranamer-2.3.jar
antlr-runtime-3.4.jar geronimo-jaspic_1.0_spec-1.0.jar httpclient-4.4.jar parquet-hadoop-bundle-1.6.0.jar
apache-curator-2.6.0.pom geronimo-jta_1.1_spec-1.1.1.jar httpcore-4.4.jar parquet-hive-bundle-1.6.0.jar
apache-log4j-extras-1.2.17.jar groovy-all-2.1.6.jar ivy-2.4.0.jar pentaho-aggdesigner-algorithm-5.1.5-jhyde.jar
asm-commons-3.1.jar guava-14.0.1.jar janino-2.7.6.jar php
asm-tree-3.1.jar guava-15.0.jar jcommander-1.32.jar plexus-utils-1.5.6.jar
avro-1.7.5.jar hamcrest-core-1.1.jar jdo-api-3.0.1.jar postgresql-42.0.0.jar
bonecp-0.8.0.RELEASE.jar hive-accumulo-handler-1.2.0.jar jetty-all-7.6.0.v20120127.jar py
calcite-avatica-1.2.0-incubating.jar hive-ant-1.2.0.jar jetty-all-server-7.6.0.v20120127.jar regexp-1.3.jar
calcite-core-1.2.0-incubating.jar hive-beeline-1.2.0.jar jline-2.12.jar servlet-api-2.5.jar
calcite-linq4j-1.2.0-incubating.jar hive-cli-1.2.0.jar joda-time-2.5.jar slf4j-log4j12-1.7.7.jar
curator-client-2.6.0.jar hive-common-1.2.0.jar jpam-1.1.jar snappy-java-1.0.5.jar
curator-framework-2.6.0.jar hive-contrib-1.2.0.jar json-20090211.jar ST4-4.0.4.jar
curator-recipes-2.6.0.jar hive-exec-1.2.0.jar jsr305-3.0.0.jar stax-api-1.0.1.jar
datanucleus-api-jdo-3.2.1.jar hive-hbase-handler-1.2.0.jar jta-1.1.jar stringtemplate-3.2.1.jar
datanucleus-api-jdo-3.2.6.jar hive-hcatalog-core-1.2.2.jar junit-4.11.jar super-csv-2.2.0.jar
datanucleus-core-3.2.10.jar hive-hwi-1.2.0.jar libfb303-0.9.2.jar tempus-fugit-1.1.jar
datanucleus-core-3.2.2.jar hive-jdbc-1.2.0.jar libthrift-0.9.2.jar velocity-1.5.jar
datanucleus-rdbms-3.2.1.jar hive-jdbc-1.2.0-standalone.jar log4j-1.2.16.jar xz-1.0.jar
datanucleus-rdbms-3.2.9.jar hive-metastore-1.2.0.jar log4j-1.2.17.jar zookeeper-3.4.6.jar

4. Download Hive 1.2.0 (or whichever version you actually need), copy a hive-site.xml into the hive/conf directory, and start the Hive thrift metastore (e.g. with hive --service metastore).

5. The Hive table can now be read:

package com.flinklearn.main
 
 
import com.flinklearn.models.Alert
import org.apache.flink.api.scala.ExecutionEnvironment
 
import org.apache.flink.api.scala._
import org.apache.flink.hcatalog.scala.HCatInputFormat
import org.apache.hadoop.conf.Configuration
 
/**
 * Created by dell on 2018/10/25.
 */
object Main {
  def main(args:Array[String]):Unit = {
    val conf = new Configuration()
    conf.set("hive.metastore.local", "false")
    conf.set("hive.metastore.uris", "thrift://ip:9083")
 
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataset = env.createInput(new HCatInputFormat[Alert]("db", "tb", conf))
 
    dataset.first(10).print()
 
    env.execute("flink hive test")
  }
}
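
To stay in line with the SQL-driven examples above, the resulting DataSet can be registered and queried the same way; a minimal sketch, assuming Alert exposes a getMsg() accessor (the Alert class is not shown here, so that accessor is an assumption) and that the Table API imports from the earlier examples are added:

val tableEnv = TableEnvironment.getTableEnvironment(env)
// getMsg() is an assumed accessor on Alert; map to a plain String so it can be registered under the field name 'msg
tableEnv.registerDataSet("hive_alert", dataset.map(_.getMsg.toString), 'msg)
tableEnv.toDataSet[String](tableEnv.sqlQuery("select msg from hive_alert")).print()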

4. JDBC
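
The JDBCInputFormat used below ships with the flink-jdbc module, and the PostgreSQL JDBC driver has to be on the classpath as well; a sketch of the corresponding pom entries (the exact flink-jdbc artifactId can vary between Flink versions, and the driver version simply mirrors the postgresql-42.0.0.jar listed earlier):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-jdbc</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
  <version>42.0.0</version>
</dependency>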


package com.flinklearn.main;
 
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
 
 
/**
 * Created by dell on 2018/10/29.
 */
public class Main {
    public static void main(String[] args){
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
                    .setDrivername("org.postgresql.Driver")
                    .setDBUrl("jdbc:postgresql://ip:port/nsc")
                    .setUsername("username")
                    .setPassword("password")
                    .setQuery("select xx,yy from zz")
                    .setRowTypeInfo(new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class)))
                    .finish();
 
            DataSource<Row> source = env.createInput(inputFormat);
            source.print();
            env.execute("jdbc test");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}