国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

用Python編寫一個高效的端口掃描器的方法

2020-01-04 13:45:43
字體:
供稿:網(wǎng)友

PyPortScanner

python多線程端口掃描器。

輸出示例: 

Python,端口掃描器

Github

此端口掃描器的源碼,文檔及詳細(xì)調(diào)用方法見Github PythonPortScanner by Yaokai。

背景

有時候,在進行網(wǎng)絡(luò)相關(guān)的研究的時候,我們需要執(zhí)行一些有目的的參數(shù)測量。而端口掃描就是其中比較普遍也比較重要的一項。所謂的端口掃描,就是指通過TCP握手或者別的方式來判別一個給定主機上的某些端口是否處理開放,或者說監(jiān)聽的狀態(tài)。現(xiàn)有的使用比較廣泛的端口掃描工具是nmap。毋庸置疑,nmap是一款非常強大且易于使用的軟件。但nmap是一款運行于terminal中的軟件,有時在別的代碼中調(diào)用并不是很方便,甚至沒有相應(yīng)的庫。另外,nmap依賴的其他庫較多,在較老的系統(tǒng)中可能無法使用較新的nmap,這樣會造成掃描的不便。另外,nmap在掃描時需要root權(quán)限。基于這個原因,我用python2.7自帶的庫開發(fā)了一款高效的多線程端口掃描器來滿足使用需要。

具體實現(xiàn)

I. 利用TCP握手連接掃描一個給定的(ip,port)地址對

為了實現(xiàn)端口掃描,我們首先明白如何使用python socket與給定的(ip, port)進行TCP握手。為了完成TCP握手,我們需要先初始化一個TCP socket。在python中新建一個TCP socket的代碼如下:

TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #(1)TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) #(2)TCP_sock.settimeout(delay) #(3)

其中(1)是初始化socket的代碼,socket.AF_INTE參數(shù)表示IPv4 socketsocket.SOCK_STREAM參數(shù)表示TCP socket。這樣我們就初始化了一個使用IPv4,TCP協(xié)議的socket。

(2)使用了socket.setsockopt()來設(shè)置socket的另一些參數(shù)。socket.SOL_SOCKET指定當(dāng)前socket將使用setsockopt()中后面的參數(shù)。socket.SO_REUSEPORT表明當(dāng)前socket使用了可復(fù)用端口的設(shè)置。socket.SO_REUSEPORT具體含義可以參考我的另一篇文章

(3)將socket的連接超時時間設(shè)置為delay變量所對應(yīng)的時間(以秒為單位)。這么做是為了防止我們在一個連接上等待太久。 
了解了如何新建一個socket,我們就可以開始對給定的(ip,port)對進行TCP連接。代碼如下:

try:  result = TCP_sock.connect_ex((ip, int(port_number)))  # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE  if result == 0:    output[port_number] = 'OPEN'  else:    output[port_number] = 'CLOSE'    TCP_sock.close()except socket.error as e:  output[port_number] = 'CLOSE'  pass

因為這是一個I/O操作,為了處理可能出現(xiàn)的異常,我們需要在try,except塊處理這部分操作。其次,我們根據(jù)socket.connect_ex()方法連接目標(biāo)地址,通過該方法返回的狀態(tài)代碼來判斷連接是否成功。該方法返回0代表連接成功。所以當(dāng)返回值為0的時候?qū)?dāng)前端口記錄為打開狀態(tài)。反之記錄為關(guān)閉。另外,當(dāng)連接操作出現(xiàn)異常的時候,我們也將端口記錄為關(guān)閉狀態(tài),因為其并不能被成功連接(可能因為防火墻或者數(shù)據(jù)包被過濾等原因)。

需要注意的是,在連接完成后我們一定要調(diào)用socket.close()方法來關(guān)閉與遠(yuǎn)程端口之間的TCP連接。否則的話我們的掃描操作可能會引起所謂的TCP連接懸掛問題(Hanging TCP connection)。

總結(jié)起來,TCP握手掃描的整體代碼如下:

"""Perform status checking for a given port on a given ip address using TCP handshakeKeyword arguments:ip -- the ip address that is being scannedport_number -- the port that is going to be checkeddelay -- the time in seconds that a TCP socket waits until timeoutoutput -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')"""def __TCP_connect(ip, port_number, delay, output):  # Initilize the TCP socket object  TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)  TCP_sock.settimeout(delay)  try:    result = TCP_sock.connect_ex((ip, int(port_number)))    # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE    if result == 0:      output[port_number] = 'OPEN'    else:      output[port_number] = 'CLOSE'    TCP_sock.close()  except socket.error as e:    output[port_number] = 'CLOSE'    pass

II. 多線程掃描端口

單線程掃描雖然邏輯簡單,但無疑是及其低效的。因為在掃描過程中要進行大量的數(shù)據(jù)包的發(fā)送和接受,所以這是一個I/O密集型的操作。如果只是用單線程進行掃描的話,程序會在等待回復(fù)的過程中浪費大量的時間。因此多線程的操作是很有必要的。這里,一個很自然的思路就是為每一個端口單獨開一個線程進行掃描。 

在這里我們將需要掃描的端口列表定為從Nmap中得到的前1000個使用頻率最高的端口:

 

復(fù)制代碼代碼如下:
__port_list = [1,3,6,9,13,17,19,20,21,22,23,24,25,30,32,37,42,49,53,70,79,80,81,82,83,84,88,89,99,106,109,110,113,119,125,135,139,143,146,161,163,179,199,211,222,254,255,259,264,280,301,306,311,340,366,389,406,416,425,427,443,444,458,464,481,497,500,512,513,514,524,541,543,544,548,554,563,...]

 

對于一個給定的ip地址,掃描的過程是這樣的: 

1. 取出一個端口 
2. 新建一條線程,利用__TCP_connect()函數(shù)對該(ip,port)進行連接操作。 
3. 調(diào)用thread.start()thread.join()方法,使掃描的子線程開始工作并且命令主線程等待子線程死亡后再結(jié)束。 
4. 重復(fù)這個過程直到所有的端口都被掃描過。 

根據(jù)以上思路,多線程掃描的代碼如下:

"""Open multiple threads to perform port scanningKeyword arguments:ip -- the ip address that is being scanneddelay -- the time in seconds that a TCP socket waits until timeoutoutput -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')"""def __scan_ports_helper(ip, delay, output):  '''  Multithreading port scanning  '''  port_index = 0  while port_index < len(__port_list):    # Ensure that the number of cocurrently running threads does not exceed the thread limit    while threading.activeCount() < __thread_limit and port_index < len(__port_list):      # Start threads      thread = threading.Thread(target = __TCP_connect, args = (ip, __port_list[port_index], delay, output))      thread.start()      # lock the thread until all threads complete      thread.join()      port_index = port_index + 1

其中__thread_limit參數(shù)是用來限制線程數(shù)目的。output是一個字典,以(port: status)的形式保存了掃描的結(jié)果。 
thread.join()保證了主線程只有在所有子線程都結(jié)束之后才會繼續(xù)執(zhí)行,從而確保了我們一定會掃描全部的端口。

III. 多線程掃描多個網(wǎng)站

在多線程掃描端口的同時,如果我們能夠多線程掃描多個網(wǎng)站,那么掃描的效率還將進一步提高。為了達(dá)到這個目的,我們需要另一個線程去管理一個網(wǎng)站對應(yīng)的對其端口進行掃描的所有子線程。 

除此之外,在這種情況下,我們必須刪去__scan_ports_helper()中的thread.join()。否則主線程就會被端口掃描子線程阻塞,我們也就無法多線程掃描多個網(wǎng)站了。 

在不使用join()的情況下,我們?nèi)绾未_保一個網(wǎng)站的掃描線程只有在完成對其全部端口的掃描之后才會返回呢?這里我使用的方法是檢測output字典的長度。因為在全部掃描完成后,output的長度一定與__port_list的長度一致。 

改變后的代碼如下:

def __scan_ports_helper(ip, delay, output):  '''  Multithreading port scanning  '''  port_index = 0  while port_index < len(__port_list):    # Ensure that the number of cocurrently running threads does not exceed the thread limit    while threading.activeCount() < __thread_limit and port_index < len(__port_list):      # Start threads      thread = threading.Thread(target = __TCP_connect, args = (ip, __port_list[port_index], delay, output))      thread.start()      port_index = port_index + 1  while (len(output) < len(self.target_ports)):    continue

根據(jù)以上掃描線程的代碼,端口掃描的管理線程的代碼如下所示:

"""Controller of the __scan_ports_helper() functionKeyword arguments:ip -- the ip address that is being scanneddelay -- the time in seconds that a TCP socket waits until timeout"""    def __scan_ports(websites, output_ip, delay):  scan_result = {}  for website in websites:    website = str(website)    scan_result[website] = {}    thread = threading.Thread(target = __scan_ports_helper, args = (ip, delay, scan_result[website]))    thread.start()    # lock the script until all threads complete    thread.join()  return scan_result

至此,我們就完成了一個多線程端口掃描器的全部代碼。

IV. 總結(jié)!利用這些代碼掃描給定網(wǎng)站并輸出結(jié)果

處于輸出方便的考慮,我并沒有使用多線程掃描多個網(wǎng)站,同時對每個網(wǎng)站多線程掃描多個端口的方法。在這個例子中只進行了多線程掃描端口,但同時只掃描一個網(wǎng)站的操作。整合起來的代碼如下:

import sysimport subprocessimport socketimport threadingimport timeclass PortScanner:  # default ports to be scanned  # or put any ports you want to scan here!  __port_list = [1,3,6,9,13,17,19,20,21,22,23,24,25,30,32,37,42,49,53,70,79,80,81,82,83,84,88,89,99,106,109,110,113,119,125,135,139,143,146,161,163,179,199,211,222,254,255,259,264,280,301,306,311,340,366,389,406,416,425,427,443,444,458,464,481,497,500,512,513,514,524,541,543,544,548,554,563]  # default thread number limit  __thread_limit = 1000  # default connection timeout time inseconds  __delay = 10  """  Constructor of a PortScanner object  Keyword arguments:  target_ports -- the list of ports that is going to be scanned (default self.__port_list)  """  def __init__(self, target_ports = None):    # If target ports not given in the arguments, use default ports    # If target ports is given in the arguments, use given port lists    if target_ports is None:      self.target_ports = self.__port_list    else:      self.target_ports = target_ports  """  Return the usage information for invalid input host name.   """  def __usage(self):    print('python Port Scanner v0.1')    print('please make sure the input host name is in the form of "something.com" or "http://something.com!"/n')  """  This is the function need to be called to perform port scanning  Keyword arguments:  host_name -- the hostname that is going to be scanned  message -- the message that is going to be included in the scanning packets, in order to prevent    ethical problem (default: '')  """  def scan(self, host_name, message = ''):    if 'http://' in host_name or 'https://' in host_name:      host_name = host_name[host_name.find('://') + 3 : ]    print('*' * 60 + '/n')    print('start scanning website: ' + str(host_name))    try:      server_ip = socket.gethostbyname(str(host_name))      print('server ip is: ' + str(server_ip))    except socket.error as e:      # If the DNS resolution of a website cannot be finished, abort that website.      #print(e)      print('hostname %s unknown!!!' % host_name)      self.__usage()      return {}      # May need to return specificed values to the DB in the future    start_time = time.time()    output = self.__scan_ports(server_ip, self.__delay, message)    stop_time = time.time()    print('host %s scanned in %f seconds' %(host_name, stop_time - start_time))    print('finish scanning!/n')    return output  """  Set the maximum number of thread for port scanning  Keyword argument:  num -- the maximum number of thread running concurrently (default 1000)  """  def set_thread_limit(self, num):    num = int(num)    if num <= 0 or num > 50000:      print('Warning: Invalid thread number limit! Please make sure the thread limit is within the range of (1, 50,000)!')      print('The scanning process will use default thread limit!')      return    self.__thread_limit = num  """  Set the time out delay for port scanning in seconds  Keyword argument:  delay -- the time in seconds that a TCP socket waits until timeout (default 10)  """  def set_delay(self, delay):    delay = int(delay)    if delay <= 0 or delay > 100:      print('Warning: Invalid delay value! Please make sure the input delay is within the range of (1, 100)')      print('The scanning process will use the default delay time')      return     self.__delay = delay  """  Print out the list of ports being scanned  """  def show_target_ports(self):    print ('Current port list is:')    print (self.target_ports)  """  Print out the delay in seconds that a TCP socket waits until timeout  """  def show_delay(self):    print ('Current timeout delay is :%d' %(int(self.__delay)))  """  Open multiple threads to perform port scanning  Keyword arguments:  ip -- the ip address that is being scanned  delay -- the time in seconds that a TCP socket waits until timeout  output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')  message -- the message that is going to be included in the scanning packets, in order to prevent    ethical problem (default: '')  """  def __scan_ports_helper(self, ip, delay, output, message):    '''    Multithreading port scanning    '''    port_index = 0    while port_index < len(self.target_ports):      # Ensure that the number of cocurrently running threads does not exceed the thread limit      while threading.activeCount() < self.__thread_limit and port_index < len(self.target_ports):        # Start threads        thread = threading.Thread(target = self.__TCP_connect, args = (ip, self.target_ports[port_index], delay, output, message))        thread.start()        port_index = port_index + 1  """  Controller of the __scan_ports_helper() function  Keyword arguments:  ip -- the ip address that is being scanned  delay -- the time in seconds that a TCP socket waits until timeout  message -- the message that is going to be included in the scanning packets, in order to prevent    ethical problem (default: '')  """      def __scan_ports(self, ip, delay, message):    output = {}    thread = threading.Thread(target = self.__scan_ports_helper, args = (ip, delay, output, message))    thread.start()    # Wait until all port scanning threads finished    while (len(output) < len(self.target_ports)):      continue    # Print openning ports from small to large    for port in self.target_ports:      if output[port] == 'OPEN':        print(str(port) + ': ' + output[port] + '/n')    return output  """  Perform status checking for a given port on a given ip address using TCP handshake  Keyword arguments:  ip -- the ip address that is being scanned  port_number -- the port that is going to be checked  delay -- the time in seconds that a TCP socket waits until timeout  output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')  message -- the message that is going to be included in the scanning packets, in order to prevent    ethical problem (default: '')  """  def __TCP_connect(self, ip, port_number, delay, output, message):    # Initilize the TCP socket object    TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)    TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)    TCP_sock.settimeout(delay)    # Initilize a UDP socket to send scanning alert message if there exists an non-empty message    if message != '':      UDP_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)      UDP_sock.sendto(str(message), (ip, int(port_number)))    try:      result = TCP_sock.connect_ex((ip, int(port_number)))      if message != '':        TCP_sock.sendall(str(message))      # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE      if result == 0:        output[port_number] = 'OPEN'      else:        output[port_number] = 'CLOSE'      TCP_sock.close()    except socket.error as e:      output[port_number] = 'CLOSE'      pass

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持VEVB武林網(wǎng)。


注:相關(guān)教程知識閱讀請移步到python教程頻道。
發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 杂多县| 宁强县| 卓尼县| 蒙城县| 应城市| 卢氏县| 德庆县| 仪陇县| 从化市| 天柱县| 阿克陶县| 江华| 南京市| 阳春市| 革吉县| 神木县| 皋兰县| 虹口区| 凤山市| 龙门县| 阿合奇县| 邹城市| 交城县| 遵义市| 清水河县| 泾源县| 甘南县| 民勤县| 万年县| 阿勒泰市| 昌平区| 通河县| 罗源县| 仙游县| 邯郸县| 轮台县| 平远县| 禄劝| 新竹市| 香河县| 永兴县|