• 周日. 9月 25th, 2022

5G编程聚合网

5G时代下一个聚合的编程学习网

热门标签

python 并发编程之IO 模型

admin

11月 28, 2021

首先说一下 IO 发生时涉及的对象和步骤。以read 为例,会经历两个阶段:

1)等待数据准备

2)将数据从内核拷贝到进程中

二,阻塞Io(blocking IO)

在 Linux中  默认情况下所有的socket都是blocking,一个典型的读操作流程大概如下:

所以blocking IO 的特点就是在IO 执行的两个阶段(等待数据和拷贝数据两个阶段)都被block(阻塞)了

    几乎所有的程序员第一次接触到的网络编程都是从listen()、send()、recv() 等接口开始的,使用这些接口可以很方便的构建服务器/客户机的模型。然而大部分的socket接口都是阻塞型的。如下图

    ps:所谓阻塞型接口是指系统调用(一般是IO接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。

所以一个简单的解决方案:

#在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。

该方案的问题是:

#开启多进程或多线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。

改进方案:

#很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。

改进方案其实也存在着问题:

#“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

    对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

三 非阻塞io(non-blocking IO)

Linux 下,可以通过设置socket时期变为non-blocking.当对一个non-blocking  socket 执行读操作时,流程如下;

    从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。

    也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

    所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。

#服务端

from socket import *
import   time

s =socket()
s.bind(('127.0.0.1',8080))
s.listen(5)
s.setblocking(False)

r_list=[]
w_list=[]
while True:
    try:
        conn,addr= s.accept()
        r_list.append(conn)
    except BlockingIOError:
        print('可以去干其他活了')
        print('rlist:',len(r_list))

        del_rlist=[]
        for conn in r_list:
            try:
                data =conn.recv(1024)
                if not data:
                    conn.close()
                    del_rlist.append(conn)
                    continue
                w_list.append((conn,data.upper()))
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                del_rlist.append(conn)

        del_wlist=[]
        for item in w_list:
            try:
                conn=item[0]
                res=item[1]
                conn.send(res)
                del_wlist.append(item)
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                del_wlist.append(item)


        for conn in del_rlist:
            r_list.remove(conn)

        for  item in del_wlist:
            w_list.remove(item)



#客户端

from socket import *
import  os
client =socket()
client.connect(('127.0.0.1',8080))

while True:
    data='%s say hello'%os.getpid()
    client.send(data.encode('utf-8'))
    res=client.recv(1024)
    print(res.decode('utf-8'))

但是非阻塞IO 模型决不被推荐。

我们不能否认其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是‘后台’可以有多个任务在‘同时’执行)

但是也难掩其缺点:

1:循环调用recv() 将大幅度提到CPU占用率,这也是我们在代码中留一句time.sleep(2)de 原因,否则在低配主机下极容易出现卡机极容易出现卡机情况。
2:。任务完成的响应延迟增大了, 因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成, 这回导致整体数据吞吐量的降低。

多路复用IO (IOmultiplexing)

 IO multiplexing 也叫select/epoll, 他的好处就在单个process 就可以同时出处理多个网络连接的io.

基本原理就是select/epoll.这个function 会不断的轮询负责所有的socket,当某个 socket 有数据到达了,就通知用户进程。特的流程如图:

当用户进程调用了select,name整个进程会被block,而同时,select 会检测所有的它所负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程在调用read操作, 将数据从kernel 拷贝到用户进程。

强调:

1、如果处理的连接数不是很高的话,使用selec/epoll 的web server 不一定比使用multi_threading+blocking IO 的web server 性能更好,可能延尺

还更大。select/epool的优势并不是对单个连接能处理的更快, 而是在于能处理更多的连接。

 2、在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是如果整个用户的process其实是一致被block的,值不过process是被select这个函数block ,而不 是被被socket io给block.

结论:

select的优势在于可以处理多个连接,不适用与单个连接

#服务端
from socket import  *
import  select

s=socket
s.bind(('127.0.0.1',8080))
s.listen(5)
s.setblocking(False)

r_list=[s,]
w_list=[]
w_data={}
while True:
    print('被检测r_list: ',len(r_list))
    print('被检测w_list: ',len(w_list))
    rl,wl,xl=select.select(r_list,w_list,[],)

    for r in rl:
        if r==s:
            conn,addr=r.accept()
            r_list.append(conn)
        else:
            try:
                data = r.recv(1024)
                if not data:
                    r.close()
                    r_list.remove(r)
                    continue

                w_list.append(r)
                w_data[r]=data.upper()
            except ConnectionResetError:
                r.close()
                r_list.remove(r)
                continue
    for w in wl:
        w.send(w_data[w])
        w_list.remove(w)
        w_data.pop(w)




#客户端



from socket import  *
import os,

client=socket()
client.connect(('127.0.0.1',8080))
while True:
    data ="%s say hello "%os.getpid()
    client.send(data.encode('utf-8'))
    res=client.recv(1024)
    print(res.deccode('utf-8'))

该模型优点:

相比其他模型,使用select()的事件驱动模型是用单线程(进程)执行,占用资源少,不消耗太多cpu,同时能够为多客户端提供服务,如果视图建立一个简单的事件驱动的服务器程序, 这个模型有一定的参考价值。

该模型的缺点

首先select()接口并不是事先‘事件驱动’的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linuxt提供了epoll。。等等。如果需要实现更高效的服务器程序, 类似epoll 这样的接口更被推荐, 遗憾的是不同的操作系统提供的epoll接口有很大的差异。所以使用类似于epoll 的接口实现据欧较好的跨平台能力的服务器会比较困难。

其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。

 异步IO(Asynchronous I/O)

Linux 下的asynchronous IO 其实用的不多,从内核2.6版本才开始引入,先看他的流程:

 用户进程发起 read 操作之后,立刻就可以开始去做其他事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

发表回复

您的电子邮箱地址不会被公开。