blob: 251020319b20f3a3edc3c723864ee3e462fc9b4a [file] [log] [blame]
import unittest
from threading import Thread, Barrier
from test.support import threading_helper
class TestSetRepr(unittest.TestCase):
def test_repr_clear(self):
"""Test repr() of a set while another thread is calling clear()"""
NUM_ITERS = 10
NUM_REPR_THREADS = 10
barrier = Barrier(NUM_REPR_THREADS + 1, timeout=2)
s = {1, 2, 3, 4, 5, 6, 7, 8}
def clear_set():
barrier.wait()
s.clear()
def repr_set():
barrier.wait()
set_reprs.append(repr(s))
for _ in range(NUM_ITERS):
set_reprs = []
threads = [Thread(target=clear_set)]
for _ in range(NUM_REPR_THREADS):
threads.append(Thread(target=repr_set))
for t in threads:
t.start()
for t in threads:
t.join()
for set_repr in set_reprs:
self.assertIn(set_repr, ("set()", "{1, 2, 3, 4, 5, 6, 7, 8}"))
class RaceTestBase:
def test_contains_mutate(self):
"""Test set contains operation combined with mutation."""
barrier = Barrier(2, timeout=2)
s = set()
done = False
NUM_LOOPS = 1000
def read_set():
barrier.wait()
while not done:
for i in range(self.SET_SIZE):
item = i >> 1
result = item in s
def mutate_set():
nonlocal done
barrier.wait()
for i in range(NUM_LOOPS):
s.clear()
for j in range(self.SET_SIZE):
s.add(j)
for j in range(self.SET_SIZE):
s.discard(j)
# executes the set_swap_bodies() function
s.__iand__(set(k for k in range(10, 20)))
done = True
threads = [Thread(target=read_set), Thread(target=mutate_set)]
for t in threads:
t.start()
for t in threads:
t.join()
def test_contains_frozenset(self):
barrier = Barrier(3, timeout=2)
done = False
NUM_LOOPS = 1_000
# This mutates the key used for contains test, not the container
# itself. This works because frozenset allows the key to be a set().
s = set()
def mutate_set():
barrier.wait()
while not done:
s.add(0)
s.add(1)
s.clear()
def read_set():
nonlocal done
barrier.wait()
container = frozenset([frozenset([0])])
self.assertTrue(set([0]) in container)
for _ in range(NUM_LOOPS):
# Will return True when {0} is the key and False otherwise
result = s in container
done = True
threads = [
Thread(target=read_set),
Thread(target=read_set),
Thread(target=mutate_set),
]
for t in threads:
t.start()
for t in threads:
t.join()
def test_contains_hash_mutate(self):
"""Test set contains operation with mutating hash method."""
barrier = Barrier(2, timeout=2)
NUM_LOOPS = 1_000
SET_SIZE = self.SET_SIZE
s = set(range(SET_SIZE))
class Key:
def __init__(self):
self.count = 0
self.value = 0
def __hash__(self):
self.count += 1
# This intends to trigger the SET_LOOKKEY_CHANGED case
# of set_lookkey_threadsafe() since calling clear()
# will cause the 'table' pointer to change.
if self.count % 2 == 0:
s.clear()
else:
s.update(range(SET_SIZE))
return hash(self.value)
def __eq__(self, other):
return self.value == other
key = Key()
self.assertTrue(key in s)
self.assertFalse(key in s)
self.assertTrue(key in s)
self.assertFalse(key in s)
def read_set():
barrier.wait()
for i in range(NUM_LOOPS):
result = key in s
threads = [Thread(target=read_set), Thread(target=read_set)]
for t in threads:
t.start()
for t in threads:
t.join()
@threading_helper.requires_working_threading()
class SmallSetTest(RaceTestBase, unittest.TestCase):
SET_SIZE = 6 # smaller than PySet_MINSIZE
@threading_helper.requires_working_threading()
class LargeSetTest(RaceTestBase, unittest.TestCase):
SET_SIZE = 20 # larger than PySet_MINSIZE
if __name__ == "__main__":
unittest.main()