/*
- * $Id$
- *
* DEBUG: section 16 Cache Manager API
*
*/
-#include "config.h"
+#include "squid.h"
#include "base/TextException.h"
+#include "comm.h"
+#include "comm/Connection.h"
#include "comm/Write.h"
#include "CommCalls.h"
+#include "errorpage.h"
#include "HttpReply.h"
+#include "HttpRequest.h"
#include "ipc/UdsOp.h"
#include "mgr/ActionWriter.h"
+#include "mgr/Command.h"
#include "mgr/Inquirer.h"
+#include "mgr/IntParam.h"
#include "mgr/Request.h"
#include "mgr/Response.h"
#include "SquidTime.h"
#include <memory>
-
+#include <algorithm>
CBDATA_NAMESPACED_CLASS_INIT(Mgr, Inquirer);
-
Mgr::Inquirer::Inquirer(Action::Pointer anAction,
const Request &aCause, const Ipc::StrandCoords &coords):
- Ipc::Inquirer(aCause.clone(), coords, anAction->atomic() ? 10 : 100),
- aggrAction(anAction),
- fd(Ipc::ImportFdIntoComm(aCause.fd, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket))
+ Ipc::Inquirer(aCause.clone(), applyQueryParams(coords, aCause.params.queryParams), anAction->atomic() ? 10 : 100),
+ aggrAction(anAction)
{
- debugs(16, 5, HERE << "FD " << fd << " action: " << aggrAction);
+ conn = aCause.conn;
+ Ipc::ImportFdIntoComm(conn, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket);
+
+ debugs(16, 5, HERE << conn << " action: " << aggrAction);
closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed",
CommCbMemFunT<Inquirer, CommCloseCbParams>(this, &Inquirer::noteCommClosed));
- comm_add_close_handler(fd, closer);
+ comm_add_close_handler(conn->fd, closer);
}
/// closes our copy of the client HTTP connection socket
void
Mgr::Inquirer::cleanup()
{
- if (fd >= 0) {
+ if (Comm::IsConnOpen(conn)) {
removeCloseHandler();
- comm_close(fd);
- fd = -1;
+ conn->close();
}
}
Mgr::Inquirer::removeCloseHandler()
{
if (closer != NULL) {
- comm_remove_close_handler(fd, closer);
+ comm_remove_close_handler(conn->fd, closer);
closer = NULL;
}
}
{
debugs(16, 5, HERE);
Ipc::Inquirer::start();
- Must(fd >= 0);
+ Must(Comm::IsConnOpen(conn));
Must(aggrAction != NULL);
- std::auto_ptr<HttpReply> reply(new HttpReply);
- reply->setHeaders(HTTP_OK, NULL, "text/plain", -1, squid_curtime, squid_curtime);
- reply->header.putStr(HDR_CONNECTION, "close"); // until we chunk response
- std::auto_ptr<MemBuf> replyBuf(reply->pack());
+#if HAVE_UNIQUE_PTR
+ std::unique_ptr<MemBuf> replyBuf;
+#else
+ std::auto_ptr<MemBuf> replyBuf;
+#endif
+ if (strands.empty()) {
+ LOCAL_ARRAY(char, url, MAX_URL);
+ snprintf(url, MAX_URL, "%s", aggrAction->command().params.httpUri.termedBuf());
+ HttpRequest *req = HttpRequest::CreateFromUrl(url);
+ ErrorState err(ERR_INVALID_URL, Http::scNotFound, req);
+#if HAVE_UNIQUE_PTR
+ std::unique_ptr<HttpReply> reply(err.BuildHttpReply());
+#else
+ std::auto_ptr<HttpReply> reply(err.BuildHttpReply());
+#endif
+ replyBuf.reset(reply->pack());
+ } else {
+#if HAVE_UNIQUE_PTR
+ std::unique_ptr<HttpReply> reply(new HttpReply);
+#else
+ std::auto_ptr<HttpReply> reply(new HttpReply);
+#endif
+ reply->setHeaders(Http::scOkay, NULL, "text/plain", -1, squid_curtime, squid_curtime);
+ reply->header.putStr(HDR_CONNECTION, "close"); // until we chunk response
+ replyBuf.reset(reply->pack());
+ }
writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
CommCbMemFunT<Inquirer, CommIoCbParams>(this, &Inquirer::noteWroteHeader));
- Comm::Write(fd, replyBuf.get(), writer);
+ Comm::Write(conn, replyBuf.get(), writer);
}
/// called when we wrote the response header
debugs(16, 5, HERE);
writer = NULL;
Must(params.flag == COMM_OK);
- Must(params.fd == fd);
+ Must(params.conn.getRaw() == conn.getRaw());
Must(params.size != 0);
// start inquiries at the initial pos
inquire();
Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
{
debugs(16, 5, HERE);
- Must(fd < 0 || fd == params.fd);
- fd = -1;
+ Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw());
+ conn = NULL;
mustStop("commClosed");
}
void
Mgr::Inquirer::sendResponse()
{
- if (aggrAction->aggregatable()) {
+ if (!strands.empty() && aggrAction->aggregatable()) {
removeCloseHandler();
- AsyncJob::Start(new ActionWriter(aggrAction, fd));
- fd = -1; // should not close fd because we passed it to ActionWriter
+ AsyncJob::Start(new ActionWriter(aggrAction, conn));
+ conn = NULL; // should not close because we passed it to ActionWriter
}
}
{
return !writer && Ipc::Inquirer::doneAll();
}
+
+Ipc::StrandCoords
+Mgr::Inquirer::applyQueryParams(const Ipc::StrandCoords& aStrands, const QueryParams& aParams)
+{
+ Ipc::StrandCoords sc;
+
+ QueryParam::Pointer processesParam = aParams.get("processes");
+ QueryParam::Pointer workersParam = aParams.get("workers");
+
+ if (processesParam == NULL || workersParam == NULL) {
+ if (processesParam != NULL) {
+ IntParam* param = dynamic_cast<IntParam*>(processesParam.getRaw());
+ if (param != NULL && param->type == QueryParam::ptInt) {
+ const std::vector<int>& processes = param->value();
+ for (Ipc::StrandCoords::const_iterator iter = aStrands.begin();
+ iter != aStrands.end(); ++iter) {
+ if (std::find(processes.begin(), processes.end(), iter->kidId) != processes.end())
+ sc.push_back(*iter);
+ }
+ }
+ } else if (workersParam != NULL) {
+ IntParam* param = dynamic_cast<IntParam*>(workersParam.getRaw());
+ if (param != NULL && param->type == QueryParam::ptInt) {
+ const std::vector<int>& workers = param->value();
+ for (int i = 0; i < (int)aStrands.size(); ++i) {
+ if (std::find(workers.begin(), workers.end(), i + 1) != workers.end())
+ sc.push_back(aStrands[i]);
+ }
+ }
+ } else {
+ sc = aStrands;
+ }
+ }
+
+ debugs(16, 4, HERE << "strands kid IDs = ");
+ for (Ipc::StrandCoords::const_iterator iter = sc.begin(); iter != sc.end(); ++iter) {
+ debugs(16, 4, HERE << iter->kidId);
+ }
+
+ return sc;
+}