Flink程式設計練習(一)
阿新 • • 發佈:2018-12-24
Flink程式設計練習,NYC計程車資料
環境配置
-
首先確保已經下載好flink依賴,並從Github下載程式碼。
-
下載依賴資料,這裡依賴的是紐約出租車資料,可以使用命令列下載:
-
wget http://training.data-artisans.com/trainingData/nycTaxiRides.gz wget http://training.data-artisans.com/trainingData/nycTaxiFares.gz
-
由於本專案使用java編譯,而原始檔有scala,為了忽略scala的錯誤,這裡需要指定scala的SDK:
專案
資料設定
使用IDEA開啟該maven專案,首先需要找到依賴的檔案路徑:
可以在ExerciseBase
類中找到依賴的計程車資料檔案路徑並修改:
這裡使用的是計程車的 event-time
Taxi Ride Cleansing
在utils
類中,可以找到GeoUtils
,這是用來檢測該GPS點資料是否在紐約市內,具體實現參考isInNYC(float lon, float lat)
函式。
RideCleansingExercise
若直接執行com/dataartisans/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java
NYCFilter
中:
這裡的MissingSolutionException
未定義,需要自己實現:
RideCleansingTest
在RideCleansingTest
中定義了該類的測試類,我們可以每個函式逐步執行:
仔細檢視,這裡測試了兩個函式,分別是測試指定GPS點是否在NYC中,每次將執行的結果與真實結果進行比較(assertEquals
),並判斷是否相等。
如何test
- 由於該檔案由
RideCleansingExercise
和RideCleansingSolution
組成,且exercise
是我們需要處理的類,Solution
- 這裡使用
javaExercise
和javaSolution
分別指向這兩個類的main函式,並執行RunApp方法: - 其中,RunApp又呼叫了execute方法,這裡才是真正執行的函式:
- 在每個測試類中,我們發現其加入source的方法都一樣,如果未指定輸入源(
ExerciseBase.rides
),則將所有的資料作為資料來源;這裡我們已經指定了特定的GPS點作為資料來源。 - 這裡會將執行了
filter
函式的值進行返回,並與grandtruth進行比較。
Revise
-
知道錯誤以後,我們就應該修改之前
RideCleansingExercise
類中的filter方法,其實就是使用isinNYC
函式,返回一個bool型別: -
private static class NYCFilter implements FilterFunction<TaxiRide> { @Override public boolean filter(TaxiRide taxiRide) throws Exception { return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) && GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat); } }
-
再次執行該類,可以得到在NYC的所有GPS點: