]> git.ipfire.org Git - thirdparty/squid.git/blame - src/CollapsedForwarding.cc
Maintenance: Update astyle version to 3.1 (#841)
[thirdparty/squid.git] / src / CollapsedForwarding.cc
CommitLineData
807feb1d 1/*
f70aedc4 2 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
807feb1d 3 *
bbc27441
AJ
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
807feb1d
DK
7 */
8
bbc27441
AJ
9/* DEBUG: section 17 Request Forwarding */
10
807feb1d 11#include "squid.h"
e4d13993
AR
12#include "CollapsedForwarding.h"
13#include "globals.h"
807feb1d
DK
14#include "ipc/mem/Segment.h"
15#include "ipc/Messages.h"
16#include "ipc/Port.h"
17#include "ipc/TypedMsgHdr.h"
d366a7fa 18#include "MemObject.h"
ce49546e
AR
19#include "SquidConfig.h"
20#include "Store.h"
21#include "store_key_md5.h"
807feb1d
DK
22#include "tools.h"
23
24/// shared memory segment path to use for CollapsedForwarding queue
25static const char *const ShmLabel = "cf";
26/// a single worker-to-worker queue capacity
27// TODO: make configurable or compute from squid.conf settings if possible
28static const int QueueCapacity = 1024;
29
829030b5 30std::unique_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
807feb1d
DK
31
32/// IPC queue message
33class CollapsedForwardingMsg
34{
35public:
6919be24 36 CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
807feb1d 37
d8ee9e8d
EB
38 /// prints message parameters; suitable for cache manager reports
39 void stat(std::ostream &);
40
807feb1d 41public:
6919be24
AR
42 int sender; ///< kid ID of sending process
43
44 /// transients index, so that workers can find [private] entries to sync
9d4e9cfb 45 sfileno xitIndex;
807feb1d
DK
46};
47
d8ee9e8d
EB
48void
49CollapsedForwardingMsg::stat(std::ostream &os)
50{
51 os << "sender: " << sender << ", xitIndex: " << xitIndex;
52}
53
807feb1d
DK
54// CollapsedForwarding
55
56void
57CollapsedForwarding::Init()
58{
59 Must(!queue.get());
ce49546e
AR
60 if (UsingSmp() && IamWorkerProcess())
61 queue.reset(new Queue(ShmLabel, KidIdentifier));
807feb1d
DK
62}
63
64void
4310f8b0 65CollapsedForwarding::Broadcast(const StoreEntry &e, const bool includingThisWorker)
807feb1d 66{
4310f8b0 67 if (!e.hasTransients() ||
9d4e9cfb 68 !Store::Root().transientReaders(e)) {
f54986ad 69 debugs(17, 7, "nobody reads " << e);
d366a7fa
AR
70 return;
71 }
72
4310f8b0
EB
73 debugs(17, 5, e);
74 Broadcast(e.mem_obj->xitTable.index, includingThisWorker);
75}
76
77void
78CollapsedForwarding::Broadcast(const sfileno index, const bool includingThisWorker)
79{
80 if (!queue.get())
81 return;
82
807feb1d 83 CollapsedForwardingMsg msg;
ce49546e 84 msg.sender = KidIdentifier;
4310f8b0 85 msg.xitIndex = index;
ce49546e 86
4310f8b0 87 debugs(17, 7, "entry " << index << " to " << Config.workers << (includingThisWorker ? "" : "-1") << " workers");
807feb1d
DK
88
89 // TODO: send only to workers who are waiting for data
807feb1d
DK
90 for (int workerId = 1; workerId <= Config.workers; ++workerId) {
91 try {
4310f8b0 92 if ((workerId != KidIdentifier || includingThisWorker) && queue->push(workerId, msg))
807feb1d
DK
93 Notify(workerId);
94 } catch (const Queue::Full &) {
5c9846c7
AR
95 debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " <<
96 "queue overflow for kid" << workerId <<
97 " at " << queue->outSize(workerId) << " items");
807feb1d
DK
98 // TODO: grow queue size
99 }
100 }
101}
102
103void
104CollapsedForwarding::Notify(const int workerId)
105{
106 // TODO: Count and report the total number of notifications, pops, pushes.
ce49546e 107 debugs(17, 7, "to kid" << workerId);
807feb1d 108 Ipc::TypedMsgHdr msg;
807feb1d
DK
109 msg.setType(Ipc::mtCollapsedForwardingNotification);
110 msg.putInt(KidIdentifier);
1ee292b7 111 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, workerId);
807feb1d
DK
112 Ipc::SendMessage(addr, msg);
113}
114
115void
116CollapsedForwarding::HandleNewData(const char *const when)
117{
ce49546e 118 debugs(17, 4, "popping all " << when);
807feb1d
DK
119 CollapsedForwardingMsg msg;
120 int workerId;
121 int poppedCount = 0;
122 while (queue->pop(workerId, msg)) {
ce49546e
AR
123 debugs(17, 3, "message from kid" << workerId);
124 if (workerId != msg.sender) {
125 debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId <<
126 " != " << msg.sender);
807feb1d
DK
127 }
128
6919be24
AR
129 debugs(17, 7, "handling entry " << msg.xitIndex << " in transients_map");
130 Store::Root().syncCollapsed(msg.xitIndex);
131 debugs(17, 7, "handled entry " << msg.xitIndex << " in transients_map");
ce49546e 132
807feb1d 133 // XXX: stop and schedule an async call to continue
432ad89c
FC
134 ++poppedCount;
135 assert(poppedCount < SQUID_MAXFD);
807feb1d
DK
136 }
137}
138
139void
140CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
141{
142 const int from = msg.getInt();
539283df 143 debugs(17, 7, "from " << from);
ce49546e 144 assert(queue.get());
807feb1d
DK
145 queue->clearReaderSignal(from);
146 HandleNewData("after notification");
147}
148
d8ee9e8d
EB
149void
150CollapsedForwarding::StatQueue(std::ostream &os)
151{
152 if (queue.get()) {
153 os << "Transients queues:\n";
154 queue->stat<CollapsedForwardingMsg>(os);
155 }
156}
157
807feb1d
DK
158/// initializes shared queue used by CollapsedForwarding
159class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
160{
161public:
162 /* RegisteredRunner API */
163 CollapsedForwardingRr(): owner(NULL) {}
164 virtual ~CollapsedForwardingRr();
165
166protected:
21b7990f
AR
167 virtual void create();
168 virtual void open();
807feb1d
DK
169
170private:
171 Ipc::MultiQueue::Owner *owner;
172};
173
21b7990f 174RunnerRegistrationEntry(CollapsedForwardingRr);
807feb1d 175
21b7990f 176void CollapsedForwardingRr::create()
807feb1d
DK
177{
178 Must(!owner);
179 owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1,
180 sizeof(CollapsedForwardingMsg),
181 QueueCapacity);
182}
183
21b7990f 184void CollapsedForwardingRr::open()
807feb1d 185{
ce49546e 186 CollapsedForwarding::Init();
807feb1d
DK
187}
188
189CollapsedForwardingRr::~CollapsedForwardingRr()
190{
191 delete owner;
192}
f53969cc 193