]> git.ipfire.org Git - thirdparty/wireguard-go.git/commitdiff
device: remove mutex from Peer send/receive
authorJosh Bleecher Snyder <josh@tailscale.com>
Mon, 8 Feb 2021 21:02:52 +0000 (13:02 -0800)
committerJosh Bleecher Snyder <josh@tailscale.com>
Mon, 8 Feb 2021 21:02:52 +0000 (13:02 -0800)
The immediate motivation for this change is an observed deadlock.

1. A goroutine calls peer.Stop. That calls peer.queue.Lock().
2. Another goroutine is in RoutineSequentialReceiver.
   It receives an elem from peer.queue.inbound.
3. The peer.Stop goroutine calls close(peer.queue.inbound),
   close(peer.queue.outbound), and peer.stopping.Wait().
   It blocks waiting for RoutineSequentialReceiver
   and RoutineSequentialSender to exit.
4. The RoutineSequentialReceiver goroutine calls peer.SendStagedPackets().
   SendStagedPackets attempts peer.queue.RLock().
   That blocks forever because the peer.Stop
   goroutine holds a write lock on that mutex.

A background motivation for this change is that it can be expensive
to have a mutex in the hot code path of RoutineSequential*.

The mutex was necessary to avoid attempting to send elems on a closed channel.
This commit removes that danger by never closing the channel.
Instead, we send a sentinel nil value on the channel to indicate
to the receiver that it should exit.

The only problem with this is that if the receiver exits,
we could write an elem into the channel which would never get received.
If it never gets received, it cannot get returned to the device pools.

To work around this, we use a finalizer. When the channel can be GC'd,
the finalizer drains any remaining elements from the channel and
restores them to the device pool.

After that change, peer.queue.RWMutex no longer makes sense where it is.
It is only used to prevent concurrent calls to Start and Stop.
Move it to a more sensible location and make it a plain sync.Mutex.

Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
device/channels.go
device/peer.go
device/receive.go
device/send.go

index 4471477fee5888590642bc05f7c825d396899b19..8cd6aee3168b7822d37c34cd0a7df8b8c6ff1662 100644 (file)
@@ -5,7 +5,10 @@
 
 package device
 
-import "sync"
+import (
+       "runtime"
+       "sync"
+)
 
 // An outboundQueue is a channel of QueueOutboundElements awaiting encryption.
 // An outboundQueue is ref-counted using its wg field.
@@ -67,3 +70,60 @@ func newHandshakeQueue() *handshakeQueue {
        }()
        return q
 }
+
+// newAutodrainingInboundQueue returns a channel that will be drained when it gets GC'd.
+// It is useful in cases in which is it hard to manage the lifetime of the channel.
+// The returned channel must not be closed. Senders should signal shutdown using
+// some other means, such as sending a sentinel nil values.
+func newAutodrainingInboundQueue(device *Device) chan *QueueInboundElement {
+       type autodrainingInboundQueue struct {
+               c chan *QueueInboundElement
+       }
+       q := &autodrainingInboundQueue{
+               c: make(chan *QueueInboundElement, QueueInboundSize),
+       }
+       runtime.SetFinalizer(q, func(q *autodrainingInboundQueue) {
+               for {
+                       select {
+                       case elem := <-q.c:
+                               if elem == nil {
+                                       continue
+                               }
+                               device.PutMessageBuffer(elem.buffer)
+                               device.PutInboundElement(elem)
+                       default:
+                               return
+                       }
+               }
+       })
+       return q.c
+}
+
+// newAutodrainingOutboundQueue returns a channel that will be drained when it gets GC'd.
+// It is useful in cases in which is it hard to manage the lifetime of the channel.
+// The returned channel must not be closed. Senders should signal shutdown using
+// some other means, such as sending a sentinel nil values.
+// All sends to the channel must be best-effort, because there may be no receivers.
+func newAutodrainingOutboundQueue(device *Device) chan *QueueOutboundElement {
+       type autodrainingOutboundQueue struct {
+               c chan *QueueOutboundElement
+       }
+       q := &autodrainingOutboundQueue{
+               c: make(chan *QueueOutboundElement, QueueOutboundSize),
+       }
+       runtime.SetFinalizer(q, func(q *autodrainingOutboundQueue) {
+               for {
+                       select {
+                       case elem := <-q.c:
+                               if elem == nil {
+                                       continue
+                               }
+                               device.PutMessageBuffer(elem.buffer)
+                               device.PutOutboundElement(elem)
+                       default:
+                               return
+                       }
+               }
+       })
+       return q.c
+}
index 3e4f4ecbf4ed4df99987388711554cedaf3e485e..49b9acb6b73e2babed17a0071c17c4299d9bebd6 100644 (file)
@@ -51,8 +51,11 @@ type Peer struct {
                sentLastMinuteHandshake AtomicBool
        }
 
+       state struct {
+               mu sync.Mutex // protects against concurrent Start/Stop
+       }
+
        queue struct {
-               sync.RWMutex
                staged   chan *QueueOutboundElement // staged packets before a handshake is available
                outbound chan *QueueOutboundElement // sequential ordering of udp transmission
                inbound  chan *QueueInboundElement  // sequential ordering of tun writing
@@ -158,8 +161,8 @@ func (peer *Peer) Start() {
        }
 
        // prevent simultaneous start/stop operations
-       peer.queue.Lock()
-       defer peer.queue.Unlock()
+       peer.state.mu.Lock()
+       defer peer.state.mu.Unlock()
 
        if peer.isRunning.Get() {
                return
@@ -177,8 +180,8 @@ func (peer *Peer) Start() {
        peer.handshake.mutex.Unlock()
 
        // prepare queues
-       peer.queue.outbound = make(chan *QueueOutboundElement, QueueOutboundSize)
-       peer.queue.inbound = make(chan *QueueInboundElement, QueueInboundSize)
+       peer.queue.outbound = newAutodrainingOutboundQueue(device)
+       peer.queue.inbound = newAutodrainingInboundQueue(device)
        if peer.queue.staged == nil {
                peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize)
        }
@@ -239,8 +242,8 @@ func (peer *Peer) ExpireCurrentKeypairs() {
 }
 
 func (peer *Peer) Stop() {
-       peer.queue.Lock()
-       defer peer.queue.Unlock()
+       peer.state.mu.Lock()
+       defer peer.state.mu.Unlock()
 
        if !peer.isRunning.Swap(false) {
                return
@@ -249,9 +252,9 @@ func (peer *Peer) Stop() {
        peer.device.log.Verbosef("%v - Stopping...", peer)
 
        peer.timersStop()
-
-       close(peer.queue.inbound)
-       close(peer.queue.outbound)
+       // Signal that RoutineSequentialSender and RoutineSequentialReceiver should exit.
+       peer.queue.inbound <- nil
+       peer.queue.outbound <- nil
        peer.stopping.Wait()
        peer.device.queue.encryption.wg.Done() // no more writes to encryption queue from us
 
index c6a28f7fb4559d4665142eb10b33a971e94003fb..7acb7d9f281184c7938f7870a7ec326067970876 100644 (file)
@@ -166,7 +166,6 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind conn.Bind) {
                        elem.Lock()
 
                        // add to decryption queues
-                       peer.queue.RLock()
                        if peer.isRunning.Get() {
                                peer.queue.inbound <- elem
                                device.queue.decryption.c <- elem
@@ -174,8 +173,6 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind conn.Bind) {
                        } else {
                                device.PutInboundElement(elem)
                        }
-                       peer.queue.RUnlock()
-
                        continue
 
                // otherwise it is a fixed size & handshake related packet
@@ -406,6 +403,9 @@ func (peer *Peer) RoutineSequentialReceiver() {
        device.log.Verbosef("%v - Routine: sequential receiver - started", peer)
 
        for elem := range peer.queue.inbound {
+               if elem == nil {
+                       return
+               }
                var err error
                elem.Lock()
                if elem.packet == nil {
index 982fec0b6da8ea9e729b19cbf9ffebbf5b6d2a93..911ee5caa388ec2ca2b129bcc11ff374bec8bb8a 100644 (file)
@@ -316,7 +316,6 @@ top:
                        elem.Lock()
 
                        // add to parallel and sequential queue
-                       peer.queue.RLock()
                        if peer.isRunning.Get() {
                                peer.queue.outbound <- elem
                                peer.device.queue.encryption.c <- elem
@@ -324,7 +323,6 @@ top:
                                peer.device.PutMessageBuffer(elem.buffer)
                                peer.device.PutOutboundElement(elem)
                        }
-                       peer.queue.RUnlock()
                default:
                        return
                }
@@ -413,6 +411,9 @@ func (peer *Peer) RoutineSequentialSender() {
        device.log.Verbosef("%v - Routine: sequential sender - started", peer)
 
        for elem := range peer.queue.outbound {
+               if elem == nil {
+                       return
+               }
                elem.Lock()
                if !peer.isRunning.Get() {
                        // peer has been stopped; return re-usable elems to the shared pool.