1. 程式人生 > >使用select.select編寫聊天室伺服器 《Python網路程式設計攻略》

使用select.select編寫聊天室伺服器 《Python網路程式設計攻略》

#

現實中,大型網路伺服器可能要處理幾百或幾千個客戶端同時連線的請求,此時為每個客戶端建立單獨的執行緒或程序可能不實際。因為主機的記憶體可用量和CPU的能力皆有限制。

要處理大量客戶端的連線需要更好的技術,那就是Python提供的select模組。

select

select最早於1983年出現在4.2BSD中,它通過一個select()系統呼叫來監視多個檔案描述符的陣列,當select()返回後,該陣列中就緒的檔案描述符便會被核心修改標誌位,使得程序可以獲得這些檔案描述符從而進行後續的讀寫操作。

Python的select()方法直接呼叫作業系統的IO介面,它監控sockets,open files, and pipes(所有帶fileno()方法的檔案控制代碼)何時變成readable 和writeable, 或者通訊錯誤。

select()使得同時監控多個連線變的簡單,並且這比寫一個長迴圈來等待和監控多客戶端連線要高效,因為select直接通過作業系統提供的網路介面進行操作,而不是通過Python的直譯器。

select()方法接收並監控3個通訊列表, 第一個是所有的輸入的data,就是指外部發過來的資料,第2個是監控和接收所有要發出去的data(outgoing data),第3個監控錯誤資訊,接下來我們需要建立2個列表來包含輸入和輸出資訊來傳給select().

# Sockets from which we expect to read
inputs = [ server ]

# Sockets to
which we expect to write outputs = [ ] # Outgoing message queues (socket:Queue) message_queues = {} while inputs: # Wait for at least one of the sockets to be ready for processing print >>sys.stderr, '\nwaiting for the next event' readable, writable, exceptional = select.select(inputs, outputs, inputs)

當你把inputs,outputs,exceptional(這裡跟inputs共用)傳給select()後,它返回3個新的list,我們上面將他們分別賦值為readable,writable,exceptional, 所有在readable list中的socket連線代表有資料可接收(recv),所有在writable list中的存放著你可以對其進行傳送(send)操作的socket連線,當連線通訊出現error時會把error寫到exceptional列表中。

Readable list

Readable list中的socket 可以有3種可能狀態:
- 第一種是如果這個socket是main “server” socket,它負責監聽客戶端的連線,如果這個main server socket出現在readable裡,那代表這是server端已經ready來接收一個新的連線進來了,為了讓這個main server能同時處理多個連線,在下面的程式碼裡,我們把這個main server的socket設定為非阻塞模式。

# Handle inputs
for s in readable:

    if s is server:
        # A "readable" server socket is ready to accept a connection
        connection, client_address = s.accept()
        print >>sys.stderr, 'new connection from', client_address
        connection.setblocking(0)
        inputs.append(connection)

        # Give the connection a queue for data we want to send
        message_queues[connection] = Queue.Queue()
  • 第二種情況是這個socket是已經建立了的連線,它把資料發了過來,這個時候你就可以通過recv()來接收它發過來的資料,然後把接收到的資料放到queue裡,這樣你就可以把接收到的資料再傳回給客戶端了。
else:
     data = s.recv(1024)
     if data:
         # A readable client socket has data
         print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
         message_queues[s].put(data)
         # Add output channel for response
         if s not in outputs:
             outputs.append(s)
  • 第三種情況就是這個客戶端已經斷開了,所以你再通過recv()接收到的資料就為空了,所以這個時候你就可以把這個跟客戶端的連線關閉了。
else:
    # Interpret empty result as closed connection
    print >>sys.stderr, 'closing', client_address, 'after reading no data'
    # Stop listening for input on the connection
    if s in outputs:
        outputs.remove(s)  #既然客戶端都斷開了,我就不用再給它返回資料了,所以這時候如果這個客戶端的連線物件還在outputs列表中,就把它刪掉
    inputs.remove(s)    #inputs中也刪除掉
    s.close()           #把這個連線關閉掉

    # Remove message queue
    del message_queues[s]  

writable list

對於writable list中的socket,也有幾種狀態,如果這個客戶端連線在跟它對應的queue裡有資料,就把這個資料取出來再發回給這個客戶端,否則就把這個連線從output list中移除,這樣下一次迴圈select()呼叫時檢測到outputs list中沒有這個連線,那就會認為這個連線還處於非活動狀態。

# Handle outputs
for s in writable:
    try:
        next_msg = message_queues[s].get_nowait()
    except Queue.Empty:
        # No messages waiting so stop checking for writability.
        print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'
        outputs.remove(s)
    else:
        print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
        s.send(next_msg)

select 例項

伺服器端示例程式碼

#_*_coding:utf-8_*_
__author__ = 'Alex Li'

import select
import socket
import sys
import queue

# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)

# Bind the socket to the port
server_address = ('localhost', 10000)
print(sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address)

# Listen for incoming connections
server.listen(5)

# Sockets from which we expect to read
inputs = [ server ]

# Sockets to which we expect to write
outputs = [ ]

message_queues = {}
while inputs:

    # Wait for at least one of the sockets to be ready for processing
    print( '\nwaiting for the next event')
    readable, writable, exceptional = select.select(inputs, outputs, inputs)
    # Handle inputs
    for s in readable:

        if s is server:
            # A "readable" server socket is ready to accept a connection
            connection, client_address = s.accept()
            print('new connection from', client_address)
            connection.setblocking(False)
            inputs.append(connection)

            # Give the connection a queue for data we want to send
            message_queues[connection] = queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                # A readable client socket has data
                print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
                message_queues[s].put(data)
                # Add output channel for response
                if s not in outputs:
                    outputs.append(s)
            else:
                # Interpret empty result as closed connection
                print('closing', client_address, 'after reading no data')
                # Stop listening for input on the connection
                if s in outputs:
                    outputs.remove(s)  #既然客戶端都斷開了,我就不用再給它返回資料了,所以這時候如果這個客戶端的連線物件還在outputs列表中,就把它刪掉
                inputs.remove(s)    #inputs中也刪除掉
                s.close()           #把這個連線關閉掉

                # Remove message queue
                del message_queues[s]
    # Handle outputs
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            # No messages waiting so stop checking for writability.
            print('output queue for', s.getpeername(), 'is empty')
            outputs.remove(s)
        else:
            print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
            s.send(next_msg)
    # Handle "exceptional conditions"
    for s in exceptional:
        print('handling exceptional condition for', s.getpeername() )
        # Stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        # Remove message queue
        del message_queues[s]

客戶端示例程式碼

__author__ = 'jieli'
import socket
import sys

messages = [ 'This is the message. ',
             'It will be sent ',
             'in parts.',
             ]
server_address = ('localhost', 10000)

# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]

# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
        if not data:
            print >>sys.stderr, 'closing socket', s.getsockname()
            s.close()

執行結果參考:

實現方法

  1. 本例將客戶端和伺服器端程式碼寫在一個腳本里,執行時指定不同的–name引數區別執行的是伺服器或客戶端:當傳入–name=server時,指令碼啟動聊天室伺服器;當傳入的是其他引數,如client1、client2時,則執行的是客戶端。
  2. 聊天室伺服器埠通過–port指定。
  3. 對大型的應用程式而言,最好在不同模組中編寫伺服器和客戶端。

程式程式碼

'''
Created on 2017-2-28

@author: lenovo
'''
import select
import socket
import sys
import signal
import cPickle
import struct
import argparse

SERVER_HOST = 'localhost'
CHAT_SERVER_NAME = 'server'

# Some utilities
def send(channel, *args):
    buffer = cPickle.dumps(args)
    value = socket.htonl(len(buffer))
    size = struct.pack("L",value)
    channel.send(size)
    channel.send(buffer)

def receive(channel):
    size = struct.calcsize("L")
    size = channel.recv(size)

    #socket.recv(bufsize[, flags]) 
    #Receive data from the socket. The return value is a string representing the data received.
    #The maximum amount of data to be received at once is specified by bufsize. See the Unix manual page recv(2) for the meaning of the optional argument flags; it defaults to zero.
    #Note:For best match with hardware and network realities, the value of bufsize should be a relatively small power of 2, for example, 4096.

    try:
        size = socket.ntohl(struct.unpack("L",size)[0])
        #socket.ntohl(x) 
        #Convert 32-bit positive integers from network to host byte order.
        #On machines where the host byte order is the same as network byte order, 
        #this is a no-op; otherwise, it performs a 4-byte swap operation.

    except struct.error, e:
        return ''
    buf = ""
    while len(buf) < size:
        buf += channel.recv(size - len(buf))
    return cPickle.loads(buf)[0]

class ChatServer(object):
    """An example chat server using select"""
    def __init__(self,port,backlog=5):
        self.clients = 0
        self.clientmap = {}
        self.outputs = [] #list out sockets
        self.server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind((SERVER_HOST,port))
        print 'Server listening to port: %s...' %port
        self.server.listen(backlog)
        # Catch keyboard interrupts
        signal.signal(signal.SIGINT, self.sighandler)

    def sighandler(self,signum,frame):
        """Clean up client output"""
        # Close the server
        print 'Shutting down server...'
        # close existing client sockets
        for output in self.outputs:
            output.close()
        self.server.close()

    def get_client_name(self,client):
        """Return the name of the client"""
        info = self.clientmap[client]
        host,name = info[0][0],info[1]
        return '@'.join((name,host))

    def run(self):
        # define input source.
        inputs = [self.server,sys.stdin]
        self.outputs = []
        running = True
        while running:
            try:
                readable,writeable,exceptional = select.select(inputs, self.outputs, [])
            except select.error, e:
                break

            for sock in readable:
                if sock == self.server:
                    # handle the server socket
                    client,address = self.server.accept()
                    print "Chat server: got connection %d from %s" %(client.fileno(),address)
                    # read the login name
                    cname = receive(client).split('NAME: ')[1]
                    # Compute client name and send back
                    self.clients  += 1
                    send(client, 'CLIENT: ' + str(address[0]))
                    inputs.append(client)
                    self.clientmap[client] = (address,cname)
                    # Send joining information to other clients
                    msg = "\n(Connected: New client (%d) from %s)" % (self.clients,self.get_client_name(client))
                    for output in self.outputs:
                        send(output,msg)
                    self.outputs.append(client)

                elif sock == sys.stdin:
                    #handle standard inut
                    junk = sys.stdin.readline()
                    running = False

                else:
                    # Handle all other sockets
                    try:
                        data = receive(sock)
                        if data:
                            #send as new client's message...
                            msg = '\n#[' + self.get_client_name(sock) +']>>' + data
                            # send data to all except ourself
                            for output in self.outputs:
                                if output != sock:
                                    send(output,msg)
                        else:
                            print "Chat server: %d hung up" % sock.fileno()
                            self.clients -= 1
                            sock.close()
                            inputs.remove(sock)
                            self.outputs.remove(sock)

                            #sending client leaving information to others
                            msg = "\n(Now hung up: Client from %s)" % self.get_client_name(sock)
                            for output in self.outputs:
                                send(output,msg)
                    except socket.error, e:
                        #remove
                        inputs.remove(sock)
                        self.outputs.remove(sock)
        self.server.close()

class ChatClient(object):
    """ A command ine chat client using select """
    def __init__(self,name,port,host=SERVER_HOST):
        self.name = name
        self.connected = False
        self.host = host
        self.port = port
        #Initial prompt
        self.prompt = '[' + '@'.join((name,socket.gethostname().split('.')[0])) + ']>'
        # Connect to server at port
        try:
            self.sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
            self.sock.connect((host,self.port))
            print "New connected to chat server @ port %d" %self.port
            self.connected = True
            #Send my name...
            send(self.sock,'NAME: ' + self.name)
            data = receive(self.sock)
            #Contains client address, set it
            addr = data.split('CLIENT: ')[1]
            self.prompt = '[' +'@'.join((self.name,addr)) +']>'
        except socket.error, e:
            print "Failed to connect to chat server @ port %d" %self.port
            sys.exit(1)

    def run(self):
        """ Chat client main loop"""
        while self.connected:
            try:
                sys.stdout.write(self.prompt)
                sys.stdout.flush()
                #wait for input from  stdin and socket
                readable,writeable,exceptional = select.select([0,self.sock], [], [])
                for sock in readable:
                    if sock == 0:
                        data = sys.stdin.readline().strip()
                        if data: send(self.sock,data)
                    elif sock == self.sock:
                        data = receive(self.sock)
                        if not data:
                            print 'Client shutting down.'
                            self.connected = False
                            break
                        else:
                            sys.stdout.write(data + '\n')
                            sys.stdout.flush()
            except KeyboardInterrupt:
                print " CLient interrupted. """
                self.sock.close()
                break
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description = 'Socket Server Example with Select')
    parser.add_argument('--name', action="store",dest="name",required=True)
    parser.add_argument('--port',action="store",dest="port",type=int,required=True)
    given_args=parser.parse_args()
    port=given_args.port
    name=given_args.name
    if name == CHAT_SERVER_NAME:
        server = ChatServer(port)
        server.run()
    else:
        client = ChatClient(name=name,port=port)
        client.run()    

執行結果:

20170308-1.JPG