/*
 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */
/* DEBUG: section 17 Request Forwarding */
12 #include "CollapsedForwarding.h"
14 #include "ipc/mem/Segment.h"
15 #include "ipc/Messages.h"
17 #include "ipc/TypedMsgHdr.h"
18 #include "MemObject.h"
19 #include "SquidConfig.h"
21 #include "store_key_md5.h"
/// shared memory segment path to use for the CollapsedForwarding queue
static const char *const ShmLabel = "cf";

/// capacity of a single worker-to-worker queue
// TODO: make configurable or compute from squid.conf settings if possible
static const int QueueCapacity = 1024;
30 std::unique_ptr
<CollapsedForwarding::Queue
> CollapsedForwarding::queue
;
// Message type pushed to and popped from the CollapsedForwarding queue.
// It carries the sender's kid ID and a transients index (xitIndex).
// NOTE(review): the leading numerals on these lines are line numbers carried
// over from a source listing, not code. Listing lines 34-35, 37, 40-41, 43
// and 45+ are not visible in this excerpt -- presumably the braces, access
// specifiers and the xitIndex member declaration; confirm against the file.
33 class CollapsedForwardingMsg
// Default state: both fields start at -1.
36 CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
38 /// prints message parameters; suitable for cache manager reports
39 void stat(std::ostream
&);
42 int sender
; ///< kid ID of sending process
44 /// transients index, so that workers can find [private] entries to sync
49 CollapsedForwardingMsg::stat(std::ostream
&os
)
51 os
<< "sender: " << sender
<< ", xitIndex: " << xitIndex
;
// CollapsedForwarding
// Sets up this process' CollapsedForwarding queue object.
// NOTE(review): the leading numerals are listing line numbers, not code.
// Listing lines 56 and 58-59 are not visible here (likely the return type,
// the opening brace and possibly one more statement) -- confirm.
57 CollapsedForwarding::Init()
// The queue is created only when UsingSmp() and IamWorkerProcess() both
// hold; it is constructed from ShmLabel and this process' KidIdentifier.
60 if (UsingSmp() && IamWorkerProcess())
61 queue
.reset(new Queue(ShmLabel
, KidIdentifier
));
// Entry-based overload: forwards e.mem_obj->xitTable.index and
// includingThisWorker to the index-based Broadcast() overload.
// NOTE(review): the leading numerals are listing line numbers, not code.
// Listing lines 66 and 70-73 are not visible in this excerpt.
65 CollapsedForwarding::Broadcast(const StoreEntry
&e
, const bool includingThisWorker
)
// Guard: the entry has no transients or Store::Root().transientReaders(e)
// is false. Presumably the function returns right after the debug message
// below; the lines that would show this are not visible -- confirm.
67 if (!e
.hasTransients() ||
68 !Store::Root().transientReaders(e
)) {
69 debugs(17, 7, "nobody reads " << e
);
// Delegate to the index-based overload.
74 Broadcast(e
.mem_obj
->xitTable
.index
, includingThisWorker
);
// Index-based overload: pushes a CollapsedForwardingMsg onto the queue of
// each worker with an ID in [1, Config.workers], skipping this process
// unless includingThisWorker is true.
// NOTE(review): the leading numerals are listing line numbers, not code.
// Listing lines 79-82, 85-86, 91, 93 and 99+ are not visible here. That
// likely covers an early guard, the msg.xitIndex assignment, the `try`, and
// the statement executed after a successful push -- confirm against the file.
78 CollapsedForwarding::Broadcast(const sfileno index
, const bool includingThisWorker
)
83 CollapsedForwardingMsg msg
;
// Stamp the message with this process' kid ID.
84 msg
.sender
= KidIdentifier
;
// The worker count printed here gets a literal "-1" suffix when this
// worker is excluded.
87 debugs(17, 7, "entry " << index
<< " to " << Config
.workers
<< (includingThisWorker
? "" : "-1") << " workers");
89 // TODO: send only to workers who are waiting for data
90 for (int workerId
= 1; workerId
<= Config
.workers
; ++workerId
) {
// push() is attempted only for eligible workers; the body of this `if`
// is on a listing line that is not visible here.
92 if ((workerId
!= KidIdentifier
|| includingThisWorker
) && queue
->push(workerId
, msg
))
// A full queue is reported at DBG_IMPORTANT level together with the
// current outSize() for that worker; the visible code does not rethrow.
94 } catch (const Queue::Full
&) {
95 debugs(17, DBG_IMPORTANT
, "ERROR: Collapsed forwarding " <<
96 "queue overflow for kid" << workerId
<<
97 " at " << queue
->outSize(workerId
) << " items");
98 // TODO: grow queue size
104 CollapsedForwarding::Notify(const int workerId
)
106 // TODO: Count and report the total number of notifications, pops, pushes.
107 debugs(17, 7, "to kid" << workerId
);
108 Ipc::TypedMsgHdr msg
;
109 msg
.setType(Ipc::mtCollapsedForwardingNotification
);
110 msg
.putInt(KidIdentifier
);
111 const String addr
= Ipc::Port::MakeAddr(Ipc::strandAddrLabel
, workerId
);
112 Ipc::SendMessage(addr
, msg
);
// Drains the queue: pops messages until pop() returns false and calls
// Store::Root().syncCollapsed() with the xitIndex of each popped message.
// `when` is only used in the first debug message.
// NOTE(review): the leading numerals are listing line numbers, not code.
// Listing lines 117, 120-121, 127-128, 132, 134 and 136+ are not visible.
// They presumably declare workerId and poppedCount, close the mismatch
// branch, and update poppedCount -- confirm against the file.
116 CollapsedForwarding::HandleNewData(const char *const when
)
118 debugs(17, 4, "popping all " << when
);
119 CollapsedForwardingMsg msg
;
// pop() fills in both workerId and msg.
122 while (queue
->pop(workerId
, msg
)) {
123 debugs(17, 3, "message from kid" << workerId
);
// Sanity check: the ID reported by pop() should match the ID stored in
// the message. What happens after this report (e.g. skipping the message)
// is on lines not visible here.
124 if (workerId
!= msg
.sender
) {
125 debugs(17, DBG_IMPORTANT
, "mismatching kid IDs: " << workerId
<<
126 " != " << msg
.sender
);
129 debugs(17, 7, "handling entry " << msg
.xitIndex
<< " in transients_map");
130 Store::Root().syncCollapsed(msg
.xitIndex
);
131 debugs(17, 7, "handled entry " << msg
.xitIndex
<< " in transients_map");
133 // XXX: stop and schedule an async call to continue
// Asserts that poppedCount stays below SQUID_MAXFD.
135 assert(poppedCount
< SQUID_MAXFD
);
// Handles an incoming notification message: reads the sender's kid ID from
// it, clears the reader signal for that sender, and then pops the queue via
// HandleNewData().
// NOTE(review): the leading numerals are listing line numbers, not code.
// Listing lines 141, 144 and 147+ are not visible in this excerpt.
140 CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr
&msg
)
// The integer read here corresponds to the KidIdentifier that Notify()
// stores with putInt().
142 const int from
= msg
.getInt();
143 debugs(17, 7, "from " << from
);
145 queue
->clearReaderSignal(from
);
146 HandleNewData("after notification");
// Writes a "Transients queues:" header line to os and then asks the queue
// to report itself, using CollapsedForwardingMsg as the stat() template
// argument.
// NOTE(review): the leading numerals are listing line numbers, not code.
// Listing lines 151-152 and 155+ are not visible; line 152 may hold a guard
// around the statements below -- confirm against the file.
150 CollapsedForwarding::StatQueue(std::ostream
&os
)
153 os
<< "Transients queues:\n";
154 queue
->stat
<CollapsedForwardingMsg
>(os
);
// NOTE(review): the leading numerals are listing line numbers, not code.
// Listing lines 160-161, 165-166, 168-170 and 172+ are not visible here.
// They likely hold braces, access specifiers and further member
// declarations (open() is defined later in this file) -- confirm.
158 /// initializes shared queue used by CollapsedForwarding
159 class CollapsedForwardingRr
: public Ipc::Mem::RegisteredRunner
162 /* RegisteredRunner API */
// owner starts out null; create() assigns it.
163 CollapsedForwardingRr(): owner(NULL
) {}
164 virtual ~CollapsedForwardingRr();
167 virtual void create();
// Result of Ipc::MultiQueue::Init(), stored by create().
171 Ipc::MultiQueue::Owner
*owner
;
174 RunnerRegistrationEntry(CollapsedForwardingRr
);
// Stores the result of Ipc::MultiQueue::Init() in owner. The visible
// arguments are ShmLabel, Config.workers, 1 and
// sizeof(CollapsedForwardingMsg).
// NOTE(review): the leading numerals are listing line numbers, not code.
// Listing lines 177-178 and 181+ are not visible, so the rest of the
// argument list and the end of this function cannot be seen -- confirm.
176 void CollapsedForwardingRr::create()
179 owner
= Ipc::MultiQueue::Init(ShmLabel
, Config
.workers
, 1,
180 sizeof(CollapsedForwardingMsg
),
184 void CollapsedForwardingRr::open()
186 CollapsedForwarding::Init();
189 CollapsedForwardingRr::~CollapsedForwardingRr()