]> git.ipfire.org Git - thirdparty/wireguard-go.git/commitdiff
Improved timer state machine
authorMathias Hall-Andersen <mathias@hall-andersen.dk>
Sat, 8 Jul 2017 21:51:26 +0000 (23:51 +0200)
committerMathias Hall-Andersen <mathias@hall-andersen.dk>
Sat, 8 Jul 2017 21:51:26 +0000 (23:51 +0200)
src/constants.go
src/device.go
src/handshake.go [deleted file]
src/keypair.go
src/misc.go
src/noise_protocol.go
src/peer.go
src/receive.go
src/send.go
src/timers.go [new file with mode: 0644]

index 6fbb7a0cfec35cbb7a256df683c919eaab81fac4..4e8d52133bdc1993cd231ed2d0bf7a47e6fbec4a 100644 (file)
@@ -16,6 +16,10 @@ const (
        MaxHandshakeAttemptTime = time.Second * 90
 )
 
+const (
+       RekeyAfterTimeReceiving = RekeyAfterTime - KeepaliveTimeout - RekeyTimeout
+)
+
 const (
        QueueOutboundSize      = 1024
        QueueInboundSize       = 1024
index 0564068daba79151feb13b8d09baa110ad27dca2..12d1ed9afdeabaeb985c72916326aea49bb0866e 100644 (file)
@@ -31,16 +31,11 @@ type Device struct {
        signal struct {
                stop chan struct{}
        }
-       congestionState int32 // used as an atomic bool
-       peers           map[NoisePublicKey]*Peer
-       mac             MACStateDevice
+       underLoad int32 // used as an atomic bool
+       peers     map[NoisePublicKey]*Peer
+       mac       MACStateDevice
 }
 
-const (
-       CongestionStateUnderLoad = iota
-       CongestionStateOkay
-)
-
 func (device *Device) SetPrivateKey(sk NoisePrivateKey) {
        device.mutex.Lock()
        defer device.mutex.Unlock()
@@ -99,10 +94,12 @@ func NewDevice(tun TUNDevice, logLevel int) *Device {
                go device.RoutineDecryption()
                go device.RoutineHandshake()
        }
+
        go device.RoutineBusyMonitor()
        go device.RoutineReadFromTUN(tun)
        go device.RoutineReceiveIncomming()
        go device.RoutineWriteToTUN(tun)
+
        return device
 }
 
diff --git a/src/handshake.go b/src/handshake.go
deleted file mode 100644 (file)
index de607df..0000000
+++ /dev/null
@@ -1,153 +0,0 @@
-package main
-
-import (
-       "bytes"
-       "encoding/binary"
-       "sync/atomic"
-       "time"
-)
-
-/* Sends a keep-alive if no packets queued for peer
- *
- * Used by initiator of handshake and with active keep-alive
- */
-func (peer *Peer) SendKeepAlive() bool {
-       elem := peer.device.NewOutboundElement()
-       elem.packet = nil
-       if len(peer.queue.nonce) == 0 {
-               select {
-               case peer.queue.nonce <- elem:
-                       return true
-               default:
-                       return false
-               }
-       }
-       return true
-}
-
-/* Called when a new authenticated message has been send
- *
- * TODO: This might be done in a faster way
- */
-func (peer *Peer) KeepKeyFreshSending() {
-       send := func() bool {
-               peer.keyPairs.mutex.RLock()
-               defer peer.keyPairs.mutex.RUnlock()
-
-               kp := peer.keyPairs.current
-               if kp == nil {
-                       return false
-               }
-
-               if !kp.isInitiator {
-                       return false
-               }
-
-               nonce := atomic.LoadUint64(&kp.sendNonce)
-               if nonce > RekeyAfterMessages {
-                       return true
-               }
-               return time.Now().Sub(kp.created) > RekeyAfterTime
-       }()
-       if send {
-               sendSignal(peer.signal.handshakeBegin)
-       }
-}
-
-/* This is the state machine for handshake initiation
- *
- * Associated with this routine is the signal "handshakeBegin"
- * The routine will read from the "handshakeBegin" channel
- * at most every RekeyTimeout seconds
- */
-func (peer *Peer) RoutineHandshakeInitiator() {
-       device := peer.device
-       logger := device.log.Debug
-       timeout := stoppedTimer()
-
-       var elem *QueueOutboundElement
-
-       logger.Println("Routine, handshake initator, started for peer", peer.id)
-
-       func() {
-               for {
-                       var attempts uint
-                       var deadline time.Time
-
-                       // wait for signal
-
-                       select {
-                       case <-peer.signal.handshakeBegin:
-                       case <-peer.signal.stop:
-                               return
-                       }
-
-               HandshakeLoop:
-                       for {
-                               // clear completed signal
-
-                               select {
-                               case <-peer.signal.handshakeCompleted:
-                               case <-peer.signal.stop:
-                                       return
-                               default:
-                               }
-
-                               // create initiation
-
-                               if elem != nil {
-                                       elem.Drop()
-                               }
-                               elem = device.NewOutboundElement()
-
-                               msg, err := device.CreateMessageInitiation(peer)
-                               if err != nil {
-                                       device.log.Error.Println("Failed to create initiation message:", err)
-                                       break
-                               }
-
-                               // marshal & schedule for sending
-
-                               writer := bytes.NewBuffer(elem.data[:0])
-                               binary.Write(writer, binary.LittleEndian, msg)
-                               elem.packet = writer.Bytes()
-                               peer.mac.AddMacs(elem.packet)
-                               addToOutboundQueue(peer.queue.outbound, elem)
-
-                               if attempts == 0 {
-                                       deadline = time.Now().Add(MaxHandshakeAttemptTime)
-                               }
-
-                               // set timeout
-
-                               attempts += 1
-                               stopTimer(timeout)
-                               timeout.Reset(RekeyTimeout)
-                               device.log.Debug.Println("Handshake initiation attempt", attempts, "queued for peer", peer.id)
-
-                               // wait for handshake or timeout
-
-                               select {
-                               case <-peer.signal.stop:
-                                       return
-
-                               case <-peer.signal.handshakeCompleted:
-                                       device.log.Debug.Println("Handshake complete")
-                                       break HandshakeLoop
-
-                               case <-timeout.C:
-                                       device.log.Debug.Println("Timeout")
-                                       if deadline.Before(time.Now().Add(RekeyTimeout)) {
-                                               peer.signal.flushNonceQueue <- struct{}{}
-                                               if !peer.timer.sendKeepalive.Stop() {
-                                                       <-peer.timer.sendKeepalive.C
-                                               }
-                                               break HandshakeLoop
-                                       }
-                               }
-                       }
-               }
-       }()
-
-       logger.Println("Routine, handshake initator, stopped for peer", peer.id)
-}
index 3caa0c833262dabd599a08dd0b1dbfe0b05c4781..b24dbe4fcef2725cb33406e8961c3f61a19d34a1 100644 (file)
@@ -23,19 +23,6 @@ type KeyPairs struct {
        next     *KeyPair // not yet "confirmed by transport"
 }
 
-/* Called during recieving to confirm the handshake
- * was completed correctly
- */
-func (kp *KeyPairs) Used(key *KeyPair) {
-       if key == kp.next {
-               kp.mutex.Lock()
-               kp.previous = kp.current
-               kp.current = key
-               kp.next = nil
-               kp.mutex.Unlock()
-       }
-}
-
 func (kp *KeyPairs) Current() *KeyPair {
        kp.mutex.RLock()
        defer kp.mutex.RUnlock()
index dd4fa637e14223a6fb63de28e1e9ec22b4e217eb..75561b2ff63db82b19710eb585d11408d5644bf0 100644 (file)
@@ -4,6 +4,14 @@ import (
        "time"
 )
 
+/* We use int32 as atomic bools
+ * (since booleans are not natively supported by sync/atomic)
+ */
+const (
+       AtomicFalse = iota
+       AtomicTrue
+)
+
 func min(a uint, b uint) uint {
        if a > b {
                return b
@@ -11,14 +19,21 @@ func min(a uint, b uint) uint {
        return a
 }
 
-func sendSignal(c chan struct{}) {
+func signalSend(c chan struct{}) {
        select {
        case c <- struct{}{}:
        default:
        }
 }
 
-func stopTimer(timer *time.Timer) {
+func signalClear(c chan struct{}) {
+       select {
+       case <-c:
+       default:
+       }
+}
+
+func timerStop(timer *time.Timer) {
        if !timer.Stop() {
                select {
                case <-timer.C:
@@ -27,8 +42,8 @@ func stopTimer(timer *time.Timer) {
        }
 }
 
-func stoppedTimer() *time.Timer {
+func NewStoppedTimer() *time.Timer {
        timer := time.NewTimer(time.Hour)
-       stopTimer(timer)
+       timerStop(timer)
        return timer
 }
index 9a9d9187ec3b04c01a22149ca65cb4d6dc56a0cf..a90fe4cc5a3a376a714f6619a2bcb8741accef33 100644 (file)
@@ -478,7 +478,7 @@ func (peer *Peer) NewKeyPair() *KeyPair {
                        }
                        kp.previous = kp.current
                        kp.current = keyPair
-                       sendSignal(peer.signal.newKeyPair)
+                       signalSend(peer.signal.newKeyPair)
                } else {
                        kp.next = keyPair
                }
index fadc43f5ea99d0a3f4d2882c1df934194d27de82..c8dc5c0d7e83bd9b499a7e96906a7f6c0cc2e0de 100644 (file)
@@ -20,25 +20,36 @@ type Peer struct {
        txBytes                     uint64
        rxBytes                     uint64
        time                        struct {
+               mutex         sync.RWMutex
                lastSend      time.Time // last send message
                lastHandshake time.Time // last completed handshake
+               nextKeepalive time.Time
        }
        signal struct {
                newKeyPair         chan struct{} // (size 1) : a new key pair was generated
                handshakeBegin     chan struct{} // (size 1) : request that a new handshake be started ("queue handshake")
                handshakeCompleted chan struct{} // (size 1) : handshake completed
                flushNonceQueue    chan struct{} // (size 1) : empty queued packets
+               messageSend        chan struct{} // (size 1) : a message was send to the peer
+               messageReceived    chan struct{} // (size 1) : an authenticated message was received
                stop               chan struct{} // (size 0) : close to stop all goroutines for peer
        }
        timer struct {
-               sendKeepalive    *time.Timer
-               handshakeTimeout *time.Timer
+               /* Both keep-alive timers acts as one (see timers.go)
+                * They are kept seperate to simplify the implementation.
+                */
+               keepalivePersistent      *time.Timer // set for persistent keepalives
+               keepaliveAcknowledgement *time.Timer // set upon recieving messages
+               zeroAllKeys              *time.Timer // zero all key material after RejectAfterTime*3
        }
        queue struct {
                nonce    chan *QueueOutboundElement // nonce / pre-handshake queue
                outbound chan *QueueOutboundElement // sequential ordering of work
                inbound  chan *QueueInboundElement  // sequential ordering of work
        }
+       flags struct {
+               keepaliveWaiting int32
+       }
        mac MACStatePeer
 }
 
@@ -51,7 +62,12 @@ func (device *Device) NewPeer(pk NoisePublicKey) *Peer {
 
        peer.mac.Init(pk)
        peer.device = device
-       peer.timer.sendKeepalive = stoppedTimer()
+
+       peer.timer.keepalivePersistent = NewStoppedTimer()
+       peer.timer.keepaliveAcknowledgement = NewStoppedTimer()
+       peer.timer.zeroAllKeys = NewStoppedTimer()
+
+       peer.flags.keepaliveWaiting = AtomicFalse
 
        // assign id for debugging
 
@@ -82,7 +98,7 @@ func (device *Device) NewPeer(pk NoisePublicKey) *Peer {
        peer.queue.outbound = make(chan *QueueOutboundElement, QueueOutboundSize)
        peer.queue.inbound = make(chan *QueueInboundElement, QueueInboundSize)
 
-       // prepare signaling
+       // prepare signaling & routines
 
        peer.signal.stop = make(chan struct{})
        peer.signal.newKeyPair = make(chan struct{}, 1)
@@ -90,9 +106,8 @@ func (device *Device) NewPeer(pk NoisePublicKey) *Peer {
        peer.signal.handshakeCompleted = make(chan struct{}, 1)
        peer.signal.flushNonceQueue = make(chan struct{}, 1)
 
-       // outbound pipeline
-
        go peer.RoutineNonce()
+       go peer.RoutineTimerHandler()
        go peer.RoutineHandshakeInitiator()
        go peer.RoutineSequentialSender()
        go peer.RoutineSequentialReceiver()
index c788dcf99b596e2251f671d4d6d191d5bc6177c8..e780c66fc7d0afaf4aac5fb9b3e2ec32a3c73e12 100644 (file)
@@ -10,11 +10,6 @@ import (
        "time"
 )
 
-const (
-       ElementStateOkay = iota
-       ElementStateDropped
-)
-
 type QueueHandshakeElement struct {
        msgType uint32
        packet  []byte
@@ -22,7 +17,7 @@ type QueueHandshakeElement struct {
 }
 
 type QueueInboundElement struct {
-       state   uint32
+       dropped int32
        mutex   sync.Mutex
        packet  []byte
        counter uint64
@@ -30,11 +25,11 @@ type QueueInboundElement struct {
 }
 
 func (elem *QueueInboundElement) Drop() {
-       atomic.StoreUint32(&elem.state, ElementStateDropped)
+       atomic.StoreInt32(&elem.dropped, AtomicTrue)
 }
 
 func (elem *QueueInboundElement) IsDropped() bool {
-       return atomic.LoadUint32(&elem.state) == ElementStateDropped
+       return atomic.LoadInt32(&elem.dropped) == AtomicTrue
 }
 
 func addToInboundQueue(
@@ -101,9 +96,9 @@ func (device *Device) RoutineBusyMonitor() {
                // update busy state
 
                if busy {
-                       atomic.StoreInt32(&device.congestionState, CongestionStateUnderLoad)
+                       atomic.StoreInt32(&device.underLoad, AtomicTrue)
                } else {
-                       atomic.StoreInt32(&device.congestionState, CongestionStateOkay)
+                       atomic.StoreInt32(&device.underLoad, AtomicFalse)
                }
 
                timer.Reset(interval)
@@ -216,7 +211,7 @@ func (device *Device) RoutineReceiveIncomming() {
                                work := new(QueueInboundElement)
                                work.packet = packet
                                work.keyPair = keyPair
-                               work.state = ElementStateOkay
+                               work.dropped = AtomicFalse
                                work.mutex.Lock()
 
                                // add to decryption queues
@@ -303,7 +298,7 @@ func (device *Device) RoutineHandshake() {
 
                        // verify mac2
 
-                       busy := atomic.LoadInt32(&device.congestionState) == CongestionStateUnderLoad
+                       busy := atomic.LoadInt32(&device.underLoad) == AtomicTrue
 
                        if busy && !device.mac.CheckMAC2(elem.packet, elem.source) {
                                sender := binary.LittleEndian.Uint32(elem.packet[4:8]) // "sender" always follows "type"
@@ -397,13 +392,12 @@ func (device *Device) RoutineHandshake() {
                                        )
                                        return
                                }
-                               sendSignal(peer.signal.handshakeCompleted)
-                               logDebug.Println("Recieved valid response message for peer", peer.id)
                                kp := peer.NewKeyPair()
                                if kp == nil {
                                        logDebug.Println("Failed to derieve key-pair")
                                }
                                peer.SendKeepAlive()
+                               peer.EventHandshakeComplete()
 
                        default:
                                device.log.Error.Println("Invalid message type in handshake queue")
@@ -438,9 +432,25 @@ func (peer *Peer) RoutineSequentialReceiver() {
 
                        // check for replay
 
-                       // update timers
+                       // time (passive) keep-alive
+
+                       peer.TimerStartKeepalive()
+
+                       // refresh key material (rekey)
 
-                       // refresh key material
+                       peer.KeepKeyFreshReceiving()
+
+                       // check if confirming handshake
+
+                       kp := &peer.keyPairs
+                       kp.mutex.Lock()
+                       if kp.next == elem.keyPair {
+                               peer.EventHandshakeComplete()
+                               kp.previous = kp.current
+                               kp.current = kp.next
+                               kp.next = nil
+                       }
+                       kp.mutex.Unlock()
 
                        // check for keep-alive
 
@@ -491,7 +501,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
                                }
 
                        default:
-                               device.log.Debug.Println("Receieved packet with unknown IP version")
+                               logDebug.Println("Receieved packet with unknown IP version")
                                return
                        }
 
index a02f5cb3b6b9cb38a2add51e869d14dc5252e023..5ea9a8f8920a853a68fde7693473d90c3c4ef2e2 100644 (file)
@@ -31,7 +31,7 @@ import (
  * (to allow the construction of transport messages in-place)
  */
 type QueueOutboundElement struct {
-       state   uint32
+       dropped int32
        mutex   sync.Mutex
        data    [MaxMessageSize]byte
        packet  []byte   // slice of "data" (always!)
@@ -61,11 +61,11 @@ func (device *Device) NewOutboundElement() *QueueOutboundElement {
 }
 
 func (elem *QueueOutboundElement) Drop() {
-       atomic.StoreUint32(&elem.state, ElementStateDropped)
+       atomic.StoreInt32(&elem.dropped, AtomicTrue)
 }
 
 func (elem *QueueOutboundElement) IsDropped() bool {
-       return atomic.LoadUint32(&elem.state) == ElementStateDropped
+       return atomic.LoadInt32(&elem.dropped) == AtomicTrue
 }
 
 func addToOutboundQueue(
@@ -86,6 +86,25 @@ func addToOutboundQueue(
        }
 }
 
+func addToEncryptionQueue(
+       queue chan *QueueOutboundElement,
+       element *QueueOutboundElement,
+) {
+       for {
+               select {
+               case queue <- element:
+                       return
+               default:
+                       select {
+                       case old := <-queue:
+                               old.Drop()
+                               old.mutex.Unlock()
+                       default:
+                       }
+               }
+       }
+}
+
 /* Reads packets from the TUN and inserts
  * into nonce queue for peer
  *
@@ -196,9 +215,7 @@ func (peer *Peer) RoutineNonce() {
                                                break
                                        }
                                }
-                               logDebug.Println("Key pair:", keyPair)
-
-                               sendSignal(peer.signal.handshakeBegin)
+                               signalSend(peer.signal.handshakeBegin)
                                logDebug.Println("Waiting for key-pair, peer", peer.id)
 
                                select {
@@ -225,12 +242,13 @@ func (peer *Peer) RoutineNonce() {
 
                                elem.keyPair = keyPair
                                elem.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1
+                               elem.dropped = AtomicFalse
                                elem.peer = peer
                                elem.mutex.Lock()
 
-                               // add to parallel processing and sequential consuming queue
+                               // add to parallel and sequential queue
 
-                               addToOutboundQueue(device.queue.encryption, elem)
+                               addToEncryptionQueue(device.queue.encryption, elem)
                                addToOutboundQueue(peer.queue.outbound, elem)
                                elem = nil
                        }
@@ -246,6 +264,9 @@ func (peer *Peer) RoutineNonce() {
 func (device *Device) RoutineEncryption() {
        var nonce [chacha20poly1305.NonceSize]byte
        for work := range device.queue.encryption {
+
+               // check if dropped
+
                if work.IsDropped() {
                        continue
                }
@@ -289,25 +310,25 @@ func (device *Device) RoutineEncryption() {
  * The routine terminates then the outbound queue is closed.
  */
 func (peer *Peer) RoutineSequentialSender() {
-       logDebug := peer.device.log.Debug
-       logDebug.Println("Routine, sequential sender, started for peer", peer.id)
-
        device := peer.device
 
+       logDebug := device.log.Debug
+       logDebug.Println("Routine, sequential sender, started for peer", peer.id)
+
        for {
                select {
                case <-peer.signal.stop:
                        logDebug.Println("Routine, sequential sender, stopped for peer", peer.id)
                        return
                case work := <-peer.queue.outbound:
+                       work.mutex.Lock()
                        if work.IsDropped() {
                                continue
                        }
-                       work.mutex.Lock()
+
                        func() {
-                               if work.packet == nil {
-                                       return
-                               }
+
+                               // send to endpoint
 
                                peer.mutex.RLock()
                                defer peer.mutex.RUnlock()
@@ -331,12 +352,9 @@ func (peer *Peer) RoutineSequentialSender() {
                                }
                                atomic.AddUint64(&peer.txBytes, uint64(len(work.packet)))
 
-                               // shift keep-alive timer
+                               // reset keep-alive (passive keep-alives / acknowledgements)
 
-                               if peer.persistentKeepaliveInterval != 0 {
-                                       interval := time.Duration(peer.persistentKeepaliveInterval) * time.Second
-                                       peer.timer.sendKeepalive.Reset(interval)
-                               }
+                               peer.TimerResetKeepalive()
                        }()
                }
        }
diff --git a/src/timers.go b/src/timers.go
new file mode 100644 (file)
index 0000000..26926c2
--- /dev/null
@@ -0,0 +1,303 @@
+package main
+
+import (
+       "bytes"
+       "encoding/binary"
+       "golang.org/x/crypto/blake2s"
+       "sync/atomic"
+       "time"
+)
+
+/* Called when a new authenticated message has been send
+ *
+ */
+func (peer *Peer) KeepKeyFreshSending() {
+       send := func() bool {
+               peer.keyPairs.mutex.RLock()
+               defer peer.keyPairs.mutex.RUnlock()
+
+               kp := peer.keyPairs.current
+               if kp == nil {
+                       return false
+               }
+
+               if !kp.isInitiator {
+                       return false
+               }
+
+               nonce := atomic.LoadUint64(&kp.sendNonce)
+               return nonce > RekeyAfterMessages || time.Now().Sub(kp.created) > RekeyAfterTime
+       }()
+       if send {
+               signalSend(peer.signal.handshakeBegin)
+       }
+}
+
+/* Called when a new authenticated message has been recevied
+ *
+ */
+func (peer *Peer) KeepKeyFreshReceiving() {
+       send := func() bool {
+               peer.keyPairs.mutex.RLock()
+               defer peer.keyPairs.mutex.RUnlock()
+
+               kp := peer.keyPairs.current
+               if kp == nil {
+                       return false
+               }
+
+               if !kp.isInitiator {
+                       return false
+               }
+
+               nonce := atomic.LoadUint64(&kp.sendNonce)
+               return nonce > RekeyAfterMessages || time.Now().Sub(kp.created) > RekeyAfterTimeReceiving
+       }()
+       if send {
+               signalSend(peer.signal.handshakeBegin)
+       }
+}
+
+/* Called after succesfully completing a handshake.
+ * i.e. after:
+ * - Valid handshake response
+ * - First transport message under the "next" key
+ */
+func (peer *Peer) EventHandshakeComplete() {
+       peer.device.log.Debug.Println("Handshake completed")
+       peer.timer.zeroAllKeys.Reset(RejectAfterTime * 3)
+       signalSend(peer.signal.handshakeCompleted)
+}
+
+/* Queues a keep-alive if no packets are queued for peer
+ */
+func (peer *Peer) SendKeepAlive() bool {
+       elem := peer.device.NewOutboundElement()
+       elem.packet = nil
+       if len(peer.queue.nonce) == 0 {
+               select {
+               case peer.queue.nonce <- elem:
+                       return true
+               default:
+                       return false
+               }
+       }
+       return true
+}
+
+/* Starts the "keep-alive" timer
+ * (if not already running),
+ * in response to incomming messages
+ */
+func (peer *Peer) TimerStartKeepalive() {
+
+       // check if acknowledgement timer set yet
+
+       var waiting int32 = AtomicTrue
+       waiting = atomic.SwapInt32(&peer.flags.keepaliveWaiting, waiting)
+       if waiting == AtomicTrue {
+               return
+       }
+
+       // timer not yet set, start it
+
+       wait := KeepaliveTimeout
+       interval := atomic.LoadUint64(&peer.persistentKeepaliveInterval)
+       if interval > 0 {
+               duration := time.Duration(interval) * time.Second
+               if duration < wait {
+                       wait = duration
+               }
+       }
+}
+
+/* Resets both keep-alive timers
+ */
+func (peer *Peer) TimerResetKeepalive() {
+
+       // reset persistent timer
+
+       interval := atomic.LoadUint64(&peer.persistentKeepaliveInterval)
+       if interval > 0 {
+               peer.timer.keepalivePersistent.Reset(
+                       time.Duration(interval) * time.Second,
+               )
+       }
+
+       // stop acknowledgement timer
+
+       timerStop(peer.timer.keepaliveAcknowledgement)
+       atomic.StoreInt32(&peer.flags.keepaliveWaiting, AtomicFalse)
+}
+
+func (peer *Peer) BeginHandshakeInitiation() (*QueueOutboundElement, error) {
+
+       // create initiation
+
+       elem := peer.device.NewOutboundElement()
+       msg, err := peer.device.CreateMessageInitiation(peer)
+       if err != nil {
+               return nil, err
+       }
+
+       // marshal & schedule for sending
+
+       writer := bytes.NewBuffer(elem.data[:0])
+       binary.Write(writer, binary.LittleEndian, msg)
+       elem.packet = writer.Bytes()
+       peer.mac.AddMacs(elem.packet)
+       addToOutboundQueue(peer.queue.outbound, elem)
+       return elem, err
+}
+
+func (peer *Peer) RoutineTimerHandler() {
+       device := peer.device
+
+       logDebug := device.log.Debug
+       logDebug.Println("Routine, timer handler, started for peer", peer.id)
+
+       for {
+               select {
+
+               case <-peer.signal.stop:
+                       return
+
+               // keep-alives
+
+               case <-peer.timer.keepalivePersistent.C:
+
+                       logDebug.Println("Sending persistent keep-alive to peer", peer.id)
+
+                       peer.SendKeepAlive()
+                       peer.TimerResetKeepalive()
+
+               case <-peer.timer.keepaliveAcknowledgement.C:
+
+                       logDebug.Println("Sending passive persistent keep-alive to peer", peer.id)
+
+                       peer.SendKeepAlive()
+                       peer.TimerResetKeepalive()
+
+               // clear key material
+
+               case <-peer.timer.zeroAllKeys.C:
+
+                       logDebug.Println("Clearing all key material for peer", peer.id)
+
+                       // zero out key pairs
+
+                       func() {
+                               kp := &peer.keyPairs
+                               kp.mutex.Lock()
+                               // best we can do is wait for GC :( ?
+                               kp.current = nil
+                               kp.previous = nil
+                               kp.next = nil
+                               kp.mutex.Unlock()
+                       }()
+
+                       // zero out handshake
+
+                       func() {
+                               hs := &peer.handshake
+                               hs.mutex.Lock()
+                               hs.localEphemeral = NoisePrivateKey{}
+                               hs.remoteEphemeral = NoisePublicKey{}
+                               hs.chainKey = [blake2s.Size]byte{}
+                               hs.hash = [blake2s.Size]byte{}
+                               hs.mutex.Unlock()
+                       }()
+               }
+       }
+}
+
+/* This is the state machine for handshake initiation
+ *
+ * Associated with this routine is the signal "handshakeBegin"
+ * The routine will read from the "handshakeBegin" channel
+ * at most every RekeyTimeout seconds
+ */
+func (peer *Peer) RoutineHandshakeInitiator() {
+       device := peer.device
+
+       var elem *QueueOutboundElement
+
+       logError := device.log.Error
+       logDebug := device.log.Debug
+       logDebug.Println("Routine, handshake initator, started for peer", peer.id)
+
+       for run := true; run; {
+               var err error
+               var attempts uint
+               var deadline time.Time
+
+               // wait for signal
+
+               select {
+               case <-peer.signal.handshakeBegin:
+               case <-peer.signal.stop:
+                       return
+               }
+
+               // wait for handshake
+
+               run = func() bool {
+                       for {
+                               // clear completed signal
+
+                               select {
+                               case <-peer.signal.handshakeCompleted:
+                               case <-peer.signal.stop:
+                                       return false
+                               default:
+                               }
+
+                               // create initiation
+
+                               if elem != nil {
+                                       elem.Drop()
+                               }
+                               elem, err = peer.BeginHandshakeInitiation()
+                               if err != nil {
+                                       logError.Println("Failed to create initiation message:", err)
+                                       break
+                               }
+
+                               // set timeout
+
+                               attempts += 1
+                               if attempts == 1 {
+                                       deadline = time.Now().Add(MaxHandshakeAttemptTime)
+                               }
+                               timeout := time.NewTimer(RekeyTimeout)
+                               logDebug.Println("Handshake initiation attempt", attempts, "queued for peer", peer.id)
+
+                               // wait for handshake or timeout
+
+                               select {
+                               case <-peer.signal.stop:
+                                       return true
+
+                               case <-peer.signal.handshakeCompleted:
+                                       <-timeout.C
+                                       return true
+
+                               case <-timeout.C:
+                                       logDebug.Println("Timeout")
+
+                                       // check if sufficient time for retry
+
+                                       if deadline.Before(time.Now().Add(RekeyTimeout)) {
+                                               signalSend(peer.signal.flushNonceQueue)
+                                               timerStop(peer.timer.keepalivePersistent)
+                                               timerStop(peer.timer.keepaliveAcknowledgement)
+                                               return true
+                                       }
+                               }
+                       }
+                       return true
+               }()
+
+               signalClear(peer.signal.handshakeBegin)
+       }
+}