]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/mgr/Inquirer.cc
4 * DEBUG: section 16 Cache Manager API
9 #include "base/TextException.h"
11 #include "comm/Write.h"
12 #include "CommCalls.h"
13 #include "comm/Connection.h"
14 #include "HttpReply.h"
15 #include "ipc/Coordinator.h"
16 #include "mgr/ActionWriter.h"
17 #include "mgr/Command.h"
18 #include "mgr/Inquirer.h"
19 #include "mgr/Request.h"
20 #include "mgr/Response.h"
21 #include "SquidTime.h"
// Class-initialization macro invocation for Mgr::Inquirer.
26 CBDATA_NAMESPACED_CLASS_INIT(Mgr
, Inquirer
);
// Class-wide table of pending strand requests, keyed by request id
// (filled in inquire(), emptied in DequeueRequest()).
28 Mgr::Inquirer::RequestsMap
Mgr::Inquirer::TheRequestsMap
;
// Last request id handed out. Zero is never used as a real id:
// inquire() skips it and DequeueRequest() rejects it.
29 unsigned int Mgr::Inquirer::LastRequestId
= 0;
31 /// compare Ipc::StrandCoord using kidId, for std::sort() below
// Yields true when c1.kidId is strictly smaller than c2.kidId.
// Passed as the ordering predicate to std::sort() in the Inquirer constructor.
33 LesserStrandByKidId(const Ipc::StrandCoord
&c1
, const Ipc::StrandCoord
&c2
)
35 return c1
.kidId
< c2
.kidId
;
// Creates an inquirer job named "Mgr::Inquirer".
// Visible initializers: clientConnection from conn, strands from coords with
// pos at the first strand, requestId 0, closer NULL, and timeout set to 10
// when aggrAction->atomic() is true and 100 otherwise (the value is later
// passed to eventAdd() in inquire()).
// The body then sorts strands by kid ID and registers noteCommClosed() as the
// close handler for clientConnection->fd.
// NOTE(review): how anAction and aCause are stored is not visible here -- confirm.
38 Mgr::Inquirer::Inquirer(Action::Pointer anAction
, const Comm::ConnectionPointer
&conn
,
39 const Request
&aCause
, const Ipc::StrandCoords
&coords
):
40 AsyncJob("Mgr::Inquirer"),
43 clientConnection(conn
),
44 strands(coords
), pos(strands
.begin()),
45 requestId(0), closer(NULL
), timeout(aggrAction
->atomic() ? 10 : 100)
47 debugs(16, 5, HERE
<< conn
<< " action: " << aggrAction
);
49 // order by ascending kid IDs; useful for non-aggregatable stats
50 std::sort(strands
.begin(), strands
.end(), LesserStrandByKidId
);
// Build the async call once so the same object can be handed to
// comm_remove_close_handler() later (see removeCloseHandler()).
52 closer
= asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed",
53 CommCbMemFunT
<Inquirer
, CommCloseCbParams
>(this, &Inquirer::noteCommClosed
));
54 comm_add_close_handler(clientConnection
->fd
, closer
);
/// Destructor.
57 Mgr::Inquirer::~Inquirer()
63 /// closes our copy of the client HTTP connection socket
// Acts only while Comm::IsConnOpen(clientConnection) is true; in that case
// clientConnection->close() is called.
65 Mgr::Inquirer::close()
67 if (Comm::IsConnOpen(clientConnection
)) {
69 clientConnection
->close();
/// Unregisters the `closer` callback (installed by the constructor) from
/// clientConnection->fd via comm_remove_close_handler().
74 Mgr::Inquirer::removeCloseHandler()
77 comm_remove_close_handler(clientConnection
->fd
, closer
);
/// Job start: sends the HTTP response header to the client.
/// Requires (via Must) an open client connection and a non-NULL aggrAction.
/// Builds an HTTP_OK "text/plain" reply, sets HDR_CONNECTION to "close",
/// packs it into a MemBuf, and hands that buffer to Comm::Write() with
/// noteWroteHeader() as the completion callback.
83 Mgr::Inquirer::start()
86 Must(Comm::IsConnOpen(clientConnection
));
87 Must(aggrAction
!= NULL
);
89 std::auto_ptr
<HttpReply
> reply(new HttpReply
);
90 reply
->setHeaders(HTTP_OK
, NULL
, "text/plain", -1, squid_curtime
, squid_curtime
);
91 reply
->header
.putStr(HDR_CONNECTION
, "close"); // until we chunk response
92 std::auto_ptr
<MemBuf
> replyBuf(reply
->pack());
// `writer` stays set while the header write is in flight; doneAll() checks it.
93 writer
= asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
94 CommCbMemFunT
<Inquirer
, CommIoCbParams
>(this, &Inquirer::noteWroteHeader
));
95 Comm::Write(clientConnection
, replyBuf
.get(), writer
);
98 /// called when we wrote the response header
// Checks (via Must) that the write completed with COMM_OK, that it was for
// the fd of our still-present client connection, and that params.size is
// non-zero.
100 Mgr::Inquirer::noteWroteHeader(const CommIoCbParams
& params
)
104 Must(params
.flag
== COMM_OK
);
105 Must(clientConnection
!= NULL
&& params
.fd
== clientConnection
->fd
);
106 Must(params
.size
!= 0);
107 // start inquiries at the initial pos
/// Sends one request to the strand at `pos` and arms a timeout for its answer.
/// Visible steps, in order:
///  - a check for pos == strands.end();
///  - Must(requestId == 0), i.e. no request may already be outstanding;
///  - creation of the handleRemoteAck() callback with a default Response;
///  - selection of a new requestId from LastRequestId, avoiding zero;
///  - storing the callback in TheRequestsMap under that id;
///  - packing a Request and sending it to the address built by
///    Ipc::Port::MakeAddr(Ipc::strandAddrPfx, kidId);
///  - scheduling RequestTimedOut() for this object after `timeout`.
112 Mgr::Inquirer::inquire()
114 if (pos
== strands
.end()) {
119 Must(requestId
== 0);
120 AsyncCall::Pointer callback
= asyncCall(16, 5, "Mgr::Inquirer::handleRemoteAck",
121 HandleAckDialer(this, &Inquirer::handleRemoteAck
, Response()));
122 if (++LastRequestId
== 0) // don't use zero value as requestId
124 requestId
= LastRequestId
;
125 const int kidId
= pos
->kidId
;
126 debugs(16, 4, HERE
<< "inquire kid: " << kidId
<< status());
// Register the callback before sending so HandleRemoteAck() can find it by id.
127 TheRequestsMap
[requestId
] = callback
;
128 Request
mgrRequest(KidIdentifier
, requestId
, clientConnection
,
129 aggrAction
->command().params
);
130 Ipc::TypedMsgHdr message
;
131 mgrRequest
.pack(message
);
132 Ipc::SendMessage(Ipc::Port::MakeAddr(Ipc::strandAddrPfx
, kidId
), message
);
133 eventAdd("Mgr::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut
,
134 this, timeout
, 0, false);
137 /// called when a strand is done writing its output
// Cancels the pending timeout event, merges the response's action into
// aggrAction when the response carries one, asserts the job is not yet done,
// and advances pos to the next strand.
139 Mgr::Inquirer::handleRemoteAck(const Response
& response
)
141 debugs(16, 4, HERE
<< status());
143 removeTimeoutEvent();
144 if (response
.hasAction())
145 aggrAction
->add(response
.getAction());
146 Must(!done()); // or we should not be called
147 ++pos
; // advance after a successful inquiry
151 /// called when the HTTP client or some external force closed our socket
// Checks that the closed fd is ours (unless our connection is already not
// open), drops our connection pointer, and stops the job with the reason
// "commClosed".
153 Mgr::Inquirer::noteCommClosed(const CommCloseCbParams
& params
)
156 Must(!Comm::IsConnOpen(clientConnection
) || clientConnection
->fd
== params
.fd
);
157 clientConnection
= NULL
; // AYJ: Do we actually have to NULL it?
158 mustStop("commClosed");
/// Job cleanup.
/// Cancels the timeout event and calls DequeueRequest(requestId). When
/// aggrAction is aggregatable, removes our close handler, starts an
/// ActionWriter job with aggrAction and the client connection, and then
/// forgets our connection pointer.
// NOTE(review): DequeueRequest() requires a non-zero id -- confirm the call
// below is guarded accordingly.
162 Mgr::Inquirer::swanSong()
165 removeTimeoutEvent();
167 DequeueRequest(requestId
);
170 if (aggrAction
->aggregatable()) {
171 removeCloseHandler();
172 AsyncJob::Start(new ActionWriter(aggrAction
, clientConnection
));
173 clientConnection
= NULL
; // should not close fd because we passed it to ActionWriter
/// True only when `writer` is unset and pos has reached strands.end().
179 Mgr::Inquirer::doneAll() const
181 return !writer
&& pos
== strands
.end();
184 /// returns and forgets the right Inquirer callback for strand request
// Requires a non-zero requestId. Looks the id up in TheRequestsMap; when an
// entry exists, its callback is copied into `call` and the entry is erased.
// When no entry exists, `call` keeps its default-constructed value.
186 Mgr::Inquirer::DequeueRequest(unsigned int requestId
)
188 debugs(16, 3, HERE
<< " requestId " << requestId
);
189 Must(requestId
!= 0);
190 AsyncCall::Pointer call
;
191 RequestsMap::iterator request
= TheRequestsMap
.find(requestId
);
192 if (request
!= TheRequestsMap
.end()) {
193 call
= request
->second
;
195 TheRequestsMap
.erase(request
);
/// Routes a strand response to the callback registered for its request id.
/// Requires a non-zero response.requestId, dequeues the matching callback,
/// downcasts its dialer to HandleAckDialer, stores the response as the
/// dialer's first argument (arg1), and schedules the callback.
201 Mgr::Inquirer::HandleRemoteAck(const Mgr::Response
& response
)
203 Must(response
.requestId
!= 0);
204 AsyncCall::Pointer call
= DequeueRequest(response
.requestId
);
206 HandleAckDialer
* dialer
= dynamic_cast<HandleAckDialer
*>(call
->getDialer());
// Replace the placeholder Response() given at callback creation in inquire().
208 dialer
->arg1
= response
;
209 ScheduleCallHere(call
);
213 /// called when we are no longer waiting for the strand to respond
// Deletes the RequestTimedOut event registered for this object, but only if
// eventFind() reports that one exists.
215 Mgr::Inquirer::removeTimeoutEvent()
217 if (eventFind(&Inquirer::RequestTimedOut
, this))
218 eventDelete(&Inquirer::RequestTimedOut
, this);
221 /// Mgr::Inquirer::requestTimedOut wrapper
// Event callback: `param` is the Inquirer pointer that inquire() passed to
// eventAdd(); it is cast back and requestTimedOut() is invoked on it through
// CallJobHere().
223 Mgr::Inquirer::RequestTimedOut(void* param
)
227 Inquirer
* cmi
= static_cast<Inquirer
*>(param
);
228 // use async call to enable job call protection that time events lack
229 CallJobHere(16, 5, cmi
, Mgr::Inquirer
, requestTimedOut
);
232 /// called when the strand failed to respond (or finish responding) in time
// The visible code dequeues an outstanding request (requestId != 0), asserts
// that the job is not yet done, and advances pos past the unresponsive strand.
234 Mgr::Inquirer::requestTimedOut()
237 if (requestId
!= 0) {
238 DequeueRequest(requestId
);
240 Must(!done()); // or we should not be called
241 ++pos
; // advance after a failed inquiry
/// Formats " [FD <fd>, requestId <id>]" into buf and returns buf.content();
/// the result is appended to debugs() lines in inquire() and handleRemoteAck().
// NOTE(review): dereferences clientConnection, which noteCommClosed() and
// swanSong() set to NULL -- confirm status() cannot run after that.
247 Mgr::Inquirer::status() const
251 buf
.Printf(" [FD %d, requestId %u]", clientConnection
->fd
, requestId
);
253 return buf
.content();