2 * DEBUG: section 17 Request Forwarding
7 #include "CollapsedForwarding.h"
9 #include "ipc/mem/Segment.h"
10 #include "ipc/Messages.h"
12 #include "ipc/TypedMsgHdr.h"
13 #include "MemObject.h"
14 #include "SquidConfig.h"
16 #include "store_key_md5.h"
19 /// shared memory segment path to use for CollapsedForwarding queue
// Passed as the first argument both to the Queue constructor in
// CollapsedForwarding::Init() and to Ipc::MultiQueue::Init() in
// CollapsedForwardingRr::create(), so both call sites use the same label.
20 static const char *const ShmLabel
= "cf";
21 /// a single worker-to-worker queue capacity
22 // TODO: make configurable or compute from squid.conf settings if possible
23 static const int QueueCapacity
= 1024;
// Definition of CollapsedForwarding::queue. It is default-constructed here
// (holding no Queue object) and is only given a Queue by
// CollapsedForwarding::Init() through reset().
// NOTE(review): std::auto_ptr is deprecated since C++11 and removed in C++17;
// std::unique_ptr is the usual replacement -- confirm the project's language
// level before changing.
25 std::auto_ptr
<CollapsedForwarding::Queue
> CollapsedForwarding::queue
;
/// The message type pushed to and popped from CollapsedForwarding::queue.
/// Broadcast() fills in sender and xitIndex before pushing; HandleNewData()
/// reads both fields after popping.
28 class CollapsedForwardingMsg
// Both fields start at -1.
// NOTE(review): -1 presumably means "not set yet" -- confirm.
31 CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
34 int sender
; ///< kid ID of sending process
36 /// transients index, so that workers can find [private] entries to sync
40 // CollapsedForwarding
/// Gives CollapsedForwarding::queue a newly allocated Queue built from
/// ShmLabel and KidIdentifier, but only when both UsingSmp() and
/// IamWorkerProcess() are true. Otherwise queue is not touched here.
43 CollapsedForwarding::Init()
46 if (UsingSmp() && IamWorkerProcess())
47 queue
.reset(new Queue(ShmLabel
, KidIdentifier
));
/// Pushes a CollapsedForwardingMsg describing entry e onto the queue of every
/// worker ID from 1 to Config.workers except our own KidIdentifier.
/// \param e entry whose mem_obj->xitTable.index is copied into the message
51 CollapsedForwarding::Broadcast(const StoreEntry
&e
)
// Nothing to announce when e has no mem_obj, when its xitTable.index is
// negative, or when Store::Root().transientReaders(e) is false.
// NOTE(review): the statement after the debug line is not visible in this
// listing -- presumably an early return; confirm.
56 if (!e
.mem_obj
|| e
.mem_obj
->xitTable
.index
< 0 ||
57 !Store::Root().transientReaders(e
)) {
58 debugs(17, 7, "nobody reads " << e
);
// The message carries our own kid ID and the entry's transients index.
62 CollapsedForwardingMsg msg
;
63 msg
.sender
= KidIdentifier
;
64 msg
.xitIndex
= e
.mem_obj
->xitTable
.index
;
// The "-1" below is literal text printed after the worker count (the log
// reads "<N>-1 workers"); it is not an arithmetic subtraction.
66 debugs(17, 5, e
<< " to " << Config
.workers
<< "-1 workers");
68 // TODO: send only to workers who are waiting for data
69 for (int workerId
= 1; workerId
<= Config
.workers
; ++workerId
) {
// Skip ourselves; push() is only attempted for other workers.
// NOTE(review): the statement guarded by this condition is not visible in
// this listing -- presumably a call to Notify(workerId); confirm.
71 if (workerId
!= KidIdentifier
&& queue
->push(workerId
, msg
))
// A Queue::Full exception is logged as an important error together with the
// current outSize() for that worker; the handler shown here does not rethrow.
73 } catch (const Queue::Full
&) {
74 debugs(17, DBG_IMPORTANT
, "ERROR: Collapsed forwarding " <<
75 "queue overflow for kid" << workerId
<<
76 " at " << queue
->outSize(workerId
) << " items");
77 // TODO: grow queue size
/// Sends an Ipc::mtCollapsedForwardingNotification message containing our
/// KidIdentifier to the address that Ipc::Port::MakeAddr() builds from
/// Ipc::strandAddrLabel and workerId.
/// \param workerId kid ID of the worker to notify
83 CollapsedForwarding::Notify(const int workerId
)
85 // TODO: Count and report the total number of notifications, pops, pushes.
86 debugs(17, 7, "to kid" << workerId
);
// NOTE(review): the declaration of msg is not visible in this listing;
// HandleNotification() receives an Ipc::TypedMsgHdr, so msg is presumably of
// that type -- confirm.
88 msg
.setType(Ipc::mtCollapsedForwardingNotification
);
// The receiver reads this integer back with getInt() in HandleNotification().
89 msg
.putInt(KidIdentifier
);
90 const String addr
= Ipc::Port::MakeAddr(Ipc::strandAddrLabel
, workerId
);
91 Ipc::SendMessage(addr
, msg
);
/// Pops messages from the queue for as long as pop() succeeds and calls
/// Store::Root().syncCollapsed() with each popped message's xitIndex.
/// \param when text appended to the "popping all " debug line
95 CollapsedForwarding::HandleNewData(const char *const when
)
97 debugs(17, 4, "popping all " << when
);
98 CollapsedForwardingMsg msg
;
// NOTE(review): the declarations of workerId and poppedCount are not visible
// in this listing. workerId is filled in by pop() alongside msg.
101 while (queue
->pop(workerId
, msg
)) {
102 debugs(17, 3, "message from kid" << workerId
);
// The kid ID reported by pop() is compared with the sender recorded inside
// the message; a mismatch is logged at DBG_IMPORTANT.
// NOTE(review): what follows the log line inside this branch is not visible
// in this listing -- confirm whether the message is then skipped.
103 if (workerId
!= msg
.sender
) {
104 debugs(17, DBG_IMPORTANT
, "mismatching kid IDs: " << workerId
<<
105 " != " << msg
.sender
);
108 debugs(17, 7, "handling entry " << msg
.xitIndex
<< " in transients_map");
109 Store::Root().syncCollapsed(msg
.xitIndex
);
110 debugs(17, 7, "handled entry " << msg
.xitIndex
<< " in transients_map");
112 // XXX: stop and schedule an async call to continue
// Asserts that poppedCount stays below SQUID_MAXFD.
// NOTE(review): where poppedCount is incremented is not visible here --
// presumably once per popped message; confirm.
114 assert(poppedCount
< SQUID_MAXFD
);
/// Handles a received notification message: reads the integer stored in it,
/// clears the queue's reader signal for that value, and then drains the queue
/// through HandleNewData().
/// \param msg the received message; Notify() stores the sender's
///            KidIdentifier in it with putInt()
119 CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr
&msg
)
121 const int from
= msg
.getInt();
122 debugs(17, 7, "from " << from
);
// The reader signal is cleared before the queue is drained.
124 queue
->clearReaderSignal(from
);
125 HandleNewData("after notification");
128 /// initializes shared queue used by CollapsedForwarding
// create() assigns owner from Ipc::MultiQueue::Init(); open(), defined
// further down in this file, calls CollapsedForwarding::Init().
129 class CollapsedForwardingRr
: public Ipc::Mem::RegisteredRunner
132 /* RegisteredRunner API */
// owner starts out as NULL; it is assigned in create().
133 CollapsedForwardingRr(): owner(NULL
) {}
134 virtual ~CollapsedForwardingRr();
137 virtual void create();
// Result of Ipc::MultiQueue::Init(), stored by create().
// NOTE(review): presumably released by the destructor -- its body is not
// visible in this listing; confirm.
141 Ipc::MultiQueue::Owner
*owner
;
// NOTE(review): presumably a macro that registers CollapsedForwardingRr so
// that its RegisteredRunner methods get invoked -- its definition is not in
// this file; confirm.
144 RunnerRegistrationEntry(CollapsedForwardingRr
);
/// Stores in owner the result of Ipc::MultiQueue::Init(), called with
/// ShmLabel, Config.workers, 1 and sizeof(CollapsedForwardingMsg).
// NOTE(review): the remaining argument(s) of the Init() call are not visible
// in this listing -- presumably QueueCapacity; confirm.
146 void CollapsedForwardingRr::create()
149 owner
= Ipc::MultiQueue::Init(ShmLabel
, Config
.workers
, 1,
150 sizeof(CollapsedForwardingMsg
),
/// Calls CollapsedForwarding::Init(), which creates this process's Queue
/// object when UsingSmp() and IamWorkerProcess() are both true.
154 void CollapsedForwardingRr::open()
156 CollapsedForwarding::Init();
159 CollapsedForwardingRr::~CollapsedForwardingRr()