"""Unit tests for the memoryview

   Some tests are in test_bytes. Many tests that require _testbuffer.ndarray
   are in test_buffer.
"""

import unittest
import test.support
import sys
import gc
import weakref
import array
import io
import copy
import pickle
import struct

from itertools import product
from test import support
from test.support import import_helper, threading_helper


class MyObject:
    pass


class AbstractMemoryTests:
    source_bytes = b"abcdef"

    @property
    def _source(self):
        return self.source_bytes

    @property
    def _types(self):
        return filter(None, [self.ro_type, self.rw_type])

    def check_getitem_with_type(self, tp):
        b = tp(self._source)
        oldrefcount = sys.getrefcount(b)
        m = self._view(b)
        self.assertEqual(m[0], ord(b"a"))
        self.assertIsInstance(m[0], int)
        self.assertEqual(m[5], ord(b"f"))
        self.assertEqual(m[-1], ord(b"f"))
        self.assertEqual(m[-6], ord(b"a"))
        # Bounds checking
        self.assertRaises(IndexError, lambda: m[6])
        self.assertRaises(IndexError, lambda: m[-7])
        self.assertRaises(IndexError, lambda: m[sys.maxsize])
        self.assertRaises(IndexError, lambda: m[-sys.maxsize])
        # Type checking
        self.assertRaises(TypeError, lambda: m[None])
        self.assertRaises(TypeError, lambda: m[0.0])
        self.assertRaises(TypeError, lambda: m["a"])
        m = None
        self.assertEqual(sys.getrefcount(b), oldrefcount)

    def test_getitem(self):
        for tp in self._types:
            self.check_getitem_with_type(tp)

    def test_index(self):
        for tp in self._types:
            b = tp(self._source)
            m = self._view(b)  # may be a sub-view
            l = m.tolist()
            k = 2 * len(self._source)

            for chi in self._source:
                if chi in l:
                    self.assertEqual(m.index(chi), l.index(chi))
                else:
                    self.assertRaises(ValueError, m.index, chi)

                for start, stop in product(range(-k, k), range(-k, k)):
                    index = -1
                    try:
                        index = l.index(chi, start, stop)
                    except ValueError:
                        pass

                    if index == -1:
                        self.assertRaises(ValueError, m.index, chi, start, stop)
                    else:
                        self.assertEqual(m.index(chi, start, stop), index)

    def test_iter(self):
        for tp in self._types:
            b = tp(self._source)
            m = self._view(b)
            self.assertEqual(list(m), [m[i] for i in range(len(m))])

    def test_count(self):
        for tp in self._types:
            b = tp(self._source)
            m = self._view(b)
            l = m.tolist()
            for ch in list(m):
                self.assertEqual(m.count(ch), l.count(ch))

            b = tp((b'a' * 5) + (b'c' * 3))
            m = self._view(b)  # may be sliced
            l = m.tolist()
            with self.subTest('count', buffer=b):
                self.assertEqual(m.count(ord('a')), l.count(ord('a')))
                self.assertEqual(m.count(ord('b')), l.count(ord('b')))
                self.assertEqual(m.count(ord('c')), l.count(ord('c')))

    def test_setitem_readonly(self):
        if not self.ro_type:
            self.skipTest("no read-only type to test")
        b = self.ro_type(self._source)
        oldrefcount = sys.getrefcount(b)
        m = self._view(b)
        def setitem(value):
            m[0] = value
        self.assertRaises(TypeError, setitem, b"a")
        self.assertRaises(TypeError, setitem, 65)
        self.assertRaises(TypeError, setitem, memoryview(b"a"))
        m = None
        self.assertEqual(sys.getrefcount(b), oldrefcount)

    def test_setitem_writable(self):
        if not self.rw_type:
            self.skipTest("no writable type to test")
        tp = self.rw_type
        b = self.rw_type(self._source)
        oldrefcount = sys.getrefcount(b)
        m = self._view(b)
        m[0] = ord(b'1')
        self._check_contents(tp, b, b"1bcdef")
        m[0:1] = tp(b"0")
        self._check_contents(tp, b, b"0bcdef")
        m[1:3] = tp(b"12")
        self._check_contents(tp, b, b"012def")
        m[1:1] = tp(b"")
        self._check_contents(tp, b, b"012def")
        m[:] = tp(b"abcdef")
        self._check_contents(tp, b, b"abcdef")

        # Overlapping copies of a view into itself
        m[0:3] = m[2:5]
        self._check_contents(tp, b, b"cdedef")
        m[:] = tp(b"abcdef")
        m[2:5] = m[0:3]
        self._check_contents(tp, b, b"ababcf")

        def setitem(key, value):
            m[key] = tp(value)
        # Bounds checking
        self.assertRaises(IndexError, setitem, 6, b"a")
        self.assertRaises(IndexError, setitem, -7, b"a")
        self.assertRaises(IndexError, setitem, sys.maxsize, b"a")
        self.assertRaises(IndexError, setitem, -sys.maxsize, b"a")
        # Wrong index/slice types
        self.assertRaises(TypeError, setitem, 0.0, b"a")
        self.assertRaises(TypeError, setitem, (0,), b"a")
        self.assertRaises(TypeError, setitem, (slice(0,1,1), 0), b"a")
        self.assertRaises(TypeError, setitem, (0, slice(0,1,1)), b"a")
        self.assertRaises(TypeError, setitem, (0,), b"a")
        self.assertRaises(TypeError, setitem, "a", b"a")
        # Not implemented: multidimensional slices
        slices = (slice(0,1,1), slice(0,1,2))
        self.assertRaises(NotImplementedError, setitem, slices, b"a")
        # Trying to resize the memory object
        exc = ValueError if m.format == 'c' else TypeError
        self.assertRaises(exc, setitem, 0, b"")
        self.assertRaises(exc, setitem, 0, b"ab")
        self.assertRaises(ValueError, setitem, slice(1,1), b"a")
        self.assertRaises(ValueError, setitem, slice(0,2), b"a")

        m = None
        self.assertEqual(sys.getrefcount(b), oldrefcount)

    def test_delitem(self):
        for tp in self._types:
            b = tp(self._source)
            m = self._view(b)
            with self.assertRaises(TypeError):
                del m[1]
            with self.assertRaises(TypeError):
                del m[1:4]

    def test_tobytes(self):
        for tp in self._types:
            m = self._view(tp(self._source))
            b = m.tobytes()
            # This calls self.getitem_type() on each separate byte of b"abcdef"
            expected = b"".join(
                self.getitem_type(bytes([c])) for c in b"abcdef")
            self.assertEqual(b, expected)
            self.assertIsInstance(b, bytes)

    def test_tolist(self):
        for tp in self._types:
            m = self._view(tp(self._source))
            l = m.tolist()
            self.assertEqual(l, list(b"abcdef"))

    def test_compare(self):
        # memoryviews can compare for equality with other objects
        # having the buffer interface.
        for tp in self._types:
            m = self._view(tp(self._source))
            for tp_comp in self._types:
                self.assertTrue(m == tp_comp(b"abcdef"))
                self.assertFalse(m != tp_comp(b"abcdef"))
                self.assertFalse(m == tp_comp(b"abcde"))
                self.assertTrue(m != tp_comp(b"abcde"))
                self.assertFalse(m == tp_comp(b"abcde1"))
                self.assertTrue(m != tp_comp(b"abcde1"))
            self.assertTrue(m == m)
            self.assertTrue(m == m[:])
            self.assertTrue(m[0:6] == m[:])
            self.assertFalse(m[0:5] == m)

            # Comparison with objects which don't support the buffer API
            self.assertFalse(m == "abcdef")
            self.assertTrue(m != "abcdef")
            self.assertFalse("abcdef" == m)
            self.assertTrue("abcdef" != m)

            # Unordered comparisons
            for c in (m, b"abcdef"):
                self.assertRaises(TypeError, lambda: m < c)
                self.assertRaises(TypeError, lambda: c <= m)
                self.assertRaises(TypeError, lambda: m >= c)
                self.assertRaises(TypeError, lambda: c > m)

    def check_attributes_with_type(self, tp):
        m = self._view(tp(self._source))
        self.assertEqual(m.format, self.format)
        self.assertEqual(m.itemsize, self.itemsize)
        self.assertEqual(m.ndim, 1)
        self.assertEqual(m.shape, (6,))
        self.assertEqual(len(m), 6)
        self.assertEqual(m.strides, (self.itemsize,))
        self.assertEqual(m.suboffsets, ())
        return m

    def test_attributes_readonly(self):
        if not self.ro_type:
            self.skipTest("no read-only type to test")
        m = self.check_attributes_with_type(self.ro_type)
        self.assertEqual(m.readonly, True)

    def test_attributes_writable(self):
        if not self.rw_type:
            self.skipTest("no writable type to test")
        m = self.check_attributes_with_type(self.rw_type)
        self.assertEqual(m.readonly, False)

    def test_getbuffer(self):
        # Test PyObject_GetBuffer() on a memoryview object.
        for tp in self._types:
            b = tp(self._source)
            oldrefcount = sys.getrefcount(b)
            m = self._view(b)
            oldviewrefcount = sys.getrefcount(m)
            s = str(m, "utf-8")
            self._check_contents(tp, b, s.encode("utf-8"))
            self.assertEqual(sys.getrefcount(m), oldviewrefcount)
            m = None
            self.assertEqual(sys.getrefcount(b), oldrefcount)

    def test_gc(self):
        for tp in self._types:
            if not isinstance(tp, type):
                # If tp is a factory rather than a plain type, skip
                continue

            class MyView():
                def __init__(self, base):
                    self.m = memoryview(base)
            class MySource(tp):
                pass

            # Create a reference cycle through a memoryview object.
            # This exercises mbuf_clear().
            b = MySource(tp(b'abc'))
            m = self._view(b)
            o = MyObject()
            b.m = m
            b.o = o
            wr = weakref.ref(o)
            b = m = o = None
            # The cycle must be broken
            gc.collect()
            self.assertTrue(wr() is None, wr())

            # This exercises memory_clear().
            m = MyView(tp(b'abc'))
            o = MyObject()
            m.x = m
            m.o = o
            wr = weakref.ref(o)
            m = o = None
            # The cycle must be broken
            gc.collect()
            self.assertTrue(wr() is None, wr())

    def _check_released(self, m, tp):
        check = self.assertRaisesRegex(ValueError, "released")
        with check: bytes(m)
        with check: m.tobytes()
        with check: m.tolist()
        with check: m[0]
        with check: m[0] = b'x'
        with check: len(m)
        with check: m.format
        with check: m.itemsize
        with check: m.ndim
        with check: m.readonly
        with check: m.shape
        with check: m.strides
        with check:
            with m:
                pass
        # str() and repr() still function
        self.assertIn("released memory", str(m))
        self.assertIn("released memory", repr(m))
        self.assertEqual(m, m)
        self.assertNotEqual(m, memoryview(tp(self._source)))
        self.assertNotEqual(m, tp(self._source))

    def test_contextmanager(self):
        for tp in self._types:
            b = tp(self._source)
            m = self._view(b)
            with m as cm:
                self.assertIs(cm, m)
            self._check_released(m, tp)
            m = self._view(b)
            # Can release explicitly inside the context manager
            with m:
                m.release()

    def test_release(self):
        for tp in self._types:
            b = tp(self._source)
            m = self._view(b)
            m.release()
            self._check_released(m, tp)
            # Can be called a second time (it's a no-op)
            m.release()
            self._check_released(m, tp)

    def test_writable_readonly(self):
        # Issue #10451: memoryview incorrectly exposes a readonly
        # buffer as writable causing a segfault if using mmap
        tp = self.ro_type
        if tp is None:
            self.skipTest("no read-only type to test")
        b = tp(self._source)
        m = self._view(b)
        i = io.BytesIO(b'ZZZZ')
        self.assertRaises(TypeError, i.readinto, m)

    def test_getbuf_fail(self):
        self.assertRaises(TypeError, self._view, {})

    def test_hash(self):
        # Memoryviews of readonly (hashable) types are hashable, and they
        # hash as hash(obj.tobytes()).
        tp = self.ro_type
        if tp is None:
            self.skipTest("no read-only type to test")
        b = tp(self._source)
        m = self._view(b)
        self.assertEqual(hash(m), hash(b"abcdef"))
        # Releasing the memoryview keeps the stored hash value (as with weakrefs)
        m.release()
        self.assertEqual(hash(m), hash(b"abcdef"))
        # Hashing a memoryview for the first time after it is released
        # results in an error (as with weakrefs).
        m = self._view(b)
        m.release()
        self.assertRaises(ValueError, hash, m)

    def test_hash_writable(self):
        # Memoryviews of writable types are unhashable
        tp = self.rw_type
        if tp is None:
            self.skipTest("no writable type to test")
        b = tp(self._source)
        m = self._view(b)
        self.assertRaises(ValueError, hash, m)

    def test_weakref(self):
        # Check memoryviews are weakrefable
        for tp in self._types:
            b = tp(self._source)
            m = self._view(b)
            L = []
            def callback(wr, b=b):
                L.append(b)
            wr = weakref.ref(m, callback)
            self.assertIs(wr(), m)
            del m
            test.support.gc_collect()
            self.assertIs(wr(), None)
            self.assertIs(L[0], b)

    def test_reversed(self):
        for tp in self._types:
            b = tp(self._source)
            m = self._view(b)
            aslist = list(reversed(m.tolist()))
            self.assertEqual(list(reversed(m)), aslist)
            self.assertEqual(list(reversed(m)), list(m[::-1]))

    def test_toreadonly(self):
        for tp in self._types:
            b = tp(self._source)
            m = self._view(b)
            mm = m.toreadonly()
            self.assertTrue(mm.readonly)
            self.assertTrue(memoryview(mm).readonly)
            self.assertEqual(mm.tolist(), m.tolist())
            mm.release()
            m.tolist()

    def test_issue22668(self):
        a = array.array('H', [256, 256, 256, 256])
        x = memoryview(a)
        m = x.cast('B')
        b = m.cast('H')
        c = b[0:2]
        d = memoryview(b)

        del b

        self.assertEqual(c[0], 256)
        self.assertEqual(d[0], 256)
        self.assertEqual(c.format, "H")
        self.assertEqual(d.format, "H")

        _ = m.cast('I')
        self.assertEqual(c[0], 256)
        self.assertEqual(d[0], 256)
        self.assertEqual(c.format, "H")
        self.assertEqual(d.format, "H")


# Variations on source objects for the buffer: bytes-like objects, then arrays
# with itemsize > 1.
# NOTE: support for multi-dimensional objects is unimplemented.

class BaseBytesMemoryTests(AbstractMemoryTests):
    ro_type = bytes
    rw_type = bytearray
    getitem_type = bytes
    itemsize = 1
    format = 'B'

class BaseArrayMemoryTests(AbstractMemoryTests):
    ro_type = None
    rw_type = lambda self, b: array.array('i', list(b))
    getitem_type = lambda self, b: array.array('i', list(b)).tobytes()
    itemsize = array.array('i').itemsize
    format = 'i'

    @unittest.skip('XXX test should be adapted for non-byte buffers')
    def test_getbuffer(self):
        pass

    @unittest.skip('XXX NotImplementedError: tolist() only supports byte views')
    def test_tolist(self):
        pass


# Variations on indirection levels: memoryview, slice of memoryview,
# slice of slice of memoryview.
# This is important to test allocation subtleties.

class BaseMemoryviewTests:
    def _view(self, obj):
        return memoryview(obj)

    def _check_contents(self, tp, obj, contents):
        self.assertEqual(obj, tp(contents))

    def test_count(self):
        super().test_count()
        for tp in self._types:
            b = tp((b'a' * 5) + (b'c' * 3))
            m = self._view(b)  # should not be sliced
            self.assertEqual(len(b), len(m))
            with self.subTest('count', buffer=b):
                self.assertEqual(m.count(ord('a')), 5)
                self.assertEqual(m.count(ord('b')), 0)
                self.assertEqual(m.count(ord('c')), 3)


class BaseMemorySliceTests:
    source_bytes = b"XabcdefY"

    def _view(self, obj):
        m = memoryview(obj)
        return m[1:7]

    def _check_contents(self, tp, obj, contents):
        self.assertEqual(obj[1:7], tp(contents))

    def test_refs(self):
        for tp in self._types:
            m = memoryview(tp(self._source))
            oldrefcount = sys.getrefcount(m)
            m[1:2]
            self.assertEqual(sys.getrefcount(m), oldrefcount)

class BaseMemorySliceSliceTests:
    source_bytes = b"XabcdefY"

    def _view(self, obj):
        m = memoryview(obj)
        return m[:7][1:]

    def _check_contents(self, tp, obj, contents):
        self.assertEqual(obj[1:7], tp(contents))


# Concrete test classes

class BytesMemoryviewTest(unittest.TestCase,
    BaseMemoryviewTests, BaseBytesMemoryTests):

    def test_constructor(self):
        for tp in self._types:
            ob = tp(self._source)
            self.assertTrue(memoryview(ob))
            self.assertTrue(memoryview(object=ob))
            self.assertRaises(TypeError, memoryview)
            self.assertRaises(TypeError, memoryview, ob, ob)
            self.assertRaises(TypeError, memoryview, argument=ob)
            self.assertRaises(TypeError, memoryview, ob, argument=True)

class ArrayMemoryviewTest(unittest.TestCase,
    BaseMemoryviewTests, BaseArrayMemoryTests):

    def test_array_assign(self):
        # Issue #4569: segfault when mutating a memoryview with itemsize != 1
        a = array.array('i', range(10))
        m = memoryview(a)
        new_a = array.array('i', range(9, -1, -1))
        m[:] = new_a
        self.assertEqual(a, new_a)


class BytesMemorySliceTest(unittest.TestCase,
    BaseMemorySliceTests, BaseBytesMemoryTests):
    pass

class ArrayMemorySliceTest(unittest.TestCase,
    BaseMemorySliceTests, BaseArrayMemoryTests):
    pass

class BytesMemorySliceSliceTest(unittest.TestCase,
    BaseMemorySliceSliceTests, BaseBytesMemoryTests):
    pass

class ArrayMemorySliceSliceTest(unittest.TestCase,
    BaseMemorySliceSliceTests, BaseArrayMemoryTests):
    pass


class OtherTest(unittest.TestCase):
    def test_ctypes_cast(self):
        # Issue 15944: Allow all source formats when casting to bytes.
        ctypes = import_helper.import_module("ctypes")
        p6 = bytes(ctypes.c_double(0.6))

        d = ctypes.c_double()
        m = memoryview(d).cast("B")
        m[:2] = p6[:2]
        m[2:] = p6[2:]
        self.assertEqual(d.value, 0.6)

        for format in "Bbc":
            with self.subTest(format):
                d = ctypes.c_double()
                m = memoryview(d).cast(format)
                m[:2] = memoryview(p6).cast(format)[:2]
                m[2:] = memoryview(p6).cast(format)[2:]
                self.assertEqual(d.value, 0.6)

    def test_half_float(self):
        half_data = struct.pack('eee', 0.0, -1.5, 1.5)
        float_data = struct.pack('fff', 0.0, -1.5, 1.5)
        half_view = memoryview(half_data).cast('e')
        float_view = memoryview(float_data).cast('f')
        self.assertEqual(half_view.nbytes * 2, float_view.nbytes)
        self.assertListEqual(half_view.tolist(), float_view.tolist())

    def test_memoryview_hex(self):
        # Issue #9951: memoryview.hex() segfaults with non-contiguous buffers.
        x = b'0' * 200000
        m1 = memoryview(x)
        m2 = m1[::-1]
        self.assertEqual(m2.hex(), '30' * 200000)

    def test_memoryview_hex_separator(self):
        x = bytes(range(97, 102))
        m1 = memoryview(x)
        m2 = m1[::-1]
        self.assertEqual(m2.hex(':'), '65:64:63:62:61')
        self.assertEqual(m2.hex(':', 2), '65:6463:6261')
        self.assertEqual(m2.hex(':', -2), '6564:6362:61')
        self.assertEqual(m2.hex(sep=':', bytes_per_sep=2), '65:6463:6261')
        self.assertEqual(m2.hex(sep=':', bytes_per_sep=-2), '6564:6362:61')
        for bytes_per_sep in 5, -5, 2**31-1, -(2**31-1):
            with self.subTest(bytes_per_sep=bytes_per_sep):
                self.assertEqual(m2.hex(':', bytes_per_sep), '6564636261')
        for bytes_per_sep in 2**31, -2**31, 2**1000, -2**1000:
            with self.subTest(bytes_per_sep=bytes_per_sep):
                try:
                    self.assertEqual(m2.hex(':', bytes_per_sep), '6564636261')
                except OverflowError:
                    pass

    def test_copy(self):
        m = memoryview(b'abc')
        with self.assertRaises(TypeError):
            copy.copy(m)

    def test_pickle(self):
        m = memoryview(b'abc')
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises(TypeError):
                pickle.dumps(m, proto)

    def test_use_released_memory(self):
        # gh-92888: Previously it was possible to use a memoryview even after
        # backing buffer is freed in certain cases. This tests that those
        # cases raise an exception.
        size = 128
        def release():
            m.release()
            nonlocal ba
            ba = bytearray(size)
        class MyIndex:
            def __index__(self):
                release()
                return 4
        class MyFloat:
            def __float__(self):
                release()
                return 4.25
        class MyBool:
            def __bool__(self):
                release()
                return True

        ba = None
        m = memoryview(bytearray(b'\xff'*size))
        with self.assertRaises(ValueError):
            m[MyIndex()]

        ba = None
        m = memoryview(bytearray(b'\xff'*size))
        self.assertEqual(list(m[:MyIndex()]), [255] * 4)

        ba = None
        m = memoryview(bytearray(b'\xff'*size))
        self.assertEqual(list(m[MyIndex():8]), [255] * 4)

        ba = None
        m = memoryview(bytearray(b'\xff'*size)).cast('B', (64, 2))
        with self.assertRaisesRegex(ValueError, "operation forbidden"):
            m[MyIndex(), 0]

        ba = None
        m = memoryview(bytearray(b'\xff'*size)).cast('B', (2, 64))
        with self.assertRaisesRegex(ValueError, "operation forbidden"):
            m[0, MyIndex()]

        ba = None
        m = memoryview(bytearray(b'\xff'*size))
        with self.assertRaisesRegex(ValueError, "operation forbidden"):
            m[MyIndex()] = 42
        self.assertEqual(ba[:8], b'\0'*8)

        ba = None
        m = memoryview(bytearray(b'\xff'*size))
        with self.assertRaisesRegex(ValueError, "operation forbidden"):
            m[:MyIndex()] = b'spam'
        self.assertEqual(ba[:8], b'\0'*8)

        ba = None
        m = memoryview(bytearray(b'\xff'*size))
        with self.assertRaisesRegex(ValueError, "operation forbidden"):
            m[MyIndex():8] = b'spam'
        self.assertEqual(ba[:8], b'\0'*8)

        ba = None
        m = memoryview(bytearray(b'\xff'*size)).cast('B', (64, 2))
        with self.assertRaisesRegex(ValueError, "operation forbidden"):
            m[MyIndex(), 0] = 42
        self.assertEqual(ba[8:16], b'\0'*8)
        ba = None
        m = memoryview(bytearray(b'\xff'*size)).cast('B', (2, 64))
        with self.assertRaisesRegex(ValueError, "operation forbidden"):
            m[0, MyIndex()] = 42
        self.assertEqual(ba[:8], b'\0'*8)

        ba = None
        m = memoryview(bytearray(b'\xff'*size))
        with self.assertRaisesRegex(ValueError, "operation forbidden"):
            m[0] = MyIndex()
        self.assertEqual(ba[:8], b'\0'*8)

        for fmt in 'bhilqnBHILQN':
            with self.subTest(fmt=fmt):
                ba = None
                m = memoryview(bytearray(b'\xff'*size)).cast(fmt)
                with self.assertRaisesRegex(ValueError, "operation forbidden"):
                    m[0] = MyIndex()
                self.assertEqual(ba[:8], b'\0'*8)

        for fmt in 'fd':
            with self.subTest(fmt=fmt):
                ba = None
                m = memoryview(bytearray(b'\xff'*size)).cast(fmt)
                with self.assertRaisesRegex(ValueError, "operation forbidden"):
                    m[0] = MyFloat()
                self.assertEqual(ba[:8], b'\0'*8)

        ba = None
        m = memoryview(bytearray(b'\xff'*size)).cast('?')
        with self.assertRaisesRegex(ValueError, "operation forbidden"):
            m[0] = MyBool()
        self.assertEqual(ba[:8], b'\0'*8)

    def test_buffer_reference_loop(self):
        m = memoryview(b'abc').__buffer__(0)
        o = MyObject()
        o.m = m
        o.o = o
        wr = weakref.ref(o)
        del m, o
        gc.collect()
        self.assertIsNone(wr())

    def test_picklebuffer_reference_loop(self):
        pb = pickle.PickleBuffer(memoryview(b'abc'))
        o = MyObject()
        o.pb = pb
        o.o = o
        wr = weakref.ref(o)
        del pb, o
        gc.collect()
        self.assertIsNone(wr())


@threading_helper.requires_working_threading()
@support.requires_resource("cpu")
class RacingTest(unittest.TestCase):
    def test_racing_getbuf_and_releasebuf(self):
        """Repeatly access the memoryview for racing."""
        try:
            from multiprocessing.managers import SharedMemoryManager
        except ImportError:
            self.skipTest("Test requires multiprocessing")
        from threading import Thread, Event

        start = Event()
        with SharedMemoryManager() as smm:
            obj = smm.ShareableList(range(100))
            def test():
                # Issue gh-127085, the `ShareableList.count` is just a
                # convenient way to mess the `exports` counter of `memoryview`,
                # this issue has no direct relation with `ShareableList`.
                start.wait(support.SHORT_TIMEOUT)
                for i in range(10):
                    obj.count(1)
            threads = [Thread(target=test) for _ in range(10)]
            with threading_helper.start_threads(threads):
                start.set()

            del obj


if __name__ == "__main__":
    unittest.main()
