]> git.ipfire.org Git - thirdparty/squid.git/blob - src/mgr/Inquirer.cc
Merge from trunk
[thirdparty/squid.git] / src / mgr / Inquirer.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 16 Cache Manager API
5 *
6 */
7
8 #include "config.h"
9 #include "base/TextException.h"
10 #include "comm.h"
11 #include "comm/Write.h"
12 #include "CommCalls.h"
13 #include "comm/Connection.h"
14 #include "HttpReply.h"
15 #include "ipc/Coordinator.h"
16 #include "mgr/ActionWriter.h"
17 #include "mgr/Command.h"
18 #include "mgr/Inquirer.h"
19 #include "mgr/Request.h"
20 #include "mgr/Response.h"
21 #include "SquidTime.h"
22 #include <memory>
23 #include <algorithm>
24
25
26 CBDATA_NAMESPACED_CLASS_INIT(Mgr, Inquirer);
27
28 Mgr::Inquirer::RequestsMap Mgr::Inquirer::TheRequestsMap;
29 unsigned int Mgr::Inquirer::LastRequestId = 0;
30
31 /// compare Ipc::StrandCoord using kidId, for std::sort() below
32 static bool
33 LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
34 {
35 return c1.kidId < c2.kidId;
36 }
37
38 Mgr::Inquirer::Inquirer(Action::Pointer anAction, const Comm::ConnectionPointer &conn,
39 const Request &aCause, const Ipc::StrandCoords &coords):
40 AsyncJob("Mgr::Inquirer"),
41 aggrAction(anAction),
42 cause(aCause),
43 clientConnection(conn),
44 strands(coords), pos(strands.begin()),
45 requestId(0), closer(NULL), timeout(aggrAction->atomic() ? 10 : 100)
46 {
47 debugs(16, 5, HERE << conn << " action: " << aggrAction);
48
49 // order by ascending kid IDs; useful for non-aggregatable stats
50 std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
51
52 closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed",
53 CommCbMemFunT<Inquirer, CommCloseCbParams>(this, &Inquirer::noteCommClosed));
54 comm_add_close_handler(clientConnection->fd, closer);
55 }
56
57 Mgr::Inquirer::~Inquirer()
58 {
59 debugs(16, 5, HERE);
60 close();
61 }
62
63 /// closes our copy of the client HTTP connection socket
64 void
65 Mgr::Inquirer::close()
66 {
67 if (Comm::IsConnOpen(clientConnection)) {
68 removeCloseHandler();
69 clientConnection->close();
70 }
71 }
72
73 void
74 Mgr::Inquirer::removeCloseHandler()
75 {
76 if (closer != NULL) {
77 comm_remove_close_handler(clientConnection->fd, closer);
78 closer = NULL;
79 }
80 }
81
82 void
83 Mgr::Inquirer::start()
84 {
85 debugs(16, 5, HERE);
86 Must(Comm::IsConnOpen(clientConnection));
87 Must(aggrAction != NULL);
88
89 std::auto_ptr<HttpReply> reply(new HttpReply);
90 reply->setHeaders(HTTP_OK, NULL, "text/plain", -1, squid_curtime, squid_curtime);
91 reply->header.putStr(HDR_CONNECTION, "close"); // until we chunk response
92 std::auto_ptr<MemBuf> replyBuf(reply->pack());
93 writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
94 CommCbMemFunT<Inquirer, CommIoCbParams>(this, &Inquirer::noteWroteHeader));
95 Comm::Write(clientConnection, replyBuf.get(), writer);
96 }
97
98 /// called when we wrote the response header
99 void
100 Mgr::Inquirer::noteWroteHeader(const CommIoCbParams& params)
101 {
102 debugs(16, 5, HERE);
103 writer = NULL;
104 Must(params.flag == COMM_OK);
105 Must(clientConnection != NULL && params.fd == clientConnection->fd);
106 Must(params.size != 0);
107 // start inquiries at the initial pos
108 inquire();
109 }
110
111 void
112 Mgr::Inquirer::inquire()
113 {
114 if (pos == strands.end()) {
115 Must(done());
116 return;
117 }
118
119 Must(requestId == 0);
120 AsyncCall::Pointer callback = asyncCall(16, 5, "Mgr::Inquirer::handleRemoteAck",
121 HandleAckDialer(this, &Inquirer::handleRemoteAck, Response()));
122 if (++LastRequestId == 0) // don't use zero value as requestId
123 ++LastRequestId;
124 requestId = LastRequestId;
125 const int kidId = pos->kidId;
126 debugs(16, 4, HERE << "inquire kid: " << kidId << status());
127 TheRequestsMap[requestId] = callback;
128 Request mgrRequest(KidIdentifier, requestId, clientConnection,
129 aggrAction->command().params);
130 Ipc::TypedMsgHdr message;
131 mgrRequest.pack(message);
132 Ipc::SendMessage(Ipc::Port::MakeAddr(Ipc::strandAddrPfx, kidId), message);
133 eventAdd("Mgr::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut,
134 this, timeout, 0, false);
135 }
136
137 /// called when a strand is done writing its output
138 void
139 Mgr::Inquirer::handleRemoteAck(const Response& response)
140 {
141 debugs(16, 4, HERE << status());
142 requestId = 0;
143 removeTimeoutEvent();
144 if (response.hasAction())
145 aggrAction->add(response.getAction());
146 Must(!done()); // or we should not be called
147 ++pos; // advance after a successful inquiry
148 inquire();
149 }
150
151 /// called when the HTTP client or some external force closed our socket
152 void
153 Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
154 {
155 debugs(16, 5, HERE);
156 Must(!Comm::IsConnOpen(clientConnection) || clientConnection->fd == params.fd);
157 clientConnection = NULL; // AYJ: Do we actually have to NULL it?
158 mustStop("commClosed");
159 }
160
161 void
162 Mgr::Inquirer::swanSong()
163 {
164 debugs(16, 5, HERE);
165 removeTimeoutEvent();
166 if (requestId > 0) {
167 DequeueRequest(requestId);
168 requestId = 0;
169 }
170 if (aggrAction->aggregatable()) {
171 removeCloseHandler();
172 AsyncJob::Start(new ActionWriter(aggrAction, clientConnection));
173 clientConnection = NULL; // should not close fd because we passed it to ActionWriter
174 }
175 close();
176 }
177
178 bool
179 Mgr::Inquirer::doneAll() const
180 {
181 return !writer && pos == strands.end();
182 }
183
184 /// returns and forgets the right Inquirer callback for strand request
185 AsyncCall::Pointer
186 Mgr::Inquirer::DequeueRequest(unsigned int requestId)
187 {
188 debugs(16, 3, HERE << " requestId " << requestId);
189 Must(requestId != 0);
190 AsyncCall::Pointer call;
191 RequestsMap::iterator request = TheRequestsMap.find(requestId);
192 if (request != TheRequestsMap.end()) {
193 call = request->second;
194 Must(call != NULL);
195 TheRequestsMap.erase(request);
196 }
197 return call;
198 }
199
200 void
201 Mgr::Inquirer::HandleRemoteAck(const Mgr::Response& response)
202 {
203 Must(response.requestId != 0);
204 AsyncCall::Pointer call = DequeueRequest(response.requestId);
205 if (call != NULL) {
206 HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer());
207 Must(dialer);
208 dialer->arg1 = response;
209 ScheduleCallHere(call);
210 }
211 }
212
213 /// called when we are no longer waiting for the strand to respond
214 void
215 Mgr::Inquirer::removeTimeoutEvent()
216 {
217 if (eventFind(&Inquirer::RequestTimedOut, this))
218 eventDelete(&Inquirer::RequestTimedOut, this);
219 }
220
221 /// Mgr::Inquirer::requestTimedOut wrapper
222 void
223 Mgr::Inquirer::RequestTimedOut(void* param)
224 {
225 debugs(16, 3, HERE);
226 Must(param != NULL);
227 Inquirer* cmi = static_cast<Inquirer*>(param);
228 // use async call to enable job call protection that time events lack
229 CallJobHere(16, 5, cmi, Mgr::Inquirer, requestTimedOut);
230 }
231
232 /// called when the strand failed to respond (or finish responding) in time
233 void
234 Mgr::Inquirer::requestTimedOut()
235 {
236 debugs(16, 3, HERE);
237 if (requestId != 0) {
238 DequeueRequest(requestId);
239 requestId = 0;
240 Must(!done()); // or we should not be called
241 ++pos; // advance after a failed inquiry
242 inquire();
243 }
244 }
245
246 const char*
247 Mgr::Inquirer::status() const
248 {
249 static MemBuf buf;
250 buf.reset();
251 buf.Printf(" [FD %d, requestId %u]", clientConnection->fd, requestId);
252 buf.terminate();
253 return buf.content();
254 }