| import random |
| import unittest |
| |
| from functools import lru_cache |
| from threading import Barrier, Thread |
| |
| from test.support import threading_helper |
| |
| @threading_helper.requires_working_threading() |
| class TestLRUCache(unittest.TestCase): |
| |
| def _test_concurrent_operations(self, maxsize): |
| num_threads = 10 |
| b = Barrier(num_threads) |
| @lru_cache(maxsize=maxsize) |
| def func(arg=0): |
| return object() |
| |
| |
| def thread_func(): |
| b.wait() |
| for i in range(1000): |
| r = random.randint(0, 1000) |
| if i < 800: |
| func(i) |
| elif i < 900: |
| func.cache_info() |
| else: |
| func.cache_clear() |
| |
| threads = [] |
| for i in range(num_threads): |
| t = Thread(target=thread_func) |
| threads.append(t) |
| |
| with threading_helper.start_threads(threads): |
| pass |
| |
| def test_concurrent_operations_unbounded(self): |
| self._test_concurrent_operations(maxsize=None) |
| |
| def test_concurrent_operations_bounded(self): |
| self._test_concurrent_operations(maxsize=128) |
| |
| def _test_reentrant_cache_clear(self, maxsize): |
| num_threads = 10 |
| b = Barrier(num_threads) |
| @lru_cache(maxsize=maxsize) |
| def func(arg=0): |
| func.cache_clear() |
| return object() |
| |
| |
| def thread_func(): |
| b.wait() |
| for i in range(1000): |
| func(random.randint(0, 10000)) |
| |
| threads = [] |
| for i in range(num_threads): |
| t = Thread(target=thread_func) |
| threads.append(t) |
| |
| with threading_helper.start_threads(threads): |
| pass |
| |
| def test_reentrant_cache_clear_unbounded(self): |
| self._test_reentrant_cache_clear(maxsize=None) |
| |
| def test_reentrant_cache_clear_bounded(self): |
| self._test_reentrant_cache_clear(maxsize=128) |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |