2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 54 Interprocess Communication */
12 #include "base/TextException.h"
14 #include "comm/Write.h"
15 #include "ipc/Inquirer.h"
17 #include "ipc/TypedMsgHdr.h"
18 #include "mem/PoolingAllocator.h"
22 #include <unordered_map>
24 Ipc::RequestId::Index
Ipc::Inquirer::LastRequestId
= 0;
28 /// maps request->id to the Inquirer waiting for the response to that request
29 using InquirerPointer
= CbcPointer
<Inquirer
>;
30 using WaitingInquiriesItem
= std::pair
<const RequestId::Index
, InquirerPointer
>;
31 using WaitingInquiries
= std::unordered_map
<
34 std::hash
<RequestId::Index
>,
35 std::equal_to
<RequestId::Index
>,
36 PoolingAllocator
<WaitingInquiriesItem
> >;
38 /// pending Inquirer requests for this process
39 static WaitingInquiries TheWaitingInquirers
;
// NOTE(review): the leading numbers and the one-token-per-line layout look
// like extraction residue. Going by those numbers, lines 44 and 52-57 are
// absent here (presumably the braces and whatever is returned when the ID is
// not found) -- TODO restore them from the upstream file before building.
41 /// returns and forgets the Inquirer waiting for the given request
/// Looks requestId up in TheWaitingInquirers; on a match, the map entry is
/// erased and the InquirerPointer it held is returned to the caller.
42 static InquirerPointer
43 DequeueRequest(const RequestId::Index requestId
)
45 debugs(54, 3, "requestId " << requestId
);
// zero is never assigned as a request ID (see Ipc::Inquirer::inquire())
46 Assure(requestId
!= 0);
47 const auto request
= TheWaitingInquirers
.find(requestId
);
48 if (request
!= TheWaitingInquirers
.end()) {
// copy the stored pointer first: the erase() below destroys the map entry
49 const auto inquirer
= request
->second
;
50 TheWaitingInquirers
.erase(request
);
51 return inquirer
; // may already be gone by now
58 /// compare Ipc::StrandCoord using kidId, for std::sort() below
60 LesserStrandByKidId(const Ipc::StrandCoord
&c1
, const Ipc::StrandCoord
&c2
)
62 return c1
.kidId
< c2
.kidId
;
/// Builds an Inquirer job: remembers aRequest, copies coords into strands,
/// records CodeContext::Current() in codeContext, stores aTimeout, and then
/// sorts strands by kidId.
// NOTE(review): the leading numbers and split tokens look like extraction
// residue. Numbered line 66 (which must declare aTimeout and close the
// parameter list) and the body braces are absent here -- TODO restore them
// from the upstream file.
65 Ipc::Inquirer::Inquirer(Request::Pointer aRequest
, const StrandCoords
& coords
,
67 AsyncJob("Ipc::Inquirer"),
68 codeContext(CodeContext::Current()),
// pos is taken from strands.begin() before the sort below; std::sort()
// rearranges elements in place, so pos keeps denoting the first position
69 request(aRequest
), strands(coords
), pos(strands
.begin()), timeout(aTimeout
)
71 debugs(54, 5, MYNAME
);
73 // order by ascending kid IDs; useful for non-aggregatable stats
74 std::sort(strands
.begin(), strands
.end(), LesserStrandByKidId
);
/// Destructor; the only statement visible here logs at section 54, level 5.
// NOTE(review): going by the embedded numbers, lines 78 and 80-83 are absent
// here, so the body may contain more than this debugs() call -- TODO confirm
// against the upstream file.
77 Ipc::Inquirer::~Inquirer()
79 debugs(54, 5, MYNAME
);
// NOTE(review): only the signature survives here; the return type and body
// (numbered lines 83 and 85-86) are not visible. The numbering gap suggests
// an empty body -- TODO confirm against the upstream file.
84 Ipc::Inquirer::cleanup()
89 Ipc::Inquirer::start()
91 request
->requestId
= 0;
/// Sends the request to the strand at pos: assigns a fresh non-zero request
/// ID, registers this job in TheWaitingInquirers under that ID, packs and
/// sends the message to the kid's address, and schedules a
/// RequestTimedOut event after `timeout`.
// NOTE(review): the leading numbers and split tokens look like extraction
// residue. Going by those numbers, lines 96, 98-101, 104, 109 and 114 are
// absent here: the body of the end-of-strands check, the body of the
// zero-skip `if`, and the declaration of `message` are not visible.
// Do not read the statement following an `if` below as that `if`'s body --
// TODO restore the missing lines from the upstream file.
95 Ipc::Inquirer::inquire()
// nothing left to ask when pos has reached the end of strands
97 if (pos
== strands
.end()) {
// a non-zero ID would mean a previous inquiry is still outstanding
102 Must(request
->requestId
== 0);
103 if (++LastRequestId
== 0) // don't use zero value as request->requestId
105 request
->requestId
= LastRequestId
;
106 const int kidId
= pos
->kidId
;
107 debugs(54, 4, "inquire kid: " << kidId
<< status());
// remember who to hand the response to; see DequeueRequest()
108 TheWaitingInquirers
[request
->requestId
] = this;
110 request
->pack(message
);
111 SendMessage(Port::MakeAddr(strandAddrLabel
, kidId
), message
);
// removeTimeoutEvent() cancels this event
112 eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut
,
113 this, timeout
, 0, false);
116 /// called when a strand is done writing its output
/// Clears the outstanding request ID, cancels the timeout event, and passes
/// the response to aggregate(); when that succeeds, advances pos.
// NOTE(review): going by the embedded numbers, lines 126-130 are absent
// here, so what follows ++pos and what happens when aggregate() fails are
// not visible -- TODO restore from the upstream file.
118 Ipc::Inquirer::handleRemoteAck(Response::Pointer response
)
120 debugs(54, 4, status());
// the response has arrived: this request is no longer outstanding
121 request
->requestId
= 0;
122 removeTimeoutEvent();
123 if (aggregate(response
)) {
124 Must(!done()); // or we should not be called
125 ++pos
; // advance after a successful inquiry
/// Cancels any pending timeout event and, if a request ID is still set,
/// removes this job's entry from the waiting map and zeroes the ID.
// NOTE(review): going by the embedded numbers, lines 140-143 are absent
// here, so the end of this method is not visible -- TODO restore from the
// upstream file.
133 Ipc::Inquirer::swanSong()
135 debugs(54, 5, MYNAME
);
136 removeTimeoutEvent();
// NOTE(review): requestTimedOut() spells the same check as `!= 0`
137 if (request
->requestId
> 0) {
// the returned pointer is deliberately unused; only the erase matters here
138 DequeueRequest(request
->requestId
);
139 request
->requestId
= 0;
146 Ipc::Inquirer::doneAll() const
148 return pos
== strands
.end();
152 Ipc::Inquirer::handleException(const std::exception
& e
)
154 debugs(54, 3, e
.what());
155 mustStop("exception");
/// Logs any std::exception caught by the handler below at DBG_CRITICAL, then
/// forwards the original exception `e` to AsyncJob::callException().
// NOTE(review): going by the embedded numbers, lines 160, 162-163 and 166
// are absent here: the `try {` opening and the statement(s) it guards are
// not visible -- TODO restore from the upstream file.
159 Ipc::Inquirer::callException(const std::exception
& e
)
161 debugs(54, 3, MYNAME
);
// `ex` is an exception raised by the guarded code, distinct from `e`
164 } catch (const std::exception
& ex
) {
165 debugs(54, DBG_CRITICAL
, ex
.what());
167 AsyncJob::callException(e
);
171 Ipc::Inquirer::HandleRemoteAck(const Response
& response
)
173 Must(response
.requestId
!= 0);
174 const auto inquirer
= DequeueRequest(response
.requestId
);
175 if (inquirer
.valid()) {
176 CallService(inquirer
->codeContext
, [&] {
177 const auto call
= asyncCall(54, 5, "Ipc::Inquirer::handleRemoteAck",
178 JobMemFun(inquirer
, &Inquirer::handleRemoteAck
, response
.clone()));
179 ScheduleCallHere(call
);
184 /// called when we are no longer waiting for the strand to respond
186 Ipc::Inquirer::removeTimeoutEvent()
188 if (eventFind(&Inquirer::RequestTimedOut
, this))
189 eventDelete(&Inquirer::RequestTimedOut
, this);
192 /// Ipc::Inquirer::requestTimedOut wrapper
194 Ipc::Inquirer::RequestTimedOut(void* param
)
196 debugs(54, 3, MYNAME
);
197 Must(param
!= nullptr);
198 Inquirer
* cmi
= static_cast<Inquirer
*>(param
);
199 // use async call to enable job call protection that time events lack
200 CallBack(cmi
->codeContext
, [&cmi
] {
201 CallJobHere(54, 5, cmi
, Inquirer
, requestTimedOut
);
205 /// called when the strand failed to respond (or finish responding) in time
/// If a request is still outstanding, removes it from the waiting map,
/// zeroes the request ID, and advances pos past the unresponsive strand.
// NOTE(review): going by the embedded numbers, lines 215-218 are absent
// here, so what follows ++pos is not visible -- TODO restore from the
// upstream file.
207 Ipc::Inquirer::requestTimedOut()
209 debugs(54, 3, MYNAME
);
// a zero ID means no inquiry is outstanding, so there is nothing to time out
210 if (request
->requestId
!= 0) {
211 DequeueRequest(request
->requestId
);
212 request
->requestId
= 0;
213 Must(!done()); // or we should not be called
214 ++pos
; // advance after a failed inquiry
/// Produces a status string containing " [requestId N]", where N is the
/// index() of the current request ID, and returns buf's content.
// NOTE(review): going by the embedded numbers, lines 219, 221-223 and 225
// are absent here, so the declaration and preparation of `buf` are not
// visible, and the function may continue past the last visible line --
// TODO restore from the upstream file.
220 Ipc::Inquirer::status() const
224 buf
.appendf(" [requestId %u]", request
->requestId
.index());
226 return buf
.content();