]> git.ipfire.org Git - thirdparty/squid.git/blob - src/mgr/Inquirer.cc
transaction_initiator ACL for detecting various unusual transactions
[thirdparty/squid.git] / src / mgr / Inquirer.cc
1 /*
2 * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 16 Cache Manager API */
10
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "comm.h"
14 #include "comm/Connection.h"
15 #include "comm/Write.h"
16 #include "CommCalls.h"
17 #include "errorpage.h"
18 #include "HttpReply.h"
19 #include "HttpRequest.h"
20 #include "ipc/UdsOp.h"
21 #include "mgr/ActionWriter.h"
22 #include "mgr/Command.h"
23 #include "mgr/Inquirer.h"
24 #include "mgr/IntParam.h"
25 #include "mgr/Request.h"
26 #include "mgr/Response.h"
27 #include "SquidTime.h"
28 #include <memory>
29 #include <algorithm>
30
31 CBDATA_NAMESPACED_CLASS_INIT(Mgr, Inquirer);
32
33 Mgr::Inquirer::Inquirer(Action::Pointer anAction,
34 const Request &aCause, const Ipc::StrandCoords &coords):
35 Ipc::Inquirer(aCause.clone(), applyQueryParams(coords, aCause.params.queryParams), anAction->atomic() ? 10 : 100),
36 aggrAction(anAction)
37 {
38 conn = aCause.conn;
39 Ipc::ImportFdIntoComm(conn, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket);
40
41 debugs(16, 5, HERE << conn << " action: " << aggrAction);
42
43 closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed",
44 CommCbMemFunT<Inquirer, CommCloseCbParams>(this, &Inquirer::noteCommClosed));
45 comm_add_close_handler(conn->fd, closer);
46 }
47
48 /// closes our copy of the client HTTP connection socket
49 void
50 Mgr::Inquirer::cleanup()
51 {
52 if (Comm::IsConnOpen(conn)) {
53 removeCloseHandler();
54 conn->close();
55 }
56 }
57
58 void
59 Mgr::Inquirer::removeCloseHandler()
60 {
61 if (closer != NULL) {
62 comm_remove_close_handler(conn->fd, closer);
63 closer = NULL;
64 }
65 }
66
67 void
68 Mgr::Inquirer::start()
69 {
70 debugs(16, 5, HERE);
71 Ipc::Inquirer::start();
72 Must(Comm::IsConnOpen(conn));
73 Must(aggrAction != NULL);
74
75 std::unique_ptr<MemBuf> replyBuf;
76 if (strands.empty()) {
77 LOCAL_ARRAY(char, url, MAX_URL);
78 snprintf(url, MAX_URL, "%s", aggrAction->command().params.httpUri.termedBuf());
79 const MasterXaction::Pointer mx = new MasterXaction(XactionInitiator::initIpc);
80 HttpRequest *req = HttpRequest::FromUrl(url, mx);
81 ErrorState err(ERR_INVALID_URL, Http::scNotFound, req);
82 std::unique_ptr<HttpReply> reply(err.BuildHttpReply());
83 replyBuf.reset(reply->pack());
84 } else {
85 std::unique_ptr<HttpReply> reply(new HttpReply);
86 reply->setHeaders(Http::scOkay, NULL, "text/plain", -1, squid_curtime, squid_curtime);
87 reply->header.putStr(Http::HdrType::CONNECTION, "close"); // until we chunk response
88 replyBuf.reset(reply->pack());
89 }
90 writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
91 CommCbMemFunT<Inquirer, CommIoCbParams>(this, &Inquirer::noteWroteHeader));
92 Comm::Write(conn, replyBuf.get(), writer);
93 }
94
95 /// called when we wrote the response header
96 void
97 Mgr::Inquirer::noteWroteHeader(const CommIoCbParams& params)
98 {
99 debugs(16, 5, HERE);
100 writer = NULL;
101 Must(params.flag == Comm::OK);
102 Must(params.conn.getRaw() == conn.getRaw());
103 Must(params.size != 0);
104 // start inquiries at the initial pos
105 inquire();
106 }
107
108 /// called when the HTTP client or some external force closed our socket
109 void
110 Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
111 {
112 debugs(16, 5, HERE);
113 Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw());
114 conn = NULL;
115 mustStop("commClosed");
116 }
117
118 bool
119 Mgr::Inquirer::aggregate(Ipc::Response::Pointer aResponse)
120 {
121 Mgr::Response& response = static_cast<Response&>(*aResponse);
122 if (response.hasAction())
123 aggrAction->add(response.getAction());
124 return true;
125 }
126
127 void
128 Mgr::Inquirer::sendResponse()
129 {
130 if (!strands.empty() && aggrAction->aggregatable()) {
131 removeCloseHandler();
132 AsyncJob::Start(new ActionWriter(aggrAction, conn));
133 conn = NULL; // should not close because we passed it to ActionWriter
134 }
135 }
136
137 bool
138 Mgr::Inquirer::doneAll() const
139 {
140 return !writer && Ipc::Inquirer::doneAll();
141 }
142
143 Ipc::StrandCoords
144 Mgr::Inquirer::applyQueryParams(const Ipc::StrandCoords& aStrands, const QueryParams& aParams)
145 {
146 Ipc::StrandCoords sc;
147
148 QueryParam::Pointer processesParam = aParams.get("processes");
149 QueryParam::Pointer workersParam = aParams.get("workers");
150
151 if (processesParam == NULL || workersParam == NULL) {
152 if (processesParam != NULL) {
153 IntParam* param = dynamic_cast<IntParam*>(processesParam.getRaw());
154 if (param != NULL && param->type == QueryParam::ptInt) {
155 const std::vector<int>& processes = param->value();
156 for (Ipc::StrandCoords::const_iterator iter = aStrands.begin();
157 iter != aStrands.end(); ++iter) {
158 if (std::find(processes.begin(), processes.end(), iter->kidId) != processes.end())
159 sc.push_back(*iter);
160 }
161 }
162 } else if (workersParam != NULL) {
163 IntParam* param = dynamic_cast<IntParam*>(workersParam.getRaw());
164 if (param != NULL && param->type == QueryParam::ptInt) {
165 const std::vector<int>& workers = param->value();
166 for (int i = 0; i < (int)aStrands.size(); ++i) {
167 if (std::find(workers.begin(), workers.end(), i + 1) != workers.end())
168 sc.push_back(aStrands[i]);
169 }
170 }
171 } else {
172 sc = aStrands;
173 }
174 }
175
176 debugs(16, 4, HERE << "strands kid IDs = ");
177 for (Ipc::StrandCoords::const_iterator iter = sc.begin(); iter != sc.end(); ++iter) {
178 debugs(16, 4, HERE << iter->kidId);
179 }
180
181 return sc;
182 }
183