]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Coordinator.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / ipc / Coordinator.cc
CommitLineData
10cefb7b 1/*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
f7f3304a 8#include "squid.h"
e0d28505 9#include "base/Subscription.h"
8822ebee
AR
10#include "base/TextException.h"
11#include "CacheManager.h"
0d0bce6a 12#include "comm.h"
e0d28505 13#include "comm/Connection.h"
10cefb7b 14#include "ipc/Coordinator.h"
0d0bce6a 15#include "ipc/SharedListen.h"
8822ebee
AR
16#include "mgr/Inquirer.h"
17#include "mgr/Request.h"
18#include "mgr/Response.h"
582c2af2 19#include "protos.h"
f738d783 20#if SQUID_SNMP
d6e3ad20
CT
21#include "snmp/Inquirer.h"
22#include "snmp/Request.h"
23#include "snmp/Response.h"
f738d783 24#endif
21d845b1
FC
25#if HAVE_ERRNO_H
26#include <errno.h>
27#endif
10cefb7b 28
29CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
a2c48c98 30Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
10cefb7b 31
10cefb7b 32Ipc::Coordinator::Coordinator():
5667a628 33 Port(coordinatorAddr)
10cefb7b 34{
35}
36
37void Ipc::Coordinator::start()
38{
ba568924 39 Port::start();
10cefb7b 40}
41
1bac0258 42Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
10cefb7b 43{
8822ebee
AR
44 typedef StrandCoords::iterator SI;
45 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
10cefb7b 46 if (iter->kidId == kidId)
47 return &(*iter);
48 }
49 return NULL;
50}
51
1bac0258 52void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
10cefb7b 53{
254912f3
AR
54 debugs(54, 3, HERE << "registering kid" << strand.kidId <<
55 ' ' << strand.tag);
b04c601b
AR
56 if (StrandCoord* found = findStrand(strand.kidId)) {
57 const String oldTag = found->tag;
10cefb7b 58 *found = strand;
b04c601b
AR
59 if (oldTag.size() && !strand.tag.size())
60 found->tag = oldTag; // keep more detailed info (XXX?)
61 } else {
8822ebee 62 strands_.push_back(strand);
b04c601b 63 }
254912f3
AR
64
65 // notify searchers waiting for this new strand, if any
66 typedef Searchers::iterator SRI;
67 for (SRI i = searchers.begin(); i != searchers.end();) {
68 if (i->tag == strand.tag) {
69 notifySearcher(*i, strand);
70 i = searchers.erase(i);
e67d0508 71 } else {
254912f3 72 ++i;
e67d0508 73 }
254912f3 74 }
10cefb7b 75}
76
1bac0258 77void Ipc::Coordinator::receive(const TypedMsgHdr& message)
10cefb7b 78{
79 switch (message.type()) {
ba568924 80 case mtRegistration:
10cefb7b 81 debugs(54, 6, HERE << "Registration request");
254912f3 82 handleRegistrationRequest(HereIamMessage(message));
10cefb7b 83 break;
84
9a51593d
DK
85 case mtStrandSearchRequest: {
86 const StrandSearchRequest sr(message);
87 debugs(54, 6, HERE << "Strand search request: " << sr.requestorId <<
88 " tag: " << sr.tag);
254912f3
AR
89 handleSearchRequest(sr);
90 break;
9a51593d 91 }
254912f3 92
0d0bce6a
AR
93 case mtSharedListenRequest:
94 debugs(54, 6, HERE << "Shared listen request");
95 handleSharedListenRequest(SharedListenRequest(message));
96 break;
97
3595fd68 98 case mtCacheMgrRequest: {
8822ebee 99 debugs(54, 6, HERE << "Cache manager request");
3595fd68
CT
100 const Mgr::Request req(message);
101 handleCacheMgrRequest(req);
102 }
103 break;
8822ebee 104
3595fd68 105 case mtCacheMgrResponse: {
8822ebee 106 debugs(54, 6, HERE << "Cache manager response");
3595fd68
CT
107 const Mgr::Response resp(message);
108 handleCacheMgrResponse(resp);
109 }
110 break;
8822ebee 111
f738d783 112#if SQUID_SNMP
3595fd68 113 case mtSnmpRequest: {
51ea0904 114 debugs(54, 6, HERE << "SNMP request");
3595fd68
CT
115 const Snmp::Request req(message);
116 handleSnmpRequest(req);
117 }
118 break;
51ea0904 119
3595fd68 120 case mtSnmpResponse: {
51ea0904 121 debugs(54, 6, HERE << "SNMP response");
3595fd68
CT
122 const Snmp::Response resp(message);
123 handleSnmpResponse(resp);
124 }
125 break;
f738d783 126#endif
8822ebee 127
10cefb7b 128 default:
e0236918 129 debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type());
10cefb7b 130 break;
131 }
132}
133
254912f3 134void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg)
10cefb7b 135{
254912f3 136 registerStrand(msg.strand);
ba568924
AR
137
138 // send back an acknowledgement; TODO: remove as not needed?
1bac0258 139 TypedMsgHdr message;
254912f3
AR
140 msg.pack(message);
141 SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message);
10cefb7b 142}
a2c48c98 143
0d0bce6a
AR
144void
145Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
146{
147 debugs(54, 4, HERE << "kid" << request.requestorId <<
5667a628 148 " needs shared listen FD for " << request.params.addr);
0d0bce6a
AR
149 Listeners::const_iterator i = listeners.find(request.params);
150 int errNo = 0;
e0d28505 151 const Comm::ConnectionPointer c = (i != listeners.end()) ?
dc49061a 152 i->second : openListenSocket(request, errNo);
0d0bce6a 153
e0d28505 154 debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
5667a628
AR
155 request.params.addr << " to kid" << request.requestorId <<
156 " mapId=" << request.mapId);
0d0bce6a 157
b72fb55d 158 SharedListenResponse response(c->fd, errNo, request.mapId);
0d0bce6a
AR
159 TypedMsgHdr message;
160 response.pack(message);
161 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
162}
163
8822ebee
AR
164void
165Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
166{
167 debugs(54, 4, HERE);
168
46a7dc66
AR
169 try {
170 Mgr::Action::Pointer action =
171 CacheManager::GetInstance()->createRequestedAction(request.params);
172 AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
3077b2d4 173 } catch (const std::exception &ex) {
46a7dc66
AR
174 debugs(54, DBG_IMPORTANT, "BUG: cannot aggregate mgr:" <<
175 request.params.actionName << ": " << ex.what());
176 // TODO: Avoid half-baked Connections or teach them how to close.
177 ::close(request.conn->fd);
178 request.conn->fd = -1;
179 return; // the worker will timeout and close
180 }
181
8822ebee
AR
182 // Let the strand know that we are now responsible for handling the request
183 Mgr::Response response(request.requestId);
184 TypedMsgHdr message;
185 response.pack(message);
186 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
187
8822ebee
AR
188}
189
190void
191Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
192{
193 Mgr::Inquirer::HandleRemoteAck(response);
194}
195
254912f3
AR
196void
197Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request)
198{
199 // do we know of a strand with the given search tag?
200 const StrandCoord *strand = NULL;
201 typedef StrandCoords::const_iterator SCCI;
202 for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
203 if (i->tag == request.tag)
204 strand = &(*i);
205 }
206
207 if (strand) {
208 notifySearcher(request, *strand);
209 return;
e67d0508 210 }
254912f3
AR
211
212 searchers.push_back(request);
213 debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
9199139f 214 " who " << request.tag << " is");
254912f3
AR
215}
216
217void
218Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
219 const StrandCoord& strand)
220{
221 debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
9199139f 222 request.tag << " is kid" << strand.kidId);
b2aa0934 223 const StrandSearchResponse response(strand);
254912f3 224 TypedMsgHdr message;
9a51593d 225 response.pack(message);
254912f3
AR
226 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
227}
228
f738d783 229#if SQUID_SNMP
51ea0904
CT
230void
231Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)
232{
233 debugs(54, 4, HERE);
234
235 Snmp::Response response(request.requestId);
236 TypedMsgHdr message;
237 response.pack(message);
238 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
239
240 AsyncJob::Start(new Snmp::Inquirer(request, strands_));
241}
242
243void
244Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response)
245{
246 debugs(54, 4, HERE);
247 Snmp::Inquirer::HandleRemoteAck(response);
248}
f738d783 249#endif
254912f3 250
e0d28505 251Comm::ConnectionPointer
0d0bce6a 252Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
5667a628 253 int &errNo)
0d0bce6a
AR
254{
255 const OpenListenerParams &p = request.params;
256
257 debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
5667a628 258 request.requestorId);
0d0bce6a 259
e0d28505
AJ
260 Comm::ConnectionPointer conn = new Comm::Connection;
261 conn->local = p.addr; // comm_open_listener may modify it
262 conn->flags = p.flags;
0d0bce6a
AR
263
264 enter_suid();
e0d28505
AJ
265 comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote));
266 errNo = Comm::IsConnOpen(conn) ? 0 : errno;
0d0bce6a
AR
267 leave_suid();
268
e0d28505
AJ
269 debugs(54, 6, HERE << "tried listening on " << conn << " for kid" <<
270 request.requestorId);
271
0d0bce6a 272 // cache positive results
e0d28505
AJ
273 if (Comm::IsConnOpen(conn))
274 listeners[request.params] = conn;
0d0bce6a 275
e0d28505 276 return conn;
0d0bce6a
AR
277}
278
a2c48c98
AR
279void Ipc::Coordinator::broadcastSignal(int sig) const
280{
8822ebee
AR
281 typedef StrandCoords::const_iterator SCI;
282 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
a2c48c98 283 debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
5667a628 284 ", PID=" << iter->pid);
a2c48c98
AR
285 kill(iter->pid, sig);
286 }
287}
288
289Ipc::Coordinator* Ipc::Coordinator::Instance()
290{
291 if (!TheInstance)
292 TheInstance = new Coordinator;
293 // XXX: if the Coordinator job quits, this pointer will become invalid
294 // we could make Coordinator death fatal, except during exit, but since
295 // Strands do not re-register, even process death would be pointless.
296 return TheInstance;
297}
8822ebee
AR
298
299const Ipc::StrandCoords&
300Ipc::Coordinator::strands() const
301{
302 return strands_;
303}