]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
Add more syslog tests (GH-97953)
authorSerhiy Storchaka <storchaka@gmail.com>
Fri, 7 Oct 2022 17:17:08 +0000 (20:17 +0300)
committerGitHub <noreply@github.com>
Fri, 7 Oct 2022 17:17:08 +0000 (20:17 +0300)
Lib/test/audit-tests.py
Lib/test/test_audit.py
Lib/test/test_syslog.py
Modules/syslogmodule.c

index 66c08f7f2ff8d518f99324f47e6580f39e83fc82..4abf33d7cad1b69029293acaf687b5a153046e7b 100644 (file)
@@ -429,6 +429,26 @@ def test_wmi_exec_query():
     sys.addaudithook(hook)
     _wmi.exec_query("SELECT * FROM Win32_OperatingSystem")
 
+def test_syslog():
+    import syslog
+
+    def hook(event, args):
+        if event.startswith("syslog."):
+            print(event, *args)
+
+    sys.addaudithook(hook)
+    syslog.openlog('python')
+    syslog.syslog('test')
+    syslog.setlogmask(syslog.LOG_DEBUG)
+    syslog.closelog()
+    # implicit open
+    syslog.syslog('test2')
+    # open with default ident
+    syslog.openlog(logoption=syslog.LOG_NDELAY, facility=syslog.LOG_LOCAL0)
+    sys.argv = None
+    syslog.openlog()
+    syslog.closelog()
+
 
 if __name__ == "__main__":
     from test.support import suppress_msvcrt_asserts
index 09b3333afe184f00b7cb183a19e5858a1c70651b..50f78f2a6b8a464c39851cde8cec9b6d059e7858 100644 (file)
@@ -16,6 +16,7 @@ AUDIT_TESTS_PY = support.findfile("audit-tests.py")
 
 
 class AuditTest(unittest.TestCase):
+    maxDiff = None
 
     @support.requires_subprocess()
     def do_test(self, *args):
@@ -185,7 +186,6 @@ class AuditTest(unittest.TestCase):
 
         self.assertEqual(actual, expected)
 
-
     def test_wmi_exec_query(self):
         import_helper.import_module("_wmi")
         returncode, events, stderr = self.run_python("test_wmi_exec_query")
@@ -199,6 +199,29 @@ class AuditTest(unittest.TestCase):
 
         self.assertEqual(actual, expected)
 
+    def test_syslog(self):
+        syslog = import_helper.import_module("syslog")
+
+        returncode, events, stderr = self.run_python("test_syslog")
+        if returncode:
+            self.fail(stderr)
+
+        if support.verbose:
+            print('Events:', *events, sep='\n  ')
+
+        self.assertSequenceEqual(
+            events,
+            [('syslog.openlog', ' ', f'python 0 {syslog.LOG_USER}'),
+            ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test'),
+            ('syslog.setlogmask', ' ', f'{syslog.LOG_DEBUG}'),
+            ('syslog.closelog', '', ''),
+            ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test2'),
+            ('syslog.openlog', ' ', f'audit-tests.py 0 {syslog.LOG_USER}'),
+            ('syslog.openlog', ' ', f'audit-tests.py {syslog.LOG_NDELAY} {syslog.LOG_LOCAL0}'),
+            ('syslog.openlog', ' ', f'None 0 {syslog.LOG_USER}'),
+            ('syslog.closelog', '', '')]
+        )
+
 
 if __name__ == "__main__":
     unittest.main()
index fe09bd39f8b7fd95a401cbe0b5d3d8e0e2125db5..2125ec58d87e031b34c145aa6931efab0e863fed 100644 (file)
@@ -1,5 +1,9 @@
-from test.support import import_helper
+from test.support import import_helper, threading_helper
 syslog = import_helper.import_module("syslog") #skip if not supported
+from test import support
+import sys
+import threading
+import time
 import unittest
 
 # XXX(nnorwitz): This test sucks.  I don't know of a platform independent way
@@ -8,6 +12,9 @@ import unittest
 
 class Test(unittest.TestCase):
 
+    def tearDown(self):
+        syslog.closelog()
+
     def test_openlog(self):
         syslog.openlog('python')
         # Issue #6697.
@@ -18,22 +25,59 @@ class Test(unittest.TestCase):
         syslog.syslog('test message from python test_syslog')
         syslog.syslog(syslog.LOG_ERR, 'test error from python test_syslog')
 
+    def test_syslog_implicit_open(self):
+        syslog.closelog() # Make sure log is closed
+        syslog.syslog('test message from python test_syslog')
+        syslog.syslog(syslog.LOG_ERR, 'test error from python test_syslog')
+
     def test_closelog(self):
         syslog.openlog('python')
         syslog.closelog()
+        syslog.closelog()  # idempotent operation
 
     def test_setlogmask(self):
-        syslog.setlogmask(syslog.LOG_DEBUG)
+        mask = syslog.LOG_UPTO(syslog.LOG_WARNING)
+        oldmask = syslog.setlogmask(mask)
+        self.assertEqual(syslog.setlogmask(0), mask)
+        self.assertEqual(syslog.setlogmask(oldmask), mask)
 
     def test_log_mask(self):
-        syslog.LOG_MASK(syslog.LOG_INFO)
-
-    def test_log_upto(self):
-        syslog.LOG_UPTO(syslog.LOG_INFO)
+        mask = syslog.LOG_UPTO(syslog.LOG_WARNING)
+        self.assertTrue(mask & syslog.LOG_MASK(syslog.LOG_WARNING))
+        self.assertTrue(mask & syslog.LOG_MASK(syslog.LOG_ERR))
+        self.assertFalse(mask & syslog.LOG_MASK(syslog.LOG_INFO))
 
     def test_openlog_noargs(self):
         syslog.openlog()
         syslog.syslog('test message from python test_syslog')
 
+    @threading_helper.requires_working_threading()
+    def test_syslog_threaded(self):
+        start = threading.Event()
+        stop = False
+        def opener():
+            start.wait(10)
+            i = 1
+            while not stop:
+                syslog.openlog(f'python-test-{i}')  # new string object
+                i += 1
+        def logger():
+            start.wait(10)
+            while not stop:
+                syslog.syslog('test message from python test_syslog')
+
+        orig_si = sys.getswitchinterval()
+        support.setswitchinterval(1e-9)
+        try:
+            threads = [threading.Thread(target=opener)]
+            threads += [threading.Thread(target=logger) for k in range(10)]
+            with threading_helper.start_threads(threads):
+                start.set()
+                time.sleep(0.1)
+                stop = True
+        finally:
+            sys.setswitchinterval(orig_si)
+
+
 if __name__ == "__main__":
     unittest.main()
index c409fe968f8894a6337092ff9ba2eb458ade5f8a..1593eea94a62bf407834959f10251c9f44956e73 100644 (file)
@@ -235,7 +235,7 @@ syslog_setlogmask(PyObject *self, PyObject *args)
 
     if (!PyArg_ParseTuple(args, "l;mask for priority", &maskpri))
         return NULL;
-    if (PySys_Audit("syslog.setlogmask", "(O)", args ? args : Py_None) < 0) {
+    if (PySys_Audit("syslog.setlogmask", "l", maskpri) < 0) {
         return NULL;
     }
     omaskpri = setlogmask(maskpri);