]>
Commit | Line | Data |
---|---|---|
51ea0904 CT |
1 | /* |
2 | * $Id$ | |
3 | * | |
4 | * DEBUG: section 54 Interprocess Communication | |
5 | * | |
6 | */ | |
7 | ||
f7f3304a | 8 | #include "squid.h" |
51ea0904 | 9 | #include "base/TextException.h" |
1b76e6c1 | 10 | #include "comm.h" |
51ea0904 CT |
11 | #include "comm/Write.h" |
12 | #include "ipc/Inquirer.h" | |
13 | #include "ipc/Port.h" | |
14 | #include "ipc/TypedMsgHdr.h" | |
15 | #include "MemBuf.h" | |
16 | #include <algorithm> | |
17 | ||
51ea0904 CT |
18 | CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer); |
19 | ||
20 | Ipc::Inquirer::RequestsMap Ipc::Inquirer::TheRequestsMap; | |
21 | unsigned int Ipc::Inquirer::LastRequestId = 0; | |
22 | ||
23 | /// compare Ipc::StrandCoord using kidId, for std::sort() below | |
24 | static bool | |
25 | LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2) | |
26 | { | |
27 | return c1.kidId < c2.kidId; | |
28 | } | |
29 | ||
30 | Ipc::Inquirer::Inquirer(Request::Pointer aRequest, const StrandCoords& coords, | |
31 | double aTimeout): | |
8fb5a96c CT |
32 | AsyncJob("Ipc::Inquirer"), |
33 | request(aRequest), strands(coords), pos(strands.begin()), timeout(aTimeout) | |
51ea0904 CT |
34 | { |
35 | debugs(54, 5, HERE); | |
36 | ||
37 | // order by ascending kid IDs; useful for non-aggregatable stats | |
38 | std::sort(strands.begin(), strands.end(), LesserStrandByKidId); | |
39 | } | |
40 | ||
41 | Ipc::Inquirer::~Inquirer() | |
42 | { | |
43 | debugs(54, 5, HERE); | |
44 | cleanup(); | |
45 | } | |
46 | ||
47 | void | |
48 | Ipc::Inquirer::cleanup() | |
49 | { | |
50 | } | |
51 | ||
52 | void | |
53 | Ipc::Inquirer::start() | |
54 | { | |
55 | request->requestId = 0; | |
56 | } | |
57 | ||
58 | void | |
59 | Ipc::Inquirer::inquire() | |
60 | { | |
61 | if (pos == strands.end()) { | |
62 | Must(done()); | |
63 | return; | |
64 | } | |
65 | ||
66 | Must(request->requestId == 0); | |
933a6aed | 67 | AsyncCall::Pointer callback = asyncCall(54, 5, "Mgr::Inquirer::handleRemoteAck", |
51ea0904 CT |
68 | HandleAckDialer(this, &Inquirer::handleRemoteAck, NULL)); |
69 | if (++LastRequestId == 0) // don't use zero value as request->requestId | |
70 | ++LastRequestId; | |
71 | request->requestId = LastRequestId; | |
72 | const int kidId = pos->kidId; | |
73 | debugs(54, 4, HERE << "inquire kid: " << kidId << status()); | |
74 | TheRequestsMap[request->requestId] = callback; | |
75 | TypedMsgHdr message; | |
76 | request->pack(message); | |
77 | SendMessage(Port::MakeAddr(strandAddrPfx, kidId), message); | |
78 | eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut, | |
79 | this, timeout, 0, false); | |
80 | } | |
81 | ||
82 | /// called when a strand is done writing its output | |
83 | void | |
84 | Ipc::Inquirer::handleRemoteAck(Response::Pointer response) | |
85 | { | |
86 | debugs(54, 4, HERE << status()); | |
87 | request->requestId = 0; | |
88 | removeTimeoutEvent(); | |
89 | if (aggregate(response)) { | |
90 | Must(!done()); // or we should not be called | |
91 | ++pos; // advance after a successful inquiry | |
92 | inquire(); | |
93 | } else { | |
94 | mustStop("error"); | |
95 | } | |
96 | } | |
97 | ||
98 | void | |
99 | Ipc::Inquirer::swanSong() | |
100 | { | |
101 | debugs(54, 5, HERE); | |
102 | removeTimeoutEvent(); | |
103 | if (request->requestId > 0) { | |
104 | DequeueRequest(request->requestId); | |
105 | request->requestId = 0; | |
106 | } | |
107 | sendResponse(); | |
108 | cleanup(); | |
109 | } | |
110 | ||
111 | bool | |
112 | Ipc::Inquirer::doneAll() const | |
113 | { | |
114 | return pos == strands.end(); | |
115 | } | |
116 | ||
117 | void | |
118 | Ipc::Inquirer::handleException(const std::exception& e) | |
119 | { | |
120 | debugs(54, 3, HERE << e.what()); | |
121 | mustStop("exception"); | |
122 | } | |
123 | ||
124 | void | |
125 | Ipc::Inquirer::callException(const std::exception& e) | |
126 | { | |
127 | debugs(54, 3, HERE); | |
128 | try { | |
129 | handleException(e); | |
130 | } catch (const std::exception& ex) { | |
131 | debugs(54, DBG_CRITICAL, HERE << ex.what()); | |
132 | } | |
133 | AsyncJob::callException(e); | |
134 | } | |
135 | ||
136 | /// returns and forgets the right Inquirer callback for strand request | |
137 | AsyncCall::Pointer | |
138 | Ipc::Inquirer::DequeueRequest(unsigned int requestId) | |
139 | { | |
140 | debugs(54, 3, HERE << " requestId " << requestId); | |
141 | Must(requestId != 0); | |
142 | AsyncCall::Pointer call; | |
143 | RequestsMap::iterator request = TheRequestsMap.find(requestId); | |
144 | if (request != TheRequestsMap.end()) { | |
145 | call = request->second; | |
146 | Must(call != NULL); | |
147 | TheRequestsMap.erase(request); | |
148 | } | |
149 | return call; | |
150 | } | |
151 | ||
152 | void | |
153 | Ipc::Inquirer::HandleRemoteAck(const Response& response) | |
154 | { | |
155 | Must(response.requestId != 0); | |
156 | AsyncCall::Pointer call = DequeueRequest(response.requestId); | |
157 | if (call != NULL) { | |
158 | HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer()); | |
159 | Must(dialer); | |
160 | dialer->arg1 = response.clone(); | |
161 | ScheduleCallHere(call); | |
162 | } | |
163 | } | |
164 | ||
165 | /// called when we are no longer waiting for the strand to respond | |
166 | void | |
167 | Ipc::Inquirer::removeTimeoutEvent() | |
168 | { | |
169 | if (eventFind(&Inquirer::RequestTimedOut, this)) | |
170 | eventDelete(&Inquirer::RequestTimedOut, this); | |
171 | } | |
172 | ||
173 | /// Ipc::Inquirer::requestTimedOut wrapper | |
174 | void | |
175 | Ipc::Inquirer::RequestTimedOut(void* param) | |
176 | { | |
177 | debugs(54, 3, HERE); | |
178 | Must(param != NULL); | |
179 | Inquirer* cmi = static_cast<Inquirer*>(param); | |
180 | // use async call to enable job call protection that time events lack | |
933a6aed | 181 | CallJobHere(54, 5, cmi, Inquirer, requestTimedOut); |
51ea0904 CT |
182 | } |
183 | ||
184 | /// called when the strand failed to respond (or finish responding) in time | |
185 | void | |
186 | Ipc::Inquirer::requestTimedOut() | |
187 | { | |
188 | debugs(54, 3, HERE); | |
189 | if (request->requestId != 0) { | |
190 | DequeueRequest(request->requestId); | |
191 | request->requestId = 0; | |
192 | Must(!done()); // or we should not be called | |
193 | ++pos; // advance after a failed inquiry | |
194 | inquire(); | |
195 | } | |
196 | } | |
197 | ||
198 | const char* | |
199 | Ipc::Inquirer::status() const | |
200 | { | |
201 | static MemBuf buf; | |
202 | buf.reset(); | |
203 | buf.Printf(" [request->requestId %u]", request->requestId); | |
204 | buf.terminate(); | |
205 | return buf.content(); | |
206 | } |