kylin與superset整合實現資料視覺化
Apache kylin是一個開源分散式引擎,提供Hadoop之上的SQL查詢介面及多維分析(OLAP)能力以支援超大規模資料。而superset是airbnb開源的一款資料視覺化工具。
kylin在超大資料規模下仍然可以提供秒級甚至毫秒級sql響應的OLAP多維分析查詢服務。而且對伺服器記憶體的要求也不像spark sql那麼高,經過多方面的優化,資料膨脹率甚至可以控制在100%以內。它利用hive做預計算,然後建立多維的資料立方體,並存在hbase中,從而提供了實時查詢的能力。
superset也就是早先的caravel,提供了豐富的圖表供使用者配置。只要連上資料來源,勾幾個簡單的配置,或者寫點sql。使用者就可以輕易的構建基於d3、nvd3、mapbox-gl等的炫酷圖表。
我廠也是選擇了kylin和superset,遺憾的是superset支援多種資料來源,包括druid、hive、impala、sparkSQL、presto以及多種主流關係型資料庫,但是並不支援kylin。於是我們對其進行了改進。
首先觀察superset的原始碼,它後臺使用Flask App Builder搭建的,資料訪問層用sqlalchemy實現。也就是說,它本質上可以支援所有資料來源,只要實現一套kylin的dialect即可。而同時github上有一個pykylin專案,就是實現的這個dialect。這極大增強了我解決這個問題的信心。
正好前幾周,superset出了一個新的prod版本airbnb_prod.0.15.5.0。裝好它和pykylin之後,匯入kylin資料來源,成功!
但是點開sqllab想敲點sql驗證一下時,卻出了異常。
Debug了pykylin程式碼,發現get_table_names方法的入參connection實際已經是sqlalchemy的Engine物件了,這可能是最新sqlalchemy的版本升級造成的。總之,將原來的程式碼:
- def get_table_names(self, connection, schema=None, **kw):
- returnconnection.connection.list_tables()
改成:
- def get_table_names(self, engine, schema=None, **kw):
- connection = engine.contextual_connect()
- returnconnection.connection.list_tables()
即可。
順便我們看到這裡它擴充套件了sqlalchemy的list_tables方法,sqllab左上方的table選擇區還有列出所有schema的下拉框,於是我們順帶把list_schama方法也實現。connection.py新增:
- def list_schemas(self):
- route = 'tables_and_columns'
- params = {'project': self.project}
- tables = self.proxy.get(route, params=params)
- return [t['table_SCHEM'] for t in tables]
dialect.py新增:
- def get_schema_names(self, engine, schema=None, **kw):
- connection = engine.contextual_connect()
- returnconnection.connection.list_schemas()
之後執行sql還是有錯:
pykylin在每次呼叫kylin的api時會首先登入,以獲得JSESSIONID,並存入cookie中,這裡是登入失敗,檢查程式碼,發現這裡問題還挺多的,首先proxy.py中的login方法作者寫的是self.password = user應改成password。dialect.py中create_connect_args方法改為:
- def create_connect_args(self, url):
- opts = url.translate_connect_args()
- api_prefix = 'kylin/api/'
- args = {
- 'username': opts['username'],
- 'password': opts['password'],
- 'endpoint': 'http://%s:%s/%s' % (opts['host'], opts['port'], api_prefix)
- }
- args.update(url.query)
- return [], args
這樣大部分sql查詢沒有問題,但是有的查詢結果有部分值是null,這樣也會出錯。修改cursor.py的_type_mapped方法:
- def _type_mapped(self, result):
- meta = self.description
- size = len(meta)
- for i in range(0, size):
- column = meta[i]
- tpe = column[1]
- val = result[i]
- if val is None:
- pass
- elif tpe == 'DATE':
- val = parser.parse(val)
- elif tpe == 'BIGINT'or tpe == 'INT'or tpe == 'TINYINT':
- val = int(val)
- elif tpe == 'DOUBLE'or tpe == 'FLOAT':
- val = float(val)
- elif tpe == 'BOOLEAN':
- val = (val == 'true')
- result[i] = val
- return result
這樣在sqllab中執行sql基本沒問題了。
下一步開始自定義slice,定製自己的視覺化dashboard。
在這裡再次遇到問題,superset會自動把count函式計算的列設定別名叫count,而count是kylin的關鍵字,因此導致查詢失敗。修改superset的models.py的sqla_col方法:
- @property
- def sqla_col(self):
- name = self.metric_name
- if name == 'count':
- name = 'total_count'
- return literal_column(self.expression).label(name)
另外在slice中還經常會遇到pandas丟擲的KeyError異常。這是因為在superset裡面所有的關鍵字都是小寫,然而kylin返回的所有的資料metadata全是大寫,導致superset在kylin的返回結果中查詢關鍵字的時候出現找不到關鍵字的錯誤。
修改pykylin的cursor.py的execute方法。
- def execute(self, operation, parameters={}, acceptPartial=True, limit=None, offset=0):
- sql = operation % parameters
- data = {
- 'sql': sql,
- 'offset': offset,
- 'limit': limit or self.connection.limit,
- 'acceptPartial': acceptPartial,
- 'project': self.connection.project
- }
- logger.debug("QUERY KYLIN: %s" % sql)
- resp = self.connection.proxy.post('query', json=data)
- column_metas = resp['columnMetas']
- for c in column_metas:
- c['label'] = str(c['label']).lower()
- c['name'] = str(c['name']).lower()
- self.description = [
- [c['label'], c['columnTypeName'],
- c['displaySize'], 0,
- c['precision'], c['scale'], c['isNullable']]
- for c in column_metas
- ]
- self.results = [self._type_mapped(r) for r in resp['results']]
- self.rowcount = len(self.results)
- self.fetched_rows = 0
- return self.rowcount
最後,我發現在查詢的欄位包含kylin中的date型別時也會出錯。點選slice頁面右上角的query按鈕,可以檢視superset最終傳送的sql。
將它直接拷貝到kylin的insight頁面去執行,發現報錯。原來kylin的date型別只支援年月日,而superset在新增日期搜尋條件時為了實現定時重新整理圖表而在sql的日期條件中都精確到了時分秒。後來我發現superset支援在列的設定頁面為一個日期列新增自定義的格式轉換函式
於是我在這裡設定日期列格式
- TO_DATE(‘{}’, ‘yyyy-MM-dd’)
然後可以看到slice這裡sql中的該列都變成了to_date函式形式
最後的工作就是修改kylin原始碼,新增對日期函式的支援。hive sql是支援to_date等日期格式轉換函式的,kylin憑什麼不支援?
大致debug了一下kylin的原始碼,kylin處理sql的入口在server-base模組下的QueryController.java的query方法中。我發現在最終呼叫jdbc驅動執行sql之前,kylin會調QueryUtil類的massageSql方法來優化sql。主要是加上limit和offset引數。最後調內部類DefaultQueryTransformer的transform方法改掉sql中的一些通病,比如SUM(1)改成count(1)等。日期轉換函式的處理放在這後面我覺得是最合適的。
新增正則表示式,以匹配日期函式:
- private static final String TO_DATE = "(to_date|TO_DATE)\\(['|\"]([^'|\"]*)['|\"],\\s?['|\"]([^'|\"]*)['|\"]\\)";
- private static final Pattern FN_TO_DATE = Pattern.compile(TO_DATE);
新增日期轉行函式解析:
- private static String executeFN(String sql) {
- Matcher m;
- while (true) {
- m = FN_TO_DATE.matcher(sql);
- if (!m.find())
- break;
- String dateTime = m.group(2);
- String format = m.group(3);
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Date dt = null;
- try {
- dt = sdf.parse(dateTime);
- } catch (ParseException e) {
- logger.error("Parse date error", e);
- }
- sdf = new SimpleDateFormat(format);
- String date = sdf.format(dt);
- String begin = sql.substring(0, m.start());
- String end = sql.substring(m.end(), sql.length());
- sql = begin + "'" + date + "'" + end;
- }
- return sql;
- }
然後在massageSql方法中呼叫這個函式,kylin就可以支援上面sql的執行了