1. 程式人生 > >Python NumPy-快速處理資料

Python NumPy-快速處理資料

2.2 ufunc運算

ufunc是universal function的縮寫,它是一種能對陣列的每個元素進行操作的函式。NumPy內建的許多ufunc函式都是在C語言級別實現的,因此它們的計算速度非常快。讓我們來看一個例子:

>>> x = np.linspace(0, 2*np.pi, 10)
# 對陣列x中的每個元素進行正弦計算,返回一個同樣大小的新陣列
>>> y = np.sin(x)
>>> y
array([  0.00000000e+00,   6.42787610e-01,   9.84807753e-01,
         8.66025404e-01,   3.42020143e-01,  -3.42020143e-01,
-8.66025404e-01, -9.84807753e-01, -6.42787610e-01, -2.44921271e-16])

先用linspace產生一個從0到2*PI的等距離的10個數,然後將其傳遞給sin函式,由於np.sin是一個ufunc函式,因此它對x中的每個元素求正弦值,然後將結果返回,並且賦值給y。計算之後x中的值並沒有改變,而是新建立了一個數組儲存結果。如果我們希望將sin函式所計算的結果直接覆蓋到陣列x上去的話,可以將要被覆蓋的陣列作為第二個引數傳遞給ufunc函式。例如::

>>> t = np.sin
(x,x) >>> x array([ 0.00000000e+00, 6.42787610e-01, 9.84807753e-01, 8.66025404e-01, 3.42020143e-01, -3.42020143e-01, -8.66025404e-01, -9.84807753e-01, -6.42787610e-01, -2.44921271e-16]) >>> id(t) == id(x) True

sin函式的第二個引數也是x,那麼它所做的事情就是對x中的每給值求正弦值,並且把結果儲存到x中的對應的位置中。此時函式的返回值仍然是整個計算的結果,只不過它就是x,因此兩個變數的id是相同的(變數t和變數x指向同一塊記憶體區域)。

我用下面這個小程式,比較了一下numpy.math和Python標準庫的math.sin的計算速度::

import time
import math
import numpy as np

x = [i * 0.001 for i in xrange(1000000)]
start = time.clock()
for i, t in enumerate(x):
    x[i] = math.sin(t)
print "math.sin:", time.clock() - start

x = [i * 0.001 for i in xrange(1000000)]
x = np.array(x)
start = time.clock()
np.sin(x,x)
print "numpy.sin:", time.clock() - start

# 輸出
# math.sin: 1.15426932753
# numpy.sin: 0.0882399858083

在我的電腦上計算100萬次正弦值,numpy.sin比math.sin快10倍多。這得利於numpy.sin在C語言級別的迴圈計算。numpy.sin同樣也支援對單個數值求正弦,例如:numpy.sin(0.5)。不過值得注意的是,對單個數的計算math.sin則比numpy.sin快得多了,讓我們看下面這個測試程式:

x = [i * 0.001 for i in xrange(1000000)]
start = time.clock()
for i, t in enumerate(x):
    x[i] = np.sin(t)
print "numpy.sin loop:", time.clock() - start

# 輸出
# numpy.sin loop: 5.72166965355

請注意numpy.sin的計算速度只有math.sin的1/5。這是因為numpy.sin為了同時支援陣列和單個值的計算,其C語言的內部實現要比math.sin複雜很多,如果我們同樣在Python級別進行迴圈的話,就會看出其中的差別了。此外,numpy.sin返回的數的型別和math.sin返回的型別有所不同,math.sin返回的是Python的標準float型別,而numpy.sin則返回一個numpy.float64型別:

>>> type(math.sin(0.5))
<type 'float'>
>>> type(np.sin(0.5))
<type 'numpy.float64'>

通過上面的例子我們瞭解瞭如何最有效率地使用math庫和numpy庫中的數學函式。因為它們各有長短,因此在匯入時不建議使用*號全部載入,而是應該使用import numpy as np的方式載入,這樣我們可以根據需要選擇合適的函式呼叫。

NumPy中有眾多的ufunc函式為我們提供各式各樣的計算。除了sin這種單輸入函式之外,還有許多多個輸入的函式,add函式就是一個最常用的例子。先來看一個例子:

>>> a = np.arange(0,4)
>>> a
array([0, 1, 2, 3])
>>> b = np.arange(1,5)
>>> b
array([1, 2, 3, 4])
>>> np.add(a,b)
array([1, 3, 5, 7])
>>> np.add(a,b,a)
array([1, 3, 5, 7])
>>> a
array([1, 3, 5, 7])

add函式返回一個新的陣列,此陣列的每個元素都為兩個引數陣列的對應元素之和。它接受第3個引數指定計算結果所要寫入的陣列,如果指定的話,add函式就不再產生新的陣列。

由於Python的操作符過載功能,計算兩個陣列相加可以簡單地寫為a+b,而np.add(a,b,a)則可以用a+=b來表示。下面是陣列的運算子和其對應的ufunc函式的一個列表,注意除號"/"的意義根據是否啟用__future__.division有所不同。

y = x1 + x2: add(x1, x2 [, y])
y = x1 - x2: subtract(x1, x2 [, y])
y = x1 * x2: multiply (x1, x2 [, y])
y = x1 / x2: divide (x1, x2 [, y]), 如果兩個陣列的元素為整數,那麼用整數除法
y = x1 / x2: true divide (x1, x2 [, y]), 總是返回精確的商
y = x1 // x2: floor divide (x1, x2 [, y]), 總是對返回值取整
y = -x: negative(x [,y])
y = x1**x2: power(x1, x2 [, y])
y = x1 % x2: remainder(x1, x2 [, y]), mod(x1, x2, [, y])

陣列物件支援這些操作符,極大地簡化了算式的編寫,不過要注意如果你的算式很複雜,並且要運算的陣列很大的話,會因為產生大量的中間結果而降低程式的運算效率。例如:假設a b c三個陣列採用算式x=a*b+c計算,那麼它相當於:

t = a * b
x = t + c
del t

也就是說需要產生一個數組t儲存乘法的計算結果,然後再產生最後的結果陣列x。我們可以通過手工將一個算式分解為x = a*b; x += c,以減少一次記憶體分配。

通過組合標準的ufunc函式的呼叫,可以實現各種算式的陣列計算。不過有些時候這種算式不易編寫,而針對每個元素的計算函式卻很容易用Python實現,這時可以用frompyfunc函式將一個計算單個元素的函式轉換成ufunc函式。這樣就可以方便地用所產生的ufunc函式對陣列進行計算了。讓我們來看一個例子。

我們想用一個分段函式描述三角波,三角波的樣子如圖2.4所示:

_images/numpy_intro_01.png

圖2.4 三角波可以用分段函式進行計算

我們很容易根據上圖所示寫出如下的計算三角波某點y座標的函式:

def triangle_wave(x, c, c0, hc):
    x = x - int(x) # 三角波的週期為1,因此只取x座標的小數部分進行計算
    if x >= c: r = 0.0
    elif x < c0: r = x / c0 * hc
    else: r = (c-x) / (c-c0) * hc
    return r

顯然triangle_wave函式只能計算單個數值,不能對陣列直接進行處理。我們可以用下面的方法先使用列表包容(List comprehension),計算出一個list,然後用array函式將列表轉換為陣列:

x = np.linspace(0, 2, 1000)
y = np.array([triangle_wave(t, 0.6, 0.4, 1.0) for t in x])

這種做法每次都需要使用列表包容語法呼叫函式,對於多維陣列是很麻煩的。讓我們來看看如何用frompyfunc函式來解決這個問題:

triangle_ufunc = np.frompyfunc( lambda x: triangle_wave(x, 0.6, 0.4, 1.0), 1, 1)
y2 = triangle_ufunc(x)

frompyfunc的呼叫格式為frompyfunc(func, nin, nout),其中func是計算單個元素的函式,nin是此函式的輸入引數的個數,nout是此函式的返回值的個數。雖然triangle_wave函式有4個引數,但是由於後三個c, c0, hc在整個計算中值都是固定的,因此所產生的ufunc函式其實只有一個引數。為了滿足這個條件,我們用一個lambda函式對triangle_wave的引數進行一次包裝。這樣傳入frompyfunc的函式就只有一個引數了。這樣子做,效率並不是太高,另外還有一種方法:

def triangle_func(c, c0, hc):
    def trifunc(x):
        x = x - int(x) # 三角波的週期為1,因此只取x座標的小數部分進行計算
        if x >= c: r = 0.0
        elif x < c0: r = x / c0 * hc
        else: r = (c-x) / (c-c0) * hc
        return r

    # 用trifunc函式建立一個ufunc函式,可以直接對陣列進行計算, 不過通過此函式
    # 計算得到的是一個Object陣列,需要進行型別轉換
    return np.frompyfunc(trifunc, 1, 1)

y2 = triangle_func(0.6, 0.4, 1.0)(x)

我們通過函式triangle_func包裝三角波的三個引數,在其內部定義一個計算三角波的函式trifunc,trifunc函式在呼叫時會採用triangle_func的引數進行計算。最後triangle_func返回用frompyfunc轉換結果。

值得注意的是用frompyfunc得到的函式計算出的陣列元素的型別為object,因為frompyfunc函式無法保證Python函式返回的資料型別都完全一致。因此還需要再次 y2.astype(np.float64)將其轉換為雙精度浮點陣列。

2.2.1 廣播

當我們使用ufunc函式對兩個陣列進行計算時,ufunc函式會對這兩個陣列的對應元素進行計算,因此它要求這兩個陣列有相同的大小(shape相同)。如果兩個陣列的shape不同的話,會進行如下的廣播(broadcasting)處理:

  1. 讓所有輸入陣列都向其中shape最長的陣列看齊,shape中不足的部分都通過在前面加1補齊
  2. 輸出陣列的shape是輸入陣列shape的各個軸上的最大值
  3. 如果輸入陣列的某個軸和輸出陣列的對應軸的長度相同或者其長度為1時,這個陣列能夠用來計算,否則出錯
  4. 當輸入陣列的某個軸的長度為1時,沿著此軸運算時都用此軸上的第一組值

上述4條規則理解起來可能比較費勁,讓我們來看一個實際的例子。

先建立一個二維陣列a,其shape為(6,1):

>>> a = np.arange(0, 60, 10).reshape(-1, 1)
>>> a
array([[ 0], [10], [20], [30], [40], [50]])
>>> a.shape
(6, 1)

再建立一維陣列b,其shape為(5,):

>>> b = np.arange(0, 5)
>>> b
array([0, 1, 2, 3, 4])
>>> b.shape
(5,)

計算a和b的和,得到一個加法表,它相當於計算a,b中所有元素組的和,得到一個shape為(6,5)的陣列:

>>> c = a + b
>>> c
array([[ 0,  1,  2,  3,  4],
       [10, 11, 12, 13, 14],
       [20, 21, 22, 23, 24],
       [30, 31, 32, 33, 34],
       [40, 41, 42, 43, 44],
       [50, 51, 52, 53, 54]])
>>> c.shape
(6, 5)

由於a和b的shape長度(也就是ndim屬性)不同,根據規則1,需要讓b的shape向a對齊,於是將b的shape前面加1,補齊為(1,5)。相當於做了如下計算:

>>> b.shape=1,5
>>> b
array([[0, 1, 2, 3, 4]])

這樣加法運算的兩個輸入陣列的shape分別為(6,1)和(1,5),根據規則2,輸出陣列的各個軸的長度為輸入陣列各個軸上的長度的最大值,可知輸出陣列的shape為(6,5)。

由於b的第0軸上的長度為1,而a的第0軸上的長度為6,因此為了讓它們在第0軸上能夠相加,需要將b在第0軸上的長度擴充套件為6,這相當於:

>>> b = b.repeat(6,axis=0)
>>> b
array([[0, 1, 2, 3, 4],
       [0, 1, 2, 3, 4],
       [0, 1, 2, 3, 4],
       [0, 1, 2, 3, 4],
       [0, 1, 2, 3, 4],
       [0, 1, 2, 3, 4]])

由於a的第1軸的長度為1,而b的第一軸長度為5,因此為了讓它們在第1軸上能夠相加,需要將a在第1軸上的長度擴充套件為5,這相當於:

>>> a = a.repeat(5, axis=1)
>>> a
array([[ 0,  0,  0,  0,  0],
       [10, 10, 10, 10, 10],
       [20, 20, 20, 20, 20],
       [30, 30, 30, 30, 30],
       [40, 40, 40, 40, 40],
       [50, 50, 50, 50, 50]])

經過上述處理之後,a和b就可以按對應元素進行相加運算了。

當然,numpy在執行a+b運算時,其內部並不會真正將長度為1的軸用repeat函式進行擴充套件,如果這樣做的話就太浪費空間了。

由於這種廣播計算很常用,因此numpy提供了一個快速產生如上面a,b陣列的方法: ogrid物件:

>>> x,y = np.ogrid[0:5,0:5]
>>> x
array([[0],
       [1],
       [2],
       [3],
       [4]])
>>> y
array([[0, 1, 2, 3, 4]])

ogrid是一個很有趣的物件,它像一個多維陣列一樣,用切片組元作為下標進行存取,返回的是一組可以用來廣播計算的陣列。其切片下標有兩種形式:

  • 開始值:結束值:步長,和np.arange(開始值, 結束值, 步長)類似

  • 開始值:結束值:長度j,當第三個引數為虛數時,它表示返回的陣列的長度,和np.linspace(開始值, 結束值, 長度)類似:

    >>> x, y = np.ogrid[0:1:4j, 0:1:3j]
    >>> x
    array([[ 0.        ],
           [ 0.33333333],
           [ 0.66666667],
           [ 1.        ]])
    >>> y
    array([[ 0. ,  0.5,  1. ]])
    

ogrid為什麼不是函式

根據Python的語法,只有在中括號中才能使用用冒號隔開的切片語法,如果ogrid是函式的話,那麼這些切片必須使用slice函式建立,這顯然會增加程式碼的長度。

利用ogrid的返回值,我能很容易計算x, y網格面上各點的值,或者x, y, z網格體上各點的值。下面是繪製三維曲面 x * exp(x**2 - y**2) 的程式:

import numpy as np
from enthought.mayavi import mlab

x, y = np.ogrid[-2:2:20j, -2:2:20j]
z = x * np.exp( - x**2 - y**2)

pl = mlab.surf(x, y, z, warp_scale="auto")
mlab.axes(xlabel='x', ylabel='y', zlabel='z')
mlab.outline(pl)

此程式使用mayavi的mlab庫快速繪製如圖2.5所示的3D曲面,關於mlab的相關內容將在今後的章節進行介紹。

_images/numpy_intro_04.png

圖2.5 使用ogrid建立的三維曲面

2.2.2 ufunc的方法

ufunc函式本身還有些方法,這些方法只對兩個輸入一個輸出的ufunc函式有效,其它的ufunc物件呼叫這些方法時會丟擲ValueError異常。

reduce 方法和Python的reduce函式類似,它沿著axis軸對array進行操作,相當於將<op>運算子插入到沿axis軸的所有子陣列或者元素當中。

<op>.reduce (array=, axis=0, dtype=None)

例如:

>>> np.add.reduce([1,2,3]) # 1 + 2 + 3
6
>>> np.add.reduce([[1,2,3],[4,5,6]], axis=1) # 1,4 + 2,5 + 3,6
array([ 6, 15])

accumulate 方法和reduce方法類似,只是它返回的陣列和輸入的陣列的shape相同,儲存所有的中間計算結果:

>>> np.add.accumulate([1,2,3])
array([1, 3, 6])
>>> np.add.accumulate([[1,2,3],[4,5,6]], axis=1)
array([[ 1,  3,  6],
       [ 4,  9, 15]])

reduceat 方法計算多組reduce的結果,通過indices引數指定一系列reduce的起始和終了位置。reduceat的計算有些特別,讓我們通過一個例子來解釋一下:

>>> a = np.array([1,2,3,4])
>>> result = np.add.reduceat