]> git.ipfire.org Git - thirdparty/squid.git/blame - src/CollapsedForwarding.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / CollapsedForwarding.cc
CommitLineData
807feb1d 1/*
4ac4a490 2 * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
807feb1d 3 *
bbc27441
AJ
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
807feb1d
DK
7 */
8
bbc27441
AJ
9/* DEBUG: section 17 Request Forwarding */
10
807feb1d 11#include "squid.h"
e4d13993
AR
12#include "CollapsedForwarding.h"
13#include "globals.h"
807feb1d
DK
14#include "ipc/mem/Segment.h"
15#include "ipc/Messages.h"
16#include "ipc/Port.h"
17#include "ipc/TypedMsgHdr.h"
d366a7fa 18#include "MemObject.h"
ce49546e
AR
19#include "SquidConfig.h"
20#include "Store.h"
21#include "store_key_md5.h"
807feb1d
DK
22#include "tools.h"
23
24/// shared memory segment path to use for CollapsedForwarding queue
25static const char *const ShmLabel = "cf";
26/// a single worker-to-worker queue capacity
27// TODO: make configurable or compute from squid.conf settings if possible
28static const int QueueCapacity = 1024;
29
829030b5 30std::unique_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
807feb1d
DK
31
32/// IPC queue message
33class CollapsedForwardingMsg
34{
35public:
6919be24 36 CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
807feb1d
DK
37
38public:
6919be24
AR
39 int sender; ///< kid ID of sending process
40
41 /// transients index, so that workers can find [private] entries to sync
9d4e9cfb 42 sfileno xitIndex;
807feb1d
DK
43};
44
45// CollapsedForwarding
46
47void
48CollapsedForwarding::Init()
49{
50 Must(!queue.get());
ce49546e
AR
51 if (UsingSmp() && IamWorkerProcess())
52 queue.reset(new Queue(ShmLabel, KidIdentifier));
807feb1d
DK
53}
54
55void
99921d9d 56CollapsedForwarding::Broadcast(const StoreEntry &e)
807feb1d 57{
ce49546e
AR
58 if (!queue.get())
59 return;
60
d366a7fa 61 if (!e.mem_obj || e.mem_obj->xitTable.index < 0 ||
9d4e9cfb 62 !Store::Root().transientReaders(e)) {
f54986ad 63 debugs(17, 7, "nobody reads " << e);
d366a7fa
AR
64 return;
65 }
66
807feb1d 67 CollapsedForwardingMsg msg;
ce49546e 68 msg.sender = KidIdentifier;
6919be24 69 msg.xitIndex = e.mem_obj->xitTable.index;
ce49546e 70
6919be24 71 debugs(17, 5, e << " to " << Config.workers << "-1 workers");
807feb1d
DK
72
73 // TODO: send only to workers who are waiting for data
807feb1d
DK
74 for (int workerId = 1; workerId <= Config.workers; ++workerId) {
75 try {
ce49546e 76 if (workerId != KidIdentifier && queue->push(workerId, msg))
807feb1d
DK
77 Notify(workerId);
78 } catch (const Queue::Full &) {
5c9846c7
AR
79 debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " <<
80 "queue overflow for kid" << workerId <<
81 " at " << queue->outSize(workerId) << " items");
807feb1d
DK
82 // TODO: grow queue size
83 }
84 }
85}
86
87void
88CollapsedForwarding::Notify(const int workerId)
89{
90 // TODO: Count and report the total number of notifications, pops, pushes.
ce49546e 91 debugs(17, 7, "to kid" << workerId);
807feb1d 92 Ipc::TypedMsgHdr msg;
807feb1d
DK
93 msg.setType(Ipc::mtCollapsedForwardingNotification);
94 msg.putInt(KidIdentifier);
1ee292b7 95 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, workerId);
807feb1d
DK
96 Ipc::SendMessage(addr, msg);
97}
98
99void
100CollapsedForwarding::HandleNewData(const char *const when)
101{
ce49546e 102 debugs(17, 4, "popping all " << when);
807feb1d
DK
103 CollapsedForwardingMsg msg;
104 int workerId;
105 int poppedCount = 0;
106 while (queue->pop(workerId, msg)) {
ce49546e
AR
107 debugs(17, 3, "message from kid" << workerId);
108 if (workerId != msg.sender) {
109 debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId <<
110 " != " << msg.sender);
807feb1d
DK
111 }
112
6919be24
AR
113 debugs(17, 7, "handling entry " << msg.xitIndex << " in transients_map");
114 Store::Root().syncCollapsed(msg.xitIndex);
115 debugs(17, 7, "handled entry " << msg.xitIndex << " in transients_map");
ce49546e 116
807feb1d 117 // XXX: stop and schedule an async call to continue
432ad89c
FC
118 ++poppedCount;
119 assert(poppedCount < SQUID_MAXFD);
807feb1d
DK
120 }
121}
122
123void
124CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
125{
126 const int from = msg.getInt();
539283df 127 debugs(17, 7, "from " << from);
ce49546e 128 assert(queue.get());
807feb1d
DK
129 queue->clearReaderSignal(from);
130 HandleNewData("after notification");
131}
132
133/// initializes shared queue used by CollapsedForwarding
134class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
135{
136public:
137 /* RegisteredRunner API */
138 CollapsedForwardingRr(): owner(NULL) {}
139 virtual ~CollapsedForwardingRr();
140
141protected:
21b7990f
AR
142 virtual void create();
143 virtual void open();
807feb1d
DK
144
145private:
146 Ipc::MultiQueue::Owner *owner;
147};
148
21b7990f 149RunnerRegistrationEntry(CollapsedForwardingRr);
807feb1d 150
21b7990f 151void CollapsedForwardingRr::create()
807feb1d
DK
152{
153 Must(!owner);
154 owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1,
155 sizeof(CollapsedForwardingMsg),
156 QueueCapacity);
157}
158
21b7990f 159void CollapsedForwardingRr::open()
807feb1d 160{
ce49546e 161 CollapsedForwarding::Init();
807feb1d
DK
162}
163
164CollapsedForwardingRr::~CollapsedForwardingRr()
165{
166 delete owner;
167}
f53969cc 168