国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

利用pandas進行大文件計數處理的方法

2020-01-04 14:45:22
字體:
來源:轉載
供稿:網友

Pandas讀取大文件

要處理的是由探測器讀出的脈沖信號,一組數據為兩列,一列為時間,一列為脈沖能量,數據量在千萬級,為了有一個直接的認識,先使用Pandas讀取一些

import pandas as pddata = pd.read_table('filename.txt', iterator=True)chunk = data.get_chunk(5) 

而輸出是這樣的:

Out[4]: 332.977889999979 -0.0164794921875 0 332.97790 -0.022278 1 332.97791 -0.026855 2 332.97792 -0.030518 3 332.97793 -0.045776 4 332.97794 -0.032654

DataFram基本用法

這里,data只是個容器,pandas.io.parsers.TextFileReader。

使用astype可以實現dataframe字段類型轉換

輸出數據中,每組數據會多處一行,因為get_chunk返回的是pandas.core.frame.DataFrame格式, 而data在讀取過程中并沒有指定DataFrame的columns,因此在get_chunk過程中,默認將第一組數據作為columns。因此需要在讀取過程中指定names即DataFrame的columns。

import pandas as pddata = pd.read_table('filename.txt', iterator=True, names=['time', 'energe'])chunk = data.get_chunk(5) data['energe'] = df['energe'].astype('int')

輸出為

Out[6]:

 

index time energe
0 332.97789 -0.016479
1 332.97790 -0.022278
2 332.97791 -0.026855
3 332.97792 -0.030518
4 332.97793 -0.045776

 

DataFram存儲和索引

這里講一下DataFrame這個格式,與一般二維數據不同(二維列表等),DataFrame既有行索引又有列索引,因此在建立一個DataFrame數據是

DataFrame(data, columns=[‘year', ‘month', ‘day'], index=[‘one', ‘two', ‘three'])

 

  year month day
0 2010 4 1
1 2011 5 2
2 2012 6 3
3 2013 7 5
4 2014 8 9

 

而pd.read_table中的names就是指定DataFrame的columns,而index自動設置。 而DataFrame的索引格式有很多

 

類型 說明 例子
obj[val] 選取單列或者一組列  
obj.ix[val] 選取單個行或者一組行  
obj.ix[:,val] 選取單個列或列子集  
obj.ix[val1, val2] 同時選取行和列  
reindex方法 將一個或多個軸匹配到新索引  
xs方法 根據標簽選取單行或單列,返回一個Series  
icol,lrow方法 根據整數位置選取單列或單行,返回一個Series  
get_value,set_value 根據行標簽列標簽選取單個值

 

exp: In[1]:data[:2]

Out[2]:

 

  year month day
0 2010 4 1
1 2011 5 2

 

In[2]:data[data[‘month']>5]

Out[2]:

 

  year month day
2 2012 6 3
4 2014 8 9

 

如果我們直接把data拿來比較的話,相當于data中所有的標量元素

In[3]:data[data<6]=0

Out[3]:

 

  year month day
0 2010 0 0
1 2011 0 0
2 2012 6 0
3 2013 7 0
4 2014 8 9

 

Pandas運算

series = data.ix[0]data - series

Out:

 

  year month day
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 4
4 4 4 8

 

DataFrame與Series之間運算會將Series索引匹配到DataFrame的列,然后沿行一直向下廣播

如果令series1 = data[‘year']

data.sub(series1,axis=0) 

則每一列都減去該series1,axis為希望匹配的軸,=0行索引,即匹配列,=1列索引,則按行匹配。

DataFrame的一些函數方法

這個就有很多了,比如排序和排名;求和、平均數以及方差、協方差等數學方法;還有就是唯一值(類似于集合)、值計數和成員資格等方法。

當然還有一些更高級的屬性,用的時候再看吧

數據處理

在得到數據樣式后我們先一次性讀取數據

start = time.time()data = pd.read_table('Eu155_Na22_K40_MR_0CM_3Min.csv', names=['time', 'energe'])end = time.time()data.indexprint("The time is %f s" % (end - start))plus = data['energe']plus[plus < 0] = 0
The time is 29.403917 s RangeIndex(start=0, stop=68319232, step=1)

對于一個2G大小,千萬級的數據,這個讀取速度還是挺快的。之前使用matlab load用時160多s,但是不知道這個是否把數據完全讀取了。然后只抽取脈沖信號,將負值歸0,因為會出現一定的電子噪聲從而產生一定負值。

然后就需要定位脈沖信號中的能峰了,也就是findpeaks

這里用到了scipy.signal中的find_peaks_cwt,具體用法可以參見官方文檔

peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10)),它返回找到的peaks的位置,輸入第一個為數據,第二個為窗函數,也就是在這個寬度的能窗內尋找峰,我是這樣理解的。剛開始以為是數據的另一維坐標,結果找了半天沒結果。不過事實上這個找的確定也挺慢的。

50w條的數據,找了足足7分鐘,我這一個數據3000w條不得找半個多小時,而各種數據有好幾十,恩。。這樣是不行的,于是想到了并行的方法。這個下篇文章會講到,也就是把數據按照chunksize讀取,然后同時交給(map)幾個進程同時尋峰,尋完后返回(reduce)一起計數,計數的同時,子進程再此尋峰。

在處理的時候碰到我自己的破 筆記本由于內存原因不能load這個數據,并且想著每次copy這么大數據好麻煩,就把一個整體數據文件分割成了幾個部分,先對方法進行一定的實驗,時間快,比較方便。

import pandas as pddef split_file(filename, size): name = filename.split('.')[0] data = pd.read_table(filename, chunksize=size, names=['time', 'intension']) i = 1 for piece in data: outname = name + str(i) + '.csv' piece.to_csv(outname, index=False, names = ['time', 'intension']) i += 1def split_csvfile(filename, size): name = filename.split('.')[0] data = pd.read_csv(filename, chunksize=size, names=['time', 'intension']) i = 1 for piece in data: outname = name + str(i) + '.csv' piece = piece['intension'] piece.to_csv(outname, index=False) i += 1

額..使用并行尋峰通過map/reduce的思想來解決提升效率這個想法,很早就實現了,但是,由于效果不是特別理想,所以放那也就忘了,今天整理代碼來看了下當時記的些筆記,然后竟然發現有個評論…..我唯一收到的評論竟然是“催稿”=。=。想一想還是把下面的工作記錄下來,免得自己后來完全忘記了。

rom scipy import signalimport osimport timeimport pandas as pdimport numpy as npfrom multiprocessing import Poolimport matplotlib.pylab as pltfrom functools import partialdef findpeak(pluse): pluse[pluse < 0.05] = 0 print('Sub process %s.' % os.getpid()) start = time.time() peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10)) # 返回一個列表 end = time.time() print("The time is %f s" % (end - start)) pks = [pluse[x] for x in peaks] return pksdef histcnt(pks, edge=None, channel=None): cnt = plt.hist(pks, edge) res = pd.DataFrame(cnt[0], index=channel, columns=['cnt']) return resif __name__ == '__main__': with Pool(processes=8) as p: start = time.time() print('Parent process %s.' % os.getpid()) pluse = pd.read_csv('data/samples.csv', chunksize=50000, names=['time', 'energe']) channel = pd.read_csv('data/channels.txt', names=['value']) edges = channel * 2 edges = pd.DataFrame({'value': [0]}).append(edges, ignore_index=True) specal = [] for data in pluse: total = p.apply_async(findpeak, (data['energe'],),   callback=partial(histcnt, edge=edges['value'], channel=channel['value'])) specal.append(total) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') spec = sum(specal) plt.figure() plt.plot(spec['cnt']) spec.to_csv('data/spec1.csv', header=False) print('every is OK') end = time.time() print("The time is %f s" % (end - start))

由于對對進程線程的編程不是很了解,其中走了很多彎路,嘗試了很多方法也,這個是最終效果相對較好的。

首先,通過 pd.readtable以chunksize=50000分塊讀取,edges為hist過程中的下統計box。

然后,apply_async為非阻塞調用findpeak,然后將結果返回給回調函數histcnt,但是由于回調函數除了進程返回結果還有額外的參數,因此使用partial,對特定的參數賦予固定的值(edge和channel)并返回了一個全新的可調用對象,這個新的可調用對象仍然需要通過制定那些未被賦值的參數(findpeak返回的值)來調用。這個新的課調用對象將傳遞給partial()的固定參數結合起來,同一將所有參數傳遞給原始函數(histcnt)。(至于為啥不在histcnt中確定那兩個參數,主要是為了避免一直打開文件。。當然,有更好的辦法只是懶得思考=。=),還有個原因就是,apply_async返回的是一個對象,需要通過該對象的get方法才能獲取值。。

對于 apply_async官方上是這樣解釋的

Apply_async((func[, args[, kwds[, callback[, error_callback]]]])),apply()方法的一個變體,返回一個結果對象

如果指定回調,那么它應該是一個可調用的接受一個參數。結果準備好回調時,除非調用失敗,在這種情況下,應用error_callback代替。

如果error_callback被指定,那么它應該是一個可調用的接受一個參數。如果目標函數失敗,那么error_callback叫做除了實例。

回調應立即完成以來,否則線程處理結果將被封鎖。

不使用回調函數的版本如下,即先將所有子進程得到的數據都存入peaks列表中,然后所有進程完畢后在進行統計計數。

import pandas as pdimport timeimport scipy.signal as signalimport numpy as npfrom multiprocessing import Poolimport osimport matplotlib.pyplot as pltdef findpeak(pluse): pluse[pluse < 0] = 0 pluse[pluse > 100] = 0 print('Sub process %s.' % os.getpid()) start = time.time() peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10)) end = time.time() print("The time is %f s" % (end - start)) res = [pluse[x] for x in peaks] return resif __name__ == '__main__': with Pool(processes=8) as p: start = time.time() print('Parent process %s.' % os.getpid()) pluse = pd.read_csv('data/sample.csv', chunksize=200000, names=['time', 'energe']) pks = [] for data in pluse: pks.append(p.apply_async(findpeak, (data['energe'],))) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') peaks = [] for i, ele in enumerate(pks): peaks.extend(ele.get()) peaks = pd.DataFrame(peaks, columns=['energe']) peaks.to_csv('peaks.csv', index=False, header=False, chunksize=50000) channel = pd.read_csv('data/channels.txt', names=['value']) channel *= 2 channel = pd.DataFrame({'value': [0]}).append(channel, ignore_index=True) plt.figure() spec = plt.hist(peaks['energe'], channel['value']) # out.plot.hist(bins=1024) # print(out) # cnt = peaks.value_counts(bins=1024) # cnt.to_csv('data/cnt.csv', index=False, header=False) print('every is OK') end = time.time() print("The time is %f s" % (end - start))

以上這篇利用pandas進行大文件計數處理的方法就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持VEVB武林網。


注:相關教程知識閱讀請移步到python教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 新民市| 贡嘎县| 天门市| 揭阳市| 华安县| 贞丰县| 潞西市| 柳州市| 博兴县| 普陀区| 凤山县| 托里县| 交口县| 莱州市| 龙门县| 喜德县| 新竹市| 孟村| 八宿县| 克拉玛依市| 四川省| 西充县| 星子县| 云林县| 建瓯市| 彰化县| 肥乡县| 尉氏县| 贵定县| 望江县| 祁连县| 桐城市| 宁海县| 德江县| 河源市| 永济市| 白玉县| 武义县| 许昌县| 尼勒克县| 兴安盟|