git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Coordinator.cc
Temporary fix: Avoid killing Coordinator with unregistered cache mgr actions
[thirdparty/squid.git] / src / ipc / Coordinator.cc
CommitLineData
10cefb7b 1/*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8
9#include "config.h"
e0d28505 10#include "base/Subscription.h"
8822ebee
AR
11#include "base/TextException.h"
12#include "CacheManager.h"
0d0bce6a 13#include "comm.h"
e0d28505 14#include "comm/Connection.h"
10cefb7b 15#include "ipc/Coordinator.h"
0d0bce6a 16#include "ipc/SharedListen.h"
8822ebee
AR
17#include "mgr/Inquirer.h"
18#include "mgr/Request.h"
19#include "mgr/Response.h"
f738d783 20#if SQUID_SNMP
d6e3ad20
CT
21#include "snmp/Inquirer.h"
22#include "snmp/Request.h"
23#include "snmp/Response.h"
f738d783 24#endif
10cefb7b 25
26CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
a2c48c98 27Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
10cefb7b 28
29
30Ipc::Coordinator::Coordinator():
5667a628 31 Port(coordinatorAddr)
10cefb7b 32{
33}
34
35void Ipc::Coordinator::start()
36{
ba568924 37 Port::start();
10cefb7b 38}
39
1bac0258 40Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
10cefb7b 41{
8822ebee
AR
42 typedef StrandCoords::iterator SI;
43 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
10cefb7b 44 if (iter->kidId == kidId)
45 return &(*iter);
46 }
47 return NULL;
48}
49
1bac0258 50void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
10cefb7b 51{
254912f3
AR
52 debugs(54, 3, HERE << "registering kid" << strand.kidId <<
53 ' ' << strand.tag);
b04c601b
AR
54 if (StrandCoord* found = findStrand(strand.kidId)) {
55 const String oldTag = found->tag;
10cefb7b 56 *found = strand;
b04c601b
AR
57 if (oldTag.size() && !strand.tag.size())
58 found->tag = oldTag; // keep more detailed info (XXX?)
59 } else {
8822ebee 60 strands_.push_back(strand);
b04c601b 61 }
254912f3
AR
62
63 // notify searchers waiting for this new strand, if any
64 typedef Searchers::iterator SRI;
65 for (SRI i = searchers.begin(); i != searchers.end();) {
66 if (i->tag == strand.tag) {
67 notifySearcher(*i, strand);
68 i = searchers.erase(i);
e67d0508 69 } else {
254912f3 70 ++i;
e67d0508 71 }
254912f3 72 }
10cefb7b 73}
74
1bac0258 75void Ipc::Coordinator::receive(const TypedMsgHdr& message)
10cefb7b 76{
77 switch (message.type()) {
ba568924 78 case mtRegistration:
10cefb7b 79 debugs(54, 6, HERE << "Registration request");
254912f3 80 handleRegistrationRequest(HereIamMessage(message));
10cefb7b 81 break;
82
9a51593d
DK
83 case mtStrandSearchRequest: {
84 const StrandSearchRequest sr(message);
85 debugs(54, 6, HERE << "Strand search request: " << sr.requestorId <<
86 " tag: " << sr.tag);
254912f3
AR
87 handleSearchRequest(sr);
88 break;
9a51593d 89 }
254912f3 90
0d0bce6a
AR
91 case mtSharedListenRequest:
92 debugs(54, 6, HERE << "Shared listen request");
93 handleSharedListenRequest(SharedListenRequest(message));
94 break;
95
3595fd68 96 case mtCacheMgrRequest: {
8822ebee 97 debugs(54, 6, HERE << "Cache manager request");
3595fd68
CT
98 const Mgr::Request req(message);
99 handleCacheMgrRequest(req);
100 }
101 break;
8822ebee 102
3595fd68 103 case mtCacheMgrResponse: {
8822ebee 104 debugs(54, 6, HERE << "Cache manager response");
3595fd68
CT
105 const Mgr::Response resp(message);
106 handleCacheMgrResponse(resp);
107 }
108 break;
8822ebee 109
f738d783 110#if SQUID_SNMP
3595fd68 111 case mtSnmpRequest: {
51ea0904 112 debugs(54, 6, HERE << "SNMP request");
3595fd68
CT
113 const Snmp::Request req(message);
114 handleSnmpRequest(req);
115 }
116 break;
51ea0904 117
3595fd68 118 case mtSnmpResponse: {
51ea0904 119 debugs(54, 6, HERE << "SNMP response");
3595fd68
CT
120 const Snmp::Response resp(message);
121 handleSnmpResponse(resp);
122 }
123 break;
f738d783 124#endif
8822ebee 125
10cefb7b 126 default:
7230e9c7 127 debugs(54, 1, HERE << "Unhandled message type: " << message.type());
10cefb7b 128 break;
129 }
130}
131
254912f3 132void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg)
10cefb7b 133{
254912f3 134 registerStrand(msg.strand);
ba568924
AR
135
136 // send back an acknowledgement; TODO: remove as not needed?
1bac0258 137 TypedMsgHdr message;
254912f3
AR
138 msg.pack(message);
139 SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message);
10cefb7b 140}
a2c48c98 141
0d0bce6a
AR
142void
143Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
144{
145 debugs(54, 4, HERE << "kid" << request.requestorId <<
5667a628 146 " needs shared listen FD for " << request.params.addr);
0d0bce6a
AR
147 Listeners::const_iterator i = listeners.find(request.params);
148 int errNo = 0;
e0d28505 149 const Comm::ConnectionPointer c = (i != listeners.end()) ?
dc49061a 150 i->second : openListenSocket(request, errNo);
0d0bce6a 151
e0d28505 152 debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
5667a628
AR
153 request.params.addr << " to kid" << request.requestorId <<
154 " mapId=" << request.mapId);
0d0bce6a 155
b72fb55d 156 SharedListenResponse response(c->fd, errNo, request.mapId);
0d0bce6a
AR
157 TypedMsgHdr message;
158 response.pack(message);
159 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
160}
161
8822ebee
AR
162void
163Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
164{
165 debugs(54, 4, HERE);
166
46a7dc66
AR
167 try {
168 Mgr::Action::Pointer action =
169 CacheManager::GetInstance()->createRequestedAction(request.params);
170 AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
171 }
172 catch (const std::exception &ex) {
173 debugs(54, DBG_IMPORTANT, "BUG: cannot aggregate mgr:" <<
174 request.params.actionName << ": " << ex.what());
175 // TODO: Avoid half-baked Connections or teach them how to close.
176 ::close(request.conn->fd);
177 request.conn->fd = -1;
178 return; // the worker will timeout and close
179 }
180
8822ebee
AR
181 // Let the strand know that we are now responsible for handling the request
182 Mgr::Response response(request.requestId);
183 TypedMsgHdr message;
184 response.pack(message);
185 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
186
8822ebee
AR
187}
188
189void
190Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
191{
192 Mgr::Inquirer::HandleRemoteAck(response);
193}
194
254912f3
AR
195void
196Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request)
197{
198 // do we know of a strand with the given search tag?
199 const StrandCoord *strand = NULL;
200 typedef StrandCoords::const_iterator SCCI;
201 for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
202 if (i->tag == request.tag)
203 strand = &(*i);
204 }
205
206 if (strand) {
207 notifySearcher(request, *strand);
208 return;
e67d0508 209 }
254912f3
AR
210
211 searchers.push_back(request);
212 debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
9199139f 213 " who " << request.tag << " is");
254912f3
AR
214}
215
216void
217Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
218 const StrandCoord& strand)
219{
220 debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
9199139f 221 request.tag << " is kid" << strand.kidId);
b2aa0934 222 const StrandSearchResponse response(strand);
254912f3 223 TypedMsgHdr message;
9a51593d 224 response.pack(message);
254912f3
AR
225 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
226}
227
f738d783 228#if SQUID_SNMP
51ea0904
CT
229void
230Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)
231{
232 debugs(54, 4, HERE);
233
234 Snmp::Response response(request.requestId);
235 TypedMsgHdr message;
236 response.pack(message);
237 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
238
239 AsyncJob::Start(new Snmp::Inquirer(request, strands_));
240}
241
242void
243Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response)
244{
245 debugs(54, 4, HERE);
246 Snmp::Inquirer::HandleRemoteAck(response);
247}
f738d783 248#endif
254912f3 249
e0d28505 250Comm::ConnectionPointer
0d0bce6a 251Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
5667a628 252 int &errNo)
0d0bce6a
AR
253{
254 const OpenListenerParams &p = request.params;
255
256 debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
5667a628 257 request.requestorId);
0d0bce6a 258
e0d28505
AJ
259 Comm::ConnectionPointer conn = new Comm::Connection;
260 conn->local = p.addr; // comm_open_listener may modify it
261 conn->flags = p.flags;
0d0bce6a
AR
262
263 enter_suid();
e0d28505
AJ
264 comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote));
265 errNo = Comm::IsConnOpen(conn) ? 0 : errno;
0d0bce6a
AR
266 leave_suid();
267
e0d28505
AJ
268 debugs(54, 6, HERE << "tried listening on " << conn << " for kid" <<
269 request.requestorId);
270
0d0bce6a 271 // cache positive results
e0d28505
AJ
272 if (Comm::IsConnOpen(conn))
273 listeners[request.params] = conn;
0d0bce6a 274
e0d28505 275 return conn;
0d0bce6a
AR
276}
277
a2c48c98
AR
278void Ipc::Coordinator::broadcastSignal(int sig) const
279{
8822ebee
AR
280 typedef StrandCoords::const_iterator SCI;
281 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
a2c48c98 282 debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
5667a628 283 ", PID=" << iter->pid);
a2c48c98
AR
284 kill(iter->pid, sig);
285 }
286}
287
288Ipc::Coordinator* Ipc::Coordinator::Instance()
289{
290 if (!TheInstance)
291 TheInstance = new Coordinator;
292 // XXX: if the Coordinator job quits, this pointer will become invalid
293 // we could make Coordinator death fatal, except during exit, but since
294 // Strands do not re-register, even process death would be pointless.
295 return TheInstance;
296}
8822ebee
AR
297
298const Ipc::StrandCoords&
299Ipc::Coordinator::strands() const
300{
301 return strands_;
302}