1. 程式人生 > >Flink程式設計練習(一)

Flink程式設計練習(一)

Flink程式設計練習,NYC計程車資料

環境配置

本專案參考這裡setup

  • 首先確保已經下載好flink依賴,並從Github下載程式碼。

  • 下載依賴資料,這裡依賴的是紐約出租車資料,可以使用命令列下載:

    • wget http://training.data-artisans.com/trainingData/nycTaxiRides.gz
      wget http://training.data-artisans.com/trainingData/nycTaxiFares.gz
      

由於本專案使用java編譯,而原始檔有scala,為了忽略scala的錯誤,這裡需要指定scala的SDK:

54449085480

專案

資料設定

使用IDEA開啟該maven專案,首先需要找到依賴的檔案路徑:

54448844030

可以在ExerciseBase類中找到依賴的計程車資料檔案路徑並修改:

54448803823

這裡使用的是計程車的 event-time

Taxi Ride Cleansing

utils類中,可以找到GeoUtils,這是用來檢測該GPS點資料是否在紐約市內,具體實現參考isInNYC(float lon, float lat)函式。

RideCleansingExercise

若直接執行com/dataartisans/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java

,會報錯,其錯誤是在NYCFilter中:

54448966261

這裡的MissingSolutionException未定義,需要自己實現:

RideCleansingTest

RideCleansingTest中定義了該類的測試類,我們可以每個函式逐步執行:

54449137550

仔細檢視,這裡測試了兩個函式,分別是測試指定GPS點是否在NYC中,每次將執行的結果與真實結果進行比較(assertEquals),並判斷是否相等。

如何test

  • 由於該檔案由RideCleansingExerciseRideCleansingSolution組成,且exercise是我們需要處理的類,Solution
    是正確的類,因此我們要對兩個類進行比較。
  • 這裡使用javaExercisejavaSolution分別指向這兩個類的main函式,並執行RunApp方法:
    • 54449239193
  • 其中,RunApp又呼叫了execute方法,這裡才是真正執行的函式:
    • 54449246871
  • 在每個測試類中,我們發現其加入source的方法都一樣,如果未指定輸入源(ExerciseBase.rides),則將所有的資料作為資料來源;這裡我們已經指定了特定的GPS點作為資料來源。
  • 這裡會將執行了filter函式的值進行返回,並與grandtruth進行比較。

Revise

  • 知道錯誤以後,我們就應該修改之前RideCleansingExercise類中的filter方法,其實就是使用isinNYC函式,返回一個bool型別:

  • private static class NYCFilter implements FilterFunction<TaxiRide> {
    
    		@Override
    		public boolean filter(TaxiRide taxiRide) throws Exception {
    			return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) &&
    					GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
    		}
    	}
    
  • 再次執行該類,可以得到在NYC的所有GPS點:

  • 54449300060