1. 程式人生 > >實時標籤開發——從零開始搭建實時使用者畫像(五)

實時標籤開發——從零開始搭建實時使用者畫像(五)

![](https://img2020.cnblogs.com/blog/1089984/202006/1089984-20200611082156851-918533310.png) ​ # 資料接入 資料的接入可以通過將資料實時寫入Kafka進行接入,不管是直接的寫入還是通過oracle和mysql的實時接入方式,比如oracle的ogg,mysql的binlog #### ogg Golden Gate(簡稱OGG)提供異構環境下交易資料的實時捕捉、變換、投遞。 通過OGG可以實時的將oracle中的資料寫入Kafka中。 ![](https://img2020.cnblogs.com/blog/1089984/202006/1089984-20200611082218398-1570484911.png) 對生產系統影響小:實時讀取交易日誌,以低資源佔用實現大交易量資料實時複製 以交易為單位複製,保證交易一致性:只同步已提交的資料 高效能 - 智慧的交易重組和操作合併 - 使用資料庫本地介面訪問 - 並行處理體系 #### binlog MySQL 的二進位制日誌 binlog 可以說是 MySQL 最重要的日誌,它記錄了所有的 `DDL` 和 `DML` 語句(除了資料查詢語句select、show等),**以事件形式記錄**,還包含語句所執行的消耗的時間,MySQL的二進位制日誌是事務安全型的。binlog 的主要目的是**複製和恢復**。 ![](https://img2020.cnblogs.com/blog/1089984/202006/1089984-20200611082313838-255265173.png) 通過這些手段,可以將資料同步到kafka也就是我們的實時系統中來。 #### Flink接入Kafka資料 Apache Kafka Connector可以方便對kafka資料的接入。 依賴 ``` ``` ##### 構建FlinkKafkaConsumer 必須有的: 1.topic名稱 2.用於反序列化Kafka資料的DeserializationSchema / KafkaDeserializationSchema 3.配置引數:“bootstrap.servers” “group.id” (kafka0.8還需要 “zookeeper.connect”) ``` val properties = new Properties()properties.setProperty("bootstrap.servers", "localhost:9092")// only required for Kafka 0.8properties.setProperty("zookeeper.connect", "localhost:2181")properties.setProperty("group.id", "test")stream = env .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)) .print() ``` ##### 時間戳和水印 在許多情況下,記錄的時間戳(顯式或隱式)嵌入記錄本身。另外,使用者可能想要週期性地或以不規則的方式發出水印。 我們可以定義好Timestamp Extractors / Watermark Emitters,通過以下方式將其傳遞給消費者 ``` val env = StreamExecutionEnvironment.getExecutionEnvironment()val myConsumer = new FlinkKafkaConsumer[String](...)myConsumer.setStartFromEarliest() // start from the earliest record possiblemyConsumer.setStartFromLatest() // start from the latest recordmyConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds)myConsumer.setStartFromGroupOffsets() // the default behaviour//指定位置//val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)//myConsumer.setStartFromSpecificOffsets(specificStartOffsets)val stream = env.addSource(myConsumer) ``` ##### 檢查點 啟用Flink的檢查點後,Flink Kafka Consumer將使用主題中的記錄,並以一致的方式定期檢查其所有Kafka偏移以及其他操作的狀態。如果作業失敗,Flink會將流式程式恢復到最新檢查點的狀態,並從儲存在檢查點中的偏移量開始重新使用Kafka的記錄。 如果禁用了檢查點,則Flink Kafka Consumer依賴於內部使用的Kafka客戶端的自動定期偏移提交功能。 如果啟用了檢查點,則Flink Kafka Consumer將在檢查點完成時提交儲存在檢查點狀態中的偏移量。 ``` val env = StreamExecutionEnvironment.getExecutionEnvironment()env.enableCheckpointing(5000) // checkpoint every 5000 msecs ``` Flink消費Kafka完整程式碼: ``` import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class KafkaConsumer { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test"); //構建FlinkKafkaConsumer FlinkKafka