2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 17 Request Forwarding */
12 #include "base/AsyncFunCalls.h"
13 #include "CollapsedForwarding.h"
15 #include "ipc/mem/Segment.h"
16 #include "ipc/Messages.h"
18 #include "ipc/TypedMsgHdr.h"
19 #include "MemObject.h"
20 #include "SquidConfig.h"
22 #include "store_key_md5.h"
25 /// shared memory segment path to use for CollapsedForwarding queue
26 static const char *const ShmLabel
= "cf";
27 /// a single worker-to-worker queue capacity
28 // TODO: make configurable or compute from squid.conf settings if possible
29 static const int QueueCapacity
= 1024;
31 std::unique_ptr
<CollapsedForwarding::Queue
> CollapsedForwarding::queue
;
34 class CollapsedForwardingMsg
37 CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
39 /// prints message parameters; suitable for cache manager reports
40 void stat(std::ostream
&);
43 int sender
; ///< kid ID of sending process
45 /// transients index, so that workers can find [private] entries to sync
50 CollapsedForwardingMsg::stat(std::ostream
&os
)
52 os
<< "sender: " << sender
<< ", xitIndex: " << xitIndex
;
55 // CollapsedForwarding
58 CollapsedForwarding::Init()
61 if (UsingSmp() && IamWorkerProcess()) {
62 queue
.reset(new Queue(ShmLabel
, KidIdentifier
));
63 AsyncCall::Pointer callback
= asyncCall(17, 4, "CollapsedForwarding::HandleNewDataAtStart",
64 NullaryFunDialer(&CollapsedForwarding::HandleNewDataAtStart
));
65 ScheduleCallHere(callback
);
70 CollapsedForwarding::Broadcast(const StoreEntry
&e
, const bool includingThisWorker
)
72 if (!e
.hasTransients() ||
73 !Store::Root().transientReaders(e
)) {
74 debugs(17, 7, "nobody reads " << e
);
79 Broadcast(e
.mem_obj
->xitTable
.index
, includingThisWorker
);
83 CollapsedForwarding::Broadcast(const sfileno index
, const bool includingThisWorker
)
88 CollapsedForwardingMsg msg
;
89 msg
.sender
= KidIdentifier
;
92 debugs(17, 7, "entry " << index
<< " to " << Config
.workers
<< (includingThisWorker
? "" : "-1") << " workers");
94 // TODO: send only to workers who are waiting for data
95 for (int workerId
= 1; workerId
<= Config
.workers
; ++workerId
) {
97 if ((workerId
!= KidIdentifier
|| includingThisWorker
) && queue
->push(workerId
, msg
))
99 } catch (const Queue::Full
&) {
100 debugs(17, DBG_IMPORTANT
, "ERROR: Collapsed forwarding " <<
101 "queue overflow for kid" << workerId
<<
102 " at " << queue
->outSize(workerId
) << " items");
103 // TODO: grow queue size
109 CollapsedForwarding::Notify(const int workerId
)
111 // TODO: Count and report the total number of notifications, pops, pushes.
112 debugs(17, 7, "to kid" << workerId
);
113 Ipc::TypedMsgHdr msg
;
114 msg
.setType(Ipc::mtCollapsedForwardingNotification
);
115 msg
.putInt(KidIdentifier
);
116 const String addr
= Ipc::Port::MakeAddr(Ipc::strandAddrLabel
, workerId
);
117 Ipc::SendMessage(addr
, msg
);
121 CollapsedForwarding::HandleNewData(const char *const when
)
123 debugs(17, 4, "popping all " << when
);
124 CollapsedForwardingMsg msg
;
127 while (queue
->pop(workerId
, msg
)) {
128 debugs(17, 3, "message from kid" << workerId
);
129 if (workerId
!= msg
.sender
) {
130 debugs(17, DBG_IMPORTANT
, "mismatching kid IDs: " << workerId
<<
131 " != " << msg
.sender
);
134 debugs(17, 7, "handling entry " << msg
.xitIndex
<< " in transients_map");
135 Store::Root().syncCollapsed(msg
.xitIndex
);
136 debugs(17, 7, "handled entry " << msg
.xitIndex
<< " in transients_map");
138 // XXX: stop and schedule an async call to continue
140 assert(poppedCount
< SQUID_MAXFD
);
145 CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr
&msg
)
147 const int from
= msg
.getInt();
148 debugs(17, 7, "from " << from
);
150 queue
->clearReaderSignal(from
);
151 HandleNewData("after notification");
154 /// Handle queued IPC messages for the first time in this process lifetime, when
155 /// the queue may be reflecting the state of our killed predecessor.
157 CollapsedForwarding::HandleNewDataAtStart()
159 /// \sa IpcIoFile::HandleMessagesAtStart() -- duplicates this logic
160 queue
->clearAllReaderSignals();
161 HandleNewData("at start");
165 CollapsedForwarding::StatQueue(std::ostream
&os
)
168 os
<< "Transients queues:\n";
169 queue
->stat
<CollapsedForwardingMsg
>(os
);
173 /// initializes shared queue used by CollapsedForwarding
174 class CollapsedForwardingRr
: public Ipc::Mem::RegisteredRunner
177 /* RegisteredRunner API */
178 CollapsedForwardingRr(): owner(nullptr) {}
179 ~CollapsedForwardingRr() override
;
182 void create() override
;
183 void open() override
;
186 Ipc::MultiQueue::Owner
*owner
;
189 DefineRunnerRegistrator(CollapsedForwardingRr
);
191 void CollapsedForwardingRr::create()
194 owner
= Ipc::MultiQueue::Init(ShmLabel
, Config
.workers
, 1,
195 sizeof(CollapsedForwardingMsg
),
199 void CollapsedForwardingRr::open()
201 CollapsedForwarding::Init();
204 CollapsedForwardingRr::~CollapsedForwardingRr()