Stream Computing: A PipelineDB Quick Start
Background
PipelineDB is an open-source streaming-computation database built on top of PostgreSQL. It lets us operate on data streams using SQL and continuously persist the results of those operations into tables.
Official description:
PipelineDB is built to run SQL queries continuously on streaming data. The output of these continuous queries is stored in regular tables which can be queried like any other table or view. Thus continuous queries can be thought of as very high-throughput, incrementally updated materialized views. As with any data processing system, PipelineDB is built to shine under particular workloads, and simply not intended for others.
Key features: real-time data processing using only SQL, with no application code; PostgreSQL compatibility; no ETL; efficient and continuous operation.
In short: data is updated incrementally and continuously, with no batch jobs to run. The convenient part is that stream processing is expressed in SQL, which makes it easy to define data metrics and to communicate with business users.
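To make the idea concrete, here is a minimal sketch of the pattern described above: a stream receives events, and a continuous view incrementally maintains an aggregate over them. The `clicks` stream and its columns are illustrative names, not from this post.

```sql
-- Hypothetical example: count page views per URL, updated continuously.
CREATE STREAM clicks (url text, user_id bigint);

CREATE CONTINUOUS VIEW click_counts AS
    SELECT url, count(*) AS total
    FROM clicks
    GROUP BY url;

-- Writing to the stream updates the view incrementally:
INSERT INTO clicks (url, user_id) VALUES ('/home', 1), ('/home', 2);

-- The view can then be queried like a regular table:
SELECT * FROM click_counts;
```

There is no batch job anywhere: the aggregation is maintained as each event arrives.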
Installation
See: http://docs.pipelinedb.com/installation.html
The following uses OS X as an example: get the installer package from the download page and double-click to install.
Next, initialize PipelineDB, specifying a directory for its data files:
pipeline-init -D <data directory>
Then start the PipelineDB server with pipeline-ctl:
pipeline-ctl -D <data directory> -l pipelinedb.log start
The -l option specifies the log file path.
To stop the server:
pipeline-ctl -D <data directory> stop
For more on pipeline-ctl, see pipeline-ctl --help.
You can also connect to PipelineDB with the standard PostgreSQL client psql (note: PipelineDB's default port is 5432). The command is:
psql -p 5432 -h localhost pipeline
Quick Start
Initialize the data directory and start the PipelineDB server:
pipeline-init -D <data directory>
pipeline-ctl -D <data directory> -l <log directory> start
Connect to PipelineDB:
psql -h localhost -p 5432 -d pipeline
Create a STREAM:
CREATE STREAM wiki_stream (hour timestamp, project text, title text, view_count bigint, size bigint);
Create a CONTINUOUS VIEW:
CREATE CONTINUOUS VIEW wiki_stats AS
    SELECT hour, project,
           count(*) AS total_pages,
           sum(view_count) AS total_views,
           min(view_count) AS min_views,
           max(view_count) AS max_views,
           avg(view_count) AS avg_views,
           percentile_cont(0.99) WITHIN GROUP (ORDER BY view_count) AS p99_views,
           sum(size) AS total_bytes_served
    FROM wiki_stream
    GROUP BY hour, project;
Fetch external data and write it into PipelineDB in real time (the dataset is large; you can stop the inflow at any point):
curl -sL http://pipelinedb.com/data/wiki-pagecounts | gunzip | \
    psql -h localhost -p 5432 -d pipeline -c "
    COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"
Query the results:
SELECT * FROM wiki_stats ORDER BY total_views DESC;
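Besides COPY, a stream accepts ordinary INSERT statements, so you can also feed a few hand-made rows into wiki_stream and watch the view update. The values below are made up purely for illustration:

```sql
-- Illustrative rows; any values matching the stream's column types work.
INSERT INTO wiki_stream (hour, project, title, view_count, size)
    VALUES ('2018-01-01 00:00:00', 'en', 'Main_Page', 42, 1024);

-- The continuous view reflects the new event immediately:
SELECT total_views FROM wiki_stats WHERE project = 'en';
```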
Basic Operations
- Connect to PipelineDB
psql -h localhost -p 5432 -d pipeline
- Command help
# psql command help
\?
# SQL command help
\h
- List databases
\l or \l+
- Create a schema
CREATE SCHEMA dw_bihell AUTHORIZATION username;
- List schemas
\dn or \dn+
- Switch schema
SET search_path TO dw_bihell;
- List tables, views, etc.
# in the default schema
\d or \d+
# in a specified schema
\dp [PATTERN] or \z [PATTERN], e.g. \z dw_order.*
Core Concepts
Streams
Streams are the data entry point for continuous views: they push data into views. A stream row can be thought of as a row in a regular table, or as an event.
But it differs fundamentally from a regular table row: an event in a stream "disappears" once it has been consumed by all the views reading from it, and cannot be retrieved with a SELECT. In other words, a stream exists solely as an input source for continuous views.
- Creation syntax:
CREATE STREAM stream_name ( [{ column_name data_type [ COLLATE collation ] | LIKE parent_stream } [, ... ]] )
-- JSON data is supported directly
CREATE STREAM dw_bihell.rt_oreder_bihell (log json);
-- Or receive plain text (Kafka splits rows by delimiter when sending data)
CREATE STREAM dw_bihell.rt_oreder_bihell (collect_date text, record_status integer, operate_type integer, update_mask integer, order_date text, bill_date text, order_id bigint, order_type)
- Add a column
ALTER STREAM stream ADD COLUMN x integer;
- Drop a stream
DROP STREAM
- List created streams
SELECT * FROM pipeline_streams() ORDER BY schema;
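The "write-only" nature of streams described above can be demonstrated with a short session. The names `s` and `v` here are illustrative:

```sql
CREATE STREAM s (x int);

-- Selecting directly from a stream is not supported;
-- events are only visible through a continuous view or transform:
-- SELECT * FROM s;   -- fails

CREATE CONTINUOUS VIEW v AS SELECT count(*) FROM s;

INSERT INTO s (x) VALUES (1);

-- The event was consumed by v and is now gone from the stream;
-- its effect survives only in the view's state:
SELECT * FROM v;
```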
Continuous Views
Continuous Views are PipelineDB's core abstraction. Much like an ordinary view, a continuous view takes its input from streams and tables, but it updates its data incrementally and in real time.
- Creation syntax
CREATE CONTINUOUS VIEW name AS query
where query is a PostgreSQL-style SELECT with the following form:
SELECT [ DISTINCT [ ON ( expression [, ...] ) ] ]
    expression [ [ AS ] output_name ] [, ...]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY expression [, ...] ]
    [ WINDOW window_name AS ( window_definition ) [, ...] ]

where from_item can be one of:

    stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition ]
- Drop
DROP CONTINUOUS VIEW name
- Truncate data
SELECT truncate_continuous_view('name');
- List continuous views
SELECT * FROM pipeline_views();
- Pause/resume
SELECT activate('continuous_view_or_transform');
SELECT deactivate('continuous_view_or_transform');
Important: deactivating loses data. Events that arrive while a view is deactivated cannot be read even after it is reactivated.
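Continuous views are not limited to append-only aggregation: PipelineDB also supports sliding-window views via the implicit arrival_timestamp column it attaches to every stream event. A sketch (the stream and column names are illustrative):

```sql
-- Sliding-window view: only events from the last minute are counted.
-- "arrival_timestamp" is populated automatically by PipelineDB.
CREATE CONTINUOUS VIEW recent_users AS
    SELECT user_id::bigint, count(*) AS events
    FROM event_stream
    WHERE arrival_timestamp > clock_timestamp() - interval '1 minute'
    GROUP BY user_id;
```

Rows whose events have all aged out of the window are removed from the view's results automatically.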
Continuous Transforms
Continuous Transforms are transport channels: they store no data and do not support aggregation. They are typically used to pipe one stream into another, or to route/persist stream data into a regular table.
- Creation syntax
CREATE CONTINUOUS TRANSFORM name AS query [ THEN EXECUTE PROCEDURE function_name ( arguments ) ]
- Drop
DROP CONTINUOUS TRANSFORM name
- List transforms:
SELECT * FROM pipeline_transforms();
- Built-in transform trigger
CREATE CONTINUOUS TRANSFORM t AS
    SELECT x::int, y::int FROM stream WHERE mod(x, 2) = 0
    THEN EXECUTE PROCEDURE pipeline_stream_insert('even_stream');
- Custom trigger
-- "user" is quoted because it is a reserved word in PostgreSQL
CREATE TABLE t ("user" text, value int);

CREATE OR REPLACE FUNCTION insert_into_t()
  RETURNS trigger AS
  $$
  BEGIN
    INSERT INTO t ("user", value) VALUES (NEW."user", NEW.value);
    RETURN NEW;
  END;
  $$
  LANGUAGE plpgsql;

CREATE CONTINUOUS TRANSFORM ct AS
  SELECT "user"::text, value::int FROM stream WHERE value > 100
  THEN EXECUTE PROCEDURE insert_into_t();
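To exercise this transform, write a few events into the stream it reads from (assuming a stream named stream with matching user/value columns has been created); only rows passing the WHERE filter should reach the table:

```sql
-- Illustrative input; values are made up.
INSERT INTO stream ("user", value) VALUES ('alice', 50), ('bob', 150);

-- Only rows with value > 100 pass the transform's filter,
-- so t should contain only the ('bob', 150) row:
SELECT * FROM t;
```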