]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Carbon.py
Merge branch 'master' into dnsrr
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Carbon.py
1 #!/usr/bin/env python
2 import Queue
3 import threading
4 import socket
5 import sys
6 import time
7 from dnsdisttests import DNSDistTest
8
9 class TestCarbon(DNSDistTest):
10
11 _carbonServer1Port = 8000
12 _carbonServer1Name = "carbonname1"
13 _carbonServer2Port = 8001
14 _carbonServer2Name = "carbonname2"
15 _carbonQueue1 = Queue.Queue()
16 _carbonQueue2 = Queue.Queue()
17 _carbonInterval = 2
18 _carbonCounters = {}
19 _config_params = ['_carbonServer1Port', '_carbonServer1Name', '_carbonInterval', '_carbonServer2Port', '_carbonServer2Name', '_carbonInterval']
20 _config_template = """
21 carbonServer("127.0.0.1:%s", "%s", %s)
22 carbonServer("127.0.0.1:%s", "%s", %s)
23 """
24
25 @classmethod
26 def CarbonResponder(cls, port):
27 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
28 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
29 try:
30 sock.bind(("127.0.0.1", port))
31 except socket.error as e:
32 print("Error binding in the Carbon responder: %s" % str(e))
33 sys.exit(1)
34
35 sock.listen(100)
36 while True:
37 (conn, _) = sock.accept()
38 conn.settimeout(2.0)
39 lines = ""
40 while True:
41 data = conn.recv(4096)
42 if not data:
43 break
44 lines += data
45
46 if port == cls._carbonServer1Port:
47 cls._carbonQueue1.put(lines, True, timeout=2.0)
48 else:
49 cls._carbonQueue2.put(lines, True, timeout=2.0)
50 if threading.currentThread().name in cls._carbonCounters:
51 cls._carbonCounters[threading.currentThread().name] += 1
52 else:
53 cls._carbonCounters[threading.currentThread().name] = 1
54
55 conn.close()
56 sock.close()
57
58 @classmethod
59 def startResponders(cls):
60 cls._CarbonResponder1 = threading.Thread(name='Carbon Responder 1', target=cls.CarbonResponder, args=[cls._carbonServer1Port])
61 cls._CarbonResponder1.setDaemon(True)
62 cls._CarbonResponder1.start()
63
64 cls._CarbonResponder2 = threading.Thread(name='Carbon Responder 2', target=cls.CarbonResponder, args=[cls._carbonServer2Port])
65 cls._CarbonResponder2.setDaemon(True)
66 cls._CarbonResponder2.start()
67
68 def testCarbon(self):
69 """
70 Carbon: send data to 2 carbon servers
71 """
72 # wait for the carbon data to be sent
73 time.sleep(self._carbonInterval + 1)
74
75 # first server
76 self.assertFalse(self._carbonQueue1.empty())
77 data1 = self._carbonQueue1.get(False)
78 # second server
79 self.assertFalse(self._carbonQueue2.empty())
80 data2 = self._carbonQueue2.get(False)
81 after = time.time()
82
83 self.assertTrue(data1)
84 self.assertTrue(len(data1.splitlines()) > 1)
85 expectedStart = "dnsdist." + self._carbonServer1Name + ".main."
86 for line in data1.splitlines():
87 self.assertTrue(line.startswith(expectedStart))
88 parts = line.split(' ')
89 self.assertEquals(len(parts), 3)
90 self.assertTrue(parts[1].isdigit())
91 self.assertTrue(parts[2].isdigit())
92 self.assertTrue(int(parts[2]) <= int(after))
93
94 self.assertTrue(data2)
95 self.assertTrue(len(data2.splitlines()) > 1)
96 expectedStart = "dnsdist." + self._carbonServer2Name + ".main."
97 for line in data2.splitlines():
98 self.assertTrue(line.startswith(expectedStart))
99 parts = line.split(' ')
100 self.assertEquals(len(parts), 3)
101 self.assertTrue(parts[1].isdigit())
102 self.assertTrue(parts[2].isdigit())
103 self.assertTrue(int(parts[2]) <= int(after))
104
105 # make sure every carbon server has received at least one connection
106 for key in self._carbonCounters:
107 value = self._carbonCounters[key]
108 self.assertTrue(value >= 1)