]> git.ipfire.org Git - thirdparty/squid.git/blob - src/CollapsedForwarding.cc
Source Format Enforcement (#763)
[thirdparty/squid.git] / src / CollapsedForwarding.cc
1 /*
2 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 17 Request Forwarding */
10
11 #include "squid.h"
12 #include "CollapsedForwarding.h"
13 #include "globals.h"
14 #include "ipc/mem/Segment.h"
15 #include "ipc/Messages.h"
16 #include "ipc/Port.h"
17 #include "ipc/TypedMsgHdr.h"
18 #include "MemObject.h"
19 #include "SquidConfig.h"
20 #include "Store.h"
21 #include "store_key_md5.h"
22 #include "tools.h"
23
24 /// shared memory segment path to use for CollapsedForwarding queue
25 static const char *const ShmLabel = "cf";
26 /// a single worker-to-worker queue capacity
27 // TODO: make configurable or compute from squid.conf settings if possible
28 static const int QueueCapacity = 1024;
29
30 std::unique_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
31
32 /// IPC queue message
33 class CollapsedForwardingMsg
34 {
35 public:
36 CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
37
38 /// prints message parameters; suitable for cache manager reports
39 void stat(std::ostream &);
40
41 public:
42 int sender; ///< kid ID of sending process
43
44 /// transients index, so that workers can find [private] entries to sync
45 sfileno xitIndex;
46 };
47
48 void
49 CollapsedForwardingMsg::stat(std::ostream &os)
50 {
51 os << "sender: " << sender << ", xitIndex: " << xitIndex;
52 }
53
54 // CollapsedForwarding
55
56 void
57 CollapsedForwarding::Init()
58 {
59 Must(!queue.get());
60 if (UsingSmp() && IamWorkerProcess())
61 queue.reset(new Queue(ShmLabel, KidIdentifier));
62 }
63
64 void
65 CollapsedForwarding::Broadcast(const StoreEntry &e, const bool includingThisWorker)
66 {
67 if (!e.hasTransients() ||
68 !Store::Root().transientReaders(e)) {
69 debugs(17, 7, "nobody reads " << e);
70 return;
71 }
72
73 debugs(17, 5, e);
74 Broadcast(e.mem_obj->xitTable.index, includingThisWorker);
75 }
76
77 void
78 CollapsedForwarding::Broadcast(const sfileno index, const bool includingThisWorker)
79 {
80 if (!queue.get())
81 return;
82
83 CollapsedForwardingMsg msg;
84 msg.sender = KidIdentifier;
85 msg.xitIndex = index;
86
87 debugs(17, 7, "entry " << index << " to " << Config.workers << (includingThisWorker ? "" : "-1") << " workers");
88
89 // TODO: send only to workers who are waiting for data
90 for (int workerId = 1; workerId <= Config.workers; ++workerId) {
91 try {
92 if ((workerId != KidIdentifier || includingThisWorker) && queue->push(workerId, msg))
93 Notify(workerId);
94 } catch (const Queue::Full &) {
95 debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " <<
96 "queue overflow for kid" << workerId <<
97 " at " << queue->outSize(workerId) << " items");
98 // TODO: grow queue size
99 }
100 }
101 }
102
103 void
104 CollapsedForwarding::Notify(const int workerId)
105 {
106 // TODO: Count and report the total number of notifications, pops, pushes.
107 debugs(17, 7, "to kid" << workerId);
108 Ipc::TypedMsgHdr msg;
109 msg.setType(Ipc::mtCollapsedForwardingNotification);
110 msg.putInt(KidIdentifier);
111 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, workerId);
112 Ipc::SendMessage(addr, msg);
113 }
114
115 void
116 CollapsedForwarding::HandleNewData(const char *const when)
117 {
118 debugs(17, 4, "popping all " << when);
119 CollapsedForwardingMsg msg;
120 int workerId;
121 int poppedCount = 0;
122 while (queue->pop(workerId, msg)) {
123 debugs(17, 3, "message from kid" << workerId);
124 if (workerId != msg.sender) {
125 debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId <<
126 " != " << msg.sender);
127 }
128
129 debugs(17, 7, "handling entry " << msg.xitIndex << " in transients_map");
130 Store::Root().syncCollapsed(msg.xitIndex);
131 debugs(17, 7, "handled entry " << msg.xitIndex << " in transients_map");
132
133 // XXX: stop and schedule an async call to continue
134 ++poppedCount;
135 assert(poppedCount < SQUID_MAXFD);
136 }
137 }
138
139 void
140 CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
141 {
142 const int from = msg.getInt();
143 debugs(17, 7, "from " << from);
144 assert(queue.get());
145 queue->clearReaderSignal(from);
146 HandleNewData("after notification");
147 }
148
149 void
150 CollapsedForwarding::StatQueue(std::ostream &os)
151 {
152 if (queue.get()) {
153 os << "Transients queues:\n";
154 queue->stat<CollapsedForwardingMsg>(os);
155 }
156 }
157
158 /// initializes shared queue used by CollapsedForwarding
159 class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
160 {
161 public:
162 /* RegisteredRunner API */
163 CollapsedForwardingRr(): owner(NULL) {}
164 virtual ~CollapsedForwardingRr();
165
166 protected:
167 virtual void create();
168 virtual void open();
169
170 private:
171 Ipc::MultiQueue::Owner *owner;
172 };
173
174 RunnerRegistrationEntry(CollapsedForwardingRr);
175
176 void CollapsedForwardingRr::create()
177 {
178 Must(!owner);
179 owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1,
180 sizeof(CollapsedForwardingMsg),
181 QueueCapacity);
182 }
183
184 void CollapsedForwardingRr::open()
185 {
186 CollapsedForwarding::Init();
187 }
188
189 CollapsedForwardingRr::~CollapsedForwardingRr()
190 {
191 delete owner;
192 }
193