]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
SMP: register worker listening ports one by one
authorAlex Rousskov <rousskov@measurement-factory.com>
Sun, 27 Sep 2015 00:03:03 +0000 (17:03 -0700)
committerAmos Jeffries <squid3@treenet.co.nz>
Sun, 27 Sep 2015 00:03:03 +0000 (17:03 -0700)
When operating with many listening ports workers can flood the UDS queue
buffers and run into a timeout waiting for the coordinator to respond.

To prevent that we for a queue and wait for each port to get a response
before registering the next.

src/ipc/SharedListen.cc

index bfbf44925ff7b80058964bec11ddce166a678626..ca50dd0adef7347dd5ba8d1f783c7dbe64063662 100644 (file)
@@ -21,6 +21,7 @@
 #include "ipc/TypedMsgHdr.h"
 #include "tools.h"
 
+#include <list>
 #include <map>
 
 /// holds information necessary to handle JoinListen response
@@ -35,6 +36,10 @@ public:
 typedef std::map<int, PendingOpenRequest> SharedListenRequestMap;
 static SharedListenRequestMap TheSharedListenRequestMap;
 
+/// accumulates delayed requests until they are ready to be sent, in FIFO order
+typedef std::list<PendingOpenRequest> DelayedSharedListenRequests;
+static DelayedSharedListenRequests TheDelayedRequests;
+
 static int
 AddToMap(const PendingOpenRequest &por)
 {
@@ -106,31 +111,59 @@ void Ipc::SharedListenResponse::pack(TypedMsgHdr &hdrMsg) const
     hdrMsg.putFd(fd);
 }
 
-void Ipc::JoinSharedListen(const OpenListenerParams &params,
-                           AsyncCall::Pointer &callback)
+static void
+SendSharedListenRequest(const PendingOpenRequest &por)
 {
-    PendingOpenRequest por;
-    por.params = params;
-    por.callback = callback;
-
-    SharedListenRequest request;
+    Ipc::SharedListenRequest request;
     request.requestorId = KidIdentifier;
     request.params = por.params;
     request.mapId = AddToMap(por);
 
-    debugs(54, 3, HERE << "getting listening FD for " << request.params.addr <<
+    debugs(54, 3, "getting listening FD for " << request.params.addr <<
            " mapId=" << request.mapId);
 
-    TypedMsgHdr message;
+    Ipc::TypedMsgHdr message;
     request.pack(message);
     SendMessage(Ipc::Port::CoordinatorAddr(), message);
 }
 
+static void
+kickDelayedRequest()
+{
+    if (TheDelayedRequests.empty())
+        return; // no pending requests to resume
+
+    debugs(54, 3, "resuming with " << TheSharedListenRequestMap.size() <<
+           " active + " << TheDelayedRequests.size() << " delayed requests");
+
+    SendSharedListenRequest(*TheDelayedRequests.begin());
+    TheDelayedRequests.pop_front();
+}
+
+void
+Ipc::JoinSharedListen(const OpenListenerParams &params, AsyncCall::Pointer &cb)
+{
+    PendingOpenRequest por;
+    por.params = params;
+    por.callback = cb;
+
+    const DelayedSharedListenRequests::size_type concurrencyLimit = 1;
+    if (TheSharedListenRequestMap.size() >= concurrencyLimit) {
+        debugs(54, 3, "waiting for " << TheSharedListenRequestMap.size() <<
+               " active + " << TheDelayedRequests.size() << " delayed requests");
+        TheDelayedRequests.push_back(por);
+    } else {
+        SendSharedListenRequest(por);
+    }
+}
+
 void Ipc::SharedListenJoined(const SharedListenResponse &response)
 {
     // Dont debugs c fully since only FD is filled right now.
-    debugs(54, 3, HERE << "got listening FD " << response.fd << " errNo=" <<
-           response.errNo << " mapId=" << response.mapId);
+    debugs(54, 3, "got listening FD " << response.fd << " errNo=" <<
+           response.errNo << " mapId=" << response.mapId << " with " <<
+           TheSharedListenRequestMap.size() << " active + " <<
+           TheDelayedRequests.size() << " delayed requests");
 
     Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end());
     PendingOpenRequest por = TheSharedListenRequestMap[response.mapId];
@@ -158,5 +191,7 @@ void Ipc::SharedListenJoined(const SharedListenResponse &response)
     cbd->errNo = response.errNo;
     cbd->handlerSubscription = por.params.handlerSubscription;
     ScheduleCallHere(por.callback);
+
+    kickDelayedRequest();
 }