使用Python select 与 selector模块处理网络并发请求

news/2024/7/8 7:02:31

为什么选择 select技术处理并发❓

在并发场景中,如果使用线程池,或多进程进行并发处理,会占用过多的系统资源。这时可以考虑采用操作系统的 select 技术来处理。
select 能监听的socket文件描述符,监听的socoket产生事件,即会自动执行相应处理函数,如接送或发送。 当soket的客户端数量在1024以下,使用select是很合适的。但如果链接客户端过多,select采用的是轮询模型,服务器响应效率不高。应该采用epoll,或者用asyncio异步编程。

Python中有2个模块:select模块与selector模块,selector是高阶API,建议使用它。
两个模块提供了:select、poll、epoll三个方法,分别调用系统的select,poll,epoll 从而实现I/O多路复用。但注意,只有Linux支持epoll

  • Windows Python:提供: select
  • Mac Python:提供: select
  • Linux Python:提供: select、poll、epoll

Select模块的使用

Python 的select 模块是基于操作系统的event事件来实现的。
对于socket来说,它可以监控socket连接的readable, writable, error事件,对应地,select监视3个列表(read, write, error),当所监控的某个socket connection有事件发生,会添加到相应监控列表。比如,对于write list,当监控到某个连接的writable 事件后,即向该连接发送(write)数据.

python还提供了1个selectors 模块,其基于select模块,提供高阶编程接口。

服务端示例

import select
import socket


def start_server(port):
    HOST = '0.0.0.0'
    PORT = port

    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, PORT))   # 套接字绑定的IP与端口
    server.listen(10)           # 开始TCP监听

    inputs = [server]           # 存放需要被检测可读的socket
    outputs = []                # 存放需要被检测可写的socket

    while inputs:
        readable, writable, exceptional = select.select(inputs, outputs, inputs)
        # 检查可读的端口
        for s in readable:
            if s is server:     # 可读的是server,说明有连接进入
                connection, client_address = s.accept()
                inputs.append(connection)     # 将新建立的soket加入input列表
            else:               # 客户请求socket
                data = s.recv(1024)        # 故意设置的这么小
                if data:
                    # 从这个socket当中收到数据, 
# 如果要发Response, 将其加入到outputs中, select模型将检查它是否可写
                    print(data.decode(encoding='utf-8'))
                    if s not in outputs:
                        outputs.append(s)
                else:
                    # 收到为空的数据,意味着对方已经断开连接,需要做清理工作
                    if s in outputs:
                        outputs.remove(s)
                    inputs.remove(s)
                    s.close()

        # 可写
        for w in writable:
            w.send('收到数据'.encode(encoding='utf-8'))
            outputs.remove(w)

        # 异常
        for s in exceptional:
            inputs.remove(s)
            if s in outputs:
                outputs.remove(s)
            s.close()

if __name__ == '__main__':
    start_server(8801)


客户端实现代码

import os
import time
import socket

def start_client(addr, port):
    PLC_ADDR = addr
    PLC_PORT = port
    s = socket.socket()
    s.connect((PLC_ADDR, PLC_PORT))
    count = 0
    while True:
        msg = '进程{pid}发送数据'.format(pid=os.getpid())
        msg = msg.encode(encoding='utf-8')
        s.send(msg)
        recv_data = s.recv(1024)
        print(recv_data.decode(encoding='utf-8'))
        time.sleep(3)
        count += 1
        if count > 20:
            break

    s.close()

if __name__ == '__main__':
    start_client('127.0.0.1', 8801)


selectors 模块编程步骤

selectors 模块是在 select 模块原型的基础上进行封装,建议使用此模块。
该模块提供了对 select() 和 poll() 函数的访问,这两个函数对大多数操作系统中是可用的。只有Linux版本支持epoll(). windows上只支持 select()方法

selector 数据主要数据结构

selector类的结构

BaseSelector

  • SelectSelector
  • PollSelector
  • EpollSelector
  • DevpollSelector
  • KqueueSelector

BaseSelector类,支持在多个文件对象上等待 I/O 事件就绪。支持文件流(socket也是file stream)注册、注销,以及在这些流上等待 I/O 事件。但它是一个抽象基类,因此不能被实例化。实例化时请改用 DefaultSelector,或者 SelectSelector, KqueueSelector 等。

基本方法

  • register(fileobj, events, data=None)
  • unregister(fileobj)
  • modify(fileobj, events, data=None)
  • select(timeout=None)

等待直到有已注册的文件对象就绪. 返回由 (key, events) 元组构成的列表,每项各表示一个就绪的文件对象。
key的类型selectors.SelectorKey.
events只有两个可选值:两个event

  • EVENT_READ 可读
  • EVENT_WRITE 可写

其它类:
class selectors.SelectorKey
selector 用其返回文件对象等。属性有:

  • fileobj 已注册的文件对象。
  • fd 下层的文件描述符。
  • events 必须在此文件对象上被等待的事件。
  • data 可选的关联到此文件对象的不透明数据:例如,这可被用来存储各个客户端的会话 ID

class selectors.SelectSelector
基于 select.select() 的选择器。
class selectors.PollSelector
基于 select.poll() 的选择器。

Selector编程步骤

1)创建socket:

host, port = sys.argv[1], int(sys.argv[2])
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.bind((host, port))
lsock.listen()
print(f"Listening on {(host, port)}")

2) socket必须是non-blocking mode. 以便允许其它连接读写

lsock.setblocking(False)

3)生成1个select对象,将socket注册到selector对象,

sel = selectors.DefaultSelector()
sel.register(lsock, selectors.EVENT_READ, data=None)

参数说明:
• fileobj : windows 只接受socket stream
• events : 只有EVENT_READ, 或 EVENT_WRITE
• data, 是传给select()方法结果里的 key.data
socket第1次register时,触发只读事件,向select()方法传递 data 参数

  1. 服务器进入监听循环
    循环体内,首先用用 select()方法收集socket事件。
    events = sel.select(timeout=None) 返回的是 (key, events) 类型的list,每个元素对应1个 ready状态的Socket Object.
    key的类型为selectors.SelectorKey, 主要属性:
    • fileobj
    • fd 文件描述符
    • data 这个数据是由register()中的参数data传入
while True:
    events = sel.select()     # 收集socket状态事件
    for key, mask in events:  
        callback = key.data   # data是register()传过来的
        callback(key.fileobj, mask)    # 实际为 accept(), 或read()

  1. 编写对读,写处理的函数(略)

简单示例1

import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    # 注册新进来的连接,状态为READ, data 为read. 
    sel.register(conn, selectors.EVENT_READ, read)   

def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

当然也可以不用callback

import selectors
import socket

# Set up the server
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("localhost", 7342))
server.listen()

# Set up the selectors "bag of sockets"
selector = selectors.DefaultSelector()
selector.register(server, selectors.EVENT_READ)

while True:
    events = selector.select()
    for key, _ in events:
        sock = key.fileobj
        print("About to accept.")
        client, _ = sock.accept()
        print("Accepted.")


http://lihuaxi.xjx100.cn/news/1340719.html

相关文章

【USRP X410】LabVIEW参考架构软件,用于使用Ettus USRP X410对无线系统进行原型验证

LabVIEW参考架构软件,用于使用Ettus USRP X410对无线系统进行原型验证 设备 1 MHz to 7.2 GHz,400 MHz带宽,GPS驯服OCXO,USRP软件无线电设备 - Ettus USRP X410集成硬件和软件,可帮助您制作高性能无线系统的原型&…

VUE之基本部署及VScode常用插件

参考资料: 参考视频 VScode常用插件清单 node.js官网 node.js官网中文版 VUE官方文档 VScode常用插件: VScode常用插件详解见上述连接,插件列表如下: VScode的注释/取消注释快捷键为:Ctrl/ VUE的基本安装部署--…

小程序:如何判断机型

在小程序中,可以通过 wx.getSystemInfoSync() 方法获取当前设备的系统信息,包括机型、操作系统版本等。其中,通过系统信息中的 model 字段可以获取到当前设备的机型信息。 // 获取系统信息 const systemInfo wx.getSystemInfoSync();// 获取…

深圳交易所证券交易规则

目录 一、交易市场 1.深交所可交易证券 1)股票 2)存托凭证 3)基金 4)权证 5)经证监会批准的其他交易品种 2.交易参与人 3.交易时间 二、证券买卖 1.委托 1)投资者要求 2)委托方式 …

移动端声明及meta标签设置

移动端meta标签设置 1.设置当前html文件的字符编码 <meta charset"UTF-8"> 2设置浏览器的兼容模式&#xff08;让IE使用最新的浏览器渲染&#xff09; <meta http-equiv"X-UA-Compatible" content"IEedge"/> 3.视口&#xff08;快…

MySQL数据库——索引练习

一、练习题目 1、建立一个utf8编码的数据库test1 2、建立商品表goods和栏目表category&#xff08;要求&#xff1a;按如下表结构创建表&#xff0c;并且存储引擎engine myisam 字符集charset utf8&#xff09; 3、删除 goods 表中的 goods_desc 字段及货号字段,并增加 click…

找工作前,是否要去培训班进行学习呢?

在学习IT技术的过程中&#xff0c;你是否也被安利过各种五花八门的技术培训班&#xff1f;这些培训班都是怎样向你宣传的&#xff0c;你又对此抱有着怎样的态度呢&#xff1f;在培训班里学技术&#xff0c;真的有用吗&#xff1f; 一、引入话题 一年又一年的毕业季有来临了&am…

Vector - CANoe - CAPL文件加密

目录 为什么会有CAPL文件加密需求? 加密文件介绍 “*.can”和“*.cin” 文件创建 <