*
*/
-
-#include "config.h"
+#include "squid.h"
#include "base/Subscription.h"
#include "base/TextException.h"
#include "CacheManager.h"
#include "mgr/Inquirer.h"
#include "mgr/Request.h"
#include "mgr/Response.h"
+#include "protos.h"
#if SQUID_SNMP
#include "snmp/Inquirer.h"
#include "snmp/Request.h"
#include "snmp/Response.h"
#endif
+#if HAVE_ERRNO_H
+#include <errno.h>
+#endif
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
-
Ipc::Coordinator::Coordinator():
Port(coordinatorAddr)
{
void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
{
- if (StrandCoord* found = findStrand(strand.kidId))
+ debugs(54, 3, HERE << "registering kid" << strand.kidId <<
+ ' ' << strand.tag);
+ if (StrandCoord* found = findStrand(strand.kidId)) {
+ const String oldTag = found->tag;
*found = strand;
- else
+ if (oldTag.size() && !strand.tag.size())
+ found->tag = oldTag; // keep more detailed info (XXX?)
+ } else {
strands_.push_back(strand);
+ }
+
+ // notify searchers waiting for this new strand, if any
+ typedef Searchers::iterator SRI;
+ for (SRI i = searchers.begin(); i != searchers.end();) {
+ if (i->tag == strand.tag) {
+ notifySearcher(*i, strand);
+ i = searchers.erase(i);
+ } else {
+ ++i;
+ }
+ }
}
void Ipc::Coordinator::receive(const TypedMsgHdr& message)
switch (message.type()) {
case mtRegistration:
debugs(54, 6, HERE << "Registration request");
- handleRegistrationRequest(StrandCoord(message));
+ handleRegistrationRequest(HereIamMessage(message));
break;
+ case mtStrandSearchRequest: {
+ const StrandSearchRequest sr(message);
+ debugs(54, 6, HERE << "Strand search request: " << sr.requestorId <<
+ " tag: " << sr.tag);
+ handleSearchRequest(sr);
+ break;
+ }
+
case mtSharedListenRequest:
debugs(54, 6, HERE << "Shared listen request");
handleSharedListenRequest(SharedListenRequest(message));
#endif
default:
- debugs(54, 1, HERE << "Unhandled message type: " << message.type());
+ debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type());
break;
}
}
-void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand)
+void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg)
{
- registerStrand(strand);
+ registerStrand(msg.strand);
// send back an acknowledgement; TODO: remove as not needed?
TypedMsgHdr message;
- strand.pack(message);
- SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
+ msg.pack(message);
+ SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message);
}
void
Listeners::const_iterator i = listeners.find(request.params);
int errNo = 0;
const Comm::ConnectionPointer c = (i != listeners.end()) ?
- i->second : openListenSocket(request, errNo);
+ i->second : openListenSocket(request, errNo);
debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
request.params.addr << " to kid" << request.requestorId <<
" mapId=" << request.mapId);
- SharedListenResponse response(c, errNo, request.mapId);
+ SharedListenResponse response(c->fd, errNo, request.mapId);
TypedMsgHdr message;
response.pack(message);
SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
{
debugs(54, 4, HERE);
+ try {
+ Mgr::Action::Pointer action =
+ CacheManager::GetInstance()->createRequestedAction(request.params);
+ AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
+ } catch (const std::exception &ex) {
+ debugs(54, DBG_IMPORTANT, "BUG: cannot aggregate mgr:" <<
+ request.params.actionName << ": " << ex.what());
+ // TODO: Avoid half-baked Connections or teach them how to close.
+ ::close(request.conn->fd);
+ request.conn->fd = -1;
+ return; // the worker will timeout and close
+ }
+
// Let the strand know that we are now responsible for handling the request
Mgr::Response response(request.requestId);
TypedMsgHdr message;
response.pack(message);
SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
- Mgr::Action::Pointer action =
- CacheManager::GetInstance()->createRequestedAction(request.params);
- AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
}
void
Mgr::Inquirer::HandleRemoteAck(response);
}
+void
+Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request)
+{
+ // do we know of a strand with the given search tag?
+ const StrandCoord *strand = NULL;
+ typedef StrandCoords::const_iterator SCCI;
+ for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
+ if (i->tag == request.tag)
+ strand = &(*i);
+ }
+
+ if (strand) {
+ notifySearcher(request, *strand);
+ return;
+ }
+
+ searchers.push_back(request);
+ debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
+ " who " << request.tag << " is");
+}
+
+void
+Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
+ const StrandCoord& strand)
+{
+ debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
+ request.tag << " is kid" << strand.kidId);
+ const StrandSearchResponse response(strand);
+ TypedMsgHdr message;
+ response.pack(message);
+ SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+}
+
#if SQUID_SNMP
void
Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)