您现在的位置是:网站首页>文章详情文章详情

python实现超时强停的线程池

inlike2019-05-31 原创文章 浏览(4220) 评论(0) 喜欢(33)

简介python的多线程在大都数人看来是鸡肋,其实不然,python多线程在网络密集型的应用场景中还是很有价值的;与进程不同的是多线程对计算机资源的占用较少;但是在python自带的threading库中并未实现线程池,反倒有第三方库concurrent实现线程池,但是存在一个缺

python的多线程在大都数人看来是鸡肋,其实不然,python多线程在网络密集型的应用场景中还是很有价值的;与进程不同的是多线程对计算机资源的占用较少;但是在python自带的threading库中并未实现线程池,反倒有第三方库concurrent实现线程池,但是存在一个缺点,concurrent设置的超时时间,是返回线程执行结果的返回时间,在达到超时时间后线程池强制返回结果,但不会停止线程。

image.png

而有这样的一个场景:在做一个站点的全栈克隆的时候,会存在超大型的网站,那么使用多个线程后我们希望,克隆一个网站的时间最大是半小时或者一个小时,而不会无限制的去请求从而占用大量资源,导致本地磁盘被垃圾内容填满;对于这种网络密集型的场景来说,线程池就是绝佳的方案,同时可以开启六七十个线程,一百个G的资源在两三个小时可以下载完成。

image.png

下面将从concurrent第三方库实现的线程池和threading自带线程库实现线程池,来讲讲解使用方法。

concurrent的Dome

from concurrent.futures import ThreadPoolExecutor, as_completed
import time


# 参数times用来模拟网络请求的时间
def get_html(times):
    print("get page {}s finished".format(times))
    time.sleep(times)
    print('end')
    return times


executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]  # 并不是真的url
all_task = [executor.submit(get_html, (url)) for url in urls]

for future in as_completed(all_task):
    data = future.result()

concurrent可以设置堵塞方式,会等待线程池运行完成后统一返回结果,同时也可以设置超时等待,超过时间不管线程池运行完与否都将返回结果,但是不会结束线程,很明显这是个缺陷。


threading实现线程池

下面我们用threading模块来实现一个超时强制停止的模块,github代码

import threading
import time
import inspect
import ctypes


class ThreadPool:
    def __init__(self, size: int, timeout: int):
        self._size = size
        self._timeout = timeout

    def _async_raise(self, tid, exctype):
        """raises the exception, performs cleanup if needed"""
        tid = ctypes.c_long(tid)
        if not inspect.isclass(exctype):
            exctype = type(exctype)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid,
                                                         ctypes.py_object(
                                                             exctype))
        if res == 0:
            raise ValueError("invalid thread id")
        elif res != 1:
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")

    def _stop_thread(self, thread):
        self._async_raise(thread.ident, SystemExit)

    def start(self, func, task: list):
        record = dict()
        while len(task) or len(record) > 0:
            while len(record) < self._size and len(task) > 0:
                item = task.pop()
                t = threading.Thread(target=func, args=(item,))
                t.start()
                record[t.getName()] = {'thread': t, 'time': time.time()}
            dellist = []
            for k, v in record.items():
                print('检测:' + k)
                if v['thread'].isAlive():
                    if time.time() - v['time'] > self._timeout:
                        self._stop_thread(v['thread'])
                        dellist.append(k)
                else:
                    dellist.append(k)
            for dl in dellist:
                del (record[dl])


if __name__ == '__main__':
    def task(name):
        print("输出name", name)
        time.sleep(name)


    t1 = time.time()
    items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]
    pool = ThreadPool(4, 3)
    pool.start(task, items)
    t2 = time.time()
    items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]
    for i in items:
        task(i)
    t3 = time.time()
    print('多线程统计用时:{} 非多线程统计:{}'.format(t2 - t1, t3 - t2))

TreadPool类初始化参数是线程池的大小以及单个线程的超时时间,核心方法是start,该方法需要两个参数,一个是开启多线程的函数名,一个是任务列表,start方法实现了将任务列表加进线程池-检测任务完成或者超时-在增加任务;核心的是实现了超时强制结束线程,从而保证了线程的最大运行时间不会超过我们设定的时间。

image.png


很赞哦! ( 33)
    《Python实战进阶》
    None
    None
    夏至已深

站点信息

  • 建站时间:2019-5-24
  • 网站程序:like in love
  • 主题模板《今夕何夕》
  • 文章统计:104条
  • 文章评论:***条
  • 微信公众号:扫描二维码,关注我们
  • 个人微信公众号