]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Coordinator.cc
Amos requests
[thirdparty/squid.git] / src / ipc / Coordinator.cc
CommitLineData
10cefb7b 1/*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8
9#include "config.h"
8822ebee
AR
10#include "base/TextException.h"
11#include "CacheManager.h"
0d0bce6a 12#include "comm.h"
10cefb7b 13#include "ipc/Coordinator.h"
0d0bce6a 14#include "ipc/SharedListen.h"
8822ebee
AR
15#include "mgr/Inquirer.h"
16#include "mgr/Request.h"
17#include "mgr/Response.h"
f738d783 18#if SQUID_SNMP
51ea0904
CT
19#include "snmpx/Inquirer.h"
20#include "snmpx/Request.h"
21#include "snmpx/Response.h"
f738d783 22#endif
10cefb7b 23
24CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
a2c48c98 25Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
10cefb7b 26
27
28Ipc::Coordinator::Coordinator():
5667a628 29 Port(coordinatorAddr)
10cefb7b 30{
31}
32
33void Ipc::Coordinator::start()
34{
ba568924 35 Port::start();
10cefb7b 36}
37
1bac0258 38Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
10cefb7b 39{
8822ebee
AR
40 typedef StrandCoords::iterator SI;
41 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
10cefb7b 42 if (iter->kidId == kidId)
43 return &(*iter);
44 }
45 return NULL;
46}
47
1bac0258 48void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
10cefb7b 49{
1bac0258 50 if (StrandCoord* found = findStrand(strand.kidId))
10cefb7b 51 *found = strand;
52 else
8822ebee 53 strands_.push_back(strand);
10cefb7b 54}
55
1bac0258 56void Ipc::Coordinator::receive(const TypedMsgHdr& message)
10cefb7b 57{
58 switch (message.type()) {
ba568924 59 case mtRegistration:
10cefb7b 60 debugs(54, 6, HERE << "Registration request");
1bac0258 61 handleRegistrationRequest(StrandCoord(message));
10cefb7b 62 break;
63
0d0bce6a
AR
64 case mtSharedListenRequest:
65 debugs(54, 6, HERE << "Shared listen request");
66 handleSharedListenRequest(SharedListenRequest(message));
67 break;
68
8822ebee
AR
69 case mtCacheMgrRequest:
70 debugs(54, 6, HERE << "Cache manager request");
71 handleCacheMgrRequest(Mgr::Request(message));
72 break;
73
74 case mtCacheMgrResponse:
75 debugs(54, 6, HERE << "Cache manager response");
76 handleCacheMgrResponse(Mgr::Response(message));
77 break;
78
f738d783 79#if SQUID_SNMP
51ea0904
CT
80 case mtSnmpRequest:
81 debugs(54, 6, HERE << "SNMP request");
82 handleSnmpRequest(Snmp::Request(message));
83 break;
84
85 case mtSnmpResponse:
86 debugs(54, 6, HERE << "SNMP response");
87 handleSnmpResponse(Snmp::Response(message));
88 break;
f738d783 89#endif
51ea0904 90
10cefb7b 91 default:
7230e9c7 92 debugs(54, 1, HERE << "Unhandled message type: " << message.type());
10cefb7b 93 break;
94 }
95}
96
1bac0258 97void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand)
10cefb7b 98{
ba568924
AR
99 registerStrand(strand);
100
101 // send back an acknowledgement; TODO: remove as not needed?
1bac0258
AR
102 TypedMsgHdr message;
103 strand.pack(message);
104 SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
10cefb7b 105}
a2c48c98 106
0d0bce6a
AR
107void
108Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
109{
110 debugs(54, 4, HERE << "kid" << request.requestorId <<
5667a628 111 " needs shared listen FD for " << request.params.addr);
0d0bce6a
AR
112 Listeners::const_iterator i = listeners.find(request.params);
113 int errNo = 0;
114 const int sock = (i != listeners.end()) ?
5667a628 115 i->second : openListenSocket(request, errNo);
0d0bce6a
AR
116
117 debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " <<
5667a628
AR
118 request.params.addr << " to kid" << request.requestorId <<
119 " mapId=" << request.mapId);
0d0bce6a
AR
120
121 SharedListenResponse response(sock, errNo, request.mapId);
122 TypedMsgHdr message;
123 response.pack(message);
124 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
125}
126
8822ebee
AR
127void
128Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
129{
130 debugs(54, 4, HERE);
131
132 // Let the strand know that we are now responsible for handling the request
133 Mgr::Response response(request.requestId);
134 TypedMsgHdr message;
135 response.pack(message);
136 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
137
138 Mgr::Action::Pointer action =
139 CacheManager::GetInstance()->createRequestedAction(request.params);
51ea0904 140 AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
8822ebee
AR
141}
142
143void
144Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
145{
146 Mgr::Inquirer::HandleRemoteAck(response);
147}
148
f738d783 149#if SQUID_SNMP
51ea0904
CT
150void
151Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)
152{
153 debugs(54, 4, HERE);
154
155 Snmp::Response response(request.requestId);
156 TypedMsgHdr message;
157 response.pack(message);
158 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
159
160 AsyncJob::Start(new Snmp::Inquirer(request, strands_));
161}
162
163void
164Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response)
165{
166 debugs(54, 4, HERE);
167 Snmp::Inquirer::HandleRemoteAck(response);
168}
f738d783 169#endif
51ea0904 170
0d0bce6a
AR
171int
172Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
5667a628 173 int &errNo)
0d0bce6a
AR
174{
175 const OpenListenerParams &p = request.params;
176
177 debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
5667a628 178 request.requestorId);
0d0bce6a 179
a67d2b2e 180 Ip::Address addr = p.addr; // comm_open_listener may modify it
0d0bce6a
AR
181
182 enter_suid();
183 const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags,
5667a628 184 FdNote(p.fdNote));
0d0bce6a
AR
185 errNo = (sock >= 0) ? 0 : errno;
186 leave_suid();
187
188 // cache positive results
189 if (sock >= 0)
190 listeners[request.params] = sock;
191
192 return sock;
193}
194
a2c48c98
AR
195void Ipc::Coordinator::broadcastSignal(int sig) const
196{
8822ebee
AR
197 typedef StrandCoords::const_iterator SCI;
198 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
a2c48c98 199 debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
5667a628 200 ", PID=" << iter->pid);
a2c48c98
AR
201 kill(iter->pid, sig);
202 }
203}
204
205Ipc::Coordinator* Ipc::Coordinator::Instance()
206{
207 if (!TheInstance)
208 TheInstance = new Coordinator;
209 // XXX: if the Coordinator job quits, this pointer will become invalid
210 // we could make Coordinator death fatal, except during exit, but since
211 // Strands do not re-register, even process death would be pointless.
212 return TheInstance;
213}
8822ebee
AR
214
215const Ipc::StrandCoords&
216Ipc::Coordinator::strands() const
217{
218 return strands_;
219}