]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/CollapsedForwarding.cc
Source Format Enforcement (#763)
[thirdparty/squid.git] / src / CollapsedForwarding.cc
index f95bbef913942b2c99ad65f8204d386a5caa6477..9892f393218891d89b478cfaae70a3084757958f 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
@@ -35,6 +35,9 @@ class CollapsedForwardingMsg
 public:
     CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
 
+    /// prints message parameters; suitable for cache manager reports
+    void stat(std::ostream &);
+
 public:
     int sender; ///< kid ID of sending process
 
@@ -42,6 +45,12 @@ public:
     sfileno xitIndex;
 };
 
+void
+CollapsedForwardingMsg::stat(std::ostream &os)
+{
+    os << "sender: " << sender << ", xitIndex: " << xitIndex;
+}
+
 // CollapsedForwarding
 
 void
@@ -53,27 +62,34 @@ CollapsedForwarding::Init()
 }
 
 void
-CollapsedForwarding::Broadcast(const StoreEntry &e)
+CollapsedForwarding::Broadcast(const StoreEntry &e, const bool includingThisWorker)
 {
-    if (!queue.get())
-        return;
-
-    if (!e.mem_obj || e.mem_obj->xitTable.index < 0 ||
+    if (!e.hasTransients() ||
             !Store::Root().transientReaders(e)) {
         debugs(17, 7, "nobody reads " << e);
         return;
     }
 
+    debugs(17, 5, e);
+    Broadcast(e.mem_obj->xitTable.index, includingThisWorker);
+}
+
+void
+CollapsedForwarding::Broadcast(const sfileno index, const bool includingThisWorker)
+{
+    if (!queue.get())
+        return;
+
     CollapsedForwardingMsg msg;
     msg.sender = KidIdentifier;
-    msg.xitIndex = e.mem_obj->xitTable.index;
+    msg.xitIndex = index;
 
-    debugs(17, 5, e << " to " << Config.workers << "-1 workers");
+    debugs(17, 7, "entry " << index << " to " << Config.workers << (includingThisWorker ? "" : "-1") << " workers");
 
     // TODO: send only to workers who are waiting for data
     for (int workerId = 1; workerId <= Config.workers; ++workerId) {
         try {
-            if (workerId != KidIdentifier && queue->push(workerId, msg))
+            if ((workerId != KidIdentifier || includingThisWorker) && queue->push(workerId, msg))
                 Notify(workerId);
         } catch (const Queue::Full &) {
             debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " <<
@@ -130,6 +146,15 @@ CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
     HandleNewData("after notification");
 }
 
+void
+CollapsedForwarding::StatQueue(std::ostream &os)
+{
+    if (queue.get()) {
+        os << "Transients queues:\n";
+        queue->stat<CollapsedForwardingMsg>(os);
+    }
+}
+
 /// initializes shared queue used by CollapsedForwarding
 class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
 {