]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Inquirer.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / ipc / Inquirer.cc
CommitLineData
51ea0904 1/*
bbc27441 2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
51ea0904 3 *
bbc27441
AJ
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
51ea0904
CT
7 */
8
bbc27441
AJ
9/* DEBUG: section 54 Interprocess Communication */
10
f7f3304a 11#include "squid.h"
51ea0904 12#include "base/TextException.h"
1b76e6c1 13#include "comm.h"
51ea0904
CT
14#include "comm/Write.h"
15#include "ipc/Inquirer.h"
16#include "ipc/Port.h"
17#include "ipc/TypedMsgHdr.h"
18#include "MemBuf.h"
19#include <algorithm>
20
51ea0904
CT
21CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer);
22
23Ipc::Inquirer::RequestsMap Ipc::Inquirer::TheRequestsMap;
24unsigned int Ipc::Inquirer::LastRequestId = 0;
25
26/// compare Ipc::StrandCoord using kidId, for std::sort() below
27static bool
28LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
29{
30 return c1.kidId < c2.kidId;
31}
32
33Ipc::Inquirer::Inquirer(Request::Pointer aRequest, const StrandCoords& coords,
34 double aTimeout):
f53969cc
SM
35 AsyncJob("Ipc::Inquirer"),
36 request(aRequest), strands(coords), pos(strands.begin()), timeout(aTimeout)
51ea0904
CT
37{
38 debugs(54, 5, HERE);
39
40 // order by ascending kid IDs; useful for non-aggregatable stats
41 std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
42}
43
44Ipc::Inquirer::~Inquirer()
45{
46 debugs(54, 5, HERE);
47 cleanup();
48}
49
50void
51Ipc::Inquirer::cleanup()
52{
53}
54
55void
56Ipc::Inquirer::start()
57{
58 request->requestId = 0;
59}
60
61void
62Ipc::Inquirer::inquire()
63{
64 if (pos == strands.end()) {
65 Must(done());
66 return;
67 }
68
69 Must(request->requestId == 0);
933a6aed 70 AsyncCall::Pointer callback = asyncCall(54, 5, "Mgr::Inquirer::handleRemoteAck",
51ea0904
CT
71 HandleAckDialer(this, &Inquirer::handleRemoteAck, NULL));
72 if (++LastRequestId == 0) // don't use zero value as request->requestId
73 ++LastRequestId;
74 request->requestId = LastRequestId;
75 const int kidId = pos->kidId;
76 debugs(54, 4, HERE << "inquire kid: " << kidId << status());
77 TheRequestsMap[request->requestId] = callback;
78 TypedMsgHdr message;
79 request->pack(message);
1ee292b7 80 SendMessage(Port::MakeAddr(strandAddrLabel, kidId), message);
51ea0904
CT
81 eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut,
82 this, timeout, 0, false);
83}
84
85/// called when a strand is done writing its output
86void
87Ipc::Inquirer::handleRemoteAck(Response::Pointer response)
88{
89 debugs(54, 4, HERE << status());
90 request->requestId = 0;
91 removeTimeoutEvent();
92 if (aggregate(response)) {
93 Must(!done()); // or we should not be called
94 ++pos; // advance after a successful inquiry
95 inquire();
96 } else {
97 mustStop("error");
98 }
99}
100
101void
102Ipc::Inquirer::swanSong()
103{
104 debugs(54, 5, HERE);
105 removeTimeoutEvent();
106 if (request->requestId > 0) {
107 DequeueRequest(request->requestId);
108 request->requestId = 0;
109 }
110 sendResponse();
111 cleanup();
112}
113
114bool
115Ipc::Inquirer::doneAll() const
116{
117 return pos == strands.end();
118}
119
120void
121Ipc::Inquirer::handleException(const std::exception& e)
122{
123 debugs(54, 3, HERE << e.what());
124 mustStop("exception");
125}
126
127void
128Ipc::Inquirer::callException(const std::exception& e)
129{
130 debugs(54, 3, HERE);
131 try {
132 handleException(e);
133 } catch (const std::exception& ex) {
134 debugs(54, DBG_CRITICAL, HERE << ex.what());
135 }
136 AsyncJob::callException(e);
137}
138
139/// returns and forgets the right Inquirer callback for strand request
140AsyncCall::Pointer
141Ipc::Inquirer::DequeueRequest(unsigned int requestId)
142{
143 debugs(54, 3, HERE << " requestId " << requestId);
144 Must(requestId != 0);
145 AsyncCall::Pointer call;
146 RequestsMap::iterator request = TheRequestsMap.find(requestId);
147 if (request != TheRequestsMap.end()) {
148 call = request->second;
149 Must(call != NULL);
150 TheRequestsMap.erase(request);
151 }
152 return call;
153}
154
155void
156Ipc::Inquirer::HandleRemoteAck(const Response& response)
157{
158 Must(response.requestId != 0);
159 AsyncCall::Pointer call = DequeueRequest(response.requestId);
160 if (call != NULL) {
161 HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer());
162 Must(dialer);
163 dialer->arg1 = response.clone();
164 ScheduleCallHere(call);
165 }
166}
167
168/// called when we are no longer waiting for the strand to respond
169void
170Ipc::Inquirer::removeTimeoutEvent()
171{
172 if (eventFind(&Inquirer::RequestTimedOut, this))
173 eventDelete(&Inquirer::RequestTimedOut, this);
174}
175
176/// Ipc::Inquirer::requestTimedOut wrapper
177void
178Ipc::Inquirer::RequestTimedOut(void* param)
179{
180 debugs(54, 3, HERE);
181 Must(param != NULL);
182 Inquirer* cmi = static_cast<Inquirer*>(param);
183 // use async call to enable job call protection that time events lack
933a6aed 184 CallJobHere(54, 5, cmi, Inquirer, requestTimedOut);
51ea0904
CT
185}
186
187/// called when the strand failed to respond (or finish responding) in time
188void
189Ipc::Inquirer::requestTimedOut()
190{
191 debugs(54, 3, HERE);
192 if (request->requestId != 0) {
193 DequeueRequest(request->requestId);
194 request->requestId = 0;
195 Must(!done()); // or we should not be called
196 ++pos; // advance after a failed inquiry
197 inquire();
198 }
199}
200
201const char*
202Ipc::Inquirer::status() const
203{
204 static MemBuf buf;
205 buf.reset();
206 buf.Printf(" [request->requestId %u]", request->requestId);
207 buf.terminate();
208 return buf.content();
209}
f53969cc 210