1. 程式人生 > >Apache Flink 零基礎入門(十一)Flink transformation

Apache Flink 零基礎入門(十一)Flink transformation

前面講了常用的DataSource的用法,DataSource其實是把資料載入進來,載入進來之後就需要做Transformation操作了。

Data transformations transform one or more DataSets into a new DataSet. Programs can combine multiple transformations into sophisticated assemblies.

 資料轉化可以將一個或多個DataSets轉化到一個新的DataSet。就是一個演算法的綜合使用。

Map Function

Scala

新建一個Object

object DataSetTransformationApp {

  def main(args: Array[String]): Unit = {
    val environment = ExecutionEnvironment.getExecutionEnvironment

  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))
  }

}

這裡的資料來源是一個1到10的list集合。Map的原理是:假設data資料集中有N個元素,將每一個元素進行轉化:

data.map { x => x.toInt }

好比:y=f(x)

    // 對data中的每一個元素都去做一個+1操作
    data.map((x:Int) => x + 1 ).print()

然後對每一個元素都做一個+1操作。

簡單寫法:

如果這個裡面只有一個元素,就可以直接寫成下面形式:

data.map((x) => x + 1).print()

更簡潔的寫法:

data.map(x => x + 1).print()

更簡潔的方法:

data.map(_ + 1).print()

Java

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        mapFunction(executionEnvironment);
    }

    public static void mapFunction(ExecutionEnvironment executionEnvironment) throws Exception {
        List<String> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i + "");
        }
        DataSource<String> data = executionEnvironment.fromCollection(list);
        data.map(new MapFunction<String, Integer>() {
            public Integer map(String input) {
                return Integer.parseInt(input) + 1;
            }
        }).print();
    }

因為我們定義的List是一個String的泛型,因此MapFunction的泛型是<String, Integer>,第一個引數表示輸入的型別,第二個引數表示輸出是一個Integer型別。

Filter Function

將每個元素執行+1操作,並取出大於5的元素。

Scala

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

filter只會返回滿足條件的記錄。

Java

    public static void filterFunction(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        DataSource<Integer> data = env.fromCollection(list);
        data.map(new MapFunction<Integer, Integer>() {
            public Integer map(Integer input) {
                return input + 1;
            }
        }).filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer input) throws Exception {
                return input > 5;
            }
        }).print();
    }

MapPartition Function

map function 與 MapPartition function有什麼區別?

需求:DataSource 中有100個元素,把結果儲存在資料庫中

如果使用map function ,那麼實現方法如下:

  // DataSource 中有100個元素,把結果儲存在資料庫中
  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("Student" + i)
    }
    val data = env.fromCollection(students)
    data.map(x=>{
      // 每一個元素要儲存到資料庫中去,肯定需要先獲取到connection
      val connection = DBUtils.getConnection()
      println(connection + " ... ")
      // TODO .... 儲存資料到DB
      DBUtils.returnConnection(connection)
    }).print()
  }

列印結果,將會列印100個獲取DBUtils.getConnection()的請求。如果資料量增多,顯然不停的獲取連線是不現實的。

因此MapPartition就應運而生了,轉換一個分割槽裡面的資料,也就是說一個分割槽中的資料呼叫一次。

因此要首先設定分割槽:

val data = env.fromCollection(students).setParallelism(4)

設定4個分割槽,也就是並行度,然後使用mapPartition來處理:

data.mapPartition(x => {
      val connection = DBUtils.getConnection()
      println(connection + " ... ")
      // TODO .... 儲存資料到DB
      DBUtils.returnConnection(connection)
      x
    }).print()

那麼就會的到4次連線請求,每一個分割槽獲取一個connection。

Java

public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
        List<String> list = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            list.add("student:" + i);
        }
        DataSource<String> data = env.fromCollection(list);
        /*data.map(new MapFunction<String, String>() {
            @Override
            public String map(String input) throws Exception {
                String connection = DBUtils.getConnection();
                System.out.println("connection = [" + connection + "]");
                DBUtils.returnConnection(connection);
                return input;
            }
        }).print();*/
        data.mapPartition(new MapPartitionFunction<String, Object>() {
            @Override
            public void mapPartition(Iterable<String> values, Collector<Object> out) throws Exception {
                String connection = DBUtils.getConnection();
                System.out.println("connection = [" + connection + "]");
                DBUtils.returnConnection(connection);
            }
        }).print();
    }

first   groupBy sortGroup

first表示獲取前幾個,groupBy表示分組,sortGroup表示分組內排序

def firstFunction(env:ExecutionEnvironment): Unit = {
    val info = ListBuffer[(Int, String)]()
    info.append((1, "hadoop"))
    info.append((1, "spark"))
    info.append((1, "flink"))
    info.append((2, "java"))
    info.append((2, "springboot"))
    info.append((3, "linux"))
    info.append((4, "vue"))
    val data = env.fromCollection(info)
    data.first(3).print()
    data.groupBy(0).first(2).print()
    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
  }