
Early firing of Flink windows

Tumbling Windows: tumbling windows do not overlap in time. They are divided either by a fixed length of time or by a fixed number of events, and are accordingly called tumbling time windows and tumbling count windows.
Sliding Windows: sliding windows overlap in time. Some applications need continuous coverage of time and therefore smooth, rolling window aggregation.

         For example, computing every 30 s the total number of items users bought in the last minute is a sliding time window; computing, after every 10 purchase clicks, the total number of items bought by the last 100 customers is a sliding count window.
Session Windows: a session window is considered complete once a configured amount of time passes without new data.
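
In the DataStream API these window types map onto window assigners. Below is a minimal, hedged sketch of the corresponding assigners; the KeyedStream named keyed and all window sizes are illustrative assumptions, not taken from this article:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowAssignerExamples {

    // "keyed" is an assumed KeyedStream of (word, count) pairs.
    static void demo(KeyedStream<Tuple2<String, Integer>, ?> keyed) {
        // Tumbling time window: fixed, non-overlapping 1-minute buckets.
        keyed.window(TumblingProcessingTimeWindows.of(Time.minutes(1)));

        // Sliding time window: 1-minute windows, evaluated every 30 seconds.
        keyed.window(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(30)));

        // Sliding count window: the last 100 elements, evaluated every 10 elements.
        keyed.countWindow(100, 10);

        // Session window: closes after 5 minutes without new data for the key.
        keyed.window(ProcessingTimeSessionWindows.withGap(Time.minutes(5)));
    }
}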

By default, every window triggers the computation over its elements only once the window end time in the relevant time semantics is reached. In some scenarios, however, the business side needs the current aggregate before the window closes, for example fetching the running sum of the current hour every five minutes. For this, Flink provides purpose-built triggers for the windows above: ContinuousEventTimeTrigger and ContinuousProcessingTimeTrigger.

Below is a simple example using ContinuousProcessingTimeTrigger:

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.util.Collector;

public class ContinueTriggerDemo {

    public static void main(String[] args) throws Exception {

        String hostName = "localhost";
        Integer port = Integer.parseInt("8001");

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // read the input from the given socket
        DataStream<String> text = env.socketTextStream(hostName, port);

        text.flatMap(new LineSplitter())                                       // split lines into words
                .keyBy(0)                                                      // partition the stream by word
                .window(TumblingProcessingTimeWindows.of(Time.seconds(120)))   // 120 s tumbling window
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20))) // emit the current result every 20 s
                .sum(1)                                                        // sum the counts
                .map(new Mapdemo())                                            // attach a timestamp to each result
                .print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    /**
     * Implements the string tokenizer that splits sentences into words as a
     * user-defined FlatMapFunction. The function takes a line (String) and
     * splits it into multiple pairs in the form of "(word,1)"
     * (Tuple2<String, Integer>).
     */
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

    public static final class Mapdemo
            implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {

        @Override
        public Tuple3<String, String, Integer> map(Tuple2<String, Integer> value) throws Exception {
            DateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String s = format2.format(new Date());
            return new Tuple3<String, String, Integer>(value.f0, s, value.f1);
        }
    }
}

Open a local port with nc -lk 8001, start the Flink program,
and enter the following input:

           aa
           aa
           bb

Observe the program's output log (the 5> prefix is the index of the printing subtask; the fire times land on 20-second boundaries because the trigger aligns the first fire timestamp to the next multiple of the interval):

5> (aa,2018-07-30 16:08:20,2)
5> (bb,2018-07-30 16:08:20,1)
5> (aa,2018-07-30 16:08:40,2)
5> (bb,2018-07-30 16:08:40,1)
5> (aa,2018-07-30 16:09:00,2)
5> (bb,2018-07-30 16:09:00,1)
5> (aa,2018-07-30 16:09:20,2)
5> (bb,2018-07-30 16:09:20,1)
5> (aa,2018-07-30 16:09:40,2)
5> (bb,2018-07-30 16:09:40,1)

After the input above, continue by entering:

    aa

The log then shows:

5> (aa,2018-07-30 16:10:00,3)
5> (bb,2018-07-30 16:10:00,1)

As the log shows, Flink makes it easy to build a 120 s window that sends its current aggregated result downstream every 20 s. Note that onProcessingTime returns TriggerResult.FIRE rather than FIRE_AND_PURGE, so the window contents are retained across firings and each early emission carries the cumulative aggregate of the window so far; this is why aa is still reported as 2 on every firing until the third aa arrives.
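
If each early emission should instead contain only the elements received since the previous firing, the continuous trigger can be wrapped in Flink's PurgingTrigger (org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger), which turns every FIRE of the nested trigger into FIRE_AND_PURGE. A hedged sketch of the changed fragment of the pipeline above:

        .window(TumblingProcessingTimeWindows.of(Time.seconds(120)))
        // Every 20 s firing now also purges the window state, so each emission
        // contains only the elements that arrived since the last firing.
        .trigger(PurgingTrigger.of(ContinuousProcessingTimeTrigger.of(Time.seconds(20))))
        .sum(1)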

Appendix: the trigger sources.

Source path: flink\flink-streaming-java\src\main\java\org\apache\flink\streaming\api\windowing\triggers\ContinuousProcessingTimeTrigger.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * A {@link Trigger} that continuously fires based on a given time interval as measured by
 * the clock of the machine on which the job is running.
 *
 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 */
@PublicEvolving
public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long interval;

    /** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

    private ContinuousProcessingTimeTrigger(long interval) {
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        timestamp = ctx.getCurrentProcessingTime();

        if (fireTimestamp.get() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;

            ctx.registerProcessingTimeTimer(nextFireTimestamp);

            fireTimestamp.add(nextFireTimestamp);
            return TriggerResult.CONTINUE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        if (fireTimestamp.get().equals(time)) {
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        long timestamp = fireTimestamp.get();
        ctx.deleteProcessingTimeTimer(timestamp);
        fireTimestamp.clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window,
            OnMergeContext ctx) {
        ctx.mergePartitionedState(stateDesc);
    }

    @VisibleForTesting
    public long getInterval() {
        return interval;
    }

    @Override
    public String toString() {
        return "ContinuousProcessingTimeTrigger(" + interval + ")";
    }

    /**
     * Creates a trigger that continuously fires based on the given interval.
     *
     * @param interval The time interval at which to fire.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {
        return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}
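
A few details worth noting in this implementation: onElement registers the first processing-time timer at the next multiple of the interval (start + interval, with start rounded down to an interval boundary) and records it in the fire-time reducing state; onProcessingTime fires the window and immediately re-registers a timer at time + interval, which is what produces the periodic early emissions; and clear() unboxes fireTimestamp.get() into a primitive long without a null check, unlike the event-time variant below, so it assumes a fire timestamp was registered before the window is purged.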

Source path: flink\flink-streaming-java\src\main\java\org\apache\flink\streaming\api\windowing\triggers\ContinuousEventTimeTrigger.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * A {@link Trigger} that continuously fires based on a given time interval. This fires based
 * on {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
 *
 * @see org.apache.flink.streaming.api.watermark.Watermark
 *
 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 */
@PublicEvolving
public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long interval;

    /** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

    private ContinuousEventTimeTrigger(long interval) {
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }

        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        if (fireTimestamp.get() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;
            ctx.registerEventTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {

        if (time == window.maxTimestamp()){
            return TriggerResult.FIRE;
        }

        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);

        Long fireTimestamp = fireTimestampState.get();

        if (fireTimestamp != null && fireTimestamp == time) {
            fireTimestampState.clear();
            fireTimestampState.add(time + interval);
            ctx.registerEventTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        Long timestamp = fireTimestamp.get();
        if (timestamp != null) {
            ctx.deleteEventTimeTimer(timestamp);
            fireTimestamp.clear();
        }
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
        Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
        if (nextFireTimestamp != null) {
            ctx.registerEventTimeTimer(nextFireTimestamp);
        }
    }

    @Override
    public String toString() {
        return "ContinuousEventTimeTrigger(" + interval + ")";
    }

    @VisibleForTesting
    public long getInterval() {
        return interval;
    }

    /**
     * Creates a trigger that continuously fires based on the given interval.
     *
     * @param interval The time interval at which to fire.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) {
        return new ContinuousEventTimeTrigger<>(interval.toMilliseconds());
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}
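
For the event-time variant the trigger is attached the same way, but the stream must carry timestamps and watermarks, since the trigger fires as the watermark passes each interval boundary. A minimal, hedged sketch reusing text and LineSplitter from the example above (the extractor below is a placeholder; in a real job the timestamp must come from the event itself):

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        text.flatMap(new LineSplitter())
                // Placeholder extractor: assigns the current wall-clock time as the
                // event timestamp; replace with real event-time logic.
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Integer>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<String, Integer> element) {
                        return System.currentTimeMillis();
                    }
                })
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(120)))
                // Emit the current aggregate every 20 s of event time as the watermark advances.
                .trigger(ContinuousEventTimeTrigger.of(Time.seconds(20)))
                .sum(1)
                .print();

This assumes the imports org.apache.flink.streaming.api.TimeCharacteristic, org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor, and org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows in addition to those of the earlier example.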
