From d74dae63a63db2ccfec52654aabbf2808e1f8987 Mon Sep 17 00:00:00 2001 From: Alex Rousskov Date: Sat, 26 Sep 2015 17:03:03 -0700 Subject: [PATCH] SMP: register worker listening ports one by one When operating with many listening ports workers can flood the UDS queue buffers and run into a timeout waiting for the coordinator to respond. To prevent that we for a queue and wait for each port to get a response before registering the next. --- src/ipc/SharedListen.cc | 57 +++++++++++++++++++++++++++++++++-------- 1 file changed, 46 insertions(+), 11 deletions(-) diff --git a/src/ipc/SharedListen.cc b/src/ipc/SharedListen.cc index bfbf44925f..ca50dd0ade 100644 --- a/src/ipc/SharedListen.cc +++ b/src/ipc/SharedListen.cc @@ -21,6 +21,7 @@ #include "ipc/TypedMsgHdr.h" #include "tools.h" +#include #include /// holds information necessary to handle JoinListen response @@ -35,6 +36,10 @@ public: typedef std::map SharedListenRequestMap; static SharedListenRequestMap TheSharedListenRequestMap; +/// accumulates delayed requests until they are ready to be sent, in FIFO order +typedef std::list DelayedSharedListenRequests; +static DelayedSharedListenRequests TheDelayedRequests; + static int AddToMap(const PendingOpenRequest &por) { @@ -106,31 +111,59 @@ void Ipc::SharedListenResponse::pack(TypedMsgHdr &hdrMsg) const hdrMsg.putFd(fd); } -void Ipc::JoinSharedListen(const OpenListenerParams ¶ms, - AsyncCall::Pointer &callback) +static void +SendSharedListenRequest(const PendingOpenRequest &por) { - PendingOpenRequest por; - por.params = params; - por.callback = callback; - - SharedListenRequest request; + Ipc::SharedListenRequest request; request.requestorId = KidIdentifier; request.params = por.params; request.mapId = AddToMap(por); - debugs(54, 3, HERE << "getting listening FD for " << request.params.addr << + debugs(54, 3, "getting listening FD for " << request.params.addr << " mapId=" << request.mapId); - TypedMsgHdr message; + Ipc::TypedMsgHdr message; request.pack(message); SendMessage(Ipc::Port::CoordinatorAddr(), message); } +static void +kickDelayedRequest() +{ + if (TheDelayedRequests.empty()) + return; // no pending requests to resume + + debugs(54, 3, "resuming with " << TheSharedListenRequestMap.size() << + " active + " << TheDelayedRequests.size() << " delayed requests"); + + SendSharedListenRequest(*TheDelayedRequests.begin()); + TheDelayedRequests.pop_front(); +} + +void +Ipc::JoinSharedListen(const OpenListenerParams ¶ms, AsyncCall::Pointer &cb) +{ + PendingOpenRequest por; + por.params = params; + por.callback = cb; + + const DelayedSharedListenRequests::size_type concurrencyLimit = 1; + if (TheSharedListenRequestMap.size() >= concurrencyLimit) { + debugs(54, 3, "waiting for " << TheSharedListenRequestMap.size() << + " active + " << TheDelayedRequests.size() << " delayed requests"); + TheDelayedRequests.push_back(por); + } else { + SendSharedListenRequest(por); + } +} + void Ipc::SharedListenJoined(const SharedListenResponse &response) { // Dont debugs c fully since only FD is filled right now. - debugs(54, 3, HERE << "got listening FD " << response.fd << " errNo=" << - response.errNo << " mapId=" << response.mapId); + debugs(54, 3, "got listening FD " << response.fd << " errNo=" << + response.errNo << " mapId=" << response.mapId << " with " << + TheSharedListenRequestMap.size() << " active + " << + TheDelayedRequests.size() << " delayed requests"); Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end()); PendingOpenRequest por = TheSharedListenRequestMap[response.mapId]; @@ -158,5 +191,7 @@ void Ipc::SharedListenJoined(const SharedListenResponse &response) cbd->errNo = response.errNo; cbd->handlerSubscription = por.params.handlerSubscription; ScheduleCallHere(por.callback); + + kickDelayedRequest(); } -- 2.47.2