Files
palemoon27/python/futures/test_futures.py
T
roytam1 688c3af674 import changes from `dev' branch of rmottola/Arctic-Fox:
- Bug 1235021 - Re-emit ChromeManifestEntries from the jar manifest handler code in the FasterMake backend. r=gps (a8d899a6da)
- Bug 1233282 - Make CONFIGURE_DEFINE_FILES considered more as GENERATED_FILES in the emitter. r=gps (d29506fb55)
- Bug 1235021 - Add a RenamedSourcePath helper class. r=gps (62e78b867b)
- Bug 1235021 - Re-emit FinalTarget{,Preprocessed}Files from the jar manifest handler code in the FasterMake backend. r=gps (c37287a5d7)
- Bug 1235021 - Avoid passing defines to FasterMakeBackend._consume_jar_manifest. r=gps (dc0d17c3a6)
- Bug 1235021 - Move FasterMakeBackend._consume_jar_manifest to CommonBackend. r=gps (b9bb6b7d1e)
- Bug 1239217 - Add the notion of Partial and Hybrid build backends. r=gps Make the FasterMake backend a partial build backend. (219c0811e6)
- Bug 1239217 - Stop making the FasterMake build system refresh the backend on its own. r=gps (4f79f966ce)
- Bug 1241398 - Show the diff for created and deleted files in `mach build-backend --diff`. r=gps (d497d3aef8)
- Bug 1214885 - Add a "ChromeUrl" build backend to write out information useful for resolving chrome urls. r=glandium (83ad13d109)
- Bug 1216817 - Part 1: Add install_callback to artifacts. r=gps (95b4860d09)
- Bug 1216817 - Part 2: Narrow distdir to bindir in artifacts. r=gps (e48b531455)
- Bug 1216817 - Part 4: Add --enable-artifact-builds and MOZ_ARTIFACT_BUILDS. r=glandium (8d7ed76621)
- bug 1164816 - Import concurrent.futures into the tree. r=gps (bc83211833)
- bug 1190603 - import PyECC library r=gps,gerv (e0c5afeee0)
- Bug 1216817 - Part 5: Run |mach artifact install| automatically when asked. r=glandium (835c27d9c2)
- Bug 1216817 - Follow-up: Fix "KeyError: uMOZ_ARTIFACT_BUILDS" in config.status. r=bustage (e87e04e23b)
- Bug 1216817 - Follow-up: Fix "KeyError: u'MOZ_ARTIFACT_BUILDS'". r=bustage (797331293b)
- Bug 1207897 - Add a configure option to build multiple build backends. r=gps (35f62c27ca)
- Bug 1241398 - Allow to pass the --verbose flag down to config.status from `mach build-backend`. r=gps (78610c40d0)
- Bug 1236111 - part 1: avoid configure.in Windows-only goop when running with disable-compile-environment, r=gps (d27a7e522a)
- Bug 1236111 - part 2: fix mozbuild to use the file mode modifiers specified for opening when writing a FileAvoidWrite, r=gps,nalexander (e240c613b7)
- Bug 1207890 - Part 1: Add rich ArtifactJob extension point. r=glandium (e402f5fcec)
- Bug 1207890 - Part 2: Stop extracting build ID from artifacts. r=glandium (314d6895c1)
- Bug 1207890 - Part 3: Post-process downloaded artifacts. r=glandium (09d60ac030)
- Bug 1207890 - Part 4: Download and process Mac OS X artifacts. r=glandium (181ba370b1)
- Bug 1207890 - Pre: Make JarWriter handle inputs with read() but not seek(). r=glandium (6ebb5dfe94)
- Bug 1207890 - Post: Hacks to make --disable-compile-environment work on Mac OS X. r=glandium (c5f88b6adf)
- Bug 1207890 - Post: Move |mach artifact| command out of mobile/android. r=glandium (a06f97dfb9)
- Bug 1207890 - Post: Hack to make |mach run| for Mac OS X artifact builds. r=me (4c6d2f6bfe)
- Bug 1207890 - Follow-up: Fix |mach artifact install| for mobile/android. r=me (a2e4347ca9)
- Bug 1236111 - part 3: ensure calls to hg and mach work on Windows, and that we use the right file mode when writing artifacts, r=nalexander,gps (d0090a5a56)
- Bug 1236111 - part 4: actually add Windows support to artifact code, r=nalexander,gps (ab40057ffa)
- Bug 1236111 - part 0: improve logging from process mixin, r=gps (d85265c134)
- Bug 1241398 - Add a dry-run mode to mach build-backend. r=gps (b300169915)
- Bug 1239217 - Make the RecursiveMake build system create backend files generically. r=gps (fba90d6bcb)
- fix minor misspatch of 1240990 (b7d44692bc)
- Bug 1239296 - Use telemetry_handler to store build resource data r=gps (58d7c3a260)
- Bug 1244143 - Record whether or not an artifact build was used in build telemetry data r=gps (d1821d1987)
- Bug 1246264 - Ensure cache directory exists for artifacts installation r=chmanchester (ef5c4a0fba)
- bug 1237619: save resource usage for "what" builds r=gps (6a311c71bc)
- Bug 1239296 - Add telemetry_handler function to mach context r=gps (4a7a67740d)
- Bug 1246402 - Environment variable to disable mercurial setup check. r=gps (d9cf129b6c)
- Bug 1239296 - add post_dispatch_handler hook to mach r=gps (aa55c9a36e)
- Bug 1236110 - Extend mach artifact to handle Linux Desktop builds. r=gps (cb29ca6d1d)
- Bug 1234912 - Check for mozext and pushlog entries after |mach artifact install| hg failure. r=gps (7bfb064c7c)
- Bug 1239096 - Improve English is artifacts.py comments. r=me (38aa5ecb19)
- Bug 1238320 - Part 1 (Linux): Download test binaries necessary to run xpcshell tests and mochitests in artifact builds. r=nalexander (f6407791ae)
- Bug 1238320 - Part 2 (Mac): Download test binaries necessary to run xpcshell tests and mochitests in artifact builds. r=nalexander (4d72cfc6f2)
- Bug 1238320 - Part 3 (Windows): Download test binaries necessary to run xpcshell tests and mochitests in artifact builds. r=nalexander# Please enter the commit message for your changes. Lines starting (40ac9f9f7d)
- Bug 1239678 - fix dll inclusion pattern on Windows and the placement of nested dlls like browsercomps and clearkey, r=nalexander (ad9015c9d9)
- Bug 1239738 - Handle artifact builds with no test binaries cleanly. r=ahunt (ba1593837a)
- Bug 1240323 - Fix installation of binary components in a subdir of dist/bin for linux artifact builds. r=nalexander (2f4b719ea3)
- Bug 1240239 - Install test plugins in artifact based builds. r=nalexander (edc24f4fd2)
- Bug 1240667 - Detect a tree to use for artifact builds based on recent changesets. r=nalexander (947879cb19)
- Bug 1244941 - Don't fill install manifest with artifacts. r=nalexander (8fa9793c53)
- Bug 1237619: Record build objects in resource_usage.json r=gps (c323d21c9f)
- bug 1237619: Add system and command metadata to resouce_usage.json r=gps (c93fb18c37)
- Bug 1240059 - Treat psutil as optional in record_resource_usage. r=gps (c91103ebce)
- Bug 1244160 - Create json-schema for build telemetry data r=gps (d8b3419cfd)
- Bug 1250624 - Overall system resources is displayed twice; r=chmanchester (a115c86902)
- Bug 1144842 (part 1) - Don't use MOZ_PROFILING before all the places it can be set. r=glandium. (3c12a2e29a)
- Bug 1144842 (part 2) - Make --enable-dmd imply --enable-profiling. r=glandium. (85c9ff5c32)
- Bug 1144842 (part 3) - Remove --enable-dmd code from js/src/configure.in. r=glandium. (52cf663bc7)
- Bug 1204260 - Pre: Don't expose ANDROID_{BUILD,PLATFORM}_TOOLS. r=glandium,gbrown (d4f560dd46)
- Bug 1219803 - Support 'mach run' for Android; r=jmaher (5a1a1ab16e)
- Bug 1219807 - Add tooltool manifests for jimdb; r=jmaher (4d7a211569)
- Bug 1221846 - Get Task Tracer building on desktop r=cyu. (5d1a0fabe9)
- Bug 1216681 - Add a fileid utility to extract the breakpad GUID from object files for identification in fix_stack_using_bpsyms. r=ted (e53eb5acc6)
- Bug 1237156 - Only build the fileid utility when MOZ_CRASHREPORTER is set. r=ted.mielczarek (328a80ae18)
- Bug 1239866 - Remove signaling standalone tests. r=bwc (b05b091059)
2023-09-27 11:04:31 +08:00

725 lines
24 KiB
Python

import os
import subprocess
import sys
import threading
import functools
import contextlib
import logging
import re
import time
from StringIO import StringIO
from test import test_support
from concurrent import futures
from concurrent.futures._base import (
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
try:
import unittest2 as unittest
except ImportError:
import unittest
def reap_threads(func):
"""Use this function when threads are being used. This will
ensure that the threads are cleaned up even when the test fails.
If threading is unavailable this function does nothing.
"""
@functools.wraps(func)
def decorator(*args):
key = test_support.threading_setup()
try:
return func(*args)
finally:
test_support.threading_cleanup(*key)
return decorator
# Executing the interpreter in a subprocess
def _assert_python(expected_success, *args, **env_vars):
cmd_line = [sys.executable]
if not env_vars:
cmd_line.append('-E')
# Need to preserve the original environment, for in-place testing of
# shared library builds.
env = os.environ.copy()
# But a special flag that can be set to override -- in this case, the
# caller is responsible to pass the full environment.
if env_vars.pop('__cleanenv', None):
env = {}
env.update(env_vars)
cmd_line.extend(args)
p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=env)
try:
out, err = p.communicate()
finally:
subprocess._cleanup()
p.stdout.close()
p.stderr.close()
rc = p.returncode
err = strip_python_stderr(err)
if (rc and expected_success) or (not rc and not expected_success):
raise AssertionError(
"Process return code is %d, "
"stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
return rc, out, err
def assert_python_ok(*args, **env_vars):
"""
Assert that running the interpreter with `args` and optional environment
variables `env_vars` is ok and return a (return code, stdout, stderr) tuple.
"""
return _assert_python(True, *args, **env_vars)
def strip_python_stderr(stderr):
"""Strip the stderr of a Python process from potential debug output
emitted by the interpreter.
This will typically be run on the result of the communicate() method
of a subprocess.Popen object.
"""
stderr = re.sub(r"\[\d+ refs\]\r?\n?$".encode(), "".encode(), stderr).strip()
return stderr
@contextlib.contextmanager
def captured_stderr():
"""Return a context manager used by captured_stdout/stdin/stderr
that temporarily replaces the sys stream *stream_name* with a StringIO."""
logging_stream = StringIO()
handler = logging.StreamHandler(logging_stream)
logging.root.addHandler(handler)
try:
yield logging_stream
finally:
logging.root.removeHandler(handler)
def create_future(state=PENDING, exception=None, result=None):
f = Future()
f._state = state
f._exception = exception
f._result = result
return f
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
def mul(x, y):
return x * y
def sleep_and_raise(t):
time.sleep(t)
raise Exception('this is an exception')
def sleep_and_print(t, msg):
time.sleep(t)
print(msg)
sys.stdout.flush()
class ExecutorMixin:
worker_count = 5
def setUp(self):
self.t1 = time.time()
try:
self.executor = self.executor_type(max_workers=self.worker_count)
except NotImplementedError:
e = sys.exc_info()[1]
self.skipTest(str(e))
self._prime_executor()
def tearDown(self):
self.executor.shutdown(wait=True)
dt = time.time() - self.t1
if test_support.verbose:
print("%.2fs" % dt)
self.assertLess(dt, 60, "synchronization issue: test lasted too long")
def _prime_executor(self):
# Make sure that the executor is ready to do work before running the
# tests. This should reduce the probability of timeouts in the tests.
futures = [self.executor.submit(time.sleep, 0.1)
for _ in range(self.worker_count)]
for f in futures:
f.result()
class ThreadPoolMixin(ExecutorMixin):
executor_type = futures.ThreadPoolExecutor
class ProcessPoolMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
class ExecutorShutdownTest(unittest.TestCase):
def test_run_after_shutdown(self):
self.executor.shutdown()
self.assertRaises(RuntimeError,
self.executor.submit,
pow, 2, 5)
def test_interpreter_shutdown(self):
# Test the atexit hook for shutdown of worker threads and processes
rc, out, err = assert_python_ok('-c', """if 1:
from concurrent.futures import %s
from time import sleep
from test_futures import sleep_and_print
t = %s(5)
t.submit(sleep_and_print, 1.0, "apple")
""" % (self.executor_type.__name__, self.executor_type.__name__))
# Errors in atexit hooks don't change the process exit code, check
# stderr manually.
self.assertFalse(err)
self.assertEqual(out.strip(), "apple".encode())
def test_hang_issue12364(self):
fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
self.executor.shutdown()
for f in fs:
f.result()
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
def _prime_executor(self):
pass
def test_threads_terminate(self):
self.executor.submit(mul, 21, 2)
self.executor.submit(mul, 6, 7)
self.executor.submit(mul, 3, 14)
self.assertEqual(len(self.executor._threads), 3)
self.executor.shutdown()
for t in self.executor._threads:
t.join()
def test_context_manager_shutdown(self):
with futures.ThreadPoolExecutor(max_workers=5) as e:
executor = e
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
for t in executor._threads:
t.join()
def test_del_shutdown(self):
executor = futures.ThreadPoolExecutor(max_workers=5)
executor.map(abs, range(-5, 5))
threads = executor._threads
del executor
for t in threads:
t.join()
class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
def _prime_executor(self):
pass
def test_processes_terminate(self):
self.executor.submit(mul, 21, 2)
self.executor.submit(mul, 6, 7)
self.executor.submit(mul, 3, 14)
self.assertEqual(len(self.executor._processes), 5)
processes = self.executor._processes
self.executor.shutdown()
for p in processes:
p.join()
def test_context_manager_shutdown(self):
with futures.ProcessPoolExecutor(max_workers=5) as e:
processes = e._processes
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
for p in processes:
p.join()
def test_del_shutdown(self):
executor = futures.ProcessPoolExecutor(max_workers=5)
list(executor.map(abs, range(-5, 5)))
queue_management_thread = executor._queue_management_thread
processes = executor._processes
del executor
queue_management_thread.join()
for p in processes:
p.join()
class WaitTests(unittest.TestCase):
def test_first_completed(self):
future1 = self.executor.submit(mul, 21, 2)
future2 = self.executor.submit(time.sleep, 1.5)
done, not_done = futures.wait(
[CANCELLED_FUTURE, future1, future2],
return_when=futures.FIRST_COMPLETED)
self.assertEqual(set([future1]), done)
self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
def test_first_completed_some_already_completed(self):
future1 = self.executor.submit(time.sleep, 1.5)
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
return_when=futures.FIRST_COMPLETED)
self.assertEqual(
set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
finished)
self.assertEqual(set([future1]), pending)
def test_first_exception(self):
future1 = self.executor.submit(mul, 2, 21)
future2 = self.executor.submit(sleep_and_raise, 1.5)
future3 = self.executor.submit(time.sleep, 3)
finished, pending = futures.wait(
[future1, future2, future3],
return_when=futures.FIRST_EXCEPTION)
self.assertEqual(set([future1, future2]), finished)
self.assertEqual(set([future3]), pending)
def test_first_exception_some_already_complete(self):
future1 = self.executor.submit(divmod, 21, 0)
future2 = self.executor.submit(time.sleep, 1.5)
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1, future2],
return_when=futures.FIRST_EXCEPTION)
self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1]), finished)
self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
def test_first_exception_one_already_failed(self):
future1 = self.executor.submit(time.sleep, 2)
finished, pending = futures.wait(
[EXCEPTION_FUTURE, future1],
return_when=futures.FIRST_EXCEPTION)
self.assertEqual(set([EXCEPTION_FUTURE]), finished)
self.assertEqual(set([future1]), pending)
def test_all_completed(self):
future1 = self.executor.submit(divmod, 2, 0)
future2 = self.executor.submit(mul, 2, 21)
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2],
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2]), finished)
self.assertEqual(set(), pending)
def test_timeout(self):
future1 = self.executor.submit(mul, 6, 7)
future2 = self.executor.submit(time.sleep, 3)
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2],
timeout=1.5,
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1]), finished)
self.assertEqual(set([future2]), pending)
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
def test_pending_calls_race(self):
# Issue #14406: multi-threaded race condition when waiting on all
# futures.
event = threading.Event()
def future_func():
event.wait()
oldswitchinterval = sys.getcheckinterval()
sys.setcheckinterval(1)
try:
fs = set(self.executor.submit(future_func) for i in range(100))
event.set()
futures.wait(fs, return_when=futures.ALL_COMPLETED)
finally:
sys.setcheckinterval(oldswitchinterval)
class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
pass
class AsCompletedTests(unittest.TestCase):
# TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
def test_no_timeout(self):
future1 = self.executor.submit(mul, 2, 21)
future2 = self.executor.submit(mul, 7, 6)
completed = set(futures.as_completed(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2]))
self.assertEqual(set(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2]),
completed)
def test_zero_timeout(self):
future1 = self.executor.submit(time.sleep, 2)
completed_futures = set()
try:
for future in futures.as_completed(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1],
timeout=0):
completed_futures.add(future)
except futures.TimeoutError:
pass
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE]),
completed_futures)
def test_duplicate_futures(self):
# Issue 20367. Duplicate futures should not raise exceptions or give
# duplicate responses.
future1 = self.executor.submit(time.sleep, 2)
completed = [f for f in futures.as_completed([future1,future1])]
self.assertEqual(len(completed), 1)
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
pass
class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
pass
class ExecutorTest(unittest.TestCase):
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
def test_submit(self):
future = self.executor.submit(pow, 2, 8)
self.assertEqual(256, future.result())
def test_submit_keyword(self):
future = self.executor.submit(mul, 2, y=8)
self.assertEqual(16, future.result())
def test_map(self):
self.assertEqual(
list(self.executor.map(pow, range(10), range(10))),
list(map(pow, range(10), range(10))))
def test_map_exception(self):
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(next(i), (0, 1))
self.assertEqual(next(i), (0, 1))
self.assertRaises(ZeroDivisionError, next, i)
def test_map_timeout(self):
results = []
try:
for i in self.executor.map(time.sleep,
[0, 0, 3],
timeout=1.5):
results.append(i)
except futures.TimeoutError:
pass
else:
self.fail('expected TimeoutError')
self.assertEqual([None, None], results)
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
def test_map_submits_without_iteration(self):
"""Tests verifying issue 11777."""
finished = []
def record_finished(n):
finished.append(n)
self.executor.map(record_finished, range(10))
self.executor.shutdown(wait=True)
self.assertEqual(len(finished), 10)
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
pass
class FutureTests(unittest.TestCase):
def test_done_callback_with_result(self):
callback_result = [None]
def fn(callback_future):
callback_result[0] = callback_future.result()
f = Future()
f.add_done_callback(fn)
f.set_result(5)
self.assertEqual(5, callback_result[0])
def test_done_callback_with_exception(self):
callback_exception = [None]
def fn(callback_future):
callback_exception[0] = callback_future.exception()
f = Future()
f.add_done_callback(fn)
f.set_exception(Exception('test'))
self.assertEqual(('test',), callback_exception[0].args)
def test_done_callback_with_cancel(self):
was_cancelled = [None]
def fn(callback_future):
was_cancelled[0] = callback_future.cancelled()
f = Future()
f.add_done_callback(fn)
self.assertTrue(f.cancel())
self.assertTrue(was_cancelled[0])
def test_done_callback_raises(self):
with captured_stderr() as stderr:
raising_was_called = [False]
fn_was_called = [False]
def raising_fn(callback_future):
raising_was_called[0] = True
raise Exception('doh!')
def fn(callback_future):
fn_was_called[0] = True
f = Future()
f.add_done_callback(raising_fn)
f.add_done_callback(fn)
f.set_result(5)
self.assertTrue(raising_was_called)
self.assertTrue(fn_was_called)
self.assertIn('Exception: doh!', stderr.getvalue())
def test_done_callback_already_successful(self):
callback_result = [None]
def fn(callback_future):
callback_result[0] = callback_future.result()
f = Future()
f.set_result(5)
f.add_done_callback(fn)
self.assertEqual(5, callback_result[0])
def test_done_callback_already_failed(self):
callback_exception = [None]
def fn(callback_future):
callback_exception[0] = callback_future.exception()
f = Future()
f.set_exception(Exception('test'))
f.add_done_callback(fn)
self.assertEqual(('test',), callback_exception[0].args)
def test_done_callback_already_cancelled(self):
was_cancelled = [None]
def fn(callback_future):
was_cancelled[0] = callback_future.cancelled()
f = Future()
self.assertTrue(f.cancel())
f.add_done_callback(fn)
self.assertTrue(was_cancelled[0])
def test_repr(self):
self.assertRegexpMatches(repr(PENDING_FUTURE),
'<Future at 0x[0-9a-f]+ state=pending>')
self.assertRegexpMatches(repr(RUNNING_FUTURE),
'<Future at 0x[0-9a-f]+ state=running>')
self.assertRegexpMatches(repr(CANCELLED_FUTURE),
'<Future at 0x[0-9a-f]+ state=cancelled>')
self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
'<Future at 0x[0-9a-f]+ state=cancelled>')
self.assertRegexpMatches(
repr(EXCEPTION_FUTURE),
'<Future at 0x[0-9a-f]+ state=finished raised IOError>')
self.assertRegexpMatches(
repr(SUCCESSFUL_FUTURE),
'<Future at 0x[0-9a-f]+ state=finished returned int>')
def test_cancel(self):
f1 = create_future(state=PENDING)
f2 = create_future(state=RUNNING)
f3 = create_future(state=CANCELLED)
f4 = create_future(state=CANCELLED_AND_NOTIFIED)
f5 = create_future(state=FINISHED, exception=IOError())
f6 = create_future(state=FINISHED, result=5)
self.assertTrue(f1.cancel())
self.assertEqual(f1._state, CANCELLED)
self.assertFalse(f2.cancel())
self.assertEqual(f2._state, RUNNING)
self.assertTrue(f3.cancel())
self.assertEqual(f3._state, CANCELLED)
self.assertTrue(f4.cancel())
self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
self.assertFalse(f5.cancel())
self.assertEqual(f5._state, FINISHED)
self.assertFalse(f6.cancel())
self.assertEqual(f6._state, FINISHED)
def test_cancelled(self):
self.assertFalse(PENDING_FUTURE.cancelled())
self.assertFalse(RUNNING_FUTURE.cancelled())
self.assertTrue(CANCELLED_FUTURE.cancelled())
self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
self.assertFalse(EXCEPTION_FUTURE.cancelled())
self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
def test_done(self):
self.assertFalse(PENDING_FUTURE.done())
self.assertFalse(RUNNING_FUTURE.done())
self.assertTrue(CANCELLED_FUTURE.done())
self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
self.assertTrue(EXCEPTION_FUTURE.done())
self.assertTrue(SUCCESSFUL_FUTURE.done())
def test_running(self):
self.assertFalse(PENDING_FUTURE.running())
self.assertTrue(RUNNING_FUTURE.running())
self.assertFalse(CANCELLED_FUTURE.running())
self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
self.assertFalse(EXCEPTION_FUTURE.running())
self.assertFalse(SUCCESSFUL_FUTURE.running())
def test_result_with_timeout(self):
self.assertRaises(futures.TimeoutError,
PENDING_FUTURE.result, timeout=0)
self.assertRaises(futures.TimeoutError,
RUNNING_FUTURE.result, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_FUTURE.result, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
def test_result_with_success(self):
# TODO(brian@sweetapp.com): This test is timing dependant.
def notification():
# Wait until the main thread is waiting for the result.
time.sleep(1)
f1.set_result(42)
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
t.start()
self.assertEqual(f1.result(timeout=5), 42)
def test_result_with_cancel(self):
# TODO(brian@sweetapp.com): This test is timing dependant.
def notification():
# Wait until the main thread is waiting for the result.
time.sleep(1)
f1.cancel()
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
t.start()
self.assertRaises(futures.CancelledError, f1.result, timeout=5)
def test_exception_with_timeout(self):
self.assertRaises(futures.TimeoutError,
PENDING_FUTURE.exception, timeout=0)
self.assertRaises(futures.TimeoutError,
RUNNING_FUTURE.exception, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_FUTURE.exception, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
IOError))
self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
def test_exception_with_success(self):
def notification():
# Wait until the main thread is waiting for the exception.
time.sleep(1)
with f1._condition:
f1._state = FINISHED
f1._exception = IOError()
f1._condition.notify_all()
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
t.start()
self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
@reap_threads
def test_main():
try:
test_support.run_unittest(ProcessPoolExecutorTest,
ThreadPoolExecutorTest,
ProcessPoolWaitTests,
ThreadPoolWaitTests,
ProcessPoolAsCompletedTests,
ThreadPoolAsCompletedTests,
FutureTests,
ProcessPoolShutdownTest,
ThreadPoolShutdownTest)
finally:
test_support.reap_children()
if __name__ == "__main__":
test_main()