if len(chunk) < 4:
break
slen = struct.unpack(">L", chunk)[0]
- chunk = self.connection.recv(slen)
+ chunk = conn.recv(slen)
while len(chunk) < slen:
- chunk = chunk + self.connection.recv(slen - len(chunk))
- obj = self.unpickle(chunk)
+ chunk = chunk + conn.recv(slen - len(chunk))
+ obj = pickle.loads(chunk)
record = logging.makeLogRecord(obj)
- self.handle_log_record(record)
+ self.log_output += record.msg + '\n'
+ self.handled.release()
- def unpickle(self, data):
- return pickle.loads(data)
+ def test_output(self):
+ # The log message sent to the SocketHandler is properly received.
+ logger = logging.getLogger("tcp")
+ logger.error("spam")
+ self.handled.acquire()
+ logger.debug("eggs")
+ self.handled.acquire()
+ self.assertEqual(self.log_output, "spam\neggs\n")
- def handle_log_record(self, record):
- # If the end-of-messages sentinel is seen, tell the server to
- # terminate.
- if self.TCP_LOG_END in record.msg:
- self.server.abort = 1
- return
- self.server.log_output += record.msg + "\n"
+ def test_noserver(self):
+ # Kill the server
+ self.server.stop(2.0)
+ #The logging call should try to connect, which should fail
+ try:
+ raise RuntimeError('Deliberate mistake')
+ except RuntimeError:
+ self.root_logger.exception('Never sent')
+ self.root_logger.error('Never sent, either')
+ now = time.time()
+ self.assertTrue(self.sock_hdlr.retryTime > now)
+ time.sleep(self.sock_hdlr.retryTime - now + 0.001)
+ self.root_logger.error('Nor this')
-class LogRecordSocketReceiver(ThreadingTCPServer):
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class DatagramHandlerTest(BaseTest):
- """A simple-minded TCP socket-based logging receiver suitable for test
- purposes."""
+ """Test for DatagramHandler."""
- allow_reuse_address = 1
- log_output = ""
+ def setUp(self):
+ """Set up a UDP server to receive log messages, and a DatagramHandler
+ pointing to that server's address and port."""
+ BaseTest.setUp(self)
+ addr = ('localhost', 0)
- self.server = server = TestUDPServer(addr, self.handle_datagram,
- 0.01)
++ self.server = server = TestUDPServer(addr, self.handle_datagram, 0.01)
+ server.start()
+ server.ready.wait()
+ self.sock_hdlr = logging.handlers.DatagramHandler('localhost',
+ server.port)
+ self.log_output = ''
+ self.root_logger.removeHandler(self.root_logger.handlers[0])
+ self.root_logger.addHandler(self.sock_hdlr)
+ self.handled = threading.Event()
- def __init__(self, host='localhost',
- port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
- handler=LogRecordStreamHandler):
- ThreadingTCPServer.__init__(self, (host, port), handler)
- self.abort = False
- self.timeout = 0.1
- self.finished = threading.Event()
+ def tearDown(self):
+ """Shutdown the UDP server."""
+ try:
+ self.server.stop(2.0)
+ self.root_logger.removeHandler(self.sock_hdlr)
+ self.sock_hdlr.close()
+ finally:
+ BaseTest.tearDown(self)
+
+ def handle_datagram(self, request):
+ slen = struct.pack('>L', 0) # length of prefix
+ packet = request.packet[len(slen):]
+ obj = pickle.loads(packet)
+ record = logging.makeLogRecord(obj)
+ self.log_output += record.msg + '\n'
+ self.handled.set()
- def serve_until_stopped(self):
- while not self.abort:
- rd, wr, ex = select.select([self.socket.fileno()], [], [],
- self.timeout)
- if rd:
- self.handle_request()
- # Notify the main thread that we're about to exit
- self.finished.set()
- # close the listen socket
- self.server_close()
+ def test_output(self):
+ # The log message sent to the DatagramHandler is properly received.
+ logger = logging.getLogger("udp")
+ logger.error("spam")
+ self.handled.wait()
+ self.handled.clear()
+ logger.error("eggs")
+ self.handled.wait()
+ self.assertEqual(self.log_output, "spam\neggs\n")
@unittest.skipUnless(threading, 'Threading required for this test.')
finally:
BaseTest.tearDown(self)
- def get_output(self):
- """Get the log output as received by the TCP server."""
- # Signal the TCP receiver and wait for it to terminate.
- self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
- self.tcpserver.finished.wait(2.0)
- return self.tcpserver.log_output
+ def handle_datagram(self, request):
+ self.log_output = request.packet
+ self.handled.set()
def test_output(self):
- # The log message sent to the SocketHandler is properly received.
- logger = logging.getLogger("tcp")
- logger.error("spam")
- logger.debug("eggs")
- self.assertEqual(self.get_output(), "spam\neggs\n")
+ # The log message sent to the SysLogHandler is properly received.
+ logger = logging.getLogger("slh")
+ logger.error("sp\xe4m")
+ self.handled.wait()
+ self.assertEqual(self.log_output, b'<11>\xef\xbb\xbfsp\xc3\xa4m\x00')
++ self.handled.clear()
++ self.sl_hdlr.append_nul = False
++ logger.error("sp\xe4m")
++ self.handled.wait()
++ self.assertEqual(self.log_output, b'<11>\xef\xbb\xbfsp\xc3\xa4m')
+
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class HTTPHandlerTest(BaseTest):
+ """Test for HTTPHandler."""
+
+ PEMFILE = """-----BEGIN RSA PRIVATE KEY-----
+MIICXQIBAAKBgQDGT4xS5r91rbLJQK2nUDenBhBG6qFk+bVOjuAGC/LSHlAoBnvG
+zQG3agOG+e7c5z2XT8m2ktORLqG3E4mYmbxgyhDrzP6ei2Anc+pszmnxPoK3Puh5
+aXV+XKt0bU0C1m2+ACmGGJ0t3P408art82nOxBw8ZHgIg9Dtp6xIUCyOqwIDAQAB
+AoGBAJFTnFboaKh5eUrIzjmNrKsG44jEyy+vWvHN/FgSC4l103HxhmWiuL5Lv3f7
+0tMp1tX7D6xvHwIG9VWvyKb/Cq9rJsDibmDVIOslnOWeQhG+XwJyitR0pq/KlJIB
+5LjORcBw795oKWOAi6RcOb1ON59tysEFYhAGQO9k6VL621gRAkEA/Gb+YXULLpbs
+piXN3q4zcHzeaVANo69tUZ6TjaQqMeTxE4tOYM0G0ZoSeHEdaP59AOZGKXXNGSQy
+2z/MddcYGQJBAMkjLSYIpOLJY11ja8OwwswFG2hEzHe0cS9bzo++R/jc1bHA5R0Y
+i6vA5iPi+wopPFvpytdBol7UuEBe5xZrxWMCQQCWxELRHiP2yWpEeLJ3gGDzoXMN
+PydWjhRju7Bx3AzkTtf+D6lawz1+eGTuEss5i0JKBkMEwvwnN2s1ce+EuF4JAkBb
+E96h1lAzkVW5OAfYOPY8RCPA90ZO/hoyg7PpSxR0ECuDrgERR8gXIeYUYfejBkEa
+rab4CfRoVJKKM28Yq/xZAkBvuq670JRCwOgfUTdww7WpdOQBYPkzQccsKNCslQW8
+/DyW6y06oQusSENUvynT6dr3LJxt/NgZPhZX2+k1eYDV
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIICGzCCAYSgAwIBAgIJAIq84a2Q/OvlMA0GCSqGSIb3DQEBBQUAMBQxEjAQBgNV
+BAMTCWxvY2FsaG9zdDAeFw0xMTA1MjExMDIzMzNaFw03NTAzMjEwMzU1MTdaMBQx
+EjAQBgNVBAMTCWxvY2FsaG9zdDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA
+xk+MUua/da2yyUCtp1A3pwYQRuqhZPm1To7gBgvy0h5QKAZ7xs0Bt2oDhvnu3Oc9
+l0/JtpLTkS6htxOJmJm8YMoQ68z+notgJ3PqbM5p8T6Ctz7oeWl1flyrdG1NAtZt
+vgAphhidLdz+NPGq7fNpzsQcPGR4CIPQ7aesSFAsjqsCAwEAAaN1MHMwHQYDVR0O
+BBYEFLWaUPO6N7efGiuoS9i3DVYcUwn0MEQGA1UdIwQ9MDuAFLWaUPO6N7efGiuo
+S9i3DVYcUwn0oRikFjAUMRIwEAYDVQQDEwlsb2NhbGhvc3SCCQCKvOGtkPzr5TAM
+BgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4GBAMK5whPjLNQK1Ivvk88oqJqq
+4f889OwikGP0eUhOBhbFlsZs+jq5YZC2UzHz+evzKBlgAP1u4lP/cB85CnjvWqM+
+1c/lywFHQ6HOdDeQ1L72tSYMrNOG4XNmLn0h7rx6GoTU7dcFRfseahBCq8mv0IDt
+IRbTpvlHWPjsSvHz0ZOH
+-----END CERTIFICATE-----"""
+
+ def setUp(self):
+ """Set up an HTTP server to receive log messages, and a HTTPHandler
+ pointing to that server's address and port."""
+ BaseTest.setUp(self)
+ self.handled = threading.Event()
+
+ def handle_request(self, request):
+ self.command = request.command
+ self.log_data = urlparse(request.path)
+ if self.command == 'POST':
+ try:
+ rlen = int(request.headers['Content-Length'])
+ self.post_data = request.rfile.read(rlen)
+ except:
+ self.post_data = None
+ request.send_response(200)
+ request.end_headers()
+ self.handled.set()
+
+ def test_output(self):
+ # The log message sent to the HTTPHandler is properly received.
+ logger = logging.getLogger("http")
+ root_logger = self.root_logger
+ root_logger.removeHandler(self.root_logger.handlers[0])
+ for secure in (False, True):
+ addr = ('localhost', 0)
+ if secure:
+ try:
+ import ssl
+ fd, fn = tempfile.mkstemp()
+ os.close(fd)
+ with open(fn, 'w') as f:
+ f.write(self.PEMFILE)
+ sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ sslctx.load_cert_chain(fn)
+ os.unlink(fn)
+ except ImportError:
+ sslctx = None
+ else:
+ sslctx = None
+ self.server = server = TestHTTPServer(addr, self.handle_request,
+ 0.01, sslctx=sslctx)
+ server.start()
+ server.ready.wait()
+ host = 'localhost:%d' % server.server_port
+ secure_client = secure and sslctx
+ self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob',
+ secure=secure_client)
+ self.log_data = None
+ root_logger.addHandler(self.h_hdlr)
+
+ for method in ('GET', 'POST'):
+ self.h_hdlr.method = method
+ self.handled.clear()
+ msg = "sp\xe4m"
+ logger.error(msg)
+ self.handled.wait()
+ self.assertEqual(self.log_data.path, '/frob')
+ self.assertEqual(self.command, method)
+ if method == 'GET':
+ d = parse_qs(self.log_data.query)
+ else:
+ d = parse_qs(self.post_data.decode('utf-8'))
+ self.assertEqual(d['name'], ['http'])
+ self.assertEqual(d['funcName'], ['test_output'])
+ self.assertEqual(d['msg'], [msg])
+
+ self.server.stop(2.0)
+ self.root_logger.removeHandler(self.h_hdlr)
+ self.h_hdlr.close()
class MemoryTest(BaseTest):