"""Classes and functions dealing with augmentation on multiple CPU cores."""
from __future__ import print_function, division, absolute_import
import sys
import multiprocessing
import threading
import traceback
import time
import random
import platform
import numpy as np
import cv2
import imgaug.imgaug as ia
import imgaug.random as iarandom
from imgaug.augmentables.batches import Batch, UnnormalizedBatch
if sys.version_info[0] == 2:
# pylint: disable=redefined-builtin, import-error
import cPickle as pickle
from Queue import Empty as QueueEmpty, Full as QueueFull
import socket
BrokenPipeError = socket.error
elif sys.version_info[0] == 3:
import pickle
from queue import Empty as QueueEmpty, Full as QueueFull
_CONTEXT = None
# Added in 0.4.0.
def _get_context_method():
vinfo = sys.version_info
# get_context() is only supported in 3.5 and later (same for
# set_start_method)
get_context_unsupported = (
vinfo[0] == 2
or (vinfo[0] == 3 and vinfo[1] <= 3))
method = None
# Fix random hanging code in NixOS by switching to spawn method,
# see issue #414
# TODO This is only a workaround and doesn't really fix the underlying
# issue. The cause of the underlying issue is currently unknown.
# Its possible that #535 fixes the issue, though earlier tests
# indicated that the cause was something else.
# TODO this might break the semaphore used to prevent out of memory
# errors
if "NixOS" in platform.version():
method = "spawn"
if get_context_unsupported:
ia.warn("Detected usage of imgaug.multicore in python <=3.4 "
"and NixOS. This is known to sometimes cause endlessly "
"hanging programs when also making use of multicore "
"augmentation (aka background augmentation). Use "
"python 3.5 or later to prevent this.")
if get_context_unsupported:
return False
return method
# Added in 0.4.0.
def _set_context(method):
# method=False indicates that multiprocessing module (i.e. no context)
# should be used, e.g. because get_context() is not supported
globals()["_CONTEXT"] = (
multiprocessing if method is False
else multiprocessing.get_context(method))
# Added in 0.4.0.
def _reset_context():
globals()["_CONTEXT"] = None
# Added in 0.4.0.
def _autoset_context():
_set_context(_get_context_method())
# Added in 0.4.0.
def _get_context():
if _CONTEXT is None:
_autoset_context()
return _CONTEXT
[docs]class Pool(object):
"""
Wrapper around ``multiprocessing.Pool`` for multicore augmentation.
Parameters
----------
augseq : imgaug.augmenters.meta.Augmenter
The augmentation sequence to apply to batches.
processes : None or int, optional
The number of background workers, similar to the same parameter in
multiprocessing.Pool. If ``None``, the number of the machine's CPU
cores will be used (this counts hyperthreads as CPU cores). If this is
set to a negative value ``p``, then ``P - abs(p)`` will be used,
where ``P`` is the number of CPU cores. E.g. ``-1`` would use all
cores except one (this is useful to e.g. reserve one core to feed
batches to the GPU).
maxtasksperchild : None or int, optional
The number of tasks done per worker process before the process is
killed and restarted, similar to the same parameter in
multiprocessing.Pool. If ``None``, worker processes will not be
automatically restarted.
seed : None or int, optional
The seed to use for child processes. If ``None``, a random seed will
be used.
"""
# This attribute saves the augmentation sequence for background workers so
# that it does not have to be resend with every batch. The attribute is set
# once per worker in the worker's initializer. As each worker has its own
# process, it is a different variable per worker (though usually should be
# of equal content).
_WORKER_AUGSEQ = None
# This attribute saves the initial seed for background workers so that for
# any future batch the batch's specific seed can be derived, roughly via
# SEED_START+SEED_BATCH. As each worker has its own process, this seed can
# be unique per worker even though all seemingly use the same constant
# attribute.
_WORKER_SEED_START = None
def __init__(self, augseq, processes=None, maxtasksperchild=None,
seed=None):
# make sure that don't call pool again in a child process
assert Pool._WORKER_AUGSEQ is None, (
"_WORKER_AUGSEQ was already set when calling Pool.__init__(). "
"Did you try to instantiate a Pool within a Pool?")
assert processes is None or processes != 0, (
"Expected `processes` to be `None` (\"use as many cores as "
"available\") or a negative integer (\"use as many as available "
"MINUS this number\") or an integer>1 (\"use exactly that many "
"processes\"). Got type %s, value %s instead." % (
type(processes), str(processes))
)
self.augseq = augseq
self.processes = processes
self.maxtasksperchild = maxtasksperchild
if seed is not None:
assert iarandom.SEED_MIN_VALUE <= seed <= iarandom.SEED_MAX_VALUE, (
"Expected `seed` to be either `None` or a value between "
"%d and %d. Got type %s, value %s instead." % (
iarandom.SEED_MIN_VALUE,
iarandom.SEED_MAX_VALUE,
type(seed),
str(seed)
)
)
self.seed = seed
# multiprocessing.Pool instance
self._pool = None
# Running counter of the number of augmented batches. This will be
# used to send indexes for each batch to the workers so that they can
# augment using SEED_BASE+SEED_BATCH and ensure consistency of applied
# augmentation order between script runs.
self._batch_idx = 0
@property
def pool(self):
"""Return or create the ``multiprocessing.Pool`` instance.
This creates a new instance upon the first call and afterwards
returns that instance (until the property ``_pool`` is set to
``None`` again).
Returns
-------
multiprocessing.Pool
The ``multiprocessing.Pool`` used internally by this
``imgaug.multicore.Pool``.
"""
if self._pool is None:
processes = self.processes
if processes is not None and processes < 0:
# cpu count returns the number of logical cpu cores, i.e.
# including hyperthreads could also use
# os.sched_getaffinity(0) here, which seems to not exist on
# BSD though.
# In python 3.4+, there is also os.cpu_count(), which
# multiprocessing.cpu_count() then redirects to.
# At least one guy on stackoverflow.com/questions/1006289
# reported that only os.* existed, not the multiprocessing
# method.
# TODO make this also check if os.cpu_count exists as a
# fallback
try:
processes = _get_context().cpu_count() - abs(processes)
processes = max(processes, 1)
except (ImportError, NotImplementedError):
ia.warn(
"Could not find method multiprocessing.cpu_count(). "
"This will likely lead to more CPU cores being used "
"for the background augmentation than originally "
"intended.")
processes = None
self._pool = _get_context().Pool(
processes,
initializer=_Pool_initialize_worker,
initargs=(self.augseq, self.seed),
maxtasksperchild=self.maxtasksperchild)
return self._pool
[docs] def map_batches(self, batches, chunksize=None):
"""
Augment a list of batches.
Parameters
----------
batches : list of imgaug.augmentables.batches.Batch
The batches to augment.
chunksize : None or int, optional
Rough indicator of how many tasks should be sent to each worker.
Increasing this number can improve performance.
Returns
-------
list of imgaug.augmentables.batches.Batch
Augmented batches.
"""
self._assert_batches_is_list(batches)
return self.pool.map(
_Pool_starworker,
self._handle_batch_ids(batches),
chunksize=chunksize)
[docs] def map_batches_async(self, batches, chunksize=None, callback=None,
error_callback=None):
"""
Augment batches asynchonously.
Parameters
----------
batches : list of imgaug.augmentables.batches.Batch
The batches to augment.
chunksize : None or int, optional
Rough indicator of how many tasks should be sent to each worker.
Increasing this number can improve performance.
callback : None or callable, optional
Function to call upon finish. See ``multiprocessing.Pool``.
error_callback : None or callable, optional
Function to call upon errors. See ``multiprocessing.Pool``.
Returns
-------
multiprocessing.MapResult
Asynchonous result. See ``multiprocessing.Pool``.
"""
self._assert_batches_is_list(batches)
return self.pool.map_async(
_Pool_starworker,
self._handle_batch_ids(batches),
chunksize=chunksize,
callback=callback,
error_callback=error_callback)
@classmethod
def _assert_batches_is_list(cls, batches):
assert isinstance(batches, list), (
"Expected `batches` to be a list, got type %s. Call "
"imap_batches() if you use generators.") % (type(batches),)
[docs] def imap_batches(self, batches, chunksize=1, output_buffer_size=None):
"""
Augment batches from a generator.
Pattern for output buffer constraint is from
https://stackoverflow.com/a/47058399.
Parameters
----------
batches : generator of imgaug.augmentables.batches.Batch
The batches to augment, provided as a generator. Each call to the
generator should yield exactly one batch.
chunksize : None or int, optional
Rough indicator of how many tasks should be sent to each worker.
Increasing this number can improve performance.
output_buffer_size : None or int, optional
Max number of batches to handle *at the same time* in the *whole*
pipeline (including already augmented batches that are waiting to
be requested). If the buffer size is reached, no new batches will
be loaded from `batches` until a produced (i.e. augmented) batch is
consumed (i.e. requested from this method).
The buffer is unlimited if this is set to ``None``. For large
datasets, this should be set to an integer value to avoid filling
the whole RAM if loading+augmentation happens faster than training.
*New in version 0.3.0.*
Yields
------
imgaug.augmentables.batches.Batch
Augmented batch.
"""
self._assert_batches_is_generator(batches)
# buffer is either None or a Semaphore
output_buffer_left = _create_output_buffer_left(output_buffer_size)
# TODO change this to 'yield from' once switched to 3.3+
gen = self.pool.imap(
_Pool_starworker,
self._ibuffer_batch_loading(
self._handle_batch_ids_gen(batches),
output_buffer_left
),
chunksize=chunksize)
for batch in gen:
yield batch
if output_buffer_left is not None:
output_buffer_left.release()
[docs] def imap_batches_unordered(self, batches, chunksize=1,
output_buffer_size=None):
"""Augment batches from a generator (without preservation of order).
Pattern for output buffer constraint is from
https://stackoverflow.com/a/47058399.
Parameters
----------
batches : generator of imgaug.augmentables.batches.Batch
The batches to augment, provided as a generator. Each call to the
generator should yield exactly one batch.
chunksize : None or int, optional
Rough indicator of how many tasks should be sent to each worker.
Increasing this number can improve performance.
output_buffer_size : None or int, optional
Max number of batches to handle *at the same time* in the *whole*
pipeline (including already augmented batches that are waiting to
be requested). If the buffer size is reached, no new batches will
be loaded from `batches` until a produced (i.e. augmented) batch is
consumed (i.e. requested from this method).
The buffer is unlimited if this is set to ``None``. For large
datasets, this should be set to an integer value to avoid filling
the whole RAM if loading+augmentation happens faster than training.
*New in version 0.3.0.*
Yields
------
imgaug.augmentables.batches.Batch
Augmented batch.
"""
self._assert_batches_is_generator(batches)
# buffer is either None or a Semaphore
output_buffer_left = _create_output_buffer_left(output_buffer_size)
gen = self.pool.imap_unordered(
_Pool_starworker,
self._ibuffer_batch_loading(
self._handle_batch_ids_gen(batches),
output_buffer_left
),
chunksize=chunksize
)
for batch in gen:
yield batch
if output_buffer_left is not None:
output_buffer_left.release()
@classmethod
def _assert_batches_is_generator(cls, batches):
assert ia.is_generator(batches), (
"Expected `batches` to be generator, got type %s. Call "
"map_batches() if you use lists.") % (type(batches),)
def __enter__(self):
assert self._pool is None, (
"Tried to __enter__ a pool that has already been initialized.")
_ = self.pool # initialize internal multiprocessing pool instance
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs] def close(self):
"""Close the pool gracefully."""
if self._pool is not None:
self._pool.close()
self._pool.join()
self._pool = None
[docs] def terminate(self):
"""Terminate the pool immediately."""
if self._pool is not None:
self._pool.terminate()
self._pool.join()
self._pool = None
# TODO why does this function exist if it may only be called after
# close/terminate and both of these two already call join() themselves
[docs] def join(self):
"""
Wait for the workers to exit.
This may only be called after first calling
:func:`~imgaug.multicore.Pool.close` or
:func:`~imgaug.multicore.Pool.terminate`.
"""
if self._pool is not None:
self._pool.join()
def _handle_batch_ids(self, batches):
ids = np.arange(self._batch_idx, self._batch_idx + len(batches))
inputs = list(zip(ids, batches))
self._batch_idx += len(batches)
return inputs
def _handle_batch_ids_gen(self, batches):
for batch in batches:
batch_idx = self._batch_idx
yield batch_idx, batch
self._batch_idx += 1
@classmethod
def _ibuffer_batch_loading(cls, batches, output_buffer_left):
for batch in batches:
if output_buffer_left is not None:
output_buffer_left.acquire()
yield batch
def _create_output_buffer_left(output_buffer_size):
output_buffer_left = None
if output_buffer_size:
assert output_buffer_size > 0, (
"Expected buffer size to be greater than zero, but got size %d "
"instead." % (output_buffer_size,))
output_buffer_left = _get_context().Semaphore(output_buffer_size)
return output_buffer_left
# This could be a classmethod or staticmethod of Pool in 3.x, but in 2.7 that
# leads to pickle errors.
def _Pool_initialize_worker(augseq, seed_start):
# pylint: disable=invalid-name, protected-access
# Not using this seems to have caused infinite hanging in the case
# of gaussian blur on at least MacOSX.
# It is also in most cases probably not sensible to use multiple
# threads while already running augmentation in multiple processes.
cv2.setNumThreads(0)
if seed_start is None:
# pylint falsely thinks in older versions that
# multiprocessing.current_process() was not callable, see
# https://github.com/PyCQA/pylint/issues/1699
# pylint: disable=not-callable
process_name = _get_context().current_process().name
# pylint: enable=not-callable
# time_ns() exists only in 3.7+
if sys.version_info[0] == 3 and sys.version_info[1] >= 7:
seed_offset = time.time_ns()
else:
seed_offset = int(time.time() * 10**6) % 10**6
seed = hash(process_name) + seed_offset
_reseed_global_local(seed, augseq)
Pool._WORKER_SEED_START = seed_start
Pool._WORKER_AUGSEQ = augseq
# not sure if really necessary, but shouldn't hurt either
Pool._WORKER_AUGSEQ.localize_random_state_()
# This could be a classmethod or staticmethod of Pool in 3.x, but in 2.7 that
# leads to pickle errors.
def _Pool_worker(batch_idx, batch):
# pylint: disable=invalid-name, protected-access
assert ia.is_single_integer(batch_idx), (
"Expected `batch_idx` to be an integer. Got type %s instead." % (
type(batch_idx)
))
assert isinstance(batch, (UnnormalizedBatch, Batch)), (
"Expected `batch` to be either an instance of "
"`imgaug.augmentables.batches.UnnormalizedBatch` or "
"`imgaug.augmentables.batches.Batch`. Got type %s instead." % (
type(batch)
))
assert Pool._WORKER_AUGSEQ is not None, (
"Expected `Pool._WORKER_AUGSEQ` to NOT be `None`. Did you manually "
"call _Pool_worker()?")
augseq = Pool._WORKER_AUGSEQ
# TODO why is this if here? _WORKER_SEED_START should always be set?
if Pool._WORKER_SEED_START is not None:
seed = Pool._WORKER_SEED_START + batch_idx
_reseed_global_local(seed, augseq)
result = augseq.augment_batch_(batch)
return result
# could be a classmethod or staticmethod of Pool in 3.x, but in 2.7 that leads
# to pickle errors starworker is here necessary, because starmap does not exist
# in 2.7
def _Pool_starworker(inputs):
# pylint: disable=invalid-name
return _Pool_worker(*inputs)
def _reseed_global_local(base_seed, augseq):
seed_global = _derive_seed(base_seed, -10**9)
seed_local = _derive_seed(base_seed)
iarandom.seed(seed_global)
augseq.seed_(seed_local)
def _derive_seed(base_seed, offset=0):
return (
iarandom.SEED_MIN_VALUE
+ (base_seed + offset)
% (iarandom.SEED_MAX_VALUE - iarandom.SEED_MIN_VALUE)
)
[docs]class BatchLoader(object):
"""**Deprecated**. Load batches in the background.
Deprecated. Use ``imgaug.multicore.Pool`` instead.
Loaded batches can be accesses using :attr:`imgaug.BatchLoader.queue`.
Parameters
----------
load_batch_func : callable or generator
Generator or generator function (i.e. function that yields Batch
objects) or a function that returns a list of Batch objects.
Background loading automatically stops when the last batch was yielded
or the last batch in the list was reached.
queue_size : int, optional
Maximum number of batches to store in the queue. May be set higher
for small images and/or small batches.
nb_workers : int, optional
Number of workers to run in the background.
threaded : bool, optional
Whether to run the background processes using threads (True) or full
processes (False).
"""
@ia.deprecated(alt_func="imgaug.multicore.Pool")
def __init__(self, load_batch_func, queue_size=50, nb_workers=1,
threaded=True):
assert queue_size >= 2, (
"Queue size for BatchLoader must be at least 2, "
"got %d." % (queue_size,))
assert nb_workers >= 1, (
"Number of workers for BatchLoader must be at least 1, "
"got %d" % (nb_workers,))
self._queue_internal = multiprocessing.Queue(queue_size//2)
self.queue = multiprocessing.Queue(queue_size//2)
self.join_signal = multiprocessing.Event()
self.workers = []
self.threaded = threaded
seeds = iarandom.get_global_rng().generate_seeds_(nb_workers)
for i in range(nb_workers):
if threaded:
worker = threading.Thread(
target=self._load_batches,
args=(load_batch_func, self._queue_internal,
self.join_signal, None)
)
else:
worker = multiprocessing.Process(
target=self._load_batches,
args=(load_batch_func, self._queue_internal,
self.join_signal, seeds[i])
)
worker.daemon = True
worker.start()
self.workers.append(worker)
self.main_worker_thread = threading.Thread(
target=self._main_worker,
args=()
)
self.main_worker_thread.daemon = True
self.main_worker_thread.start()
[docs] def count_workers_alive(self):
return sum([int(worker.is_alive()) for worker in self.workers])
[docs] def all_finished(self):
"""
Determine whether the workers have finished the loading process.
Returns
-------
out : bool
True if all workers have finished. Else False.
"""
return self.count_workers_alive() == 0
def _main_worker(self):
workers_running = self.count_workers_alive()
while workers_running > 0 and not self.join_signal.is_set():
# wait for a new batch in the source queue and load it
try:
batch_str = self._queue_internal.get(timeout=0.1)
if batch_str == "":
workers_running -= 1
else:
self.queue.put(batch_str)
except QueueEmpty:
time.sleep(0.01)
except (EOFError, BrokenPipeError):
break
workers_running = self.count_workers_alive()
# All workers have finished, move the remaining entries from internal
# to external queue
while True:
try:
batch_str = self._queue_internal.get(timeout=0.005)
if batch_str != "":
self.queue.put(batch_str)
except QueueEmpty:
break
except (EOFError, BrokenPipeError):
break
self.queue.put(pickle.dumps(None, protocol=-1))
time.sleep(0.01)
@classmethod
def _load_batches(cls, load_batch_func, queue_internal, join_signal,
seedval):
# pylint: disable=broad-except
if seedval is not None:
random.seed(seedval)
np.random.seed(seedval)
iarandom.seed(seedval)
try:
gen = (
load_batch_func()
if not ia.is_generator(load_batch_func)
else load_batch_func
)
for batch in gen:
assert isinstance(batch, Batch), (
"Expected batch returned by load_batch_func to "
"be of class imgaug.Batch, got %s." % (
type(batch),))
batch_pickled = pickle.dumps(batch, protocol=-1)
while not join_signal.is_set():
try:
queue_internal.put(batch_pickled, timeout=0.005)
break
except QueueFull:
pass
if join_signal.is_set():
break
except Exception:
traceback.print_exc()
finally:
queue_internal.put("")
time.sleep(0.01)
[docs] def terminate(self):
"""Stop all workers."""
# pylint: disable=protected-access
if not self.join_signal.is_set():
self.join_signal.set()
# give minimal time to put generated batches in queue and gracefully
# shut down
time.sleep(0.01)
if self.main_worker_thread.is_alive():
self.main_worker_thread.join()
if self.threaded:
for worker in self.workers:
if worker.is_alive():
worker.join()
else:
for worker in self.workers:
if worker.is_alive():
worker.terminate()
worker.join()
# wait until all workers are fully terminated
while not self.all_finished():
time.sleep(0.001)
# empty queue until at least one element can be added and place None
# as signal that BL finished
if self.queue.full():
self.queue.get()
self.queue.put(pickle.dumps(None, protocol=-1))
time.sleep(0.01)
# clean the queue, this reportedly prevents hanging threads
while True:
try:
self._queue_internal.get(timeout=0.005)
except QueueEmpty:
break
if not self._queue_internal._closed:
self._queue_internal.close()
if not self.queue._closed:
self.queue.close()
self._queue_internal.join_thread()
self.queue.join_thread()
time.sleep(0.025)
def __del__(self):
if not self.join_signal.is_set():
self.join_signal.set()
[docs]class BackgroundAugmenter(object):
"""
**Deprecated**. Augment batches in the background processes.
Deprecated. Use ``imgaug.multicore.Pool`` instead.
This is a wrapper around the multiprocessing module.
Parameters
----------
batch_loader : BatchLoader or multiprocessing.Queue
BatchLoader object that loads the data fed into the
BackgroundAugmenter, or alternatively a Queue. If a Queue, then it
must be made sure that a final ``None`` in the Queue signals that the
loading is finished and no more batches will follow. Otherwise the
BackgroundAugmenter will wait forever for the next batch.
augseq : Augmenter
An augmenter to apply to all loaded images.
This may be e.g. a Sequential to apply multiple augmenters.
queue_size : int
Size of the queue that is used to temporarily save the augmentation
results. Larger values offer the background processes more room
to save results when the main process doesn't load much, i.e. they
can lead to smoother and faster training. For large images, high
values can block a lot of RAM though.
nb_workers : 'auto' or int
Number of background workers to spawn.
If ``auto``, it will be set to ``C-1``, where ``C`` is the number of
CPU cores.
"""
@ia.deprecated(alt_func="imgaug.multicore.Pool")
def __init__(self, batch_loader, augseq, queue_size=50, nb_workers="auto"):
assert queue_size > 0, (
"Expected 'queue_size' to be at least 1, got %d." % (queue_size,))
self.augseq = augseq
self.queue_source = (
batch_loader
if isinstance(batch_loader, multiprocessing.queues.Queue)
else batch_loader.queue
)
self.queue_result = multiprocessing.Queue(queue_size)
if nb_workers == "auto":
try:
nb_workers = multiprocessing.cpu_count()
except (ImportError, NotImplementedError):
nb_workers = 1
# try to reserve at least one core for the main process
nb_workers = max(1, nb_workers - 1)
else:
assert nb_workers >= 1, (
"Expected 'nb_workers' to be \"auto\" or at least 1, "
"got %d instead." % (nb_workers,))
self.nb_workers = nb_workers
self.workers = []
self.nb_workers_finished = 0
seeds = iarandom.get_global_rng().generate_seeds_(nb_workers)
for i in range(nb_workers):
worker = multiprocessing.Process(
target=self._augment_images_worker,
args=(augseq, self.queue_source, self.queue_result, seeds[i])
)
worker.daemon = True
worker.start()
self.workers.append(worker)
[docs] def all_finished(self):
return self.nb_workers_finished == self.nb_workers
[docs] def get_batch(self):
"""
Returns a batch from the queue of augmented batches.
If workers are still running and there are no batches in the queue,
it will automatically wait for the next batch.
Returns
-------
out : None or imgaug.Batch
One batch or None if all workers have finished.
"""
if self.all_finished():
return None
batch_str = self.queue_result.get()
batch = pickle.loads(batch_str)
if batch is not None:
return batch
self.nb_workers_finished += 1
if self.nb_workers_finished >= self.nb_workers:
try:
# remove `None` from the source queue
self.queue_source.get(timeout=0.001)
except QueueEmpty:
pass
return None
return self.get_batch()
@classmethod
def _augment_images_worker(cls, augseq, queue_source, queue_result,
seedval):
"""
Augment endlessly images in the source queue.
This is a worker function for that endlessly queries the source queue
(input batches), augments batches in it and sends the result to the
output queue.
"""
np.random.seed(seedval)
random.seed(seedval)
augseq.seed_(seedval)
iarandom.seed(seedval)
loader_finished = False
while not loader_finished:
# wait for a new batch in the source queue and load it
try:
batch_str = queue_source.get(timeout=0.1)
batch = pickle.loads(batch_str)
if batch is None:
loader_finished = True
# put it back in so that other workers know that the
# loading queue is finished
queue_source.put(pickle.dumps(None, protocol=-1))
else:
batch_aug = augseq.augment_batch_(batch)
# send augmented batch to output queue
batch_str = pickle.dumps(batch_aug, protocol=-1)
queue_result.put(batch_str)
except QueueEmpty:
time.sleep(0.01)
queue_result.put(pickle.dumps(None, protocol=-1))
time.sleep(0.01)
[docs] def terminate(self):
"""
Terminates all background processes immediately.
This will also free their RAM.
"""
# pylint: disable=protected-access
for worker in self.workers:
if worker.is_alive():
worker.terminate()
self.nb_workers_finished = len(self.workers)
if not self.queue_result._closed:
self.queue_result.close()
time.sleep(0.01)
def __del__(self):
time.sleep(0.1)
self.terminate()