]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.cc
4 * DEBUG: section 54 Interprocess Communication
9 #include "base/TextException.h"
12 #include "ipc/Queue.h"
14 /// constructs shared segment ID from parent queue ID and child queue index
16 QueueId(String id
, const int idx
)
19 id
.append(xitoa(idx
));
23 /// constructs QueueReader ID from parent queue ID
27 id
.append("__readers");
34 InstanceIdDefinitions(QueueReader
, "ipcQR");
36 QueueReader::QueueReader(): popBlocked(1), popSignal(0)
38 debugs(54, 7, HERE
<< "constructed " << id
);
43 QueueReaders::QueueReaders(const int aCapacity
): theCapacity(aCapacity
)
45 Must(theCapacity
> 0);
46 new (theReaders
) QueueReader
[theCapacity
];
50 QueueReaders::sharedMemorySize() const
52 return SharedMemorySize(theCapacity
);
56 QueueReaders::SharedMemorySize(const int capacity
)
58 return sizeof(QueueReaders
) + sizeof(QueueReader
) * capacity
;
64 OneToOneUniQueue::Owner
*
65 OneToOneUniQueue::Init(const String
&id
, const unsigned int maxItemSize
, const int capacity
)
67 Must(maxItemSize
> 0);
69 return shm_new(Shared
)(id
.termedBuf(), maxItemSize
, capacity
);
72 OneToOneUniQueue::OneToOneUniQueue(const String
&id
):
73 shared(shm_old(Shared
)(id
.termedBuf())), reader_(NULL
)
78 OneToOneUniQueue::reader(QueueReader
*aReader
)
80 Must(!reader_
&& aReader
);
85 OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize
, int size
)
87 assert(maxItemSize
> 0);
88 size
-= sizeof(Shared
);
89 return size
>= 0 ? size
/ maxItemSize
: 0;
93 OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize
, const int size
)
96 return sizeof(Shared
) + maxItemSize
* size
;
100 OneToOneUniQueue::reader()
106 OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize
, const int aCapacity
):
107 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize
),
108 theCapacity(aCapacity
)
113 OneToOneUniQueue::Shared::sharedMemorySize() const
115 return SharedMemorySize(theMaxItemSize
, theCapacity
);
119 OneToOneUniQueue::Shared::SharedMemorySize(const unsigned int maxItemSize
, const int capacity
)
121 return Items2Bytes(maxItemSize
, capacity
);
127 OneToOneBiQueue::Owner
*
128 OneToOneBiQueue::Init(const String
&id
, const unsigned int maxItemSize
, const int capacity
)
130 UniQueueOwner
owner1(OneToOneUniQueue::Init(QueueId(id
, Side1
), maxItemSize
, capacity
));
131 UniQueueOwner
owner2(OneToOneUniQueue::Init(QueueId(id
, Side2
), maxItemSize
, capacity
));
132 Owner
*const owner
= new Owner
;
133 owner
->first
= owner1
;
134 owner
->second
= owner2
;
138 OneToOneBiQueue::OneToOneBiQueue(const String
&id
, const Side side
)
140 OneToOneUniQueue
*const queue1
= new OneToOneUniQueue(QueueId(id
, Side1
));
141 OneToOneUniQueue
*const queue2
= new OneToOneUniQueue(QueueId(id
, Side2
));
144 popQueue
.reset(queue1
);
145 pushQueue
.reset(queue2
);
148 popQueue
.reset(queue2
);
149 pushQueue
.reset(queue1
);
157 OneToOneBiQueue::readers(QueueReader
*r1
, QueueReader
*r2
)
159 popQueue
->reader(r1
);
160 pushQueue
->reader(r2
);
164 OneToOneBiQueue::clearReaderSignal()
166 debugs(54, 7, HERE
<< "reader: " << &popQueue
->reader());
167 popQueue
->reader().clearSignal();
173 FewToOneBiQueue::Owner
*
174 FewToOneBiQueue::Init(const String
&id
, const int workerCount
, const unsigned int maxItemSize
, const int capacity
)
176 return new Owner(id
, workerCount
, maxItemSize
, capacity
);
179 FewToOneBiQueue::FewToOneBiQueue(const String
&id
):
181 readers(shm_old(QueueReaders
)(ReaderId(id
).termedBuf())),
182 reader(readers
->theReaders
)
184 Must(readers
->theCapacity
> 1);
186 debugs(54, 7, HERE
<< "disker " << id
<< " reader: " << reader
->id
);
188 biQueues
.reserve(workerCount());
189 for (int i
= 0; i
< workerCount(); ++i
) {
190 OneToOneBiQueue
*const biQueue
= new OneToOneBiQueue(QueueId(id
, i
+ WorkerIdOffset
), OneToOneBiQueue::Side1
);
191 QueueReader
*const remoteReader
= readers
->theReaders
+ i
+ 1;
192 biQueue
->readers(reader
, remoteReader
);
193 biQueues
.push_back(biQueue
);
198 FewToOneBiQueue::Attach(const String
&id
, const int workerId
)
200 Ipc::Mem::Pointer
<QueueReaders
> readers
= shm_old(QueueReaders
)(ReaderId(id
).termedBuf());
201 Must(workerId
>= WorkerIdOffset
);
202 Must(workerId
< readers
->theCapacity
- 1 + WorkerIdOffset
);
203 QueueReader
*const remoteReader
= readers
->theReaders
;
204 debugs(54, 7, HERE
<< "disker " << id
<< " reader: " << remoteReader
->id
);
205 QueueReader
*const localReader
=
206 readers
->theReaders
+ workerId
- WorkerIdOffset
+ 1;
207 debugs(54, 7, HERE
<< "local " << id
<< " reader: " << localReader
->id
);
209 OneToOneBiQueue
*const biQueue
=
210 new OneToOneBiQueue(QueueId(id
, workerId
), OneToOneBiQueue::Side2
);
211 biQueue
->readers(localReader
, remoteReader
);
213 // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker?
214 const Ipc::Mem::Pointer
<QueueReaders
> *const leakingReaders
= new Ipc::Mem::Pointer
<QueueReaders
>(readers
);
215 Must(leakingReaders
); // silence unused variable warning
220 FewToOneBiQueue::~FewToOneBiQueue()
222 for (int i
= 0; i
< workerCount(); ++i
)
226 bool FewToOneBiQueue::validWorkerId(const int workerId
) const
228 return WorkerIdOffset
<= workerId
&&
229 workerId
< WorkerIdOffset
+ workerCount();
233 FewToOneBiQueue::clearReaderSignal(int workerId
)
235 debugs(54, 7, HERE
<< "reader: " << reader
->id
);
237 assert(validWorkerId(workerId
));
238 reader
->clearSignal();
240 // we got a hint; we could reposition iteration to try popping from the
241 // workerId queue first; but it does not seem to help much and might
242 // introduce some bias so we do not do that for now:
243 // theLastPopWorker = (workerId + workerCount() - 1) % workerCount();
246 FewToOneBiQueue::Owner::Owner(const String
&id
, const int workerCount
, const unsigned int maxItemSize
, const int capacity
):
247 readersOwner(shm_new(QueueReaders
)(ReaderId(id
).termedBuf(), workerCount
+ 1))
249 biQueueOwners
.reserve(workerCount
);
250 for (int i
= 0; i
< workerCount
; ++i
) {
251 OneToOneBiQueue::Owner
*const queueOwner
= OneToOneBiQueue::Init(QueueId(id
, i
+ WorkerIdOffset
), maxItemSize
, capacity
);
252 biQueueOwners
.push_back(queueOwner
);
256 FewToOneBiQueue::Owner::~Owner()
258 for (size_t i
= 0; i
< biQueueOwners
.size(); ++i
)
259 delete biQueueOwners
[i
];