]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Inquirer.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / ipc / Inquirer.cc
CommitLineData
51ea0904
CT
1/*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
f7f3304a 8#include "squid.h"
51ea0904 9#include "base/TextException.h"
1b76e6c1 10#include "comm.h"
51ea0904
CT
11#include "comm/Write.h"
12#include "ipc/Inquirer.h"
13#include "ipc/Port.h"
14#include "ipc/TypedMsgHdr.h"
15#include "MemBuf.h"
16#include <algorithm>
17
51ea0904
CT
18CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer);
19
20Ipc::Inquirer::RequestsMap Ipc::Inquirer::TheRequestsMap;
21unsigned int Ipc::Inquirer::LastRequestId = 0;
22
23/// compare Ipc::StrandCoord using kidId, for std::sort() below
24static bool
25LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
26{
27 return c1.kidId < c2.kidId;
28}
29
30Ipc::Inquirer::Inquirer(Request::Pointer aRequest, const StrandCoords& coords,
31 double aTimeout):
8fb5a96c
CT
32 AsyncJob("Ipc::Inquirer"),
33 request(aRequest), strands(coords), pos(strands.begin()), timeout(aTimeout)
51ea0904
CT
34{
35 debugs(54, 5, HERE);
36
37 // order by ascending kid IDs; useful for non-aggregatable stats
38 std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
39}
40
41Ipc::Inquirer::~Inquirer()
42{
43 debugs(54, 5, HERE);
44 cleanup();
45}
46
47void
48Ipc::Inquirer::cleanup()
49{
50}
51
52void
53Ipc::Inquirer::start()
54{
55 request->requestId = 0;
56}
57
58void
59Ipc::Inquirer::inquire()
60{
61 if (pos == strands.end()) {
62 Must(done());
63 return;
64 }
65
66 Must(request->requestId == 0);
933a6aed 67 AsyncCall::Pointer callback = asyncCall(54, 5, "Mgr::Inquirer::handleRemoteAck",
51ea0904
CT
68 HandleAckDialer(this, &Inquirer::handleRemoteAck, NULL));
69 if (++LastRequestId == 0) // don't use zero value as request->requestId
70 ++LastRequestId;
71 request->requestId = LastRequestId;
72 const int kidId = pos->kidId;
73 debugs(54, 4, HERE << "inquire kid: " << kidId << status());
74 TheRequestsMap[request->requestId] = callback;
75 TypedMsgHdr message;
76 request->pack(message);
77 SendMessage(Port::MakeAddr(strandAddrPfx, kidId), message);
78 eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut,
79 this, timeout, 0, false);
80}
81
82/// called when a strand is done writing its output
83void
84Ipc::Inquirer::handleRemoteAck(Response::Pointer response)
85{
86 debugs(54, 4, HERE << status());
87 request->requestId = 0;
88 removeTimeoutEvent();
89 if (aggregate(response)) {
90 Must(!done()); // or we should not be called
91 ++pos; // advance after a successful inquiry
92 inquire();
93 } else {
94 mustStop("error");
95 }
96}
97
98void
99Ipc::Inquirer::swanSong()
100{
101 debugs(54, 5, HERE);
102 removeTimeoutEvent();
103 if (request->requestId > 0) {
104 DequeueRequest(request->requestId);
105 request->requestId = 0;
106 }
107 sendResponse();
108 cleanup();
109}
110
111bool
112Ipc::Inquirer::doneAll() const
113{
114 return pos == strands.end();
115}
116
117void
118Ipc::Inquirer::handleException(const std::exception& e)
119{
120 debugs(54, 3, HERE << e.what());
121 mustStop("exception");
122}
123
124void
125Ipc::Inquirer::callException(const std::exception& e)
126{
127 debugs(54, 3, HERE);
128 try {
129 handleException(e);
130 } catch (const std::exception& ex) {
131 debugs(54, DBG_CRITICAL, HERE << ex.what());
132 }
133 AsyncJob::callException(e);
134}
135
136/// returns and forgets the right Inquirer callback for strand request
137AsyncCall::Pointer
138Ipc::Inquirer::DequeueRequest(unsigned int requestId)
139{
140 debugs(54, 3, HERE << " requestId " << requestId);
141 Must(requestId != 0);
142 AsyncCall::Pointer call;
143 RequestsMap::iterator request = TheRequestsMap.find(requestId);
144 if (request != TheRequestsMap.end()) {
145 call = request->second;
146 Must(call != NULL);
147 TheRequestsMap.erase(request);
148 }
149 return call;
150}
151
152void
153Ipc::Inquirer::HandleRemoteAck(const Response& response)
154{
155 Must(response.requestId != 0);
156 AsyncCall::Pointer call = DequeueRequest(response.requestId);
157 if (call != NULL) {
158 HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer());
159 Must(dialer);
160 dialer->arg1 = response.clone();
161 ScheduleCallHere(call);
162 }
163}
164
165/// called when we are no longer waiting for the strand to respond
166void
167Ipc::Inquirer::removeTimeoutEvent()
168{
169 if (eventFind(&Inquirer::RequestTimedOut, this))
170 eventDelete(&Inquirer::RequestTimedOut, this);
171}
172
173/// Ipc::Inquirer::requestTimedOut wrapper
174void
175Ipc::Inquirer::RequestTimedOut(void* param)
176{
177 debugs(54, 3, HERE);
178 Must(param != NULL);
179 Inquirer* cmi = static_cast<Inquirer*>(param);
180 // use async call to enable job call protection that time events lack
933a6aed 181 CallJobHere(54, 5, cmi, Inquirer, requestTimedOut);
51ea0904
CT
182}
183
184/// called when the strand failed to respond (or finish responding) in time
185void
186Ipc::Inquirer::requestTimedOut()
187{
188 debugs(54, 3, HERE);
189 if (request->requestId != 0) {
190 DequeueRequest(request->requestId);
191 request->requestId = 0;
192 Must(!done()); // or we should not be called
193 ++pos; // advance after a failed inquiry
194 inquire();
195 }
196}
197
198const char*
199Ipc::Inquirer::status() const
200{
201 static MemBuf buf;
202 buf.reset();
203 buf.Printf(" [request->requestId %u]", request->requestId);
204 buf.terminate();
205 return buf.content();
206}