| import asyncio |
| import unittest |
| from threading import Thread |
| from unittest import TestCase |
| |
| from test.support import threading_helper |
| |
| threading_helper.requires_working_threading(module=True) |
| |
| def tearDownModule(): |
| asyncio._set_event_loop_policy(None) |
| |
| |
| class TestFreeThreading: |
| def test_all_tasks_race(self) -> None: |
| async def main(): |
| loop = asyncio.get_running_loop() |
| future = loop.create_future() |
| |
| async def coro(): |
| await future |
| |
| tasks = set() |
| |
| async with asyncio.TaskGroup() as tg: |
| for _ in range(100): |
| tasks.add(tg.create_task(coro())) |
| |
| all_tasks = self.all_tasks(loop) |
| self.assertEqual(len(all_tasks), 101) |
| |
| for task in all_tasks: |
| self.assertEqual(task.get_loop(), loop) |
| self.assertFalse(task.done()) |
| |
| current = self.current_task() |
| self.assertEqual(current.get_loop(), loop) |
| self.assertSetEqual(all_tasks, tasks | {current}) |
| future.set_result(None) |
| |
| def runner(): |
| with asyncio.Runner() as runner: |
| loop = runner.get_loop() |
| loop.set_task_factory(self.factory) |
| runner.run(main()) |
| |
| threads = [] |
| |
| for _ in range(10): |
| thread = Thread(target=runner) |
| threads.append(thread) |
| |
| with threading_helper.start_threads(threads): |
| pass |
| |
| |
| class TestPyFreeThreading(TestFreeThreading, TestCase): |
| all_tasks = staticmethod(asyncio.tasks._py_all_tasks) |
| current_task = staticmethod(asyncio.tasks._py_current_task) |
| |
| def factory(self, loop, coro, context=None): |
| return asyncio.tasks._PyTask(coro, loop=loop, context=context) |
| |
| |
| @unittest.skipUnless(hasattr(asyncio.tasks, "_c_all_tasks"), "requires _asyncio") |
| class TestCFreeThreading(TestFreeThreading, TestCase): |
| all_tasks = staticmethod(getattr(asyncio.tasks, "_c_all_tasks", None)) |
| current_task = staticmethod(getattr(asyncio.tasks, "_c_current_task", None)) |
| |
| def factory(self, loop, coro, context=None): |
| return asyncio.tasks._CTask(coro, loop=loop, context=context) |
| |
| |
| class TestEagerPyFreeThreading(TestPyFreeThreading): |
| def factory(self, loop, coro, context=None): |
| return asyncio.tasks._PyTask(coro, loop=loop, context=context, eager_start=True) |
| |
| |
| @unittest.skipUnless(hasattr(asyncio.tasks, "_c_all_tasks"), "requires _asyncio") |
| class TestEagerCFreeThreading(TestCFreeThreading, TestCase): |
| def factory(self, loop, coro, context=None): |
| return asyncio.tasks._CTask(coro, loop=loop, context=context, eager_start=True) |