--- /dev/null
+import os
+import unittest
+
+from test.support import import_helper, threading_helper
+from test.support.threading_helper import run_concurrently
+from uuid import SafeUUID
+
+c_uuid = import_helper.import_module("_uuid")
+
+NTHREADS = 10
+UUID_PER_THREAD = 1000
+
+
+@threading_helper.requires_working_threading()
+class UUIDTests(unittest.TestCase):
+ @unittest.skipUnless(os.name == "posix", "POSIX only")
+ def test_generate_time_safe(self):
+ uuids = []
+
+ def worker():
+ local_uuids = []
+ for _ in range(UUID_PER_THREAD):
+ uuid, is_safe = c_uuid.generate_time_safe()
+ self.assertIs(type(uuid), bytes)
+ self.assertEqual(len(uuid), 16)
+ # Collect the UUID only if it is safe. If not, we cannot ensure
+ # UUID uniqueness. According to uuid_generate_time_safe() man
+ # page, it is theoretically possible for two concurrently
+ # running processes to generate the same UUID(s) if the return
+ # value is not 0.
+ if is_safe == SafeUUID.safe:
+ local_uuids.append(uuid)
+
+ # Merge all safe uuids
+ uuids.extend(local_uuids)
+
+ run_concurrently(worker_func=worker, nthreads=NTHREADS)
+ self.assertEqual(len(uuids), len(set(uuids)))
+
+ @unittest.skipUnless(os.name == "nt", "Windows only")
+ def test_UuidCreate(self):
+ uuids = []
+
+ def worker():
+ local_uuids = []
+ for _ in range(UUID_PER_THREAD):
+ uuid = c_uuid.UuidCreate()
+ self.assertIs(type(uuid), bytes)
+ self.assertEqual(len(uuid), 16)
+ local_uuids.append(uuid)
+
+ # Merge all uuids
+ uuids.extend(local_uuids)
+
+ run_concurrently(worker_func=worker, nthreads=NTHREADS)
+ self.assertEqual(len(uuids), len(set(uuids)))
+
+
+if __name__ == "__main__":
+ unittest.main()