]>
Commit | Line | Data |
---|---|---|
51ea0904 | 1 | /* |
bbc27441 | 2 | * Copyright (C) 1996-2014 The Squid Software Foundation and contributors |
51ea0904 | 3 | * |
bbc27441 AJ |
4 | * Squid software is distributed under GPLv2+ license and includes |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
51ea0904 CT |
7 | */ |
8 | ||
bbc27441 AJ |
9 | /* DEBUG: section 54 Interprocess Communication */ |
10 | ||
f7f3304a | 11 | #include "squid.h" |
51ea0904 | 12 | #include "base/TextException.h" |
1b76e6c1 | 13 | #include "comm.h" |
51ea0904 CT |
14 | #include "comm/Write.h" |
15 | #include "ipc/Inquirer.h" | |
16 | #include "ipc/Port.h" | |
17 | #include "ipc/TypedMsgHdr.h" | |
18 | #include "MemBuf.h" | |
19 | #include <algorithm> | |
20 | ||
51ea0904 CT |
21 | CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer); |
22 | ||
23 | Ipc::Inquirer::RequestsMap Ipc::Inquirer::TheRequestsMap; | |
24 | unsigned int Ipc::Inquirer::LastRequestId = 0; | |
25 | ||
26 | /// compare Ipc::StrandCoord using kidId, for std::sort() below | |
27 | static bool | |
28 | LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2) | |
29 | { | |
30 | return c1.kidId < c2.kidId; | |
31 | } | |
32 | ||
33 | Ipc::Inquirer::Inquirer(Request::Pointer aRequest, const StrandCoords& coords, | |
34 | double aTimeout): | |
f53969cc SM |
35 | AsyncJob("Ipc::Inquirer"), |
36 | request(aRequest), strands(coords), pos(strands.begin()), timeout(aTimeout) | |
51ea0904 CT |
37 | { |
38 | debugs(54, 5, HERE); | |
39 | ||
40 | // order by ascending kid IDs; useful for non-aggregatable stats | |
41 | std::sort(strands.begin(), strands.end(), LesserStrandByKidId); | |
42 | } | |
43 | ||
44 | Ipc::Inquirer::~Inquirer() | |
45 | { | |
46 | debugs(54, 5, HERE); | |
47 | cleanup(); | |
48 | } | |
49 | ||
50 | void | |
51 | Ipc::Inquirer::cleanup() | |
52 | { | |
53 | } | |
54 | ||
55 | void | |
56 | Ipc::Inquirer::start() | |
57 | { | |
58 | request->requestId = 0; | |
59 | } | |
60 | ||
61 | void | |
62 | Ipc::Inquirer::inquire() | |
63 | { | |
64 | if (pos == strands.end()) { | |
65 | Must(done()); | |
66 | return; | |
67 | } | |
68 | ||
69 | Must(request->requestId == 0); | |
933a6aed | 70 | AsyncCall::Pointer callback = asyncCall(54, 5, "Mgr::Inquirer::handleRemoteAck", |
51ea0904 CT |
71 | HandleAckDialer(this, &Inquirer::handleRemoteAck, NULL)); |
72 | if (++LastRequestId == 0) // don't use zero value as request->requestId | |
73 | ++LastRequestId; | |
74 | request->requestId = LastRequestId; | |
75 | const int kidId = pos->kidId; | |
76 | debugs(54, 4, HERE << "inquire kid: " << kidId << status()); | |
77 | TheRequestsMap[request->requestId] = callback; | |
78 | TypedMsgHdr message; | |
79 | request->pack(message); | |
1ee292b7 | 80 | SendMessage(Port::MakeAddr(strandAddrLabel, kidId), message); |
51ea0904 CT |
81 | eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut, |
82 | this, timeout, 0, false); | |
83 | } | |
84 | ||
85 | /// called when a strand is done writing its output | |
86 | void | |
87 | Ipc::Inquirer::handleRemoteAck(Response::Pointer response) | |
88 | { | |
89 | debugs(54, 4, HERE << status()); | |
90 | request->requestId = 0; | |
91 | removeTimeoutEvent(); | |
92 | if (aggregate(response)) { | |
93 | Must(!done()); // or we should not be called | |
94 | ++pos; // advance after a successful inquiry | |
95 | inquire(); | |
96 | } else { | |
97 | mustStop("error"); | |
98 | } | |
99 | } | |
100 | ||
101 | void | |
102 | Ipc::Inquirer::swanSong() | |
103 | { | |
104 | debugs(54, 5, HERE); | |
105 | removeTimeoutEvent(); | |
106 | if (request->requestId > 0) { | |
107 | DequeueRequest(request->requestId); | |
108 | request->requestId = 0; | |
109 | } | |
110 | sendResponse(); | |
111 | cleanup(); | |
112 | } | |
113 | ||
114 | bool | |
115 | Ipc::Inquirer::doneAll() const | |
116 | { | |
117 | return pos == strands.end(); | |
118 | } | |
119 | ||
120 | void | |
121 | Ipc::Inquirer::handleException(const std::exception& e) | |
122 | { | |
123 | debugs(54, 3, HERE << e.what()); | |
124 | mustStop("exception"); | |
125 | } | |
126 | ||
127 | void | |
128 | Ipc::Inquirer::callException(const std::exception& e) | |
129 | { | |
130 | debugs(54, 3, HERE); | |
131 | try { | |
132 | handleException(e); | |
133 | } catch (const std::exception& ex) { | |
134 | debugs(54, DBG_CRITICAL, HERE << ex.what()); | |
135 | } | |
136 | AsyncJob::callException(e); | |
137 | } | |
138 | ||
139 | /// returns and forgets the right Inquirer callback for strand request | |
140 | AsyncCall::Pointer | |
141 | Ipc::Inquirer::DequeueRequest(unsigned int requestId) | |
142 | { | |
143 | debugs(54, 3, HERE << " requestId " << requestId); | |
144 | Must(requestId != 0); | |
145 | AsyncCall::Pointer call; | |
146 | RequestsMap::iterator request = TheRequestsMap.find(requestId); | |
147 | if (request != TheRequestsMap.end()) { | |
148 | call = request->second; | |
149 | Must(call != NULL); | |
150 | TheRequestsMap.erase(request); | |
151 | } | |
152 | return call; | |
153 | } | |
154 | ||
155 | void | |
156 | Ipc::Inquirer::HandleRemoteAck(const Response& response) | |
157 | { | |
158 | Must(response.requestId != 0); | |
159 | AsyncCall::Pointer call = DequeueRequest(response.requestId); | |
160 | if (call != NULL) { | |
161 | HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer()); | |
162 | Must(dialer); | |
163 | dialer->arg1 = response.clone(); | |
164 | ScheduleCallHere(call); | |
165 | } | |
166 | } | |
167 | ||
168 | /// called when we are no longer waiting for the strand to respond | |
169 | void | |
170 | Ipc::Inquirer::removeTimeoutEvent() | |
171 | { | |
172 | if (eventFind(&Inquirer::RequestTimedOut, this)) | |
173 | eventDelete(&Inquirer::RequestTimedOut, this); | |
174 | } | |
175 | ||
176 | /// Ipc::Inquirer::requestTimedOut wrapper | |
177 | void | |
178 | Ipc::Inquirer::RequestTimedOut(void* param) | |
179 | { | |
180 | debugs(54, 3, HERE); | |
181 | Must(param != NULL); | |
182 | Inquirer* cmi = static_cast<Inquirer*>(param); | |
183 | // use async call to enable job call protection that time events lack | |
933a6aed | 184 | CallJobHere(54, 5, cmi, Inquirer, requestTimedOut); |
51ea0904 CT |
185 | } |
186 | ||
187 | /// called when the strand failed to respond (or finish responding) in time | |
188 | void | |
189 | Ipc::Inquirer::requestTimedOut() | |
190 | { | |
191 | debugs(54, 3, HERE); | |
192 | if (request->requestId != 0) { | |
193 | DequeueRequest(request->requestId); | |
194 | request->requestId = 0; | |
195 | Must(!done()); // or we should not be called | |
196 | ++pos; // advance after a failed inquiry | |
197 | inquire(); | |
198 | } | |
199 | } | |
200 | ||
201 | const char* | |
202 | Ipc::Inquirer::status() const | |
203 | { | |
204 | static MemBuf buf; | |
205 | buf.reset(); | |
206 | buf.Printf(" [request->requestId %u]", request->requestId); | |
207 | buf.terminate(); | |
208 | return buf.content(); | |
209 | } | |
f53969cc | 210 |