1. 程式人生 > >python非阻塞伺服器實現

python非阻塞伺服器實現

不多解釋,直接上程式碼,是我們之前做戰鬥伺服器時搞得,翻出來分享一下,把業務那塊的程式碼去掉了,只貼出核心程式碼


程式入口:

#!/usr/bin/env python
# encoding: utf-8
"""
Server.py

Create by sunshaolei at 12-9-3
Email: [email protected]
"""
import sys
from servercore.Client import Client
from servercore.HandlerPool import HandlerPool

class Server():
    """
    """
    def __init__(self, host, port):
        """
        """
        self.client = Client(host, port)
        self.handlerPool = HandlerPool(1)

    def start(self):
        """
        """
        self.client.start()
        self.handlerPool.start()

        self.handlerPool.join()


server = Server(sys.argv[1], sys.argv[2])
server.start()


客戶端連線:
#!/usr/bin/env python
# encoding: utf-8
"""
Client.py

Create by sunshaolei at 12-9-3
Email: [email protected]
"""
import socket
import select
import threading
import struct
import copy
import time
from buffer.DataBase import DataBase
from buffer.Sequence import Sequence
from utils.Util import Util


class Client(threading.Thread):
    """
    """
    MAX_CLIENT = 1024 * 2
    MAX_BUFFER = 1024*10240
    RECV_LEN = 1024
    ENDIAN = DataBase.HIGHT_ENDIAN

    def __init__(self, host, port):
        """
        """
        threading.Thread.__init__(self, name='Client')

        self.client_buffer = {}     # 收取資料buff
        self.sequence = Sequence()

        self.host = host
        self.port = int(port)

        self.sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        self.sock.bind((self.host, self.port))
        self.sock.setblocking(0)
        self.sock.listen(Client.MAX_CLIENT)

        self.onlineSocketList = [self.sock]

    def run(self):
        """
        """
        while True:
            infds, outfds, errfds = select.select(self.onlineSocketList, self.onlineSocketList, self.onlineSocketList)
            for infd in infds:
                if infd is self.sock:
                    connect, addr = self.sock.accept()
                    Util.write_log('info', 'new connect by: %s:%s' % addr)
                    connect.setblocking(0)
                    self.onlineSocketList.append(connect)
                else:
                    try:
                        self.recv(infd)
                    except:
                        self.disconnect(infd)

            for outfd in outfds:
                try:
                    self.send(outfd)
                except :
                    self.disconnect(outfd)

            for errfd in errfds:
                self.disconnect(errfd)

            time.sleep(0.001)


    def recv(self, fd):
        """
        接收資料處理
        """
        if fd not in self.client_buffer:
            self.client_buffer[fd] = ''

        if len(self.client_buffer[fd]) + Client.RECV_LEN < Client.MAX_BUFFER:
            buff = fd.recv(Client.RECV_LEN)
            if not buff:
                self.disconnect(fd)
                return

            self.client_buffer[fd] += buff

            data = self.client_buffer[fd]
            num = 0     # 放置錯誤的資料格式導致死迴圈的
            while len(data) > 4 and num < 20:
                num += 1
                tag = '>i' if Client.ENDIAN == 1 else 'i'
                data_len = struct.unpack(tag, data[0:4])[0]
                if (len(data)-4) >= data_len:     # 滿足條件
                    database = DataBase(fd, data[4:data_len+4])
                    self.sequence.inQueue.put(database)
                    self.client_buffer[fd] = data[data_len+4:]
                    num = 0     # 如果有資料置為0

                data = self.client_buffer[fd]


    def send(self, fd):
        """
        傳送資料
        """
        database_list = self.sequence.getOut(fd)
        if not database_list:
            return
        try:
            for database in database_list:
                database.fd.send(database.buffer)
        except Exception:
            self.disconnect(fd)


    def disconnect(self, fd):
        """
        """
        if fd in self.onlineSocketList:
            self.onlineSocketList.remove(fd)
        if fd in self.client_buffer:
            self.client_buffer.pop(fd)

        self.sequence.getOut(fd)
        fd.close()

邏輯處理:
#!/usr/bin/env python
# encoding: utf-8
"""
HandlerPool.py

Create by sunshaolei at 12-9-3
Email: [email protected]
"""
import threading
from servercore.Handler import Handler

class HandlerPool(threading.Thread):
    """
    """
    def __init__(self, handlerNum=1):
        """
        """
        self.handlerNum = handlerNum
        self.threads = []

    def start(self):
        """
        """
        self.createThread(self.handlerNum)
        self.wait_for_complete()

    def createThread(self, num):
        for i in range(num):
            thread = Handler(i)
            thread.start()
            self.threads.append(thread)

    def wait_for_complete(self):
        #等待所有執行緒完成。
        while len(self.threads):
            thread = self.threads.pop()
            if thread.isAlive():
                thread.join()


#!/usr/bin/env python
# encoding: utf-8
"""
Handler.py

Create by sunshaolei at 12-9-19
Email: [email protected]
"""
import threading
import traceback
from Game.fight_service import FightService
from buffer.Sequence import Sequence
from utils.Util import Util

class Handler(threading.Thread):
    """
    """
    def __init__(self, threadId):
        """
        """
        threading.Thread.__init__(self, name='Hander_%s' % threadId)

        self.sequence = Sequence()

    def run(self):
        """
        """
        while True:
            database = self.sequence.inQueue.get()

            try:
                # 此處為業務入口,可以根據 moduleId,actionId分發進行資料解包,並執行相關處理,此處省略了
                moduleId = database.getByte()
                actionId = database.getByte()
                data = database.getUTF()

                FightService.app_server_msg(database.fd, moduleId, actionId, data)
            except:
                exstr = traceback.format_exc()
                Util.write_log('err', exstr)
#!/usr/bin/env python
# encoding: utf-8
"""
DataBase.py

Create by sunshaolei at 12-9-19
Email: [email protected]
"""
import struct

class DataBase(object):
    """
    """
    HIGHT_ENDIAN = 1
    LOW_ENDIAN = 0
    def __init__(self, fd, buff=None, endian=HIGHT_ENDIAN):
        """
        """
        self.fd = fd
        self.pos = 0
        self.endian = endian
        self.buffer = buff if buff else ''

    def reset(self):
        """
        """
        self.pos = 0

    def getInt(self):
        """
        """
        tag = '>i' if self.endian == DataBase.HIGHT_ENDIAN else 'i'
        result = struct.unpack(tag, self.buffer[self.pos:self.pos+4])[0]
        self.pos += 4
        return result

    def getShort(self):
        """
        """
        tag = '>h' if self.endian == DataBase.HIGHT_ENDIAN else 'h'
        result = struct.unpack(tag, self.buffer[self.pos:self.pos+2])[0]
        self.pos += 2
        return result

    def getByte(self):
        """
        """
        tag = 'b'
        result = struct.unpack(tag, self.buffer[self.pos:self.pos+1])[0]
        self.pos += 1
        return result

    def getUTF(self):
        """
        """
        tag = '>h' if self.endian == DataBase.HIGHT_ENDIAN else 'h'
        length = struct.unpack(tag, self.buffer[self.pos:self.pos+2])[0]
        self.pos += 2

        tag = '%ss' % (length)
        result = struct.unpack(tag, self.buffer[self.pos:self.pos+length])[0]
        self.pos += length
        return result

    def setInt(self, value):
        """
        """
        tag = '>i' if self.endian == DataBase.HIGHT_ENDIAN else 'i'
        self.buffer += (struct.pack(tag, value))
        return True

    def setShort(self, value):
        """
        """
        tag = '>h' if self.endian == DataBase.HIGHT_ENDIAN else 'h'
        self.buffer += (struct.pack(tag, value))
        return True

    def setByte(self, value):
        """
        """
        tag = 'b'
        self.buffer += (struct.pack(tag, value))
        return True

    def setUTF(self, value):
        """
        """
        tag = '>h' if self.endian == DataBase.HIGHT_ENDIAN else 'h'
        length = len(value)
        self.buffer += (struct.pack(tag, length))

        tag = '%ss' % (length)
        self.buffer += (struct.pack(tag, value))
        return True


#!/usr/bin/env python
# encoding: utf-8
"""
Sequence.py

Create by sunshaolei at 12-9-19
Email: [email protected]
"""
import threading
from Queue import Queue
from utils.Singleton import Singleton

buffLock = threading.RLock()

class Sequence(Singleton):
    """
    """
    def __init__(self):
        """
        """
        self.out_buff = {}

        self.inQueue = Queue(64)

    def getOut(self, k):
        """
        """
        buffLock.acquire()
        data = None
        if k in self.out_buff:
            data = self.out_buff.pop(k)
        buffLock.release()
        return data

    def addOut(self, k, v):
        """
        """
        buffLock.acquire()
        if k not in self.out_buff:
            self.out_buff[k] = []

        self.out_buff[k].append(v)
        buffLock.release()



相關推薦

python阻塞伺服器實現

不多解釋,直接上程式碼,是我們之前做戰鬥伺服器時搞得,翻出來分享一下,把業務那塊的程式碼去掉了,只貼出核心程式碼 程式入口: #!/usr/bin/env python # encoding: utf-8 """ Server.py Create by sunsha

python阻塞式單程序伺服器

python的單程序伺服器一次只能處理一個客戶端,顯然是沒有實用價值的,但是我們可以將單程序伺服器變為非阻塞式的。 利用socket中的setblocking()方法可以將tcp套接字轉化為非阻塞式套

java NIO實現同步阻塞伺服器

server package net.smgui.util; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.Byt

python-阻塞IO

recv code conn waiting res import client wait bin client端: # import time # import socket # sk = socket.socket(socket.AF_INET,socket.SOCK

阻塞伺服器需要注意的主要問題(譯)

非阻塞伺服器有一個嚴重的問題,一些人甚至在沒解決這個問題的背景下就開發自己的應用框架(比如Python的Tornado) 當你使用非阻塞伺服器的時候,你會獲得出色的效能並且不需要擔心可擴充套件性,然而同時你需要意識到一個問題:你的IO呼叫、網路系統呼叫也都是非阻塞的嗎?很

單程序阻塞伺服器

程序的執行取決於排程演算法,多個程序就是多個資源,程序是資源分配的單位,用多程序在客戶端訪問伺服器中是很常見的應用。下面我們介紹一種單程序實現客戶端訪問伺服器的方法,其基本思想就是雖然使用單執行緒但是

python:阻塞或非同步程式設計

例如,對於一個聊天室來說,因為有多個連線需要同時被處理,所以很顯然,阻塞或同步的方法是不合適的,這就像買票只開了一個視窗,佷多人排隊等一樣。那麼我們如何解決這個問題呢?主要有三種方法:forking、threading、非同步I/O。 Forking和threading的方

多執行緒高併發程式設計(11) -- 阻塞演算法實現ConcurrentLinkedQueue原始碼分析

  一.背景   要實現對佇列的安全訪問,有兩種方式:阻塞演算法和非阻塞演算法。阻塞演算法的實現是使用一把鎖(出隊和入隊同一把鎖ArrayBlockingQueue)和兩把鎖(出隊和入隊各一把鎖LinkedBlockingQueue)來實現;非阻塞演算法使用自旋+CAS實現。   今天來探究下使用非阻塞演算法

利用Python中SocketServer 實現客戶端與伺服器阻塞通訊

利用SocketServer模組來實現網路客戶端與伺服器併發連線非阻塞通訊。 首先,先了解下SocketServer模組中可供使用的類: BaseServer:包含伺服器的核心功能與混合(mix-in)類掛鉤;這個類只用於派生,所以不會生成這個類的例項;可以考慮使用TCPS

基於阻塞socket的多執行緒伺服器實現------一個伺服器如何與多個客戶端進行通訊?

      我們首先來看服務端(涉及非阻塞socket和多執行緒): #include <stdio.h> #include <winsock2.h> #include <windows.h> #pragma comment(li

linux下socket程式設計 select實現阻塞模式多臺客戶端與伺服器通訊

select函式原型如下: int select (int maxfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); select系統呼叫是用來讓我們的程式

異步阻塞socket的實現

print except 事件循環 port int 性能 run utf8 try 在學習使用scrapy爬蟲框架之前,需要了解一些基礎原理   我們知道HTTP請求是基於socket模塊進行發送和接受的,但是socket套接字的在使用的中存在著阻塞,不利用爬蟲的高性能運

Python web框架 Tornado(二)異步阻塞

.py thread bind log class multiple fin ini lex 異步非阻塞 阻塞式:(適用於所有框架,Django,Flask,Tornado,Bottle)   一個請求到來未處理完成,後續一直等待   解決方案:多線程,多進程 異步

Python的異步編程[0] -> 協程[1] -> 使用協程建立自己的異步阻塞模型

.net post this fab htm true 底層實現 自己 print 使用協程建立自己的異步非阻塞模型 接下來例子中,將使用純粹的Python編碼搭建一個異步模型,相當於自己構建的一個asyncio模塊,這也許能對asyncio模塊底層實現的理解有更大的

阻塞套接字實現並發處理

pre ror enc con put 服務 生成 import data 服務端 import socket server = socket.socket() server.setblocking(False) server.bind((‘0.0.0.0‘,8080)

python學習筆記之四-多進程&多線程&異步阻塞

running executor 服務器 RoCE 進行 break python buffer 創建 ProcessPoolExecutor對multiprocessing進行了高級抽象,暴露出簡單的統一接口。 異步非阻塞 爬蟲 對於異步IO請求的本質則是【非阻塞So

scala通過akka的actor實現socket http server(NIO阻塞模式)

1首先是sbt需要匯入的依賴 name := "HttpServer" version := "1.0" scalaVersion := "2.11.8" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-act

c/c++ llinux epoll系列4 利用epoll_wait實現阻塞的connect

llinux epoll系列4 利用epoll_wait實現非阻塞的connect connect函式是阻塞的,而且不能設定connect函式的timeout時間,所以一旦阻塞太長時間,影響使用者的體驗,所以就出來一個需求,硬要設定connect的timeout時間。 實現方法:先把connect函式變成

Java入門系列-25-NIO(實現阻塞網絡通信)

寫入 eve asn accept public int 次數 客戶端 服務器 還記得之前介紹NIO時對比傳統IO的一大特點嗎?就是NIO是非阻塞式的,這篇文章帶大家來看一下非阻塞的網絡操作。 補充:以數組的形式使用緩沖區 package testnio; import

基於Java NIO2實現的非同步阻塞訊息通訊框架

原文傳送門 基於Java NIO2實現的非同步非阻塞訊息通訊框架 前奏 AIO應用開發 Future方式 Callback方式 Reader/Writer方式實現 執行緒池和Group PendingExceptio