/*
- * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
public:
CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
+ /// prints message parameters; suitable for cache manager reports
+ void stat(std::ostream &);
+
public:
int sender; ///< kid ID of sending process
sfileno xitIndex;
};
+void
+CollapsedForwardingMsg::stat(std::ostream &os)
+{
+ os << "sender: " << sender << ", xitIndex: " << xitIndex;
+}
+
// CollapsedForwarding
void
}
void
-CollapsedForwarding::Broadcast(const StoreEntry &e)
+CollapsedForwarding::Broadcast(const StoreEntry &e, const bool includingThisWorker)
{
- if (!queue.get())
- return;
-
- if (!e.mem_obj || e.mem_obj->xitTable.index < 0 ||
+ if (!e.hasTransients() ||
!Store::Root().transientReaders(e)) {
debugs(17, 7, "nobody reads " << e);
return;
}
+ debugs(17, 5, e);
+ Broadcast(e.mem_obj->xitTable.index, includingThisWorker);
+}
+
+void
+CollapsedForwarding::Broadcast(const sfileno index, const bool includingThisWorker)
+{
+ if (!queue.get())
+ return;
+
CollapsedForwardingMsg msg;
msg.sender = KidIdentifier;
- msg.xitIndex = e.mem_obj->xitTable.index;
+ msg.xitIndex = index;
- debugs(17, 5, e << " to " << Config.workers << "-1 workers");
+ debugs(17, 7, "entry " << index << " to " << Config.workers << (includingThisWorker ? "" : "-1") << " workers");
// TODO: send only to workers who are waiting for data
for (int workerId = 1; workerId <= Config.workers; ++workerId) {
try {
- if (workerId != KidIdentifier && queue->push(workerId, msg))
+ if ((workerId != KidIdentifier || includingThisWorker) && queue->push(workerId, msg))
Notify(workerId);
} catch (const Queue::Full &) {
debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " <<
HandleNewData("after notification");
}
+void
+CollapsedForwarding::StatQueue(std::ostream &os)
+{
+ if (queue.get()) {
+ os << "Transients queues:\n";
+ queue->stat<CollapsedForwardingMsg>(os);
+ }
+}
+
/// initializes shared queue used by CollapsedForwarding
class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
{