1. 程式人生 > >淺談PipelineDB系列一: Stream數據是如何寫到Continuous View中的

淺談PipelineDB系列一: Stream數據是如何寫到Continuous View中的

out 物理 獲取 handler ddr fun .cn node xpl

PipelineDB Version:0.9.7

PostgreSQL Version:9.5.3

PipelineDB的數據處理組件:

技術分享

從上圖來看主要就是pipeline_streams,stream_fdw,Continuous View,Transform。

其實就是運用了Postgres的FDW功能來實現的stream功能。

從數據庫也能看到這個FDW

pipeline=# \des
                  List of foreign servers
       Name       |      Owner      | Foreign-data wrapper
------------------+-----------------+----------------------
 pipeline_streams | unknown (OID=0) | stream_fdw
(1 row)

數據流轉入下圖

技術分享

可以看到數據流轉都是通過ZeroMQ來實現的(前面的版本0.8.2之前是通過TupleBuff來實現)

數據插入到Stream後然後調用ForiegnInsert,插入到初始化的IPC裏面去,在數據庫目錄下面有個pipeline/zmq

TransForm其實就是把數據的dest指向了Stream,數據庫默認有個pipeline_stream_insert其實這個是個Trigger,把tuple再扔到目標stream裏面。

或者你可以自己寫UDF,就是寫個trigger,數據可以寫到表或者別的FDW裏面,或者是自己封裝的消息隊列IPC都沒問題,這塊自由發揮的空間就比較大。

首先我們來創建個STREAM跟CV

pipeline=# create stream my_stream(x bigint,y bigint,z bigint);
CREATE STREAM
pipeline=# create continuous view v_1 as select x,y,z from my_stream;
CREATE CONTINUOUS VIEW
pipeline=#

插入一條數據:

pipeline=# insert into my_stream(x,y,z) values(1,2,3);
INSERT 0 1
pipeline=# select * from v_1;
 x | y | z
---+---+---
 1 | 2 | 3
(1 row)

pipeline=#

數據插入到CV中了,我們現在來看看PipelineDB是如何插入的。

上面有介紹了Stream就是個FDW。我們來看看他的handler(source:src/backend/pipeline/stream_fdw.c)

/*
 * stream_fdw_handler
 */
Datum
stream_fdw_handler(PG_FUNCTION_ARGS)
{
	FdwRoutine *routine = makeNode(FdwRoutine);

	/* Stream SELECTS (only used by continuous query procs) */
	routine->GetForeignRelSize = GetStreamSize;
	routine->GetForeignPaths = GetStreamPaths;
	routine->GetForeignPlan = GetStreamScanPlan;
	routine->BeginForeignScan = BeginStreamScan;
	routine->IterateForeignScan = IterateStreamScan;
	routine->ReScanForeignScan = ReScanStreamScan;
	routine->EndForeignScan = EndStreamScan;

	/* Streams INSERTs */
	routine->PlanForeignModify = PlanStreamModify;
	routine->BeginForeignModify = BeginStreamModify;
	routine->ExecForeignInsert = ExecStreamInsert;
	routine->EndForeignModify = EndStreamModify;

	routine->ExplainForeignScan = NULL;
	routine->ExplainForeignModify = NULL;

	PG_RETURN_POINTER(routine);
}

主要是關註Streams Inserts這幾個函數.

每個worker process啟動的時候都會初始化一個recv_id,其實這個就是ZeroMQ的ID

數據會發送到對應的隊列裏面去,worker process就去這個IPC裏面去獲取數據

source:src/backend/pipeline/ipc/microbath.c

void
microbatch_send_to_worker(microbatch_t *mb, int worker_id)
{
    ......

			worker_id = rand() % continuous_query_num_workers;
		}
	}

	recv_id = db_meta->db_procs[worker_id].pzmq_id;

	microbatch_send(mb, recv_id, async, db_meta);
	microbatch_reset(mb);
} 

首先是獲取worker_id 這個是隨機獲取的一個worker進程。stream數據隨機發到一worker process裏面去了

recv_id這個就是從初始化的IPC隊列獲取ID,數據就發送到該隊列裏面

最後就調用

pzmq_send(recv_id, buf, len, true)

數據就推送到了IPC中了。

(gdb) p	recv_id
$12 = 1404688165
(gdb)

這部分就是數據生產者部分。

下面就是數據消費者CV

數據接受還是通過ZMQ的API來接受的

這個主要是worker process來幹活的

srouce:src/backend/pipeline/ipc/pzmq.c&reader.c

(gdb) p *zmq_state->me
$8 = {id = 1404688165, type = 7 ‘\a‘, sock = 0x1139ba0, addr = "ipc:///home/pipeline/db_0.9.7/pipeline/zmq/1404688165.sock", ‘\000‘ <repeats 965 times>}
(gdb)

可以看到這個數據是從1404688165裏面獲取的 ,並且把IPC的addr也給出來了,這個就是我數據庫目錄

獲取到是個buf,然後unpack,從消息裏面獲取到對應的Tuple.

獲取到了tuple後,然後就找所有的CV跟這個stream相關的target。遍歷他們,然後執行CV中對應的SQL。

執行流程跟標準SQL差不多也是初始化執行計劃然後ExecutePlan然後endplan 。

數據會到Combiner裏面,如果是AGG還會有一系列操作的。

如果數據符合CV的SQL邏輯,那麽數據就插入到對應的物理表。

這就是Stream的一個簡單的工作原理。

謝謝

淺談PipelineDB系列一: Stream數據是如何寫到Continuous View中的