Python 中 Ctrl+C 不能终

发布时间:2019-09-28 08:37:58编辑:auto阅读(2913)

    本文理论上对multiprocessing.dummy的Pool同样有效。

    python2.x中multiprocessing提供的基于函数进程池,join后陷入内核态,按下ctrl+c不能停止所有的进程并退出。即必须ctrl+z后找到残留的子进程,把它们干掉。先看一段ctrl+c无效的代码:

    #!/usr/bin/env python
    import multiprocessing
    import os
    import time
    
    
    def do_work(x):
        print 'Work Started: %s' % os.getpid()
        time.sleep(10)
        return x * x
    
    
    def main():
        pool = multiprocessing.Pool(4)
        try:
            result = pool.map_async(do_work, range(8))
            pool.close()
            pool.join()
            print result
        except KeyboardInterrupt:
            print 'parent received control-c'
            pool.terminate()
            pool.join()
     
    
    if __name__ == "__main__":
        main()

    这段代码运行后,按^c一个进程也杀不掉,最后会残留包括主进程在内共5个进程(1+4),kill掉主进程能让其全部退出。很明显,使用进程池时KeyboardInterrupt不能被进程捕捉。解决方法有两种。

    方案一

    下面这段是python源码里multiprocessing下的pool.py中的一段,ApplyResult就是Pool用来保存函数运行结果的类

    class ApplyResult(object):
    
        def __init__(self, cache, callback):
            self._cond = threading.Condition(threading.Lock())
            self._job = job_counter.next()
            self._cache = cache
            self._ready = False
            self._callback = callback
            cache[self._job] = self

    而下面这段代码也是^c无效的代码

    if __name__ == '__main__':
        import threading
    
        cond = threading.Condition(threading.Lock())
        cond.acquire()
        cond.wait()
        print "done"

    很明显,threading.Condition(threading.Lock())对象无法接收KeyboardInterrupt,但稍微修改一下,给cond.wait()一个timeout参数即可,这个timeout可以在map_async后用get传递,把

    result = pool.map_async(do_work, range(4))

    改为

    result = pool.map_async(do_work, range(4)).get(1)

    就能成功接收^c了,get里面填1填99999还是0xffff都行

    方案二

    另一种方法当然就是自己写进程池了,需要使用队列,贴一段代码感受下

    #!/usr/bin/env python
    import multiprocessing, os, signal, time, Queue
    
    def do_work():
        print 'Work Started: %d' % os.getpid()
        time.sleep(2)
        return 'Success'
    
    def manual_function(job_queue, result_queue):
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        while not job_queue.empty():
            try:
                job = job_queue.get(block=False)
                result_queue.put(do_work())
            except Queue.Empty:
                pass
            #except KeyboardInterrupt: pass
    
    def main():
        job_queue = multiprocessing.Queue()
        result_queue = multiprocessing.Queue()
    
        for i in range(6):
            job_queue.put(None)
    
        workers = []
        for i in range(3):
            tmp = multiprocessing.Process(target=manual_function,
                                          args=(job_queue, result_queue))
            tmp.start()
            workers.append(tmp)
    
        try:
            for worker in workers:
                worker.join()
        except KeyboardInterrupt:
            print 'parent received ctrl-c'
            for worker in workers:
                worker.terminate()
                worker.join()
    
        while not result_queue.empty():
            print result_queue.get(block=False)
    
    if __name__ == "__main__":
        main()

    方案三

    使用一个全局变量eflag作标识,让SIG_INT信号绑定一个处理函数,在其中对eflag的值更改,线程的函数中以eflag的值判定作为while的条件,把语句写在循环里,老实说这个方案虽然可以用,但是简直太差劲。线程肯定是可行的,进程应该还需要单独共享变量,非常不推荐的方式

    常见的错误方案

    这个必须要提一下,我发现segmentfault上都有人被误导了

    理论上,在Pool初始化时传递一个initializer函数,让子进程忽略SIGINT信号,也就是^c,然后Pool进行terminate处理。代码

    #!/usr/bin/env python
    import multiprocessing
    import os
    import signal
    import time
    
    
    def init_worker():
        signal.signal(signal.SIGINT, signal.SIG_IGN)
    
    
    def run_worker(x):
        print "child: %s" % os.getpid()
        time.sleep(20)
        return x * x
    
    
    def main():
        pool = multiprocessing.Pool(4, init_worker)
        try:
            results = []
            print "Starting jobs"
            for x in range(8):
                results.append(pool.apply_async(run_worker, args=(x,)))
    
            time.sleep(5)
            pool.close()
            pool.join()
            print [x.get() for x in results]
        except KeyboardInterrupt:
            print "Caught KeyboardInterrupt, terminating workers"
            pool.terminate()
            pool.join()
    
    
    if __name__ == "__main__":
        main()

    然而这段代码只有在运行在time.sleep(5)处的时候才能用ctrl+c中断,即前5s你按^c有效,一旦pool.join()后则完全无效!

    建议

    先确认是否真的需要用到多进程,如果是IO多的程序建议用多线程或协程,计算特别多则用多进程。如果非要用多进程,可以利用Python3的concurrent.futures包(python2.x也能装),编写更加简单易用的多线程/多进程代码,其使用和Java的concurrent框架有些相似.
    经过亲自验证,ProcessPoolExecutor是没有^c的问题的,要用多进程建议使用它

    参考

    1. http://bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/#georges

    2. http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool#comment12678760_6191991

关键字