]> git.ipfire.org Git - thirdparty/squid.git/blame - src/CollapsedForwarding.cc
Maintenance: Removed most NULLs using modernize-use-nullptr (#1075)
[thirdparty/squid.git] / src / CollapsedForwarding.cc
CommitLineData
807feb1d 1/*
bf95c10a 2 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
807feb1d 3 *
bbc27441
AJ
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
807feb1d
DK
7 */
8
bbc27441
AJ
9/* DEBUG: section 17 Request Forwarding */
10
807feb1d 11#include "squid.h"
5faec1a1 12#include "base/AsyncFunCalls.h"
e4d13993
AR
13#include "CollapsedForwarding.h"
14#include "globals.h"
807feb1d
DK
15#include "ipc/mem/Segment.h"
16#include "ipc/Messages.h"
17#include "ipc/Port.h"
18#include "ipc/TypedMsgHdr.h"
d366a7fa 19#include "MemObject.h"
ce49546e
AR
20#include "SquidConfig.h"
21#include "Store.h"
22#include "store_key_md5.h"
807feb1d
DK
23#include "tools.h"
24
25/// shared memory segment path to use for CollapsedForwarding queue
26static const char *const ShmLabel = "cf";
27/// a single worker-to-worker queue capacity
28// TODO: make configurable or compute from squid.conf settings if possible
29static const int QueueCapacity = 1024;
30
829030b5 31std::unique_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
807feb1d
DK
32
33/// IPC queue message
34class CollapsedForwardingMsg
35{
36public:
6919be24 37 CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
807feb1d 38
d8ee9e8d
EB
39 /// prints message parameters; suitable for cache manager reports
40 void stat(std::ostream &);
41
807feb1d 42public:
6919be24
AR
43 int sender; ///< kid ID of sending process
44
45 /// transients index, so that workers can find [private] entries to sync
9d4e9cfb 46 sfileno xitIndex;
807feb1d
DK
47};
48
d8ee9e8d
EB
49void
50CollapsedForwardingMsg::stat(std::ostream &os)
51{
52 os << "sender: " << sender << ", xitIndex: " << xitIndex;
53}
54
807feb1d
DK
55// CollapsedForwarding
56
57void
58CollapsedForwarding::Init()
59{
60 Must(!queue.get());
5faec1a1 61 if (UsingSmp() && IamWorkerProcess()) {
ce49546e 62 queue.reset(new Queue(ShmLabel, KidIdentifier));
5faec1a1
EB
63 AsyncCall::Pointer callback = asyncCall(17, 4, "CollapsedForwarding::HandleNewDataAtStart",
64 NullaryFunDialer(&CollapsedForwarding::HandleNewDataAtStart));
65 ScheduleCallHere(callback);
66 }
807feb1d
DK
67}
68
69void
4310f8b0 70CollapsedForwarding::Broadcast(const StoreEntry &e, const bool includingThisWorker)
807feb1d 71{
4310f8b0 72 if (!e.hasTransients() ||
9d4e9cfb 73 !Store::Root().transientReaders(e)) {
f54986ad 74 debugs(17, 7, "nobody reads " << e);
d366a7fa
AR
75 return;
76 }
77
4310f8b0
EB
78 debugs(17, 5, e);
79 Broadcast(e.mem_obj->xitTable.index, includingThisWorker);
80}
81
82void
83CollapsedForwarding::Broadcast(const sfileno index, const bool includingThisWorker)
84{
85 if (!queue.get())
86 return;
87
807feb1d 88 CollapsedForwardingMsg msg;
ce49546e 89 msg.sender = KidIdentifier;
4310f8b0 90 msg.xitIndex = index;
ce49546e 91
4310f8b0 92 debugs(17, 7, "entry " << index << " to " << Config.workers << (includingThisWorker ? "" : "-1") << " workers");
807feb1d
DK
93
94 // TODO: send only to workers who are waiting for data
807feb1d
DK
95 for (int workerId = 1; workerId <= Config.workers; ++workerId) {
96 try {
4310f8b0 97 if ((workerId != KidIdentifier || includingThisWorker) && queue->push(workerId, msg))
807feb1d
DK
98 Notify(workerId);
99 } catch (const Queue::Full &) {
5c9846c7
AR
100 debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " <<
101 "queue overflow for kid" << workerId <<
102 " at " << queue->outSize(workerId) << " items");
807feb1d
DK
103 // TODO: grow queue size
104 }
105 }
106}
107
108void
109CollapsedForwarding::Notify(const int workerId)
110{
111 // TODO: Count and report the total number of notifications, pops, pushes.
ce49546e 112 debugs(17, 7, "to kid" << workerId);
807feb1d 113 Ipc::TypedMsgHdr msg;
807feb1d
DK
114 msg.setType(Ipc::mtCollapsedForwardingNotification);
115 msg.putInt(KidIdentifier);
1ee292b7 116 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, workerId);
807feb1d
DK
117 Ipc::SendMessage(addr, msg);
118}
119
120void
121CollapsedForwarding::HandleNewData(const char *const when)
122{
ce49546e 123 debugs(17, 4, "popping all " << when);
807feb1d
DK
124 CollapsedForwardingMsg msg;
125 int workerId;
126 int poppedCount = 0;
127 while (queue->pop(workerId, msg)) {
ce49546e
AR
128 debugs(17, 3, "message from kid" << workerId);
129 if (workerId != msg.sender) {
130 debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId <<
131 " != " << msg.sender);
807feb1d
DK
132 }
133
6919be24
AR
134 debugs(17, 7, "handling entry " << msg.xitIndex << " in transients_map");
135 Store::Root().syncCollapsed(msg.xitIndex);
136 debugs(17, 7, "handled entry " << msg.xitIndex << " in transients_map");
ce49546e 137
807feb1d 138 // XXX: stop and schedule an async call to continue
432ad89c
FC
139 ++poppedCount;
140 assert(poppedCount < SQUID_MAXFD);
807feb1d
DK
141 }
142}
143
144void
145CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
146{
147 const int from = msg.getInt();
539283df 148 debugs(17, 7, "from " << from);
ce49546e 149 assert(queue.get());
807feb1d
DK
150 queue->clearReaderSignal(from);
151 HandleNewData("after notification");
152}
153
5faec1a1
EB
154/// Handle queued IPC messages for the first time in this process lifetime, when
155/// the queue may be reflecting the state of our killed predecessor.
156void
157CollapsedForwarding::HandleNewDataAtStart()
158{
159 /// \sa IpcIoFile::HandleMessagesAtStart() -- duplicates this logic
160 queue->clearAllReaderSignals();
161 HandleNewData("at start");
162}
163
d8ee9e8d
EB
164void
165CollapsedForwarding::StatQueue(std::ostream &os)
166{
167 if (queue.get()) {
168 os << "Transients queues:\n";
169 queue->stat<CollapsedForwardingMsg>(os);
170 }
171}
172
807feb1d
DK
173/// initializes shared queue used by CollapsedForwarding
174class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
175{
176public:
177 /* RegisteredRunner API */
aee3523a 178 CollapsedForwardingRr(): owner(nullptr) {}
807feb1d
DK
179 virtual ~CollapsedForwardingRr();
180
181protected:
21b7990f
AR
182 virtual void create();
183 virtual void open();
807feb1d
DK
184
185private:
186 Ipc::MultiQueue::Owner *owner;
187};
188
21b7990f 189RunnerRegistrationEntry(CollapsedForwardingRr);
807feb1d 190
21b7990f 191void CollapsedForwardingRr::create()
807feb1d
DK
192{
193 Must(!owner);
194 owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1,
195 sizeof(CollapsedForwardingMsg),
196 QueueCapacity);
197}
198
21b7990f 199void CollapsedForwardingRr::open()
807feb1d 200{
ce49546e 201 CollapsedForwarding::Init();
807feb1d
DK
202}
203
204CollapsedForwardingRr::~CollapsedForwardingRr()
205{
206 delete owner;
207}
f53969cc 208