
Flink Installation, Deployment, Kafka Source, and Sink to MySQL

Flink installation, deployment, and testing

Download the Flink package

Flink download address:

https://archive.apache.org/dist/flink/flink-1.5.0/

Since the examples do not require Hadoop, downloading flink-1.5.0-bin-scala_2.11.tgz is sufficient.

Upload it to the /opt directory on the machine.

Extract it:

tar -zxf flink-1.5.0-bin-scala_2.11.tgz -C /opt/

Configure the master node

Pick a master node (the JobManager) and set the jobmanager.rpc.address option in conf/flink-conf.yaml to that node's IP address or hostname. Make sure all nodes have the same jobmanager.rpc.address configuration:

jobmanager.rpc.address: node1

(If a configured port is already in use, change it as well. Here the default web UI port was already taken by Spark, so it was changed to 8088:)

rest.port: 8088

In this installation the master node is node1; since this is a single-machine setup, the slave node is also node1.

Configure slaves

Enter the IP address or hostname of every worker node (TaskManager) into the conf/slaves file, one per line, as shown below.
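
For this single-node setup, conf/slaves would contain just one line:

node1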

Start the Flink cluster

bin/start-cluster.sh

Open http://node1:8088 to view the web UI.

The Task Managers count shows that this Flink cluster currently has only one node, and that the TaskManager provides two task slots.

Test

Dependencies (pom.xml):

    <groupId>com.rz.flinkdemo</groupId>
    <artifactId>Flink-programe</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.binary.version>2.11</scala.binary.version>
        <flink.version>1.5.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

Test code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {

        // the host and port to connect to
        final int port;
        final String hostName;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
            hostName = params.get("hostname");
        } catch (Exception e) {
            System.err.println("No port or hostname specified. Please run 'SocketWindowWordCount --port <port> --hostname <hostname>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostName, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1)) // sliding window: 5 s size, sliding every 1 s
                .reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);


        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}


Package with mvn clean install. If the build fails with java.lang.OutOfMemoryError, give Maven more memory on the command line:

set MAVEN_OPTS=-Xms128m -Xmx512m

(On Linux, use export MAVEN_OPTS="-Xms128m -Xmx512m" instead.) Then run mvn clean install again.

This produces FlinkTest.jar.

Locate the built jar in the web UI, click Upload, and upload it.

Submission parameters: when submitting the jar, specify the entry class and the program arguments the job expects, as sketched below.
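
For this job, the submission form would look roughly like this (the values match the CLI run command shown later):

Entry Class:        com.rz.flinkdemo.SocketWindowWordCount
Program Arguments:  --port 8099 --hostname node1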

After submitting, check the Overview page: the number of available slots has dropped to one, because our socket program occupies one, and the number of running jobs is now one.

Send data

[root@node1 flink-1.5.0]# nc -l 8099
aaa bbb
aaa ccc
aaa bbb
bbb ccc

Click into the running job to see information such as the number of bytes received.

In the log directory you can clearly see the output:

[root@node1 log]# tail -f flink-root-taskexecutor-2-localhost.out
aaa : 1
ccc : 1
ccc : 1
bbb : 1
ccc : 1
bbb : 1
bbb : 1
ccc : 1
bbb : 1
ccc : 1

Besides submitting through the web UI, you can also upload the jar to the Linux machine and submit the job from the command line.

Run the uploaded jar with the Flink CLI:

bin/flink run -c com.rz.flinkdemo.SocketWindowWordCount jars/FlinkTest.jar --port 8099 --hostname node1

The remaining steps are the same.
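
The Flink CLI can also inspect jobs: bin/flink list shows submitted jobs (and bin/flink cancel <jobID> stops one). For example, to show only running jobs:

bin/flink list -r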

Using Kafka as a source

Add the dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.5.0</version>
</dependency>

The consumer code:

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class KakfaSource010 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node1:9092");
        properties.setProperty("group.id", "test");

        // a single topic can be subscribed to directly:
        //DataStream<String> test = env.addSource(new FlinkKafkaConsumer010<String>("topic", new SimpleStringSchema(), properties));
        // suitable topics can also be matched with a regular expression
        FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>(Pattern.compile("test-[0-9]"), new SimpleStringSchema(), properties);
        // configure consumption to start from the latest offsets
        kafkaSource.setStartFromLatest();

        // addSource turns the Kafka input into a DataStream
        DataStream<String> consume = env.addSource(kafkaSource);

        // ... process and sink

        env.execute("KakfaSource010");
    }
}
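
For a quick end-to-end test, messages can be produced with Kafka's own console tools (this assumes a Kafka 0.10 broker on node1 and uses test-1 as an illustrative topic name matching the pattern above):

bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic test-1
bin/kafka-console-producer.sh --broker-list node1:9092 --topic test-1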

Using MySQL as a sink

Flink itself does not provide a DataStream sink for MySQL, so we have to implement one ourselves.

First, add the dependency:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.30</version>
</dependency>
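
The examples below write two string columns, name and nickname. Note that their SQL uses table as a placeholder table name (TABLE is a reserved word in MySQL, so an actual table name is needed); a minimal hypothetical schema might be:

CREATE TABLE user_info (
    name     VARCHAR(64),
    nickname VARCHAR(64)
);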

For a custom sink, the first idea is to implement Flink's own SinkFunction interface and fill in its method, like this:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class MysqlSink implements
        SinkFunction<Tuple2<String, String>> {

    private static final long serialVersionUID = 1L;
    private Connection connection;
    private PreparedStatement preparedStatement;
    // placeholder connection settings; fill in real values or read them from configuration
    String username = "mysql.user";
    String password = "mysql.password";
    String drivername = "mysql.driver";
    String dburl = "mysql.url";

    @Override
    public void invoke(Tuple2<String, String> value) throws Exception {
        // a new connection is opened and closed for every record (the problem discussed below)
        Class.forName(drivername);
        connection = DriverManager.getConnection(dburl, username, password);
        // "table" is a placeholder; use your actual table name
        String sql = "insert into table(name,nickname) values(?,?)";
        preparedStatement = connection.prepareStatement(sql);
        preparedStatement.setString(1, value.f0);
        preparedStatement.setString(2, value.f1);
        preparedStatement.executeUpdate();
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

}

This implementation has a problem: for every record, a MySQL connection is opened and then closed again, which is time-consuming. A better approach is Flink's Rich functions, as follows:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MysqlSink extends RichSinkFunction<Tuple2<String, String>> {

    private Connection connection = null;
    private PreparedStatement preparedStatement = null;
    private String userName = null;
    private String password = null;
    private String driverName = null;
    private String DBUrl = null;

    public MysqlSink() {
        // placeholder connection settings; fill in real values or read them from configuration
        userName = "mysql.username";
        password = "mysql.password";
        driverName = "mysql.driverName";
        DBUrl = "mysql.DBUrl";
    }

    @Override
    public void invoke(Tuple2<String, String> value) throws Exception {
        // defensive re-connect in case open() has not run or the connection was lost
        if (connection == null) {
            Class.forName(driverName);
            connection = DriverManager.getConnection(DBUrl, userName, password);
        }
        // "table" is a placeholder; use your actual table name
        String sql = "insert into table(name,nickname) values(?,?)";
        preparedStatement = connection.prepareStatement(sql);

        preparedStatement.setString(1, value.f0);
        preparedStatement.setString(2, value.f1);

        preparedStatement.executeUpdate(); // returns the number of affected rows: 1 on success, 0 otherwise
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // create the connection once, when the sink is initialized
        Class.forName(driverName);
        connection = DriverManager.getConnection(DBUrl, userName, password);
    }

    @Override
    public void close() throws Exception {
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

The advantage of the Rich variant lies in its open and close methods: the connection is created once during initialization and reused from then on, cutting out the per-record connect/disconnect overhead. A connection pool could also be used; this is just one way to think about it.
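
As a sketch of the connection-pool idea, here is a hypothetical variant using HikariCP (this assumes the com.zaxxer:HikariCP dependency has been added; the JDBC URL, credentials, pool size, and user_info table name are illustrative, not from the original post):

import java.sql.Connection;
import java.sql.PreparedStatement;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class PooledMysqlSink extends RichSinkFunction<Tuple2<String, String>> {

    // transient: the pool is built in open() on the task, not serialized with the job graph
    private transient HikariDataSource dataSource;

    @Override
    public void open(Configuration parameters) throws Exception {
        // build the pool once per sink instance
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://node1:3306/test"); // illustrative URL
        config.setUsername("user");                        // illustrative credentials
        config.setPassword("password");
        config.setMaximumPoolSize(4);
        dataSource = new HikariDataSource(config);
    }

    @Override
    public void invoke(Tuple2<String, String> value) throws Exception {
        // borrow a pooled connection per record; try-with-resources returns it to the pool
        try (Connection connection = dataSource.getConnection();
             PreparedStatement ps = connection.prepareStatement(
                     "insert into user_info(name,nickname) values(?,?)")) {
            ps.setString(1, value.f0);
            ps.setString(2, value.f1);
            ps.executeUpdate();
        }
    }

    @Override
    public void close() throws Exception {
        if (dataSource != null) {
            dataSource.close();
        }
    }
}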

Using this MysqlSink is also very simple:

// simply addSink to write into MySQL through the custom sink; the MySQL fields and settings
// could also be made configurable, which is more convenient and reusable
proceDataStream.addSink(new MysqlSink());
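
Putting the pieces together, here is a minimal sketch of how proceDataStream could be derived from the Kafka source above (assuming, purely for illustration, that each Kafka record has the form name,nickname, and with org.apache.flink.api.common.functions.MapFunction imported):

// hypothetical parsing step: split each Kafka record "name,nickname" into a Tuple2
DataStream<Tuple2<String, String>> proceDataStream = consume
        .map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String record) {
                String[] parts = record.split(",", 2);
                return new Tuple2<>(parts[0], parts.length > 1 ? parts[1] : "");
            }
        });

proceDataStream.addSink(new MysqlSink());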

Summary

These notes covered a simple deployment, a test job, a Kafka demo, and a custom MySQL sink implementation. The most important part is the use of Rich functions; I hope you got something out of it.