git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Coordinator.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / ipc / Coordinator.cc
1 /*
2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 54 Interprocess Communication */
10
11 #include "squid.h"
12 #include "base/Subscription.h"
13 #include "base/TextException.h"
14 #include "CacheManager.h"
15 #include "comm.h"
16 #include "comm/Connection.h"
17 #include "ipc/Coordinator.h"
18 #include "ipc/SharedListen.h"
19 #include "mgr/Inquirer.h"
20 #include "mgr/Request.h"
21 #include "mgr/Response.h"
22 #include "tools.h"
23 #if SQUID_SNMP
24 #include "snmp/Inquirer.h"
25 #include "snmp/Request.h"
26 #include "snmp/Response.h"
27 #endif
28
29 #include <cerrno>
30
31 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
32 Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
33
34 Ipc::Coordinator::Coordinator():
35 Port(Ipc::Port::CoordinatorAddr())
36 {
37 }
38
39 void Ipc::Coordinator::start()
40 {
41 Port::start();
42 }
43
44 Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
45 {
46 typedef StrandCoords::iterator SI;
47 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
48 if (iter->kidId == kidId)
49 return &(*iter);
50 }
51 return NULL;
52 }
53
54 void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
55 {
56 debugs(54, 3, HERE << "registering kid" << strand.kidId <<
57 ' ' << strand.tag);
58 if (StrandCoord* found = findStrand(strand.kidId)) {
59 const String oldTag = found->tag;
60 *found = strand;
61 if (oldTag.size() && !strand.tag.size())
62 found->tag = oldTag; // keep more detailed info (XXX?)
63 } else {
64 strands_.push_back(strand);
65 }
66
67 // notify searchers waiting for this new strand, if any
68 typedef Searchers::iterator SRI;
69 for (SRI i = searchers.begin(); i != searchers.end();) {
70 if (i->tag == strand.tag) {
71 notifySearcher(*i, strand);
72 i = searchers.erase(i);
73 } else {
74 ++i;
75 }
76 }
77 }
78
79 void Ipc::Coordinator::receive(const TypedMsgHdr& message)
80 {
81 switch (message.type()) {
82 case mtRegistration:
83 debugs(54, 6, HERE << "Registration request");
84 handleRegistrationRequest(HereIamMessage(message));
85 break;
86
87 case mtStrandSearchRequest: {
88 const StrandSearchRequest sr(message);
89 debugs(54, 6, HERE << "Strand search request: " << sr.requestorId <<
90 " tag: " << sr.tag);
91 handleSearchRequest(sr);
92 break;
93 }
94
95 case mtSharedListenRequest:
96 debugs(54, 6, HERE << "Shared listen request");
97 handleSharedListenRequest(SharedListenRequest(message));
98 break;
99
100 case mtCacheMgrRequest: {
101 debugs(54, 6, HERE << "Cache manager request");
102 const Mgr::Request req(message);
103 handleCacheMgrRequest(req);
104 }
105 break;
106
107 case mtCacheMgrResponse: {
108 debugs(54, 6, HERE << "Cache manager response");
109 const Mgr::Response resp(message);
110 handleCacheMgrResponse(resp);
111 }
112 break;
113
114 #if SQUID_SNMP
115 case mtSnmpRequest: {
116 debugs(54, 6, HERE << "SNMP request");
117 const Snmp::Request req(message);
118 handleSnmpRequest(req);
119 }
120 break;
121
122 case mtSnmpResponse: {
123 debugs(54, 6, HERE << "SNMP response");
124 const Snmp::Response resp(message);
125 handleSnmpResponse(resp);
126 }
127 break;
128 #endif
129
130 default:
131 debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type());
132 break;
133 }
134 }
135
136 void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg)
137 {
138 registerStrand(msg.strand);
139
140 // send back an acknowledgement; TODO: remove as not needed?
141 TypedMsgHdr message;
142 msg.pack(message);
143 SendMessage(MakeAddr(strandAddrLabel, msg.strand.kidId), message);
144 }
145
146 void
147 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
148 {
149 debugs(54, 4, HERE << "kid" << request.requestorId <<
150 " needs shared listen FD for " << request.params.addr);
151 Listeners::const_iterator i = listeners.find(request.params);
152 int errNo = 0;
153 const Comm::ConnectionPointer c = (i != listeners.end()) ?
154 i->second : openListenSocket(request, errNo);
155
156 debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
157 request.params.addr << " to kid" << request.requestorId <<
158 " mapId=" << request.mapId);
159
160 SharedListenResponse response(c->fd, errNo, request.mapId);
161 TypedMsgHdr message;
162 response.pack(message);
163 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
164 }
165
166 void
167 Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
168 {
169 debugs(54, 4, HERE);
170
171 try {
172 Mgr::Action::Pointer action =
173 CacheManager::GetInstance()->createRequestedAction(request.params);
174 AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
175 } catch (const std::exception &ex) {
176 debugs(54, DBG_IMPORTANT, "BUG: cannot aggregate mgr:" <<
177 request.params.actionName << ": " << ex.what());
178 // TODO: Avoid half-baked Connections or teach them how to close.
179 ::close(request.conn->fd);
180 request.conn->fd = -1;
181 return; // the worker will timeout and close
182 }
183
184 // Let the strand know that we are now responsible for handling the request
185 Mgr::Response response(request.requestId);
186 TypedMsgHdr message;
187 response.pack(message);
188 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
189
190 }
191
192 void
193 Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
194 {
195 Mgr::Inquirer::HandleRemoteAck(response);
196 }
197
198 void
199 Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request)
200 {
201 // do we know of a strand with the given search tag?
202 const StrandCoord *strand = NULL;
203 typedef StrandCoords::const_iterator SCCI;
204 for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
205 if (i->tag == request.tag)
206 strand = &(*i);
207 }
208
209 if (strand) {
210 notifySearcher(request, *strand);
211 return;
212 }
213
214 searchers.push_back(request);
215 debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
216 " who " << request.tag << " is");
217 }
218
219 void
220 Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
221 const StrandCoord& strand)
222 {
223 debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
224 request.tag << " is kid" << strand.kidId);
225 const StrandSearchResponse response(strand);
226 TypedMsgHdr message;
227 response.pack(message);
228 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
229 }
230
231 #if SQUID_SNMP
232 void
233 Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)
234 {
235 debugs(54, 4, HERE);
236
237 Snmp::Response response(request.requestId);
238 TypedMsgHdr message;
239 response.pack(message);
240 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
241
242 AsyncJob::Start(new Snmp::Inquirer(request, strands_));
243 }
244
245 void
246 Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response)
247 {
248 debugs(54, 4, HERE);
249 Snmp::Inquirer::HandleRemoteAck(response);
250 }
251 #endif
252
253 Comm::ConnectionPointer
254 Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
255 int &errNo)
256 {
257 const OpenListenerParams &p = request.params;
258
259 debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
260 request.requestorId);
261
262 Comm::ConnectionPointer newConn = new Comm::Connection;
263 newConn->local = p.addr; // comm_open_listener may modify it
264 newConn->flags = p.flags;
265
266 enter_suid();
267 comm_open_listener(p.sock_type, p.proto, newConn, FdNote(p.fdNote));
268 errNo = Comm::IsConnOpen(newConn) ? 0 : errno;
269 leave_suid();
270
271 debugs(54, 6, HERE << "tried listening on " << newConn << " for kid" <<
272 request.requestorId);
273
274 // cache positive results
275 if (Comm::IsConnOpen(newConn))
276 listeners[request.params] = newConn;
277
278 return newConn;
279 }
280
281 void Ipc::Coordinator::broadcastSignal(int sig) const
282 {
283 typedef StrandCoords::const_iterator SCI;
284 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
285 debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
286 ", PID=" << iter->pid);
287 kill(iter->pid, sig);
288 }
289 }
290
291 Ipc::Coordinator* Ipc::Coordinator::Instance()
292 {
293 if (!TheInstance)
294 TheInstance = new Coordinator;
295 // XXX: if the Coordinator job quits, this pointer will become invalid
296 // we could make Coordinator death fatal, except during exit, but since
297 // Strands do not re-register, even process death would be pointless.
298 return TheInstance;
299 }
300
301 const Ipc::StrandCoords&
302 Ipc::Coordinator::strands() const
303 {
304 return strands_;
305 }
306