]> git.ipfire.org Git - thirdparty/squid.git/blob - src/CollapsedForwarding.cc
4d87420bb208f05aaa552a03edf92a0bb3472cbd
[thirdparty/squid.git] / src / CollapsedForwarding.cc
1 /*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 17 Request Forwarding */
10
11 #include "squid.h"
12 #include "base/AsyncFunCalls.h"
13 #include "CollapsedForwarding.h"
14 #include "globals.h"
15 #include "ipc/mem/Segment.h"
16 #include "ipc/Messages.h"
17 #include "ipc/Port.h"
18 #include "ipc/TypedMsgHdr.h"
19 #include "MemObject.h"
20 #include "SquidConfig.h"
21 #include "Store.h"
22 #include "store_key_md5.h"
23 #include "tools.h"
24
25 /// shared memory segment path to use for CollapsedForwarding queue
26 static const char *const ShmLabel = "cf";
27 /// a single worker-to-worker queue capacity
28 // TODO: make configurable or compute from squid.conf settings if possible
29 static const int QueueCapacity = 1024;
30
31 std::unique_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
32
33 /// IPC queue message
34 class CollapsedForwardingMsg
35 {
36 public:
37 CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
38
39 /// prints message parameters; suitable for cache manager reports
40 void stat(std::ostream &);
41
42 public:
43 int sender; ///< kid ID of sending process
44
45 /// transients index, so that workers can find [private] entries to sync
46 sfileno xitIndex;
47 };
48
49 void
50 CollapsedForwardingMsg::stat(std::ostream &os)
51 {
52 os << "sender: " << sender << ", xitIndex: " << xitIndex;
53 }
54
55 // CollapsedForwarding
56
57 void
58 CollapsedForwarding::Init()
59 {
60 Must(!queue.get());
61 if (UsingSmp() && IamWorkerProcess()) {
62 queue.reset(new Queue(ShmLabel, KidIdentifier));
63 AsyncCall::Pointer callback = asyncCall(17, 4, "CollapsedForwarding::HandleNewDataAtStart",
64 NullaryFunDialer(&CollapsedForwarding::HandleNewDataAtStart));
65 ScheduleCallHere(callback);
66 }
67 }
68
69 void
70 CollapsedForwarding::Broadcast(const StoreEntry &e, const bool includingThisWorker)
71 {
72 if (!e.hasTransients() ||
73 !Store::Root().transientReaders(e)) {
74 debugs(17, 7, "nobody reads " << e);
75 return;
76 }
77
78 debugs(17, 5, e);
79 Broadcast(e.mem_obj->xitTable.index, includingThisWorker);
80 }
81
82 void
83 CollapsedForwarding::Broadcast(const sfileno index, const bool includingThisWorker)
84 {
85 if (!queue.get())
86 return;
87
88 CollapsedForwardingMsg msg;
89 msg.sender = KidIdentifier;
90 msg.xitIndex = index;
91
92 debugs(17, 7, "entry " << index << " to " << Config.workers << (includingThisWorker ? "" : "-1") << " workers");
93
94 // TODO: send only to workers who are waiting for data
95 for (int workerId = 1; workerId <= Config.workers; ++workerId) {
96 try {
97 if ((workerId != KidIdentifier || includingThisWorker) && queue->push(workerId, msg))
98 Notify(workerId);
99 } catch (const Queue::Full &) {
100 debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " <<
101 "queue overflow for kid" << workerId <<
102 " at " << queue->outSize(workerId) << " items");
103 // TODO: grow queue size
104 }
105 }
106 }
107
108 void
109 CollapsedForwarding::Notify(const int workerId)
110 {
111 // TODO: Count and report the total number of notifications, pops, pushes.
112 debugs(17, 7, "to kid" << workerId);
113 Ipc::TypedMsgHdr msg;
114 msg.setType(Ipc::mtCollapsedForwardingNotification);
115 msg.putInt(KidIdentifier);
116 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, workerId);
117 Ipc::SendMessage(addr, msg);
118 }
119
120 void
121 CollapsedForwarding::HandleNewData(const char *const when)
122 {
123 debugs(17, 4, "popping all " << when);
124 CollapsedForwardingMsg msg;
125 int workerId;
126 int poppedCount = 0;
127 while (queue->pop(workerId, msg)) {
128 debugs(17, 3, "message from kid" << workerId);
129 if (workerId != msg.sender) {
130 debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId <<
131 " != " << msg.sender);
132 }
133
134 debugs(17, 7, "handling entry " << msg.xitIndex << " in transients_map");
135 Store::Root().syncCollapsed(msg.xitIndex);
136 debugs(17, 7, "handled entry " << msg.xitIndex << " in transients_map");
137
138 // XXX: stop and schedule an async call to continue
139 ++poppedCount;
140 assert(poppedCount < SQUID_MAXFD);
141 }
142 }
143
144 void
145 CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
146 {
147 const int from = msg.getInt();
148 debugs(17, 7, "from " << from);
149 assert(queue.get());
150 queue->clearReaderSignal(from);
151 HandleNewData("after notification");
152 }
153
154 /// Handle queued IPC messages for the first time in this process lifetime, when
155 /// the queue may be reflecting the state of our killed predecessor.
156 void
157 CollapsedForwarding::HandleNewDataAtStart()
158 {
159 /// \sa IpcIoFile::HandleMessagesAtStart() -- duplicates this logic
160 queue->clearAllReaderSignals();
161 HandleNewData("at start");
162 }
163
164 void
165 CollapsedForwarding::StatQueue(std::ostream &os)
166 {
167 if (queue.get()) {
168 os << "Transients queues:\n";
169 queue->stat<CollapsedForwardingMsg>(os);
170 }
171 }
172
173 /// initializes shared queue used by CollapsedForwarding
174 class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
175 {
176 public:
177 /* RegisteredRunner API */
178 CollapsedForwardingRr(): owner(nullptr) {}
179 ~CollapsedForwardingRr() override;
180
181 protected:
182 void create() override;
183 void open() override;
184
185 private:
186 Ipc::MultiQueue::Owner *owner;
187 };
188
189 RunnerRegistrationEntry(CollapsedForwardingRr);
190
191 void CollapsedForwardingRr::create()
192 {
193 Must(!owner);
194 owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1,
195 sizeof(CollapsedForwardingMsg),
196 QueueCapacity);
197 }
198
199 void CollapsedForwardingRr::open()
200 {
201 CollapsedForwarding::Init();
202 }
203
204 CollapsedForwardingRr::~CollapsedForwardingRr()
205 {
206 delete owner;
207 }
208