]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Coordinator.cc
Renamed squid.h to squid-old.h and config.h to squid.h
[thirdparty/squid.git] / src / ipc / Coordinator.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8
9 #include "squid.h"
10 #include "base/Subscription.h"
11 #include "base/TextException.h"
12 #include "CacheManager.h"
13 #include "comm.h"
14 #include "comm/Connection.h"
15 #include "ipc/Coordinator.h"
16 #include "ipc/SharedListen.h"
17 #include "mgr/Inquirer.h"
18 #include "mgr/Request.h"
19 #include "mgr/Response.h"
20 #if SQUID_SNMP
21 #include "snmp/Inquirer.h"
22 #include "snmp/Request.h"
23 #include "snmp/Response.h"
24 #endif
25
26 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
27 Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
28
29
30 Ipc::Coordinator::Coordinator():
31 Port(coordinatorAddr)
32 {
33 }
34
35 void Ipc::Coordinator::start()
36 {
37 Port::start();
38 }
39
40 Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
41 {
42 typedef StrandCoords::iterator SI;
43 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
44 if (iter->kidId == kidId)
45 return &(*iter);
46 }
47 return NULL;
48 }
49
50 void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
51 {
52 debugs(54, 3, HERE << "registering kid" << strand.kidId <<
53 ' ' << strand.tag);
54 if (StrandCoord* found = findStrand(strand.kidId)) {
55 const String oldTag = found->tag;
56 *found = strand;
57 if (oldTag.size() && !strand.tag.size())
58 found->tag = oldTag; // keep more detailed info (XXX?)
59 } else {
60 strands_.push_back(strand);
61 }
62
63 // notify searchers waiting for this new strand, if any
64 typedef Searchers::iterator SRI;
65 for (SRI i = searchers.begin(); i != searchers.end();) {
66 if (i->tag == strand.tag) {
67 notifySearcher(*i, strand);
68 i = searchers.erase(i);
69 } else {
70 ++i;
71 }
72 }
73 }
74
75 void Ipc::Coordinator::receive(const TypedMsgHdr& message)
76 {
77 switch (message.type()) {
78 case mtRegistration:
79 debugs(54, 6, HERE << "Registration request");
80 handleRegistrationRequest(HereIamMessage(message));
81 break;
82
83 case mtStrandSearchRequest: {
84 const StrandSearchRequest sr(message);
85 debugs(54, 6, HERE << "Strand search request: " << sr.requestorId <<
86 " tag: " << sr.tag);
87 handleSearchRequest(sr);
88 break;
89 }
90
91 case mtSharedListenRequest:
92 debugs(54, 6, HERE << "Shared listen request");
93 handleSharedListenRequest(SharedListenRequest(message));
94 break;
95
96 case mtCacheMgrRequest: {
97 debugs(54, 6, HERE << "Cache manager request");
98 const Mgr::Request req(message);
99 handleCacheMgrRequest(req);
100 }
101 break;
102
103 case mtCacheMgrResponse: {
104 debugs(54, 6, HERE << "Cache manager response");
105 const Mgr::Response resp(message);
106 handleCacheMgrResponse(resp);
107 }
108 break;
109
110 #if SQUID_SNMP
111 case mtSnmpRequest: {
112 debugs(54, 6, HERE << "SNMP request");
113 const Snmp::Request req(message);
114 handleSnmpRequest(req);
115 }
116 break;
117
118 case mtSnmpResponse: {
119 debugs(54, 6, HERE << "SNMP response");
120 const Snmp::Response resp(message);
121 handleSnmpResponse(resp);
122 }
123 break;
124 #endif
125
126 default:
127 debugs(54, 1, HERE << "Unhandled message type: " << message.type());
128 break;
129 }
130 }
131
132 void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg)
133 {
134 registerStrand(msg.strand);
135
136 // send back an acknowledgement; TODO: remove as not needed?
137 TypedMsgHdr message;
138 msg.pack(message);
139 SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message);
140 }
141
142 void
143 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
144 {
145 debugs(54, 4, HERE << "kid" << request.requestorId <<
146 " needs shared listen FD for " << request.params.addr);
147 Listeners::const_iterator i = listeners.find(request.params);
148 int errNo = 0;
149 const Comm::ConnectionPointer c = (i != listeners.end()) ?
150 i->second : openListenSocket(request, errNo);
151
152 debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
153 request.params.addr << " to kid" << request.requestorId <<
154 " mapId=" << request.mapId);
155
156 SharedListenResponse response(c->fd, errNo, request.mapId);
157 TypedMsgHdr message;
158 response.pack(message);
159 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
160 }
161
162 void
163 Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
164 {
165 debugs(54, 4, HERE);
166
167 try {
168 Mgr::Action::Pointer action =
169 CacheManager::GetInstance()->createRequestedAction(request.params);
170 AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
171 } catch (const std::exception &ex) {
172 debugs(54, DBG_IMPORTANT, "BUG: cannot aggregate mgr:" <<
173 request.params.actionName << ": " << ex.what());
174 // TODO: Avoid half-baked Connections or teach them how to close.
175 ::close(request.conn->fd);
176 request.conn->fd = -1;
177 return; // the worker will timeout and close
178 }
179
180 // Let the strand know that we are now responsible for handling the request
181 Mgr::Response response(request.requestId);
182 TypedMsgHdr message;
183 response.pack(message);
184 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
185
186 }
187
188 void
189 Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
190 {
191 Mgr::Inquirer::HandleRemoteAck(response);
192 }
193
194 void
195 Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request)
196 {
197 // do we know of a strand with the given search tag?
198 const StrandCoord *strand = NULL;
199 typedef StrandCoords::const_iterator SCCI;
200 for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
201 if (i->tag == request.tag)
202 strand = &(*i);
203 }
204
205 if (strand) {
206 notifySearcher(request, *strand);
207 return;
208 }
209
210 searchers.push_back(request);
211 debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
212 " who " << request.tag << " is");
213 }
214
215 void
216 Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
217 const StrandCoord& strand)
218 {
219 debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
220 request.tag << " is kid" << strand.kidId);
221 const StrandSearchResponse response(strand);
222 TypedMsgHdr message;
223 response.pack(message);
224 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
225 }
226
227 #if SQUID_SNMP
228 void
229 Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)
230 {
231 debugs(54, 4, HERE);
232
233 Snmp::Response response(request.requestId);
234 TypedMsgHdr message;
235 response.pack(message);
236 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
237
238 AsyncJob::Start(new Snmp::Inquirer(request, strands_));
239 }
240
241 void
242 Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response)
243 {
244 debugs(54, 4, HERE);
245 Snmp::Inquirer::HandleRemoteAck(response);
246 }
247 #endif
248
249 Comm::ConnectionPointer
250 Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
251 int &errNo)
252 {
253 const OpenListenerParams &p = request.params;
254
255 debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
256 request.requestorId);
257
258 Comm::ConnectionPointer conn = new Comm::Connection;
259 conn->local = p.addr; // comm_open_listener may modify it
260 conn->flags = p.flags;
261
262 enter_suid();
263 comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote));
264 errNo = Comm::IsConnOpen(conn) ? 0 : errno;
265 leave_suid();
266
267 debugs(54, 6, HERE << "tried listening on " << conn << " for kid" <<
268 request.requestorId);
269
270 // cache positive results
271 if (Comm::IsConnOpen(conn))
272 listeners[request.params] = conn;
273
274 return conn;
275 }
276
277 void Ipc::Coordinator::broadcastSignal(int sig) const
278 {
279 typedef StrandCoords::const_iterator SCI;
280 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
281 debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
282 ", PID=" << iter->pid);
283 kill(iter->pid, sig);
284 }
285 }
286
287 Ipc::Coordinator* Ipc::Coordinator::Instance()
288 {
289 if (!TheInstance)
290 TheInstance = new Coordinator;
291 // XXX: if the Coordinator job quits, this pointer will become invalid
292 // we could make Coordinator death fatal, except during exit, but since
293 // Strands do not re-register, even process death would be pointless.
294 return TheInstance;
295 }
296
297 const Ipc::StrandCoords&
298 Ipc::Coordinator::strands() const
299 {
300 return strands_;
301 }