您现在的位置是:网站首页>文章详情文章详情
python实现超时强停的线程池
inlike2019-05-31【
原创文章
】
浏览(3526)
评论(0)
喜欢(30)
简介python的多线程在大都数人看来是鸡肋,其实不然,python多线程在网络密集型的应用场景中还是很有价值的;与进程不同的是多线程对计算机资源的占用较少;但是在python自带的threading库中并未实现线程池,反倒有第三方库concurrent实现线程池,但是存在一个缺
python的多线程在大都数人看来是鸡肋,其实不然,python多线程在网络密集型的应用场景中还是很有价值的;与进程不同的是多线程对计算机资源的占用较少;但是在python自带的threading库中并未实现线程池,反倒有第三方库concurrent实现线程池,但是存在一个缺点,concurrent设置的超时时间,是返回线程执行结果的返回时间,在达到超时时间后线程池强制返回结果,但不会停止线程。
而有这样的一个场景:在做一个站点的全栈克隆的时候,会存在超大型的网站,那么使用多个线程后我们希望,克隆一个网站的时间最大是半小时或者一个小时,而不会无限制的去请求从而占用大量资源,导致本地磁盘被垃圾内容填满;对于这种网络密集型的场景来说,线程池就是绝佳的方案,同时可以开启六七十个线程,一百个G的资源在两三个小时可以下载完成。
下面将从concurrent第三方库实现的线程池和threading自带线程库实现线程池,来讲讲解使用方法。
concurrent的Dome
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
# 参数times用来模拟网络请求的时间
def get_html(times):
print("get page {}s finished".format(times))
time.sleep(times)
print('end')
return times
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 并不是真的url
all_task = [executor.submit(get_html, (url)) for url in urls]
for future in as_completed(all_task):
data = future.result()
concurrent可以设置堵塞方式,会等待线程池运行完成后统一返回结果,同时也可以设置超时等待,超过时间不管线程池运行完与否都将返回结果,但是不会结束线程,很明显这是个缺陷。
threading实现线程池
下面我们用threading模块来实现一个超时强制停止的模块,github代码
import threading
import time
import inspect
import ctypes
class ThreadPool:
def __init__(self, size: int, timeout: int):
self._size = size
self._timeout = timeout
def _async_raise(self, tid, exctype):
"""raises the exception, performs cleanup if needed"""
tid = ctypes.c_long(tid)
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid,
ctypes.py_object(
exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
def _stop_thread(self, thread):
self._async_raise(thread.ident, SystemExit)
def start(self, func, task: list):
record = dict()
while len(task) or len(record) > 0:
while len(record) < self._size and len(task) > 0:
item = task.pop()
t = threading.Thread(target=func, args=(item,))
t.start()
record[t.getName()] = {'thread': t, 'time': time.time()}
dellist = []
for k, v in record.items():
print('检测:' + k)
if v['thread'].isAlive():
if time.time() - v['time'] > self._timeout:
self._stop_thread(v['thread'])
dellist.append(k)
else:
dellist.append(k)
for dl in dellist:
del (record[dl])
if __name__ == '__main__':
def task(name):
print("输出name", name)
time.sleep(name)
t1 = time.time()
items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]
pool = ThreadPool(4, 3)
pool.start(task, items)
t2 = time.time()
items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]
for i in items:
task(i)
t3 = time.time()
print('多线程统计用时:{} 非多线程统计:{}'.format(t2 - t1, t3 - t2))
TreadPool类初始化参数是线程池的大小以及单个线程的超时时间,核心方法是start,该方法需要两个参数,一个是开启多线程的函数名,一个是任务列表,start方法实现了将任务列表加进线程池-检测任务完成或者超时-在增加任务;核心的是实现了超时强制结束线程,从而保证了线程的最大运行时间不会超过我们设定的时间。
相关文章
本栏推荐

标签云
猜你喜欢
站点信息
- 建站时间:2019-5-24
- 网站程序:like in love
- 主题模板:《今夕何夕》
- 文章统计:104条
- 文章评论:***条
- 微信公众号:扫描二维码,关注我们
