国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 編程 > Python > 正文

Python利用多進(jìn)程將大量數(shù)據(jù)放入有限內(nèi)存的教程

2019-11-25 17:52:36
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

簡(jiǎn)介

這是一篇有關(guān)如何將大量的數(shù)據(jù)放入有限的內(nèi)存中的簡(jiǎn)略教程。

與客戶(hù)工作時(shí),有時(shí)會(huì)發(fā)現(xiàn)他們的數(shù)據(jù)庫(kù)實(shí)際上只是一個(gè)csv或Excel文件倉(cāng)庫(kù),你只能將就著用,經(jīng)常需要在不更新他們的數(shù)據(jù)倉(cāng)庫(kù)的情況下完成工作。大部分情況下,如果將這些文件存儲(chǔ)在一個(gè)簡(jiǎn)單的數(shù)據(jù)庫(kù)框架中或許更好,但時(shí)間可能不允許。這種方法對(duì)時(shí)間、機(jī)器硬件和所處環(huán)境都有要求。

下面介紹一個(gè)很好的例子:假設(shè)有一堆表格(沒(méi)有使用Neo4j、MongoDB或其他類(lèi)型的數(shù)據(jù)庫(kù),僅僅使用csvs、tsvs等格式存儲(chǔ)的表格),如果將所有表格組合在一起,得到的數(shù)據(jù)幀太大,無(wú)法放入內(nèi)存。所以第一個(gè)想法是:將其拆分成不同的部分,逐個(gè)存儲(chǔ)。這個(gè)方案看起來(lái)不錯(cuò),但處理起來(lái)很慢。除非我們使用多核處理器。
目標(biāo)

這里的目標(biāo)是從所有職位中(大約1萬(wàn)個(gè)),找出相關(guān)的的職位。將這些職位與政府給的職位代碼組合起來(lái)。接著將組合的結(jié)果與對(duì)應(yīng)的州(行政單位)信息組合起來(lái)。然后用通過(guò)word2vec生成的屬性信息在我們的客戶(hù)的管道中增強(qiáng)已有的屬性。

這個(gè)任務(wù)要求在短時(shí)間內(nèi)完成,誰(shuí)也不愿意等待。想象一下,這就像在不使用標(biāo)準(zhǔn)的關(guān)系型數(shù)據(jù)庫(kù)的情況下進(jìn)行多個(gè)表的連接。
數(shù)據(jù)

201541105411439.jpg (1274×406)

示例腳本

下面的是一個(gè)示例腳本,展示了如何使用multiprocessing來(lái)在有限的內(nèi)存空間中加速操作過(guò)程。腳本的第一部分是和特定任務(wù)相關(guān)的,可以自由跳過(guò)。請(qǐng)著重關(guān)注第二部分,這里側(cè)重的是multiprocessing引擎。

#import the necessary packagesimport pandas as pdimport usimport numpy as npfrom multiprocessing import Pool,cpu_count,Queue,Manager # the data in one particular column was number in the form that horrible excel version# of a number where '12000' is '12,000' with that beautiful useless comma in there.# did I mention I excel bothers me?# instead of converting the number right away, we only convert them when we need todef median_maker(column):  return np.median([int(x.replace(',','')) for x in column]) # dictionary_of_dataframes contains a dataframe with information for each title; e.g title is 'Data Scientist'# related_title_score_df is the dataframe of information for the title; columns = ['title','score']### where title is a similar_title and score is how closely the two are related, e.g. 'Data Analyst', 0.871# code_title_df contains columns ['code','title']# oes_data_df is a HUGE dataframe with all of the Bureau of Labor Statistics(BLS) data for a given time period (YAY FREE DATA, BOO BAD CENSUS DATA!) def job_title_location_matcher(title,location):  try:    related_title_score_df = dictionary_of_dataframes[title]    # we limit dataframe1 to only those related_titles that are above    # a previously established threshold    related_title_score_df = related_title_score_df[title_score_df['score']>80]     #we merge the related titles with another table and its codes    codes_relTitles_scores = pd.merge(code_title_df,related_title_score_df)    codes_relTitles_scores = codes_relTitles_scores.drop_duplicates()     # merge the two dataframes by the codes    merged_df = pd.merge(codes_relTitles_scores, oes_data_df)    #limit the BLS data to the state we want    all_merged = merged_df[merged_df['area_title']==str(us.states.lookup(location).name)]     #calculate some summary statistics for the time we want    group_med_emp,group_mean,group_pct10,group_pct25,group_median,group_pct75,group_pct90 = all_merged[['tot_emp','a_mean','a_pct10','a_pct25','a_median','a_pct75','a_pct90']].apply(median_maker)    row = [title,location,group_med_emp,group_mean,group_pct10,group_pct25, group_median, group_pct75, group_pct90]    #convert it all to strings so we can combine them all when writing to file    row_string = [str(x) for x in row]    return row_string  except:    # if it doesnt work for a particular title/state just throw it out, there are enough to make this insignificant    'do nothing'

這里發(fā)生了神奇的事情:

#runs the function and puts the answers in the queuedef worker(row, q):    ans = job_title_location_matcher(row[0],row[1])    q.put(ans) # this writes to the file while there are still things that could be in the queue# this allows for multiple processes to write to the same file without blocking eachotherdef listener(q):  f = open(filename,'wb')  while 1:    m = q.get()    if m =='kill':        break    f.write(','.join(m) + 'n')    f.flush()  f.close() def main():  #load all your data, then throw out all unnecessary tables/columns  filename = 'skill_TEST_POOL.txt'   #sets up the necessary multiprocessing tasks  manager = Manager()  q = manager.Queue()  pool = Pool(cpu_count() + 2)  watcher = pool.map_async(listener,(q,))   jobs = []  #titles_states is a dataframe of millions of job titles and states they were found in  for i in titles_states.iloc:    job = pool.map_async(worker, (i, q))    jobs.append(job)   for job in jobs:    job.get()  q.put('kill')  pool.close()  pool.join() if __name__ == "__main__":  main()

由于每個(gè)數(shù)據(jù)幀的大小都不同(總共約有100Gb),所以將所有數(shù)據(jù)都放入內(nèi)存是不可能的。通過(guò)將最終的數(shù)據(jù)幀逐行寫(xiě)入內(nèi)存,但從來(lái)不在內(nèi)存中存儲(chǔ)完整的數(shù)據(jù)幀。我們可以完成所有的計(jì)算和組合任務(wù)。這里的“標(biāo)準(zhǔn)方法”是,我們可以?xún)H僅在“job_title_location_matcher”的末尾編寫(xiě)一個(gè)“write_line”方法,但這樣每次只會(huì)處理一個(gè)實(shí)例。根據(jù)我們需要處理的職位/州的數(shù)量,這大概需要2天的時(shí)間。而通過(guò)multiprocessing,只需2個(gè)小時(shí)。

雖然讀者可能接觸不到本教程處理的任務(wù)環(huán)境,但通過(guò)multiprocessing,可以突破許多計(jì)算機(jī)硬件的限制。本例的工作環(huán)境是c3.8xl ubuntu ec2,硬件為32核60Gb內(nèi)存(雖然這個(gè)內(nèi)存很大,但還是無(wú)法一次性放入所有數(shù)據(jù))。這里的關(guān)鍵之處是我們?cè)?0Gb的內(nèi)存的機(jī)器上有效的處理了約100Gb的數(shù)據(jù),同時(shí)速度提升了約25倍。通過(guò)multiprocessing在多核機(jī)器上自動(dòng)處理大規(guī)模的進(jìn)程,可以有效提高機(jī)器的利用率。也許有些讀者已經(jīng)知道了這個(gè)方法,但對(duì)于其他人,可以通過(guò)multiprocessing能帶來(lái)非常大的收益。順便說(shuō)一句,這部分是skill assets in the job-market這篇博文的延續(xù)。

發(fā)表評(píng)論 共有條評(píng)論
用戶(hù)名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 安康市| 永清县| 金川县| 东莞市| 南川市| 宁安市| 南充市| 彭山县| 蒙城县| 石景山区| 北辰区| 株洲市| 溧阳市| 华池县| 汉中市| 集安市| 沙洋县| 上杭县| 新化县| 建宁县| 陵水| 收藏| 公安县| 剑阁县| 嘉峪关市| 高阳县| 卓尼县| 莱芜市| 鹤岗市| 黑龙江省| 中阳县| 乌鲁木齐县| 潼南县| 桃源县| 如皋市| 左贡县| 和田市| 靖远县| 集安市| 板桥市| 新和县|