1. 程式人生 > >《Python 黑帽子》學習筆記

原書的程式碼主要考慮的是如何實現功能,在字元編碼,socket 阻塞和資料互動,異常處理等方面存在一些問題,造成了程式功能不完善,邏輯出差和退出等情況。

本篇筆記記錄用 Python3 實現原書的 netcat, 指令碼功能和步驟主要是參照原書的實現思路,會對部分程式碼的邏輯進行更合理的調整,並學習字元編碼,異常處理,除錯日誌記錄等知識點。


Python3 程式碼

#!/usr/bin/env python3
# -*- code: utf-8 -*-

import sys
import getopt
import socket
import subprocess
threading import logging logging.basicConfig(level=logging.DEBUG, format='%(filename)s[line:%(lineno)d] %(levelname)s: %(message)s', # format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s: %(message)s', # datefmt='%Y/%m/%d %H:%M:%S',
# filename='myapp.log', filemode='a') # define some global variables listen = False command = False upload = False execute = "" target = "" upload_destination = "" port = 0 def run_command(command): """ execute the shell command, or file received from client. :param command: :return: output: shell command result. """
# trim the newline.(delete the characters of the string end.) command = command.rstrip() # run the command and get the output back try: # run command with arguments and return its output. output = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True) logging.debug(output) except Exception as e: logging.error(e) output = b"Failed to execute command.\r\n" # send the output back to the client return output def client_handler(client_socket): """ the thread function of handling the client. :param client_socket: :return: """ global upload global execute global command # upload file if len(upload_destination): # read in all of the bytes and write to our destination file_buffer = "" # keep reading data until none is available while True: data = client_socket.recv(1024) file_buffer += data.decode("utf-8") logging.debug(data) # "#EOF#" tell the server, file is end. if "#EOF#" in file_buffer: file_buffer = file_buffer[:-6] break # for interaciton, like heart packet. client_socket.send(b"#") # now we take these bytes and try to write them out try: with open(upload_destination, "wb") as fw: fw.write(file_buffer.encode("utf-8")) client_socket.send(b"save file successed.\n") except Exception as err: logging.error(err) client_socket.send(b"save file failed.\n") finally: client_socket.close() # execute the given file if len(execute): # run the command output = run_command(execute) client_socket.send(output) # now we go into another loop if a command shell was requested if command: # receive command from client, execute it, and send the result data. try: while True: # show a simple prompt client_socket.send(b"<BHP:#>") # now we receive until we see a linefeed (enter key) cmd_buffer = "" while "\n" not in cmd_buffer: try: cmd_buffer += client_socket.recv(1024).decode("utf-8") except Exception as err: logging.error(err) client_socket.close() break # we have a valid command so execute it and send back the results response = run_command(cmd_buffer) # send back the response client_socket.send(response) except Exception as err: logging.error(err) client_socket.close() def server_loop(): """ the server listen. create a thread to handle client's connection. :return: """ global target global port # if no target is defined we listen on all interfaces if not len(target): target = "" server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind((target, port)) logging.info("Listen %s:%d" % (target, port)) server.listen(5) while True: client_socket, addr = server.accept() # spin off a thread to handle our new client client_thread = threading.Thread(target=client_handler, args=(client_socket,)) client_thread.start() def client_sender(buffer): """ the client send datas to the server, and receive datas from server. :param buffer: datas from the stdin :return: """ client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: # conncet to target client.connect((target, port)) logging.debug('server is %s:%d' % (target, port)) # if we detect input from stdin then send the datas. # if not we are going to wait for the user to input. if len(buffer): # send the datas with utf-8 endecode. client.send(buffer.encode("utf-8")) while True: # now wait for datas back recv_len = 1 response = "" while recv_len: data = client.recv(4096) logging.debug("receive datas : %s" % data) try: response += data.decode("utf-8") except Exception as e: logging.error(e) response += data.decode("gbk") if recv_len < 4096: break print(response + " ") # wait for more input # Python2 is raw_input(), and Python3 is input() buffer = input("") buffer += "\n" client.send(buffer.encode("utf-8")) # logging.info("send datas: %s" % buffer) except Exception as e: logging.error(e) finally: # teardown the connection client.close() def usage(): """ print the info of help :return: """ print("Usage: netcat.py -t target_host -p port") print("\t-l --listen - listen on [host]:[port] for incoming connections") print("\t-e --execute=file_to_run - execute the given file upon receiving a connection") print("\t-c --command - initialize a command shell") print("\t-u --upload=destination - upon receiving connection upload a file and write to [destination]") print("Examples: ") print("\tnetcat.py -t -p 5555 -l -c") print("\tnetcat.py -t -p 5555 -l -u=c:\\target.exe") print("\tnetcat.py -t -p 5555 -l -e=\"cat /etc/passwd\"") print("\techo 'ABCDEFGHI' | ./netcat.py.py -t192.168.1.7 -p80") sys.exit(0) def main(): """ parse shell option and parameters, and set the vars. call listen function or connect function. :return: """ global listen global port global execute global command global upload_destination global target if not len(sys.argv[1:]): usage() # read the commandline options try: opts, args = getopt.getopt(sys.argv[1:], "hle:t:p:cu:", ["help", "listen", "execute=", "target=", "port=", "command", "upload="]) except getopt.GetoptError as err: logging.error("%s", err) usage() for o, a in opts: if o in ("-h", "--help"): usage() elif o in ("-l", "--listen"): listen = True elif o in ("-e", "--execute"): execute = a elif o in ("-c", "--commandshell"): command = True elif o in ("-u", "--upload"): upload_destination = a elif o in ("-t", "--target"): target = a elif o in ("-p", "--port"): port = int(a) else: assert False, "Unhandled Option" usage() # are we going to listen or just send data from stdin if not listen and len(target) and port > 0: # read in the buffer from the commandline # this will block, so send CTRL-D if not sending input to stdin # Windows is Ctrl-Z # buffer = sys.stdin.read() buffer = input() + '\n' # send data off client_sender(buffer) # we are going to listen and potentially # upload things, execute commands and drop a shell back # depending on our command line options above if listen: server_loop() main()


命令列 shell 功能:


帶 debug 的很亂。


logging 的 level 設定為 ERROR,不會輸出 debug 資訊。

upload 功能

我修改為當客戶端傳送 #EOF# 後,作為檔案傳輸的結束,並在服務端傳送一個反饋資料 #, 以保障雙方能資料互動,不然 socket.recv() 將一直阻塞,也可以考慮修改 socket 的超時設定。





在編寫程式碼的時候,用 logging 除錯資料通訊和異常錯誤,幫我解決了很多問題。簡單記錄下,更詳細的知識使用到時再去查閱。

異常處理程式碼結構如下,把可能會引發異常的程式碼放在 try 後執行,引發異常會執行 except 裡的程式碼,最後會執行 finall 裡的程式碼,可以把關閉套接字,退出程式等善後的程式碼放在這裡。

except Exception as e:

用 Python 自帶的 logging 模組,可以直觀的在終端看到除錯資訊,或把除錯資訊存到檔案裡,比 print() 函式要方便很多,能夠顯示出除錯資訊出自程式的哪一行程式碼,可以通過設定不同的日誌等級(level)來輸出不同日誌資訊,設定高等級的日誌等級後,低等級的日誌資訊不會輸出。

level 值的說明:

  • FATAL 致命錯誤
  • CRITICAL 特別糟糕的事情,如記憶體耗盡、磁碟空間為空,一般很少使用
  • ERROR 發生錯誤時,如 IO 操作失敗或者連線問題
  • WARNING 發生很重要的事件,但是並不是錯誤時,如使用者登入密碼錯誤
  • INFO 處理請求或者狀態變化等日常事務
  • DEBUG 除錯過程中使用 DEBUG 等級,如演算法中每個迴圈的中間狀態

format 說明:

  • %(levelno)s:列印日誌級別的數值
  • %(levelname)s:列印日誌級別的名稱
  • %(pathname)s:列印當前執行程式的路徑,其實就是sys.argv[0]
  • %(filename)s:列印當前執行程式名
  • %(funcName)s:列印日誌的當前函式
  • %(lineno)d:列印日誌的當前行號
  • %(asctime)s:列印日誌的時間
  • %(thread)d:列印執行緒ID
  • %(threadName)s:列印執行緒名稱
  • %(process)d:列印程序ID
  • %(message)s:列印日誌資訊
import logging

                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s: %(message)s',
                    datefmt='%Y/%m/%d %H:%M:%S',
                    # filename='myapp.log',


實現命令列 shell 功能時,在 Win7 中文系統上測試,需要傳輸中文字元,出現 UnicodeDecodeError 錯誤。即:

netcat-p3.py[line:168] DEBUG: receive datas : b'\r\nMicrosoft Windows [\xb0\xe6\xb1\xbe 6.1.7601]\r\n<BHP:#>'
netcat-p3.py[line:173] ERROR: 'utf-8' codec can't decode byte 0xb0 in position 21: invalid start byte


    response += data.decode("utf-8")  # 異常的地方
except Exception as e:
    response += data.decode("gbk")


b'\r\nMicrosoft Windows [\xb0\xe6\xb1\xbe 6.1.7601]\r\n<BHP:#>'

這段資料進行 decode(“utf-8”) 解碼。這段資料的來源是建立 shell 子程序後,執行 ver 命令後的結果,即中文的

Microsoft Windows [版本 6.1.7601]

shell 子程序輸出結果資料的編碼是跟隨執行 shell 的系統的,或者說是跟隨當前啟動 shell 的終端的資料編碼。而當前終端資料的編碼是 cp936, 近似於 gbk 編碼。實際上中文 Win7 系統內部都是 cp936.

>>> import locale
>>> locale.getdefaultlocale()  # 獲取系統當前的編碼
('zh_CN', 'cp936')

可以理解為,這段資料的編碼是 gbk 編碼,而 utf-8 和 gbk 編碼之間是不能直接轉換的,所有的 utf-8 和 gbk 編碼都得通過 unicode 編碼進行轉換。

所以,在將 shell 子程序的結果資料,直接進行 decode(“utf-8”) 解碼,會引發 UnicodeDecodeError 異常。我在修改程式碼時,添加了一個異常處理,如果 utf-8 解碼失敗,會修改為 gbk 解碼。這樣能保證程式不會因為異常而退出。

再說明下,為什麼要先進行 utf-8 解碼?因為要保證 socket 通訊使用 byte 流傳輸,我對大多數要通訊的資料(基本都是 str)用 utf-8 進行了編碼,編碼後即為 byte 流,傳送前 encode, 接收後 decode. 由於建立 shell 子程序後,其輸出結果直接就是 byte 流,所以沒對其進行編碼轉換,直接通過 socket.send() 傳送。

執行 shell 的程式碼如下,用 logging.debug(output), 可以看到輸出資料為 byte 流。

# run the command and get the output back
    # run command with arguments and return its output.
    output = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
except Exception as e:
    output = b"Failed to execute command.\r\n"

netcat-p3.py[line:40] DEBUG: b'\r\nMicrosoft Windows [\xb0\xe6\xb1\xbe 6.1.7601]\r\n'


用 Python 自帶的 locale 模組可以檢測命令列的預設編碼(也是系統的編碼),和設定命令列編碼。

我的 Kali 英文系統,編碼為 utf-8.

>>> import locale
>>> locale.getdefaultlocale()
('en_US', 'UTF-8')

我的 Win7 中文系統,編碼為 cp936.

>>> import locale
>>> locale.getdefaultlocale()
('zh_CN', 'cp936')


關於 netcat 的實現,主要是解決了一些異常和邏輯的問題,還可以有很多完善的地方,考慮加快下學習進度,下步的筆記將主要記錄程式碼的實現。




