local_sms.buf[:len(binary_data)] = binary_data
local_sms.close()
+ def _new_shm_name(self, prefix):
+ # Add a PID to the name of a POSIX shared memory object to allow
+ # running multiprocessing tests (test_multiprocessing_fork,
+ # test_multiprocessing_spawn, etc) in parallel.
+ return prefix + str(os.getpid())
+
def test_shared_memory_basics(self):
- sms = shared_memory.SharedMemory('test01_tsmb', create=True, size=512)
+ name_tsmb = self._new_shm_name('test01_tsmb')
+ sms = shared_memory.SharedMemory(name_tsmb, create=True, size=512)
self.addCleanup(sms.unlink)
# Verify attributes are readable.
- self.assertEqual(sms.name, 'test01_tsmb')
+ self.assertEqual(sms.name, name_tsmb)
self.assertGreaterEqual(sms.size, 512)
self.assertGreaterEqual(len(sms.buf), sms.size)
self.assertEqual(sms.buf[0], 42)
# Attach to existing shared memory segment.
- also_sms = shared_memory.SharedMemory('test01_tsmb')
+ also_sms = shared_memory.SharedMemory(name_tsmb)
self.assertEqual(also_sms.buf[0], 42)
also_sms.close()
# Attach to existing shared memory segment but specify a new size.
- same_sms = shared_memory.SharedMemory('test01_tsmb', size=20*sms.size)
+ same_sms = shared_memory.SharedMemory(name_tsmb, size=20*sms.size)
self.assertLess(same_sms.size, 20*sms.size) # Size was ignored.
same_sms.close()
# manages unlinking on its own and unlink() does nothing).
# True release of shared memory segment does not necessarily
# happen until process exits, depending on the OS platform.
+ name_dblunlink = self._new_shm_name('test01_dblunlink')
+ sms_uno = shared_memory.SharedMemory(
+ name_dblunlink,
+ create=True,
+ size=5000
+ )
with self.assertRaises(FileNotFoundError):
- sms_uno = shared_memory.SharedMemory(
- 'test01_dblunlink',
- create=True,
- size=5000
- )
-
try:
self.assertGreaterEqual(sms_uno.size, 5000)
- sms_duo = shared_memory.SharedMemory('test01_dblunlink')
+ sms_duo = shared_memory.SharedMemory(name_dblunlink)
sms_duo.unlink() # First shm_unlink() call.
sms_duo.close()
sms_uno.close()
# Attempting to create a new shared memory segment with a
# name that is already in use triggers an exception.
there_can_only_be_one_sms = shared_memory.SharedMemory(
- 'test01_tsmb',
+ name_tsmb,
create=True,
size=512
)
# case of MacOS/darwin, requesting a smaller size is disallowed.
class OptionalAttachSharedMemory(shared_memory.SharedMemory):
_flags = os.O_CREAT | os.O_RDWR
- ok_if_exists_sms = OptionalAttachSharedMemory('test01_tsmb')
+ ok_if_exists_sms = OptionalAttachSharedMemory(name_tsmb)
self.assertEqual(ok_if_exists_sms.size, sms.size)
ok_if_exists_sms.close()
self.assertEqual(sl.count(b'adios'), 0)
# Exercise creating a duplicate.
- sl_copy = shared_memory.ShareableList(sl, name='test03_duplicate')
+ name_duplicate = self._new_shm_name('test03_duplicate')
+ sl_copy = shared_memory.ShareableList(sl, name=name_duplicate)
try:
self.assertNotEqual(sl.shm.name, sl_copy.shm.name)
- self.assertEqual('test03_duplicate', sl_copy.shm.name)
+ self.assertEqual(name_duplicate, sl_copy.shm.name)
self.assertEqual(list(sl), list(sl_copy))
self.assertEqual(sl.format, sl_copy.format)
sl_copy[-1] = 77