DeviceRoutineNumberAdditional = 2
)
-var preallocatedBuffers = 0
-
type Device struct {
isUp AtomicBool // device is (going) up
isClosed AtomicBool // device is closed? (acting as guard)
}
pool struct {
- messageBuffers *sync.Pool
- reuseChan chan interface{}
+ messageBufferPool *sync.Pool
+ messageBufferReuseChan chan *[MaxMessageSize]byte
+ inboundElementPool *sync.Pool
+ inboundElementReuseChan chan *QueueInboundElement
+ outboundElementPool *sync.Pool
+ outboundElementReuseChan chan *QueueOutboundElement
}
queue struct {
return nil
}
-func (device *Device) GetMessageBuffer() *[MaxMessageSize]byte {
- if preallocatedBuffers == 0 {
- return device.pool.messageBuffers.Get().(*[MaxMessageSize]byte)
- } else {
- return (<-device.pool.reuseChan).(*[MaxMessageSize]byte)
- }
-}
-
-func (device *Device) PutMessageBuffer(msg *[MaxMessageSize]byte) {
- if preallocatedBuffers == 0 {
- device.pool.messageBuffers.Put(msg)
- } else {
- device.pool.reuseChan <- msg
- }
-}
-
func NewDevice(tunDevice tun.TUNDevice, logger *Logger) *Device {
device := new(Device)
device.indexTable.Init()
device.allowedips.Reset()
- if preallocatedBuffers == 0 {
- device.pool.messageBuffers = &sync.Pool{
- New: func() interface{} {
- return new([MaxMessageSize]byte)
- },
- }
- } else {
- device.pool.reuseChan = make(chan interface{}, preallocatedBuffers)
- for i := 0; i < preallocatedBuffers; i += 1 {
- device.pool.reuseChan <- new([MaxMessageSize]byte)
- }
- }
+ device.PopulatePools()
// create queues
--- /dev/null
+/* SPDX-License-Identifier: GPL-2.0
+ *
+ * Copyright (C) 2017-2018 WireGuard LLC. All Rights Reserved.
+ */
+
+package main
+
+import "sync"
+
+var preallocatedBuffers = 0
+
+func (device *Device) PopulatePools() {
+ if preallocatedBuffers == 0 {
+ device.pool.messageBufferPool = &sync.Pool{
+ New: func() interface{} {
+ return new([MaxMessageSize]byte)
+ },
+ }
+ device.pool.inboundElementPool = &sync.Pool{
+ New: func() interface{} {
+ return new(QueueInboundElement)
+ },
+ }
+ device.pool.outboundElementPool = &sync.Pool{
+ New: func() interface{} {
+ return new(QueueOutboundElement)
+ },
+ }
+ } else {
+ device.pool.messageBufferReuseChan = make(chan *[MaxMessageSize]byte, preallocatedBuffers)
+ for i := 0; i < preallocatedBuffers; i += 1 {
+ device.pool.messageBufferReuseChan <- new([MaxMessageSize]byte)
+ }
+ device.pool.inboundElementReuseChan = make(chan *QueueInboundElement, preallocatedBuffers)
+ for i := 0; i < preallocatedBuffers; i += 1 {
+ device.pool.inboundElementReuseChan <- new(QueueInboundElement)
+ }
+ device.pool.outboundElementReuseChan = make(chan *QueueOutboundElement, preallocatedBuffers)
+ for i := 0; i < preallocatedBuffers; i += 1 {
+ device.pool.outboundElementReuseChan <- new(QueueOutboundElement)
+ }
+ }
+}
+
+func (device *Device) GetMessageBuffer() *[MaxMessageSize]byte {
+ if preallocatedBuffers == 0 {
+ return device.pool.messageBufferPool.Get().(*[MaxMessageSize]byte)
+ } else {
+ return <-device.pool.messageBufferReuseChan
+ }
+}
+
+func (device *Device) PutMessageBuffer(msg *[MaxMessageSize]byte) {
+ if preallocatedBuffers == 0 {
+ device.pool.messageBufferPool.Put(msg)
+ } else {
+ device.pool.messageBufferReuseChan <- msg
+ }
+}
+
+func (device *Device) GetInboundElement() *QueueInboundElement {
+ if preallocatedBuffers == 0 {
+ return device.pool.inboundElementPool.Get().(*QueueInboundElement)
+ } else {
+ return <-device.pool.inboundElementReuseChan
+ }
+}
+
+func (device *Device) PutInboundElement(msg *QueueInboundElement) {
+ if preallocatedBuffers == 0 {
+ device.pool.inboundElementPool.Put(msg)
+ } else {
+ device.pool.inboundElementReuseChan <- msg
+ }
+}
+
+func (device *Device) GetOutboundElement() *QueueOutboundElement {
+ if preallocatedBuffers == 0 {
+ return device.pool.outboundElementPool.Get().(*QueueOutboundElement)
+ } else {
+ return <-device.pool.outboundElementReuseChan
+ }
+}
+
+func (device *Device) PutOutboundElement(msg *QueueOutboundElement) {
+ if preallocatedBuffers == 0 {
+ device.pool.outboundElementPool.Put(msg)
+ } else {
+ device.pool.outboundElementReuseChan <- msg
+ }
+}
return false
}
default:
+ device.PutInboundElement(element)
return false
}
}
}
// create work element
-
peer := value.peer
- elem := &QueueInboundElement{
- packet: packet,
- buffer: buffer,
- keypair: keypair,
- dropped: AtomicFalse,
- endpoint: endpoint,
- }
+ elem := device.GetInboundElement()
+ elem.packet = packet
+ elem.buffer = buffer
+ elem.keypair = keypair
+ elem.dropped = AtomicFalse
+ elem.endpoint = endpoint
+ elem.counter = 0
+ elem.mutex = sync.Mutex{}
elem.mutex.Lock()
// add to decryption queues
// check if dropped
if elem.IsDropped() {
+ device.PutInboundElement(elem)
continue
}
elem.Drop()
device.PutMessageBuffer(elem.buffer)
elem.buffer = nil
- elem.mutex.Unlock()
}
elem.mutex.Unlock()
}
logDebug := device.log.Debug
var elem *QueueInboundElement
+ var ok bool
defer func() {
logDebug.Println(peer, "- Routine: sequential receiver - stopped")
peer.routines.stopping.Done()
- if elem != nil && elem.buffer != nil {
- device.PutMessageBuffer(elem.buffer)
+ if elem != nil {
+ if elem.buffer != nil {
+ device.PutMessageBuffer(elem.buffer)
+ }
+ device.PutInboundElement(elem)
}
}()
peer.routines.starting.Done()
for {
- if elem != nil && elem.buffer != nil {
- device.PutMessageBuffer(elem.buffer)
+ if elem != nil {
+ if elem.buffer != nil {
+ device.PutMessageBuffer(elem.buffer)
+ }
+ device.PutInboundElement(elem)
}
select {
case <-peer.routines.stop:
return
- case elem, ok := <-peer.queue.inbound:
+ case elem, ok = <-peer.queue.inbound:
if !ok {
return
offset := MessageTransportOffsetContent
atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet)))
- _, err := device.tun.device.Write(
- elem.buffer[:offset+len(elem.packet)],
- offset)
+ _, err := device.tun.device.Write(elem.buffer[:offset+len(elem.packet)], offset)
if err != nil {
logError.Println("Failed to write packet to TUN device:", err)
}
}
func (device *Device) NewOutboundElement() *QueueOutboundElement {
- return &QueueOutboundElement{
- dropped: AtomicFalse,
- buffer: device.GetMessageBuffer(),
- }
+ elem := device.GetOutboundElement()
+ elem.dropped = AtomicFalse
+ elem.buffer = device.GetMessageBuffer()
+ elem.mutex = sync.Mutex{}
+ elem.nonce = 0
+ elem.keypair = nil
+ elem.peer = nil
+ return elem
}
func (elem *QueueOutboundElement) Drop() {
select {
case old := <-queue:
device.PutMessageBuffer(old.buffer)
+ device.PutOutboundElement(old)
default:
}
}
}
default:
element.peer.device.PutMessageBuffer(element.buffer)
+ element.peer.device.PutOutboundElement(element)
}
}
return true
default:
peer.device.PutMessageBuffer(elem.buffer)
+ peer.device.PutOutboundElement(elem)
return false
}
}
*/
func (device *Device) RoutineReadFromTUN() {
- elem := device.NewOutboundElement()
-
logDebug := device.log.Debug
logError := device.log.Error
logDebug.Println("Routine: TUN reader - started")
device.state.starting.Done()
+ var elem *QueueOutboundElement
+
for {
+ if elem != nil {
+ device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
+ }
+ elem = device.NewOutboundElement()
// read packet
device.Close()
}
device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
return
}
peer.SendHandshakeInitiation(false)
}
addToNonceQueue(peer.queue.nonce, elem, device)
- elem = device.NewOutboundElement()
+ elem = nil
}
}
}
select {
case elem := <-peer.queue.nonce:
device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
default:
return
}
case <-peer.signals.flushNonceQueue:
device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
flush()
goto NextPacket
case <-peer.routines.stop:
device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
return
}
}
if elem.nonce >= RejectAfterMessages {
atomic.StoreUint64(&keypair.sendNonce, RejectAfterMessages)
device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
goto NextPacket
}
// check if dropped
if elem.IsDropped() {
+ device.PutOutboundElement(elem)
continue
}
elem.mutex.Lock()
if elem.IsDropped() {
+ device.PutOutboundElement(elem)
continue
}
length := uint64(len(elem.packet))
err := peer.SendBuffer(elem.packet)
device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
if err != nil {
logError.Println(peer, "- Failed to send data packet", err)
continue