用Python实现一个简单的线程池

发布时间:2019-08-26 07:20:45编辑:auto阅读(1692)

    线程池的概念是什么?

    在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。在Java中更是 如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些 很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因。

    我理解为线程池是一个存放很多线程的单位,同时还有一个对应的任务队列。整个执行过程其实就是使用线程池中已有有限的线程把任务 队列中的任务做完。这样做的好处就是你不需要为每个任务都创建一个线程,因为当你创建第100个线程来执行第100个任务的时候,可能前面已经有50个线 程结束工作了。因此重复利用线程来执行任务,减少系统资源的开销。

    一个不怎么恰当的比喻就是,有100台电脑主机箱需要从1楼搬到2楼,你不需要喊来100人帮忙搬,你只需要叫十个或者二十个人就足以,每个人分配十个或者五个甚至是谁搬的快谁就多搬知道完成未知。(这个比喻好像。。。。。)

    不管如何吧,大体上理解了线程池的概念。那么怎么用python实现呢?

    代码如下

    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    
    #Python的线程池实现
    
    import Queue
    import threading
    import sys
    import time
    import urllib
    
    #替我们工作的线程池中的线程
    class MyThread(threading.Thread):
     def __init__(self, workQueue, resultQueue,timeout=30, **kwargs):
      threading.Thread.__init__(self, kwargs=kwargs)
      #线程在结束前等待任务队列多长时间
      self.timeout = timeout
      self.setDaemon(True)
      self.workQueue = workQueue
      self.resultQueue = resultQueue
      self.start()
    
     def run(self):
      while True:
       try:
        #从工作队列中获取一个任务
        callable, args, kwargs = self.workQueue.get(timeout=self.timeout)
        #我们要执行的任务
        res = callable(args, kwargs)
        #报任务返回的结果放在结果队列中
        self.resultQueue.put(res+" | "+self.getName())
       except Queue.Empty: #任务队列空的时候结束此线程
        break
       except :
        print sys.exc_info()
        raise
    
    class ThreadPool:
     def __init__( self, num_of_threads=10):
      self.workQueue = Queue.Queue()
      self.resultQueue = Queue.Queue()
      self.threads = []
      self.__createThreadPool( num_of_threads )
    
     def __createThreadPool( self, num_of_threads ):
      for i in range( num_of_threads ):
       thread = MyThread( self.workQueue, self.resultQueue )
       self.threads.append(thread)
    
     def wait_for_complete(self):
      #等待所有线程完成。
      while len(self.threads):
       thread = self.threads.pop()
       #等待线程结束
       if thread.isAlive():#判断线程是否还存活来决定是否调用join
        thread.join()
    
     def add_job( self, callable, *args, **kwargs ):
      self.workQueue.put( (callable,args,kwargs) )
    
    def test_job(id, sleep = 0.001 ):
     html = ""
     try:
      time.sleep(1)
      conn = urllib.urlopen('http://www.yizhan56.cn/')
      html = conn.read(20)
     except:
      print  sys.exc_info()
     return  html
    
    def test():
     print 'start testing'
     tp = ThreadPool(10)
     for i in range(50):
      time.sleep(0.2)
      tp.add_job( test_job, i, i*0.001 )
     tp.wait_for_complete()
     #处理结果
     print 'result Queue\'s length == %d '% tp.resultQueue.qsize()
     while tp.resultQueue.qsize():
      print tp.resultQueue.get()
     print 'end testing'
    if __name__ == '__main__':
     test()

    这个代码清晰易懂。

关键字