| import re |
| import unittest |
| |
| from test.support import threading_helper |
| from test.support.threading_helper import run_concurrently |
| |
| |
| NTHREADS = 10 |
| |
| |
| @threading_helper.requires_working_threading() |
| class TestRe(unittest.TestCase): |
| def test_pattern_sub(self): |
| """Pattern substitution should work across threads""" |
| pattern = re.compile(r"\w+@\w+\.\w+") |
| text = "e-mail: [email protected] or [email protected]. " * 5 |
| results = [] |
| |
| def worker(): |
| substituted = pattern.sub("(redacted)", text) |
| results.append(substituted.count("(redacted)")) |
| |
| run_concurrently(worker_func=worker, nthreads=NTHREADS) |
| self.assertEqual(results, [2 * 5] * NTHREADS) |
| |
| def test_pattern_search(self): |
| """Pattern search should work across threads.""" |
| emails = ["[email protected]", "[email protected]"] * 10 |
| pattern = re.compile(r"\w+@\w+\.\w+") |
| results = [] |
| |
| def worker(): |
| matches = [pattern.search(e).group() for e in emails] |
| results.append(len(matches)) |
| |
| run_concurrently(worker_func=worker, nthreads=NTHREADS) |
| self.assertEqual(results, [2 * 10] * NTHREADS) |
| |
| def test_scanner_concurrent_access(self): |
| """Shared scanner should reject concurrent access.""" |
| pattern = re.compile(r"\w+") |
| scanner = pattern.scanner("word " * 10) |
| |
| def worker(): |
| for _ in range(100): |
| try: |
| scanner.search() |
| except ValueError as e: |
| if "already executing" in str(e): |
| pass |
| else: |
| raise |
| |
| run_concurrently(worker_func=worker, nthreads=NTHREADS) |
| # This test has no assertions. Its purpose is to catch crashes and |
| # enable thread sanitizer to detect race conditions. While "already |
| # executing" errors are very likely, they're not guaranteed due to |
| # non-deterministic thread scheduling, so we can't assert errors > 0. |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |