import unittest
+from unittest import mock
from test import support
from test.support import (
is_apple, os_helper, refleak_helper, socket_helper, threading_helper
addr = f.getsockname()
self.assertEqual(addr, (socket.BDADDR_ANY, psm))
+ @unittest.skipUnless(HAVE_SOCKET_BLUETOOTH_L2CAP, 'Bluetooth L2CAP sockets required for this test')
+ def testBadL2capAddr(self):
+ with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as f:
+ with self.assertRaises(OSError):
+ f.bind((socket.BDADDR_ANY, 0, 0, socket.BDADDR_BREDR, 0))
+ with self.assertRaises(OSError):
+ f.bind((socket.BDADDR_ANY,))
+ with self.assertRaises(OSError):
+ f.bind(socket.BDADDR_ANY)
+ with self.assertRaises(OSError):
+ f.bind((socket.BDADDR_ANY.encode(), 0x1001))
+
+ def testBindRfcommSocket(self):
+ with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
+ channel = 0
+ try:
+ s.bind((socket.BDADDR_ANY, channel))
+ except OSError as err:
+ if sys.platform == 'win32' and err.winerror == 10050:
+ self.skipTest(str(err))
+ raise
+ addr = s.getsockname()
+ self.assertEqual(addr, (mock.ANY, channel))
+ self.assertRegex(addr[0], r'(?i)[0-9a-f]{2}(?::[0-9a-f]{2}){4}')
+ if sys.platform != 'win32':
+ self.assertEqual(addr, (socket.BDADDR_ANY, channel))
+ with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
+ s.bind(addr)
+ addr2 = s.getsockname()
+ self.assertEqual(addr2, addr)
+
+ def testBadRfcommAddr(self):
+ with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
+ channel = 0
+ with self.assertRaises(OSError):
+ s.bind((socket.BDADDR_ANY.encode(), channel))
+ with self.assertRaises(OSError):
+ s.bind((socket.BDADDR_ANY,))
+ with self.assertRaises(OSError):
+ s.bind((socket.BDADDR_ANY, channel, 0))
+ with self.assertRaises(OSError):
+ s.bind((socket.BDADDR_ANY + '\0', channel))
+ with self.assertRaises(OSError):
+ s.bind(('invalid', channel))
+
+ @unittest.skipUnless(hasattr(socket, 'BTPROTO_HCI'), 'Bluetooth HCI sockets required for this test')
+ def testBindHciSocket(self):
+ with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s:
+ if sys.platform.startswith(('netbsd', 'dragonfly', 'freebsd')):
+ s.bind(socket.BDADDR_ANY.encode())
+ addr = s.getsockname()
+ self.assertEqual(addr, socket.BDADDR_ANY)
+ else:
+ dev = 0
+ s.bind((dev,))
+ addr = s.getsockname()
+ self.assertEqual(addr, dev)
+
+ @unittest.skipUnless(hasattr(socket, 'BTPROTO_HCI'), 'Bluetooth HCI sockets required for this test')
+ def testBadHciAddr(self):
+ with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s:
+ if sys.platform.startswith(('netbsd', 'dragonfly', 'freebsd')):
+ with self.assertRaises(OSError):
+ s.bind(socket.BDADDR_ANY)
+ with self.assertRaises(OSError):
+ s.bind((socket.BDADDR_ANY.encode(),))
+ if sys.platform.startswith('freebsd'):
+ with self.assertRaises(ValueError):
+ s.bind(socket.BDADDR_ANY.encode() + b'\0')
+ with self.assertRaises(ValueError):
+ s.bind(socket.BDADDR_ANY.encode() + b' '*100)
+ with self.assertRaises(OSError):
+ s.bind(b'invalid')
+ else:
+ dev = 0
+ with self.assertRaises(OSError):
+ s.bind(())
+ with self.assertRaises(OSError):
+ s.bind((dev, 0))
+ with self.assertRaises(OSError):
+ s.bind(dev)
+ with self.assertRaises(OSError):
+ s.bind(socket.BDADDR_ANY.encode())
+
+ @unittest.skipUnless(hasattr(socket, 'BTPROTO_SCO'), 'Bluetooth SCO sockets required for this test')
+ def testBindScoSocket(self):
+ with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
+ s.bind(socket.BDADDR_ANY.encode())
+ addr = s.getsockname()
+ self.assertEqual(addr, socket.BDADDR_ANY)
+
+ @unittest.skipUnless(hasattr(socket, 'BTPROTO_SCO'), 'Bluetooth SCO sockets required for this test')
+ def testBadScoAddr(self):
+ with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
+ with self.assertRaises(OSError):
+ s.bind(socket.BDADDR_ANY)
+ with self.assertRaises(OSError):
+ s.bind((socket.BDADDR_ANY.encode(),))
+ with self.assertRaises(OSError):
+ s.bind(b'invalid')
+
@unittest.skipUnless(HAVE_SOCKET_HYPERV,
'Hyper-V sockets required for this test.')