]>
Commit | Line | Data |
---|---|---|
ddcc8353 O |
1 | #!/usr/bin/env python |
2 | import threading | |
3 | import socket | |
4 | import sys | |
5 | import time | |
6 | import os | |
7 | from queue import Queue | |
8 | ||
9 | from recursortests import RecursorTest | |
10 | ||
11 | class TestCarbon(RecursorTest): | |
12 | _confdir = 'Carbon' | |
13 | _carbonNamespace = 'NS' | |
14 | _carbonInstance = 'Instance' | |
15 | _carbonServerName = "carbonname1" | |
16 | _carbonInterval = 2 | |
17 | _carbonServer1Port = 8000 | |
18 | _carbonServer2Port = 8001 | |
19 | _carbonQueue1 = Queue() | |
20 | _carbonQueue2 = Queue() | |
21 | _carbonCounters = {} | |
22 | _config_template = """ | |
23 | carbon-namespace=%s | |
24 | carbon-instance=%s | |
25 | carbon-interval=%s | |
26 | carbon-ourname=%s | |
27 | carbon-server=127.0.0.1:%s,127.0.01:%s | |
28 | """ % (_carbonNamespace, _carbonInstance, _carbonInterval, _carbonServerName, _carbonServer1Port, _carbonServer2Port) | |
29 | @classmethod | |
30 | def setUpClass(cls): | |
31 | ||
32 | # we don't need all the auth stuff | |
33 | cls.setUpSockets() | |
34 | cls.startResponders() | |
35 | ||
36 | confdir = os.path.join('configs', cls._confdir) | |
37 | cls.createConfigDir(confdir) | |
38 | ||
39 | cls.generateRecursorConfig(confdir) | |
40 | cls.startRecursor(confdir, cls._recursorPort) | |
41 | ||
42 | @classmethod | |
43 | def tearDownClass(cls): | |
44 | cls.tearDownRecursor() | |
45 | ||
46 | @classmethod | |
47 | def CarbonResponder(cls, port): | |
48 | sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
49 | sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) | |
50 | try: | |
51 | sock.bind(("127.0.0.1", port)) | |
52 | except socket.error as e: | |
53 | print("Error binding in the Carbon responder: %s" % str(e)) | |
54 | sys.exit(1) | |
55 | ||
56 | sock.listen(100) | |
57 | while True: | |
58 | (conn, _) = sock.accept() | |
59 | conn.settimeout(2.0) | |
60 | lines = b'' | |
61 | while True: | |
62 | data = conn.recv(4096) | |
63 | if not data: | |
64 | break | |
65 | lines += data | |
66 | ||
67 | if port == cls._carbonServer1Port: | |
68 | cls._carbonQueue1.put(lines, True, timeout=2.0) | |
69 | else: | |
70 | cls._carbonQueue2.put(lines, True, timeout=2.0) | |
71 | if threading.currentThread().name in cls._carbonCounters: | |
72 | cls._carbonCounters[threading.currentThread().name] += 1 | |
73 | else: | |
74 | cls._carbonCounters[threading.currentThread().name] = 1 | |
75 | ||
76 | conn.close() | |
77 | sock.close() | |
78 | ||
79 | @classmethod | |
80 | def startResponders(cls): | |
81 | cls._CarbonResponder1 = threading.Thread(name='Carbon Responder 1', target=cls.CarbonResponder, args=[cls._carbonServer1Port]) | |
82 | cls._CarbonResponder1.setDaemon(True) | |
83 | cls._CarbonResponder1.start() | |
84 | ||
85 | cls._CarbonResponder2 = threading.Thread(name='Carbon Responder 2', target=cls.CarbonResponder, args=[cls._carbonServer2Port]) | |
86 | cls._CarbonResponder2.setDaemon(True) | |
87 | cls._CarbonResponder2.start() | |
88 | ||
89 | def testCarbon(self): | |
90 | """ | |
91 | Carbon: send data to 2 carbon servers | |
92 | """ | |
93 | # wait for the carbon data to be sent | |
94 | time.sleep(self._carbonInterval + 1) | |
95 | ||
96 | # check if the servers have received our data | |
97 | # we will block for a short while if the data is not already there, | |
98 | # and an exception will be raised after the timeout | |
99 | # first server | |
100 | data1 = self._carbonQueue1.get(block=True, timeout=2.0) | |
101 | # second server | |
102 | data2 = self._carbonQueue2.get(block=True, timeout=2.0) | |
103 | after = time.time() | |
104 | ||
105 | self.assertTrue(data1) | |
106 | self.assertTrue(len(data1.splitlines()) > 1) | |
107 | expectedStart = b"%s.%s.%s." % (self._carbonNamespace.encode('UTF8'), self._carbonServerName.encode('UTF-8'), self._carbonInstance.encode('UTF8')) | |
108 | for line in data1.splitlines(): | |
109 | self.assertTrue(line.startswith(expectedStart)) | |
110 | parts = line.split(b' ') | |
111 | self.assertEqual(len(parts), 3) | |
112 | self.assertTrue(parts[1].isdigit()) | |
113 | self.assertTrue(parts[2].isdigit()) | |
114 | self.assertTrue(int(parts[2]) <= int(after)) | |
115 | ||
116 | self.assertTrue(data2) | |
117 | self.assertTrue(len(data2.splitlines()) > 1) | |
118 | expectedStart = b"%s.%s.%s." % (self._carbonNamespace.encode('UTF8'), self._carbonServerName.encode('UTF-8'), self._carbonInstance.encode('UTF8')) | |
119 | for line in data2.splitlines(): | |
120 | self.assertTrue(line.startswith(expectedStart)) | |
121 | parts = line.split(b' ') | |
122 | self.assertEqual(len(parts), 3) | |
123 | self.assertTrue(parts[1].isdigit()) | |
124 | self.assertTrue(parts[2].isdigit()) | |
125 | self.assertTrue(int(parts[2]) <= int(after)) | |
126 | ||
127 | # make sure every carbon server has received at least one connection | |
128 | for key in self._carbonCounters: | |
129 | value = self._carbonCounters[key] | |
130 | self.assertTrue(value >= 1) | |
131 |