1. 程式人生 > >PyOdps 0.4版本釋出,從一個故事說起

PyOdps 0.4版本釋出,從一個故事說起

有這麼個故事(如有雷同,純屬巧合)。有一天,某運營同學給某開發同學一個excel檔案,裡面是個客戶清單。

“幫我查下這些使用者的消耗呢”。

開發同學掃了一眼,幾百個使用者。這個事肯定是可以辦的,但是想到麻煩程度,開發同學心裡肯定是有不少羊駝經過的啦。

“有點麻煩啊”,開發同學輕輕抱怨。

“我懂的,把這個表和ODPS裡的表join下就好了嘛。”運營同學努努嘴。

“……”。於是,開發同學把excel資料匯出成文字格式,然後dship上傳到ODPS,ODPS上編寫SQL,dship下載,大功告成。

這裡說得很輕鬆,但其實整個過程真的挺麻煩呢。要是這個過程中還要對excel中的資料進行過濾,最終結果還要繪個圖,還是需要不少時間。

但是,如果這個開發同學使用PyOdps 0.4+版本新特性,一切就都輕鬆寫意了。

為了模擬這個過程,我們拿movielens 100K的資料做例子,現在本地有一個excel表格,裡面有100個需要查詢的使用者,表格包含兩個欄位,分別是使用者ID和年齡。在ODPS上,我們有一張電影評分表,現在我們要求出這100使用者箇中年齡在20-30之間,按每個年齡來求電影評分均值,並用條形圖展現。

可以想象,這個過程如果按照前面的描述,有多麻煩。那麼用PyOdps DataFrame API呢。

首先,我們讀出本地Excel檔案。

QQ20160406_0_2x

In [14]: from odps.df import read_excel

In [15]: users = read_excel('/Users/chine/userids.xlsx')

In [16]: users.head(10)
|==========================================|   1 
/ 1 (100.00%) 0s Out[16]: id age 0 46 27 1 917 22 2 217 22 3 889 24 4 792 40 5 267 23 6 626 23 7 433 27 8 751 24 9 932 58 In [40]: users.count() |==========================================| 1 / 1 (100.00%) 0s 100

然後我們用join語句,過濾出來電影評分表中這些使用者的評分資料。

In [17]: ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))

In [18]: ratings.head(10)
|==========================================|   1 
/ 1 (100.00%) 2s Out[18]: user_id movie_id rating unix_timestamp 0 196 242 3 881250949 1 186 302 3 891717742 2 22 377 1 878887116 3 244 51 2 880606923 4 166 346 1 886397596 5 298 474 4 884182806 6 115 265 2 881171488 7 253 465 5 891628467 8 305 451 3 886324817 9 6 86 3 883603013 In [25]: filter_ratings = ratings.join(users.filter(users.age.between(20, 30)), ('user_id', 'id'))[ratings, lambda x, y: y.age] # 這裡做欄位抽取時,可以使用Collection,也可以使用lambda表示式,引數是左右兩個Collection In [26]: filter_ratings.head(10) |==========================================| 1 / 1 (100.00%) 44s Out[26]: user_id movie_id rating unix_timestamp age 0 3 350 3 889237076 23 1 3 332 1 889237224 23 2 3 327 4 889237455 23 3 3 341 1 889237055 23 4 3 317 2 889237482 23 5 3 336 1 889237198 23 6 3 322 3 889237269 23 7 3 323 2 889237269 23 8 3 339 3 889237141 23 9 3 268 3 889236961 23

然後我們就可以按年齡聚合,求出評分均值啦。繪圖也一氣呵成。

In [28]: age_ratings = filter_ratings.groupby('age').agg(lambda x: x.rating.mean())

In [29]: age_ratings.head(10)
|==========================================|   1 /  1  (100.00%)        30s
Out[29]: 
   age  rating_mean
0   20     4.002309
1   21     4.051643
2   22     3.227513
3   23     3.519174
4   24     3.481013
5   25     3.774744
6   26     3.391509
7   27     3.355130
8   28     3.382883
9   29     3.705660

In [30]: age_ratings.plot(kind='bar', rot=45)
|==========================================|   1 /  1  (100.00%)        29s
Out[30]: <matplotlib.axes._subplots.AxesSubplot at 0x10b875f10>

age_ratings

超級簡單,有木有!

這裡的users其實是存在於本地的,而ratings是存在於ODPS上,使用者依然可以join這兩個Collection。其實對於0.4之前的版本,本地資料上傳的介面也很容易(但是無法使用DataFrame API來進行本地過濾),但是對於0.4版本,不管一個Collection是存在於ODPS還是本地,使用者都可以執行join和union的操作。

而這一切都源自0.4版本帶來的新特性,DataFrame API的pandas計算後端。

DataFrame API使用pandas計算

我們知道,PyOdps DataFrame API類似於pandas的介面,但還是有些許不同的,那我們為什麼不能用pandas來執行本地計算呢,這樣也能充分利用pandas的一些特性,如支援各種資料輸入。

所以,除了過去使用odps.models.Table來初始化DataFrame,我們也可以使用pandas DataFrame來初始化。

In [41]: import numpy as np

In [42]: import pandas as pd

In [44]: pandas_df = pd.DataFrame(np.random.random((10, 3)), columns=list('abc'))

In [45]: pandas_df
Out[45]: 
          a         b         c
0  0.583845  0.301504  0.764223
1  0.153269  0.335511  0.455193
2  0.725460  0.460367  0.294741
3  0.315234  0.907264  0.849361
4  0.678395  0.642199  0.746051
5  0.977872  0.841084  0.931561
6  0.903927  0.846036  0.982424
7  0.347098  0.373247  0.193810
8  0.672611  0.242942  0.381713
9  0.461411  0.687164  0.514689

In [46]: df = DataFrame(pandas_df)

In [49]: type(df)
Out[49]: odps.df.core.DataFrame

In [47]: df.head(3)
|==========================================|   1 /  1  (100.00%)         0s
Out[47]: 
          a         b         c
0  0.583845  0.301504  0.764223
1  0.153269  0.335511  0.455193
2  0.725460  0.460367  0.294741

In [48]: df[df.a < 0.5].a.sum()
|==========================================|   1 /  1  (100.00%)         0s
1.2770121422535428

這裡轉化成PyOdps DataFrame後,所有的計算也一樣,變成延遲執行,PyOdps DataFrame在計算前的優化也同樣適用。

這樣做的好處是,除了前面我們提到的,能結合本地和ODPS做計算外;還有個好處就是方便進行本地除錯。所以,我們可以用寫出以下程式碼:

DEBUG = True

if DEBUG:
    # 這個操作使用tunnel下載,因此速度很快。對於分割槽表,需要給出所有分割槽值。
    df = ratings[:100].to_pandas(wrap=True)
else:
    df = ratings

在DEBUG的時候,我們能夠利用PyOdps DataFrame在對原始表做切片操作時使用tunnel下載,速度很快的特性,選擇原始表的一小部分資料來作為本地測試資料。值得注意的是,本地計算通過不一定能在ODPS上也計算通過,比如自定義函式的沙箱限制

目前pandas計算後端尚不支援視窗函式。

apply和MapReduce API

使用apply對單行資料呼叫自定義函式

以前我們對於某個欄位,能呼叫map來使用自定義函式,現在結合axis=1的apply,我們能對一行資料進行操作。

In [13]: ratings.apply(lambda row: row.rating / float(row.age), axis=1, reduce=True, types='float', names='rda').head(10)
|==========================================|   1 /  1  (100.00%)      1m44s
Out[13]: 
        rda
0  0.166667
1  0.166667
2  0.208333
3  0.208333
4  0.125000
5  0.208333
6  0.166667
7  0.208333
8  0.208333
9  0.125000

reduce為True的時候,會返回一個sequence,詳細參考文件

MapReduce API

PyOdps DataFrame API也提供MapReduce API。我們還是以movielens 100K為例子,看如何使用。

現在假設我們需要求出每部電影前兩名的評分,直接上程式碼。

from odps.df import output

@output(['movie_id', 'movie_title', 'movie_rating'], ['int', 'string', 'int'])
def mapper(row):
    yield row.movie_id, row.title, row.rating

@output(['title', 'top_rating'], ['string', 'int'])
def reducer(keys):
    i = [0]
    def h(row, done):
        if i[0] < 2:
            yield row.movie_title, row.movie_rating
        i[0] += 1
    return h

In [7]: top_ratings = ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False)

In [10]: top_ratings.head(10)
|==========================================|   1 /  1  (100.00%)      3m48s
Out[10]: 
               title  top_rating
0   Toy Story (1995)           5
1   Toy Story (1995)           5
2   GoldenEye (1995)           5
3   GoldenEye (1995)           5
4  Four Rooms (1995)           5
5  Four Rooms (1995)           5
6  Get Shorty (1995)           5
7  Get Shorty (1995)           5
8     Copycat (1995)           5
9     Copycat (1995)           5

利用剛剛說的本地DEBUG特性,我們也能使用本地計算來驗證,計算結果能很快得出。人生苦短!

In [22]: local_ratings = ratings[:100].to_pandas(wrap=True)
|==========================================|   1 /  1  (100.00%)         2s

In [23]: local_ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False).head(10)
|==========================================|   1 /  1  (100.00%)         0s
Out[23]: 
                                               title  top_rating
0  Shanghai Triad (Yao a yao yao dao waipo qiao) ...           5
1                              Twelve Monkeys (1995)           4
2                               Seven (Se7en) (1995)           4
3                         Usual Suspects, The (1995)           5
4                                 Postino, Il (1994)           3
5                          Mr. Holland's Opus (1995)           4
6                                 Taxi Driver (1976)           5
7                                       Crumb (1994)           5
8                                   Star Wars (1977)           5
9                                   Star Wars (1977)           5

cache機制

在0.4之前的版本,我們提供了一個persist介面,來儲存執行結果。但是這個操作是個立即執行介面。現在我們提供cache介面,cache的collection會被單獨計算,但不會立即執行。

In [25]: tmpdf = ratings[ratings.title.len() > 10].cache()

In [26]: tmpdf['title', 'movie_id'].head(3)
|==========================================|   1 /  1  (100.00%)        35s
Out[26]: 
                  title  movie_id
0  Seven (Se7en) (1995)        11
1  Event Horizon (1997)       260
2      Star Wars (1977)        50

In [27]: tmpdf.count()  # tmpdf已經被cache,所以我們能立刻計算出數量
|==========================================|   1 /  1  (100.00%)         0s
99823

記住,目前的cache介面,計算的結果還是要落地的,並不是存放在記憶體中。

而一個collection如果已經被計算過,這個過程會自動觸發cache機制,後續的計算過程會從這計算個向後進行,而不再需要從頭計算。

其他特性

PyOdps 0.4版本還帶來一些其他特性,比如join支援mapjoin(只對ODPS後端有效);Sequence上支援unique和nunique;execute_sql執行時支援設定hints,對於IPython外掛,支援使用SET來設定hints,等等。

PyOdps下一步計劃

對於PyOdps的DataFrame API來說,我們的短期目標是能完成ODPS SQL能做的所有事情,然後在這個基礎上再帶來更多SQL不容易做到的,但是卻很有用的操作。現在,除了自定義聚合函式,我們已經能基本涵蓋所有的SQL場景。

PyOdps非常年輕,期待大家來使用、提feature、貢獻程式碼。

原文連結:
http://click.aliyun.com/m/14035/