]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Carbon.py
Merge 'master', but take geoipbackend from master as
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Carbon.py
1 #!/usr/bin/env python
2 import threading
3 import socket
4 import sys
5 import time
6 from dnsdisttests import DNSDistTest, Queue
7
class TestCarbon(DNSDistTest):
    """
    Check that dnsdist exports its metrics to every configured carbon server.

    Two local TCP responders stand in for the carbon servers. Each one
    stores what it receives in its own queue, and the test then validates
    the format of the lines found in both queues.
    """

    _carbonServer1Port = 8000
    _carbonServer1Name = "carbonname1"
    _carbonServer2Port = 8001
    _carbonServer2Name = "carbonname2"
    # raw payloads received by each responder, one entry per connection
    _carbonQueue1 = Queue()
    _carbonQueue2 = Queue()
    # third argument of carbonServer(); the test sleeps this long (+1s)
    # before expecting data
    _carbonInterval = 2
    # number of connections handled, keyed by responder thread name
    _carbonCounters = {}
    # NOTE(review): presumably the base class substitutes these attributes,
    # in order, into _config_template -- that code is not visible here
    _config_params = ['_carbonServer1Port', '_carbonServer1Name', '_carbonInterval', '_carbonServer2Port', '_carbonServer2Name', '_carbonInterval']
    _config_template = """
    carbonServer("127.0.0.1:%s", "%s", %s)
    carbonServer("127.0.0.1:%s", "%s", %s)
    """

    @classmethod
    def CarbonResponder(cls, port):
        """
        Run a minimal carbon server listening on 127.0.0.1:`port`.

        Connections are accepted forever. Each one is read until the peer
        closes it, then the received bytes are pushed on the queue matching
        `port` and the per-thread connection counter is incremented.

        This is meant to be the target of a daemon thread: it only stops
        when an exception (bind failure, socket timeout, ...) terminates it.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            try:
                sock.bind(("127.0.0.1", port))
            except socket.error as e:
                print("Error binding in the Carbon responder: %s" % str(e))
                sys.exit(1)

            sock.listen(100)
            if port == cls._carbonServer1Port:
                queue = cls._carbonQueue1
            else:
                queue = cls._carbonQueue2

            while True:
                (conn, _) = sock.accept()
                try:
                    conn.settimeout(2.0)
                    chunks = []
                    while True:
                        data = conn.recv(4096)
                        if not data:
                            # empty read: the peer closed the connection
                            break
                        chunks.append(data)

                    queue.put(b''.join(chunks), True, timeout=2.0)

                    name = threading.current_thread().name
                    cls._carbonCounters[name] = cls._carbonCounters.get(name, 0) + 1
                finally:
                    # do not leak the connection if recv() or put() raised
                    conn.close()
        finally:
            # the accept loop never ends normally, so this is the only
            # place where the listening socket can be released
            sock.close()

    @classmethod
    def startResponders(cls):
        """
        Start one daemon thread running CarbonResponder per carbon server.
        """
        cls._CarbonResponder1 = threading.Thread(name='Carbon Responder 1', target=cls.CarbonResponder, args=[cls._carbonServer1Port])
        cls._CarbonResponder1.daemon = True
        cls._CarbonResponder1.start()

        cls._CarbonResponder2 = threading.Thread(name='Carbon Responder 2', target=cls.CarbonResponder, args=[cls._carbonServer2Port])
        cls._CarbonResponder2.daemon = True
        cls._CarbonResponder2.start()

    def _checkCarbonData(self, data, serverName, after):
        """
        Validate one payload received by a carbon responder.

        `data` is the raw bytes read from one connection, `serverName` the
        name dnsdist was configured to report under for that server, and
        `after` a time.time() value taken once the data had been received.

        Every line must start with "dnsdist.<serverName>.main." and consist
        of exactly three space-separated fields, the last two being
        all-digit, with the third one not greater than `after`.
        """
        self.assertTrue(data)
        lines = data.splitlines()
        self.assertGreater(len(lines), 1)
        expectedStart = b"dnsdist.%s.main." % serverName.encode('UTF-8')
        for line in lines:
            self.assertTrue(line.startswith(expectedStart))
            parts = line.split(b' ')
            self.assertEqual(len(parts), 3)
            self.assertTrue(parts[1].isdigit())
            self.assertTrue(parts[2].isdigit())
            self.assertLessEqual(int(parts[2]), int(after))

    def testCarbon(self):
        """
        Carbon: send data to 2 carbon servers
        """
        # wait for the carbon data to be sent
        time.sleep(self._carbonInterval + 1)

        # first server
        self.assertFalse(self._carbonQueue1.empty())
        data1 = self._carbonQueue1.get(False)
        # second server
        self.assertFalse(self._carbonQueue2.empty())
        data2 = self._carbonQueue2.get(False)
        after = time.time()

        self._checkCarbonData(data1, self._carbonServer1Name, after)
        self._checkCarbonData(data2, self._carbonServer2Name, after)

        # make sure every carbon server has received at least one connection
        for value in self._carbonCounters.values():
            self.assertGreaterEqual(value, 1)