]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.auth-py/test_Carbon.py
Merge pull request #10945 from rgacogne/auth-pdns-control-link
[thirdparty/pdns.git] / regression-tests.auth-py / test_Carbon.py
1 #!/usr/bin/env python
2 import threading
3 import socket
4 import sys
5 import time
6 import os
7 from queue import Queue
8
9 from authtests import AuthTest
10
class TestCarbon(AuthTest):
    """Check that the server under test exports metrics to two Carbon servers.

    Two local TCP responders stand in for the Carbon servers named in
    `_config_template`. Each responder pushes the raw payload of every
    connection it receives onto its own queue, and `testCarbon` then
    validates what arrived on both queues.
    """

    _carbonNamespace = 'NS'
    _carbonInstance = 'Instance'
    _carbonServerName = "carbonname1"
    # Written to carbon-interval below; the test sleeps one second longer
    # than this before looking at the queues.
    _carbonInterval = 2
    _carbonServer1Port = 8000
    _carbonServer2Port = 8001
    # Thread names double as the keys of _carbonCounters.
    _carbonResponder1Name = 'Carbon Responder 1'
    _carbonResponder2Name = 'Carbon Responder 2'
    # Raw payloads, one queue item per connection handled by a responder.
    _carbonQueue1 = Queue()
    _carbonQueue2 = Queue()
    # Number of fully-read connections, keyed by responder thread name.
    _carbonCounters = {}
    # NOTE(review): presumably consumed by AuthTest to generate the server
    # configuration -- confirm against authtests.
    _config_template = """
launch=bind
carbon-namespace=%s
carbon-instance=%s
carbon-interval=%s
carbon-ourname=%s
carbon-server=127.0.0.1:%s,127.0.0.1:%s
""" % (_carbonNamespace, _carbonInstance, _carbonInterval, _carbonServerName, _carbonServer1Port, _carbonServer2Port)

    @classmethod
    def CarbonResponder(cls, port):
        """Accept connections on 127.0.0.1:`port` and queue each payload.

        Never returns normally; it is meant to be the target of a daemon
        thread (see `startResponders`). Every connection is read until the
        peer closes it, then the accumulated bytes are put on
        `_carbonQueue1` when `port` is `_carbonServer1Port` and on
        `_carbonQueue2` otherwise.

        A connection that errors out or stays silent for 2 seconds is
        dropped without queueing anything, and the responder keeps serving.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        try:
            sock.bind(("127.0.0.1", port))
        except socket.error as e:
            print("Error binding in the Carbon responder: %s" % str(e))
            sock.close()
            # In a non-main thread this only ends the current thread.
            sys.exit(1)

        if port == cls._carbonServer1Port:
            payloads = cls._carbonQueue1
        else:
            payloads = cls._carbonQueue2
        responderName = threading.current_thread().name

        # The context managers guarantee both sockets are closed even when
        # the loop is left through an exception.
        with sock:
            sock.listen(100)
            while True:
                (conn, _) = sock.accept()
                with conn:
                    conn.settimeout(2.0)
                    chunks = []
                    try:
                        while True:
                            data = conn.recv(4096)
                            if not data:
                                break
                            chunks.append(data)
                    except socket.error as e:
                        # Covers socket.timeout as well: do not let one bad
                        # connection kill the responder thread.
                        print("Error reading in the Carbon responder: %s" % str(e))
                        continue

                # Count before enqueueing so that a consumer that has
                # obtained the payload is guaranteed to see the counter.
                cls._carbonCounters[responderName] = cls._carbonCounters.get(responderName, 0) + 1
                payloads.put(b''.join(chunks), True, timeout=2.0)

    @classmethod
    def startResponders(cls):
        """Start one daemon responder thread per configured Carbon port."""
        cls._CarbonResponder1 = threading.Thread(name=cls._carbonResponder1Name, target=cls.CarbonResponder, args=[cls._carbonServer1Port])
        cls._CarbonResponder1.daemon = True
        cls._CarbonResponder1.start()

        cls._CarbonResponder2 = threading.Thread(name=cls._carbonResponder2Name, target=cls.CarbonResponder, args=[cls._carbonServer2Port])
        cls._CarbonResponder2.daemon = True
        cls._CarbonResponder2.start()

    def _checkCarbonData(self, data, after):
        """Assert that `data` is a well-formed payload sent before `after`.

        `data` is the raw bytes received on one connection and `after` is a
        `time.time()` value taken once the payload was obtained. The payload
        must hold more than one line, and every line must consist of three
        space-separated fields: a name starting with
        "<namespace>.<server name>.<instance>.", a digits-only value and a
        digits-only timestamp that is not later than `after`.
        """
        self.assertTrue(data)
        lines = data.splitlines()
        self.assertGreater(len(lines), 1)
        expectedStart = b"%s.%s.%s." % (self._carbonNamespace.encode('UTF-8'), self._carbonServerName.encode('UTF-8'), self._carbonInstance.encode('UTF-8'))
        for line in lines:
            self.assertTrue(line.startswith(expectedStart), repr(line))
            parts = line.split(b' ')
            self.assertEqual(len(parts), 3)
            self.assertTrue(parts[1].isdigit(), repr(line))
            self.assertTrue(parts[2].isdigit(), repr(line))
            self.assertLessEqual(int(parts[2]), int(after))

    def testCarbon(self):
        """
        Carbon: send data to 2 carbon servers
        """
        # wait for the carbon data to be sent
        time.sleep(self._carbonInterval + 1)

        # check if the servers have received our data
        # we will block for a short while if the data is not already there,
        # and an exception will be raised after the timeout
        data1 = self._carbonQueue1.get(block=True, timeout=2.0)
        data2 = self._carbonQueue2.get(block=True, timeout=2.0)
        after = time.time()

        self._checkCarbonData(data1, after)
        self._checkCarbonData(data2, after)

        # make sure every carbon server has received at least one connection;
        # look the responders up by name so that a missing entry fails
        # instead of being silently skipped
        for name in (self._carbonResponder1Name, self._carbonResponder2Name):
            self.assertGreaterEqual(self._carbonCounters.get(name, 0), 1)
115