1. 程式人生 > >【select模塊】select IO多路復用和select實現FTP

【select模塊】select IO多路復用和select實現FTP

func epoll which 使用 list mit lock where puts

select是全平臺通用的IO多路復用模塊。最大連接數:1024。

poll和epoll沒有最大連接數限制,但只能用在linux平臺。

selectors是再封裝模塊,推薦使用。下篇會討論。

select.select(rlist, wlist, xlist[, timeout])?

  • This is a straightforward interface to the Unix select() system call. The first three arguments are sequences of ‘waitable objects’: either integers representing file descriptors or objects with a parameterless method named fileno()

    returning such an integer:

    Empty sequences are allowed, but acceptance of three empty sequences is platform-dependent. (It is known to work on Unix but not on Windows.) The optional timeout argument specifies a time-out as a floating point number in seconds. When the timeout argument is omitted the function blocks until at least one file descriptor is ready. A time-out value of zero specifies a poll and never blocks.

    The return value is a triple of lists of objects that are ready: subsets of the first three arguments. When the time-out is reached without a file descriptor becoming ready, three empty lists are returned.

    Among the acceptable object types in the sequences are Python file objects (e.g. sys.stdin, or objects returned by open()

    or os.popen()), socket objects returned by socket.socket(). You may also define a wrapper class yourself, as long as it

    • rlist: wait until ready for reading

    • wlist: wait until ready for writing

    • xlist: wait for an “exceptional condition” (see the manual page for what your system considers such a condition)

方法、屬性參數作用示例
select(rlist,wlist,rlist,[timout=1])



poll()
沒人用了,已經升級為epoll


epoll(sizehint = -1,flags=0)

sizehint informs epoll about the expected number of events to be registered. It must be positive, or-1to use the default. It is only used on older systems where epoll_create1() is not available; otherwise it has no effect (though its value is still checked).

flags is deprecated and completely ignored. However, when supplied, its value must be 0 or select.EPOLL_CLOEXEC, otherwise OSError is raised.


(Only supported on Linux 2.5.44 and newer.) Return an edge polling object, which can be used as Edge or Level Triggered interface for I/O events.

devpoll()

(Only supported on Solaris and derivatives.) Returns a /dev/poll polling object; see section /dev/poll Polling Objects below for the methods supported by devpoll objects.
kevent()

select.kevent(ident, filter=KQ_FILTER_READ, flags=KQ_EV_ADD, fflags=0, data=0, udata=0)?
  • (Only supported on BSD.) Returns a kernel event object; see section Kevent Objects below for the methods supported by kevent objects.


kqueue()
(Only supported on BSD.) Returns a kernel queue object; see section Kqueue Objects below for the methods supported by kqueue objects.


import socket
import os
import select
import queue
import json


class SelectFTP(object):
    def __init__(self, ip, port):
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setblocking(0)
        self.server.bind((ip, port))
        self.server.listen(20)
        

        self.inputs = [self.server]
        self.outputs = []
        self.file_attr = {}
        # file_attr format:file_attr[socket]:{func:'', filename:'', len:999, recv_len:0}      
        self.socket_queue = {}

    
    def upload(self, sock, write_data):
        # if os.path.isfile(self.file_attr[sock]['filename']):
        with open(self.file_attr[sock]['filename'], 'a+') as file:
            file.write(data)
            self.file_attr[sock][recv_len] += len(write_data)
            if self.file_attr[sock][recv_len] == self.file_attr[sock][len]:
                del self.file_attr[sock]
                file.close()
        
   
    def download(self, sock, *args):
        pass
        
    def run(self):
        while self.inputs:
            read_active, read_output, exception = select.select(self.inputs, self.outputs, self.inputs)
            
            for fd in read_active:
                if fd is server:
                    conn, addr = fd.accept(1024)
                    conn.setblocking(0)
                    self.inputs = self.inputs.append(conn)
                    self.socket_queue[fd] = queue.Queue()
                    
                else:
                    recv_data = fd.recv(1024)

                    if recv_data:
                        data = json.loads(recv_data.decode())
                        
                        if fd not in self.file_attr.keys:
                            self.file_attr[fd] = data
                        
                        else:
                            try:
                                self.socket_queue.put_nowait(data)
                                if fd not in self.outputs:
                                    self.outputs.append(fd)
                            except Exception as e:
                                print(e)
                            
                    else:
                        self.inputs.remove(fd)
                        if fd in self.outputs:
                            self.outputs.remove(fd)
                        del self.socket_queue[fd]
                        
                    send_data = 
                        
            for fd in read_output:
                try:
                    message = self.socket_queue.get_nowait()
                except queue.Empty:
                    self.outputs.remove(fd)
                    print('wait...')
                else:
                    getattr(self.file_attr[fd]['func'])(fd, message)

【select模塊】select IO多路復用和select實現FTP