]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Coordinator.cc
Merge 3p2-rock.
[thirdparty/squid.git] / src / ipc / Coordinator.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8
9 #include "config.h"
10 #include "base/TextException.h"
11 #include "CacheManager.h"
12 #include "comm.h"
13 #include "ipc/Coordinator.h"
14 #include "ipc/FdNotes.h"
15 #include "ipc/SharedListen.h"
16 #include "mgr/Inquirer.h"
17 #include "mgr/Request.h"
18 #include "mgr/Response.h"
19 #include "mgr/StoreToCommWriter.h"
20 #include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: layering violation */
21
22
23 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
24 Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
25
26
27 Ipc::Coordinator::Coordinator():
28 Port(coordinatorAddr)
29 {
30 }
31
32 void Ipc::Coordinator::start()
33 {
34 Port::start();
35 }
36
37 Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
38 {
39 typedef StrandCoords::iterator SI;
40 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
41 if (iter->kidId == kidId)
42 return &(*iter);
43 }
44 return NULL;
45 }
46
47 void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
48 {
49 debugs(54, 3, HERE << "registering kid" << strand.kidId <<
50 ' ' << strand.tag);
51 if (StrandCoord* found = findStrand(strand.kidId))
52 *found = strand;
53 else
54 strands_.push_back(strand);
55
56 // notify searchers waiting for this new strand, if any
57 typedef Searchers::iterator SRI;
58 for (SRI i = searchers.begin(); i != searchers.end();) {
59 if (i->tag == strand.tag) {
60 notifySearcher(*i, strand);
61 i = searchers.erase(i);
62 } else {
63 ++i;
64 }
65 }
66 }
67
68 void Ipc::Coordinator::receive(const TypedMsgHdr& message)
69 {
70 switch (message.type()) {
71 case mtRegistration:
72 debugs(54, 6, HERE << "Registration request");
73 handleRegistrationRequest(HereIamMessage(message));
74 break;
75
76 case mtIpcIoRequest: { // XXX: this should have been mtStrandSearchRequest
77 IpcIoRequest io(message);
78 StrandSearchRequest sr;
79 sr.requestorId = io.requestorId;
80 sr.requestId = io.requestId;
81 sr.tag.limitInit(io.buf, io.len);
82 debugs(54, 6, HERE << "Strand search request: " << io.requestorId << ' ' << io.requestId << ' ' << io.len << " cmd=" << io.command << " tag: " << sr.tag);
83 handleSearchRequest(sr);
84 break;
85 }
86
87 case mtSharedListenRequest:
88 debugs(54, 6, HERE << "Shared listen request");
89 handleSharedListenRequest(SharedListenRequest(message));
90 break;
91
92 case mtCacheMgrRequest:
93 debugs(54, 6, HERE << "Cache manager request");
94 handleCacheMgrRequest(Mgr::Request(message));
95 break;
96
97 case mtCacheMgrResponse:
98 debugs(54, 6, HERE << "Cache manager response");
99 handleCacheMgrResponse(Mgr::Response(message));
100 break;
101
102 default:
103 debugs(54, 1, HERE << "Unhandled message type: " << message.type());
104 break;
105 }
106 }
107
108 void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg)
109 {
110 registerStrand(msg.strand);
111
112 // send back an acknowledgement; TODO: remove as not needed?
113 TypedMsgHdr message;
114 msg.pack(message);
115 SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message);
116 }
117
118 void
119 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
120 {
121 debugs(54, 4, HERE << "kid" << request.requestorId <<
122 " needs shared listen FD for " << request.params.addr);
123 Listeners::const_iterator i = listeners.find(request.params);
124 int errNo = 0;
125 const int sock = (i != listeners.end()) ?
126 i->second : openListenSocket(request, errNo);
127
128 debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " <<
129 request.params.addr << " to kid" << request.requestorId <<
130 " mapId=" << request.mapId);
131
132 SharedListenResponse response(sock, errNo, request.mapId);
133 TypedMsgHdr message;
134 response.pack(message);
135 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
136 }
137
138 void
139 Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
140 {
141 debugs(54, 4, HERE);
142
143 // Let the strand know that we are now responsible for handling the request
144 Mgr::Response response(request.requestId);
145 TypedMsgHdr message;
146 response.pack(message);
147 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
148
149 Mgr::Action::Pointer action =
150 CacheManager::GetInstance()->createRequestedAction(request.params);
151 AsyncJob::Start(new Mgr::Inquirer(action,
152 Mgr::ImportHttpFdIntoComm(request.fd), request, strands_));
153 }
154
155 void
156 Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
157 {
158 Mgr::Inquirer::HandleRemoteAck(response);
159 }
160
161 void
162 Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request)
163 {
164 // do we know of a strand with the given search tag?
165 const StrandCoord *strand = NULL;
166 typedef StrandCoords::const_iterator SCCI;
167 for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
168 if (i->tag == request.tag)
169 strand = &(*i);
170 }
171
172 if (strand) {
173 notifySearcher(request, *strand);
174 return;
175 }
176
177 searchers.push_back(request);
178 debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
179 " who " << request.tag << " is");
180 }
181
182 void
183 Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
184 const StrandCoord& strand)
185 {
186 debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
187 request.tag << " is kid" << strand.kidId);
188 const StrandSearchResponse response0(request.requestId, strand);
189 // XXX: we should use StrandSearchResponse instead of converting it
190 IpcIoResponse io;
191 io.diskId = strand.kidId;
192 io.requestId = request.requestId;
193 io.command = IpcIo::cmdOpen;
194 TypedMsgHdr message;
195 io.pack(message);
196 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
197 }
198
199
200 int
201 Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
202 int &errNo)
203 {
204 const OpenListenerParams &p = request.params;
205
206 debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
207 request.requestorId);
208
209 Ip::Address addr = p.addr; // comm_open_listener may modify it
210
211 enter_suid();
212 const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags,
213 FdNote(p.fdNote));
214 errNo = (sock >= 0) ? 0 : errno;
215 leave_suid();
216
217 // cache positive results
218 if (sock >= 0)
219 listeners[request.params] = sock;
220
221 return sock;
222 }
223
224 void Ipc::Coordinator::broadcastSignal(int sig) const
225 {
226 typedef StrandCoords::const_iterator SCI;
227 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
228 debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
229 ", PID=" << iter->pid);
230 kill(iter->pid, sig);
231 }
232 }
233
234 Ipc::Coordinator* Ipc::Coordinator::Instance()
235 {
236 if (!TheInstance)
237 TheInstance = new Coordinator;
238 // XXX: if the Coordinator job quits, this pointer will become invalid
239 // we could make Coordinator death fatal, except during exit, but since
240 // Strands do not re-register, even process death would be pointless.
241 return TheInstance;
242 }
243
244 const Ipc::StrandCoords&
245 Ipc::Coordinator::strands() const
246 {
247 return strands_;
248 }