]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.cc
4 * DEBUG: section 54 Interprocess Communication
9 #include "base/TextException.h"
12 #include "ipc/Queue.h"
14 /// constructs Metadata ID from parent queue ID
18 id
. append ( "__metadata" );
22 /// constructs one-to-one queues ID from parent queue ID
26 id
. append ( "__queues" );
30 /// constructs QueueReaders ID from parent queue ID
34 id
. append ( "__readers" );
41 InstanceIdDefinitions ( Ipc :: QueueReader
, "ipcQR" );
43 Ipc :: QueueReader :: QueueReader (): popBlocked ( 1 ), popSignal ( 0 )
45 debugs ( 54 , 7 , HERE
<< "constructed " << id
);
50 Ipc :: QueueReaders :: QueueReaders ( const int aCapacity
): theCapacity ( aCapacity
)
52 Must ( theCapacity
> 0 );
53 new ( theReaders
) QueueReader
[ theCapacity
];
57 Ipc :: QueueReaders :: sharedMemorySize () const
59 return SharedMemorySize ( theCapacity
);
63 Ipc :: QueueReaders :: SharedMemorySize ( const int capacity
)
65 return sizeof ( QueueReaders
) + sizeof ( QueueReader
) * capacity
;
71 Ipc :: OneToOneUniQueue :: OneToOneUniQueue ( const unsigned int aMaxItemSize
, const int aCapacity
):
72 theIn ( 0 ), theOut ( 0 ), theSize ( 0 ), theMaxItemSize ( aMaxItemSize
),
73 theCapacity ( aCapacity
)
75 Must ( theMaxItemSize
> 0 );
76 Must ( theCapacity
> 0 );
80 Ipc :: OneToOneUniQueue :: Bytes2Items ( const unsigned int maxItemSize
, int size
)
82 assert ( maxItemSize
> 0 );
83 size
-= sizeof ( OneToOneUniQueue
);
84 return size
>= 0 ? size
/ maxItemSize
: 0 ;
88 Ipc :: OneToOneUniQueue :: Items2Bytes ( const unsigned int maxItemSize
, const int size
)
91 return sizeof ( OneToOneUniQueue
) + maxItemSize
* size
;
95 /* OneToOneUniQueues */
97 Ipc :: OneToOneUniQueues :: OneToOneUniQueues ( const int aCapacity
, const unsigned int maxItemSize
, const int queueCapacity
): theCapacity ( aCapacity
)
99 Must ( theCapacity
> 0 );
100 for ( int i
= 0 ; i
< theCapacity
; ++ i
)
101 new (&(* this )[ i
]) OneToOneUniQueue ( maxItemSize
, queueCapacity
);
105 Ipc :: OneToOneUniQueues :: sharedMemorySize () const
107 return sizeof (* this ) + theCapacity
* front (). sharedMemorySize ();
111 Ipc :: OneToOneUniQueues :: SharedMemorySize ( const int capacity
, const unsigned int maxItemSize
, const int queueCapacity
)
113 const int queueSize
=
114 OneToOneUniQueue :: Items2Bytes ( maxItemSize
, queueCapacity
);
115 return sizeof ( OneToOneUniQueues
) + queueSize
* capacity
;
118 const Ipc :: OneToOneUniQueue
&
119 Ipc :: OneToOneUniQueues :: operator []( const int index
) const
121 Must ( 0 <= index
&& index
< theCapacity
);
122 const size_t queueSize
= index
? front (). sharedMemorySize () : 0 ;
123 const char * const queue
=
124 reinterpret_cast < const char *>( this ) + sizeof (* this ) + index
* queueSize
;
125 return * reinterpret_cast < const OneToOneUniQueue
*>( queue
);
131 Ipc :: FewToFewBiQueue :: Owner
*
132 Ipc :: FewToFewBiQueue :: Init ( const String
& id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
)
134 return new Owner ( id
, groupASize
, groupAIdOffset
, groupBSize
, groupBIdOffset
, maxItemSize
, capacity
);
137 Ipc :: FewToFewBiQueue :: FewToFewBiQueue ( const String
& id
, const Group aLocalGroup
, const int aLocalProcessId
):
138 metadata ( shm_old ( Metadata
)( MetadataId ( id
). termedBuf ())),
139 queues ( shm_old ( OneToOneUniQueues
)( QueuesId ( id
). termedBuf ())),
140 readers ( shm_old ( QueueReaders
)( ReadersId ( id
). termedBuf ())),
141 theLocalGroup ( aLocalGroup
), theLocalProcessId ( aLocalProcessId
),
142 theLastPopProcessId ( readers
-> theCapacity
)
144 Must ( queues
-> theCapacity
== metadata
-> theGroupASize
* metadata
-> theGroupBSize
* 2 );
145 Must ( readers
-> theCapacity
== metadata
-> theGroupASize
+ metadata
-> theGroupBSize
);
147 const QueueReader
& localReader
= reader ( theLocalGroup
, theLocalProcessId
);
148 debugs ( 54 , 7 , HERE
<< "queue " << id
<< " reader: " << localReader
. id
);
152 Ipc :: FewToFewBiQueue :: validProcessId ( const Group group
, const int processId
) const
156 return metadata
-> theGroupAIdOffset
<= processId
&&
157 processId
< metadata
-> theGroupAIdOffset
+ metadata
-> theGroupASize
;
159 return metadata
-> theGroupBIdOffset
<= processId
&&
160 processId
< metadata
-> theGroupBIdOffset
+ metadata
-> theGroupBSize
;
166 Ipc :: FewToFewBiQueue :: oneToOneQueueIndex ( const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const
168 Must ( fromGroup
!= toGroup
);
169 assert ( validProcessId ( fromGroup
, fromProcessId
));
170 assert ( validProcessId ( toGroup
, toProcessId
));
174 if ( fromGroup
== groupA
) {
175 index1
= fromProcessId
- metadata
-> theGroupAIdOffset
;
176 index2
= toProcessId
- metadata
-> theGroupBIdOffset
;
179 index1
= toProcessId
- metadata
-> theGroupAIdOffset
;
180 index2
= fromProcessId
- metadata
-> theGroupBIdOffset
;
181 offset
= metadata
-> theGroupASize
* metadata
-> theGroupBSize
;
183 const int index
= offset
+ index1
* metadata
-> theGroupBSize
+ index2
;
187 Ipc :: OneToOneUniQueue
&
188 Ipc :: FewToFewBiQueue :: oneToOneQueue ( const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
)
190 return (* queues
)[ oneToOneQueueIndex ( fromGroup
, fromProcessId
, toGroup
, toProcessId
)];
193 const Ipc :: OneToOneUniQueue
&
194 Ipc :: FewToFewBiQueue :: oneToOneQueue ( const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const
196 return (* queues
)[ oneToOneQueueIndex ( fromGroup
, fromProcessId
, toGroup
, toProcessId
)];
200 Ipc :: FewToFewBiQueue :: reader ( const Group group
, const int processId
)
202 Must ( validProcessId ( group
, processId
));
203 const int index
= group
== groupA
?
204 processId
- metadata
-> theGroupAIdOffset
:
205 metadata
-> theGroupASize
+ processId
- metadata
-> theGroupBIdOffset
;
206 return readers
-> theReaders
[ index
];
210 Ipc :: FewToFewBiQueue :: clearReaderSignal ( const int remoteProcessId
)
212 QueueReader
& localReader
= reader ( theLocalGroup
, theLocalProcessId
);
213 debugs ( 54 , 7 , HERE
<< "reader: " << localReader
. id
);
215 Must ( validProcessId ( remoteGroup (), remoteProcessId
));
216 localReader
. clearSignal ();
218 // we got a hint; we could reposition iteration to try popping from the
219 // remoteProcessId queue first; but it does not seem to help much and might
220 // introduce some bias so we do not do that for now:
221 // theLastPopProcessId = remoteProcessId;
224 Ipc :: FewToFewBiQueue :: Metadata :: Metadata ( const int aGroupASize
, const int aGroupAIdOffset
, const int aGroupBSize
, const int aGroupBIdOffset
):
225 theGroupASize ( aGroupASize
), theGroupAIdOffset ( aGroupAIdOffset
),
226 theGroupBSize ( aGroupBSize
), theGroupBIdOffset ( aGroupBIdOffset
)
228 Must ( theGroupASize
> 0 );
229 Must ( theGroupBSize
> 0 );
232 Ipc :: FewToFewBiQueue :: Owner :: Owner ( const String
& id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
):
233 metadataOwner ( shm_new ( Metadata
)( MetadataId ( id
). termedBuf (), groupASize
, groupAIdOffset
, groupBSize
, groupBIdOffset
)),
234 queuesOwner ( shm_new ( OneToOneUniQueues
)( QueuesId ( id
). termedBuf (), groupASize
* groupBSize
* 2 , maxItemSize
, capacity
)),
235 readersOwner ( shm_new ( QueueReaders
)( ReadersId ( id
). termedBuf (), groupASize
+ groupBSize
))
239 Ipc :: FewToFewBiQueue :: Owner ::~ Owner ()
241 delete metadataOwner
;