Ipc::TypedMsgHdr msg;
msg.setType(Ipc::mtCollapsedForwardingNotification);
msg.putInt(KidIdentifier);
- const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, workerId);
+ const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, workerId);
Ipc::SendMessage(addr, msg);
}
ann.strand.tag = dbName;
Ipc::TypedMsgHdr message;
ann.pack(message);
- SendMessage(Ipc::coordinatorAddr, message);
+ SendMessage(Ipc::Port::CoordinatorAddr(), message);
ioRequestor->ioCompletedNotification();
return;
Ipc::TypedMsgHdr msg;
request.pack(msg);
- Ipc::SendMessage(Ipc::coordinatorAddr, msg);
+ Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), msg);
WaitingForOpen.push_back(this);
Ipc::TypedMsgHdr msg;
msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
msg.putInt(KidIdentifier);
- const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, peerId);
+ const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, peerId);
Ipc::SendMessage(addr, msg);
}
Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
Ipc::Coordinator::Coordinator():
- Port(coordinatorAddr)
+ Port(Ipc::Port::CoordinatorAddr())
{
}
// send back an acknowledgement; TODO: remove as not needed?
TypedMsgHdr message;
msg.pack(message);
- SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message);
+ SendMessage(MakeAddr(strandAddrLabel, msg.strand.kidId), message);
}
void
SharedListenResponse response(c->fd, errNo, request.mapId);
TypedMsgHdr message;
response.pack(message);
- SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+ SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
}
void
Mgr::Response response(request.requestId);
TypedMsgHdr message;
response.pack(message);
- SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+ SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
}
const StrandSearchResponse response(strand);
TypedMsgHdr message;
response.pack(message);
- SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+ SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
}
#if SQUID_SNMP
Snmp::Response response(request.requestId);
TypedMsgHdr message;
response.pack(message);
- SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+ SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
AsyncJob::Start(new Snmp::Inquirer(request, strands_));
}
handleError();
}
- SendMessage(coordinatorAddr, message);
+ SendMessage(Ipc::Port::CoordinatorAddr(), message);
eventAdd("Ipc::Forwarder::requestTimedOut", &Forwarder::RequestTimedOut,
this, timeout, 0, false);
}
TheRequestsMap[request->requestId] = callback;
TypedMsgHdr message;
request->pack(message);
- SendMessage(Port::MakeAddr(strandAddrPfx, kidId), message);
+ SendMessage(Port::MakeAddr(strandAddrLabel, kidId), message);
eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut,
this, timeout, 0, false);
}
#include "comm.h"
#include "comm/Connection.h"
#include "CommCalls.h"
+#include "globals.h"
#include "ipc/Port.h"
-const char Ipc::coordinatorAddr[] = DEFAULT_STATEDIR "/coordinator.ipc";
-const char Ipc::strandAddrPfx[] = DEFAULT_STATEDIR "/kid";
+static const char channelPathPfx[] = DEFAULT_STATEDIR "/";
+static const char coordinatorAddrLabel[] = "-coordinator";
+const char Ipc::strandAddrLabel[] = "-kid";
Ipc::Port::Port(const String& aListenAddr):
UdsOp(aListenAddr)
return false; // listen forever
}
-String Ipc::Port::MakeAddr(const char* pathAddr, int id)
+String Ipc::Port::MakeAddr(const char* processLabel, int id)
{
assert(id >= 0);
- String addr = pathAddr;
+ String addr = channelPathPfx;
+ addr.append(service_name);
+ addr.append(processLabel);
addr.append('-');
addr.append(xitoa(id));
addr.append(".ipc");
return addr;
}
+String
+Ipc::Port::CoordinatorAddr()
+{
+ static String coordinatorAddr;
+ if (!coordinatorAddr.size()) {
+ coordinatorAddr= channelPathPfx;
+ coordinatorAddr.append(service_name);
+ coordinatorAddr.append(coordinatorAddrLabel);
+ coordinatorAddr.append(".ipc");
+ }
+ return coordinatorAddr;
+}
+
void Ipc::Port::noteRead(const CommIoCbParams& params)
{
debugs(54, 6, HERE << params.conn << " flag " << params.flag <<
{
public:
Port(const String &aListenAddr);
- /// calculates IPC message address for strand #id at path
- static String MakeAddr(const char *path, int id);
+ /// calculates IPC message address for strand #id of processLabel type
+ static String MakeAddr(const char *proccessLabel, int id);
+
+ /// get the IPC message address for coordinator process
+ static String CoordinatorAddr();
protected:
virtual void start() = 0; // UdsOp (AsyncJob) API; has body
TypedMsgHdr buf; ///< msghdr struct filled by Comm
};
-extern const char coordinatorAddr[]; ///< where coordinator listens
-extern const char strandAddrPfx[]; ///< strand's listening address prefix
+extern const char strandAddrLabel[]; ///< strand's listening address unique label
} // namespace Ipc
TypedMsgHdr message;
request.pack(message);
- SendMessage(coordinatorAddr, message);
+ SendMessage(Ipc::Port::CoordinatorAddr(), message);
}
void Ipc::SharedListenJoined(const SharedListenResponse &response)
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
Ipc::Strand::Strand():
- Port(MakeAddr(strandAddrPfx, KidIdentifier)),
+ Port(MakeAddr(strandAddrLabel, KidIdentifier)),
isRegistered(false)
{
}
HereIamMessage ann(StrandCoord(KidIdentifier, getpid()));
TypedMsgHdr message;
ann.pack(message);
- SendMessage(coordinatorAddr, message);
+ SendMessage(Port::CoordinatorAddr(), message);
setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
}
Response response(requestId, this);
Ipc::TypedMsgHdr message;
response.pack(message);
- Ipc::SendMessage(Ipc::coordinatorAddr, message);
+ Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), message);
}
void
}
Ipc::TypedMsgHdr message;
response.pack(message);
- Ipc::SendMessage(Ipc::coordinatorAddr, message);
+ Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), message);
}
#define STUB_API "ipc/Port.cc"
#include "tests/STUB.h"
-const char Ipc::coordinatorAddr[] = "";
-const char Ipc::strandAddrPfx[] = "";
+const char Ipc::strandAddrLabel[] = "-kid";
String Ipc::Port::MakeAddr(char const*, int) STUB_RETVAL("")
+String Ipc::Port::CoordinatorAddr() STUB_RETVAL("")