1. 程式人生 > >kylin與superset整合實現資料視覺化

kylin與superset整合實現資料視覺化

Apache kylin是一個開源分散式引擎,提供Hadoop之上的SQL查詢介面及多維分析(OLAP)能力以支援超大規模資料。而superset是airbnb開源的一款資料視覺化工具。

kylin在超大資料規模下仍然可以提供秒級甚至毫秒級sql響應的OLAP多維分析查詢服務。而且對伺服器記憶體的要求也不像spark sql那麼高,經過多方面的優化,資料膨脹率甚至可以控制在100%以內。它利用hive做預計算,然後建立多維的資料立方體,並存在hbase中,從而提供了實時查詢的能力。

superset也就是早先的caravel,提供了豐富的圖表供使用者配置。只要連上資料來源,勾幾個簡單的配置,或者寫點sql。使用者就可以輕易的構建基於d3、nvd3、mapbox-gl等的炫酷圖表。

我廠也是選擇了kylin和superset,遺憾的是superset支援多種資料來源,包括druid、hive、impala、sparkSQL、presto以及多種主流關係型資料庫,但是並不支援kylin。於是我們對其進行了改進。

首先觀察superset的原始碼,它後臺使用Flask App Builder搭建的,資料訪問層用sqlalchemy實現。也就是說,它本質上可以支援所有資料來源,只要實現一套kylin的dialect即可。而同時github上有一個pykylin專案,就是實現的這個dialect。這極大增強了我解決這個問題的信心。

正好前幾周,superset出了一個新的prod版本airbnb_prod.0.15.5.0。裝好它和pykylin之後,匯入kylin資料來源,成功!

但是點開sqllab想敲點sql驗證一下時,卻出了異常。

Debug了pykylin程式碼,發現get_table_names方法的入參connection實際已經是sqlalchemy的Engine物件了,這可能是最新sqlalchemy的版本升級造成的。總之,將原來的程式碼:

  1. def get_table_names(self, connectionschema=None, **kw): 
  2. returnconnection.connection.list_tables() 

改成:

  1. def get_table_names(self, engine, schema=None, **kw): 
  2. connection = engine.contextual_connect() 
  3. returnconnection.connection.list_tables() 

即可。

順便我們看到這裡它擴充套件了sqlalchemy的list_tables方法,sqllab左上方的table選擇區還有列出所有schema的下拉框,於是我們順帶把list_schama方法也實現。connection.py新增:

  1. def list_schemas(self):   
  2.         route = 'tables_and_columns'
  3.         params = {'project': self.project} 
  4.         tables = self.proxy.get(route, params=params) 
  5.         return [t['table_SCHEM'for t in tables] 

dialect.py新增:

  1. def get_schema_names(self, engine, schema=None, **kw): 
  2. connection = engine.contextual_connect() 
  3. returnconnection.connection.list_schemas() 

之後執行sql還是有錯:

pykylin在每次呼叫kylin的api時會首先登入,以獲得JSESSIONID,並存入cookie中,這裡是登入失敗,檢查程式碼,發現這裡問題還挺多的,首先proxy.py中的login方法作者寫的是self.password = user應改成password。dialect.py中create_connect_args方法改為:

  1. def create_connect_args(self, url):   
  2.         opts = url.translate_connect_args() 
  3.         api_prefix = 'kylin/api/'
  4.         args = { 
  5.             'username': opts['username'], 
  6.             'password': opts['password'], 
  7.             'endpoint''http://%s:%s/%s' % (opts['host'], opts['port'], api_prefix) 
  8.         } 
  9.         args.update(url.query) 
  10.         return [], args 

這樣大部分sql查詢沒有問題,但是有的查詢結果有部分值是null,這樣也會出錯。修改cursor.py的_type_mapped方法:

  1. def _type_mapped(self, result):   
  2.         meta = self.description 
  3.         size = len(meta) 
  4.         for i in range(0, size): 
  5.             column = meta[i] 
  6.             tpe = column[1] 
  7.             val = result[i] 
  8.             if val is None: 
  9.                 pass 
  10.             elif tpe == 'DATE'
  11.                 val = parser.parse(val) 
  12.             elif tpe == 'BIGINT'or tpe == 'INT'or tpe == 'TINYINT'
  13.                 val = int(val) 
  14.             elif tpe == 'DOUBLE'or tpe == 'FLOAT'
  15.                 val = float(val) 
  16.             elif tpe == 'BOOLEAN'
  17.                 val = (val == 'true'
  18.             result[i] = val 
  19.         return result 

這樣在sqllab中執行sql基本沒問題了。

下一步開始自定義slice,定製自己的視覺化dashboard。

在這裡再次遇到問題,superset會自動把count函式計算的列設定別名叫count,而count是kylin的關鍵字,因此導致查詢失敗。修改superset的models.py的sqla_col方法:

  1. @property 
  2.    def sqla_col(self): 
  3.        name = self.metric_name 
  4.        if name == 'count'
  5.            name = 'total_count'
  6.        return literal_column(self.expression).label(name

另外在slice中還經常會遇到pandas丟擲的KeyError異常。這是因為在superset裡面所有的關鍵字都是小寫,然而kylin返回的所有的資料metadata全是大寫,導致superset在kylin的返回結果中查詢關鍵字的時候出現找不到關鍵字的錯誤。

修改pykylin的cursor.py的execute方法。

  1. def execute(self, operation, parameters={}, acceptPartial=True, limit=None, offset=0):   
  2.         sql = operation % parameters 
  3.         data = { 
  4.             'sql': sql, 
  5.             'offset': offset, 
  6.             'limit': limit or self.connection.limit, 
  7.             'acceptPartial': acceptPartial, 
  8.             'project': self.connection.project 
  9.         } 
  10.         logger.debug("QUERY KYLIN: %s" % sql) 
  11.         resp = self.connection.proxy.post('query', json=data) 
  12.         column_metas = resp['columnMetas'
  13.         for c in column_metas: 
  14.             c['label'] = str(c['label']).lower() 
  15.             c['name'] = str(c['name']).lower() 
  16.         self.description = [ 
  17.             [c['label'], c['columnTypeName'], 
  18.              c['displaySize'], 0, 
  19.              c['precision'], c['scale'], c['isNullable']] 
  20.             for c in column_metas 
  21.         ] 
  22.         self.results = [self._type_mapped(r) for r in resp['results']] 
  23.         self.rowcount = len(self.results) 
  24.         self.fetched_rows = 0 
  25.         return self.rowcount 

最後,我發現在查詢的欄位包含kylin中的date型別時也會出錯。點選slice頁面右上角的query按鈕,可以檢視superset最終傳送的sql。

將它直接拷貝到kylin的insight頁面去執行,發現報錯。原來kylin的date型別只支援年月日,而superset在新增日期搜尋條件時為了實現定時重新整理圖表而在sql的日期條件中都精確到了時分秒。後來我發現superset支援在列的設定頁面為一個日期列新增自定義的格式轉換函式

於是我在這裡設定日期列格式

  1. TO_DATE(‘{}’, ‘yyyy-MM-dd’) 

然後可以看到slice這裡sql中的該列都變成了to_date函式形式

最後的工作就是修改kylin原始碼,新增對日期函式的支援。hive sql是支援to_date等日期格式轉換函式的,kylin憑什麼不支援?

大致debug了一下kylin的原始碼,kylin處理sql的入口在server-base模組下的QueryController.java的query方法中。我發現在最終呼叫jdbc驅動執行sql之前,kylin會調QueryUtil類的massageSql方法來優化sql。主要是加上limit和offset引數。最後調內部類DefaultQueryTransformer的transform方法改掉sql中的一些通病,比如SUM(1)改成count(1)等。日期轉換函式的處理放在這後面我覺得是最合適的。

新增正則表示式,以匹配日期函式:

  1. private static final String TO_DATE = "(to_date|TO_DATE)\\(['|\"]([^'|\"]*)['|\"],\\s?['|\"]([^'|\"]*)['|\"]\\)"
  2. private static final Pattern FN_TO_DATE = Pattern.compile(TO_DATE); 

新增日期轉行函式解析:

  1. private static String executeFN(String sql) {   
  2.             Matcher m; 
  3.             while (true) { 
  4.                 m = FN_TO_DATE.matcher(sql); 
  5.                 if (!m.find()) 
  6.                     break; 
  7.                 String dateTime = m.group(2); 
  8.                 String format = m.group(3); 
  9.                 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
  10.                 Date dt = null
  11.                 try { 
  12.                     dt = sdf.parse(dateTime); 
  13.                 } catch (ParseException e) { 
  14.                     logger.error("Parse date error", e); 
  15.                 } 
  16.                 sdf = new SimpleDateFormat(format); 
  17.                 String date = sdf.format(dt); 
  18.                 String begin = sql.substring(0, m.start()); 
  19.                 String end = sql.substring(m.end(), sql.length()); 
  20.                 sql = begin + "'" + date + "'" + end
  21.             } 
  22.             return sql; 
  23.         } 

然後在massageSql方法中呼叫這個函式,kylin就可以支援上面sql的執行了