git.ipfire.org Git - thirdparty/wireguard-go.git/commitdiff
device: move Queue{In,Out}boundElement Mutex to container type
author Jordan Whited <jordan@tailscale.com>
Mon, 2 Oct 2023 21:48:28 +0000 (14:48 -0700)
committer Jason A. Donenfeld <Jason@zx2c4.com>
Tue, 10 Oct 2023 13:07:36 +0000 (15:07 +0200)
Queue{In,Out}boundElement locking can contribute to significant
overhead via sync.Mutex.lockSlow() in some environments. These types
are passed throughout the device package as elements in a slice, so
move the per-element Mutex to a container around the slice.

Reviewed-by: Maisem Ali <maisem@tailscale.com>
Signed-off-by: Jordan Whited <jordan@tailscale.com>
Signed-off-by: Jason A. Donenfeld <Jason@zx2c4.com>
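
The change keeps the pipeline's ordering trick, but moves it to batch granularity: the producer locks the container before publishing it to both the parallel (encryption/decryption) queue and the per-peer sequential queue, the parallel worker unlocks it once after processing every element, and the sequential worker's Lock() then blocks until the whole batch is ready. The following is a minimal, self-contained sketch of that handoff; the names are simplified stand-ins, not the device package API.

package main

import (
	"fmt"
	"sync"
)

type element struct {
	packet []byte
}

// elementsContainer mirrors Queue{In,Out}boundElementsContainer:
// one Mutex guards the whole batch instead of one Mutex per element.
type elementsContainer struct {
	sync.Mutex
	elems []*element
}

func main() {
	parallel := make(chan *elementsContainer, 1)
	sequential := make(chan *elementsContainer, 1)

	// Producer: lock the batch before publishing it to both queues.
	batch := &elementsContainer{elems: []*element{{}, {}, {}}}
	batch.Lock()
	parallel <- batch
	sequential <- batch

	// Parallel worker (stand-in for RoutineEncryption/RoutineDecryption):
	// process every element, then release the batch with a single Unlock.
	go func() {
		c := <-parallel
		for i, e := range c.elems {
			e.packet = []byte(fmt.Sprintf("packet %d processed", i))
		}
		c.Unlock()
	}()

	// Sequential worker (stand-in for RoutineSequentialSender/Receiver):
	// Lock() blocks until the parallel worker has finished the whole batch,
	// rather than blocking once per element as before this commit.
	c := <-sequential
	c.Lock()
	for _, e := range c.elems {
		fmt.Println(string(e.packet))
	}
}

This cross-goroutine handoff is legal because sync.Mutex does not require Lock and Unlock to be called from the same goroutine; the mutex here acts as a per-batch completion signal rather than as conventional mutual exclusion.
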
device/channels.go
device/device.go
device/peer.go
device/pools.go
device/receive.go
device/send.go

diff --git a/device/channels.go b/device/channels.go
index 40ee5c9a51ae083fe7b4aba0ccd2050fe938557e..e526f6bb1dd156b7a2bda659b6163e7874b2df34 100644
@@ -19,13 +19,13 @@ import (
 // call wg.Done to remove the initial reference.
 // When the refcount hits 0, the queue's channel is closed.
 type outboundQueue struct {
-       c  chan *[]*QueueOutboundElement
+       c  chan *QueueOutboundElementsContainer
        wg sync.WaitGroup
 }
 
 func newOutboundQueue() *outboundQueue {
        q := &outboundQueue{
-               c: make(chan *[]*QueueOutboundElement, QueueOutboundSize),
+               c: make(chan *QueueOutboundElementsContainer, QueueOutboundSize),
        }
        q.wg.Add(1)
        go func() {
@@ -37,13 +37,13 @@ func newOutboundQueue() *outboundQueue {
 
 // A inboundQueue is similar to an outboundQueue; see those docs.
 type inboundQueue struct {
-       c  chan *[]*QueueInboundElement
+       c  chan *QueueInboundElementsContainer
        wg sync.WaitGroup
 }
 
 func newInboundQueue() *inboundQueue {
        q := &inboundQueue{
-               c: make(chan *[]*QueueInboundElement, QueueInboundSize),
+               c: make(chan *QueueInboundElementsContainer, QueueInboundSize),
        }
        q.wg.Add(1)
        go func() {
@@ -72,7 +72,7 @@ func newHandshakeQueue() *handshakeQueue {
 }
 
 type autodrainingInboundQueue struct {
-       c chan *[]*QueueInboundElement
+       c chan *QueueInboundElementsContainer
 }
 
 // newAutodrainingInboundQueue returns a channel that will be drained when it gets GC'd.
@@ -81,7 +81,7 @@ type autodrainingInboundQueue struct {
 // some other means, such as sending a sentinel nil values.
 func newAutodrainingInboundQueue(device *Device) *autodrainingInboundQueue {
        q := &autodrainingInboundQueue{
-               c: make(chan *[]*QueueInboundElement, QueueInboundSize),
+               c: make(chan *QueueInboundElementsContainer, QueueInboundSize),
        }
        runtime.SetFinalizer(q, device.flushInboundQueue)
        return q
@@ -90,13 +90,13 @@ func newAutodrainingInboundQueue(device *Device) *autodrainingInboundQueue {
 func (device *Device) flushInboundQueue(q *autodrainingInboundQueue) {
        for {
                select {
-               case elems := <-q.c:
-                       for _, elem := range *elems {
-                               elem.Lock()
+               case elemsContainer := <-q.c:
+                       elemsContainer.Lock()
+                       for _, elem := range elemsContainer.elems {
                                device.PutMessageBuffer(elem.buffer)
                                device.PutInboundElement(elem)
                        }
-                       device.PutInboundElementsSlice(elems)
+                       device.PutInboundElementsContainer(elemsContainer)
                default:
                        return
                }
@@ -104,7 +104,7 @@ func (device *Device) flushInboundQueue(q *autodrainingInboundQueue) {
 }
 
 type autodrainingOutboundQueue struct {
-       c chan *[]*QueueOutboundElement
+       c chan *QueueOutboundElementsContainer
 }
 
 // newAutodrainingOutboundQueue returns a channel that will be drained when it gets GC'd.
@@ -114,7 +114,7 @@ type autodrainingOutboundQueue struct {
 // All sends to the channel must be best-effort, because there may be no receivers.
 func newAutodrainingOutboundQueue(device *Device) *autodrainingOutboundQueue {
        q := &autodrainingOutboundQueue{
-               c: make(chan *[]*QueueOutboundElement, QueueOutboundSize),
+               c: make(chan *QueueOutboundElementsContainer, QueueOutboundSize),
        }
        runtime.SetFinalizer(q, device.flushOutboundQueue)
        return q
@@ -123,13 +123,13 @@ func newAutodrainingOutboundQueue(device *Device) *autodrainingOutboundQueue {
 func (device *Device) flushOutboundQueue(q *autodrainingOutboundQueue) {
        for {
                select {
-               case elems := <-q.c:
-                       for _, elem := range *elems {
-                               elem.Lock()
+               case elemsContainer := <-q.c:
+                       elemsContainer.Lock()
+                       for _, elem := range elemsContainer.elems {
                                device.PutMessageBuffer(elem.buffer)
                                device.PutOutboundElement(elem)
                        }
-                       device.PutOutboundElementsSlice(elems)
+                       device.PutOutboundElementsContainer(elemsContainer)
                default:
                        return
                }
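
The channels.go hunks above only change what the autodraining queues carry; the drain-on-GC mechanism itself (runtime.SetFinalizer plus a non-blocking drain loop, as in flushInboundQueue/flushOutboundQueue) is untouched. A rough, self-contained sketch of that pattern follows, with illustrative names rather than the device package API.

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// batch stands in for Queue{In,Out}boundElementsContainer.
type batch struct {
	sync.Mutex
	elems []*int
}

type drainingQueue struct {
	c chan *batch
}

// newDrainingQueue mirrors the newAutodraining*Queue idea: when the wrapper
// becomes unreachable, its finalizer drains whatever batches are still
// buffered so their contents can be recycled instead of leaking.
func newDrainingQueue(recycle func(*batch)) *drainingQueue {
	q := &drainingQueue{c: make(chan *batch, 16)}
	runtime.SetFinalizer(q, func(q *drainingQueue) {
		for {
			select {
			case b := <-q.c:
				recycle(b)
			default:
				return
			}
		}
	})
	return q
}

// stageAndDrop buffers one batch, then drops the only reference to the
// queue wrapper when it returns.
func stageAndDrop() {
	q := newDrainingQueue(func(b *batch) {
		// The real flush functions return buffers and elements to their
		// WaitPools here; this sketch just reports the drained batch.
		fmt.Println("drained a batch of", len(b.elems))
	})
	q.c <- &batch{elems: make([]*int, 3)}
}

func main() {
	stageAndDrop()
	runtime.GC()                       // make the dropped wrapper eligible for finalization
	time.Sleep(100 * time.Millisecond) // finalizers run asynchronously; give it a moment
}
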
diff --git a/device/device.go b/device/device.go
index 1af9fe01770048114cc2260a5761663709d6df47..f9557a075b50434feaec3a982bc1738527bef482 100644
@@ -68,11 +68,11 @@ type Device struct {
        cookieChecker CookieChecker
 
        pool struct {
-               outboundElementsSlice *WaitPool
-               inboundElementsSlice  *WaitPool
-               messageBuffers        *WaitPool
-               inboundElements       *WaitPool
-               outboundElements      *WaitPool
+               inboundElementsContainer  *WaitPool
+               outboundElementsContainer *WaitPool
+               messageBuffers            *WaitPool
+               inboundElements           *WaitPool
+               outboundElements          *WaitPool
        }
 
        queue struct {
diff --git a/device/peer.go b/device/peer.go
index 0ac48962c8eaa2d2b0dfd8c3cef84b12b5e0e9d1..2fb5da62a53ea0ced889ace7b3cb06840f6fc7d1 100644
@@ -45,9 +45,9 @@ type Peer struct {
        }
 
        queue struct {
-               staged   chan *[]*QueueOutboundElement // staged packets before a handshake is available
-               outbound *autodrainingOutboundQueue    // sequential ordering of udp transmission
-               inbound  *autodrainingInboundQueue     // sequential ordering of tun writing
+               staged   chan *QueueOutboundElementsContainer // staged packets before a handshake is available
+               outbound *autodrainingOutboundQueue           // sequential ordering of udp transmission
+               inbound  *autodrainingInboundQueue            // sequential ordering of tun writing
        }
 
        cookieGenerator             CookieGenerator
@@ -81,7 +81,7 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
        peer.device = device
        peer.queue.outbound = newAutodrainingOutboundQueue(device)
        peer.queue.inbound = newAutodrainingInboundQueue(device)
-       peer.queue.staged = make(chan *[]*QueueOutboundElement, QueueStagedSize)
+       peer.queue.staged = make(chan *QueueOutboundElementsContainer, QueueStagedSize)
 
        // map public key
        _, ok := device.peers.keyMap[pk]
diff --git a/device/pools.go b/device/pools.go
index 02a5d6acb302aa90ee97da4a531f7a93735bbbc7..94f3dc7e6b9271007812298877978ac213d7f569 100644
@@ -46,13 +46,13 @@ func (p *WaitPool) Put(x any) {
 }
 
 func (device *Device) PopulatePools() {
-       device.pool.outboundElementsSlice = NewWaitPool(PreallocatedBuffersPerPool, func() any {
-               s := make([]*QueueOutboundElement, 0, device.BatchSize())
-               return &s
-       })
-       device.pool.inboundElementsSlice = NewWaitPool(PreallocatedBuffersPerPool, func() any {
+       device.pool.inboundElementsContainer = NewWaitPool(PreallocatedBuffersPerPool, func() any {
                s := make([]*QueueInboundElement, 0, device.BatchSize())
-               return &s
+               return &QueueInboundElementsContainer{elems: s}
+       })
+       device.pool.outboundElementsContainer = NewWaitPool(PreallocatedBuffersPerPool, func() any {
+               s := make([]*QueueOutboundElement, 0, device.BatchSize())
+               return &QueueOutboundElementsContainer{elems: s}
        })
        device.pool.messageBuffers = NewWaitPool(PreallocatedBuffersPerPool, func() any {
                return new([MaxMessageSize]byte)
@@ -65,28 +65,32 @@ func (device *Device) PopulatePools() {
        })
 }
 
-func (device *Device) GetOutboundElementsSlice() *[]*QueueOutboundElement {
-       return device.pool.outboundElementsSlice.Get().(*[]*QueueOutboundElement)
+func (device *Device) GetInboundElementsContainer() *QueueInboundElementsContainer {
+       c := device.pool.inboundElementsContainer.Get().(*QueueInboundElementsContainer)
+       c.Mutex = sync.Mutex{}
+       return c
 }
 
-func (device *Device) PutOutboundElementsSlice(s *[]*QueueOutboundElement) {
-       for i := range *s {
-               (*s)[i] = nil
+func (device *Device) PutInboundElementsContainer(c *QueueInboundElementsContainer) {
+       for i := range c.elems {
+               c.elems[i] = nil
        }
-       *s = (*s)[:0]
-       device.pool.outboundElementsSlice.Put(s)
+       c.elems = c.elems[:0]
+       device.pool.inboundElementsContainer.Put(c)
 }
 
-func (device *Device) GetInboundElementsSlice() *[]*QueueInboundElement {
-       return device.pool.inboundElementsSlice.Get().(*[]*QueueInboundElement)
+func (device *Device) GetOutboundElementsContainer() *QueueOutboundElementsContainer {
+       c := device.pool.outboundElementsContainer.Get().(*QueueOutboundElementsContainer)
+       c.Mutex = sync.Mutex{}
+       return c
 }
 
-func (device *Device) PutInboundElementsSlice(s *[]*QueueInboundElement) {
-       for i := range *s {
-               (*s)[i] = nil
+func (device *Device) PutOutboundElementsContainer(c *QueueOutboundElementsContainer) {
+       for i := range c.elems {
+               c.elems[i] = nil
        }
-       *s = (*s)[:0]
-       device.pool.inboundElementsSlice.Put(s)
+       c.elems = c.elems[:0]
+       device.pool.outboundElementsContainer.Put(c)
 }
 
 func (device *Device) GetMessageBuffer() *[MaxMessageSize]byte {
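
In pools.go, Get* now resets the container's embedded Mutex so a recycled container never comes back in a locked state, and Put* nils out the element pointers before truncating elems so a pooled container doesn't keep packet buffers reachable. A rough equivalent using the standard library's sync.Pool (the package's own WaitPool and element types are not reproduced here):

package main

import (
	"fmt"
	"sync"
)

type element struct {
	buffer *[2048]byte // stand-in for *[MaxMessageSize]byte
}

type elementsContainer struct {
	sync.Mutex
	elems []*element
}

var containerPool = sync.Pool{
	New: func() any {
		// Preallocate capacity once; reuse the backing array across cycles.
		return &elementsContainer{elems: make([]*element, 0, 128)}
	},
}

// getContainer mirrors Get{In,Out}boundElementsContainer: reset the Mutex
// so a recycled container starts unlocked.
func getContainer() *elementsContainer {
	c := containerPool.Get().(*elementsContainer)
	c.Mutex = sync.Mutex{}
	return c
}

// putContainer mirrors Put{In,Out}boundElementsContainer: clear the pointers
// for the garbage collector, keep the backing array, return to the pool.
func putContainer(c *elementsContainer) {
	for i := range c.elems {
		c.elems[i] = nil
	}
	c.elems = c.elems[:0]
	containerPool.Put(c)
}

func main() {
	c := getContainer()
	c.elems = append(c.elems, &element{})
	fmt.Println("batched", len(c.elems), "element(s)")
	putContainer(c)
}
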
diff --git a/device/receive.go b/device/receive.go
index f0f37a14aa234f43bb98942fa35ddfb0d28ae0cb..4b32dc5871c72b0d16402112bcfbedd00e0efa02 100644
@@ -27,7 +27,6 @@ type QueueHandshakeElement struct {
 }
 
 type QueueInboundElement struct {
-       sync.Mutex
        buffer   *[MaxMessageSize]byte
        packet   []byte
        counter  uint64
@@ -35,6 +34,11 @@ type QueueInboundElement struct {
        endpoint conn.Endpoint
 }
 
+type QueueInboundElementsContainer struct {
+       sync.Mutex
+       elems []*QueueInboundElement
+}
+
 // clearPointers clears elem fields that contain pointers.
 // This makes the garbage collector's life easier and
 // avoids accidentally keeping other objects around unnecessarily.
@@ -87,7 +91,7 @@ func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.Receive
                count       int
                endpoints   = make([]conn.Endpoint, maxBatchSize)
                deathSpiral int
-               elemsByPeer = make(map[*Peer]*[]*QueueInboundElement, maxBatchSize)
+               elemsByPeer = make(map[*Peer]*QueueInboundElementsContainer, maxBatchSize)
        )
 
        for i := range bufsArrs {
@@ -170,15 +174,14 @@ func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.Receive
                                elem.keypair = keypair
                                elem.endpoint = endpoints[i]
                                elem.counter = 0
-                               elem.Mutex = sync.Mutex{}
-                               elem.Lock()
 
                                elemsForPeer, ok := elemsByPeer[peer]
                                if !ok {
-                                       elemsForPeer = device.GetInboundElementsSlice()
+                                       elemsForPeer = device.GetInboundElementsContainer()
+                                       elemsForPeer.Lock()
                                        elemsByPeer[peer] = elemsForPeer
                                }
-                               *elemsForPeer = append(*elemsForPeer, elem)
+                               elemsForPeer.elems = append(elemsForPeer.elems, elem)
                                bufsArrs[i] = device.GetMessageBuffer()
                                bufs[i] = bufsArrs[i][:]
                                continue
@@ -217,16 +220,16 @@ func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.Receive
                        default:
                        }
                }
-               for peer, elems := range elemsByPeer {
+               for peer, elemsContainer := range elemsByPeer {
                        if peer.isRunning.Load() {
-                               peer.queue.inbound.c <- elems
-                               device.queue.decryption.c <- elems
+                               peer.queue.inbound.c <- elemsContainer
+                               device.queue.decryption.c <- elemsContainer
                        } else {
-                               for _, elem := range *elems {
+                               for _, elem := range elemsContainer.elems {
                                        device.PutMessageBuffer(elem.buffer)
                                        device.PutInboundElement(elem)
                                }
-                               device.PutInboundElementsSlice(elems)
+                               device.PutInboundElementsContainer(elemsContainer)
                        }
                        delete(elemsByPeer, peer)
                }
@@ -239,8 +242,8 @@ func (device *Device) RoutineDecryption(id int) {
        defer device.log.Verbosef("Routine: decryption worker %d - stopped", id)
        device.log.Verbosef("Routine: decryption worker %d - started", id)
 
-       for elems := range device.queue.decryption.c {
-               for _, elem := range *elems {
+       for elemsContainer := range device.queue.decryption.c {
+               for _, elem := range elemsContainer.elems {
                        // split message into fields
                        counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent]
                        content := elem.packet[MessageTransportOffsetContent:]
@@ -259,8 +262,8 @@ func (device *Device) RoutineDecryption(id int) {
                        if err != nil {
                                elem.packet = nil
                        }
-                       elem.Unlock()
                }
+               elemsContainer.Unlock()
        }
 }
 
@@ -437,12 +440,12 @@ func (peer *Peer) RoutineSequentialReceiver(maxBatchSize int) {
 
        bufs := make([][]byte, 0, maxBatchSize)
 
-       for elems := range peer.queue.inbound.c {
-               if elems == nil {
+       for elemsContainer := range peer.queue.inbound.c {
+               if elemsContainer == nil {
                        return
                }
-               for _, elem := range *elems {
-                       elem.Lock()
+               elemsContainer.Lock()
+               for _, elem := range elemsContainer.elems {
                        if elem.packet == nil {
                                // decryption failed
                                continue
@@ -515,11 +518,11 @@ func (peer *Peer) RoutineSequentialReceiver(maxBatchSize int) {
                                device.log.Errorf("Failed to write packets to TUN device: %v", err)
                        }
                }
-               for _, elem := range *elems {
+               for _, elem := range elemsContainer.elems {
                        device.PutMessageBuffer(elem.buffer)
                        device.PutInboundElement(elem)
                }
                bufs = bufs[:0]
-               device.PutInboundElementsSlice(elems)
+               device.PutInboundElementsContainer(elemsContainer)
        }
 }
diff --git a/device/send.go b/device/send.go
index e838c4e847f52f28572815d277d238fa32f9bf9e..769720af8f2127ff04d905eefb892b74112af002 100644
@@ -46,7 +46,6 @@ import (
  */
 
 type QueueOutboundElement struct {
-       sync.Mutex
        buffer  *[MaxMessageSize]byte // slice holding the packet data
        packet  []byte                // slice of "buffer" (always!)
        nonce   uint64                // nonce for encryption
@@ -54,10 +53,14 @@ type QueueOutboundElement struct {
        peer    *Peer                 // related peer
 }
 
+type QueueOutboundElementsContainer struct {
+       sync.Mutex
+       elems []*QueueOutboundElement
+}
+
 func (device *Device) NewOutboundElement() *QueueOutboundElement {
        elem := device.GetOutboundElement()
        elem.buffer = device.GetMessageBuffer()
-       elem.Mutex = sync.Mutex{}
        elem.nonce = 0
        // keypair and peer were cleared (if necessary) by clearPointers.
        return elem
@@ -79,15 +82,15 @@ func (elem *QueueOutboundElement) clearPointers() {
 func (peer *Peer) SendKeepalive() {
        if len(peer.queue.staged) == 0 && peer.isRunning.Load() {
                elem := peer.device.NewOutboundElement()
-               elems := peer.device.GetOutboundElementsSlice()
-               *elems = append(*elems, elem)
+               elemsContainer := peer.device.GetOutboundElementsContainer()
+               elemsContainer.elems = append(elemsContainer.elems, elem)
                select {
-               case peer.queue.staged <- elems:
+               case peer.queue.staged <- elemsContainer:
                        peer.device.log.Verbosef("%v - Sending keepalive packet", peer)
                default:
                        peer.device.PutMessageBuffer(elem.buffer)
                        peer.device.PutOutboundElement(elem)
-                       peer.device.PutOutboundElementsSlice(elems)
+                       peer.device.PutOutboundElementsContainer(elemsContainer)
                }
        }
        peer.SendStagedPackets()
@@ -219,7 +222,7 @@ func (device *Device) RoutineReadFromTUN() {
                readErr     error
                elems       = make([]*QueueOutboundElement, batchSize)
                bufs        = make([][]byte, batchSize)
-               elemsByPeer = make(map[*Peer]*[]*QueueOutboundElement, batchSize)
+               elemsByPeer = make(map[*Peer]*QueueOutboundElementsContainer, batchSize)
                count       = 0
                sizes       = make([]int, batchSize)
                offset      = MessageTransportHeaderSize
@@ -276,10 +279,10 @@ func (device *Device) RoutineReadFromTUN() {
                        }
                        elemsForPeer, ok := elemsByPeer[peer]
                        if !ok {
-                               elemsForPeer = device.GetOutboundElementsSlice()
+                               elemsForPeer = device.GetOutboundElementsContainer()
                                elemsByPeer[peer] = elemsForPeer
                        }
-                       *elemsForPeer = append(*elemsForPeer, elem)
+                       elemsForPeer.elems = append(elemsForPeer.elems, elem)
                        elems[i] = device.NewOutboundElement()
                        bufs[i] = elems[i].buffer[:]
                }
@@ -289,11 +292,11 @@ func (device *Device) RoutineReadFromTUN() {
                                peer.StagePackets(elemsForPeer)
                                peer.SendStagedPackets()
                        } else {
-                               for _, elem := range *elemsForPeer {
+                               for _, elem := range elemsForPeer.elems {
                                        device.PutMessageBuffer(elem.buffer)
                                        device.PutOutboundElement(elem)
                                }
-                               device.PutOutboundElementsSlice(elemsForPeer)
+                               device.PutOutboundElementsContainer(elemsForPeer)
                        }
                        delete(elemsByPeer, peer)
                }
@@ -317,7 +320,7 @@ func (device *Device) RoutineReadFromTUN() {
        }
 }
 
-func (peer *Peer) StagePackets(elems *[]*QueueOutboundElement) {
+func (peer *Peer) StagePackets(elems *QueueOutboundElementsContainer) {
        for {
                select {
                case peer.queue.staged <- elems:
@@ -326,11 +329,11 @@ func (peer *Peer) StagePackets(elems *[]*QueueOutboundElement) {
                }
                select {
                case tooOld := <-peer.queue.staged:
-                       for _, elem := range *tooOld {
+                       for _, elem := range tooOld.elems {
                                peer.device.PutMessageBuffer(elem.buffer)
                                peer.device.PutOutboundElement(elem)
                        }
-                       peer.device.PutOutboundElementsSlice(tooOld)
+                       peer.device.PutOutboundElementsContainer(tooOld)
                default:
                }
        }
@@ -349,52 +352,52 @@ top:
        }
 
        for {
-               var elemsOOO *[]*QueueOutboundElement
+               var elemsContainerOOO *QueueOutboundElementsContainer
                select {
-               case elems := <-peer.queue.staged:
+               case elemsContainer := <-peer.queue.staged:
                        i := 0
-                       for _, elem := range *elems {
+                       for _, elem := range elemsContainer.elems {
                                elem.peer = peer
                                elem.nonce = keypair.sendNonce.Add(1) - 1
                                if elem.nonce >= RejectAfterMessages {
                                        keypair.sendNonce.Store(RejectAfterMessages)
-                                       if elemsOOO == nil {
-                                               elemsOOO = peer.device.GetOutboundElementsSlice()
+                                       if elemsContainerOOO == nil {
+                                               elemsContainerOOO = peer.device.GetOutboundElementsContainer()
                                        }
-                                       *elemsOOO = append(*elemsOOO, elem)
+                                       elemsContainerOOO.elems = append(elemsContainerOOO.elems, elem)
                                        continue
                                } else {
-                                       (*elems)[i] = elem
+                                       elemsContainer.elems[i] = elem
                                        i++
                                }
 
                                elem.keypair = keypair
-                               elem.Lock()
                        }
-                       *elems = (*elems)[:i]
+                       elemsContainer.Lock()
+                       elemsContainer.elems = elemsContainer.elems[:i]
 
-                       if elemsOOO != nil {
-                               peer.StagePackets(elemsOOO) // XXX: Out of order, but we can't front-load go chans
+                       if elemsContainerOOO != nil {
+                               peer.StagePackets(elemsContainerOOO) // XXX: Out of order, but we can't front-load go chans
                        }
 
-                       if len(*elems) == 0 {
-                               peer.device.PutOutboundElementsSlice(elems)
+                       if len(elemsContainer.elems) == 0 {
+                               peer.device.PutOutboundElementsContainer(elemsContainer)
                                goto top
                        }
 
                        // add to parallel and sequential queue
                        if peer.isRunning.Load() {
-                               peer.queue.outbound.c <- elems
-                               peer.device.queue.encryption.c <- elems
+                               peer.queue.outbound.c <- elemsContainer
+                               peer.device.queue.encryption.c <- elemsContainer
                        } else {
-                               for _, elem := range *elems {
+                               for _, elem := range elemsContainer.elems {
                                        peer.device.PutMessageBuffer(elem.buffer)
                                        peer.device.PutOutboundElement(elem)
                                }
-                               peer.device.PutOutboundElementsSlice(elems)
+                               peer.device.PutOutboundElementsContainer(elemsContainer)
                        }
 
-                       if elemsOOO != nil {
+                       if elemsContainerOOO != nil {
                                goto top
                        }
                default:
@@ -406,12 +409,12 @@ top:
 func (peer *Peer) FlushStagedPackets() {
        for {
                select {
-               case elems := <-peer.queue.staged:
-                       for _, elem := range *elems {
+               case elemsContainer := <-peer.queue.staged:
+                       for _, elem := range elemsContainer.elems {
                                peer.device.PutMessageBuffer(elem.buffer)
                                peer.device.PutOutboundElement(elem)
                        }
-                       peer.device.PutOutboundElementsSlice(elems)
+                       peer.device.PutOutboundElementsContainer(elemsContainer)
                default:
                        return
                }
@@ -445,8 +448,8 @@ func (device *Device) RoutineEncryption(id int) {
        defer device.log.Verbosef("Routine: encryption worker %d - stopped", id)
        device.log.Verbosef("Routine: encryption worker %d - started", id)
 
-       for elems := range device.queue.encryption.c {
-               for _, elem := range *elems {
+       for elemsContainer := range device.queue.encryption.c {
+               for _, elem := range elemsContainer.elems {
                        // populate header fields
                        header := elem.buffer[:MessageTransportHeaderSize]
 
@@ -471,8 +474,8 @@ func (device *Device) RoutineEncryption(id int) {
                                elem.packet,
                                nil,
                        )
-                       elem.Unlock()
                }
+               elemsContainer.Unlock()
        }
 }
 
@@ -486,9 +489,9 @@ func (peer *Peer) RoutineSequentialSender(maxBatchSize int) {
 
        bufs := make([][]byte, 0, maxBatchSize)
 
-       for elems := range peer.queue.outbound.c {
+       for elemsContainer := range peer.queue.outbound.c {
                bufs = bufs[:0]
-               if elems == nil {
+               if elemsContainer == nil {
                        return
                }
                if !peer.isRunning.Load() {
@@ -498,16 +501,16 @@ func (peer *Peer) RoutineSequentialSender(maxBatchSize int) {
                        // The timers and SendBuffers code are resilient to a few stragglers.
                        // TODO: rework peer shutdown order to ensure
                        // that we never accidentally keep timers alive longer than necessary.
-                       for _, elem := range *elems {
-                               elem.Lock()
+                       elemsContainer.Lock()
+                       for _, elem := range elemsContainer.elems {
                                device.PutMessageBuffer(elem.buffer)
                                device.PutOutboundElement(elem)
                        }
                        continue
                }
                dataSent := false
-               for _, elem := range *elems {
-                       elem.Lock()
+               elemsContainer.Lock()
+               for _, elem := range elemsContainer.elems {
                        if len(elem.packet) != MessageKeepaliveSize {
                                dataSent = true
                        }
@@ -521,11 +524,11 @@ func (peer *Peer) RoutineSequentialSender(maxBatchSize int) {
                if dataSent {
                        peer.timersDataSent()
                }
-               for _, elem := range *elems {
+               for _, elem := range elemsContainer.elems {
                        device.PutMessageBuffer(elem.buffer)
                        device.PutOutboundElement(elem)
                }
-               device.PutOutboundElementsSlice(elems)
+               device.PutOutboundElementsContainer(elemsContainer)
                if err != nil {
                        var errGSO conn.ErrUDPGSODisabled
                        if errors.As(err, &errGSO) {