1. 程式人生 > >利用pandas進行大檔案計數處理

利用pandas進行大檔案計數處理

Pandas讀取大檔案

  要處理的是由探測器讀出的脈衝訊號,一組資料為兩列,一列為時間,一列為脈衝能量,資料量在千萬級,為了有一個直接的認識,先使用Pandas讀取一些
import pandas as pd
data = pd.read_table('filename.txt', iterator=True)
chunk = data.get_chunk(5) 

而輸出是這樣的:
Out[4]:
332.977889999979 -0.0164794921875
0 332.97790 -0.022278
1 332.97791 -0.026855
2 332.97792 -0.030518
3 332.97793 -0.045776
4 332.97794 -0.032654

DataFram基本用法

這裡,data只是個容器,pandas.io.parsers.TextFileReader。
使用astype可以實現dataframe欄位型別轉換
輸出資料中,每組資料會多處一行,因為get_chunk返回的是pandas.core.frame.DataFrame格式, 而data在讀取過程中並沒有指定DataFrame的columns,因此在get_chunk過程中,預設將第一組資料作為columns。因此需要在讀取過程中指定names即DataFrame的columns。

import pandas as pd
data = pd.read_table('filename
.txt', iterator=Truenames=['time', 'energe'])
chunk = data.get_chunk(5) data['energe'] = df['energe'].astype('int')

輸出為
Out[6]:

index time energe
0 332.97789 -0.016479
1 332.97790 -0.022278
2 332.97791 -0.026855
3 332.97792 -0.030518
4 332.97793 -0.045776

DataFram儲存和索引

這裡講一下DataFrame這個格式,與一般二維資料不同(二維列表等),DataFrame既有行索引又有列索引,因此在建立一個DataFrame資料是
DataFrame(data, columns=[‘year’, ‘month’, ‘day’],
index=[‘one’, ‘two’, ‘three’])

year month day
0 2010 4 1
1 2011 5 2
2 2012 6 3
3 2013 7 5
4 2014 8 9

而pd.read_table中的names就是指定DataFrame的columns,而index自動設定。
而DataFrame的索引格式有很多

型別 說明 例子
obj[val] 選取單列或者一組列
obj.ix[val] 選取單個行或者一組行
obj.ix[:,val] 選取單個列或列子集
obj.ix[val1, val2] 同時選取行和列
reindex方法 將一個或多個軸匹配到新索引
xs方法 根據標籤選取單行或單列,返回一個Series
icol,lrow方法 根據整數位置選取單列或單行,返回一個Series
get_value,set_value 根據行標籤列標籤選取單個值

exp: In[1]:data[:2]
Out[2]:

year month day
0 2010 4 1
1 2011 5 2

In[2]:data[data[‘month’]>5]
Out[2]:

year month day
2 2012 6 3
4 2014 8 9

如果我們直接把data拿來比較的話,相當於data中所有的標量元素
In[3]:data[data<6]=0
Out[3]:

year month day
0 2010 0 0
1 2011 0 0
2 2012 6 0
3 2013 7 0
4 2014 8 9

Pandas運算

series = data.ix[0]
data - series

Out:

year month day
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 4
4 4 4 8

DataFrame與Series之間運算會將Series索引匹配到DataFrame的列,然後沿行一直向下廣播
如果令series1 = data[‘year’]
data.sub(series1,axis=0)
則每一列都減去該series1,axis為希望匹配的軸,=0行索引,即匹配列,=1列索引,則按行匹配。

DataFrame的一些函式方法

這個就有很多了,比如排序和排名;求和、平均數以及方差、協方差等數學方法;還有就是唯一值(類似於集合)、值計數和成員資格等方法。
當然還有一些更高階的屬性,用的時候再看吧

資料處理

在得到資料樣式後我們先一次性讀取資料

start = time.time()
data = pd.read_table('Eu155_Na22_K40_MR_0CM_3Min.csv', names=['time', 'energe'])
end = time.time()
data.index
print("The time is %f s" % (end - start))
plus = data['energe']
plus[plus < 0] = 0

The time is 29.403917 s
RangeIndex(start=0, stop=68319232, step=1)

對於一個2G大小,千萬級的資料,這個讀取速度還是挺快的。之前使用matlab load用時160多s,但是不知道這個是否把資料完全讀取了。然後只抽取脈衝訊號,將負值歸0,因為會出現一定的電子噪聲從而產生一定負值。
然後就需要定位脈衝訊號中的能峰了,也就是findpeaks
這裡用到了scipy.signal中的find_peaks_cwt,具體用法可以參見官方文件
peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10)),它返回找到的peaks的位置,輸入第一個為資料,第二個為窗函式,也就是在這個寬度的能窗內尋找峰,我是這樣理解的。剛開始以為是資料的另一維座標,結果找了半天沒結果。不過事實上這個找的確定也挺慢的。
50w條的資料,找了足足7分鐘,我這一個資料3000w條不得找半個多小時,而各種資料有好幾十,恩。。這樣是不行的,於是想到了並行的方法。這個下篇文章會講到,也就是把資料按照chunksize讀取,然後同時交給(map)幾個程序同時尋峰,尋完後返回(reduce)一起計數,計數的同時,子程序再此尋峰。

在處理的時候碰到我自己的破 筆記本由於記憶體原因不能load這個資料,並且想著每次copy這麼大資料好麻煩,就把一個整體資料檔案分割成了幾個部分,先對方法進行一定的實驗,時間快,比較方便。

import pandas as pd


def split_file(filename, size):
    name = filename.split('.')[0]
    data = pd.read_table(filename, chunksize=size, names=['time', 'intension'])
    i = 1
    for piece in data:
        outname = name + str(i) + '.csv'
        piece.to_csv(outname, index=False, names = ['time', 'intension'])
        i += 1

def split_csvfile(filename, size):
    name = filename.split('.')[0]
    data = pd.read_csv(filename, chunksize=size, names=['time', 'intension'])
    i = 1
    for piece in data:
        outname = name + str(i) + '.csv'
        piece = piece['intension']
        piece.to_csv(outname, index=False)
        i += 1

額..使用並行尋峰通過map/reduce的思想來解決提升效率這個想法,很早就實現了,但是,由於效果不是特別理想,所以放那也就忘了,今天整理程式碼來看了下當時記的些筆記,然後竟然發現有個評論…..我唯一收到的評論竟然是“催稿”=。=。想一想還是把下面的工作記錄下來,免得自己後來完全忘記了。

rom scipy import signal
import os
import time
import pandas as pd
import numpy as np
from multiprocessing import Pool
import matplotlib.pylab as plt
from functools import partial


def findpeak(pluse):
    pluse[pluse < 0.05] = 0
    print('Sub process %s.' % os.getpid())
    start = time.time()
    peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10))  # 返回一個列表
    end = time.time()
    print("The time is %f s" % (end - start))
    pks = [pluse[x] for x in peaks]
    return pks


def histcnt(pks, edge=None, channel=None):
    cnt = plt.hist(pks, edge)
    res = pd.DataFrame(cnt[0], index=channel, columns=['cnt'])
    return res


if __name__ == '__main__':
    with Pool(processes=8) as p:
        start = time.time()
        print('Parent process %s.' % os.getpid())
        pluse = pd.read_csv('data/samples.csv', chunksize=50000, names=['time', 'energe'])
        channel = pd.read_csv('data/channels.txt', names=['value'])
        edges = channel * 2
        edges = pd.DataFrame({'value': [0]}).append(edges, ignore_index=True)
        specal = []
        for data in pluse:
            total = p.apply_async(findpeak, (data['energe'],),
                                  callback=partial(histcnt, edge=edges['value'], channel=channel['value']))
            specal.append(total)
        print('Waiting for all subprocesses done...')
        p.close()
        p.join()
        print('All subprocesses done.')
        spec = sum(specal)
        plt.figure()
        plt.plot(spec['cnt'])
        spec.to_csv('data/spec1.csv', header=False)
        print('every is OK')
        end = time.time()
        print("The time is %f s" % (end - start))

由於對對程序執行緒的程式設計不是很瞭解,其中走了很多彎路,嘗試了很多方法也,這個是最終效果相對較好的。
首先,通過 pd.readtable以chunksize=50000分塊讀取,edges為hist過程中的下統計box。

然後,apply_async為非阻塞呼叫findpeak,然後將結果返回給回撥函式histcnt,但是由於回撥函式除了程序返回結果還有額外的引數,因此使用partial,對特定的引數賦予固定的值(edge和channel)並返回了一個全新的可呼叫物件,這個新的可呼叫物件仍然需要通過制定那些未被賦值的引數(findpeak返回的值)來呼叫。這個新的課呼叫物件將傳遞給partial()的固定引數結合起來,同一將所有引數傳遞給原始函式(histcnt)。(至於為啥不在histcnt中確定那兩個引數,主要是為了避免一直開啟檔案。。當然,有更好的辦法只是懶得思考=。=),還有個原因就是,apply_async返回的是一個物件,需要通過該物件的get方法才能獲取值。。
對於 apply_async官方上是這樣解釋的
Apply_async((func[, args[, kwds[, callback[, error_callback]]]])),apply()方法的一個變體,返回一個結果物件
如果指定回撥,那麼它應該是一個可呼叫的接受一個引數。結果準備好回撥時,除非呼叫失敗,在這種情況下,應用error_callback代替。
如果error_callback被指定,那麼它應該是一個可呼叫的接受一個引數。如果目標函式失敗,那麼error_callback叫做除了例項。
回撥應立即完成以來,否則執行緒處理結果將被封鎖。

不使用回撥函式的版本如下,即先將所有子程序得到的資料都存入peaks列表中,然後所有程序完畢後在進行統計計數。

import pandas as pd
import time
import scipy.signal as signal
import numpy as np
from multiprocessing import Pool
import os
import matplotlib.pyplot as plt


def findpeak(pluse):
    pluse[pluse < 0] = 0
    pluse[pluse > 100] = 0
    print('Sub process %s.' % os.getpid())
    start = time.time()
    peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10))
    end = time.time()
    print("The time is %f s" % (end - start))
    res = [pluse[x] for x in peaks]
    return res


if __name__ == '__main__':
    with Pool(processes=8) as p:
        start = time.time()
        print('Parent process %s.' % os.getpid())
        pluse = pd.read_csv('data/sample.csv', chunksize=200000, names=['time', 'energe'])
        pks = []
        for data in pluse:
            pks.append(p.apply_async(findpeak, (data['energe'],)))
        print('Waiting for all subprocesses done...')
        p.close()
        p.join()
        print('All subprocesses done.')
        peaks = []
        for i, ele in enumerate(pks):
            peaks.extend(ele.get())
        peaks = pd.DataFrame(peaks, columns=['energe'])
        peaks.to_csv('peaks.csv', index=False, header=False, chunksize=50000)
        channel = pd.read_csv('data/channels.txt', names=['value'])
        channel *= 2
        channel = pd.DataFrame({'value': [0]}).append(channel, ignore_index=True)
        plt.figure()
        spec = plt.hist(peaks['energe'], channel['value'])
        # out.plot.hist(bins=1024)
        # print(out)
        # cnt = peaks.value_counts(bins=1024)
        # cnt.to_csv('data/cnt.csv', index=False, header=False)
        print('every is OK')
        end = time.time()
        print("The time is %f s" % (end - start))