]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.recursor-dnssec/test_Carbon.py
Merge pull request #12086 from zeha/apizonepost
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_Carbon.py
CommitLineData
ddcc8353
O
1#!/usr/bin/env python
2import threading
3import socket
4import sys
5import time
6import os
7from queue import Queue
8
9from recursortests import RecursorTest
10
class TestCarbon(RecursorTest):
    """Check that the recursor pushes its metrics to two Carbon servers.

    Two TCP responders are started in background threads, one per configured
    ``carbon-server`` entry.  Each responder stores the payload of every
    connection it receives in its own queue, and the test then validates the
    format of the lines found in both queues.
    """

    _confdir = 'Carbon'
    _carbonNamespace = 'NS'
    _carbonInstance = 'Instance'
    _carbonServerName = "carbonname1"
    # interval (in seconds) written to the 'carbon-interval' setting
    _carbonInterval = 2
    _carbonServer1Port = 8000
    _carbonServer2Port = 8001
    # payloads received by the responder listening on each port
    _carbonQueue1 = Queue()
    _carbonQueue2 = Queue()
    # number of connections handled, keyed by responder thread name
    _carbonCounters = {}
    # both servers are on the loopback address; the second entry used to be
    # spelled '127.0.01', which relied on lenient address parsing
    _config_template = """
    carbon-namespace=%s
    carbon-instance=%s
    carbon-interval=%s
    carbon-ourname=%s
    carbon-server=127.0.0.1:%s,127.0.0.1:%s
    """ % (_carbonNamespace, _carbonInstance, _carbonInterval, _carbonServerName, _carbonServer1Port, _carbonServer2Port)

    @classmethod
    def setUpClass(cls):
        """Start the Carbon responders, then configure and start the recursor."""

        # we don't need all the auth stuff
        cls.setUpSockets()
        cls.startResponders()

        confdir = os.path.join('configs', cls._confdir)
        cls.createConfigDir(confdir)

        cls.generateRecursorConfig(confdir)
        cls.startRecursor(confdir, cls._recursorPort)

    @classmethod
    def tearDownClass(cls):
        """Stop the recursor; the responders are daemon threads and are left running."""
        cls.tearDownRecursor()

    @staticmethod
    def _readCarbonPayload(conn):
        """Return every byte received on ``conn`` until the peer closes it.

        Raises whatever ``conn.recv()`` raises, including ``socket.timeout``
        when a timeout has been set on the connection.
        """
        chunks = []
        while True:
            data = conn.recv(4096)
            if not data:
                return b''.join(chunks)
            chunks.append(data)

    @classmethod
    def CarbonResponder(cls, port):
        """Accept connections on 127.0.0.1:``port`` forever and queue their payloads.

        Payloads received on ``_carbonServer1Port`` go to ``_carbonQueue1``,
        all others to ``_carbonQueue2``.  The per-thread connection counter in
        ``_carbonCounters`` is incremented for every queued payload.
        Meant to be used as the target of a daemon thread: it never returns.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        try:
            sock.bind(("127.0.0.1", port))
        except socket.error as e:
            print("Error binding in the Carbon responder: %s" % str(e))
            sock.close()
            # raises SystemExit in the calling thread
            sys.exit(1)

        sock.listen(100)
        responderName = threading.current_thread().name
        if port == cls._carbonServer1Port:
            queue = cls._carbonQueue1
        else:
            queue = cls._carbonQueue2

        while True:
            (conn, _) = sock.accept()
            # 'with' guarantees the connection is closed, even on error
            with conn:
                conn.settimeout(2.0)
                try:
                    lines = cls._readCarbonPayload(conn)
                except socket.error as e:
                    # a stalled or broken client must not kill the responder:
                    # drop this connection and wait for the next one
                    print("Error receiving in the Carbon responder: %s" % str(e))
                    continue

            # update the counter before publishing the payload, so that a
            # consumer holding the payload always sees the connection counted
            cls._carbonCounters[responderName] = cls._carbonCounters.get(responderName, 0) + 1
            queue.put(lines, True, timeout=2.0)

    @classmethod
    def startResponders(cls):
        """Start one daemon thread running CarbonResponder per Carbon port."""
        cls._CarbonResponder1 = threading.Thread(name='Carbon Responder 1', target=cls.CarbonResponder, args=[cls._carbonServer1Port], daemon=True)
        cls._CarbonResponder1.start()

        cls._CarbonResponder2 = threading.Thread(name='Carbon Responder 2', target=cls.CarbonResponder, args=[cls._carbonServer2Port], daemon=True)
        cls._CarbonResponder2.start()

    def _checkCarbonData(self, data, after):
        """Validate one Carbon payload.

        The payload must hold more than one line, and every line must be
        ``<namespace>.<ourname>.<instance>.<metric> <digits> <digits>`` where
        the last field is not greater than ``after`` (a ``time.time()`` value).
        """
        self.assertTrue(data)
        lines = data.splitlines()
        self.assertGreater(len(lines), 1)
        expectedStart = b"%s.%s.%s." % (self._carbonNamespace.encode('UTF8'), self._carbonServerName.encode('UTF-8'), self._carbonInstance.encode('UTF8'))
        latest = int(after)
        for line in lines:
            self.assertTrue(line.startswith(expectedStart))
            parts = line.split(b' ')
            self.assertEqual(len(parts), 3)
            self.assertTrue(parts[1].isdigit())
            self.assertTrue(parts[2].isdigit())
            self.assertLessEqual(int(parts[2]), latest)

    def testCarbon(self):
        """
        Carbon: send data to 2 carbon servers
        """
        # wait for the carbon data to be sent
        time.sleep(self._carbonInterval + 1)

        # check if the servers have received our data
        # we will block for a short while if the data is not already there,
        # and an exception will be raised after the timeout
        # first server
        data1 = self._carbonQueue1.get(block=True, timeout=2.0)
        # second server
        data2 = self._carbonQueue2.get(block=True, timeout=2.0)
        after = time.time()

        self._checkCarbonData(data1, after)
        self._checkCarbonData(data2, after)

        # make sure every carbon server has received at least one connection;
        # look the responders up by name so that a missing counter fails the
        # test instead of being silently skipped
        for responder in (self._CarbonResponder1, self._CarbonResponder2):
            self.assertGreaterEqual(self._carbonCounters.get(responder.name, 0), 1)
131