#include "ipc/TypedMsgHdr.h"
#include "tools.h"
+#include <list>
#include <map>
/// holds information necessary to handle JoinListen response
typedef std::map<int, PendingOpenRequest> SharedListenRequestMap;
static SharedListenRequestMap TheSharedListenRequestMap;
+/// accumulates delayed requests until they are ready to be sent, in FIFO order
+typedef std::list<PendingOpenRequest> DelayedSharedListenRequests;
+static DelayedSharedListenRequests TheDelayedRequests;
+
static int
AddToMap(const PendingOpenRequest &por)
{
hdrMsg.putFd(fd);
}
-void Ipc::JoinSharedListen(const OpenListenerParams ¶ms,
- AsyncCall::Pointer &callback)
+static void
+SendSharedListenRequest(const PendingOpenRequest &por)
{
- PendingOpenRequest por;
- por.params = params;
- por.callback = callback;
-
- SharedListenRequest request;
+ Ipc::SharedListenRequest request;
request.requestorId = KidIdentifier;
request.params = por.params;
request.mapId = AddToMap(por);
- debugs(54, 3, HERE << "getting listening FD for " << request.params.addr <<
+ debugs(54, 3, "getting listening FD for " << request.params.addr <<
" mapId=" << request.mapId);
- TypedMsgHdr message;
+ Ipc::TypedMsgHdr message;
request.pack(message);
SendMessage(Ipc::Port::CoordinatorAddr(), message);
}
+static void
+kickDelayedRequest()
+{
+ if (TheDelayedRequests.empty())
+ return; // no pending requests to resume
+
+ debugs(54, 3, "resuming with " << TheSharedListenRequestMap.size() <<
+ " active + " << TheDelayedRequests.size() << " delayed requests");
+
+ SendSharedListenRequest(*TheDelayedRequests.begin());
+ TheDelayedRequests.pop_front();
+}
+
+void
+Ipc::JoinSharedListen(const OpenListenerParams ¶ms, AsyncCall::Pointer &cb)
+{
+ PendingOpenRequest por;
+ por.params = params;
+ por.callback = cb;
+
+ const DelayedSharedListenRequests::size_type concurrencyLimit = 1;
+ if (TheSharedListenRequestMap.size() >= concurrencyLimit) {
+ debugs(54, 3, "waiting for " << TheSharedListenRequestMap.size() <<
+ " active + " << TheDelayedRequests.size() << " delayed requests");
+ TheDelayedRequests.push_back(por);
+ } else {
+ SendSharedListenRequest(por);
+ }
+}
+
void Ipc::SharedListenJoined(const SharedListenResponse &response)
{
// Dont debugs c fully since only FD is filled right now.
- debugs(54, 3, HERE << "got listening FD " << response.fd << " errNo=" <<
- response.errNo << " mapId=" << response.mapId);
+ debugs(54, 3, "got listening FD " << response.fd << " errNo=" <<
+ response.errNo << " mapId=" << response.mapId << " with " <<
+ TheSharedListenRequestMap.size() << " active + " <<
+ TheDelayedRequests.size() << " delayed requests");
Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end());
PendingOpenRequest por = TheSharedListenRequestMap[response.mapId];
cbd->errNo = response.errNo;
cbd->handlerSubscription = por.params.handlerSubscription;
ScheduleCallHere(por.callback);
+
+ kickDelayedRequest();
}