*/
-#include "config.h"
+#include "squid.h"
+#include "base/Subscription.h"
#include "base/TextException.h"
#include "CacheManager.h"
#include "comm.h"
+#include "comm/Connection.h"
#include "ipc/Coordinator.h"
-#include "ipc/FdNotes.h"
#include "ipc/SharedListen.h"
#include "mgr/Inquirer.h"
#include "mgr/Request.h"
#include "mgr/Response.h"
-#include "mgr/StoreToCommWriter.h"
-#include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: layering violation */
-
+#if SQUID_SNMP
+#include "snmp/Inquirer.h"
+#include "snmp/Request.h"
+#include "snmp/Response.h"
+#endif
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
if (i->tag == strand.tag) {
notifySearcher(*i, strand);
i = searchers.erase(i);
- } else {
+ } else {
++i;
- }
+ }
}
}
handleRegistrationRequest(HereIamMessage(message));
break;
- case mtIpcIoRequest: { // XXX: this should have been mtStrandSearchRequest
- IpcIoRequest io(message);
- StrandSearchRequest sr;
- sr.requestorId = io.requestorId;
- sr.requestId = io.requestId;
- sr.tag.limitInit(io.buf, io.len);
- debugs(54, 6, HERE << "Strand search request: " << io.requestorId << ' ' << io.requestId << ' ' << io.len << " cmd=" << io.command << " tag: " << sr.tag);
+ case mtStrandSearchRequest: {
+ const StrandSearchRequest sr(message);
+ debugs(54, 6, HERE << "Strand search request: " << sr.requestorId <<
+ " tag: " << sr.tag);
handleSearchRequest(sr);
break;
- }
+ }
case mtSharedListenRequest:
debugs(54, 6, HERE << "Shared listen request");
handleSharedListenRequest(SharedListenRequest(message));
break;
- case mtCacheMgrRequest:
+ case mtCacheMgrRequest: {
debugs(54, 6, HERE << "Cache manager request");
- handleCacheMgrRequest(Mgr::Request(message));
- break;
+ const Mgr::Request req(message);
+ handleCacheMgrRequest(req);
+ }
+ break;
- case mtCacheMgrResponse:
+ case mtCacheMgrResponse: {
debugs(54, 6, HERE << "Cache manager response");
- handleCacheMgrResponse(Mgr::Response(message));
- break;
+ const Mgr::Response resp(message);
+ handleCacheMgrResponse(resp);
+ }
+ break;
+
+#if SQUID_SNMP
+ case mtSnmpRequest: {
+ debugs(54, 6, HERE << "SNMP request");
+ const Snmp::Request req(message);
+ handleSnmpRequest(req);
+ }
+ break;
+
+ case mtSnmpResponse: {
+ debugs(54, 6, HERE << "SNMP response");
+ const Snmp::Response resp(message);
+ handleSnmpResponse(resp);
+ }
+ break;
+#endif
default:
debugs(54, 1, HERE << "Unhandled message type: " << message.type());
" needs shared listen FD for " << request.params.addr);
Listeners::const_iterator i = listeners.find(request.params);
int errNo = 0;
- const int sock = (i != listeners.end()) ?
- i->second : openListenSocket(request, errNo);
+ const Comm::ConnectionPointer c = (i != listeners.end()) ?
+ i->second : openListenSocket(request, errNo);
- debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " <<
+ debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
request.params.addr << " to kid" << request.requestorId <<
" mapId=" << request.mapId);
- SharedListenResponse response(sock, errNo, request.mapId);
+ SharedListenResponse response(c->fd, errNo, request.mapId);
TypedMsgHdr message;
response.pack(message);
SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
{
debugs(54, 4, HERE);
+ try {
+ Mgr::Action::Pointer action =
+ CacheManager::GetInstance()->createRequestedAction(request.params);
+ AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
+ } catch (const std::exception &ex) {
+ debugs(54, DBG_IMPORTANT, "BUG: cannot aggregate mgr:" <<
+ request.params.actionName << ": " << ex.what());
+ // TODO: Avoid half-baked Connections or teach them how to close.
+ ::close(request.conn->fd);
+ request.conn->fd = -1;
+ return; // the worker will timeout and close
+ }
+
// Let the strand know that we are now responsible for handling the request
Mgr::Response response(request.requestId);
TypedMsgHdr message;
response.pack(message);
SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
- Mgr::Action::Pointer action =
- CacheManager::GetInstance()->createRequestedAction(request.params);
- AsyncJob::Start(new Mgr::Inquirer(action,
- Mgr::ImportHttpFdIntoComm(request.fd), request, strands_));
}
void
if (strand) {
notifySearcher(request, *strand);
return;
- }
+ }
searchers.push_back(request);
debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
- " who " << request.tag << " is");
+ " who " << request.tag << " is");
}
void
const StrandCoord& strand)
{
debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
- request.tag << " is kid" << strand.kidId);
- const StrandSearchResponse response0(request.requestId, strand);
- // XXX: we should use StrandSearchResponse instead of converting it
- IpcIoResponse io;
- io.diskId = strand.kidId;
- io.requestId = request.requestId;
- io.command = IpcIo::cmdOpen;
+ request.tag << " is kid" << strand.kidId);
+ const StrandSearchResponse response(strand);
+ TypedMsgHdr message;
+ response.pack(message);
+ SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+}
+
+#if SQUID_SNMP
+void
+Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)
+{
+ debugs(54, 4, HERE);
+
+ Snmp::Response response(request.requestId);
TypedMsgHdr message;
- io.pack(message);
+ response.pack(message);
SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+
+ AsyncJob::Start(new Snmp::Inquirer(request, strands_));
}
+void
+Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response)
+{
+ debugs(54, 4, HERE);
+ Snmp::Inquirer::HandleRemoteAck(response);
+}
+#endif
-int
+Comm::ConnectionPointer
Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
int &errNo)
{
debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
request.requestorId);
- Ip::Address addr = p.addr; // comm_open_listener may modify it
+ Comm::ConnectionPointer conn = new Comm::Connection;
+ conn->local = p.addr; // comm_open_listener may modify it
+ conn->flags = p.flags;
enter_suid();
- const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags,
- FdNote(p.fdNote));
- errNo = (sock >= 0) ? 0 : errno;
+ comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote));
+ errNo = Comm::IsConnOpen(conn) ? 0 : errno;
leave_suid();
+ debugs(54, 6, HERE << "tried listening on " << conn << " for kid" <<
+ request.requestorId);
+
// cache positive results
- if (sock >= 0)
- listeners[request.params] = sock;
+ if (Comm::IsConnOpen(conn))
+ listeners[request.params] = conn;
- return sock;
+ return conn;
}
void Ipc::Coordinator::broadcastSignal(int sig) const