]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Coordinator.cc
Removed squid-old.h
[thirdparty/squid.git] / src / ipc / Coordinator.cc
CommitLineData
/*
 * $Id$
 *
 * DEBUG: section 54    Interprocess Communication
 *
 */
7
8
f7f3304a 9#include "squid.h"
e0d28505 10#include "base/Subscription.h"
8822ebee
AR
11#include "base/TextException.h"
12#include "CacheManager.h"
0d0bce6a 13#include "comm.h"
e0d28505 14#include "comm/Connection.h"
10cefb7b 15#include "ipc/Coordinator.h"
0d0bce6a 16#include "ipc/SharedListen.h"
8822ebee
AR
17#include "mgr/Inquirer.h"
18#include "mgr/Request.h"
19#include "mgr/Response.h"
582c2af2 20#include "protos.h"
f738d783 21#if SQUID_SNMP
d6e3ad20
CT
22#include "snmp/Inquirer.h"
23#include "snmp/Request.h"
24#include "snmp/Response.h"
f738d783 25#endif
10cefb7b 26
27CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
a2c48c98 28Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
10cefb7b 29
30
31Ipc::Coordinator::Coordinator():
5667a628 32 Port(coordinatorAddr)
10cefb7b 33{
34}
35
36void Ipc::Coordinator::start()
37{
ba568924 38 Port::start();
10cefb7b 39}
40
1bac0258 41Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
10cefb7b 42{
8822ebee
AR
43 typedef StrandCoords::iterator SI;
44 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
10cefb7b 45 if (iter->kidId == kidId)
46 return &(*iter);
47 }
48 return NULL;
49}
50
1bac0258 51void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
10cefb7b 52{
254912f3
AR
53 debugs(54, 3, HERE << "registering kid" << strand.kidId <<
54 ' ' << strand.tag);
b04c601b
AR
55 if (StrandCoord* found = findStrand(strand.kidId)) {
56 const String oldTag = found->tag;
10cefb7b 57 *found = strand;
b04c601b
AR
58 if (oldTag.size() && !strand.tag.size())
59 found->tag = oldTag; // keep more detailed info (XXX?)
60 } else {
8822ebee 61 strands_.push_back(strand);
b04c601b 62 }
254912f3
AR
63
64 // notify searchers waiting for this new strand, if any
65 typedef Searchers::iterator SRI;
66 for (SRI i = searchers.begin(); i != searchers.end();) {
67 if (i->tag == strand.tag) {
68 notifySearcher(*i, strand);
69 i = searchers.erase(i);
e67d0508 70 } else {
254912f3 71 ++i;
e67d0508 72 }
254912f3 73 }
10cefb7b 74}
75
1bac0258 76void Ipc::Coordinator::receive(const TypedMsgHdr& message)
10cefb7b 77{
78 switch (message.type()) {
ba568924 79 case mtRegistration:
10cefb7b 80 debugs(54, 6, HERE << "Registration request");
254912f3 81 handleRegistrationRequest(HereIamMessage(message));
10cefb7b 82 break;
83
9a51593d
DK
84 case mtStrandSearchRequest: {
85 const StrandSearchRequest sr(message);
86 debugs(54, 6, HERE << "Strand search request: " << sr.requestorId <<
87 " tag: " << sr.tag);
254912f3
AR
88 handleSearchRequest(sr);
89 break;
9a51593d 90 }
254912f3 91
0d0bce6a
AR
92 case mtSharedListenRequest:
93 debugs(54, 6, HERE << "Shared listen request");
94 handleSharedListenRequest(SharedListenRequest(message));
95 break;
96
3595fd68 97 case mtCacheMgrRequest: {
8822ebee 98 debugs(54, 6, HERE << "Cache manager request");
3595fd68
CT
99 const Mgr::Request req(message);
100 handleCacheMgrRequest(req);
101 }
102 break;
8822ebee 103
3595fd68 104 case mtCacheMgrResponse: {
8822ebee 105 debugs(54, 6, HERE << "Cache manager response");
3595fd68
CT
106 const Mgr::Response resp(message);
107 handleCacheMgrResponse(resp);
108 }
109 break;
8822ebee 110
f738d783 111#if SQUID_SNMP
3595fd68 112 case mtSnmpRequest: {
51ea0904 113 debugs(54, 6, HERE << "SNMP request");
3595fd68
CT
114 const Snmp::Request req(message);
115 handleSnmpRequest(req);
116 }
117 break;
51ea0904 118
3595fd68 119 case mtSnmpResponse: {
51ea0904 120 debugs(54, 6, HERE << "SNMP response");
3595fd68
CT
121 const Snmp::Response resp(message);
122 handleSnmpResponse(resp);
123 }
124 break;
f738d783 125#endif
8822ebee 126
10cefb7b 127 default:
e0236918 128 debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type());
10cefb7b 129 break;
130 }
131}
132
254912f3 133void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg)
10cefb7b 134{
254912f3 135 registerStrand(msg.strand);
ba568924
AR
136
137 // send back an acknowledgement; TODO: remove as not needed?
1bac0258 138 TypedMsgHdr message;
254912f3
AR
139 msg.pack(message);
140 SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message);
10cefb7b 141}
a2c48c98 142
0d0bce6a
AR
143void
144Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
145{
146 debugs(54, 4, HERE << "kid" << request.requestorId <<
5667a628 147 " needs shared listen FD for " << request.params.addr);
0d0bce6a
AR
148 Listeners::const_iterator i = listeners.find(request.params);
149 int errNo = 0;
e0d28505 150 const Comm::ConnectionPointer c = (i != listeners.end()) ?
dc49061a 151 i->second : openListenSocket(request, errNo);
0d0bce6a 152
e0d28505 153 debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
5667a628
AR
154 request.params.addr << " to kid" << request.requestorId <<
155 " mapId=" << request.mapId);
0d0bce6a 156
b72fb55d 157 SharedListenResponse response(c->fd, errNo, request.mapId);
0d0bce6a
AR
158 TypedMsgHdr message;
159 response.pack(message);
160 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
161}
162
8822ebee
AR
163void
164Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
165{
166 debugs(54, 4, HERE);
167
46a7dc66
AR
168 try {
169 Mgr::Action::Pointer action =
170 CacheManager::GetInstance()->createRequestedAction(request.params);
171 AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
3077b2d4 172 } catch (const std::exception &ex) {
46a7dc66
AR
173 debugs(54, DBG_IMPORTANT, "BUG: cannot aggregate mgr:" <<
174 request.params.actionName << ": " << ex.what());
175 // TODO: Avoid half-baked Connections or teach them how to close.
176 ::close(request.conn->fd);
177 request.conn->fd = -1;
178 return; // the worker will timeout and close
179 }
180
8822ebee
AR
181 // Let the strand know that we are now responsible for handling the request
182 Mgr::Response response(request.requestId);
183 TypedMsgHdr message;
184 response.pack(message);
185 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
186
8822ebee
AR
187}
188
189void
190Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
191{
192 Mgr::Inquirer::HandleRemoteAck(response);
193}
194
254912f3
AR
195void
196Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request)
197{
198 // do we know of a strand with the given search tag?
199 const StrandCoord *strand = NULL;
200 typedef StrandCoords::const_iterator SCCI;
201 for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
202 if (i->tag == request.tag)
203 strand = &(*i);
204 }
205
206 if (strand) {
207 notifySearcher(request, *strand);
208 return;
e67d0508 209 }
254912f3
AR
210
211 searchers.push_back(request);
212 debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
9199139f 213 " who " << request.tag << " is");
254912f3
AR
214}
215
216void
217Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
218 const StrandCoord& strand)
219{
220 debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
9199139f 221 request.tag << " is kid" << strand.kidId);
b2aa0934 222 const StrandSearchResponse response(strand);
254912f3 223 TypedMsgHdr message;
9a51593d 224 response.pack(message);
254912f3
AR
225 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
226}
227
f738d783 228#if SQUID_SNMP
51ea0904
CT
229void
230Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)
231{
232 debugs(54, 4, HERE);
233
234 Snmp::Response response(request.requestId);
235 TypedMsgHdr message;
236 response.pack(message);
237 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
238
239 AsyncJob::Start(new Snmp::Inquirer(request, strands_));
240}
241
242void
243Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response)
244{
245 debugs(54, 4, HERE);
246 Snmp::Inquirer::HandleRemoteAck(response);
247}
f738d783 248#endif
254912f3 249
e0d28505 250Comm::ConnectionPointer
0d0bce6a 251Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
5667a628 252 int &errNo)
0d0bce6a
AR
253{
254 const OpenListenerParams &p = request.params;
255
256 debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
5667a628 257 request.requestorId);
0d0bce6a 258
e0d28505
AJ
259 Comm::ConnectionPointer conn = new Comm::Connection;
260 conn->local = p.addr; // comm_open_listener may modify it
261 conn->flags = p.flags;
0d0bce6a
AR
262
263 enter_suid();
e0d28505
AJ
264 comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote));
265 errNo = Comm::IsConnOpen(conn) ? 0 : errno;
0d0bce6a
AR
266 leave_suid();
267
e0d28505
AJ
268 debugs(54, 6, HERE << "tried listening on " << conn << " for kid" <<
269 request.requestorId);
270
0d0bce6a 271 // cache positive results
e0d28505
AJ
272 if (Comm::IsConnOpen(conn))
273 listeners[request.params] = conn;
0d0bce6a 274
e0d28505 275 return conn;
0d0bce6a
AR
276}
277
a2c48c98
AR
278void Ipc::Coordinator::broadcastSignal(int sig) const
279{
8822ebee
AR
280 typedef StrandCoords::const_iterator SCI;
281 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
a2c48c98 282 debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
5667a628 283 ", PID=" << iter->pid);
a2c48c98
AR
284 kill(iter->pid, sig);
285 }
286}
287
288Ipc::Coordinator* Ipc::Coordinator::Instance()
289{
290 if (!TheInstance)
291 TheInstance = new Coordinator;
292 // XXX: if the Coordinator job quits, this pointer will become invalid
293 // we could make Coordinator death fatal, except during exit, but since
294 // Strands do not re-register, even process death would be pointless.
295 return TheInstance;
296}
8822ebee
AR
297
298const Ipc::StrandCoords&
299Ipc::Coordinator::strands() const
300{
301 return strands_;
302}