国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

python多線程在滲透測試中的應用

2019-11-06 06:52:47
字體:
來源:轉載
供稿:網友
難易程度:★★★閱讀點:python;web安全;文章作者:xiaoye文章來源:i春秋關鍵字:網絡滲透技術前言python是門簡單易學的語言,強大的第三方庫讓我們在編程中事半功倍,今天,我們來談談python多線程在滲透測試中的應用,本文,我們將編寫一個簡易c段存活主機掃描腳本,以及一個python版本的多線程御劍--目錄掃描工具一、python 多線程python多線程有幾種寫法1. thread模塊python的一個多線程模塊,小腳本可以用,但是有瑕疵,比如不穩定,線程數不好控制下方貼出一個c段存活主機掃描腳本,這個腳本i春秋ado老師也有過哦思想:輸入一個ip,經過字符拆分,獲取c段,多線程地ping -c 2 ip ,根據返回的信息來判斷主機是否存活demo ping_thread.py:
'''Created on 2017-2-27 @author: xiaoye'''#coding: utf-8import threadimport timefrom subPRocess import Popen,PIPE def scan_ip(ip):    process = Popen('ping -c 2 ' + ip, stdin=PIPE, stdout=PIPE, shell=True)    data = process.stdout.read()    if 'ttl' in data:        print '%s is live ,now time is %s' % (ip, time.strftime('%H:%M:%S'))     if __name__ == '__main__':    #scan_ip('111.13.147.229')    ips = raw_input()    ip_header = '.'.join(ips.split('.')[:3])    for i in range(1,255):        ip = ip_header + '.' + str(i)        #print ip        thread.start_new_thread(scan_ip, (ip,))        time.sleep(0.1)運行情況:                                                                           速度還行,穩定性一般thread模塊,核心在:
thread.start_new_thread(scan_ip, (ip,))        time.sleep(0.1)scan_ip是要執行的函數,(ip,)是傳入的參數,記得sleep一下2.threading模塊用法:demo:
'''Created on 2017-2-28@author: xiaoye'''#coding: utf-8import threadingimport time def test(th):    print 'i am doing %s %s' % (th, time.strftime('%H:%M:%S'))     def main():    thread = []    keys = ['movie_th','swim_th','listen_th','learn_th','movie_th','swim_th','listen_th','learn_th','movie_th','swim_th','listen_th','learn_th','movie_th','swim_th','listen_th','learn_th']    thread_count = len(keys)    #print thread_count    for i in range(thread_count):        t = threading.Thread(target=test, args=(keys[i],))        thread.append(t)    for i in range(thread_count):        thread[i].start()    for i in range(thread_count):        thread[i].join() if __name__ == '__main__':    main()運行情況:可以看到,基本是同時運行的,threading.Thread模塊的一種用法就是這樣:
for i in range(thread_count):        t = threading.Thread(target=test, args=(keys[i],))        thread.append(t)    for i in range(thread_count):        thread[i].start()    for i in range(thread_count):        thread[i].join()模式1.:一個列表存放所有線程,start()執行列表中線程,join()等待運行完畢模式1?,還有模式2嗎?當然,模式2就是從threading.Thread繼承一個子類class,重寫父類run方法,實現多線程運行run函數,而這種也是非常良好的寫法demo:
# -*- coding: utf-8 -*-import threading class T(threading.Thread):     def __init__(self):        threading.Thread.__init__(self)     def run(self):                    #繼承,threading.Thread子類, 重寫run方法, run方法在start()后自動執行        print 'i love you'  def main():    thread = []    for i in range(10):      thread.append(T())    for i in thread:        i.start()    for i in thread:        i.join() if __name__ == '__main__':    main()運行情況:二、線程間的數據守護Queue絕對是保護線程間數據安全的好選擇,有關于Queue,大家可以自行百度其用法,我發出一點經常用的:Queue.qsize() 返回隊列的大小Queue.empty() 如果隊列為空,返回True,反之FalseQueue.full() 如果隊列滿了,返回True,反之FalseQueue.full 與 maxsize 大小對應Queue.get([block[, timeout]]) 獲取隊列,timeout等待時間Queue.get_nowait() 相當Queue.get(False)非阻塞 Queue.put(item) 寫入隊列,timeout等待時間Queue.put_nowait(item) 相當Queue.put(item, False) Queue.task_done() 在完成一項工作之后,Queue.task_done() 函數向任務已經完成的隊列發送一個信號Queue.join() 實際上意味著等到隊列為空,再執行別的操作三、多線程threading.Thread+Queue實現滲透測試工具編寫腳本放出來:1.多線程c段存活主機掃描:
'''Created on 2017-2-28@author: xiaoye'''#coding: utf-8import timeimport sysimport threadingimport Queuefrom subprocess import Popen,PIPE  class Quethread(threading.Thread):    def __init__(self, que):        threading.Thread.__init__(self)        self._que = que         def run(self):        while not self._que.empty():            ip = self._que.get()            process = Popen('ping -c 2 ' + ip, stdin=PIPE, stdout=PIPE, shell=True)            data = process.stdout.read()            if 'ttl' in data:                sys.stdout.write('%s is live %s/n' % (ip, time.strftime('%H:%M:%S')))          def main():    que = Queue.Queue()    ips = raw_input()    thread = []    thread_count = 200    ip_head = '.'.join(ips.split('.')[:3])    #print ip_head    for i in range(1, 255):        que.put(ip_head + '.' + str(i))    '''for i in range(1,255):        print que.get()'''         for i in range(thread_count):        thread.append(Quethread(que))             for i in thread:        i.start()             for i in thread:        i.join()              if __name__ == '__main__':    main()Ubuntu下運行成功,win下需要修改Popen里的命令等,截圖:速度很快,穩定性較強c段主機存活腳本:https://github.com/xiaoyecent/ping_threading_Queue2.py版多線程御劍--目錄掃描--支持自定義字典、輸出文件位置以及自定義線程數:
'''@author: xiaoye'''#coding: utf-8import requestsimport sysimport threading#import timeimport Queuefrom optparse import OptionParser reload(sys)sys.setdefaultencoding('utf8') class Doscan(threading.Thread):    def __init__(self, que):        threading.Thread.__init__(self)        self._que = que             def run(self):        while not self._que.empty():            d = self._que.get()            try:                r = requests.get(url + d, headers=headers, timeout=3)                sys.stdout.write(d + ' is scan  status:' + str(r.status_code) + '/n')                if r.status_code == 200:                    with open(option.outfile, 'a') as f:                        f.write(url + d + '/n')            except:                pass     def main():    thread = []    thread_count = option.threadcount    que = Queue.Queue()         with open(option.dictname, 'r') as f:        for d in f.readlines():            d = d.strip('/n')            que.put(d)         for i in range(thread_count):        thread.append(Doscan(que))         for i in thread:        i.start()         for i in thread:        i.join()         if __name__ == '__main__':    parse = OptionParser()    parse.add_option('-u', '--url', dest='input_url', type='string', help='the url you wan to scan dir')    parse.add_option('-o', '--out', dest='outfile', type='string', help='output filename', default='result.txt')    parse.add_option('-s', '--speed', dest='threadcount', type='int', default=60, help='the thread_count')    parse.add_option('-d', '--dict', dest='dictname', type='string', help='dict filename')    (option, args) = parse.parse_args()    headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:50.0) Gecko/20100101 Firefox/50.0'}    url = option.input_url    main()      
Usage: scan_dir.py [options] Options:  -h, --help            show this help message and exit  -u INPUT_URL, --url=INPUT_URL                        the url you wan to scan dir  -o OUTFILE, --out=OUTFILE                        output filename  -s THREADCOUNT, --speed=THREADCOUNT                        the thread_count  -d DICTNAME, --dict=DICTNAME                        dict filename參數用法貼出來運行情況舉個例子:-u http://localhost -s 30 -d d://php.txt -o d://ichunqiu.txt:結果:運行速度取決于線程數(默認60)和實際環境源碼:https://github.com/xiaoyecent/scan_dir四、總結多線程加隊列實現線程間的數據保護是很好的搭配,threading.Thread+Queue的用法希望大家能夠掌握,另外,繼承threading.Thread寫出子類,重寫父類run方法來實現多線程的寫法也值得借鑒
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 无为县| 峨边| 香格里拉县| 舟曲县| 湘西| 红河县| 项城市| 边坝县| 白沙| 仁怀市| 望谟县| 故城县| 西贡区| 甘肃省| 高台县| 平昌县| 含山县| 彭州市| 运城市| 中西区| 望奎县| 阳曲县| 巴南区| 玉田县| 秀山| 香格里拉县| 芜湖县| 永修县| 武川县| 报价| 五家渠市| 杂多县| 雷波县| 原平市| 丹寨县| 云林县| 柳河县| 新乡县| 北碚区| 醴陵市| 台江县|