]>
Commit | Line | Data |
---|---|---|
807feb1d | 1 | /* |
bf95c10a | 2 | * Copyright (C) 1996-2022 The Squid Software Foundation and contributors |
807feb1d | 3 | * |
bbc27441 AJ |
4 | * Squid software is distributed under GPLv2+ license and includes |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
807feb1d DK |
7 | */ |
8 | ||
bbc27441 AJ |
9 | /* DEBUG: section 17 Request Forwarding */ |
10 | ||
807feb1d | 11 | #include "squid.h" |
5faec1a1 | 12 | #include "base/AsyncFunCalls.h" |
e4d13993 AR |
13 | #include "CollapsedForwarding.h" |
14 | #include "globals.h" | |
807feb1d DK |
15 | #include "ipc/mem/Segment.h" |
16 | #include "ipc/Messages.h" | |
17 | #include "ipc/Port.h" | |
18 | #include "ipc/TypedMsgHdr.h" | |
d366a7fa | 19 | #include "MemObject.h" |
ce49546e AR |
20 | #include "SquidConfig.h" |
21 | #include "Store.h" | |
22 | #include "store_key_md5.h" | |
807feb1d DK |
23 | #include "tools.h" |
24 | ||
25 | /// shared memory segment path to use for CollapsedForwarding queue | |
26 | static const char *const ShmLabel = "cf"; | |
27 | /// a single worker-to-worker queue capacity | |
28 | // TODO: make configurable or compute from squid.conf settings if possible | |
29 | static const int QueueCapacity = 1024; | |
30 | ||
829030b5 | 31 | std::unique_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue; |
807feb1d DK |
32 | |
33 | /// IPC queue message | |
34 | class CollapsedForwardingMsg | |
35 | { | |
36 | public: | |
6919be24 | 37 | CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {} |
807feb1d | 38 | |
d8ee9e8d EB |
39 | /// prints message parameters; suitable for cache manager reports |
40 | void stat(std::ostream &); | |
41 | ||
807feb1d | 42 | public: |
6919be24 AR |
43 | int sender; ///< kid ID of sending process |
44 | ||
45 | /// transients index, so that workers can find [private] entries to sync | |
9d4e9cfb | 46 | sfileno xitIndex; |
807feb1d DK |
47 | }; |
48 | ||
d8ee9e8d EB |
49 | void |
50 | CollapsedForwardingMsg::stat(std::ostream &os) | |
51 | { | |
52 | os << "sender: " << sender << ", xitIndex: " << xitIndex; | |
53 | } | |
54 | ||
807feb1d DK |
55 | // CollapsedForwarding |
56 | ||
57 | void | |
58 | CollapsedForwarding::Init() | |
59 | { | |
60 | Must(!queue.get()); | |
5faec1a1 | 61 | if (UsingSmp() && IamWorkerProcess()) { |
ce49546e | 62 | queue.reset(new Queue(ShmLabel, KidIdentifier)); |
5faec1a1 EB |
63 | AsyncCall::Pointer callback = asyncCall(17, 4, "CollapsedForwarding::HandleNewDataAtStart", |
64 | NullaryFunDialer(&CollapsedForwarding::HandleNewDataAtStart)); | |
65 | ScheduleCallHere(callback); | |
66 | } | |
807feb1d DK |
67 | } |
68 | ||
69 | void | |
4310f8b0 | 70 | CollapsedForwarding::Broadcast(const StoreEntry &e, const bool includingThisWorker) |
807feb1d | 71 | { |
4310f8b0 | 72 | if (!e.hasTransients() || |
9d4e9cfb | 73 | !Store::Root().transientReaders(e)) { |
f54986ad | 74 | debugs(17, 7, "nobody reads " << e); |
d366a7fa AR |
75 | return; |
76 | } | |
77 | ||
4310f8b0 EB |
78 | debugs(17, 5, e); |
79 | Broadcast(e.mem_obj->xitTable.index, includingThisWorker); | |
80 | } | |
81 | ||
82 | void | |
83 | CollapsedForwarding::Broadcast(const sfileno index, const bool includingThisWorker) | |
84 | { | |
85 | if (!queue.get()) | |
86 | return; | |
87 | ||
807feb1d | 88 | CollapsedForwardingMsg msg; |
ce49546e | 89 | msg.sender = KidIdentifier; |
4310f8b0 | 90 | msg.xitIndex = index; |
ce49546e | 91 | |
4310f8b0 | 92 | debugs(17, 7, "entry " << index << " to " << Config.workers << (includingThisWorker ? "" : "-1") << " workers"); |
807feb1d DK |
93 | |
94 | // TODO: send only to workers who are waiting for data | |
807feb1d DK |
95 | for (int workerId = 1; workerId <= Config.workers; ++workerId) { |
96 | try { | |
4310f8b0 | 97 | if ((workerId != KidIdentifier || includingThisWorker) && queue->push(workerId, msg)) |
807feb1d DK |
98 | Notify(workerId); |
99 | } catch (const Queue::Full &) { | |
5c9846c7 AR |
100 | debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " << |
101 | "queue overflow for kid" << workerId << | |
102 | " at " << queue->outSize(workerId) << " items"); | |
807feb1d DK |
103 | // TODO: grow queue size |
104 | } | |
105 | } | |
106 | } | |
107 | ||
108 | void | |
109 | CollapsedForwarding::Notify(const int workerId) | |
110 | { | |
111 | // TODO: Count and report the total number of notifications, pops, pushes. | |
ce49546e | 112 | debugs(17, 7, "to kid" << workerId); |
807feb1d | 113 | Ipc::TypedMsgHdr msg; |
807feb1d DK |
114 | msg.setType(Ipc::mtCollapsedForwardingNotification); |
115 | msg.putInt(KidIdentifier); | |
1ee292b7 | 116 | const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, workerId); |
807feb1d DK |
117 | Ipc::SendMessage(addr, msg); |
118 | } | |
119 | ||
120 | void | |
121 | CollapsedForwarding::HandleNewData(const char *const when) | |
122 | { | |
ce49546e | 123 | debugs(17, 4, "popping all " << when); |
807feb1d DK |
124 | CollapsedForwardingMsg msg; |
125 | int workerId; | |
126 | int poppedCount = 0; | |
127 | while (queue->pop(workerId, msg)) { | |
ce49546e AR |
128 | debugs(17, 3, "message from kid" << workerId); |
129 | if (workerId != msg.sender) { | |
130 | debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId << | |
131 | " != " << msg.sender); | |
807feb1d DK |
132 | } |
133 | ||
6919be24 AR |
134 | debugs(17, 7, "handling entry " << msg.xitIndex << " in transients_map"); |
135 | Store::Root().syncCollapsed(msg.xitIndex); | |
136 | debugs(17, 7, "handled entry " << msg.xitIndex << " in transients_map"); | |
ce49546e | 137 | |
807feb1d | 138 | // XXX: stop and schedule an async call to continue |
432ad89c FC |
139 | ++poppedCount; |
140 | assert(poppedCount < SQUID_MAXFD); | |
807feb1d DK |
141 | } |
142 | } | |
143 | ||
144 | void | |
145 | CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg) | |
146 | { | |
147 | const int from = msg.getInt(); | |
539283df | 148 | debugs(17, 7, "from " << from); |
ce49546e | 149 | assert(queue.get()); |
807feb1d DK |
150 | queue->clearReaderSignal(from); |
151 | HandleNewData("after notification"); | |
152 | } | |
153 | ||
5faec1a1 EB |
154 | /// Handle queued IPC messages for the first time in this process lifetime, when |
155 | /// the queue may be reflecting the state of our killed predecessor. | |
156 | void | |
157 | CollapsedForwarding::HandleNewDataAtStart() | |
158 | { | |
159 | /// \sa IpcIoFile::HandleMessagesAtStart() -- duplicates this logic | |
160 | queue->clearAllReaderSignals(); | |
161 | HandleNewData("at start"); | |
162 | } | |
163 | ||
d8ee9e8d EB |
164 | void |
165 | CollapsedForwarding::StatQueue(std::ostream &os) | |
166 | { | |
167 | if (queue.get()) { | |
168 | os << "Transients queues:\n"; | |
169 | queue->stat<CollapsedForwardingMsg>(os); | |
170 | } | |
171 | } | |
172 | ||
807feb1d DK |
173 | /// initializes shared queue used by CollapsedForwarding |
174 | class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner | |
175 | { | |
176 | public: | |
177 | /* RegisteredRunner API */ | |
aee3523a | 178 | CollapsedForwardingRr(): owner(nullptr) {} |
807feb1d DK |
179 | virtual ~CollapsedForwardingRr(); |
180 | ||
181 | protected: | |
21b7990f AR |
182 | virtual void create(); |
183 | virtual void open(); | |
807feb1d DK |
184 | |
185 | private: | |
186 | Ipc::MultiQueue::Owner *owner; | |
187 | }; | |
188 | ||
21b7990f | 189 | RunnerRegistrationEntry(CollapsedForwardingRr); |
807feb1d | 190 | |
21b7990f | 191 | void CollapsedForwardingRr::create() |
807feb1d DK |
192 | { |
193 | Must(!owner); | |
194 | owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1, | |
195 | sizeof(CollapsedForwardingMsg), | |
196 | QueueCapacity); | |
197 | } | |
198 | ||
21b7990f | 199 | void CollapsedForwardingRr::open() |
807feb1d | 200 | { |
ce49546e | 201 | CollapsedForwarding::Init(); |
807feb1d DK |
202 | } |
203 | ||
204 | CollapsedForwardingRr::~CollapsedForwardingRr() | |
205 | { | |
206 | delete owner; | |
207 | } | |
f53969cc | 208 |