]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Inquirer.cc
Source Format Enforcement (#1234)
[thirdparty/squid.git] / src / ipc / Inquirer.cc
1 /*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 54 Interprocess Communication */
10
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "comm.h"
14 #include "comm/Write.h"
15 #include "ipc/Inquirer.h"
16 #include "ipc/Port.h"
17 #include "ipc/TypedMsgHdr.h"
18 #include "mem/PoolingAllocator.h"
19 #include "MemBuf.h"
20
21 #include <algorithm>
22 #include <unordered_map>
23
24 Ipc::RequestId::Index Ipc::Inquirer::LastRequestId = 0;
25
26 namespace Ipc {
27
28 /// maps request->id to the Inquirer waiting for the response to that request
29 using InquirerPointer = CbcPointer<Inquirer>;
30 using WaitingInquiriesItem = std::pair<const RequestId::Index, InquirerPointer>;
31 using WaitingInquiries = std::unordered_map<
32 RequestId::Index,
33 InquirerPointer,
34 std::hash<RequestId::Index>,
35 std::equal_to<RequestId::Index>,
36 PoolingAllocator<WaitingInquiriesItem> >;
37
38 /// pending Inquirer requests for this process
39 static WaitingInquiries TheWaitingInquirers;
40
41 /// returns and forgets the Inquirer waiting for the given requests
42 static InquirerPointer
43 DequeueRequest(const RequestId::Index requestId)
44 {
45 debugs(54, 3, "requestId " << requestId);
46 Assure(requestId != 0);
47 const auto request = TheWaitingInquirers.find(requestId);
48 if (request != TheWaitingInquirers.end()) {
49 const auto inquirer = request->second;
50 TheWaitingInquirers.erase(request);
51 return inquirer; // may already be gone by now
52 }
53 return nullptr;
54 }
55
56 } // namespace Ipc
57
58 /// compare Ipc::StrandCoord using kidId, for std::sort() below
59 static bool
60 LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
61 {
62 return c1.kidId < c2.kidId;
63 }
64
65 Ipc::Inquirer::Inquirer(Request::Pointer aRequest, const StrandCoords& coords,
66 double aTimeout):
67 AsyncJob("Ipc::Inquirer"),
68 codeContext(CodeContext::Current()),
69 request(aRequest), strands(coords), pos(strands.begin()), timeout(aTimeout)
70 {
71 debugs(54, 5, MYNAME);
72
73 // order by ascending kid IDs; useful for non-aggregatable stats
74 std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
75 }
76
77 Ipc::Inquirer::~Inquirer()
78 {
79 debugs(54, 5, MYNAME);
80 cleanup();
81 }
82
83 void
84 Ipc::Inquirer::cleanup()
85 {
86 }
87
88 void
89 Ipc::Inquirer::start()
90 {
91 request->requestId = 0;
92 }
93
94 void
95 Ipc::Inquirer::inquire()
96 {
97 if (pos == strands.end()) {
98 Must(done());
99 return;
100 }
101
102 Must(request->requestId == 0);
103 if (++LastRequestId == 0) // don't use zero value as request->requestId
104 ++LastRequestId;
105 request->requestId = LastRequestId;
106 const int kidId = pos->kidId;
107 debugs(54, 4, "inquire kid: " << kidId << status());
108 TheWaitingInquirers[request->requestId] = this;
109 TypedMsgHdr message;
110 request->pack(message);
111 SendMessage(Port::MakeAddr(strandAddrLabel, kidId), message);
112 eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut,
113 this, timeout, 0, false);
114 }
115
116 /// called when a strand is done writing its output
117 void
118 Ipc::Inquirer::handleRemoteAck(Response::Pointer response)
119 {
120 debugs(54, 4, status());
121 request->requestId = 0;
122 removeTimeoutEvent();
123 if (aggregate(response)) {
124 Must(!done()); // or we should not be called
125 ++pos; // advance after a successful inquiry
126 inquire();
127 } else {
128 mustStop("error");
129 }
130 }
131
132 void
133 Ipc::Inquirer::swanSong()
134 {
135 debugs(54, 5, MYNAME);
136 removeTimeoutEvent();
137 if (request->requestId > 0) {
138 DequeueRequest(request->requestId);
139 request->requestId = 0;
140 }
141 sendResponse();
142 cleanup();
143 }
144
145 bool
146 Ipc::Inquirer::doneAll() const
147 {
148 return pos == strands.end();
149 }
150
151 void
152 Ipc::Inquirer::handleException(const std::exception& e)
153 {
154 debugs(54, 3, e.what());
155 mustStop("exception");
156 }
157
158 void
159 Ipc::Inquirer::callException(const std::exception& e)
160 {
161 debugs(54, 3, MYNAME);
162 try {
163 handleException(e);
164 } catch (const std::exception& ex) {
165 debugs(54, DBG_CRITICAL, ex.what());
166 }
167 AsyncJob::callException(e);
168 }
169
170 void
171 Ipc::Inquirer::HandleRemoteAck(const Response& response)
172 {
173 Must(response.requestId != 0);
174 const auto inquirer = DequeueRequest(response.requestId);
175 if (inquirer.valid()) {
176 CallService(inquirer->codeContext, [&] {
177 const auto call = asyncCall(54, 5, "Ipc::Inquirer::handleRemoteAck",
178 JobMemFun(inquirer, &Inquirer::handleRemoteAck, response.clone()));
179 ScheduleCallHere(call);
180 });
181 }
182 }
183
184 /// called when we are no longer waiting for the strand to respond
185 void
186 Ipc::Inquirer::removeTimeoutEvent()
187 {
188 if (eventFind(&Inquirer::RequestTimedOut, this))
189 eventDelete(&Inquirer::RequestTimedOut, this);
190 }
191
192 /// Ipc::Inquirer::requestTimedOut wrapper
193 void
194 Ipc::Inquirer::RequestTimedOut(void* param)
195 {
196 debugs(54, 3, MYNAME);
197 Must(param != nullptr);
198 Inquirer* cmi = static_cast<Inquirer*>(param);
199 // use async call to enable job call protection that time events lack
200 CallBack(cmi->codeContext, [&cmi] {
201 CallJobHere(54, 5, cmi, Inquirer, requestTimedOut);
202 });
203 }
204
205 /// called when the strand failed to respond (or finish responding) in time
206 void
207 Ipc::Inquirer::requestTimedOut()
208 {
209 debugs(54, 3, MYNAME);
210 if (request->requestId != 0) {
211 DequeueRequest(request->requestId);
212 request->requestId = 0;
213 Must(!done()); // or we should not be called
214 ++pos; // advance after a failed inquiry
215 inquire();
216 }
217 }
218
219 const char*
220 Ipc::Inquirer::status() const
221 {
222 static MemBuf buf;
223 buf.reset();
224 buf.appendf(" [requestId %u]", request->requestId.index());
225 buf.terminate();
226 return buf.content();
227 }
228