1. 程式人生 > >Flink自定義一個簡單source

Flink自定義一個簡單source

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.CachingTokenFilter;

import java.util.Random;

public class MySelfSourceTest01 {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.OFF);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                Random random = new Random();
                // 迴圈可以不停的讀取靜態資料
                while (true) {
                    int nextInt = random.nextInt(100);
                    ctx.collect("random : " + nextInt);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {

            }
        });
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] sps = value.split(":");
                return new Tuple2<>(value, Integer.parseInt(sps[1].trim()));
            }
        }).keyBy(0).timeWindow(Time.seconds(5));

        SingleOutputStreamOperator<String> apply = window.apply(new WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
                input.forEach(x -> {
                    System.out.println("apply function -> " + x.f0);
                    out.collect(x.f0);
                });
            }
        });

        apply.print();

        try {
            env.execute("myself_source_test01");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

相關推薦

Flink定義一個簡單source

import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.t

springboot 定義一個簡單的 starter

1.新建專案 。 啟動器只用來做依賴匯入; 專門來寫一個自動配置模組; idea 下建立空專案 hello-spring-boot-starter 新增兩個子模組 spring-boot-starter-autoconfigurer, spring-boot-starter-hell

定義一個簡單的迭代器(line_iterator)

STL是容器、迭代器、演算法三位一體的好東西,使用STL編寫的程式看起來非常簡潔。比如從cin輸入若干字串,每一字串佔一行,然後將這些字串按字典序排序並輸出到cout中,相關的程式碼如下所示: /* 迭代器的使用 * created: btwsmile

Visual C++網路程式設計經典案例詳解 第5章 網頁瀏覽器 HTTP響應 實體資料 定義一個簡單的訊息體結構

總之,伺服器返回的響應訊息類似於C++語言中的結構體 訊息頭和訊息體就是這個結構體裡面的元素。 使用者在使用HTTP程式設計時, 可以根據需要自定義一個結構體儲存該訊息資料。 例如,自定義一個簡單的訊息結構體 typedef struct { char *messagehead; /

Django定義一個簡單的中介軟體,並使用此中介軟體

1、在最近做的專案中,需要每個頁面訪問的時候判斷是否登入,沒登入的話就跳轉到登入頁面,因此抽出個公共方法,並自定義箇中間件是很有必要的,這樣就可以用註解方式去使用這個自定義的中介軟體,就如Django自帶的@login_required一樣。 2、因此首先在專案的目錄底下新

定義View(簡單一個畫板)

畫板 程式碼 public class PaintView extends View { private Paint mPaint;//畫筆工具 private Path mPath;//路徑 public PaintView(Conte

定義一個更好用的SwipeRefreshLayout(彈力拉伸效果詳解)(轉載)

dsc drag 常數 lane swipe loading 數據改變 高中數學 tca 轉自: 自定義一個更好用的SwipeRefreshLayout(彈力拉伸效果詳解) 前言 熟悉SwipeRefreshLayout的同學一定知道,SwipeRefreshLayout是

2.Border Layout 定義一個Layout來完成布局。

log 自定義 min int size ger 官方文檔 implement for 目標:          1.每一個被添加到布局裏的控件都是QLayoutItem,我們根據方位添加。 2.定義一個結構體 ItemWrapper。裏面包含QLayoutItem

定義一個校驗器--------------------------完成用戶註冊時候,對username是否符合規則以及時候已經存在於數據庫的校驗

實例 check ajax -- value ava .cn java 數據 實例: <!-- 自定義校驗表單--> $.validator.addMethod( "checkusername", //校驗規則名稱,類似於required

c++primer,定義一個復數類

opera 指針 隨著 per call 拷貝構造函數 會銷 局部變量 eal 1 #include<iostream> 2 #include<string> 3 #include<vector> 4 #include<a

Android零基礎入門第24節:定義View簡單使用

子類 protect jin 討論 我們 @+ amp 進階 運行程序 當我們開發中遇到Android原生的組件無法滿足需求時,這時候就應該自定義View來滿足這些特殊的組件需求。 一、概述 很多初入Android開發的程序員,對於Android自定義View可能比較

關於定義一個上傳的file按鈕

ner receive play display list 之前 引入 image oot 在input中html給我們一個 type file用來做文件上傳的功能,比如 但是這樣的樣式,實在難看,在開發的時候看了layui和bootstrap的點擊上傳,都很不錯。 前

如何定義一個長度可變數組

方式 urn img 數組長度 字符串數組 個數 很多 由於 lac 摘要:本文主要寫了如何自定義一個長度可變數組 數組是在程序設計中,為了處理方便,把具有相同類型的若幹元素按無序的形式組織起來的一種形式 在定義之初,數組的長度就被定義 新建數組有很多方式 下面兩個都可

Java類載入器( CLassLoader ) 死磕5: 定義一個檔案系統的classLoader

【正文】Java類載入器(  CLassLoader ) 死磕5:  自定義一個檔案系統classLoader 本小節目錄 5.1. 自定義類載入器的基本流程 5.2. 入門案例:自定義檔案系統類載入器 5.3. 案例的環境配置 5.4 FileClassLoader 案例實現步驟 5

使用java反射,定義springMvc簡單案例

目前javaWeb開發領域,SpringMvc已經是絕大部分中小公司必選框架,那麼springMvc是如何實現的呢。這裡通過一個簡單的小案例來演示一下。 首先看一下案例的結構圖   目前springBoot專案比較流行,這裡新建一個springBoot專案,先引入專案依賴

flutter - 點選事件(一) - 定義一個方便的點選控制元件

android中,所有View都可以直接setOnClickListener, RN中也有TouchableHightlight這樣的控制元件可以直接套在外面,ios中也可以有UIControl 這樣的控制元件可以直接新增點選事件. 那麼flutter中有嗎? 答案自然是有. Ges

flutter 如何定義一個loadmore / 載入更多

寫在前面 這類的庫在pub上有很多 我為什麼要自定義呢 首先是專案需要,並且這種庫普適性高,抽取出來今後複用也方便點 另外記錄一下編碼思路,方便後續檢視 pub地址 pub國內映象 github 使用說明 匯入說明看這裡 或 中文映象 看看構造方法 一共

Vue 定義一個外掛的用法及案例

1.開發外掛 install有兩個引數,第一個是Vue構造器,第二個引數是一個可選的選項物件   MyPlugin.install = function (Vue, options) {   // 1. 新增全域

React Native:定義一個導航欄,改變狀態列背景,隱藏狀態列

設計開發過程中,導航欄都會有所不同,這時候使用RN就需要自定義一個想要的導航欄了,RN中文網有講專門ios的導航欄(NavigatorIOS),可以不用自定義。 首先定義自定義導航欄的一些屬性的約束,記得npm install --save prop-types然後引入import Prop

python實現使用者定義一個矩形的輸出

補充知識:python中的print 函式在列印的時候末尾會自動補全換行,python3.0以上版本可以用 print(‘abc’,end = ‘’)來換掉換行,直接再一行中列印 如果不加end的話,最後一個預設值是\n,如果加了end=’’,就把\n給去掉了,變成一個空的字串print()