]>
Commit | Line | Data |
---|---|---|
807feb1d DK |
1 | /* |
2 | * DEBUG: section 17 Request Forwarding | |
3 | * | |
4 | */ | |
5 | ||
6 | #include "squid.h" | |
7 | #include "ipc/mem/Segment.h" | |
8 | #include "ipc/Messages.h" | |
9 | #include "ipc/Port.h" | |
10 | #include "ipc/TypedMsgHdr.h" | |
11 | #include "CollapsedForwarding.h" | |
807feb1d | 12 | #include "globals.h" |
ce49546e AR |
13 | #include "SquidConfig.h" |
14 | #include "Store.h" | |
15 | #include "store_key_md5.h" | |
807feb1d DK |
16 | #include "tools.h" |
17 | ||
18 | /// shared memory segment path to use for CollapsedForwarding queue | |
19 | static const char *const ShmLabel = "cf"; | |
20 | /// a single worker-to-worker queue capacity | |
21 | // TODO: make configurable or compute from squid.conf settings if possible | |
22 | static const int QueueCapacity = 1024; | |
23 | ||
24 | std::auto_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue; | |
25 | ||
/// IPC queue message: identifies the sending kid and the StoreEntry whose
/// state the receiving worker should synchronize.
class CollapsedForwardingMsg
{
public:
    /// creates an "empty" message: no sender (-1) and an all-zero key
    CollapsedForwardingMsg(): sender(-1) { key[0] = key[1] = 0; }

public:
    int sender; ///< kid ID of sending process
    uint64_t key[2]; ///< StoreEntry key (raw bytes copied from the entry)
};
36 | ||
37 | // CollapsedForwarding | |
38 | ||
39 | void | |
40 | CollapsedForwarding::Init() | |
41 | { | |
42 | Must(!queue.get()); | |
ce49546e AR |
43 | if (UsingSmp() && IamWorkerProcess()) |
44 | queue.reset(new Queue(ShmLabel, KidIdentifier)); | |
807feb1d DK |
45 | } |
46 | ||
47 | void | |
99921d9d | 48 | CollapsedForwarding::Broadcast(const StoreEntry &e) |
807feb1d | 49 | { |
ce49546e AR |
50 | if (!queue.get()) |
51 | return; | |
52 | ||
807feb1d | 53 | CollapsedForwardingMsg msg; |
ce49546e | 54 | msg.sender = KidIdentifier; |
99921d9d | 55 | memcpy(msg.key, e.key, sizeof(msg.key)); |
ce49546e | 56 | |
99921d9d AR |
57 | debugs(17, 5, storeKeyText(static_cast<cache_key*>(e.key)) << " to " << |
58 | Config.workers << "-1 workers"); | |
807feb1d DK |
59 | |
60 | // TODO: send only to workers who are waiting for data | |
807feb1d DK |
61 | for (int workerId = 1; workerId <= Config.workers; ++workerId) { |
62 | try { | |
ce49546e | 63 | if (workerId != KidIdentifier && queue->push(workerId, msg)) |
807feb1d DK |
64 | Notify(workerId); |
65 | } catch (const Queue::Full &) { | |
5c9846c7 AR |
66 | debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " << |
67 | "queue overflow for kid" << workerId << | |
68 | " at " << queue->outSize(workerId) << " items"); | |
807feb1d DK |
69 | // TODO: grow queue size |
70 | } | |
71 | } | |
72 | } | |
73 | ||
74 | void | |
75 | CollapsedForwarding::Notify(const int workerId) | |
76 | { | |
77 | // TODO: Count and report the total number of notifications, pops, pushes. | |
ce49546e | 78 | debugs(17, 7, "to kid" << workerId); |
807feb1d DK |
79 | Ipc::TypedMsgHdr msg; |
80 | // TODO: add proper message type? | |
81 | msg.setType(Ipc::mtCollapsedForwardingNotification); | |
82 | msg.putInt(KidIdentifier); | |
83 | const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, workerId); | |
84 | Ipc::SendMessage(addr, msg); | |
85 | } | |
86 | ||
87 | void | |
88 | CollapsedForwarding::HandleNewData(const char *const when) | |
89 | { | |
ce49546e | 90 | debugs(17, 4, "popping all " << when); |
807feb1d DK |
91 | CollapsedForwardingMsg msg; |
92 | int workerId; | |
93 | int poppedCount = 0; | |
94 | while (queue->pop(workerId, msg)) { | |
ce49546e AR |
95 | debugs(17, 3, "message from kid" << workerId); |
96 | if (workerId != msg.sender) { | |
97 | debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId << | |
98 | " != " << msg.sender); | |
807feb1d DK |
99 | } |
100 | ||
5c9846c7 AR |
101 | const cache_key *key = reinterpret_cast<const cache_key*>(msg.key); |
102 | debugs(17, 7, "hadling " << storeKeyText(key)); | |
103 | Store::Root().syncCollapsed(key); | |
104 | debugs(17, 7, "handled " << storeKeyText(key)); | |
ce49546e | 105 | |
807feb1d DK |
106 | // XXX: stop and schedule an async call to continue |
107 | assert(++poppedCount < SQUID_MAXFD); | |
108 | } | |
109 | } | |
110 | ||
111 | void | |
112 | CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg) | |
113 | { | |
114 | const int from = msg.getInt(); | |
115 | debugs(17, 7, HERE << "from " << from); | |
ce49546e | 116 | assert(queue.get()); |
807feb1d DK |
117 | queue->clearReaderSignal(from); |
118 | HandleNewData("after notification"); | |
119 | } | |
120 | ||
121 | /// initializes shared queue used by CollapsedForwarding | |
122 | class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner | |
123 | { | |
124 | public: | |
125 | /* RegisteredRunner API */ | |
126 | CollapsedForwardingRr(): owner(NULL) {} | |
127 | virtual ~CollapsedForwardingRr(); | |
128 | ||
129 | protected: | |
130 | virtual void create(const RunnerRegistry &); | |
131 | virtual void open(const RunnerRegistry &); | |
132 | ||
133 | private: | |
134 | Ipc::MultiQueue::Owner *owner; | |
135 | }; | |
136 | ||
137 | RunnerRegistrationEntry(rrAfterConfig, CollapsedForwardingRr); | |
138 | ||
139 | void CollapsedForwardingRr::create(const RunnerRegistry &) | |
140 | { | |
141 | Must(!owner); | |
142 | owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1, | |
143 | sizeof(CollapsedForwardingMsg), | |
144 | QueueCapacity); | |
145 | } | |
146 | ||
147 | void CollapsedForwardingRr::open(const RunnerRegistry &) | |
148 | { | |
ce49546e | 149 | CollapsedForwarding::Init(); |
807feb1d DK |
150 | } |
151 | ||
152 | CollapsedForwardingRr::~CollapsedForwardingRr() | |
153 | { | |
154 | delete owner; | |
155 | } |