return q
}
+type autodrainingInboundQueue struct {
+ c chan *QueueInboundElement
+}
+
// newAutodrainingInboundQueue returns a channel that will be drained when it gets GC'd.
// It is useful in cases in which is it hard to manage the lifetime of the channel.
// The returned channel must not be closed. Senders should signal shutdown using
// some other means, such as sending a sentinel nil values.
-func newAutodrainingInboundQueue(device *Device) chan *QueueInboundElement {
- type autodrainingInboundQueue struct {
- c chan *QueueInboundElement
- }
+func newAutodrainingInboundQueue(device *Device) *autodrainingInboundQueue {
q := &autodrainingInboundQueue{
c: make(chan *QueueInboundElement, QueueInboundSize),
}
}
}
})
- return q.c
+ return q
+}
+
+type autodrainingOutboundQueue struct {
+ c chan *QueueOutboundElement
}
// newAutodrainingOutboundQueue returns a channel that will be drained when it gets GC'd.
// The returned channel must not be closed. Senders should signal shutdown using
// some other means, such as sending a sentinel nil values.
// All sends to the channel must be best-effort, because there may be no receivers.
-func newAutodrainingOutboundQueue(device *Device) chan *QueueOutboundElement {
- type autodrainingOutboundQueue struct {
- c chan *QueueOutboundElement
- }
+func newAutodrainingOutboundQueue(device *Device) *autodrainingOutboundQueue {
q := &autodrainingOutboundQueue{
c: make(chan *QueueOutboundElement, QueueOutboundSize),
}
}
}
})
- return q.c
+ return q
}
queue struct {
staged chan *QueueOutboundElement // staged packets before a handshake is available
- outbound chan *QueueOutboundElement // sequential ordering of udp transmission
- inbound chan *QueueInboundElement // sequential ordering of tun writing
+ outbound *autodrainingOutboundQueue // sequential ordering of udp transmission
+ inbound *autodrainingInboundQueue // sequential ordering of tun writing
}
cookieGenerator CookieGenerator
peer.timersStop()
// Signal that RoutineSequentialSender and RoutineSequentialReceiver should exit.
- peer.queue.inbound <- nil
- peer.queue.outbound <- nil
+ peer.queue.inbound.c <- nil
+ peer.queue.outbound.c <- nil
peer.stopping.Wait()
peer.device.queue.encryption.wg.Done() // no more writes to encryption queue from us
}()
}
wg.Wait()
-}
\ No newline at end of file
+}
// add to decryption queues
if peer.isRunning.Get() {
- peer.queue.inbound <- elem
+ peer.queue.inbound.c <- elem
device.queue.decryption.c <- elem
buffer = device.GetMessageBuffer()
} else {
}()
device.log.Verbosef("%v - Routine: sequential receiver - started", peer)
- for elem := range peer.queue.inbound {
+ for elem := range peer.queue.inbound.c {
if elem == nil {
return
}
if err != nil && !device.isClosed() {
device.log.Errorf("Failed to write packet to TUN device: %v", err)
}
- if len(peer.queue.inbound) == 0 {
+ if len(peer.queue.inbound.c) == 0 {
err = device.tun.device.Flush()
if err != nil {
peer.device.log.Errorf("Unable to flush packets: %v", err)
// add to parallel and sequential queue
if peer.isRunning.Get() {
- peer.queue.outbound <- elem
+ peer.queue.outbound.c <- elem
peer.device.queue.encryption.c <- elem
} else {
peer.device.PutMessageBuffer(elem.buffer)
}()
device.log.Verbosef("%v - Routine: sequential sender - started", peer)
- for elem := range peer.queue.outbound {
+ for elem := range peer.queue.outbound.c {
if elem == nil {
return
}