"""
This module implements YATP (Yet Another Thread Pool).
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright (c) 2015 VMware, Inc. All rights reserved.'
import six
import sys
import time
import threading
from six.moves import queue
class WorkItem(object):
    """ Work item: a functor plus its arguments, result and error """

    def __init__(self, fn, *args, **kwargs):
        """
        Work item constructor

        :type fn: function
        :param fn: Work item functor
        :type args: :class:`tuple`
        :param args: Work item functor positional parameters
        :type kwargs: :class:`dict`
        :param kwargs: Work item functor key-value parameters
        """
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        # Filled in by whoever runs the work item
        self.ret = None
        self.err = None
        # Set by done() once the work item has finished
        self.event = threading.Event()

    def join(self, timeout=float('inf')):
        """
        Wait until the work item is done

        :type timeout: :class:`float`
        :param timeout: Timeout in seconds. ``float('inf')`` (the default) or
            ``None`` waits forever
        :rtype: :class:`object`
        :return: Work item functor's return value. If the timeout expires
            first, the current value of ``ret`` is returned (None until the
            functor has returned)
        :raises: The exception recorded in ``err``, if any
        """
        # Python 3's Event.wait() raises OverflowError for timeouts larger
        # than threading.TIMEOUT_MAX (including inf); None waits forever on
        # both Python 2 and 3
        if timeout == float('inf'):
            timeout = None
        # Wait until work is done
        self.event.wait(timeout)
        # Throw exception if error occurred
        if self.err is not None:
            raise self.err  # pylint: disable=E0702
        return self.ret

    def done(self):
        """ Signal work item is done """
        self.event.set()

    def __enter__(self):
        """
        with statement enter

        :rtype: :class:`WorkItem`
        :return: Work item object
        """
        return self

    def __exit__(self, typ, value, traceback):
        """ with statement exit: wait for the work item, then drop references """
        try:
            self.join()
        finally:
            # Release the functor and its arguments even when join() raises
            del self.event, self.fn, self.args, self.kwargs
## Thread pool
#
class ThreadPool(object):
    """ Thread pool """

    def __init__(self, min_workers=1, max_workers=8,
                 idle_timeout=5 * 60, logger=None):
        """
        Thread pool constructor

        :type min_workers: :class:`int`
        :param min_workers: Min number of worker threads
        :type max_workers: :class:`int`
        :param max_workers: Max number of worker threads
        :type idle_timeout: :class:`int`
        :param idle_timeout: Seconds an idle worker waits for work before it
            retires (workers never retire below min_workers)
        :type logger: :class:`logging.Logger`
        :param logger: Logger to use. Defaults to printing to stdout
        """
        assert min_workers >= 0
        assert min_workers <= max_workers
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.idle_timeout = idle_timeout
        self.logger = logger
        # Worker thread name -> threading.Thread
        self.workers = {}
        self.work_items = queue.Queue(0)
        self.lock = threading.Lock()
        # Assigned after every attribute shutdown() relies on, so __del__ can
        # use its presence to tell whether construction got this far
        self.shutting_down = False
        # _add_worker() expects the lock to be held
        with self.lock:
            for _ in range(self.min_workers):
                self._add_worker()

    def _log(self, msg):
        """ Log message via the logger, or print it if there is none """
        if self.logger:
            self.logger.info(msg)
        else:
            print(msg)

    def _worker(self):
        """
        Thread pool worker loop

        Runs queued work items until the pool shuts down, or until the worker
        has been idle for ``idle_timeout`` seconds while more than
        ``min_workers`` workers are registered.
        """
        thd_name = threading.current_thread().name
        while not self.shutting_down:
            # Reset every iteration so an error can never be attributed to
            # the previous iteration's work item
            work_item = None
            try:
                # Wait for request
                try:
                    work_item = self.work_items.get(timeout=self.idle_timeout)
                except queue.Empty:
                    pass
                if work_item is None:
                    # Worker idle timeout. Retire thread if needed
                    with self.lock:
                        if len(self.workers) > self.min_workers:
                            self._remove_worker(thd_name)
                            break
                    continue
                if self.shutting_down:
                    # Put the work item back to queue so that the next
                    # worker also sees the shutdown and quits
                    self.work_items.put(work_item)
                    break
                # Start work
                work_item.ret = work_item.fn(*work_item.args,
                                             **work_item.kwargs)
            except:  # pylint: disable=W0702
                if not sys:
                    # System is dying and likely to be in undefined state.
                    # sys (and other imported modules) could be unloaded and
                    # set to None when we get here. Must quit as quickly as
                    # possible
                    return
                import traceback
                errtype, errvalue, trace = sys.exc_info()
                try:
                    # Record the error first so that join() re-raises it
                    # even if logging fails
                    if work_item is not None:
                        work_item.err = errvalue
                    stack_trace = " ".join(traceback.format_exception(
                        errtype, errvalue, trace))
                    self._log("\n".join(
                        [thd_name + " caught exception: " + str(errtype),
                         stack_trace]))
                finally:
                    # NOTE: See the Python documentation for sys.exc_info for
                    # a warning about reference cycles and the need to delete
                    # the local variable holding the traceback
                    del trace
            # Signal done on work_item (None only if the idle path raised)
            if work_item is not None:
                work_item.done()
        # One less worker
        with self.lock:
            self._remove_worker(thd_name)

    def _remove_worker(self, thd_name):
        """
        Remove a worker. Assume locked

        :type thd_name: :class:`str`
        :param thd_name: Remove a worker thread with the thread name
        """
        self.workers.pop(thd_name, None)

    def _add_worker(self):
        """ Add a worker if fewer than max_workers exist. Assume locked """
        if len(self.workers) >= self.max_workers:
            return
        thd = threading.Thread(target=self._worker)
        thd.daemon = True
        # Register before starting so the new worker is always known to the
        # pool by the time it runs
        self.workers[thd.name] = thd
        try:
            thd.start()
        except Exception:
            # Do not leave a thread that never ran registered: shutdown()
            # would wait for it
            self.workers.pop(thd.name, None)
            raise

    def queue_work(self, fn, *args, **kwargs):
        """
        Queue work

        Returns a work_item when work is queued to work queue.
        The work will start when a ready worker is available to process the
        work. User could call {work_item}.join() to wait for the work item to
        finish.

        :type fn: function
        :param fn: Work item functor
        :type args: :class:`tuple`
        :param args: Work item functor positional parameters
        :type kwargs: :class:`dict`
        :param kwargs: Work item functor key-value parameters
        :rtype: :class:`WorkItem`
        :return: work item when work is queued to work queue, or None if the
            pool is shutting down
        """
        if self.shutting_down:
            return None
        # Add worker if needed
        with self.lock:
            self._add_worker()
        work_item = WorkItem(fn, *args, **kwargs)
        self.work_items.put(work_item)
        return work_item

    @staticmethod
    def normalize_works(works):
        """
        Generator to return work in normalized form: (fn, args, kwargs)

        :type works: iterable of fn / (fn,) / (fn, args) / (fn, args, kwargs)
        :param works: An iterable of possible functor forms. Elements beyond
            the third in a sequence are ignored
        :rtype: :class:`tuple` of (fn, args, kwargs)
        :return: A normalized tuple of (functor, args, kwargs)
        """
        for work in works:
            args = ()
            kwargs = {}
            if callable(work):
                fn = work
            elif len(work) >= 3:
                # Slice so that longer sequences do not fail to unpack
                fn, args, kwargs = work[:3]
            elif len(work) == 2:
                fn, args = work
            else:
                fn = work[0]
            yield (fn, args, kwargs)

    def queue_works_and_wait(self, works):
        """
        Queue a batch of works and wait until all works are completed / error
        out

        :type works: iterable of fn / (fn, args) / (fn, args, kwargs)
        :param works: An iterable of possible functor forms
        :rtype: :class:`list` of :class:`tuple`
        :return: One tuple per work, in input order: (True, return val) on
            success, (False, exception type as given by sys.exc_info()[0]) on
            error, or (False, None) if the work was not queued because the
            pool is shutting down
        """
        work_items = [self.queue_work(fn, *args, **kwargs)
                      for fn, args, kwargs in self.normalize_works(works)]
        results = []
        for work in work_items:
            if work:
                try:
                    ret = work.join()
                    results.append((True, ret))
                except:  # pylint: disable=W0702
                    results.append((False, sys.exc_info()[0]))
            else:
                # No work queued
                results.append((False, None))
        return results

    def queue_work_and_wait(self, fn, *args, **kwargs):
        """
        Queue a work and wait until the work is completed / error out

        :type fn: function
        :param fn: Work item functor
        :type args: :class:`tuple`
        :param args: Work item functor positional parameters
        :type kwargs: :class:`dict`
        :param kwargs: Work item functor key-value parameters
        :rtype: :class:`tuple`
        :return: (True, return val) / (False, exception type) when work is
            done, or (False, None) if the pool is shutting down
        """
        return self.queue_works_and_wait([(fn, args, kwargs)])[0]

    def shutdown(self, no_wait=False):
        """
        Shut down this thread pool

        Work items still queued when shutdown begins are not run, so joining
        them without a timeout blocks forever.

        :type no_wait: :class:`bool`
        :param no_wait: Set to True to return immediately without waiting for
            all workers to quit
        """
        # Set myself as shutting down (only the first caller proceeds)
        with self.lock:
            if self.shutting_down:
                return
            self.shutting_down = True
        # Queue a fake work item. Each worker that dequeues it puts it back
        # and quits, so it reaches every worker in turn
        self.work_items.put(object())
        if no_wait:
            return
        self._log("shutdown: Waiting for workers to quit...")
        current = threading.current_thread()
        while True:
            with self.lock:
                # Forget workers that died without deregistering (e.g. an
                # exception raised while logging a failure), otherwise we
                # would wait for them forever
                for name, thd in list(self.workers.items()):
                    if not thd.is_alive():
                        del self.workers[name]
                # A worker calling shutdown() cannot wait for itself; it
                # quits once its current work item returns
                num_workers = sum(1 for thd in self.workers.values()
                                  if thd is not current)
            # done if no other worker left
            if num_workers == 0:
                break
            time.sleep(0.1)
        self._log("shutdown: All workers quit")

    def __del__(self):
        """ Destructor """
        # __init__ may have failed (e.g. an assertion) before the state that
        # shutdown() relies on was assigned
        if hasattr(self, 'shutting_down'):
            self.shutdown()

    def __enter__(self):
        """ with statement enter """
        return self

    def __exit__(self, typ, value, traceback):
        """ with statement exit """
        self.shutdown()