1. 程式人生 > >flink中文文件-快速開始 安裝部署 v1.4

flink中文文件-快速開始 安裝部署 v1.4

Flink入門及實戰-上:

Flink入門及實戰-下:

  • 下載啟動flink   
  • 檢視程式碼
  • 執行例子
  • 下一步

下載啟動flink

flink可以在Linux, Mac OS X, 和Windows平臺上執行。為了執行flink,只需要安裝JAVA7.x(或者更高版本)。windows使用者,請點選此連結檢視相關文件

你可以使用下面命令檢查安裝的java版本

 java -version

如果你已經安裝了java8,你將會看到下面的資料。

java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)

下面以在linux上安裝為例(mac上安裝也可以參考這個):

  1. 點此連結下載flink安裝包。你可以選擇任何hadoop/scala的組合。如果你計劃使用本地檔案系統(安裝本地叢集),那麼你選擇任何hadoop版本對應的flink都可以。如果是生產環境,那麼建議根據你叢集上的hadoop版本選擇對應的flink版本。
  2. 進入檔案的下載目錄
  3. 解壓檔案
$ cd ~/Downloads        # 進入檔案的下載目錄
$ tar xzf flink-*.tgz   # 解壓下載的壓縮包
$ cd flink-1.4.1

安裝本地flink叢集

./bin/start-local.sh  # 啟動 Flink 叢集

你也可以在log日誌目錄中檢查系統執行情況

$ tail log/flink-*-jobmanager-*.log
INFO ... - Starting JobManager
INFO ... - Starting JobManager web frontend
INFO ... - Web frontend listening at 127.0.0.1:8081
INFO ... - Registered TaskManager at 127.0.0.1 (akka://flink/user/taskmanager)

檢視程式碼

你可以在github上發現SocketWindowWordCount 

編譯好的javascala原始碼

scala程式碼

object SocketWindowWordCount {

    def main(args: Array[String]) : Unit = {

        // port 表示需要連線的埠
        val port: Int = try {
            ParameterTool.fromArgs(args).getInt("port")
        } catch {
            case e: Exception => {
                System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
                return
            }
        }

        // 獲取執行環境
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // 連線此socket獲取輸入資料
        val text = env.socketTextStream("localhost", port, '\n')

        // 解析資料, 分組, 視窗化, 並且聚合求SUM
        import org.apache.flink.api.scala._ //需要加上這一行隱式轉換 否則在呼叫flatmap方法的時候會報錯
        val windowCounts = text
            .flatMap { w => w.split("\\s") }
            .map { w => WordWithCount(w, 1) }
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .sum("count")

        // 使用一個單執行緒列印結果
        windowCounts.print().setParallelism(1)

        env.execute("Socket Window WordCount")
    }

    // 定義一個數據型別儲存單詞出現的次數
    case class WordWithCount(word: String, count: Long)
}

java程式碼

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // port 表示需要連線的埠
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // 獲取執行環境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 連線此socket獲取輸入資料
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // 解析資料, 分組, 視窗化, 並且聚合求SUM
        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

        // 使用一個單執行緒列印結果
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // 定義一個數據型別儲存單詞出現的次數
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

執行這個例子

現在,我們將要執行這個flink例子。它將會從socket獲取資料,並且每隔5秒列印一次計算的單詞出現的次數。

  • 首先,我們使用netcat啟動一個本地socket
$ nc -l 9000
  • 提交flink程式
    $ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
    
    Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
    Using address 127.0.0.1:6123 to connect to JobManager.
    JobManager web interface address http://127.0.0.1:8081
    Starting execution of program
    Submitting job with JobID: 574a10c8debda3dccd0c78a3bde55e1b. Waiting for job completion.
    Connected to JobManager at Actor[akka.tcp://[email protected]:6123/user/jobmanager#297388688]
    11/04/2016 14:04:50     Job execution switched to status RUNNING.
    11/04/2016 14:04:50     Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED
    11/04/2016 14:04:50     Source: Socket Stream -> Flat Map(1/1) switched to DEPLOYING
    11/04/2016 14:04:50     Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to SCHEDULED
    11/04/2016 14:04:51     Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to DEPLOYING
    11/04/2016 14:04:51     Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to RUNNING
    11/04/2016 14:04:51     Source: Socket Stream -> Flat Map(1/1) switched to RUNNING
    這個程式連線到socket,然後等待資料。你可以通過webui介面檢視job的執行情況

  • 每5秒計算一次單詞,並且列印到控制檯。監控taskmanager的日誌檔案輸出,並且在nc控制檯輸入一些內容,每一行輸入完成以後需要輸入回車。
$ nc -l 9000
lorem ipsum
ipsum ipsum ipsum
bye

這個.out檔案將會打印出來在指定時間內單詞出現的次數

$ tail -f log/flink-*-taskmanager-*.out
lorem : 1
bye : 1
ipsum : 4

實驗結束,停止flink。

$ ./bin/stop-local.sh

下一步

檢視更多例子來熟悉flink程式的api。當你已經做完這些的時候,繼續讀下面的流處理指南

獲取更多大資料資料,視訊以及技術交流請加群:

QQ群號1:295505811(已滿)

QQ群號2:54902210

QQ群號3:555684318