2 * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 17 Request Forwarding */
12 #include "CollapsedForwarding.h"
14 #include "ipc/mem/Segment.h"
15 #include "ipc/Messages.h"
17 #include "ipc/TypedMsgHdr.h"
18 #include "MemObject.h"
19 #include "SquidConfig.h"
21 #include "store_key_md5.h"
24 /// shared memory segment path to use for CollapsedForwarding queue
25 static const char *const ShmLabel
= "cf";
26 /// a single worker-to-worker queue capacity
27 // TODO: make configurable or compute from squid.conf settings if possible
28 static const int QueueCapacity
= 1024;
30 std::unique_ptr
<CollapsedForwarding::Queue
> CollapsedForwarding::queue
;
33 class CollapsedForwardingMsg
36 CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
39 int sender
; ///< kid ID of sending process
41 /// transients index, so that workers can find [private] entries to sync
45 // CollapsedForwarding
48 CollapsedForwarding::Init()
51 if (UsingSmp() && IamWorkerProcess())
52 queue
.reset(new Queue(ShmLabel
, KidIdentifier
));
56 CollapsedForwarding::Broadcast(const StoreEntry
&e
)
61 if (!e
.mem_obj
|| e
.mem_obj
->xitTable
.index
< 0 ||
62 !Store::Root().transientReaders(e
)) {
63 debugs(17, 7, "nobody reads " << e
);
67 CollapsedForwardingMsg msg
;
68 msg
.sender
= KidIdentifier
;
69 msg
.xitIndex
= e
.mem_obj
->xitTable
.index
;
71 debugs(17, 5, e
<< " to " << Config
.workers
<< "-1 workers");
73 // TODO: send only to workers who are waiting for data
74 for (int workerId
= 1; workerId
<= Config
.workers
; ++workerId
) {
76 if (workerId
!= KidIdentifier
&& queue
->push(workerId
, msg
))
78 } catch (const Queue::Full
&) {
79 debugs(17, DBG_IMPORTANT
, "ERROR: Collapsed forwarding " <<
80 "queue overflow for kid" << workerId
<<
81 " at " << queue
->outSize(workerId
) << " items");
82 // TODO: grow queue size
88 CollapsedForwarding::Notify(const int workerId
)
90 // TODO: Count and report the total number of notifications, pops, pushes.
91 debugs(17, 7, "to kid" << workerId
);
93 msg
.setType(Ipc::mtCollapsedForwardingNotification
);
94 msg
.putInt(KidIdentifier
);
95 const String addr
= Ipc::Port::MakeAddr(Ipc::strandAddrLabel
, workerId
);
96 Ipc::SendMessage(addr
, msg
);
100 CollapsedForwarding::HandleNewData(const char *const when
)
102 debugs(17, 4, "popping all " << when
);
103 CollapsedForwardingMsg msg
;
106 while (queue
->pop(workerId
, msg
)) {
107 debugs(17, 3, "message from kid" << workerId
);
108 if (workerId
!= msg
.sender
) {
109 debugs(17, DBG_IMPORTANT
, "mismatching kid IDs: " << workerId
<<
110 " != " << msg
.sender
);
113 debugs(17, 7, "handling entry " << msg
.xitIndex
<< " in transients_map");
114 Store::Root().syncCollapsed(msg
.xitIndex
);
115 debugs(17, 7, "handled entry " << msg
.xitIndex
<< " in transients_map");
117 // XXX: stop and schedule an async call to continue
119 assert(poppedCount
< SQUID_MAXFD
);
124 CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr
&msg
)
126 const int from
= msg
.getInt();
127 debugs(17, 7, "from " << from
);
129 queue
->clearReaderSignal(from
);
130 HandleNewData("after notification");
133 /// initializes shared queue used by CollapsedForwarding
134 class CollapsedForwardingRr
: public Ipc::Mem::RegisteredRunner
137 /* RegisteredRunner API */
138 CollapsedForwardingRr(): owner(NULL
) {}
139 virtual ~CollapsedForwardingRr();
142 virtual void create();
146 Ipc::MultiQueue::Owner
*owner
;
149 RunnerRegistrationEntry(CollapsedForwardingRr
);
151 void CollapsedForwardingRr::create()
154 owner
= Ipc::MultiQueue::Init(ShmLabel
, Config
.workers
, 1,
155 sizeof(CollapsedForwardingMsg
),
159 void CollapsedForwardingRr::open()
161 CollapsedForwarding::Init();
164 CollapsedForwardingRr::~CollapsedForwardingRr()