/*
- * $Id$
- *
- * DEBUG: section 54 Interprocess Communication
+ * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
*
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
*/
+/* DEBUG: section 54 Interprocess Communication */
-#include "config.h"
+#include "squid.h"
#include "base/Subscription.h"
#include "base/TextException.h"
#include "CacheManager.h"
#include "mgr/Inquirer.h"
#include "mgr/Request.h"
#include "mgr/Response.h"
+#include "tools.h"
#if SQUID_SNMP
#include "snmp/Inquirer.h"
#include "snmp/Request.h"
#include "snmp/Response.h"
#endif
+#include <cerrno>
+
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
-
Ipc::Coordinator::Coordinator():
- Port(coordinatorAddr)
+ Port(Ipc::Port::CoordinatorAddr())
{
}
#endif
default:
- debugs(54, 1, HERE << "Unhandled message type: " << message.type());
+ debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type());
break;
}
}
// send back an acknowledgement; TODO: remove as not needed?
TypedMsgHdr message;
msg.pack(message);
- SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message);
+ SendMessage(MakeAddr(strandAddrLabel, msg.strand.kidId), message);
}
void
SharedListenResponse response(c->fd, errNo, request.mapId);
TypedMsgHdr message;
response.pack(message);
- SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+ SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
}
void
{
debugs(54, 4, HERE);
+ try {
+ Mgr::Action::Pointer action =
+ CacheManager::GetInstance()->createRequestedAction(request.params);
+ AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
+ } catch (const std::exception &ex) {
+ debugs(54, DBG_IMPORTANT, "BUG: cannot aggregate mgr:" <<
+ request.params.actionName << ": " << ex.what());
+ // TODO: Avoid half-baked Connections or teach them how to close.
+ ::close(request.conn->fd);
+ request.conn->fd = -1;
+ return; // the worker will timeout and close
+ }
+
// Let the strand know that we are now responsible for handling the request
Mgr::Response response(request.requestId);
TypedMsgHdr message;
response.pack(message);
- SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+ SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
- Mgr::Action::Pointer action =
- CacheManager::GetInstance()->createRequestedAction(request.params);
- AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
}
void
const StrandSearchResponse response(strand);
TypedMsgHdr message;
response.pack(message);
- SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+ SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
}
#if SQUID_SNMP
Snmp::Response response(request.requestId);
TypedMsgHdr message;
response.pack(message);
- SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+ SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
AsyncJob::Start(new Snmp::Inquirer(request, strands_));
}
debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
request.requestorId);
- Comm::ConnectionPointer conn = new Comm::Connection;
- conn->local = p.addr; // comm_open_listener may modify it
- conn->flags = p.flags;
+ Comm::ConnectionPointer newConn = new Comm::Connection;
+ newConn->local = p.addr; // comm_open_listener may modify it
+ newConn->flags = p.flags;
enter_suid();
- comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote));
- errNo = Comm::IsConnOpen(conn) ? 0 : errno;
+ comm_open_listener(p.sock_type, p.proto, newConn, FdNote(p.fdNote));
+ errNo = Comm::IsConnOpen(newConn) ? 0 : errno;
leave_suid();
- debugs(54, 6, HERE << "tried listening on " << conn << " for kid" <<
+ debugs(54, 6, HERE << "tried listening on " << newConn << " for kid" <<
request.requestorId);
// cache positive results
- if (Comm::IsConnOpen(conn))
- listeners[request.params] = conn;
+ if (Comm::IsConnOpen(newConn))
+ listeners[request.params] = newConn;
- return conn;
+ return newConn;
}
void Ipc::Coordinator::broadcastSignal(int sig) const
{
return strands_;
}
+