*
*/
-#include "config.h"
+#include "squid.h"
+#include "base/Subscription.h"
+#include "base/TextException.h"
+#include "comm/Connection.h"
+#include "globals.h"
#include "ipc/Strand.h"
+#include "ipc/StrandCoord.h"
#include "ipc/Messages.h"
#include "ipc/SharedListen.h"
+#include "ipc/StrandSearch.h"
#include "ipc/Kids.h"
-
+#include "mgr/Request.h"
+#include "mgr/Response.h"
+#include "mgr/Forwarder.h"
+#include "SwapDir.h" /* XXX: scope boundary violation */
+#include "CacheManager.h"
+#if USE_DISKIO_IPCIO
+#include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: scope boundary violation */
+#endif
+#if SQUID_SNMP
+#include "snmp/Forwarder.h"
+#include "snmp/Request.h"
+#include "snmp/Response.h"
+#endif
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
-
Ipc::Strand::Strand():
- Port(MakeAddr(strandAddrPfx, KidIdentifier)),
- isRegistered(false)
+ Port(MakeAddr(strandAddrPfx, KidIdentifier)),
+ isRegistered(false)
{
}
{
debugs(54, 6, HERE);
Must(!isRegistered);
+
+ HereIamMessage ann(StrandCoord(KidIdentifier, getpid()));
TypedMsgHdr message;
- StrandCoord(KidIdentifier, getpid()).pack(message);
+ ann.pack(message);
SendMessage(coordinatorAddr, message);
setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
}
switch (message.type()) {
case mtRegistration:
- handleRegistrationResponse(StrandCoord(message));
+ handleRegistrationResponse(HereIamMessage(message));
break;
case mtSharedListenResponse:
SharedListenJoined(SharedListenResponse(message));
break;
- case mtDescriptorPut:
- putDescriptor(Descriptor(message));
+#if USE_DISKIO_IPCIO
+ case mtStrandSearchResponse:
+ IpcIoFile::HandleOpenResponse(StrandSearchResponse(message));
break;
+ case mtIpcIoNotification:
+ IpcIoFile::HandleNotification(message);
+ break;
+#endif /* USE_DISKIO_IPCIO */
+
+ case mtCacheMgrRequest: {
+ const Mgr::Request req(message);
+ handleCacheMgrRequest(req);
+ }
+ break;
+
+ case mtCacheMgrResponse: {
+ const Mgr::Response resp(message);
+ handleCacheMgrResponse(resp);
+ }
+ break;
+
+#if SQUID_SNMP
+ case mtSnmpRequest: {
+ const Snmp::Request req(message);
+ handleSnmpRequest(req);
+ }
+ break;
+
+ case mtSnmpResponse: {
+ const Snmp::Response resp(message);
+ handleSnmpResponse(resp);
+ }
+ break;
+#endif
+
default:
- debugs(54, 1, HERE << "Unhandled message type: " << message.type());
+ debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type());
break;
}
}
-void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand)
+void Ipc::Strand::handleRegistrationResponse(const HereIamMessage &msg)
{
// handle registration response from the coordinator; it could be stale
- if (strand.kidId == KidIdentifier && strand.pid == getpid()) {
+ if (msg.strand.kidId == KidIdentifier && msg.strand.pid == getpid()) {
debugs(54, 6, "kid" << KidIdentifier << " registered");
clearTimeout(); // we are done
-
- debugs(54, 6, HERE << "requesting FD");
- Descriptor request(KidIdentifier, -1);
- TypedMsgHdr message;
- request.pack(message);
- SendMessage(coordinatorAddr, message);
} else {
// could be an ACK to the registration message of our dead predecessor
debugs(54, 6, "kid" << KidIdentifier << " is not yet registered");
}
}
-/// receive descriptor we asked for
-void Ipc::Strand::putDescriptor(const Descriptor &message)
+void Ipc::Strand::handleCacheMgrRequest(const Mgr::Request& request)
+{
+ Mgr::Action::Pointer action =
+ CacheManager::GetInstance()->createRequestedAction(request.params);
+ action->respond(request);
+}
+
+void Ipc::Strand::handleCacheMgrResponse(const Mgr::Response& response)
+{
+ Mgr::Forwarder::HandleRemoteAck(response.requestId);
+}
+
+#if SQUID_SNMP
+void Ipc::Strand::handleSnmpRequest(const Snmp::Request& request)
+{
+ debugs(54, 6, HERE);
+ Snmp::SendResponse(request.requestId, request.pdu);
+}
+
+void Ipc::Strand::handleSnmpResponse(const Snmp::Response& response)
{
- debugs(54, 6, HERE << "got FD " << message.fd);
- char buffer[64];
- const int n = snprintf(buffer, sizeof(buffer), "strand: kid%d wrote using FD %d\n", KidIdentifier, message.fd);
- ssize_t bytes = write(message.fd, buffer, n);
- Must(bytes == n);
+ debugs(54, 6, HERE);
+ Snmp::Forwarder::HandleRemoteAck(response.requestId);
}
+#endif
void Ipc::Strand::timedout()
{