淺談PipelineDB系列一: Stream數據是如何寫到Continuous View中的
PipelineDB Version:0.9.7
PostgreSQL Version:9.5.3
PipelineDB的數據處理組件:
從上圖來看主要就是pipeline_streams,stream_fdw,Continuous View,Transform。
其實就是運用了Postgres的FDW功能來實現的stream功能。
從數據庫也能看到這個FDW
pipeline=# \des List of foreign servers Name | Owner | Foreign-data wrapper ------------------+-----------------+---------------------- pipeline_streams | unknown (OID=0) | stream_fdw (1 row)
數據流轉入下圖
可以看到數據流轉都是通過ZeroMQ來實現的(前面的版本0.8.2之前是通過TupleBuff來實現)
數據插入到Stream後然後調用ForiegnInsert,插入到初始化的IPC裏面去,在數據庫目錄下面有個pipeline/zmq
TransForm其實就是把數據的dest指向了Stream,數據庫默認有個pipeline_stream_insert其實這個是個Trigger,把tuple再扔到目標stream裏面。
或者你可以自己寫UDF,就是寫個trigger,數據可以寫到表或者別的FDW裏面,或者是自己封裝的消息隊列IPC都沒問題,這塊自由發揮的空間就比較大。
首先我們來創建個STREAM跟CV
pipeline=# create stream my_stream(x bigint,y bigint,z bigint); CREATE STREAM pipeline=# create continuous view v_1 as select x,y,z from my_stream; CREATE CONTINUOUS VIEW pipeline=#
插入一條數據:
pipeline=# insert into my_stream(x,y,z) values(1,2,3); INSERT 0 1 pipeline=# select * from v_1; x | y | z ---+---+--- 1 | 2 | 3 (1 row) pipeline=#
數據插入到CV中了,我們現在來看看PipelineDB是如何插入的。
上面有介紹了Stream就是個FDW。我們來看看他的handler(source:src/backend/pipeline/stream_fdw.c)
/*
* stream_fdw_handler
*/
Datum
stream_fdw_handler(PG_FUNCTION_ARGS)
{
FdwRoutine *routine = makeNode(FdwRoutine);
/* Stream SELECTS (only used by continuous query procs) */
routine->GetForeignRelSize = GetStreamSize;
routine->GetForeignPaths = GetStreamPaths;
routine->GetForeignPlan = GetStreamScanPlan;
routine->BeginForeignScan = BeginStreamScan;
routine->IterateForeignScan = IterateStreamScan;
routine->ReScanForeignScan = ReScanStreamScan;
routine->EndForeignScan = EndStreamScan;
/* Streams INSERTs */
routine->PlanForeignModify = PlanStreamModify;
routine->BeginForeignModify = BeginStreamModify;
routine->ExecForeignInsert = ExecStreamInsert;
routine->EndForeignModify = EndStreamModify;
routine->ExplainForeignScan = NULL;
routine->ExplainForeignModify = NULL;
PG_RETURN_POINTER(routine);
}
主要是關註Streams Inserts這幾個函數.
每個worker process啟動的時候都會初始化一個recv_id,其實這個就是ZeroMQ的ID
數據會發送到對應的隊列裏面去,worker process就去這個IPC裏面去獲取數據
source:src/backend/pipeline/ipc/microbath.c
void microbatch_send_to_worker(microbatch_t *mb, int worker_id) { ...... worker_id = rand() % continuous_query_num_workers; } } recv_id = db_meta->db_procs[worker_id].pzmq_id; microbatch_send(mb, recv_id, async, db_meta); microbatch_reset(mb); }
首先是獲取worker_id 這個是隨機獲取的一個worker進程。stream數據隨機發到一worker process裏面去了
recv_id這個就是從初始化的IPC隊列獲取ID,數據就發送到該隊列裏面
最後就調用
pzmq_send(recv_id, buf, len, true)
數據就推送到了IPC中了。
(gdb) p recv_id $12 = 1404688165 (gdb)
這部分就是數據生產者部分。
下面就是數據消費者CV
數據接受還是通過ZMQ的API來接受的
這個主要是worker process來幹活的
srouce:src/backend/pipeline/ipc/pzmq.c&reader.c
(gdb) p *zmq_state->me $8 = {id = 1404688165, type = 7 ‘\a‘, sock = 0x1139ba0, addr = "ipc:///home/pipeline/db_0.9.7/pipeline/zmq/1404688165.sock", ‘\000‘ <repeats 965 times>} (gdb)
可以看到這個數據是從1404688165裏面獲取的 ,並且把IPC的addr也給出來了,這個就是我數據庫目錄
獲取到是個buf,然後unpack,從消息裏面獲取到對應的Tuple.
獲取到了tuple後,然後就找所有的CV跟這個stream相關的target。遍歷他們,然後執行CV中對應的SQL。
執行流程跟標準SQL差不多也是初始化執行計劃然後ExecutePlan然後endplan 。
數據會到Combiner裏面,如果是AGG還會有一系列操作的。
如果數據符合CV的SQL邏輯,那麽數據就插入到對應的物理表。
這就是Stream的一個簡單的工作原理。
謝謝
淺談PipelineDB系列一: Stream數據是如何寫到Continuous View中的