]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Inquirer.cc
Maintenance: Removed most NULLs using modernize-use-nullptr (#1075)
[thirdparty/squid.git] / src / ipc / Inquirer.cc
1 /*
2 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 54 Interprocess Communication */
10
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "comm.h"
14 #include "comm/Write.h"
15 #include "ipc/Inquirer.h"
16 #include "ipc/Port.h"
17 #include "ipc/TypedMsgHdr.h"
18 #include "MemBuf.h"
19 #include <algorithm>
20
21 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer);
22
23 Ipc::Inquirer::RequestsMap Ipc::Inquirer::TheRequestsMap;
24 Ipc::RequestId::Index Ipc::Inquirer::LastRequestId = 0;
25
26 /// compare Ipc::StrandCoord using kidId, for std::sort() below
27 static bool
28 LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
29 {
30 return c1.kidId < c2.kidId;
31 }
32
33 Ipc::Inquirer::Inquirer(Request::Pointer aRequest, const StrandCoords& coords,
34 double aTimeout):
35 AsyncJob("Ipc::Inquirer"),
36 codeContext(CodeContext::Current()),
37 request(aRequest), strands(coords), pos(strands.begin()), timeout(aTimeout)
38 {
39 debugs(54, 5, MYNAME);
40
41 // order by ascending kid IDs; useful for non-aggregatable stats
42 std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
43 }
44
45 Ipc::Inquirer::~Inquirer()
46 {
47 debugs(54, 5, MYNAME);
48 cleanup();
49 }
50
51 void
52 Ipc::Inquirer::cleanup()
53 {
54 }
55
56 void
57 Ipc::Inquirer::start()
58 {
59 request->requestId = 0;
60 }
61
62 void
63 Ipc::Inquirer::inquire()
64 {
65 if (pos == strands.end()) {
66 Must(done());
67 return;
68 }
69
70 Must(request->requestId == 0);
71 AsyncCall::Pointer callback = asyncCall(54, 5, "Mgr::Inquirer::handleRemoteAck",
72 HandleAckDialer(this, &Inquirer::handleRemoteAck, nullptr));
73 if (++LastRequestId == 0) // don't use zero value as request->requestId
74 ++LastRequestId;
75 request->requestId = LastRequestId;
76 const int kidId = pos->kidId;
77 debugs(54, 4, "inquire kid: " << kidId << status());
78 TheRequestsMap[request->requestId] = callback;
79 TypedMsgHdr message;
80 request->pack(message);
81 SendMessage(Port::MakeAddr(strandAddrLabel, kidId), message);
82 eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut,
83 this, timeout, 0, false);
84 }
85
86 /// called when a strand is done writing its output
87 void
88 Ipc::Inquirer::handleRemoteAck(Response::Pointer response)
89 {
90 debugs(54, 4, status());
91 request->requestId = 0;
92 removeTimeoutEvent();
93 if (aggregate(response)) {
94 Must(!done()); // or we should not be called
95 ++pos; // advance after a successful inquiry
96 inquire();
97 } else {
98 mustStop("error");
99 }
100 }
101
102 void
103 Ipc::Inquirer::swanSong()
104 {
105 debugs(54, 5, MYNAME);
106 removeTimeoutEvent();
107 if (request->requestId > 0) {
108 DequeueRequest(request->requestId);
109 request->requestId = 0;
110 }
111 sendResponse();
112 cleanup();
113 }
114
115 bool
116 Ipc::Inquirer::doneAll() const
117 {
118 return pos == strands.end();
119 }
120
121 void
122 Ipc::Inquirer::handleException(const std::exception& e)
123 {
124 debugs(54, 3, e.what());
125 mustStop("exception");
126 }
127
128 void
129 Ipc::Inquirer::callException(const std::exception& e)
130 {
131 debugs(54, 3, MYNAME);
132 try {
133 handleException(e);
134 } catch (const std::exception& ex) {
135 debugs(54, DBG_CRITICAL, ex.what());
136 }
137 AsyncJob::callException(e);
138 }
139
140 /// returns and forgets the right Inquirer callback for strand request
141 AsyncCall::Pointer
142 Ipc::Inquirer::DequeueRequest(const RequestId::Index requestId)
143 {
144 debugs(54, 3, " requestId " << requestId);
145 Must(requestId != 0);
146 AsyncCall::Pointer call;
147 RequestsMap::iterator request = TheRequestsMap.find(requestId);
148 if (request != TheRequestsMap.end()) {
149 call = request->second;
150 Must(call != nullptr);
151 TheRequestsMap.erase(request);
152 }
153 return call;
154 }
155
156 void
157 Ipc::Inquirer::HandleRemoteAck(const Response& response)
158 {
159 Must(response.requestId != 0);
160 AsyncCall::Pointer call = DequeueRequest(response.requestId);
161 if (call != nullptr) {
162 HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer());
163 Must(dialer);
164 dialer->arg1 = response.clone();
165 ScheduleCallHere(call);
166 }
167 }
168
169 /// called when we are no longer waiting for the strand to respond
170 void
171 Ipc::Inquirer::removeTimeoutEvent()
172 {
173 if (eventFind(&Inquirer::RequestTimedOut, this))
174 eventDelete(&Inquirer::RequestTimedOut, this);
175 }
176
177 /// Ipc::Inquirer::requestTimedOut wrapper
178 void
179 Ipc::Inquirer::RequestTimedOut(void* param)
180 {
181 debugs(54, 3, MYNAME);
182 Must(param != nullptr);
183 Inquirer* cmi = static_cast<Inquirer*>(param);
184 // use async call to enable job call protection that time events lack
185 CallBack(cmi->codeContext, [&cmi] {
186 CallJobHere(54, 5, cmi, Inquirer, requestTimedOut);
187 });
188 }
189
190 /// called when the strand failed to respond (or finish responding) in time
191 void
192 Ipc::Inquirer::requestTimedOut()
193 {
194 debugs(54, 3, MYNAME);
195 if (request->requestId != 0) {
196 DequeueRequest(request->requestId);
197 request->requestId = 0;
198 Must(!done()); // or we should not be called
199 ++pos; // advance after a failed inquiry
200 inquire();
201 }
202 }
203
204 const char*
205 Ipc::Inquirer::status() const
206 {
207 static MemBuf buf;
208 buf.reset();
209 buf.appendf(" [requestId %u]", request->requestId.index());
210 buf.terminate();
211 return buf.content();
212 }
213