]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
Add tests for Bluetooth RFCOMM, HCI and SCO (GH-132023)
authorSerhiy Storchaka <storchaka@gmail.com>
Fri, 4 Apr 2025 11:33:09 +0000 (14:33 +0300)
committerGitHub <noreply@github.com>
Fri, 4 Apr 2025 11:33:09 +0000 (14:33 +0300)
Lib/test/test_socket.py

index cd3497f0274cd6f30b774e7c794c6bf9f4fd83e5..ab17d19484d358028694e46708a56d4bac51d848 100644 (file)
@@ -1,4 +1,5 @@
 import unittest
+from unittest import mock
 from test import support
 from test.support import (
     is_apple, os_helper, refleak_helper, socket_helper, threading_helper
@@ -2670,6 +2671,107 @@ class BasicBluetoothTest(unittest.TestCase):
             addr = f.getsockname()
             self.assertEqual(addr, (socket.BDADDR_ANY, psm))
 
+    @unittest.skipUnless(HAVE_SOCKET_BLUETOOTH_L2CAP, 'Bluetooth L2CAP sockets required for this test')
+    def testBadL2capAddr(self):
+        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as f:
+            with self.assertRaises(OSError):
+                f.bind((socket.BDADDR_ANY, 0, 0, socket.BDADDR_BREDR, 0))
+            with self.assertRaises(OSError):
+                f.bind((socket.BDADDR_ANY,))
+            with self.assertRaises(OSError):
+                f.bind(socket.BDADDR_ANY)
+            with self.assertRaises(OSError):
+                f.bind((socket.BDADDR_ANY.encode(), 0x1001))
+
+    def testBindRfcommSocket(self):
+        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
+            channel = 0
+            try:
+                s.bind((socket.BDADDR_ANY, channel))
+            except OSError as err:
+                if sys.platform == 'win32' and err.winerror == 10050:
+                    self.skipTest(str(err))
+                raise
+            addr = s.getsockname()
+            self.assertEqual(addr, (mock.ANY, channel))
+            self.assertRegex(addr[0], r'(?i)[0-9a-f]{2}(?::[0-9a-f]{2}){4}')
+            if sys.platform != 'win32':
+                self.assertEqual(addr, (socket.BDADDR_ANY, channel))
+        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
+            s.bind(addr)
+            addr2 = s.getsockname()
+            self.assertEqual(addr2, addr)
+
+    def testBadRfcommAddr(self):
+        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
+            channel = 0
+            with self.assertRaises(OSError):
+                s.bind((socket.BDADDR_ANY.encode(), channel))
+            with self.assertRaises(OSError):
+                s.bind((socket.BDADDR_ANY,))
+            with self.assertRaises(OSError):
+                s.bind((socket.BDADDR_ANY, channel, 0))
+            with self.assertRaises(OSError):
+                s.bind((socket.BDADDR_ANY + '\0', channel))
+            with self.assertRaises(OSError):
+                s.bind(('invalid', channel))
+
+    @unittest.skipUnless(hasattr(socket, 'BTPROTO_HCI'), 'Bluetooth HCI sockets required for this test')
+    def testBindHciSocket(self):
+        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s:
+            if sys.platform.startswith(('netbsd', 'dragonfly', 'freebsd')):
+                s.bind(socket.BDADDR_ANY.encode())
+                addr = s.getsockname()
+                self.assertEqual(addr, socket.BDADDR_ANY)
+            else:
+                dev = 0
+                s.bind((dev,))
+                addr = s.getsockname()
+                self.assertEqual(addr, dev)
+
+    @unittest.skipUnless(hasattr(socket, 'BTPROTO_HCI'), 'Bluetooth HCI sockets required for this test')
+    def testBadHciAddr(self):
+        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s:
+            if sys.platform.startswith(('netbsd', 'dragonfly', 'freebsd')):
+                with self.assertRaises(OSError):
+                    s.bind(socket.BDADDR_ANY)
+                with self.assertRaises(OSError):
+                    s.bind((socket.BDADDR_ANY.encode(),))
+                if sys.platform.startswith('freebsd'):
+                    with self.assertRaises(ValueError):
+                        s.bind(socket.BDADDR_ANY.encode() + b'\0')
+                    with self.assertRaises(ValueError):
+                        s.bind(socket.BDADDR_ANY.encode() + b' '*100)
+                with self.assertRaises(OSError):
+                    s.bind(b'invalid')
+            else:
+                dev = 0
+                with self.assertRaises(OSError):
+                    s.bind(())
+                with self.assertRaises(OSError):
+                    s.bind((dev, 0))
+                with self.assertRaises(OSError):
+                    s.bind(dev)
+                with self.assertRaises(OSError):
+                    s.bind(socket.BDADDR_ANY.encode())
+
+    @unittest.skipUnless(hasattr(socket, 'BTPROTO_SCO'), 'Bluetooth SCO sockets required for this test')
+    def testBindScoSocket(self):
+        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
+            s.bind(socket.BDADDR_ANY.encode())
+            addr = s.getsockname()
+            self.assertEqual(addr, socket.BDADDR_ANY)
+
+    @unittest.skipUnless(hasattr(socket, 'BTPROTO_SCO'), 'Bluetooth SCO sockets required for this test')
+    def testBadScoAddr(self):
+        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
+            with self.assertRaises(OSError):
+                s.bind(socket.BDADDR_ANY)
+            with self.assertRaises(OSError):
+                s.bind((socket.BDADDR_ANY.encode(),))
+            with self.assertRaises(OSError):
+                s.bind(b'invalid')
+
 
 @unittest.skipUnless(HAVE_SOCKET_HYPERV,
                      'Hyper-V sockets required for this test.')