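To be clear, the serial version is already fast enough: on my dual-core Intel Core machine running Debian 7.6 it finishes in only 0.2s, which is very hard to beat. The multiprocess versions cannot avoid the heavy cost of creating processes and of synchronizing and transferring data, so they actually perform worse than the serial version and are useful only as a learning example. I will optimize them some other time.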
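Two basic patterns of concurrent program design:
1. Split a large data set into several smaller data sets, process them in parallel, then merge the results. The hard part is load balancing.
2. Decompose a complex task into several subtasks that run concurrently as a pipeline. The hard part is coordination and synchronization between the subtasks. Sender and receiver must agree on some protocol so that the receiver does not exit too early.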
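As a rough illustration of the first pattern, the following sketch splits a list of lines into chunks, counts words in each chunk concurrently, and merges the partial counts. It is written for Python 3, and the names `split_into_chunks`, `count_words` and `parallel_word_count` are made up for this example. It uses `multiprocessing.pool.ThreadPool` so that it runs anywhere without a `__main__` guard; for CPU-bound work one would likely swap in `multiprocessing.Pool`.

```python
import re
from collections import Counter
from multiprocessing.pool import ThreadPool

WORD_RE = re.compile(r"\w+")


def split_into_chunks(items, n):
    """Split items into n contiguous chunks whose sizes differ by at most one."""
    size, extra = divmod(len(items), n)
    chunks, start = [], 0
    for i in range(n):
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


def count_words(lines):
    """Count the words in one chunk of lines."""
    return Counter(WORD_RE.findall(" ".join(lines)))


def parallel_word_count(lines, workers=4):
    """Split the data, process the chunks concurrently, merge the partial results."""
    chunks = split_into_chunks(lines, workers)
    with ThreadPool(workers) as pool:
        partials = pool.map(count_words, chunks)
    total = Counter()
    for partial in partials:
        total.update(partial)   # merging adds the per-chunk counts
    return total
```

Note that equal-sized chunks only balance the load when every line costs about the same to process, which is exactly the load-balancing difficulty mentioned above.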
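Practical scenarios:
1. Task processing. Decompose a complex task into several subtasks handled as a pipeline (multiple processes), and inside each subtask process the whole data set in parallel (multiple threads).
2. Real-world simulation. Each object is an atomic unit of concurrent activity; objects constrain one another's behavior through message passing and mutually exclusive access to resources.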
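The task-processing scenario can be sketched as a two-stage pipeline: a reader stage feeds batches into a queue, and an analyzer stage consumes them while handing each batch to a small worker pool. This is only a minimal Python 3 sketch under simplifying assumptions: both stages are threads inside one process rather than separate processes, and `DONE`, `reader`, `analyzer` and `run_pipeline` are hypothetical names.

```python
import queue
import threading
from collections import Counter
from multiprocessing.pool import ThreadPool

DONE = object()  # sentinel meaning "no more batches will arrive"


def reader(batches, out_q):
    """Stage 1: emit batches of lines, then announce completion."""
    for batch in batches:
        out_q.put(batch)
    out_q.put(DONE)


def count_batch(lines):
    """Count whitespace-separated words in one batch of lines."""
    return Counter(word for line in lines for word in line.split())


def analyzer(in_q, workers=2):
    """Stage 2: consume batches as they arrive and count each one in a pool."""
    total = Counter()
    with ThreadPool(workers) as pool:
        pending = []
        while True:
            batch = in_q.get()          # blocks until stage 1 sends something
            if batch is DONE:
                break
            pending.append(pool.apply_async(count_batch, (batch,)))
        for result in pending:
            total.update(result.get())
    return total


def run_pipeline(batches):
    """Run both stages concurrently and return the merged word counts."""
    q = queue.Queue()
    t = threading.Thread(target=reader, args=(batches, q))
    t.start()
    total = analyzer(q)
    t.join()
    return total
```

The sentinel is the agreed protocol between the stages: the analyzer exits only when it sees `DONE`, never merely because the queue happens to be empty.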
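An important lesson: the more complex a concurrent design becomes, the harder it is to control the program's flow and its stability at run time, and the subtleties of concurrency can leave optimization efforts with little effect.
Two multiprocess implementations follow. What I originally had in mind was three processes: a file-reading process that uses multiple threads internally to read files, a word-parsing process that uses multiple threads internally to parse words, and the main process. Because of Python's GIL, however, threads cannot take full advantage of concurrency, so the listings use process pools inside each stage instead.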
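Notes on the first version:
1. WordReading uses multiple processes internally to read files, and WordAnalyzing uses multiple processes internally to parse words. Note that, thanks to good encapsulation, the internal implementation can be changed freely (from serial to concurrent) while the external interface stays the same;
2. Transferring a large number of file lines incurs heavy synchronization overhead, so WordReading reads all lines of all files in one go and passes them to WordAnalyzing; the two subtasks therefore still run serially;
3. The multi-queue was meant to avoid fierce contention when multiple producers and multiple consumers read and write a single queue, but since the two subtasks run serially it never came into play.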
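The encapsulation point in note 1 can be illustrated with a small stand-in class. This is a hedged Python 3 sketch, not the article's WordAnalyzing: `WordCounter`, `count_batch` and `obtain_result` are hypothetical names, and a thread pool stands in for the process pool so the example runs without a `__main__` guard. The caller uses the same `run` / `obtain_result` interface whether the internals are serial or concurrent.

```python
from collections import Counter
from multiprocessing.pool import ThreadPool


def count_batch(lines):
    """Count whitespace-separated words in one batch of lines."""
    return Counter(word for line in lines for word in line.split())


class WordCounter:
    """Hypothetical stand-in for WordAnalyzing: fixed interface, swappable internals."""

    def __init__(self, workers=1):
        self.workers = workers
        self.result = Counter()

    def run(self, batches):
        if self.workers <= 1:
            # serial implementation
            partials = list(map(count_batch, batches))
        else:
            # concurrent implementation behind the same method
            with ThreadPool(self.workers) as pool:
                partials = pool.map(count_batch, batches)
        for partial in partials:
            self.result.update(partial)

    def obtain_result(self):
        return dict(self.result)
```

A caller would write `wc = WordCounter(workers=4); wc.run(batches); wc.obtain_result()` and never needs to know which branch ran.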
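Notes on the second version:
1. The main idea is that WordReading reads the lines of only a portion of the files at a time and then passes them to WordAnalyzing for parsing, so the two subtasks run concurrently.
2. The difficulty: from the queue alone it is hard to tell whether all file lines have been read, or reading is still in progress and there is simply no output for the moment. The program marks the end with an out-of-band message, EOF FINISHED. Normal messages are lists and the end message is a string, so the two cannot be confused.
3. File reading is driven from a thread (which forwards the pool's results to the queue), while line parsing runs in the main process; the two run concurrently.
4. With multiple queues, the end marker may land in any one of them. When the end message is detected, the reader must not exit immediately; it remembers that queue, stops taking messages from it, and keeps going until every message has been taken out of the remaining queues.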
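Notes 2 and 4 can be condensed into the following Python 3 sketch. It is a simplified variant of the idea rather than the listing itself: it uses in-process `queue.Queue` objects instead of `Manager().Queue()`, and it assumes a single producer that puts the end marker last, after all normal messages have been enqueued. The helper `drain` is a hypothetical name.

```python
import queue

DONE = "EOF FINISHED"   # string end marker; normal messages are lists


class MultiQueue:
    """Round-robin writes over several queues; reads tolerate temporary emptiness."""

    def __init__(self, qnum, timeout=0.01):
        self.queues = [queue.Queue() for _ in range(qnum)]
        self.timeout = timeout
        self.put_index = 0
        self.end_index = -1          # index of the queue where the end marker was found

    def put(self, obj):
        self.queues[self.put_index].put(obj)
        self.put_index = (self.put_index + 1) % len(self.queues)

    def get(self):
        """Return a message, DONE once everything is drained, or None for 'try again'."""
        for i, q in enumerate(self.queues):
            if i == self.end_index:
                continue              # this queue is finished, never read it again
            try:
                obj = q.get(True, self.timeout)
            except queue.Empty:
                continue              # empty right now, not necessarily finished
            if isinstance(obj, str) and obj == DONE:
                self.end_index = i    # remember it, but keep draining the other queues
                continue
            return obj
        return DONE if self.end_index != -1 else None


def drain(mq):
    """Collect all normal messages until the multi-queue reports completion."""
    received = []
    while True:
        msg = mq.get()
        if msg is None:
            continue                  # nothing yet, the producer may still be working
        if isinstance(msg, str) and msg == DONE:
            return received
        received.append(msg)
```

Returning `None` for "nothing yet" and the marker for "finished" is what lets the consumer distinguish a temporarily empty queue from a completed one.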
First version:
#-------------------------------------------------------------------------------
# Name:        wordstat_multiprocessing.py
# Purpose:     statistic words in java files of given directory by multiprocessing
#
# Author:      qin.shuq
#
# Created:     09/10/2014
# Copyright:   (c) qin.shuq 2014
# Licence:     <your licence>
#-------------------------------------------------------------------------------

import re
import os
import time
import logging
from Queue import Empty
from multiprocessing import Process, Manager, Pool, Pipe, cpu_count

LOG_LEVELS = {
    'DEBUG': logging.DEBUG, 'INFO': logging.INFO,
    'WARN': logging.WARNING, 'ERROR': logging.ERROR,
    'CRITICAL': logging.CRITICAL
}

ncpu = cpu_count()

def initlog(filename):
    # One named logger per file; getLogger() without a name would return
    # the shared root logger both times and mix the two logs.
    logger = logging.getLogger(filename)
    hdlr = logging.FileHandler(filename)
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    hdlr.setFormatter(formatter)
    logger.addHandler(hdlr)
    logger.setLevel(LOG_LEVELS['INFO'])
    return logger

errlog = initlog("error.log")
infolog = initlog("info.log")

class FileObtainer(object):

    def __init__(self, dirpath, fileFilterFunc=None):
        self.dirpath = dirpath
        self.fileFilterFunc = fileFilterFunc

    def findAllFilesInDir(self):
        files = []
        for path, dirs, filenames in os.walk(self.dirpath):
            if len(filenames) > 0:
                for filename in filenames:
                    files.append(path+'/'+filename)

        if self.fileFilterFunc is None:
            return files
        else:
            return filter(self.fileFilterFunc, files)

class MultiQueue(object):

    def __init__(self, qnum, timeout):
        manager = Manager()
        self.timeout = timeout
        self.qnum = qnum
        self.queues = []
        self.pindex = 0
        for i in range(self.qnum):
            qLines = manager.Queue()
            self.queues.append(qLines)

    def put(self, obj):
        # write to the queues in round-robin order
        self.queues[self.pindex].put(obj)
        self.pindex = (self.pindex+1) % self.qnum

    def get(self):
        # return the first message found; None if every queue timed out
        for i in range(self.qnum):
            try:
                obj = self.queues[i].get(True, self.timeout)
                return obj
            except Empty as emp:
                print 'Not Get.'
                errlog.error('In WordReading:' + str(emp))
        return None

def readFile(filename):
    try:
        f = open(filename, 'r')
        lines = f.readlines()
        infolog.info('[successful read file %s]\n' % filename)
        f.close()
        return lines
    except IOError as err:
        errorInfo = 'file %s Not found \n' % filename
        errlog.error(errorInfo)
        return []

def batchReadFiles(fileList, ioPool, mq):
    # read every file in the pool, then hand over all lines as one message
    futureResult = []
    for filename in fileList:
        futureResult.append(ioPool.apply_async(readFile, args=(filename,)))

    allLines = []
    for res in futureResult:
        allLines.extend(res.get())
    mq.put(allLines)

class WordReading(object):

    def __init__(self, allFiles, mq):
        self.allFiles = allFiles
        self.mq = mq
        self.ioPool = Pool(ncpu*3)
        infolog.info('WordReading Initialized')

    def run(self):
        batchReadFiles(self.allFiles, self.ioPool, self.mq)

def processLines(lines):
    result = {}
    linesContent = ''.join(lines)
    matches = WordAnalyzing.wordRegex.findall(linesContent)
    if matches:
        for word in matches:
            if result.get(word) is None:
                result[word] = 0
            result[word] += 1
    return result

def mergeToSrcMap(srcMap, destMap):
    for key, value in destMap.iteritems():
        if srcMap.get(key):
            srcMap[key] = srcMap.get(key)+destMap.get(key)
        else:
            srcMap[key] = destMap.get(key)
    return srcMap

class WordAnalyzing(object):
    '''
    return Map<Word, count> the occurrence times of each word
    '''
    wordRegex = re.compile(r"[\w]+")

    def __init__(self, mq, conn):
        self.mq = mq
        self.cpuPool = Pool(ncpu)
        self.conn = conn
        self.resultMap = {}
        infolog.info('WordAnalyzing Initialized')

    def run(self):
        starttime = time.time()
        futureResult = []
        while True:
            lines = self.mq.get()
            if lines is None:
                break
            futureResult.append(self.cpuPool.apply_async(processLines, args=(lines,)))

        for res in futureResult:
            mergeToSrcMap(self.resultMap, res.get())
        endtime = time.time()
        print 'WordAnalyzing analyze cost: ', (endtime-starttime)*1000 , 'ms'

        self.conn.send('OK')
        self.conn.close()

    def obtainResult(self):
        return self.resultMap

class PostProcessing(object):

    def __init__(self, resultMap):
        self.resultMap = resultMap

    def sortByValue(self):
        return sorted(self.resultMap.items(), key=lambda e: e[1], reverse=True)

    def obtainTopN(self, topN):
        sortedResult = self.sortByValue()
        sortedNum = len(sortedResult)
        topN = sortedNum if topN > sortedNum else topN
        for i in range(topN):
            topi = sortedResult[i]
            print topi[0], ' counts: ', topi[1]

if __name__ == "__main__":

    dirpath = "/home/lovesqcc/workspace/java/javastudy/src/"
    if not os.path.exists(dirpath):
        print 'dir %s not found.' % dirpath
        exit(1)

    fileObtainer = FileObtainer(dirpath, lambda f: f.endswith('.java'))
    allFiles = fileObtainer.findAllFilesInDir()

    mqTimeout = 0.01
    mqNum = 1

    mq = MultiQueue(mqNum, timeout=mqTimeout)
    p_conn, c_conn = Pipe()
    wr = WordReading(allFiles, mq)
    wa = WordAnalyzing(mq, c_conn)

    # the two subtasks run one after the other
    wr.run()
    wa.run()

    msg = p_conn.recv()
    if msg == 'OK':
        pass

    # taking less time, parallel not needed.
    postproc = PostProcessing(wa.obtainResult())
    postproc.obtainTopN(30)

    print 'exit the program.'
Second version:
#-------------------------------------------------------------------------------
# Name:        wordstat_multiprocessing.py
# Purpose:     statistic words in java files of given directory by multiprocessing
#
# Author:      qin.shuq
#
# Created:     09/10/2014
# Copyright:   (c) qin.shuq 2014
# Licence:     <your licence>
#-------------------------------------------------------------------------------

import re
import os
import time
import logging
import threading
from Queue import Empty
from multiprocessing import Process, Manager, Pool, Pipe, cpu_count

LOG_LEVELS = {
    'DEBUG': logging.DEBUG, 'INFO': logging.INFO,
    'WARN': logging.WARNING, 'ERROR': logging.ERROR,
    'CRITICAL': logging.CRITICAL
}

ncpu = cpu_count()

CompletedMsg = "EOF FINISHED"

def initlog(filename):
    # One named logger per file; getLogger() without a name would return
    # the shared root logger both times and mix the two logs.
    logger = logging.getLogger(filename)
    hdlr = logging.FileHandler(filename)
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    hdlr.setFormatter(formatter)
    logger.addHandler(hdlr)
    logger.setLevel(LOG_LEVELS['INFO'])
    return logger

errlog = initlog("error.log")
infolog = initlog("info.log")

class FileObtainer(object):

    def __init__(self, dirpath, fileFilterFunc=None):
        self.dirpath = dirpath
        self.fileFilterFunc = fileFilterFunc

    def findAllFilesInDir(self):
        files = []
        for path, dirs, filenames in os.walk(self.dirpath):
            if len(filenames) > 0:
                for filename in filenames:
                    files.append(path+'/'+filename)

        if self.fileFilterFunc is None:
            return files
        else:
            return filter(self.fileFilterFunc, files)

class MultiQueue(object):

    def __init__(self, qnum, CompletedMsg, timeout=0.01):
        manager = Manager()
        self.timeout = timeout
        self.qnum = qnum
        self.CompletedMsg = CompletedMsg
        self.queues = []
        self.pindex = 0
        self.endIndex = -1
        for i in range(self.qnum):
            qLines = manager.Queue()
            self.queues.append(qLines)

    def put(self, obj):
        # write to the queues in round-robin order
        self.queues[self.pindex].put(obj)
        self.pindex = (self.pindex+1) % self.qnum

    def get(self, timeout=0.01):
        # returns a message, CompletedMsg once all queues are drained,
        # or None when nothing is available yet
        for i in range(self.qnum):
            if i != self.endIndex:
                try:
                    obj = self.queues[i].get(True, timeout)
                    if obj == self.CompletedMsg:
                        self.endIndex = i   # this queue contains 'finish flag' msg
                        self.queues[i].put(self.CompletedMsg)
                        continue
                    return obj
                except Empty as emp:
                    errlog.error('In WordReading:' + str(emp))
        if self.endIndex != -1:
            return self.CompletedMsg
        return None

def readFile(filename):
    try:
        f = open(filename, 'r')
        lines = f.readlines()
        infolog.info('[successful read file %s]\n' % filename)
        f.close()
        return lines
    except IOError as err:
        errorInfo = 'file %s Not found \n' % filename
        errlog.error(errorInfo)
        return []

def divideNParts(total, N):
    '''
    divide [0, total) into N parts:
    return [(0, total/N), (total/N, 2*total/N), ..., ((N-1)*total/N, total)]
    '''
    each = total // N
    parts = []
    for index in range(N):
        begin = index*each
        if index == N-1:
            end = total
        else:
            end = begin + each
        parts.append((begin, end))
    return parts

def batchReadFiles(fileList):
    allLines = []
    for filename in fileList:
        allLines.extend(readFile(filename))
    return allLines

def putResult(futureResult, mq):
    # forward each batch as soon as it is ready, then send the end marker
    for res in futureResult:
        mq.put(res.get())
    mq.put(CompletedMsg)

class WordReading(object):

    def __init__(self, allFiles, mq):
        self.allFiles = allFiles
        self.mq = mq
        self.ioPool = Pool(ncpu*3)
        infolog.info('WordReading Initialized')

    def run(self):
        parts = divideNParts(len(self.allFiles), ncpu*3)
        futureResult = []
        for (begin, end) in parts:
            futureResult.append(self.ioPool.apply_async(func=batchReadFiles, args=(self.allFiles[begin:end],)))

        # the thread lets run() return at once so that parsing can start
        t = threading.Thread(target=putResult, args=(futureResult, self.mq))
        t.start()

        print 'Now quit'

def processLines(lines):
    result = {}
    linesContent = ''.join(lines)
    matches = WordAnalyzing.wordRegex.findall(linesContent)
    if matches:
        for word in matches:
            if result.get(word) is None:
                result[word] = 0
            result[word] += 1
    return result

def mergeToSrcMap(srcMap, destMap):
    for key, value in destMap.iteritems():
        if srcMap.get(key):
            srcMap[key] = srcMap.get(key)+destMap.get(key)
        else:
            srcMap[key] = destMap.get(key)
    return srcMap

class WordAnalyzing(object):
    '''
    return Map<Word, count> the occurrence times of each word
    '''
    wordRegex = re.compile(r"[\w]+")

    def __init__(self, mq, conn):
        self.mq = mq
        self.cpuPool = Pool(ncpu)
        self.conn = conn
        self.resultMap = {}
        infolog.info('WordAnalyzing Initialized')

    def run(self):
        starttime = time.time()
        futureResult = []
        while True:
            lines = self.mq.get()
            if lines is None:
                continue        # nothing yet, reading is still in progress
            if lines == CompletedMsg:
                break
            futureResult.append(self.cpuPool.apply_async(processLines, args=(lines,)))

        for res in futureResult:
            mergeToSrcMap(self.resultMap, res.get())
        endtime = time.time()
        print 'WordAnalyzing analyze cost: ', (endtime-starttime)*1000 , 'ms'

        self.conn.send('OK')
        self.conn.close()

    def obtainResult(self):
        return self.resultMap

class PostProcessing(object):

    def __init__(self, resultMap):
        self.resultMap = resultMap

    def sortByValue(self):
        return sorted(self.resultMap.items(), key=lambda e: e[1], reverse=True)

    def obtainTopN(self, topN):
        sortedResult = self.sortByValue()
        sortedNum = len(sortedResult)
        topN = sortedNum if topN > sortedNum else topN
        for i in range(topN):
            topi = sortedResult[i]
            print topi[0], ' counts: ', topi[1]

if __name__ == "__main__":

    #dirpath = "/home/lovesqcc/workspace/java/javastudy/src/"
    dirpath = "c:\\Users\\qin.shuq\\Desktop\\region_master\\src"
    if not os.path.exists(dirpath):
        print 'dir %s not found.' % dirpath
        exit(1)

    fileObtainer = FileObtainer(dirpath, lambda f: f.endswith('.java'))
    allFiles = fileObtainer.findAllFilesInDir()

    mqTimeout = 0.01
    mqNum = 3

    mq = MultiQueue(mqNum, CompletedMsg, timeout=mqTimeout)
    p_conn, c_conn = Pipe()
    wr = WordReading(allFiles, mq)
    wa = WordAnalyzing(mq, c_conn)

    wr.run()    # returns immediately; reading continues in the background
    wa.run()

    msg = p_conn.recv()
    if msg == 'OK':
        pass

    # taking less time, parallel not needed.
    postproc = PostProcessing(wa.obtainResult())
    postproc.obtainTopN(30)

    print 'exit the program.'