除錯 Flink 原始碼
本文主要是講講flink的原始碼編譯,案例執行,flink原始碼除錯過程。除錯flink的原始碼及案例,需要先clone工程,編一下原始碼,去掉規範檢查,修改工程,最後才是除錯執行。
1. clone工程
首先複製flink的github地址
[email protected]:apache/flink.git
接著在idea點選路徑
File--->New--->Project from Version Control--->git
彈出視窗
把剛剛複製的flink的github地址貼上到url輸入欄,點選clone按鈕,然後等待構建完成,工程有點大需要點時間。
工程clone完成之後,可以在idea 的右下角切換到自己所用的分支,我的分支是1.6.
切換完成之後,分支顯示為:
2. 編譯原始碼
原始碼編譯可以直接用idea的maven外掛。
報錯如下:
修改一下根目錄下的pom.xml檔案
去掉程式碼風格檢查,註釋掉這個的主要原因是我們要改原始碼,不註釋掉無法編譯通過。
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> <version>2.17</version> <dependencies> <dependency> <groupId>com.puppycrawl.tools</groupId> <artifactId>checkstyle</artifactId> <!-- Note: match version with docs/internals/ide_setup.md --> <version>8.4</version> </dependency> </dependencies> <executions> <execution> <id>validate</id> <phase>validate</phase> <goals> <goal>check</goal> </goals> </execution> </executions> <configuration> <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation> <includeTestSourceDirectory>true</includeTestSourceDirectory> <configLocation>/tools/maven/checkstyle.xml</configLocation> <logViolationsToConsole>true</logViolationsToConsole> <failOnViolation>true</failOnViolation> </configuration> </plugin>
再次編譯,即可。
3. 執行kafka案例
點開工程欄,找到flink-examples模組,然後找到kafka案例,如下:
將kafka的example修改為可執行的案例,官方demo是通過打包提交到叢集的方式執行,需要傳參的,而我們直接在idea中執行,不需要穿引數。程式碼修改如下:
Properties props = new Properties(); props.put("bootstrap.servers", "mt-mdh.local:9093"); props.put("zookeeper.connect","localhost:2181"); props.put("group.id","test"); props.put("metadata.fetch.timeout.ms","10000"); props.put("metadata.max.age.ms","30000"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().disableSysoutLogging(); env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); env.enableCheckpointing(5000); // create a checkpoint every 5 seconds env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<KafkaEvent> input = env .addSource( new FlinkKafkaConsumer010<>( "", new KafkaEventSchema(), props) .assignTimestampsAndWatermarks(new CustomWatermarkExtractor())) .keyBy("word") .map(new RollingAdditionMapper()); input.addSink( new FlinkKafkaProducer010<>( "bar", new KafkaEventSchema(), props)); env.execute("Kafka 0.10 Example");
然後,右鍵,run。發現,並不能順心如意的執行,還是報了一堆錯誤。。。
實際上,只需要改一些run的執行配置即可避免該錯誤。
在導航欄,run---> Edit Configurations
修改為
再執行,就ok了。
關於debug,只要run執行成功之後,直接可以debug的。。。
flink的原始碼除錯debug及閱讀經驗,敬請期待後續,文章,也可以點選原文閱讀加入浪尖知識星球。
推薦閱讀:
ofollow,noindex">Flink非同步IO第一講
點贊,然後分享給小夥伴吧~