Python并发处理

发布时间:2019-08-22 08:02:36编辑:auto阅读(1547)

    1.创建并销毁线程

    #!/usr/bin/python
    
    #code to execute in an independent thread
    
    import time
    def countdown(n):
        while n > 0:
           print('T-minus',n)
           n -= 1
           time.sleep(5)
    
    #create and launch a thread
    
    from threading import Thread
    t = Thread(target=countdown,args=(10,))
    t.start()



    # python concurency.py 
    ('T-minus', 10)
    ('T-minus', 9)
    ('T-minus', 8)
    ('T-minus', 7)
    ('T-minus', 6)
    ('T-minus', 5)
    ('T-minus', 4)
    ('T-minus', 3)
    ('T-minus', 2)
    ('T-minus', 1)



    创建一个线程实例后,需要调用start()让其运行。线程是以系统级别线程的方式执行,由操作系统管理。一旦执行,各个线程独立运行,直到目标函数返回结果,可以使用is_alive()查询一个线程实例是否正在运行。

    if t.is_alive():
       print('The thread is still running')
    else:
       print('Completed')



    可以使用join()来请求结合一个线程



    执行上述代码时,python解释器会等待所有线程结束。对于需要时间较长的线程或者后台任务,可以考虑将线程以daemon的方式运行


    t.setDaemon(True)


    不能结合以daemon方式运行的线程,当主线程终止时它们会自动销毁



    如果想要能够终止线程,这些线程必须能够可编程在选择的点上能够退出

    import time
    from threading import Thread
    
    class CountdownTask:
         def __init__(self):
             self._running=True
    
         def terminate(self):
             self._running=False
    
         def run(self,n):
             while self._running and n > 0:
                 print('T-minus',n)
                 n-=1
                 time.sleep(5)
    
    c=CountdownTask()
    t=Thread(target=c.run,args=(10,))
    t.start()
    
    c.terminate()
    t.join()


    由于Global Interpreter Lock(GIL)全局解释器锁的原因,Python线程被限制在任何给定时间内只能有一个线程可以执行。基于这个原因,Python线程不能用于那些需要大量计算的任务。Python线程更适合用于I/O处理,处理那些执行阻塞操作例如等待I/O,等待数据库操作结果等的代码的并发执行。




    2.判断一个线程是否已经启动

    Problem:

    已经启动一个线程,但是想要知道它什么时候开始运行的


    Solution:

    Python线程的一个关键特性就是它们独立执行并且非确定性。如果程序的其他线程需要知道是否一个线程执行进一步操作之前已经达到某一个阶段。threading模块的Event对象可以帮助解决这个问题‘

    #!/usr/bin/python
    
    from threading import Thread,Event
    import time
    
    #code to execute in an independent thread
    
    def countdown(n,started_evt):
        print('countdown starting')
        started_evt.set()
        while n > 0:
           print('T-minus',n)
           n -= 1
           time.sleep(5)
    #create the event object that will be used  to signal startup
    started_evt = Event()
    
    #launch the thread and pass the startup event
    print('launching countdown')
    t=Thread(target=countdown,args=(10,started_evt))
    t.start()
    
    #wait for the thread to start
    started_evt.wait()
    print('countdown is running')



    Event objects are best used for one-time events.





    #!/usr/bin/python
    
    import threading
    import time
    
    class PerodicTimer:
        def __init__(self,interval):
            self._interval=interval
            self._flag=0
            self._cv=threading.Condition()
    
        def start(self):
            t=threading.Thread(target=self.run)
            t.daemon=True
            t.start()
    
        def run(self):
            '''
              Run the timer and notify waiting threads after each interval
            '''
            while True:
               time.sleep(self._interval)
               with self._cv:
                    self._flag ^= 1
                    self._cv.notify_all()
    
    
        def wait_for_tick(self):
            '''
              wait for the next tick of the timer
            '''
            with self._cv:
                 last_flag=self._flag
                 while last_flag == self._flag:
                    self._cv.wait()
    #example use of the timer
    ptimer=PerodicTimer(5)
    ptimer.start()
    
    #two threads that synchronize on the timer
    
    def countdown(nticks):
        while nticks > 0:
              ptimer.wait_for_tick()
              print('T-minus',nticks)
              nticks-=1
    
    def countup(last):
        n=0
        while n < last:
           ptimer.wait_for_tick()
           print('Counting',n)
           n+=1
    
    threading.Thread(target=countdown,args=(10,)).start()
    threading.Thread(target=countup,args=(5,)).start()
    # python event2.py 
    ('Counting', 0)
    ('T-minus', 10)
    ('T-minus', 9)
    ('Counting', 1)
    ('Counting', 2)
    ('T-minus', 8)
    ('Counting', 3)
    ('T-minus', 7)
    ('Counting', 4('T-minus', 6)
    )
    ('T-minus', 5)
    ('T-minus', 4)
    ('T-minus', 3)
    ('T-minus', 2)
    ('T-minus', 1)







    Event对象的一个很重要的特性就是它们会唤醒所有正在等待的线程。如果想要编写一个程序只唤醒单个正在等待的线程,更好使用Semaphore或者Condition对象

    #!/usr/bin/python
    
    import threading
    
    #worker thread
    def worker(n,sema):
        #wait to be signaled
        sema.acquire()
        #do some work
        print('working',n)
        
    #create some threads
    sema=threading.Semaphore(0)
    nworkers=10
    for n in range(nworkers):
        t=threading.Thread(target=worker,args=(n,sema,))
        t.start()


    如果运行这个程序,一组线程会被启动,但是没有任何事情发生。应为它们都被阻塞等待获取信号量。每次释放信号量,只有一个worker将被唤醒和运行


    直接在python终端执行将无任何反应并且无法终止程序

    # python semaphore.py 
    ^C^C^C^C^C^C^C


    在ipython终端中输入以上程序然后执行

    In [8]: sema.release()
    
    In [9]: ('working', 0)
    
    
    In [9]: sema.release()
    
    In [10]: ('working', 1)
    
    
    In [10]: sema.release()
    ('working', 2)
     
    In [11]: sema.release()
    ('working'
    , 3)
    In [12]: sema.release()
    ('working', 4)
    
    In [13]: sema.release()
    ('working', 5)
    
    In [14]: sema.release()
    ('working', 6)
     
    In [15]: sema.release()
    
    In [16]: ('working', 7)
    
    
    In [16]: sema.release()
    
    In [17]: ('working', 8)
    
    
    In [17]: sema.release()
    ('working', 
    9)
    In [18]: sema.release()
    
    In [19]: sema.release()



    3.线程间通信

    Problem:

    执行程序时开启了多个线程,现在需要在这些线程之间通信或者交换数据

    Solution:

    也许从一个线程发送数据到另一个线程的最安全的方式就是使用 Queue模块.可以创建一个 Queue实例用于所有的线程共享。这些线程然后使用put()或者get()操作来向队列中添加或者删除项目

    #!/usr/bin/python
    from threading import Thread
    from Queue import Queue
    
    #a thread that produces data
    def producer(out_q):
       while True:
          #produce some data
          data="producer data"
          out_q.put(data)
    
    
    #a thread that comsumes data
    def consumer(in_q):
       while True:
          #get some data
          data=in_q.get()
          #process the data
          print data
          print "consumer"
    
    #create the shared queue and launch both threads
    q=Queue()
    t1=Thread(target=consumer,args=(q,))
    t2=Thread(target=producer,args=(q,))
    t1.start()
    t2.start()


    执行结果:

    producer data
    consumer
    producer data
    consumer
    producer data
    consumer
    producer data
    consumer
    producer data
    consumer
    producer data
    consumer
    producer data
    consumer
    producer data
    consumer
    .......
    .......


    以上代码会不断地循环执行。当使用队列时,协调关闭producer和comsumer会比较诡异。









    参考文章:

    http://chimera.labs.oreilly.com/books/1230000000393/ch12.html


关键字