]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Coordinator.cc
Removed squid-old.h
[thirdparty/squid.git] / src / ipc / Coordinator.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8
9 #include "squid.h"
10 #include "base/Subscription.h"
11 #include "base/TextException.h"
12 #include "CacheManager.h"
13 #include "comm.h"
14 #include "comm/Connection.h"
15 #include "ipc/Coordinator.h"
16 #include "ipc/SharedListen.h"
17 #include "mgr/Inquirer.h"
18 #include "mgr/Request.h"
19 #include "mgr/Response.h"
20 #include "protos.h"
21 #if SQUID_SNMP
22 #include "snmp/Inquirer.h"
23 #include "snmp/Request.h"
24 #include "snmp/Response.h"
25 #endif
26
27 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
28 Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
29
30
31 Ipc::Coordinator::Coordinator():
32 Port(coordinatorAddr)
33 {
34 }
35
36 void Ipc::Coordinator::start()
37 {
38 Port::start();
39 }
40
41 Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
42 {
43 typedef StrandCoords::iterator SI;
44 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
45 if (iter->kidId == kidId)
46 return &(*iter);
47 }
48 return NULL;
49 }
50
51 void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
52 {
53 debugs(54, 3, HERE << "registering kid" << strand.kidId <<
54 ' ' << strand.tag);
55 if (StrandCoord* found = findStrand(strand.kidId)) {
56 const String oldTag = found->tag;
57 *found = strand;
58 if (oldTag.size() && !strand.tag.size())
59 found->tag = oldTag; // keep more detailed info (XXX?)
60 } else {
61 strands_.push_back(strand);
62 }
63
64 // notify searchers waiting for this new strand, if any
65 typedef Searchers::iterator SRI;
66 for (SRI i = searchers.begin(); i != searchers.end();) {
67 if (i->tag == strand.tag) {
68 notifySearcher(*i, strand);
69 i = searchers.erase(i);
70 } else {
71 ++i;
72 }
73 }
74 }
75
76 void Ipc::Coordinator::receive(const TypedMsgHdr& message)
77 {
78 switch (message.type()) {
79 case mtRegistration:
80 debugs(54, 6, HERE << "Registration request");
81 handleRegistrationRequest(HereIamMessage(message));
82 break;
83
84 case mtStrandSearchRequest: {
85 const StrandSearchRequest sr(message);
86 debugs(54, 6, HERE << "Strand search request: " << sr.requestorId <<
87 " tag: " << sr.tag);
88 handleSearchRequest(sr);
89 break;
90 }
91
92 case mtSharedListenRequest:
93 debugs(54, 6, HERE << "Shared listen request");
94 handleSharedListenRequest(SharedListenRequest(message));
95 break;
96
97 case mtCacheMgrRequest: {
98 debugs(54, 6, HERE << "Cache manager request");
99 const Mgr::Request req(message);
100 handleCacheMgrRequest(req);
101 }
102 break;
103
104 case mtCacheMgrResponse: {
105 debugs(54, 6, HERE << "Cache manager response");
106 const Mgr::Response resp(message);
107 handleCacheMgrResponse(resp);
108 }
109 break;
110
111 #if SQUID_SNMP
112 case mtSnmpRequest: {
113 debugs(54, 6, HERE << "SNMP request");
114 const Snmp::Request req(message);
115 handleSnmpRequest(req);
116 }
117 break;
118
119 case mtSnmpResponse: {
120 debugs(54, 6, HERE << "SNMP response");
121 const Snmp::Response resp(message);
122 handleSnmpResponse(resp);
123 }
124 break;
125 #endif
126
127 default:
128 debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type());
129 break;
130 }
131 }
132
133 void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg)
134 {
135 registerStrand(msg.strand);
136
137 // send back an acknowledgement; TODO: remove as not needed?
138 TypedMsgHdr message;
139 msg.pack(message);
140 SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message);
141 }
142
143 void
144 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
145 {
146 debugs(54, 4, HERE << "kid" << request.requestorId <<
147 " needs shared listen FD for " << request.params.addr);
148 Listeners::const_iterator i = listeners.find(request.params);
149 int errNo = 0;
150 const Comm::ConnectionPointer c = (i != listeners.end()) ?
151 i->second : openListenSocket(request, errNo);
152
153 debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
154 request.params.addr << " to kid" << request.requestorId <<
155 " mapId=" << request.mapId);
156
157 SharedListenResponse response(c->fd, errNo, request.mapId);
158 TypedMsgHdr message;
159 response.pack(message);
160 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
161 }
162
163 void
164 Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
165 {
166 debugs(54, 4, HERE);
167
168 try {
169 Mgr::Action::Pointer action =
170 CacheManager::GetInstance()->createRequestedAction(request.params);
171 AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
172 } catch (const std::exception &ex) {
173 debugs(54, DBG_IMPORTANT, "BUG: cannot aggregate mgr:" <<
174 request.params.actionName << ": " << ex.what());
175 // TODO: Avoid half-baked Connections or teach them how to close.
176 ::close(request.conn->fd);
177 request.conn->fd = -1;
178 return; // the worker will timeout and close
179 }
180
181 // Let the strand know that we are now responsible for handling the request
182 Mgr::Response response(request.requestId);
183 TypedMsgHdr message;
184 response.pack(message);
185 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
186
187 }
188
189 void
190 Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
191 {
192 Mgr::Inquirer::HandleRemoteAck(response);
193 }
194
195 void
196 Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request)
197 {
198 // do we know of a strand with the given search tag?
199 const StrandCoord *strand = NULL;
200 typedef StrandCoords::const_iterator SCCI;
201 for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
202 if (i->tag == request.tag)
203 strand = &(*i);
204 }
205
206 if (strand) {
207 notifySearcher(request, *strand);
208 return;
209 }
210
211 searchers.push_back(request);
212 debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
213 " who " << request.tag << " is");
214 }
215
216 void
217 Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
218 const StrandCoord& strand)
219 {
220 debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
221 request.tag << " is kid" << strand.kidId);
222 const StrandSearchResponse response(strand);
223 TypedMsgHdr message;
224 response.pack(message);
225 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
226 }
227
228 #if SQUID_SNMP
229 void
230 Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)
231 {
232 debugs(54, 4, HERE);
233
234 Snmp::Response response(request.requestId);
235 TypedMsgHdr message;
236 response.pack(message);
237 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
238
239 AsyncJob::Start(new Snmp::Inquirer(request, strands_));
240 }
241
242 void
243 Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response)
244 {
245 debugs(54, 4, HERE);
246 Snmp::Inquirer::HandleRemoteAck(response);
247 }
248 #endif
249
250 Comm::ConnectionPointer
251 Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
252 int &errNo)
253 {
254 const OpenListenerParams &p = request.params;
255
256 debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
257 request.requestorId);
258
259 Comm::ConnectionPointer conn = new Comm::Connection;
260 conn->local = p.addr; // comm_open_listener may modify it
261 conn->flags = p.flags;
262
263 enter_suid();
264 comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote));
265 errNo = Comm::IsConnOpen(conn) ? 0 : errno;
266 leave_suid();
267
268 debugs(54, 6, HERE << "tried listening on " << conn << " for kid" <<
269 request.requestorId);
270
271 // cache positive results
272 if (Comm::IsConnOpen(conn))
273 listeners[request.params] = conn;
274
275 return conn;
276 }
277
278 void Ipc::Coordinator::broadcastSignal(int sig) const
279 {
280 typedef StrandCoords::const_iterator SCI;
281 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
282 debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
283 ", PID=" << iter->pid);
284 kill(iter->pid, sig);
285 }
286 }
287
288 Ipc::Coordinator* Ipc::Coordinator::Instance()
289 {
290 if (!TheInstance)
291 TheInstance = new Coordinator;
292 // XXX: if the Coordinator job quits, this pointer will become invalid
293 // we could make Coordinator death fatal, except during exit, but since
294 // Strands do not re-register, even process death would be pointless.
295 return TheInstance;
296 }
297
298 const Ipc::StrandCoords&
299 Ipc::Coordinator::strands() const
300 {
301 return strands_;
302 }