git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.cc
Rework shared object design and management API.
[thirdparty/squid.git] / src / ipc / Queue.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8 #include "config.h"
9 #include "base/TextException.h"
10 #include "Debug.h"
11 #include "globals.h"
12 #include "ipc/Queue.h"
13
14 /// constructs shared segment ID from parent queue ID and child queue index
15 static String
16 QueueId(String id, const int idx)
17 {
18 id.append("__");
19 id.append(xitoa(idx));
20 return id;
21 }
22
23 /// constructs QueueReader ID from parent queue ID
24 static String
25 ReaderId(String id)
26 {
27 id.append("__readers");
28 return id;
29 }
30
31
32 /* QueueReader */
33
34 InstanceIdDefinitions(QueueReader, "ipcQR");
35
36 QueueReader::QueueReader(): popBlocked(1), popSignal(0)
37 {
38 debugs(54, 7, HERE << "constructed " << id);
39 }
40
41 /* QueueReaders */
42
43 QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
44 {
45 Must(theCapacity > 0);
46 new (theReaders) QueueReader[theCapacity];
47 }
48
49 size_t
50 QueueReaders::sharedMemorySize() const
51 {
52 return SharedMemorySize(theCapacity);
53 }
54
55 size_t
56 QueueReaders::SharedMemorySize(const int capacity)
57 {
58 return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
59 }
60
61
62 // OneToOneUniQueue
63
64 OneToOneUniQueue::Owner *
65 OneToOneUniQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity)
66 {
67 Must(maxItemSize > 0);
68 Must(capacity > 0);
69 return shm_new(Shared)(id.termedBuf(), maxItemSize, capacity);
70 }
71
72 OneToOneUniQueue::OneToOneUniQueue(const String &id):
73 shared(shm_old(Shared)(id.termedBuf())), reader_(NULL)
74 {
75 }
76
77 void
78 OneToOneUniQueue::reader(QueueReader *aReader)
79 {
80 Must(!reader_ && aReader);
81 reader_ = aReader;
82 }
83
84 int
85 OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
86 {
87 assert(maxItemSize > 0);
88 size -= sizeof(Shared);
89 return size >= 0 ? size / maxItemSize : 0;
90 }
91
92 int
93 OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
94 {
95 assert(size >= 0);
96 return sizeof(Shared) + maxItemSize * size;
97 }
98
99 QueueReader &
100 OneToOneUniQueue::reader()
101 {
102 Must(reader_);
103 return *reader_;
104 }
105
106 OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity):
107 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
108 theCapacity(aCapacity)
109 {
110 }
111
112 size_t
113 OneToOneUniQueue::Shared::sharedMemorySize() const
114 {
115 return SharedMemorySize(theMaxItemSize, theCapacity);
116 }
117
118 size_t
119 OneToOneUniQueue::Shared::SharedMemorySize(const unsigned int maxItemSize, const int capacity)
120 {
121 return Items2Bytes(maxItemSize, capacity);
122 }
123
124
125 // OneToOneBiQueue
126
127 OneToOneBiQueue::Owner *
128 OneToOneBiQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity)
129 {
130 UniQueueOwner owner1(OneToOneUniQueue::Init(QueueId(id, Side1), maxItemSize, capacity));
131 UniQueueOwner owner2(OneToOneUniQueue::Init(QueueId(id, Side2), maxItemSize, capacity));
132 Owner *const owner = new Owner;
133 owner->first = owner1;
134 owner->second = owner2;
135 return owner;
136 }
137
138 OneToOneBiQueue::OneToOneBiQueue(const String &id, const Side side)
139 {
140 OneToOneUniQueue *const queue1 = new OneToOneUniQueue(QueueId(id, Side1));
141 OneToOneUniQueue *const queue2 = new OneToOneUniQueue(QueueId(id, Side2));
142 switch (side) {
143 case Side1:
144 popQueue.reset(queue1);
145 pushQueue.reset(queue2);
146 break;
147 case Side2:
148 popQueue.reset(queue2);
149 pushQueue.reset(queue1);
150 break;
151 default:
152 Must(false);
153 }
154 }
155
156 void
157 OneToOneBiQueue::readers(QueueReader *r1, QueueReader *r2)
158 {
159 popQueue->reader(r1);
160 pushQueue->reader(r2);
161 }
162
163 void
164 OneToOneBiQueue::clearReaderSignal()
165 {
166 debugs(54, 7, HERE << "reader: " << &popQueue->reader());
167 popQueue->reader().clearSignal();
168 }
169
170
171 // FewToOneBiQueue
172
173 FewToOneBiQueue::Owner *
174 FewToOneBiQueue::Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity)
175 {
176 return new Owner(id, workerCount, maxItemSize, capacity);
177 }
178
179 FewToOneBiQueue::FewToOneBiQueue(const String &id):
180 theLastPopWorker(0),
181 readers(shm_old(QueueReaders)(ReaderId(id).termedBuf())),
182 reader(readers->theReaders)
183 {
184 Must(readers->theCapacity > 1);
185
186 debugs(54, 7, HERE << "disker " << id << " reader: " << reader->id);
187
188 biQueues.reserve(workerCount());
189 for (int i = 0; i < workerCount(); ++i) {
190 OneToOneBiQueue *const biQueue = new OneToOneBiQueue(QueueId(id, i + WorkerIdOffset), OneToOneBiQueue::Side1);
191 QueueReader *const remoteReader = readers->theReaders + i + 1;
192 biQueue->readers(reader, remoteReader);
193 biQueues.push_back(biQueue);
194 }
195 }
196
197 OneToOneBiQueue *
198 FewToOneBiQueue::Attach(const String &id, const int workerId)
199 {
200 Ipc::Mem::Pointer<QueueReaders> readers = shm_old(QueueReaders)(ReaderId(id).termedBuf());
201 Must(workerId >= WorkerIdOffset);
202 Must(workerId < readers->theCapacity - 1 + WorkerIdOffset);
203 QueueReader *const remoteReader = readers->theReaders;
204 debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id);
205 QueueReader *const localReader =
206 readers->theReaders + workerId - WorkerIdOffset + 1;
207 debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id);
208
209 OneToOneBiQueue *const biQueue =
210 new OneToOneBiQueue(QueueId(id, workerId), OneToOneBiQueue::Side2);
211 biQueue->readers(localReader, remoteReader);
212
213 // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker?
214 const Ipc::Mem::Pointer<QueueReaders> *const leakingReaders = new Ipc::Mem::Pointer<QueueReaders>(readers);
215 Must(leakingReaders); // silence unused variable warning
216
217 return biQueue;
218 }
219
220 FewToOneBiQueue::~FewToOneBiQueue()
221 {
222 for (int i = 0; i < workerCount(); ++i)
223 delete biQueues[i];
224 }
225
226 bool FewToOneBiQueue::validWorkerId(const int workerId) const
227 {
228 return WorkerIdOffset <= workerId &&
229 workerId < WorkerIdOffset + workerCount();
230 }
231
232 void
233 FewToOneBiQueue::clearReaderSignal(int workerId)
234 {
235 debugs(54, 7, HERE << "reader: " << reader->id);
236
237 assert(validWorkerId(workerId));
238 reader->clearSignal();
239
240 // we got a hint; we could reposition iteration to try popping from the
241 // workerId queue first; but it does not seem to help much and might
242 // introduce some bias so we do not do that for now:
243 // theLastPopWorker = (workerId + workerCount() - 1) % workerCount();
244 }
245
246 FewToOneBiQueue::Owner::Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity):
247 readersOwner(shm_new(QueueReaders)(ReaderId(id).termedBuf(), workerCount + 1))
248 {
249 biQueueOwners.reserve(workerCount);
250 for (int i = 0; i < workerCount; ++i) {
251 OneToOneBiQueue::Owner *const queueOwner = OneToOneBiQueue::Init(QueueId(id, i + WorkerIdOffset), maxItemSize, capacity);
252 biQueueOwners.push_back(queueOwner);
253 }
254 }
255
256 FewToOneBiQueue::Owner::~Owner()
257 {
258 for (size_t i = 0; i < biQueueOwners.size(); ++i)
259 delete biQueueOwners[i];
260 delete readersOwner;
261 }