]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Coordinator.cc
Source Format Enforcement (#963)
[thirdparty/squid.git] / src / ipc / Coordinator.cc
CommitLineData
10cefb7b 1/*
bf95c10a 2 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
10cefb7b 3 *
bbc27441
AJ
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
10cefb7b 7 */
8
bbc27441
AJ
9/* DEBUG: section 54 Interprocess Communication */
10
f7f3304a 11#include "squid.h"
e0d28505 12#include "base/Subscription.h"
8822ebee
AR
13#include "base/TextException.h"
14#include "CacheManager.h"
0d0bce6a 15#include "comm.h"
e0d28505 16#include "comm/Connection.h"
10cefb7b 17#include "ipc/Coordinator.h"
0d0bce6a 18#include "ipc/SharedListen.h"
8822ebee
AR
19#include "mgr/Inquirer.h"
20#include "mgr/Request.h"
21#include "mgr/Response.h"
561076e2 22#include "tools.h"
f738d783 23#if SQUID_SNMP
d6e3ad20
CT
24#include "snmp/Inquirer.h"
25#include "snmp/Request.h"
26#include "snmp/Response.h"
f738d783 27#endif
1a30fdf5
AJ
28
29#include <cerrno>
10cefb7b 30
31CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
a2c48c98 32Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
10cefb7b 33
10cefb7b 34Ipc::Coordinator::Coordinator():
f53969cc 35 Port(Ipc::Port::CoordinatorAddr())
10cefb7b 36{
37}
38
39void Ipc::Coordinator::start()
40{
ba568924 41 Port::start();
10cefb7b 42}
43
1bac0258 44Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
10cefb7b 45{
8822ebee
AR
46 typedef StrandCoords::iterator SI;
47 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
10cefb7b 48 if (iter->kidId == kidId)
49 return &(*iter);
50 }
51 return NULL;
52}
53
1bac0258 54void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
10cefb7b 55{
bf95c10a 56 debugs(54, 3, "registering kid" << strand.kidId <<
254912f3 57 ' ' << strand.tag);
b04c601b
AR
58 if (StrandCoord* found = findStrand(strand.kidId)) {
59 const String oldTag = found->tag;
10cefb7b 60 *found = strand;
b04c601b
AR
61 if (oldTag.size() && !strand.tag.size())
62 found->tag = oldTag; // keep more detailed info (XXX?)
63 } else {
8822ebee 64 strands_.push_back(strand);
b04c601b 65 }
254912f3
AR
66
67 // notify searchers waiting for this new strand, if any
68 typedef Searchers::iterator SRI;
69 for (SRI i = searchers.begin(); i != searchers.end();) {
70 if (i->tag == strand.tag) {
71 notifySearcher(*i, strand);
72 i = searchers.erase(i);
e67d0508 73 } else {
254912f3 74 ++i;
e67d0508 75 }
254912f3 76 }
10cefb7b 77}
78
1bac0258 79void Ipc::Coordinator::receive(const TypedMsgHdr& message)
10cefb7b 80{
6ccfd70a
EB
81 switch (message.rawType()) {
82 case mtRegisterStrand:
bf95c10a 83 debugs(54, 6, "Registration request");
6ccfd70a 84 handleRegistrationRequest(StrandMessage(message));
10cefb7b 85 break;
86
6ccfd70a 87 case mtFindStrand: {
9a51593d 88 const StrandSearchRequest sr(message);
bf95c10a 89 debugs(54, 6, "Strand search request: " << sr.requestorId <<
9a51593d 90 " tag: " << sr.tag);
254912f3
AR
91 handleSearchRequest(sr);
92 break;
9a51593d 93 }
254912f3 94
0d0bce6a 95 case mtSharedListenRequest:
bf95c10a 96 debugs(54, 6, "Shared listen request");
0d0bce6a
AR
97 handleSharedListenRequest(SharedListenRequest(message));
98 break;
99
3595fd68 100 case mtCacheMgrRequest: {
bf95c10a 101 debugs(54, 6, "Cache manager request");
3595fd68
CT
102 const Mgr::Request req(message);
103 handleCacheMgrRequest(req);
104 }
105 break;
8822ebee 106
3595fd68 107 case mtCacheMgrResponse: {
bf95c10a 108 debugs(54, 6, "Cache manager response");
3595fd68 109 const Mgr::Response resp(message);
4c218615 110 handleCacheMgrResponse(Mine(resp));
3595fd68
CT
111 }
112 break;
8822ebee 113
f738d783 114#if SQUID_SNMP
3595fd68 115 case mtSnmpRequest: {
bf95c10a 116 debugs(54, 6, "SNMP request");
3595fd68
CT
117 const Snmp::Request req(message);
118 handleSnmpRequest(req);
119 }
120 break;
51ea0904 121
3595fd68 122 case mtSnmpResponse: {
bf95c10a 123 debugs(54, 6, "SNMP response");
3595fd68 124 const Snmp::Response resp(message);
4c218615 125 handleSnmpResponse(Mine(resp));
3595fd68
CT
126 }
127 break;
f738d783 128#endif
8822ebee 129
10cefb7b 130 default:
4c218615 131 Port::receive(message);
10cefb7b 132 break;
133 }
134}
135
6ccfd70a 136void Ipc::Coordinator::handleRegistrationRequest(const StrandMessage& msg)
10cefb7b 137{
254912f3 138 registerStrand(msg.strand);
ba568924
AR
139
140 // send back an acknowledgement; TODO: remove as not needed?
1bac0258 141 TypedMsgHdr message;
6ccfd70a 142 msg.pack(mtStrandRegistered, message);
1ee292b7 143 SendMessage(MakeAddr(strandAddrLabel, msg.strand.kidId), message);
10cefb7b 144}
a2c48c98 145
0d0bce6a
AR
146void
147Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
148{
bf95c10a 149 debugs(54, 4, "kid" << request.requestorId <<
5667a628 150 " needs shared listen FD for " << request.params.addr);
0d0bce6a
AR
151 Listeners::const_iterator i = listeners.find(request.params);
152 int errNo = 0;
e0d28505 153 const Comm::ConnectionPointer c = (i != listeners.end()) ?
dc49061a 154 i->second : openListenSocket(request, errNo);
0d0bce6a 155
bf95c10a 156 debugs(54, 3, "sending shared listen " << c << " for " <<
5667a628
AR
157 request.params.addr << " to kid" << request.requestorId <<
158 " mapId=" << request.mapId);
0d0bce6a 159
b72fb55d 160 SharedListenResponse response(c->fd, errNo, request.mapId);
0d0bce6a
AR
161 TypedMsgHdr message;
162 response.pack(message);
1ee292b7 163 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
0d0bce6a
AR
164}
165
8822ebee
AR
166void
167Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
168{
bf95c10a 169 debugs(54, 4, MYNAME);
8822ebee 170
46a7dc66
AR
171 try {
172 Mgr::Action::Pointer action =
173 CacheManager::GetInstance()->createRequestedAction(request.params);
174 AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
3077b2d4 175 } catch (const std::exception &ex) {
d816f28d 176 debugs(54, DBG_IMPORTANT, "ERROR: Squid BUG: cannot aggregate mgr:" <<
46a7dc66
AR
177 request.params.actionName << ": " << ex.what());
178 // TODO: Avoid half-baked Connections or teach them how to close.
179 ::close(request.conn->fd);
180 request.conn->fd = -1;
181 return; // the worker will timeout and close
182 }
183
8822ebee
AR
184 // Let the strand know that we are now responsible for handling the request
185 Mgr::Response response(request.requestId);
186 TypedMsgHdr message;
187 response.pack(message);
1ee292b7 188 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
8822ebee 189
8822ebee
AR
190}
191
192void
193Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
194{
195 Mgr::Inquirer::HandleRemoteAck(response);
196}
197
254912f3
AR
198void
199Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request)
200{
201 // do we know of a strand with the given search tag?
202 const StrandCoord *strand = NULL;
203 typedef StrandCoords::const_iterator SCCI;
204 for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
205 if (i->tag == request.tag)
206 strand = &(*i);
207 }
208
209 if (strand) {
210 notifySearcher(request, *strand);
211 return;
e67d0508 212 }
254912f3
AR
213
214 searchers.push_back(request);
bf95c10a 215 debugs(54, 3, "cannot yet tell kid" << request.requestorId <<
9199139f 216 " who " << request.tag << " is");
254912f3
AR
217}
218
219void
220Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
221 const StrandCoord& strand)
222{
bf95c10a 223 debugs(54, 3, "tell kid" << request.requestorId << " that " <<
9199139f 224 request.tag << " is kid" << strand.kidId);
4c218615 225 const StrandMessage response(strand, request.qid);
254912f3 226 TypedMsgHdr message;
6ccfd70a 227 response.pack(mtStrandReady, message);
1ee292b7 228 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
254912f3
AR
229}
230
f738d783 231#if SQUID_SNMP
51ea0904
CT
232void
233Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)
234{
bf95c10a 235 debugs(54, 4, MYNAME);
51ea0904
CT
236
237 Snmp::Response response(request.requestId);
238 TypedMsgHdr message;
239 response.pack(message);
1ee292b7 240 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
51ea0904
CT
241
242 AsyncJob::Start(new Snmp::Inquirer(request, strands_));
243}
244
245void
246Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response)
247{
bf95c10a 248 debugs(54, 4, MYNAME);
51ea0904
CT
249 Snmp::Inquirer::HandleRemoteAck(response);
250}
f738d783 251#endif
254912f3 252
e0d28505 253Comm::ConnectionPointer
0d0bce6a 254Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
5667a628 255 int &errNo)
0d0bce6a
AR
256{
257 const OpenListenerParams &p = request.params;
258
bf95c10a 259 debugs(54, 6, "opening listen FD at " << p.addr << " for kid" <<
5667a628 260 request.requestorId);
0d0bce6a 261
9dca980d
AJ
262 Comm::ConnectionPointer newConn = new Comm::Connection;
263 newConn->local = p.addr; // comm_open_listener may modify it
264 newConn->flags = p.flags;
0d0bce6a
AR
265
266 enter_suid();
9dca980d
AJ
267 comm_open_listener(p.sock_type, p.proto, newConn, FdNote(p.fdNote));
268 errNo = Comm::IsConnOpen(newConn) ? 0 : errno;
0d0bce6a
AR
269 leave_suid();
270
bf95c10a 271 debugs(54, 6, "tried listening on " << newConn << " for kid" <<
e0d28505
AJ
272 request.requestorId);
273
0d0bce6a 274 // cache positive results
9dca980d
AJ
275 if (Comm::IsConnOpen(newConn))
276 listeners[request.params] = newConn;
0d0bce6a 277
9dca980d 278 return newConn;
0d0bce6a
AR
279}
280
a2c48c98
AR
281void Ipc::Coordinator::broadcastSignal(int sig) const
282{
8822ebee
AR
283 typedef StrandCoords::const_iterator SCI;
284 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
bf95c10a 285 debugs(54, 5, "signal " << sig << " to kid" << iter->kidId <<
5667a628 286 ", PID=" << iter->pid);
a2c48c98
AR
287 kill(iter->pid, sig);
288 }
289}
290
291Ipc::Coordinator* Ipc::Coordinator::Instance()
292{
293 if (!TheInstance)
294 TheInstance = new Coordinator;
295 // XXX: if the Coordinator job quits, this pointer will become invalid
296 // we could make Coordinator death fatal, except during exit, but since
297 // Strands do not re-register, even process death would be pointless.
298 return TheInstance;
299}
8822ebee
AR
300
301const Ipc::StrandCoords&
302Ipc::Coordinator::strands() const
303{
304 return strands_;
305}
f53969cc 306