測試系統中有一項記錄ssh登錄日志,需要對此進行并發壓力測試。
于是用多線程進行python并發記錄
因為需要安裝的一些依賴和模塊比較麻煩,腳本完成后再用pyinstaller打成exe包分發給其他測試人員一起使用。
1.腳本編寫
# -*- coding: utf-8 -*-import paramikoimport threadingimport timelt = []def ssh(a,xh,sp): count = 0 for i in range(0,xh): try: ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect('ip地址',22,'用戶名', '密碼') ssh.close() print u"線程[%s]第[%s]次登錄"%(a,i) if sp != 0: time.sleep(sp) count += 1 except: print u"線程異常,已處理" lt.append(count) if __name__ == "__main__": figlet = ''' _____ _____ _ | ___| | _ / | | | |__ | |_| | | | | __| | _ { | | | | | |_| | | | |_| |_____/ |_| Code by FBI. ''' print figlet print u"認證攻擊次數=線程數*每個線程認證攻擊次數" print u"請輸入線程數:" xc = raw_input() print u"請輸入每個線程攻擊次數:" xh = raw_input() print u"請輸入每個線程延遲時間(秒),0為不休眠:" sp = raw_input() try: print u"預計總共發送認證攻擊%s次"%(int(xc)*int(xh)) threads = [] for j in range(int(xc)): threads.append(threading.Thread(target=ssh,args=(j,int(xh),int(sp),))) for t in threads: t.start() print t.name t.join() print lt count = 0 for count in lt: count += count print u"程序執行完畢總共發送認證攻擊【%s】次" % count except ValueError,e: print u"因為輸入不規范導致程序出現錯誤,請輸入數字"2.pyinstaller制作exe程序
下載pyinstaller后
在根目錄中cmd中執行python setup.py install安裝pyinstaller
安裝完成后執行命令打成exe文件
python pyinstaller.py -F 文件路徑
3.執行效果
如圖:

以上這篇對python多線程SSH登錄并發腳本詳解就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持武林站長站。
新聞熱點
疑難解答