*/
type QueueOutboundElement struct {
- dropped int32
sync.Mutex
buffer *[MaxMessageSize]byte // slice holding the packet data
packet []byte // slice of "buffer" (always!)
func (device *Device) NewOutboundElement() *QueueOutboundElement {
elem := device.GetOutboundElement()
- elem.dropped = AtomicFalse
elem.buffer = device.GetMessageBuffer()
elem.Mutex = sync.Mutex{}
elem.nonce = 0
elem.peer = nil
}
-func (elem *QueueOutboundElement) Drop() {
- atomic.StoreInt32(&elem.dropped, AtomicTrue)
-}
-
-func (elem *QueueOutboundElement) IsDropped() bool {
- return atomic.LoadInt32(&elem.dropped) == AtomicTrue
-}
-
func addToNonceQueue(queue chan *QueueOutboundElement, elem *QueueOutboundElement, device *Device) {
for {
select {
}
elem.keypair = keypair
- elem.dropped = AtomicFalse
elem.Lock()
// add to parallel and sequential queue
logDebug.Println("Routine: encryption worker - started")
for elem := range device.queue.encryption.c {
-
- // check if dropped
-
- if elem.IsDropped() {
- continue
- }
-
// populate header fields
-
header := elem.buffer[:MessageTransportHeaderSize]
fieldType := header[0:4]
for elem := range peer.queue.outbound {
elem.Lock()
- if elem.IsDropped() {
- device.PutOutboundElement(elem)
- continue
- }
if !peer.isRunning.Get() {
// peer has been stopped; return re-usable elems to the shared pool.
// This is an optimization only. It is possible for the peer to be stopped