]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Coordinator.cc
d7f341949680a85164f5f784faf0b9cb3bba6b74
[thirdparty/squid.git] / src / ipc / Coordinator.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8
9 #include "config.h"
10 #include "base/TextException.h"
11 #include "CacheManager.h"
12 #include "comm.h"
13 #include "ipc/Coordinator.h"
14 #include "ipc/FdNotes.h"
15 #include "ipc/SharedListen.h"
16 #include "mgr/Inquirer.h"
17 #include "mgr/Request.h"
18 #include "mgr/Response.h"
19 #include "mgr/StoreToCommWriter.h"
20
21
22 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
23 Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
24
25
26 Ipc::Coordinator::Coordinator():
27 Port(coordinatorAddr)
28 {
29 }
30
31 void Ipc::Coordinator::start()
32 {
33 Port::start();
34 }
35
36 Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
37 {
38 typedef StrandCoords::iterator SI;
39 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
40 if (iter->kidId == kidId)
41 return &(*iter);
42 }
43 return NULL;
44 }
45
46 void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
47 {
48 if (StrandCoord* found = findStrand(strand.kidId))
49 *found = strand;
50 else
51 strands_.push_back(strand);
52 }
53
54 void Ipc::Coordinator::receive(const TypedMsgHdr& message)
55 {
56 switch (message.type()) {
57 case mtRegistration:
58 debugs(54, 6, HERE << "Registration request");
59 handleRegistrationRequest(StrandCoord(message));
60 break;
61
62 case mtSharedListenRequest:
63 debugs(54, 6, HERE << "Shared listen request");
64 handleSharedListenRequest(SharedListenRequest(message));
65 break;
66
67 case mtCacheMgrRequest:
68 debugs(54, 6, HERE << "Cache manager request");
69 handleCacheMgrRequest(Mgr::Request(message));
70 break;
71
72 case mtCacheMgrResponse:
73 debugs(54, 6, HERE << "Cache manager response");
74 handleCacheMgrResponse(Mgr::Response(message));
75 break;
76
77 default:
78 debugs(54, 1, HERE << "Unhandled message type: " << message.type());
79 break;
80 }
81 }
82
83 void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand)
84 {
85 registerStrand(strand);
86
87 // send back an acknowledgement; TODO: remove as not needed?
88 TypedMsgHdr message;
89 strand.pack(message);
90 SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
91 }
92
93 void
94 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
95 {
96 debugs(54, 4, HERE << "kid" << request.requestorId <<
97 " needs shared listen FD for " << request.params.addr);
98 Listeners::const_iterator i = listeners.find(request.params);
99 int errNo = 0;
100 const int sock = (i != listeners.end()) ?
101 i->second : openListenSocket(request, errNo);
102
103 debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " <<
104 request.params.addr << " to kid" << request.requestorId <<
105 " mapId=" << request.mapId);
106
107 SharedListenResponse response(sock, errNo, request.mapId);
108 TypedMsgHdr message;
109 response.pack(message);
110 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
111 }
112
113 void
114 Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
115 {
116 debugs(54, 4, HERE);
117
118 // Let the strand know that we are now responsible for handling the request
119 Mgr::Response response(request.requestId);
120 TypedMsgHdr message;
121 response.pack(message);
122 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
123
124 Mgr::Action::Pointer action =
125 CacheManager::GetInstance()->createRequestedAction(request.params);
126 AsyncJob::Start(new Mgr::Inquirer(action,
127 Mgr::ImportHttpFdIntoComm(request.fd), request, strands_));
128 }
129
130 void
131 Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
132 {
133 Mgr::Inquirer::HandleRemoteAck(response);
134 }
135
136 int
137 Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
138 int &errNo)
139 {
140 const OpenListenerParams &p = request.params;
141
142 debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
143 request.requestorId);
144
145 Ip::Address addr = p.addr; // comm_open_listener may modify it
146
147 enter_suid();
148 const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags,
149 FdNote(p.fdNote));
150 errNo = (sock >= 0) ? 0 : errno;
151 leave_suid();
152
153 // cache positive results
154 if (sock >= 0)
155 listeners[request.params] = sock;
156
157 return sock;
158 }
159
160 void Ipc::Coordinator::broadcastSignal(int sig) const
161 {
162 typedef StrandCoords::const_iterator SCI;
163 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
164 debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
165 ", PID=" << iter->pid);
166 kill(iter->pid, sig);
167 }
168 }
169
170 Ipc::Coordinator* Ipc::Coordinator::Instance()
171 {
172 if (!TheInstance)
173 TheInstance = new Coordinator;
174 // XXX: if the Coordinator job quits, this pointer will become invalid
175 // we could make Coordinator death fatal, except during exit, but since
176 // Strands do not re-register, even process death would be pointless.
177 return TheInstance;
178 }
179
180 const Ipc::StrandCoords&
181 Ipc::Coordinator::strands() const
182 {
183 return strands_;
184 }