]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Coordinator.cc
fc38a7d27373b9f7c2bb36054a57c63bc4828d49
[thirdparty/squid.git] / src / ipc / Coordinator.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8
9 #include "config.h"
10 #include "base/Subscription.h"
11 #include "base/TextException.h"
12 #include "comm.h"
13 #include "comm/Connection.h"
14 #include "ipc/Coordinator.h"
15 #include "ipc/FdNotes.h"
16 #include "ipc/SharedListen.h"
17 #include "mgr/Inquirer.h"
18 #include "mgr/Request.h"
19 #include "mgr/Response.h"
20 #include "mgr/StoreToCommWriter.h"
21
22
// Class-level cbdata initialization for Ipc::Coordinator
// (NOTE(review): macro is defined elsewhere -- confirm exact effect).
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
// The single Coordinator object; stays NULL until Instance() creates it.
Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
25
26
27 Ipc::Coordinator::Coordinator():
28 Port(coordinatorAddr)
29 {
30 }
31
32 void Ipc::Coordinator::start()
33 {
34 Port::start();
35 }
36
37 Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
38 {
39 typedef StrandCoords::iterator SI;
40 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
41 if (iter->kidId == kidId)
42 return &(*iter);
43 }
44 return NULL;
45 }
46
47 void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
48 {
49 if (StrandCoord* found = findStrand(strand.kidId))
50 *found = strand;
51 else
52 strands_.push_back(strand);
53 }
54
55 void Ipc::Coordinator::receive(const TypedMsgHdr& message)
56 {
57 switch (message.type()) {
58 case mtRegistration:
59 debugs(54, 6, HERE << "Registration request");
60 handleRegistrationRequest(StrandCoord(message));
61 break;
62
63 case mtSharedListenRequest:
64 debugs(54, 6, HERE << "Shared listen request");
65 handleSharedListenRequest(SharedListenRequest(message));
66 break;
67
68 case mtCacheMgrRequest:
69 debugs(54, 6, HERE << "Cache manager request");
70 handleCacheMgrRequest(Mgr::Request(message));
71 break;
72
73 case mtCacheMgrResponse:
74 debugs(54, 6, HERE << "Cache manager response");
75 handleCacheMgrResponse(Mgr::Response(message));
76 break;
77
78 default:
79 debugs(54, 1, HERE << "Unhandled message type: " << message.type());
80 break;
81 }
82 }
83
84 void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand)
85 {
86 registerStrand(strand);
87
88 // send back an acknowledgement; TODO: remove as not needed?
89 TypedMsgHdr message;
90 strand.pack(message);
91 SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
92 }
93
94 void
95 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
96 {
97 debugs(54, 4, HERE << "kid" << request.requestorId <<
98 " needs shared listen FD for " << request.params.addr);
99 Listeners::const_iterator i = listeners.find(request.params);
100 int errNo = 0;
101 const Comm::ConnectionPointer c = (i != listeners.end()) ?
102 i->second : openListenSocket(request, errNo);
103
104 debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
105 request.params.addr << " to kid" << request.requestorId <<
106 " mapId=" << request.mapId);
107
108 SharedListenResponse response(c, errNo, request.mapId);
109 TypedMsgHdr message;
110 response.pack(message);
111 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
112 }
113
114 void
115 Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
116 {
117 debugs(54, 4, HERE);
118
119 // Let the strand know that we are now responsible for handling the request
120 Mgr::Response response(request.requestId);
121 TypedMsgHdr message;
122 response.pack(message);
123 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
124
125 Mgr::Action::Pointer action =
126 CacheManager::GetInstance()->createRequestedAction(request.params);
127 AsyncJob::Start(new Mgr::Inquirer(action,
128 Mgr::ImportHttpFdIntoComm(request.fd), request, strands_));
129 }
130
131 void
132 Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
133 {
134 Mgr::Inquirer::HandleRemoteAck(response);
135 }
136
137 Comm::ConnectionPointer
138 Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
139 int &errNo)
140 {
141 const OpenListenerParams &p = request.params;
142
143 debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
144 request.requestorId);
145
146 Comm::ConnectionPointer conn = new Comm::Connection;
147 conn->local = p.addr; // comm_open_listener may modify it
148 conn->flags = p.flags;
149
150 enter_suid();
151 comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote));
152 errNo = Comm::IsConnOpen(conn) ? 0 : errno;
153 leave_suid();
154
155 debugs(54, 6, HERE << "tried listening on " << conn << " for kid" <<
156 request.requestorId);
157
158 // cache positive results
159 if (Comm::IsConnOpen(conn))
160 listeners[request.params] = conn;
161
162 return conn;
163 }
164
165 void Ipc::Coordinator::broadcastSignal(int sig) const
166 {
167 typedef StrandCoords::const_iterator SCI;
168 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
169 debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
170 ", PID=" << iter->pid);
171 kill(iter->pid, sig);
172 }
173 }
174
175 Ipc::Coordinator* Ipc::Coordinator::Instance()
176 {
177 if (!TheInstance)
178 TheInstance = new Coordinator;
179 // XXX: if the Coordinator job quits, this pointer will become invalid
180 // we could make Coordinator death fatal, except during exit, but since
181 // Strands do not re-register, even process death would be pointless.
182 return TheInstance;
183 }
184
185 const Ipc::StrandCoords&
186 Ipc::Coordinator::strands() const
187 {
188 return strands_;
189 }