]>
Commit | Line | Data |
---|---|---|
f9584307 | 1 | #!/usr/bin/env python |
f9584307 RG |
2 | import threading |
3 | import socket | |
55baa1f2 | 4 | import sys |
f9584307 | 5 | import time |
630eb526 | 6 | from dnsdisttests import DNSDistTest, Queue, pickAvailablePort |
f9584307 RG |
7 | |
class TestCarbon(DNSDistTest):
    """
    Check the metrics dnsdist exports over the Carbon protocol.

    Two local TCP responders play the role of Carbon servers. Each one
    pushes the raw payload of every connection it handles into its own
    queue, and the tests validate the lines found in these payloads.
    """

    _carbonServer1Port = pickAvailablePort()
    _carbonServer1Name = "carbonname1"
    _carbonServer2Port = pickAvailablePort()
    _carbonServer2Name = "carbonname2"
    # raw payloads received by each responder, one entry per connection
    _carbonQueue1 = Queue()
    _carbonQueue2 = Queue()
    # third argument of carbonServer(); the tests sleep this many
    # seconds (plus one) before looking at the queues
    _carbonInterval = 2
    # number of payloads queued by each responder, keyed by thread name
    _carbonCounters = {}
    # '_carbonInterval' is listed twice on purpose: the template below has
    # two carbonServer() lines and each of them takes three placeholders
    _config_params = ['_carbonServer1Port', '_carbonServer1Name', '_carbonInterval',
                      '_carbonServer2Port', '_carbonServer2Name', '_carbonInterval']
    # three backends are declared and one of them is forced down, which is
    # what the "servers" (3) and "servers-up" (2) checks rely on
    _config_template = """
    s = newServer{address="127.0.0.1:5353"}
    s:setDown()
    s = newServer{address="127.0.0.1:5354"}
    s:setUp()
    s = newServer{address="127.0.0.1:5355"}
    s:setUp()
    carbonServer("127.0.0.1:%s", "%s", %s)
    carbonServer("127.0.0.1:%s", "%s", %s)
    """

    @staticmethod
    def _receiveCarbonPayload(conn):
        """
        Read from conn until the peer closes the connection, then close conn.

        Returns the received bytes, or None if no data arrived within the
        2 second receive timeout before the peer closed the connection.
        """
        chunks = []
        try:
            conn.settimeout(2.0)
            while True:
                data = conn.recv(4096)
                if not data:
                    # the peer closed the connection, the payload is complete
                    return b''.join(chunks)
                chunks.append(data)
        except socket.timeout:
            return None
        finally:
            # release the connection whatever happened while reading
            conn.close()

    @classmethod
    def CarbonResponder(cls, port):
        """
        Accept TCP connections on 127.0.0.1:port forever and queue their payloads.

        The payload of a connection is put into _carbonQueue1 when port is
        _carbonServer1Port and into _carbonQueue2 otherwise, then the entry
        of the current thread in _carbonCounters is incremented. A connection
        that times out is dropped without queueing anything, so that a stalled
        peer does not terminate the responder.

        Calls sys.exit(1) if the listening socket cannot be bound.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            try:
                sock.bind(("127.0.0.1", port))
            except socket.error as e:
                print("Error binding in the Carbon responder: %s" % str(e))
                sys.exit(1)

            sock.listen(100)
            while True:
                (conn, _) = sock.accept()
                lines = cls._receiveCarbonPayload(conn)
                if lines is None:
                    print("Timeout while receiving data in the Carbon responder")
                    continue

                if port == cls._carbonServer1Port:
                    cls._carbonQueue1.put(lines, True, timeout=2.0)
                else:
                    cls._carbonQueue2.put(lines, True, timeout=2.0)

                # the counter is updated after the payload has been queued,
                # so a test can see the payload before the counter exists
                name = threading.current_thread().name
                cls._carbonCounters[name] = cls._carbonCounters.get(name, 0) + 1
        finally:
            # reached when binding fails or an unexpected error occurs
            sock.close()

    @classmethod
    def startResponders(cls):
        """
        Start one daemon thread running CarbonResponder per Carbon server port.
        """
        cls._CarbonResponder1 = threading.Thread(name='Carbon Responder 1', target=cls.CarbonResponder, args=[cls._carbonServer1Port])
        cls._CarbonResponder1.daemon = True
        cls._CarbonResponder1.start()

        cls._CarbonResponder2 = threading.Thread(name='Carbon Responder 2', target=cls.CarbonResponder, args=[cls._carbonServer2Port])
        cls._CarbonResponder2.daemon = True
        cls._CarbonResponder2.start()

    def isfloat(self, num):
        """
        Return True if float() accepts num, False if it raises ValueError.
        """
        try:
            float(num)
        except ValueError:
            return False
        return True

    def _fetchCarbonData(self):
        """
        Wait for one export interval and take one payload from each queue.

        Returns a (data1, data2, after) tuple where after is the time at
        which both payloads had been retrieved. The queue raises if a
        payload is still missing after 2 seconds.
        """
        # wait for the carbon data to be sent
        time.sleep(self._carbonInterval + 1)

        # check if the servers have received our data
        # we will block for a short while if the data is not already there,
        # and an exception will be raised after the timeout
        # first server
        data1 = self._carbonQueue1.get(block=True, timeout=2.0)
        # second server
        data2 = self._carbonQueue2.get(block=True, timeout=2.0)
        after = time.time()
        return (data1, data2, after)

    def _checkCarbonLines(self, data, serverName, after):
        """
        Check that every line of data is '<name> <float> <timestamp>'.

        The name has to start with 'dnsdist.<serverName>.main.' and the
        timestamp must not be later than after.
        """
        self.assertTrue(data)
        lines = data.splitlines()
        self.assertGreater(len(lines), 1)
        expectedStart = b"dnsdist.%s.main." % serverName.encode('UTF-8')
        latest = int(after)
        for line in lines:
            self.assertTrue(line.startswith(expectedStart))
            parts = line.split(b' ')
            self.assertEqual(len(parts), 3)
            self.assertTrue(self.isfloat(parts[1]))
            self.assertTrue(parts[2].isdigit())
            self.assertLessEqual(int(parts[2]), latest)

    def _checkCarbonServersLines(self, data, serverName, after):
        """
        Check the default pool 'servers' and 'servers-up' lines found in data.

        Lines that do not contain
        'dnsdist.<serverName>.main.pools._default_.servers' are ignored.
        The value has to be 2 on 'servers-up' lines and 3 on the others,
        and the timestamp must not be later than after.
        """
        self.assertTrue(data)
        lines = data.splitlines()
        self.assertGreater(len(lines), 1)
        expectedStart = b"dnsdist.%s.main.pools._default_.servers" % serverName.encode('UTF-8')
        latest = int(after)
        for line in lines:
            if expectedStart not in line:
                continue
            parts = line.split(b' ')
            # the prefix matches both metrics, tell them apart by name
            expectedValue = 2 if b'servers-up' in line else 3
            self.assertEqual(len(parts), 3)
            self.assertTrue(parts[1].isdigit())
            self.assertEqual(int(parts[1]), expectedValue)
            self.assertTrue(parts[2].isdigit())
            self.assertLessEqual(int(parts[2]), latest)

    def testCarbon(self):
        """
        Carbon: send data to 2 carbon servers
        """
        (data1, data2, after) = self._fetchCarbonData()

        self._checkCarbonLines(data1, self._carbonServer1Name, after)
        self._checkCarbonLines(data2, self._carbonServer2Name, after)

        # make sure every carbon server has received at least one connection
        # (only the responders that already registered themselves are checked,
        # since a counter is updated after the payload has been queued)
        for key in self._carbonCounters:
            value = self._carbonCounters[key]
            self.assertGreaterEqual(value, 1)

    def testCarbonServerUp(self):
        """
        Carbon: set up 2 carbon servers
        """
        (data1, data2, after) = self._fetchCarbonData()

        # check the first carbon server got both servers and
        # servers-up metrics and that they are the same as
        # configured in the class definition
        self._checkCarbonServersLines(data1, self._carbonServer1Name, after)

        # check the second carbon server got both servers and
        # servers-up metrics and that they are the same as
        # configured in the class definition and the same as
        # the first carbon server
        self._checkCarbonServersLines(data2, self._carbonServer2Name, after)