]> git.ipfire.org Git - thirdparty/squid.git/blob - src/mgr/Inquirer.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / mgr / Inquirer.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 16 Cache Manager API
5 *
6 */
7
8 #include "squid.h"
9 #include "base/TextException.h"
10 #include "comm.h"
11 #include "comm/Connection.h"
12 #include "comm/Write.h"
13 #include "CommCalls.h"
14 #include "HttpReply.h"
15 #include "HttpRequest.h"
16 #include "ipc/UdsOp.h"
17 #include "mgr/ActionWriter.h"
18 #include "mgr/IntParam.h"
19 #include "mgr/Inquirer.h"
20 #include "mgr/Command.h"
21 #include "mgr/Request.h"
22 #include "mgr/Response.h"
23 #include "SquidTime.h"
24 #include "errorpage.h"
25 #include <memory>
26 #include <algorithm>
27
28 CBDATA_NAMESPACED_CLASS_INIT(Mgr, Inquirer);
29
30 Mgr::Inquirer::Inquirer(Action::Pointer anAction,
31 const Request &aCause, const Ipc::StrandCoords &coords):
32 Ipc::Inquirer(aCause.clone(), applyQueryParams(coords, aCause.params.queryParams), anAction->atomic() ? 10 : 100),
33 aggrAction(anAction)
34 {
35 conn = aCause.conn;
36 Ipc::ImportFdIntoComm(conn, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket);
37
38 debugs(16, 5, HERE << conn << " action: " << aggrAction);
39
40 closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed",
41 CommCbMemFunT<Inquirer, CommCloseCbParams>(this, &Inquirer::noteCommClosed));
42 comm_add_close_handler(conn->fd, closer);
43 }
44
45 /// closes our copy of the client HTTP connection socket
46 void
47 Mgr::Inquirer::cleanup()
48 {
49 if (Comm::IsConnOpen(conn)) {
50 removeCloseHandler();
51 conn->close();
52 }
53 }
54
55 void
56 Mgr::Inquirer::removeCloseHandler()
57 {
58 if (closer != NULL) {
59 comm_remove_close_handler(conn->fd, closer);
60 closer = NULL;
61 }
62 }
63
64 void
65 Mgr::Inquirer::start()
66 {
67 debugs(16, 5, HERE);
68 Ipc::Inquirer::start();
69 Must(Comm::IsConnOpen(conn));
70 Must(aggrAction != NULL);
71
72 #if HAVE_UNIQUE_PTR
73 std::unique_ptr<MemBuf> replyBuf;
74 #else
75 std::auto_ptr<MemBuf> replyBuf;
76 #endif
77 if (strands.empty()) {
78 LOCAL_ARRAY(char, url, MAX_URL);
79 snprintf(url, MAX_URL, "%s", aggrAction->command().params.httpUri.termedBuf());
80 HttpRequest *req = HttpRequest::CreateFromUrl(url);
81 ErrorState err(ERR_INVALID_URL, HTTP_NOT_FOUND, req);
82 #if HAVE_UNIQUE_PTR
83 std::unique_ptr<HttpReply> reply(err.BuildHttpReply());
84 #else
85 std::auto_ptr<HttpReply> reply(err.BuildHttpReply());
86 #endif
87 replyBuf.reset(reply->pack());
88 } else {
89 #if HAVE_UNIQUE_PTR
90 std::unique_ptr<HttpReply> reply(new HttpReply);
91 #else
92 std::auto_ptr<HttpReply> reply(new HttpReply);
93 #endif
94 reply->setHeaders(HTTP_OK, NULL, "text/plain", -1, squid_curtime, squid_curtime);
95 reply->header.putStr(HDR_CONNECTION, "close"); // until we chunk response
96 replyBuf.reset(reply->pack());
97 }
98 writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
99 CommCbMemFunT<Inquirer, CommIoCbParams>(this, &Inquirer::noteWroteHeader));
100 Comm::Write(conn, replyBuf.get(), writer);
101 }
102
103 /// called when we wrote the response header
104 void
105 Mgr::Inquirer::noteWroteHeader(const CommIoCbParams& params)
106 {
107 debugs(16, 5, HERE);
108 writer = NULL;
109 Must(params.flag == COMM_OK);
110 Must(params.conn.getRaw() == conn.getRaw());
111 Must(params.size != 0);
112 // start inquiries at the initial pos
113 inquire();
114 }
115
116 /// called when the HTTP client or some external force closed our socket
117 void
118 Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
119 {
120 debugs(16, 5, HERE);
121 Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw());
122 conn = NULL;
123 mustStop("commClosed");
124 }
125
126 bool
127 Mgr::Inquirer::aggregate(Ipc::Response::Pointer aResponse)
128 {
129 Mgr::Response& response = static_cast<Response&>(*aResponse);
130 if (response.hasAction())
131 aggrAction->add(response.getAction());
132 return true;
133 }
134
135 void
136 Mgr::Inquirer::sendResponse()
137 {
138 if (!strands.empty() && aggrAction->aggregatable()) {
139 removeCloseHandler();
140 AsyncJob::Start(new ActionWriter(aggrAction, conn));
141 conn = NULL; // should not close because we passed it to ActionWriter
142 }
143 }
144
145 bool
146 Mgr::Inquirer::doneAll() const
147 {
148 return !writer && Ipc::Inquirer::doneAll();
149 }
150
151 Ipc::StrandCoords
152 Mgr::Inquirer::applyQueryParams(const Ipc::StrandCoords& aStrands, const QueryParams& aParams)
153 {
154 Ipc::StrandCoords sc;
155
156 QueryParam::Pointer processesParam = aParams.get("processes");
157 QueryParam::Pointer workersParam = aParams.get("workers");
158
159 if (processesParam == NULL || workersParam == NULL) {
160 if (processesParam != NULL) {
161 IntParam* param = dynamic_cast<IntParam*>(processesParam.getRaw());
162 if (param != NULL && param->type == QueryParam::ptInt) {
163 const std::vector<int>& processes = param->value();
164 for (Ipc::StrandCoords::const_iterator iter = aStrands.begin();
165 iter != aStrands.end(); ++iter) {
166 if (std::find(processes.begin(), processes.end(), iter->kidId) != processes.end())
167 sc.push_back(*iter);
168 }
169 }
170 } else if (workersParam != NULL) {
171 IntParam* param = dynamic_cast<IntParam*>(workersParam.getRaw());
172 if (param != NULL && param->type == QueryParam::ptInt) {
173 const std::vector<int>& workers = param->value();
174 for (int i = 0; i < (int)aStrands.size(); ++i) {
175 if (std::find(workers.begin(), workers.end(), i + 1) != workers.end())
176 sc.push_back(aStrands[i]);
177 }
178 }
179 } else {
180 sc = aStrands;
181 }
182 }
183
184 debugs(16, 4, HERE << "strands kid IDs = ");
185 for (Ipc::StrandCoords::const_iterator iter = sc.begin(); iter != sc.end(); ++iter) {
186 debugs(16, 4, HERE << iter->kidId);
187 }
188
189 return sc;
190 }