2 * DEBUG: section 17 Request Forwarding
7 #include "ipc/mem/Segment.h"
8 #include "ipc/Messages.h"
10 #include "ipc/TypedMsgHdr.h"
11 #include "CollapsedForwarding.h"
13 #include "SquidConfig.h"
15 #include "store_key_md5.h"
18 /// shared memory segment path to use for CollapsedForwarding queue
19 static const char *const ShmLabel
= "cf";
20 /// a single worker-to-worker queue capacity
21 // TODO: make configurable or compute from squid.conf settings if possible
22 static const int QueueCapacity
= 1024;
24 std::auto_ptr
<CollapsedForwarding::Queue
> CollapsedForwarding::queue
;
27 class CollapsedForwardingMsg
30 CollapsedForwardingMsg(): sender(-1) { key
[0] = key
[1] = 0; }
33 int sender
; /// kid ID of sending process
34 uint64_t key
[2]; ///< StoreEntry key
37 // CollapsedForwarding
40 CollapsedForwarding::Init()
43 if (UsingSmp() && IamWorkerProcess())
44 queue
.reset(new Queue(ShmLabel
, KidIdentifier
));
48 CollapsedForwarding::Broadcast(const cache_key
*key
)
53 CollapsedForwardingMsg msg
;
54 msg
.sender
= KidIdentifier
;
55 memcpy(msg
.key
, key
, sizeof(msg
.key
));
57 debugs(17, 5, storeKeyText(key
) << " to " << Config
.workers
<< "-1 workers");
59 // TODO: send only to workers who are waiting for data
60 for (int workerId
= 1; workerId
<= Config
.workers
; ++workerId
) {
62 if (workerId
!= KidIdentifier
&& queue
->push(workerId
, msg
))
64 } catch (const Queue::Full
&) {
65 debugs(17, DBG_IMPORTANT
, "ERROR: Collapsed forwarding " <<
66 "queue overflow for kid" << workerId
<<
67 " at " << queue
->outSize(workerId
) << " items");
68 // TODO: grow queue size
74 CollapsedForwarding::Notify(const int workerId
)
76 // TODO: Count and report the total number of notifications, pops, pushes.
77 debugs(17, 7, "to kid" << workerId
);
79 // TODO: add proper message type?
80 msg
.setType(Ipc::mtCollapsedForwardingNotification
);
81 msg
.putInt(KidIdentifier
);
82 const String addr
= Ipc::Port::MakeAddr(Ipc::strandAddrPfx
, workerId
);
83 Ipc::SendMessage(addr
, msg
);
87 CollapsedForwarding::HandleNewData(const char *const when
)
89 debugs(17, 4, "popping all " << when
);
90 CollapsedForwardingMsg msg
;
93 while (queue
->pop(workerId
, msg
)) {
94 debugs(17, 3, "message from kid" << workerId
);
95 if (workerId
!= msg
.sender
) {
96 debugs(17, DBG_IMPORTANT
, "mismatching kid IDs: " << workerId
<<
97 " != " << msg
.sender
);
100 const cache_key
*key
= reinterpret_cast<const cache_key
*>(msg
.key
);
101 debugs(17, 7, "hadling " << storeKeyText(key
));
102 Store::Root().syncCollapsed(key
);
103 debugs(17, 7, "handled " << storeKeyText(key
));
105 // XXX: stop and schedule an async call to continue
106 assert(++poppedCount
< SQUID_MAXFD
);
111 CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr
&msg
)
113 const int from
= msg
.getInt();
114 debugs(17, 7, HERE
<< "from " << from
);
116 queue
->clearReaderSignal(from
);
117 HandleNewData("after notification");
120 /// initializes shared queue used by CollapsedForwarding
121 class CollapsedForwardingRr
: public Ipc::Mem::RegisteredRunner
124 /* RegisteredRunner API */
125 CollapsedForwardingRr(): owner(NULL
) {}
126 virtual ~CollapsedForwardingRr();
129 virtual void create(const RunnerRegistry
&);
130 virtual void open(const RunnerRegistry
&);
133 Ipc::MultiQueue::Owner
*owner
;
136 RunnerRegistrationEntry(rrAfterConfig
, CollapsedForwardingRr
);
138 void CollapsedForwardingRr::create(const RunnerRegistry
&)
141 owner
= Ipc::MultiQueue::Init(ShmLabel
, Config
.workers
, 1,
142 sizeof(CollapsedForwardingMsg
),
146 void CollapsedForwardingRr::open(const RunnerRegistry
&)
148 CollapsedForwarding::Init();
151 CollapsedForwardingRr::~CollapsedForwardingRr()