Python Concurrency and Asynchronous Programming by Example

Published: 2019-08-09 10:34:14

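    Concepts such as concurrency, parallelism, synchronous blocking, asynchronous non-blocking, threads, processes, and coroutines are hard to grasp deeply from text alone, so this article implements each style of concurrent and asynchronous programming step by step in code and compares them. The interpreter used is Python 3: it is the future of Python, and its standard library already makes coroutines very convenient to implement.
    1. Preparation
    The packages required by all of the test code are listed below: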

    #! python3
    # coding:utf-8
    
    import socket
    from concurrent import futures
    from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
    import asyncio
    import aiohttp
    import time
    from time import ctime

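    To compare the implementations, the scenario is the one a web crawler faces: issue a series of HTTP requests to a target site and measure the elapsed time to judge each approach. Concretely, one request means opening a socket, requesting the Sina home page, and reading back the returned source. First, a decorator that measures a function's execution time: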

    def tsfunc(func):
        def wrappedFunc(*args,**kargs):
            # time.clock() was removed in Python 3.8; perf_counter() measures wall-clock time
            start = time.perf_counter()
            action = func(*args,**kargs)
            time_delta = time.perf_counter() - start
            print ('[{0}] {1}() called, time delta: {2}'.format(ctime(),func.__name__,time_delta))
            return action
        return wrappedFunc

    The output format is: current time, name of the called function, and the function's execution time.
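
    Because the requests being timed spend most of their time waiting on the network, it matters which clock a timing helper reads. The sketch below contrasts wall-clock time from `time.perf_counter()` with CPU time from `time.process_time()`; only the former includes time spent waiting. The names `measure` and `fake_request` are made up for illustration, and the sleep merely stands in for network latency.

```python
import time

def measure(func):
    # Return (result, wall_seconds, cpu_seconds) for a single call.
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    result = func()
    cpu = time.process_time() - cpu_start
    wall = time.perf_counter() - wall_start
    return result, wall, cpu

def fake_request():
    # Hypothetical stand-in for an HTTP request: the thread just waits.
    time.sleep(0.2)
    return 'done'

result, wall, cpu = measure(fake_request)
print('wall: {0:.3f}s, cpu: {1:.3f}s'.format(wall, cpu))
```

    For I/O-bound comparisons like the ones in this article, the wall-clock figure is the one that reflects what the caller actually waits for.
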
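    2. Blocking/non-blocking and synchronous/asynchronous
    These two pairs of concepts are easy to confuse. By definition:
    Blocking: during socket communication, a thread issues a request and, if no result is available yet, goes to sleep. The thread is suspended and can do nothing else until the result arrives or the call times out (if a timeout is set).
    Non-blocking: the same situation, except that the thread is not suspended while waiting. If the result is not available immediately, the call returns right away instead of suspending the current thread, which is then free to do other work.
    Synchronous: similar to blocking, but not the same concept. Synchronous describes the order in which work is completed: the second task starts only after the first has finished, and so on.
    Asynchronous: similar to non-blocking, and the opposite of synchronous. After an asynchronous call is issued, the caller does not get the result immediately. The component that actually handles the call notifies the caller through status, notification, or a callback once it has finished. Informally, asynchronous means "I'll tell you later".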
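
    The difference between a blocking and a non-blocking socket can be seen without touching the network. The following is a minimal sketch that uses `socket.socketpair()` to get two connected sockets in one process; the variable names are illustrative only.

```python
import socket

# A connected pair of sockets, so no network access is needed.
a, b = socket.socketpair()

# Non-blocking mode: recv() returns immediately. With nothing to read
# it raises BlockingIOError instead of suspending the thread.
b.setblocking(False)
try:
    b.recv(1024)
    got_error = False
except BlockingIOError:
    got_error = True

# Blocking mode: recv() returns as soon as data is available. With no
# data pending it would suspend the thread until some arrived.
a.sendall(b'hello')
b.setblocking(True)
data = b.recv(1024)

a.close()
b.close()
print(got_error, data)
```

    In non-blocking mode the caller has to decide what to do when the data is not there yet, which is exactly the problem the rest of the article works through.
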
    1) Blocking approach
    Back to the code. First, the blocking version of the request function:

    def blocking_way():
        sock = socket.socket()
        sock.connect(('www.sina.com',80))
        request = 'GET / HTTP/1.0\r\nHOST:www.sina.com\r\n\r\n'
        sock.send(request.encode('ascii'))
        response = b''
        chunk = sock.recv(4096)
        while chunk:
            response += chunk
            chunk = sock.recv(4096)
        return response

    Testing the single-threaded, multi-process, and multi-threaded versions:

    # Blocking, no concurrency
    @tsfunc
    def sync_way():
        res = []
        for i in range(10):
            res.append(blocking_way())
        return len(res)
    # Blocking, multiple processes
    @tsfunc
    def process_way():
        worker = 10
        with futures.ProcessPoolExecutor(worker) as executor:
            futs = {executor.submit(blocking_way) for i in range(10)}
        return len([fut.result() for fut in futs])
    # Blocking, multiple threads
    @tsfunc
    def thread_way():
        worker = 10
        with futures.ThreadPoolExecutor(worker) as executor:
            futs = {executor.submit(blocking_way) for i in range(10)}
        return len([fut.result() for fut in futs])

    Results:

    [Wed Dec 13 16:52:25 2017] sync_way() called, time delta: 0.06371647809425328
    [Wed Dec 13 16:52:28 2017] process_way() called, time delta: 2.31437644946734
    [Wed Dec 13 16:52:28 2017] thread_way() called, time delta: 0.010172946070299727

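    Compared with the non-concurrent version, starting 10 processes to make the 10 requests took the longest, since processes do carry a large system overhead. Multithreading did much better: 10 threads issuing requests concurrently were about 6 times faster than sequential requests.
    2) Non-blocking approach
    Next, the non-blocking request code. The difference from the blocking version is that a call that would have to wait returns immediately instead of suspending the thread. To make sure the response is still read correctly, the most primitive approach is to retry in a loop and leave the loop only once reading has completed: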

    def nonblocking_way():
        sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        sock.setblocking(False)
        try:
            sock.connect(('www.sina.com', 80))
        except BlockingIOError:
            pass
        request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
        data = request.encode('ascii')
        while True:
            try:
                sock.send(data)
                break
            except OSError:
                pass
    
        response = b''
        while True:
            try:
                chunk = sock.recv(4096)
                while chunk:
                    response += chunk
                    chunk = sock.recv(4096)
                break
            except OSError:
                pass
    
        return response

    Testing the single-threaded asynchronous non-blocking version:

    @tsfunc
    def async_way():
        res = []
        for i in range(10):
            res.append(nonblocking_way())
        return len(res)

    Results, compared with the single-threaded synchronous blocking version:

    [Wed Dec 13 17:18:30 2017] sync_way() called, time delta: 0.07342884475822574
    [Wed Dec 13 17:18:30 2017] async_way() called, time delta: 0.06509009095694886

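    The non-blocking version helps a little, but not noticeably. The most likely reason is that, although the thread is no longer suspended while waiting, the time is instead wasted retrying the read in a loop. If most of the time is still wasted this way, the power of asynchronous programming goes unused. The remedy is the event-driven approach described next.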
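
    The event-driven idea can be sketched in a few lines: instead of retrying `recv()` in a loop, the socket is registered with a selector, and a callback runs only when the operating system reports the socket as readable. This is a minimal illustration on a local `socket.socketpair()`; the names `on_readable` and `received` are assumptions made for the example.

```python
import socket
from selectors import DefaultSelector, EVENT_READ

selector = DefaultSelector()
reader, writer = socket.socketpair()
reader.setblocking(False)

received = []

def on_readable(sock):
    # Called only after the selector reports the socket as readable,
    # so this recv() does not have to be retried.
    received.append(sock.recv(1024))

# Attach the callback as the "data" of the registration.
selector.register(reader, EVENT_READ, on_readable)
writer.sendall(b'ping')

# select() sleeps until a registered socket is ready or the timeout expires.
for key, mask in selector.select(timeout=1):
    key.data(key.fileobj)

selector.unregister(reader)
reader.close()
writer.close()
selector.close()
print(received)
```

    The thread spends its waiting time inside `select()` rather than spinning, and one selector can watch many sockets at once.
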
    3. Callbacks, generators, and coroutines
    a. Callbacks

    class Crawler():
        def __init__(self,url):
            self.url = url
            self.sock = None
            self.response = b''
    
        def fetch(self):
            self.sock = socket.socket()
            self.sock.setblocking(False)
            try:
                self.sock.connect(('www.sina.com',80))
            except BlockingIOError:
                pass
            selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)
    
        def connected(self,key,mask):
            selector.unregister(key.fd)
            get = 'GET {0} HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'.format(self.url)
            self.sock.send(get.encode('ascii'))
            selector.register(key.fd,EVENT_READ,self.read_response)
    
        def read_response(self,key,mask):
            global stopped
            # The selector reported the socket as readable, so recv() will not block
            chunk = self.sock.recv(4096)
            if chunk:
                self.response += chunk
            else:
                # An empty chunk means the server closed the connection
                selector.unregister(key.fd)
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
    
    def loop():
        while not stopped:
            events = selector.select()
            for event_key,event_mask in events:
                callback = event_key.data
                callback(event_key,event_mask)
    @tsfunc
    def callback_way():
        for url in urls_todo:
            crawler = Crawler(url)
            crawler.fetch()
        # loop() passes (key, mask) to each callback; loop1() is for the generator version
        loop()

    This is asynchronous programming implemented with traditional callbacks. The result:
    [Tue Mar 27 17:52:49 2018] callback_way() called, time delta: 0.054735804048789374
    b. Generators

    class Crawler2:
        def __init__(self, url):
            self.url = url
            self.response = b''
    
        def fetch(self):
            global stopped
            sock = socket.socket()
            yield from connect(sock, ('www.sina.com', 80))
            get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
            sock.send(get.encode('ascii'))
            self.response = yield from read_all(sock)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True
    
    class Task:
        def __init__(self, coro):
            self.coro = coro
            f = Future1()
            f.set_result(None)
            self.step(f)
    
        def step(self, future):
            try:
                # send() resumes coro (i.e. fetch) and runs it until its next yield
                # next_future is the object produced by that yield
                next_future = self.coro.send(future.result)
            except StopIteration:
                return
            next_future.add_done_callback(self.step)
    
    def loop1():
        while not stopped:
            events = selector.select()
            for event_key,event_mask in events:
                callback = event_key.data
                callback()

    Results:
    [Tue Mar 27 17:54:27 2018] generate_way() called, time delta: 0.2914336347673473
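
    To see how a task drives a generator, it helps to remove the sockets entirely. The following is a minimal sketch: `Future` and `Task` are reduced versions of the classes used in this article, while `worker`, `pending`, and `log` are hypothetical names for the example. Calling `pending.set_result(...)` by hand plays the role that a selector callback plays in the real crawler.

```python
class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

class Task:
    def __init__(self, coro):
        self.coro = coro
        # The first send(None) starts the generator.
        self.step(Future())

    def step(self, future):
        try:
            # Resume the generator with the finished future's result.
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        # Resume again once the future it yielded is resolved.
        next_future.add_done_callback(self.step)

log = []
pending = Future()  # stands in for "the socket became readable"

def worker():
    log.append('waiting')
    value = yield pending  # suspend here until pending is resolved
    log.append('got ' + value)

Task(worker())
log.append('free to do other work')
pending.set_result('data')  # what an event-loop callback would do
print(log)
```

    The generator reads top to bottom like blocking code, yet control returns to the caller at every `yield`, which is what lets a single thread interleave many requests.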

    c. Coroutines

    host = 'http://www.sina.com'
    loop = asyncio.get_event_loop()
    
    async def fetch(url):
        # One aiohttp session per request, as in the full listing below
        async with aiohttp.ClientSession(loop=loop) as session:
            async with session.get(url) as response:
                response = await response.read()
                return response
    
    @tsfunc
    def asyncio_way():
        tasks = [fetch(host+url) for url in urls_todo]
        loop.run_until_complete(asyncio.gather(*tasks))
        return len(tasks)

    Results:
    [Tue Mar 27 17:56:17 2018] asyncio_way() called, time delta: 0.43688060698484166
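
    The effect of coroutines is easier to observe when the network is taken out of the picture. The sketch below is not the crawler itself: `fake_fetch` is a hypothetical coroutine that replaces the HTTP request with `asyncio.sleep(0.1)`, and it uses `asyncio.run()`, which requires Python 3.7 or later. Ten such waits run concurrently on one thread, so the total should be close to one wait rather than ten.

```python
import asyncio
import time

async def fake_fetch(i):
    # Simulated network wait; awaiting hands control back to the event loop.
    await asyncio.sleep(0.1)
    return i

async def main():
    # Schedule all ten coroutines and wait for every result, in order.
    return await asyncio.gather(*(fake_fetch(i) for i in range(10)))

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results, 'elapsed: {0:.3f}s'.format(elapsed))
```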

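    That completes the tests of the concurrency and asynchronous programming examples. The full code is posted below for readers to test themselves; with a larger workload, the results will likely look quite different.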

    #! python3
    # coding:utf-8
    
    import socket
    from concurrent import futures
    from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
    import asyncio
    import aiohttp
    import time
    from time import ctime
    
    def tsfunc(func):
        def wrappedFunc(*args,**kargs):
            # time.clock() was removed in Python 3.8; perf_counter() measures wall-clock time
            start = time.perf_counter()
            action = func(*args,**kargs)
            time_delta = time.perf_counter() - start
            print ('[{0}] {1}() called, time delta: {2}'.format(ctime(),func.__name__,time_delta))
            return action
        return wrappedFunc
    
    def blocking_way():
        sock = socket.socket()
        sock.connect(('www.sina.com',80))
        request = 'GET / HTTP/1.0\r\nHOST:www.sina.com\r\n\r\n'
        sock.send(request.encode('ascii'))
        response = b''
        chunk = sock.recv(4096)
        while chunk:
            response += chunk
            chunk = sock.recv(4096)
        return response
    
    def nonblocking_way():
        sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        sock.setblocking(False)
        try:
            sock.connect(('www.sina.com', 80))
        except BlockingIOError:
            pass
        request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
        data = request.encode('ascii')
        while True:
            try:
                sock.send(data)
                break
            except OSError:
                pass
    
        response = b''
        while True:
            try:
                chunk = sock.recv(4096)
                while chunk:
                    response += chunk
                    chunk = sock.recv(4096)
                break
            except OSError:
                pass
    
        return response
    
    
    selector = DefaultSelector()
    stopped = False
    urls_todo = ['/','/1','/2','/3','/4','/5','/6','/7','/8','/9']
    
    
    class Crawler():
        def __init__(self,url):
            self.url = url
            self.sock = None
            self.response = b''
    
        def fetch(self):
            self.sock = socket.socket()
            self.sock.setblocking(False)
            try:
                self.sock.connect(('www.sina.com',80))
            except BlockingIOError:
                pass
            selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)
    
        def connected(self,key,mask):
            selector.unregister(key.fd)
            get = 'GET {0} HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'.format(self.url)
            self.sock.send(get.encode('ascii'))
            selector.register(key.fd,EVENT_READ,self.read_response)
    
        def read_response(self,key,mask):
            global stopped
            # The selector reported the socket as readable, so recv() will not block
            chunk = self.sock.recv(4096)
            if chunk:
                self.response += chunk
            else:
                # An empty chunk means the server closed the connection
                selector.unregister(key.fd)
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
    
    def loop():
        while not stopped:
            events = selector.select()
            for event_key,event_mask in events:
                callback = event_key.data
                callback(event_key,event_mask)
    
    
    # Generator-based coroutines
    class Future:
        def __init__(self):
            self.result = None
            self._callbacks = []
    
        def add_done_callback(self,fn):
            self._callbacks.append(fn)
    
        def set_result(self,result):
            self.result = result
            for fn in self._callbacks:
                fn(self)
    
    class Crawler1():
        def __init__(self,url):
            self.url = url
            self.response = b''
    
        def fetch(self):
            sock = socket.socket()
            sock.setblocking(False)
            try:
                sock.connect(('www.sina.com',80))
            except BlockingIOError:
                pass
    
            f = Future()
            def on_connected():
                f.set_result(None)
    
            selector.register(sock.fileno(),EVENT_WRITE,on_connected)
            yield f
            selector.unregister(sock.fileno())
            get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
            sock.send(get.encode('ascii'))
    
            global stopped
            while True:
                f = Future()
                def on_readable():
                    f.set_result(sock.recv(4096))
                selector.register(sock.fileno(),EVENT_READ,on_readable)
                chunk = yield f
                selector.unregister(sock.fileno())
                if chunk:
                    self.response += chunk
                else:
                    urls_todo.remove(self.url)
                    if  not urls_todo:
                        stopped = True
                    break
    
    
    # Generator coroutines improved with yield from
    class Future1:
        def __init__(self):
            self.result = None
            self._callbacks = []
    
        def add_done_callback(self,fn):
            self._callbacks.append(fn)
    
        def set_result(self,result):
            self.result = result
            for fn in self._callbacks:
                fn(self)
    
        def __iter__(self):
            yield self
            return self.result
    
    def connect(sock, address):
        f = Future1()
        sock.setblocking(False)
        try:
            sock.connect(address)
        except BlockingIOError:
            pass
    
        def on_connected():
            f.set_result(None)
    
        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield from f
        selector.unregister(sock.fileno())
    
    def read(sock):
        f = Future1()
    
        def on_readable():
            f.set_result(sock.recv(4096))
    
        selector.register(sock.fileno(), EVENT_READ, on_readable)
        chunk = yield from f
        selector.unregister(sock.fileno())
        return chunk
    
    def read_all(sock):
        response = []
        chunk = yield from read(sock)
        while chunk:
            response.append(chunk)
            chunk = yield from read(sock)
        return b''.join(response)
    
    class Crawler2:
        def __init__(self, url):
            self.url = url
            self.response = b''
    
        def fetch(self):
            global stopped
            sock = socket.socket()
            yield from connect(sock, ('www.sina.com', 80))
            get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
            sock.send(get.encode('ascii'))
            self.response = yield from read_all(sock)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True
    
    
    class Task:
        def __init__(self, coro):
            self.coro = coro
            f = Future1()
            f.set_result(None)
            self.step(f)
    
        def step(self, future):
            try:
                # send() resumes coro (i.e. fetch) and runs it until its next yield
                # next_future is the object produced by that yield
                next_future = self.coro.send(future.result)
            except StopIteration:
                return
            next_future.add_done_callback(self.step)
    
    def loop1():
        while not stopped:
            events = selector.select()
            for event_key,event_mask in events:
                callback = event_key.data
                callback()
    
    
    # asyncio coroutines
    host = 'http://www.sina.com'
    loop = asyncio.get_event_loop()
    
    async def fetch(url):
        async with aiohttp.ClientSession(loop=loop) as session:
            async with session.get(url) as response:
                response = await response.read()
                return response
    
    @tsfunc
    def asyncio_way():
        tasks = [fetch(host+url) for url in urls_todo]
        loop.run_until_complete(asyncio.gather(*tasks))
        return len(tasks)
    
    @tsfunc
    def sync_way():
        res = []
        for i in range(10):
            res.append(blocking_way())
        return len(res)
    
    @tsfunc
    def process_way():
        worker = 10
        with futures.ProcessPoolExecutor(worker) as executor:
            futs = {executor.submit(blocking_way) for i in range(10)}
        return len([fut.result() for fut in futs])
    
    @tsfunc
    def thread_way():
        worker = 10
        with futures.ThreadPoolExecutor(worker) as executor:
            futs = {executor.submit(blocking_way) for i in range(10)}
        return len([fut.result() for fut in futs])
    
    @tsfunc
    def async_way():
        res = []
        for i in range(10):
            res.append(nonblocking_way())
        return len(res)
    
    @tsfunc
    def callback_way():
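        for url in urls_todo:
            crawler = Crawler(url)
            crawler.fetch()
        # Crawler callbacks take (key, mask), so loop1() cannot drive them, and the
        # name loop is rebound to the asyncio event loop above; dispatch inline instead
        while not stopped:
            for event_key,event_mask in selector.select():
                event_key.data(event_key,event_mask)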
    
    @tsfunc
    def generate_way():
        for url in urls_todo:
            crawler = Crawler2(url)
            Task(crawler.fetch())
        loop1()
    
    if __name__ == '__main__':
    
        #sync_way()
        #process_way()
        #thread_way()
        #async_way()
        #callback_way()
        #generate_way()
        asyncio_way()
    
    
