

A multiprocessing Python implementation of word counting over the Java files in a given directory

2019-11-14 17:32:57
Source: reprinted, contributed by a reader

  

       To be clear up front: the serial version is already fast enough. On my dual-core Core machine running Debian 7.6 it finishes in 0.2 s, which is hard to beat. The multiprocessing versions cannot avoid the heavy overhead of process creation and of synchronizing and transferring data between processes, so they actually perform worse than the serial version and serve only as learning examples. Optimization can wait.

 

       Two basic patterns of concurrent program design:

          1.  Split a large data set into smaller sets, process them in parallel, and merge the results. The hard part is load balancing.

          2.  Decompose a complex task into a pipeline of subtasks processed concurrently. The hard part is coordinating and synchronizing the subtasks: sender and receiver must agree on some protocol so that the receiver does not exit too early.
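The second pattern can be sketched with a minimal producer/consumer pipeline (a Python 3 sketch; the SENTINEL value and function names are illustrative, not taken from the programs below). The sentinel is exactly the "protocol" that keeps the receiver from exiting too early:

```python
import threading
from queue import Queue

SENTINEL = None  # agreed-upon end marker: the protocol between sender and receiver

def producer(q, items):
    for item in items:
        q.put(item)
    q.put(SENTINEL)  # tell the receiver that nothing more is coming

def consumer(q, results):
    while True:
        item = q.get()
        if item is SENTINEL:  # without this marker the consumer could quit too early
            break
        results.append(item * 2)

q = Queue()
results = []
t = threading.Thread(target=consumer, args=(q, results))
t.start()
producer(q, [1, 2, 3])
t.join()
print(results)  # [2, 4, 6]
```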

 

      Practical scenarios:

          1.  Task processing: decompose a complex task into a pipeline of subtasks (multiple processes), and within each subtask process the whole data set in parallel (multiple threads).

          2.  Simulating reality: every object is an atomic concurrent activity, and objects constrain each other's behavior through message passing and mutually exclusive access to shared resources.

  

       An important lesson: the more complex a concurrent design becomes, the harder it is to keep its processes and execution stable, and the subtleties of concurrency can make optimization feel futile.

 

       Two multiprocessing implementations are given below. My original idea was to use three processes: a file-reading process that uses multiple threads internally, a word-parsing process that uses multiple threads internally, and the main process. Because of Python's GIL, multithreading alone cannot fully exploit the available parallelism.

 

       Notes on the first version:

       1.  WordReading uses multiple processes internally to read files, and WordAnalyzing uses multiple processes internally to parse words. Thanks to good encapsulation, the internal implementation can change freely (serial to concurrent) while the external interface stays the same;

       2.  Transferring file lines in many small messages costs a lot of synchronization, so WordReading reads all file lines in one go and hands them to WordAnalyzing; the two subtasks therefore remain serial;

       3.  The multi-queue was originally meant to avoid fierce contention between multiple producers and multiple consumers on a single queue; since the two subtasks are serial here, it goes unused.
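The round-robin multi-queue of point 3 can be sketched on its own (a Python 3 sketch using plain in-process queues rather than Manager queues, simplified from the article's MultiQueue). Writers rotate across queues so no single queue becomes a hot spot; note that a reader polling the queues in order will interleave messages rather than preserve global insertion order:

```python
from queue import Queue, Empty

class MultiQueue(object):
    """Spread producers across several queues to reduce contention on any one of them."""
    def __init__(self, qnum, timeout=0.01):
        self.timeout = timeout
        self.qnum = qnum
        self.queues = [Queue() for _ in range(qnum)]
        self.pindex = 0  # next queue to write to, advanced round-robin

    def put(self, obj):
        self.queues[self.pindex].put(obj)
        self.pindex = (self.pindex + 1) % self.qnum

    def get(self):
        # poll each queue briefly; return the first message found, else None
        for q in self.queues:
            try:
                return q.get(True, self.timeout)
            except Empty:
                pass
        return None

mq = MultiQueue(3)
for i in range(5):
    mq.put(i)
got = sorted(mq.get() for _ in range(5))  # sorted: retrieval order interleaves queues
print(got)  # [0, 1, 2, 3, 4]
```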

   

     Notes on the second version:

       1.  The main idea: WordReading reads the lines of only a batch of files at a time and passes them to WordAnalyzing for parsing, so the two subtasks run concurrently.

       2.  The hard part: the queue alone cannot tell whether all lines have been read or the reader is merely between outputs. The program uses the out-of-band message EOF FINISHED as an end marker; normal messages are lists and the end message is a string, so the two cannot be confused.

       3.  File reading is started on a thread while line parsing runs in the main process, so the two are concurrent.

       4.  With multiple queues, the end marker may land in any one of them. On seeing it, the consumer must not exit immediately; it records that queue, stops reading from it, and keeps draining the others until every message has been taken out.
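Point 4 can be sketched as follows (a Python 3 sketch with plain queues in a single process for brevity; the END string mirrors the program's EOF FINISHED marker, and `drain` is an illustrative name):

```python
from queue import Queue, Empty

END = "EOF FINISHED"  # end marker: a string, distinct from the normal list messages

def drain(queues):
    """Read until the end marker has been seen and every queue is empty."""
    messages, finished = [], False
    done = set()  # indices of queues that delivered the end marker: stop reading them
    while not (finished and all(q.empty() for q in queues)):
        for i, q in enumerate(queues):
            if i in done:
                continue
            try:
                msg = q.get_nowait()
            except Empty:
                continue
            if msg == END:
                done.add(i)     # remember the queue, but do not exit yet
                finished = True
            else:
                messages.append(msg)
    return messages

qs = [Queue(), Queue(), Queue()]
qs[0].put([1, 2]); qs[1].put(END); qs[2].put([3])
msgs = drain(qs)
print(msgs)  # [[1, 2], [3]]
```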

  

     The first version:

#-------------------------------------------------------------------------------
# Name:        wordstat_multiprocessing.py
# Purpose:     statistic words in java files of given directory by multiprocessing
#
# Author:      qin.shuq
#
# Created:     09/10/2014
# Copyright:   (c) qin.shuq 2014
# Licence:     <your licence>
#-------------------------------------------------------------------------------
import re
import os
import time
import logging
from Queue import Empty
from multiprocessing import Process, Manager, Pool, Pipe, cpu_count

LOG_LEVELS = {
    'DEBUG': logging.DEBUG, 'INFO': logging.INFO,
    'WARN': logging.WARNING, 'ERROR': logging.ERROR,
    'CRITICAL': logging.CRITICAL
}

ncpu = cpu_count()

def initlog(filename):
    logger = logging.getLogger()
    hdlr = logging.FileHandler(filename)
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    hdlr.setFormatter(formatter)
    logger.addHandler(hdlr)
    logger.setLevel(LOG_LEVELS['INFO'])
    return logger

errlog = initlog("error.log")
infolog = initlog("info.log")

class FileObtainer(object):
    def __init__(self, dirpath, fileFilterFunc=None):
        self.dirpath = dirpath
        self.fileFilterFunc = fileFilterFunc

    def findAllFilesInDir(self):
        files = []
        for path, dirs, filenames in os.walk(self.dirpath):
            for filename in filenames:
                files.append(path + '/' + filename)
        if self.fileFilterFunc is None:
            return files
        else:
            return filter(self.fileFilterFunc, files)

class MultiQueue(object):
    def __init__(self, qnum, timeout):
        manager = Manager()
        self.timeout = timeout
        self.qnum = qnum
        self.queues = []
        self.pindex = 0
        for i in range(self.qnum):
            qLines = manager.Queue()
            self.queues.append(qLines)

    def put(self, obj):
        self.queues[self.pindex].put(obj)
        self.pindex = (self.pindex + 1) % self.qnum

    def get(self):
        for i in range(self.qnum):
            try:
                obj = self.queues[i].get(True, self.timeout)
                return obj
            except Empty, emp:
                print 'Not Get.'
                errlog.error('In WordReading:' + str(emp))
        return None

def readFile(filename):
    try:
        f = open(filename, 'r')
        lines = f.readlines()
        infolog.info('[successfully read file %s]\n' % filename)
        f.close()
        return lines
    except IOError, err:
        errorInfo = 'file %s not found\n' % filename
        errlog.error(errorInfo)
        return []

def batchReadFiles(fileList, ioPool, mq):
    futureResult = []
    for filename in fileList:
        futureResult.append(ioPool.apply_async(readFile, args=(filename,)))
    allLines = []
    for res in futureResult:
        allLines.extend(res.get())
    mq.put(allLines)

class WordReading(object):
    def __init__(self, allFiles, mq):
        self.allFiles = allFiles
        self.mq = mq
        self.ioPool = Pool(ncpu * 3)
        infolog.info('WordReading Initialized')

    def run(self):
        batchReadFiles(self.allFiles, self.ioPool, self.mq)

def processLines(lines):
    result = {}
    linesContent = ''.join(lines)
    matches = WordAnalyzing.wordRegex.findall(linesContent)
    if matches:
        for word in matches:
            if result.get(word) is None:
                result[word] = 0
            result[word] += 1
    return result

def mergeToSrcMap(srcMap, destMap):
    for key, value in destMap.iteritems():
        if srcMap.get(key):
            srcMap[key] = srcMap.get(key) + destMap.get(key)
        else:
            srcMap[key] = destMap.get(key)
    return srcMap

class WordAnalyzing(object):
    '''
    return Map<Word, count>, the occurrence times of each word
    '''
    wordRegex = re.compile(r"\w+")

    def __init__(self, mq, conn):
        self.mq = mq
        self.cpuPool = Pool(ncpu)
        self.conn = conn
        self.resultMap = {}
        infolog.info('WordAnalyzing Initialized')

    def run(self):
        starttime = time.time()
        futureResult = []
        while True:
            lines = self.mq.get()
            if lines is None:
                break
            futureResult.append(self.cpuPool.apply_async(processLines, args=(lines,)))
        for res in futureResult:
            mergeToSrcMap(self.resultMap, res.get())
        endtime = time.time()
        print 'WordAnalyzing analyze cost: ', (endtime - starttime) * 1000, 'ms'
        self.conn.send('OK')
        self.conn.close()

    def obtainResult(self):
        return self.resultMap

class PostProcessing(object):
    def __init__(self, resultMap):
        self.resultMap = resultMap

    def sortByValue(self):
        return sorted(self.resultMap.items(), key=lambda e: e[1], reverse=True)

    def obtainTopN(self, topN):
        sortedResult = self.sortByValue()
        sortedNum = len(sortedResult)
        topN = sortedNum if topN > sortedNum else topN
        for i in range(topN):
            topi = sortedResult[i]
            print topi[0], ' counts: ', topi[1]

if __name__ == "__main__":
    dirpath = "/home/lovesqcc/workspace/java/javastudy/src/"
    if not os.path.exists(dirpath):
        print 'dir %s not found.' % dirpath
        exit(1)
    fileObtainer = FileObtainer(dirpath, lambda f: f.endswith('.java'))
    allFiles = fileObtainer.findAllFilesInDir()
    mqTimeout = 0.01
    mqNum = 1
    mq = MultiQueue(mqNum, timeout=mqTimeout)
    p_conn, c_conn = Pipe()
    wr = WordReading(allFiles, mq)
    wa = WordAnalyzing(mq, c_conn)
    wr.run()
    wa.run()
    msg = p_conn.recv()
    if msg == 'OK':
        pass
    # taking less time, parallel not needed.
    postproc = PostProcessing(wa.obtainResult())
    postproc.obtainTopN(30)
    print 'exit the program.'

   The second version:

   

#-------------------------------------------------------------------------------
# Name:        wordstat_multiprocessing.py
# Purpose:     statistic words in java files of given directory by multiprocessing
#
# Author:      qin.shuq
#
# Created:     09/10/2014
# Copyright:   (c) qin.shuq 2014
# Licence:     <your licence>
#-------------------------------------------------------------------------------
import re
import os
import time
import logging
import threading
from Queue import Empty
from multiprocessing import Process, Manager, Pool, Pipe, cpu_count

LOG_LEVELS = {
    'DEBUG': logging.DEBUG, 'INFO': logging.INFO,
    'WARN': logging.WARNING, 'ERROR': logging.ERROR,
    'CRITICAL': logging.CRITICAL
}

ncpu = cpu_count()

CompletedMsg = "EOF FINISHED"

def initlog(filename):
    logger = logging.getLogger()
    hdlr = logging.FileHandler(filename)
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    hdlr.setFormatter(formatter)
    logger.addHandler(hdlr)
    logger.setLevel(LOG_LEVELS['INFO'])
    return logger

errlog = initlog("error.log")
infolog = initlog("info.log")

class FileObtainer(object):
    def __init__(self, dirpath, fileFilterFunc=None):
        self.dirpath = dirpath
        self.fileFilterFunc = fileFilterFunc

    def findAllFilesInDir(self):
        files = []
        for path, dirs, filenames in os.walk(self.dirpath):
            for filename in filenames:
                files.append(path + '/' + filename)
        if self.fileFilterFunc is None:
            return files
        else:
            return filter(self.fileFilterFunc, files)

class MultiQueue(object):
    def __init__(self, qnum, CompletedMsg, timeout=0.01):
        manager = Manager()
        self.timeout = timeout
        self.qnum = qnum
        self.CompletedMsg = CompletedMsg
        self.queues = []
        self.pindex = 0
        self.endIndex = -1
        for i in range(self.qnum):
            qLines = manager.Queue()
            self.queues.append(qLines)

    def put(self, obj):
        self.queues[self.pindex].put(obj)
        self.pindex = (self.pindex + 1) % self.qnum

    def get(self, timeout=0.01):
        for i in range(self.qnum):
            if i != self.endIndex:
                try:
                    obj = self.queues[i].get(True, timeout)
                    if obj == self.CompletedMsg:
                        self.endIndex = i   # this queue holds the 'finish flag' msg
                        self.queues[i].put(self.CompletedMsg)
                        continue
                    return obj
                except Empty, emp:
                    errlog.error('In WordReading:' + str(emp))
        if self.endIndex != -1:
            return self.CompletedMsg
        return None

def readFile(filename):
    try:
        f = open(filename, 'r')
        lines = f.readlines()
        infolog.info('[successfully read file %s]\n' % filename)
        f.close()
        return lines
    except IOError, err:
        errorInfo = 'file %s not found\n' % filename
        errlog.error(errorInfo)
        return []

def divideNParts(total, N):
    '''
    divide [0, total) into N parts:
    return [(0, total/N), (total/N, 2*total/N), ..., ((N-1)*total/N, total)]
    '''
    each = total / N
    parts = []
    for index in range(N):
        begin = index * each
        if index == N - 1:
            end = total
        else:
            end = begin + each
        parts.append((begin, end))
    return parts

def batchReadFiles(fileList):
    allLines = []
    for filename in fileList:
        allLines.extend(readFile(filename))
    return allLines

def putResult(futureResult, mq):
    for res in futureResult:
        mq.put(res.get())
    mq.put(CompletedMsg)

class WordReading(object):
    def __init__(self, allFiles, mq):
        self.allFiles = allFiles
        self.mq = mq
        self.ioPool = Pool(ncpu * 3)
        infolog.info('WordReading Initialized')

    def run(self):
        parts = divideNParts(len(self.allFiles), ncpu * 3)
        futureResult = []
        for (begin, end) in parts:
            futureResult.append(self.ioPool.apply_async(func=batchReadFiles, args=(self.allFiles[begin:end],)))
        t = threading.Thread(target=putResult, args=(futureResult, self.mq))
        t.start()
        print 'Now quit'

def processLines(lines):
    result = {}
    linesContent = ''.join(lines)
    matches = WordAnalyzing.wordRegex.findall(linesContent)
    if matches:
        for word in matches:
            if result.get(word) is None:
                result[word] = 0
            result[word] += 1
    return result

def mergeToSrcMap(srcMap, destMap):
    for key, value in destMap.iteritems():
        if srcMap.get(key):
            srcMap[key] = srcMap.get(key) + destMap.get(key)
        else:
            srcMap[key] = destMap.get(key)
    return srcMap

class WordAnalyzing(object):
    '''
    return Map<Word, count>, the occurrence times of each word
    '''
    wordRegex = re.compile(r"\w+")

    def __init__(self, mq, conn):
        self.mq = mq
        self.cpuPool = Pool(ncpu)
        self.conn = conn
        self.resultMap = {}
        infolog.info('WordAnalyzing Initialized')

    def run(self):
        starttime = time.time()
        futureResult = []
        while True:
            lines = self.mq.get()
            if lines is None:
                continue
            if lines == CompletedMsg:
                break
            futureResult.append(self.cpuPool.apply_async(processLines, args=(lines,)))
        for res in futureResult:
            mergeToSrcMap(self.resultMap, res.get())
        endtime = time.time()
        print 'WordAnalyzing analyze cost: ', (endtime - starttime) * 1000, 'ms'
        self.conn.send('OK')
        self.conn.close()

    def obtainResult(self):
        return self.resultMap

class PostProcessing(object):
    def __init__(self, resultMap):
        self.resultMap = resultMap

    def sortByValue(self):
        return sorted(self.resultMap.items(), key=lambda e: e[1], reverse=True)

    def obtainTopN(self, topN):
        sortedResult = self.sortByValue()
        sortedNum = len(sortedResult)
        topN = sortedNum if topN > sortedNum else topN
        for i in range(topN):
            topi = sortedResult[i]
            print topi[0], ' counts: ', topi[1]

if __name__ == "__main__":
    #dirpath = "/home/lovesqcc/workspace/java/javastudy/src/"
    dirpath = "c://Users//qin.shuq//Desktop//region_master//src"
    if not os.path.exists(dirpath):
        print 'dir %s not found.' % dirpath
        exit(1)
    fileObtainer = FileObtainer(dirpath, lambda f: f.endswith('.java'))
    allFiles = fileObtainer.findAllFilesInDir()
    mqTimeout = 0.01
    mqNum = 3
    mq = MultiQueue(mqNum, CompletedMsg, timeout=mqTimeout)
    p_conn, c_conn = Pipe()
    wr = WordReading(allFiles, mq)
    wa = WordAnalyzing(mq, c_conn)
    wr.run()
    wa.run()
    msg = p_conn.recv()
    if msg == 'OK':
        pass
    # taking less time, parallel not needed.
    postproc = PostProcessing(wa.obtainResult())
    postproc.obtainTopN(30)
    print 'exit the program.'

 

