]> git.ipfire.org Git - thirdparty/squid.git/blob - src/CollapsedForwarding.cc
Merge from trunk
[thirdparty/squid.git] / src / CollapsedForwarding.cc
1 /*
2 * DEBUG: section 17 Request Forwarding
3 *
4 */
5
6 #include "squid.h"
7 #include "CollapsedForwarding.h"
8 #include "globals.h"
9 #include "ipc/mem/Segment.h"
10 #include "ipc/Messages.h"
11 #include "ipc/Port.h"
12 #include "ipc/TypedMsgHdr.h"
13 #include "MemObject.h"
14 #include "SquidConfig.h"
15 #include "Store.h"
16 #include "store_key_md5.h"
17 #include "tools.h"
18
19 /// shared memory segment path to use for CollapsedForwarding queue
20 static const char *const ShmLabel = "cf";
21 /// a single worker-to-worker queue capacity
22 // TODO: make configurable or compute from squid.conf settings if possible
23 static const int QueueCapacity = 1024;
24
25 std::auto_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
26
27 /// IPC queue message
28 class CollapsedForwardingMsg
29 {
30 public:
31 CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
32
33 public:
34 int sender; ///< kid ID of sending process
35
36 /// transients index, so that workers can find [private] entries to sync
37 sfileno xitIndex;
38 };
39
40 // CollapsedForwarding
41
42 void
43 CollapsedForwarding::Init()
44 {
45 Must(!queue.get());
46 if (UsingSmp() && IamWorkerProcess())
47 queue.reset(new Queue(ShmLabel, KidIdentifier));
48 }
49
50 void
51 CollapsedForwarding::Broadcast(const StoreEntry &e)
52 {
53 if (!queue.get())
54 return;
55
56 if (!e.mem_obj || e.mem_obj->xitTable.index < 0 ||
57 !Store::Root().transientReaders(e)) {
58 debugs(17, 7, "nobody reads " << e);
59 return;
60 }
61
62 CollapsedForwardingMsg msg;
63 msg.sender = KidIdentifier;
64 msg.xitIndex = e.mem_obj->xitTable.index;
65
66 debugs(17, 5, e << " to " << Config.workers << "-1 workers");
67
68 // TODO: send only to workers who are waiting for data
69 for (int workerId = 1; workerId <= Config.workers; ++workerId) {
70 try {
71 if (workerId != KidIdentifier && queue->push(workerId, msg))
72 Notify(workerId);
73 } catch (const Queue::Full &) {
74 debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " <<
75 "queue overflow for kid" << workerId <<
76 " at " << queue->outSize(workerId) << " items");
77 // TODO: grow queue size
78 }
79 }
80 }
81
82 void
83 CollapsedForwarding::Notify(const int workerId)
84 {
85 // TODO: Count and report the total number of notifications, pops, pushes.
86 debugs(17, 7, "to kid" << workerId);
87 Ipc::TypedMsgHdr msg;
88 msg.setType(Ipc::mtCollapsedForwardingNotification);
89 msg.putInt(KidIdentifier);
90 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, workerId);
91 Ipc::SendMessage(addr, msg);
92 }
93
94 void
95 CollapsedForwarding::HandleNewData(const char *const when)
96 {
97 debugs(17, 4, "popping all " << when);
98 CollapsedForwardingMsg msg;
99 int workerId;
100 int poppedCount = 0;
101 while (queue->pop(workerId, msg)) {
102 debugs(17, 3, "message from kid" << workerId);
103 if (workerId != msg.sender) {
104 debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId <<
105 " != " << msg.sender);
106 }
107
108 debugs(17, 7, "handling entry " << msg.xitIndex << " in transients_map");
109 Store::Root().syncCollapsed(msg.xitIndex);
110 debugs(17, 7, "handled entry " << msg.xitIndex << " in transients_map");
111
112 // XXX: stop and schedule an async call to continue
113 ++poppedCount;
114 assert(poppedCount < SQUID_MAXFD);
115 }
116 }
117
118 void
119 CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
120 {
121 const int from = msg.getInt();
122 debugs(17, 7, "from " << from);
123 assert(queue.get());
124 queue->clearReaderSignal(from);
125 HandleNewData("after notification");
126 }
127
128 /// initializes shared queue used by CollapsedForwarding
129 class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
130 {
131 public:
132 /* RegisteredRunner API */
133 CollapsedForwardingRr(): owner(NULL) {}
134 virtual ~CollapsedForwardingRr();
135
136 protected:
137 virtual void create(const RunnerRegistry &);
138 virtual void open(const RunnerRegistry &);
139
140 private:
141 Ipc::MultiQueue::Owner *owner;
142 };
143
144 RunnerRegistrationEntry(rrAfterConfig, CollapsedForwardingRr);
145
146 void CollapsedForwardingRr::create(const RunnerRegistry &)
147 {
148 Must(!owner);
149 owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1,
150 sizeof(CollapsedForwardingMsg),
151 QueueCapacity);
152 }
153
154 void CollapsedForwardingRr::open(const RunnerRegistry &)
155 {
156 CollapsedForwarding::Init();
157 }
158
159 CollapsedForwardingRr::~CollapsedForwardingRr()
160 {
161 delete owner;
162 }