1. 程式人生 > >日誌服務Flink Connector《支援Exactly Once》

日誌服務Flink Connector《支援Exactly Once》

摘要: Flink log connector是阿里雲日誌服務推出的,用於對接Flink的工具,包含兩塊,分別是消費者和生產者,消費者用於從日誌服務中讀資料,支援exactly once語義,生產者用於將資料寫到日誌服務中,該Connector隱藏了日誌服務的一些概念,比如Shard的分裂合併等,使用者在使用時只需要專注在自己的業務邏輯即可。

阿里雲日誌服務是針對實時資料一站式服務,使用者只需要將精力集中在分析上,過程中資料採集、對接各種儲存計算、資料索引和查詢等瑣碎工作等都可以交給日誌服務完成。

日誌服務中最基礎的功能是LogHub,支援資料實時採集與消費,實時消費家族除 Spark Streaming、Storm、StreamCompute(Blink外),目前新增Flink啦。

圖片描述

Flink Connector
Flink log connector是阿里雲日誌服務提供的,用於對接flink的工具,包括兩部分,消費者(Consumer)和生產者(Producer)。

消費者用於從日誌服務中讀取資料,支援exactly once語義,支援shard負載均衡.
生產者用於將資料寫入日誌服務,使用connector時,需要在專案中新增maven依賴:

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId
>
<version>1.3.2</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>flink-log-connector</artifactId> <version>0.1.3</version> </dependency> <dependency
>
<groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>aliyun-log</artifactId> <version>0.6.10</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>log-loghub-producer</artifactId> <version>0.1.8</version> </dependency>

程式碼:Github

用法
請參考日誌服務文件,正確建立Logstore。
如果使用子賬號訪問,請確認正確設定了LogStore的RAM策略。參考授權RAM子使用者訪問日誌服務資源。
1. Log Consumer
在Connector中, 類FlinkLogConsumer提供了訂閱日誌服務中某一個LogStore的能力,實現了exactly once語義,在使用時,使用者無需關心LogStore中shard數
量的變化,consumer會自動感知。

flink中每一個子任務負責消費LogStore中部分shard,如果LogStore中shard發生split或者merge,子任務消費的shard也會隨之改變。

1.1 配置啟動引數

Properties configProps = new Properties();
// 設定訪問日誌服務的域名
configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
// 設定訪問ak
configProps.put(ConfigConstants.LOG_ACCESSSKEYID, "");
configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
// 設定日誌服務的project
configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
// 設定日誌服務的LogStore
configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
// 設定消費日誌服務起始位置
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
// 設定日誌服務的訊息反序列化方法
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RawLogGroupList> logTestStream = env.addSource(
        new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps));

上面是一個簡單的消費示例,我們使用java.util.Properties作為配置工具,所有Consumer的配置都可以在ConfigConstants中找到。

注意,flink stream的子任務數量和日誌服務LogStore中的shard數量是獨立的,如果shard數量多於子任務數量,每個子任務不重複的消費多個shard,如果少於,

那麼部分子任務就會空閒,等到新的shard產生。

1.2 設定消費起始位置
Flink log consumer支援設定shard的消費起始位置,通過設定屬性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定製消費從shard的頭尾或者某個特定時間開始消費,具體取值如下:

Consts.LOG_BEGIN_CURSOR: 表示從shard的頭開始消費,也就是從shard中最舊的資料開始消費。
Consts.LOG_END_CURSOR: 表示從shard的尾開始,也就是從shard中最新的資料開始消費。
UnixTimestamp: 一個整型數值的字串,用1970-01-01到現在的秒數表示, 含義是消費shard中這個時間點之後的資料。
三種取值舉例如下:

configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");

1.3 監控:消費進度(可選)
Flink log consumer支援設定消費進度監控,所謂消費進度就是獲取每一個shard實時的消費位置,這個位置使用時間戳表示,詳細概念可以參考
文件消費組-檢視狀態,消費組-監控報警

configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name”);

注意上面程式碼是可選的,如果設定了,consumer會首先建立consumerGroup,如果已經存在,則什麼都不做,consumer中的snapshot會自動同步到日誌服務的consumerGroup中,使用者可以在日誌服務的控制檯檢視consumer的消費進度。

1.4 容災和exactly once語義支援
當開啟Flink的checkpointing功能時,Flink log consumer會週期性的將每個shard的消費進度儲存起來,當作業失敗時,flink會恢復log consumer,並
從儲存的最新的checkpoint開始消費。

寫checkpoint的週期定義了當發生失敗時,最多多少的資料會被回溯,也就是重新消費,使用程式碼如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 開啟flink exactly once語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 每5s儲存一次checkpoint
env.enableCheckpointing(5000);

更多Flink checkpoint的細節請參考Flink官方文件Checkpoints。

1.5 補充材料:關聯 API與許可權設定
Flink log consumer 會用到的阿里雲日誌服務介面如下:

GetCursorOrData

用於從shard中拉資料, 注意頻繁的呼叫該介面可能會導致資料超過日誌服務的shard quota, 可以通過ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH
控制介面呼叫的時間間隔和每次呼叫拉取的日誌數量,shard的quota參考文章[shard簡介](https://help.aliyun.com/document_detail/28976.html).
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");

ListShards

 用於獲取logStore中所有的shard列表,獲取shard狀態等.如果您的shard經常發生分裂合併,可以通過調整介面的呼叫週期來及時發現shard的變化。
// 設定每30s呼叫一次ListShards
configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");

CreateConsumerGroup

該介面呼叫只有當設定消費進度監控時才會發生,功能是建立consumerGroup,用於同步checkpoint。

ConsumerGroupUpdateCheckPoint

該介面使用者將flink的snapshot同步到日誌服務的consumerGroup中。

子使用者使用Flink log consumer需要授權如下幾個RAM Policy:

圖片描述
2. Log Producer
FlinkLogProducer 用於將資料寫到阿里雲日誌服務中。

注意producer只支援Flink at-least-once語義,這就意味著在發生作業失敗的情況下,寫入日誌服務中的資料有可能會重複,但是絕對不會丟失。

用法示例如下,我們將模擬產生的字串寫入日誌服務:

// 將資料序列化成日誌服務的資料格式
class SimpleLogSerializer implements LogSerializationSchema<String> {

    public RawLogGroup serialize(String element) {
        RawLogGroup rlg = new RawLogGroup();
        RawLog rl = new RawLog();
        rl.setTime((int)(System.currentTimeMillis() / 1000));
        rl.addContent("message", element);
        rlg.addLog(rl);
        return rlg;
    }
}
public class ProducerSample {
    public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
    public static String sAccessKeyId = "";
    public static String sAccessKey = "";
    public static String sProject = "ali-cn-hangzhou-sls-admin";
    public static String sLogstore = "test-flink-producer";
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class);


    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(3);

        DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());

        Properties configProps = new Properties();
        // 設定訪問日誌服務的域名
        configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
        // 設定訪問日誌服務的ak
        configProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId);
        configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
        // 設定日誌寫入的日誌服務project
        configProps.put(ConfigConstants.LOG_PROJECT, sProject);
        // 設定日誌寫入的日誌服務logStore
        configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);

        FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);

        simpleStringStream.addSink(logProducer);

        env.execute("flink log producer");
    }
    // 模擬產生日誌
    public static class EventsGenerator implements SourceFunction<String> {
        private boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            long seq = 0;
            while (running) {
                Thread.sleep(10);
                ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

2.1 初始化
Producer初始化主要需要做兩件事情:

初始化配置引數Properties, 這一步和Consumer類似, Producer有一些定製的引數,一般情況下使用預設值即可,特殊場景可以考慮定製:

// 用於傳送資料的io執行緒的數量,預設是8
ConfigConstants.LOG_SENDER_IO_THREAD_COUNT
// 該值定義日誌資料被快取傳送的時間,預設是3000
ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS
// 快取傳送的包中日誌的數量,預設是4096
ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE
// 快取傳送的包的大小,預設是3Mb
ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE
// 作業可以使用的記憶體總的大小,預設是100Mb
ConfigConstants.LOG_MEM_POOL_BYTES
上述引數不是必選引數,使用者可以不設定,直接使用預設值。

過載LogSerializationSchema,定義將資料序列化成RawLogGroup的方法。

RawLogGroup是log的集合,每個欄位的含義可以參考文件[日誌資料模型](https://help.aliyun.com/document_detail/29054.html)。

如果使用者需要使用日誌服務的shardHashKey功能,指定資料寫到某一個shard中,可以使用LogPartitioner產生資料的hashKey,用法例子如下:

FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
logProducer.setCustomPartitioner(new LogPartitioner<String>() {
            // 生成32位hash值
            public String getHashKey(String element) {
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    md.update(element.getBytes());
                    String hash = new BigInteger(1, md.digest()).toString(16);
                    while(hash.length() < 32) hash = "0" + hash;
                    return hash;
                } catch (NoSuchAlgorithmException e) {
                }
                return  "0000000000000000000000000000000000000000000000000000000000000000";
            }
        });

注意LogPartitioner是可選的,不設定情況下, 資料會隨機寫入某一個shard。

2.2 許可權設定:RAM Policy
Producer依賴日誌服務的API寫資料,如下:

log:PostLogStoreLogs
log:ListShards
當RAM子使用者使用Producer時,需要對上述兩個API進行授權:

圖片描述