]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.cc
2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 54 Interprocess Communication */
12 #include "base/TextException.h"
15 #include "ipc/Queue.h"
19 /// constructs Metadata ID from parent queue ID
23 id
. append ( "__metadata" );
27 /// constructs one-to-one queues ID from parent queue ID
31 id
. append ( "__queues" );
35 /// constructs QueueReaders ID from parent queue ID
39 id
. append ( "__readers" );
45 InstanceIdDefinitions ( Ipc :: QueueReader
, "ipcQR" );
47 Ipc :: QueueReader :: QueueReader (): popBlocked ( true ), popSignal ( false ),
48 rateLimit ( 0 ), balance ( 0 )
50 debugs ( 54 , 7 , HERE
<< "constructed " << id
);
55 Ipc :: QueueReaders :: QueueReaders ( const int aCapacity
): theCapacity ( aCapacity
),
56 theReaders ( theCapacity
)
58 Must ( theCapacity
> 0 );
62 Ipc :: QueueReaders :: sharedMemorySize () const
64 return SharedMemorySize ( theCapacity
);
68 Ipc :: QueueReaders :: SharedMemorySize ( const int capacity
)
70 return sizeof ( QueueReaders
) + sizeof ( QueueReader
) * capacity
;
75 Ipc :: OneToOneUniQueue :: OneToOneUniQueue ( const unsigned int aMaxItemSize
, const int aCapacity
):
76 theIn ( 0 ), theOut ( 0 ), theSize ( 0 ), theMaxItemSize ( aMaxItemSize
),
77 theCapacity ( aCapacity
)
79 Must ( theMaxItemSize
> 0 );
80 Must ( theCapacity
> 0 );
84 Ipc :: OneToOneUniQueue :: Bytes2Items ( const unsigned int maxItemSize
, int size
)
86 assert ( maxItemSize
> 0 );
87 size
-= sizeof ( OneToOneUniQueue
);
88 return size
>= 0 ? size
/ maxItemSize
: 0 ;
92 Ipc :: OneToOneUniQueue :: Items2Bytes ( const unsigned int maxItemSize
, const int size
)
95 return sizeof ( OneToOneUniQueue
) + maxItemSize
* size
;
98 /* OneToOneUniQueues */
100 Ipc :: OneToOneUniQueues :: OneToOneUniQueues ( const int aCapacity
, const unsigned int maxItemSize
, const int queueCapacity
): theCapacity ( aCapacity
)
102 Must ( theCapacity
> 0 );
103 for ( int i
= 0 ; i
< theCapacity
; ++ i
)
104 new (&(* this )[ i
]) OneToOneUniQueue ( maxItemSize
, queueCapacity
);
108 Ipc :: OneToOneUniQueues :: sharedMemorySize () const
110 return sizeof (* this ) + theCapacity
* front (). sharedMemorySize ();
114 Ipc :: OneToOneUniQueues :: SharedMemorySize ( const int capacity
, const unsigned int maxItemSize
, const int queueCapacity
)
116 const int queueSize
=
117 OneToOneUniQueue :: Items2Bytes ( maxItemSize
, queueCapacity
);
118 return sizeof ( OneToOneUniQueues
) + queueSize
* capacity
;
121 const Ipc :: OneToOneUniQueue
&
122 Ipc :: OneToOneUniQueues :: operator []( const int index
) const
124 Must ( 0 <= index
&& index
< theCapacity
);
125 const size_t queueSize
= index
? front (). sharedMemorySize () : 0 ;
126 const char * const queue
=
127 reinterpret_cast < const char *>( this ) + sizeof (* this ) + index
* queueSize
;
128 return * reinterpret_cast < const OneToOneUniQueue
*>( queue
);
133 Ipc :: BaseMultiQueue :: BaseMultiQueue ( const int aLocalProcessId
):
134 theLocalProcessId ( aLocalProcessId
),
135 theLastPopProcessId ( std :: numeric_limits
< int >:: max () - 1 )
140 Ipc :: BaseMultiQueue :: clearReaderSignal ( const int /*remoteProcessId*/ )
142 QueueReader
& reader
= localReader ();
143 debugs ( 54 , 7 , "reader: " << reader
. id
);
145 reader
. clearSignal ();
147 // we got a hint; we could reposition iteration to try popping from the
148 // remoteProcessId queue first; but it does not seem to help much and might
149 // introduce some bias so we do not do that for now:
150 // theLastPopProcessId = remoteProcessId;
153 const Ipc :: QueueReader :: Balance
&
154 Ipc :: BaseMultiQueue :: balance ( const int remoteProcessId
) const
156 const QueueReader
& r
= remoteReader ( remoteProcessId
);
160 const Ipc :: QueueReader :: Rate
&
161 Ipc :: BaseMultiQueue :: rateLimit ( const int remoteProcessId
) const
163 const QueueReader
& r
= remoteReader ( remoteProcessId
);
167 Ipc :: OneToOneUniQueue
&
168 Ipc :: BaseMultiQueue :: inQueue ( const int remoteProcessId
)
170 const OneToOneUniQueue
& queue
=
171 const_cast < const BaseMultiQueue
*>( this )-> inQueue ( remoteProcessId
);
172 return const_cast < OneToOneUniQueue
&>( queue
);
175 Ipc :: OneToOneUniQueue
&
176 Ipc :: BaseMultiQueue :: outQueue ( const int remoteProcessId
)
178 const OneToOneUniQueue
& queue
=
179 const_cast < const BaseMultiQueue
*>( this )-> outQueue ( remoteProcessId
);
180 return const_cast < OneToOneUniQueue
&>( queue
);
184 Ipc :: BaseMultiQueue :: localReader ()
186 const QueueReader
& reader
=
187 const_cast < const BaseMultiQueue
*>( this )-> localReader ();
188 return const_cast < QueueReader
&>( reader
);
192 Ipc :: BaseMultiQueue :: remoteReader ( const int remoteProcessId
)
194 const QueueReader
& reader
=
195 const_cast < const BaseMultiQueue
*>( this )-> remoteReader ( remoteProcessId
);
196 return const_cast < QueueReader
&>( reader
);
201 Ipc :: FewToFewBiQueue :: Owner
*
202 Ipc :: FewToFewBiQueue :: Init ( const String
& id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
)
204 return new Owner ( id
, groupASize
, groupAIdOffset
, groupBSize
, groupBIdOffset
, maxItemSize
, capacity
);
207 Ipc :: FewToFewBiQueue :: FewToFewBiQueue ( const String
& id
, const Group aLocalGroup
, const int aLocalProcessId
):
208 BaseMultiQueue ( aLocalProcessId
),
209 metadata ( shm_old ( Metadata
)( MetadataId ( id
). termedBuf ())),
210 queues ( shm_old ( OneToOneUniQueues
)( QueuesId ( id
). termedBuf ())),
211 readers ( shm_old ( QueueReaders
)( ReadersId ( id
). termedBuf ())),
212 theLocalGroup ( aLocalGroup
)
214 Must ( queues
-> theCapacity
== metadata
-> theGroupASize
* metadata
-> theGroupBSize
* 2 );
215 Must ( readers
-> theCapacity
== metadata
-> theGroupASize
+ metadata
-> theGroupBSize
);
217 debugs ( 54 , 7 , "queue " << id
<< " reader: " << localReader (). id
);
221 Ipc :: FewToFewBiQueue :: MaxItemsCount ( const int groupASize
, const int groupBSize
, const int capacity
)
223 return capacity
* groupASize
* groupBSize
* 2 ;
227 Ipc :: FewToFewBiQueue :: validProcessId ( const Group group
, const int processId
) const
231 return metadata
-> theGroupAIdOffset
<= processId
&&
232 processId
< metadata
-> theGroupAIdOffset
+ metadata
-> theGroupASize
;
234 return metadata
-> theGroupBIdOffset
<= processId
&&
235 processId
< metadata
-> theGroupBIdOffset
+ metadata
-> theGroupBSize
;
241 Ipc :: FewToFewBiQueue :: oneToOneQueueIndex ( const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const
243 Must ( fromGroup
!= toGroup
);
244 assert ( validProcessId ( fromGroup
, fromProcessId
));
245 assert ( validProcessId ( toGroup
, toProcessId
));
249 if ( fromGroup
== groupA
) {
250 index1
= fromProcessId
- metadata
-> theGroupAIdOffset
;
251 index2
= toProcessId
- metadata
-> theGroupBIdOffset
;
254 index1
= toProcessId
- metadata
-> theGroupAIdOffset
;
255 index2
= fromProcessId
- metadata
-> theGroupBIdOffset
;
256 offset
= metadata
-> theGroupASize
* metadata
-> theGroupBSize
;
258 const int index
= offset
+ index1
* metadata
-> theGroupBSize
+ index2
;
262 const Ipc :: OneToOneUniQueue
&
263 Ipc :: FewToFewBiQueue :: oneToOneQueue ( const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const
265 return (* queues
)[ oneToOneQueueIndex ( fromGroup
, fromProcessId
, toGroup
, toProcessId
)];
268 const Ipc :: OneToOneUniQueue
&
269 Ipc :: FewToFewBiQueue :: inQueue ( const int remoteProcessId
) const
271 return oneToOneQueue ( remoteGroup (), remoteProcessId
,
272 theLocalGroup
, theLocalProcessId
);
275 const Ipc :: OneToOneUniQueue
&
276 Ipc :: FewToFewBiQueue :: outQueue ( const int remoteProcessId
) const
278 return oneToOneQueue ( theLocalGroup
, theLocalProcessId
,
279 remoteGroup (), remoteProcessId
);
283 Ipc :: FewToFewBiQueue :: readerIndex ( const Group group
, const int processId
) const
285 Must ( validProcessId ( group
, processId
));
286 return group
== groupA
?
287 processId
- metadata
-> theGroupAIdOffset
:
288 metadata
-> theGroupASize
+ processId
- metadata
-> theGroupBIdOffset
;
291 const Ipc :: QueueReader
&
292 Ipc :: FewToFewBiQueue :: localReader () const
294 return readers
-> theReaders
[ readerIndex ( theLocalGroup
, theLocalProcessId
)];
297 const Ipc :: QueueReader
&
298 Ipc :: FewToFewBiQueue :: remoteReader ( const int processId
) const
300 return readers
-> theReaders
[ readerIndex ( remoteGroup (), processId
)];
304 Ipc :: FewToFewBiQueue :: remotesCount () const
306 return theLocalGroup
== groupA
? metadata
-> theGroupBSize
:
307 metadata
-> theGroupASize
;
311 Ipc :: FewToFewBiQueue :: remotesIdOffset () const
313 return theLocalGroup
== groupA
? metadata
-> theGroupBIdOffset
:
314 metadata
-> theGroupAIdOffset
;
317 Ipc :: FewToFewBiQueue :: Metadata :: Metadata ( const int aGroupASize
, const int aGroupAIdOffset
, const int aGroupBSize
, const int aGroupBIdOffset
):
318 theGroupASize ( aGroupASize
), theGroupAIdOffset ( aGroupAIdOffset
),
319 theGroupBSize ( aGroupBSize
), theGroupBIdOffset ( aGroupBIdOffset
)
321 Must ( theGroupASize
> 0 );
322 Must ( theGroupBSize
> 0 );
325 Ipc :: FewToFewBiQueue :: Owner :: Owner ( const String
& id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
):
326 metadataOwner ( shm_new ( Metadata
)( MetadataId ( id
). termedBuf (), groupASize
, groupAIdOffset
, groupBSize
, groupBIdOffset
)),
327 queuesOwner ( shm_new ( OneToOneUniQueues
)( QueuesId ( id
). termedBuf (), groupASize
* groupBSize
* 2 , maxItemSize
, capacity
)),
328 readersOwner ( shm_new ( QueueReaders
)( ReadersId ( id
). termedBuf (), groupASize
+ groupBSize
))
332 Ipc :: FewToFewBiQueue :: Owner ::~ Owner ()
334 delete metadataOwner
;
341 Ipc :: MultiQueue :: Owner
*
342 Ipc :: MultiQueue :: Init ( const String
& id
, const int processCount
, const int processIdOffset
, const unsigned int maxItemSize
, const int capacity
)
344 return new Owner ( id
, processCount
, processIdOffset
, maxItemSize
, capacity
);
347 Ipc :: MultiQueue :: MultiQueue ( const String
& id
, const int localProcessId
):
348 BaseMultiQueue ( localProcessId
),
349 metadata ( shm_old ( Metadata
)( MetadataId ( id
). termedBuf ())),
350 queues ( shm_old ( OneToOneUniQueues
)( QueuesId ( id
). termedBuf ())),
351 readers ( shm_old ( QueueReaders
)( ReadersId ( id
). termedBuf ()))
353 Must ( queues
-> theCapacity
== metadata
-> theProcessCount
* metadata
-> theProcessCount
);
354 Must ( readers
-> theCapacity
== metadata
-> theProcessCount
);
356 debugs ( 54 , 7 , "queue " << id
<< " reader: " << localReader (). id
);
360 Ipc :: MultiQueue :: validProcessId ( const int processId
) const
362 return metadata
-> theProcessIdOffset
<= processId
&&
363 processId
< metadata
-> theProcessIdOffset
+ metadata
-> theProcessCount
;
366 const Ipc :: OneToOneUniQueue
&
367 Ipc :: MultiQueue :: oneToOneQueue ( const int fromProcessId
, const int toProcessId
) const
369 assert ( validProcessId ( fromProcessId
));
370 assert ( validProcessId ( toProcessId
));
371 const int fromIndex
= fromProcessId
- metadata
-> theProcessIdOffset
;
372 const int toIndex
= toProcessId
- metadata
-> theProcessIdOffset
;
373 const int index
= fromIndex
* metadata
-> theProcessCount
+ toIndex
;
374 return (* queues
)[ index
];
377 const Ipc :: QueueReader
&
378 Ipc :: MultiQueue :: reader ( const int processId
) const
380 assert ( validProcessId ( processId
));
381 const int index
= processId
- metadata
-> theProcessIdOffset
;
382 return readers
-> theReaders
[ index
];
385 const Ipc :: OneToOneUniQueue
&
386 Ipc :: MultiQueue :: inQueue ( const int remoteProcessId
) const
388 return oneToOneQueue ( remoteProcessId
, theLocalProcessId
);
391 const Ipc :: OneToOneUniQueue
&
392 Ipc :: MultiQueue :: outQueue ( const int remoteProcessId
) const
394 return oneToOneQueue ( theLocalProcessId
, remoteProcessId
);
397 const Ipc :: QueueReader
&
398 Ipc :: MultiQueue :: localReader () const
400 return reader ( theLocalProcessId
);
403 const Ipc :: QueueReader
&
404 Ipc :: MultiQueue :: remoteReader ( const int processId
) const
406 return reader ( processId
);
410 Ipc :: MultiQueue :: remotesCount () const
412 return metadata
-> theProcessCount
;
416 Ipc :: MultiQueue :: remotesIdOffset () const
418 return metadata
-> theProcessIdOffset
;
421 Ipc :: MultiQueue :: Metadata :: Metadata ( const int aProcessCount
, const int aProcessIdOffset
):
422 theProcessCount ( aProcessCount
), theProcessIdOffset ( aProcessIdOffset
)
424 Must ( theProcessCount
> 0 );
427 Ipc :: MultiQueue :: Owner :: Owner ( const String
& id
, const int processCount
, const int processIdOffset
, const unsigned int maxItemSize
, const int capacity
):
428 metadataOwner ( shm_new ( Metadata
)( MetadataId ( id
). termedBuf (), processCount
, processIdOffset
)),
429 queuesOwner ( shm_new ( OneToOneUniQueues
)( QueuesId ( id
). termedBuf (), processCount
* processCount
, maxItemSize
, capacity
)),
430 readersOwner ( shm_new ( QueueReaders
)( ReadersId ( id
). termedBuf (), processCount
))
434 Ipc :: MultiQueue :: Owner ::~ Owner ()
436 delete metadataOwner
;