return err
}
-func updateBind(device *Device) error {
- device.mutex.Lock()
- defer device.mutex.Unlock()
-
- netc := &device.net
- netc.mutex.Lock()
- defer netc.mutex.Unlock()
+/* Must hold device and net lock
+ */
+func unsafeUpdateBind(device *Device) error {
// close existing sockets
// bind to new port
var err error
+ netc := &device.net
netc.bind, netc.port, err = CreateBind(netc.port)
if err != nil {
netc.bind = nil
package main
import (
+ "github.com/sasha-s/go-deadlock"
"runtime"
"sync"
"sync/atomic"
messageBuffers sync.Pool
}
net struct {
- mutex sync.RWMutex
+ mutex deadlock.RWMutex
bind Bind // bind interface
port uint16 // listening port
fwmark uint32 // mark value (0 = disabled)
}
- mutex sync.RWMutex
+ mutex deadlock.RWMutex
privateKey NoisePrivateKey
publicKey NoisePublicKey
routingTable RoutingTable
device.mutex.Lock()
defer device.mutex.Unlock()
- device.isUp.Set(true)
- updateBind(device)
+ device.net.mutex.Lock()
+ defer device.net.mutex.Unlock()
+
+ if device.isUp.Swap(true) {
+ return
+ }
+
+ unsafeUpdateBind(device)
+
for _, peer := range device.peers {
peer.Start()
}
device.mutex.Lock()
defer device.mutex.Unlock()
- device.isUp.Set(false)
+ if !device.isUp.Swap(false) {
+ return
+ }
+
closeBind(device)
+
for _, peer := range device.peers {
peer.Stop()
}
if !ok {
return
}
- peer.mutex.Lock()
peer.Stop()
device.routingTable.RemovePeer(peer)
delete(device.peers, key)
"time"
)
+const (
+ PeerRoutineNumber = 4
+)
+
type Peer struct {
id uint
mutex sync.RWMutex
flushNonceQueue Signal // size 1, empty queued packets
messageSend Signal // size 1, message was send to peer
messageReceived Signal // size 1, authenticated message recv
- stop Signal // size 0, stop all goroutines in peer
}
timer struct {
// state related to WireGuard timers
outbound chan *QueueOutboundElement // sequential ordering of work
inbound chan *QueueInboundElement // sequential ordering of work
}
+ routines struct {
+ mutex sync.Mutex // held when stopping / starting routines
+ starting sync.WaitGroup // routines pending start
+ stopping sync.WaitGroup // routines pending stop
+ stop Signal // size 0, stop all goroutines in peer
+ }
mac CookieGenerator
}
peer.signal.handshakeCompleted = NewSignal()
peer.signal.flushNonceQueue = NewSignal()
+ peer.routines.mutex.Lock()
+ peer.routines.stop = NewSignal()
+ peer.routines.mutex.Unlock()
+
return peer, nil
}
)
}
-/* Starts all routines for a given peer
- *
- * Requires that the caller holds the exclusive peer lock!
- */
-func unsafePeerStart(peer *Peer) {
- peer.signal.stop.Broadcast()
- peer.signal.stop = NewSignal()
+func (peer *Peer) Start() {
+
+ peer.routines.mutex.Lock()
+ defer peer.routines.mutex.Lock()
+
+ // stop & wait for ungoing routines (if any)
+
+ peer.routines.stop.Broadcast()
+ peer.routines.starting.Wait()
+ peer.routines.stopping.Wait()
- var wait sync.WaitGroup
+ // reset signal and start (new) routines
- wait.Add(1)
+ peer.routines.stop = NewSignal()
+ peer.routines.starting.Add(PeerRoutineNumber)
+ peer.routines.stopping.Add(PeerRoutineNumber)
go peer.RoutineNonce()
- go peer.RoutineTimerHandler(&wait)
+ go peer.RoutineTimerHandler()
go peer.RoutineSequentialSender()
go peer.RoutineSequentialReceiver()
- wait.Wait()
-}
-
-func (peer *Peer) Start() {
- peer.mutex.Lock()
- unsafePeerStart(peer)
- peer.mutex.Unlock()
+ peer.routines.starting.Wait()
}
func (peer *Peer) Stop() {
- peer.signal.stop.Broadcast()
+
+ peer.routines.mutex.Lock()
+ defer peer.routines.mutex.Lock()
+
+ // stop & wait for ungoing routines (if any)
+
+ peer.routines.stop.Broadcast()
+ peer.routines.starting.Wait()
+ peer.routines.stopping.Wait()
+
+ // reset signal (to handle repeated stopping)
+
+ peer.routines.stop = NewSignal()
}
select {
- case <-peer.signal.stop.Wait():
+ case <-peer.routines.stop.Wait():
logDebug.Println("Routine, sequential receiver, stopped for peer", peer.id)
return
for {
NextPacket:
select {
- case <-peer.signal.stop.Wait():
+ case <-peer.routines.stop.Wait():
return
case elem := <-peer.queue.nonce:
logDebug.Println("Clearing queue for", peer.String())
peer.FlushNonceQueue()
goto NextPacket
- case <-peer.signal.stop.Wait():
+ case <-peer.routines.stop.Wait():
return
}
}
* The routine terminates then the outbound queue is closed.
*/
func (peer *Peer) RoutineSequentialSender() {
+
+ defer peer.routines.stopping.Done()
+
device := peer.device
logDebug := device.log.Debug
logDebug.Println("Routine, sequential sender, started for", peer.String())
+ peer.routines.starting.Done()
+
for {
select {
- case <-peer.signal.stop.Wait():
+ case <-peer.routines.stop.Wait():
logDebug.Println(
"Routine, sequential sender, stopped for", peer.String())
return
"bytes"\r
"encoding/binary"\r
"math/rand"\r
- "sync"\r
"sync/atomic"\r
"time"\r
)\r
return err\r
}\r
\r
-func (peer *Peer) RoutineTimerHandler(ready *sync.WaitGroup) {\r
+func (peer *Peer) RoutineTimerHandler() {\r
+\r
+ defer peer.routines.stopping.Done()\r
+\r
device := peer.device\r
\r
logInfo := device.log.Info\r
peer.timer.keepalivePersistent.Reset(duration)\r
}\r
\r
- // signal that timers are reset\r
+ // signal synchronised setup complete\r
\r
- ready.Done()\r
+ peer.routines.starting.Done()\r
\r
// handle timer events\r
\r
for {\r
select {\r
\r
+ /* stopping */\r
+\r
+ case <-peer.routines.stop.Wait():\r
+ return\r
+\r
/* timers */\r
\r
// keep-alive\r
\r
/* signals */\r
\r
- case <-peer.signal.stop.Wait():\r
- return\r
-\r
case <-peer.signal.handshakeBegin.Wait():\r
\r
peer.signal.handshakeBegin.Disable()\r
}
}
- if event&TUNEventUp != 0 {
+ if event&TUNEventUp != 0 && !device.isUp.Get() {
logInfo.Println("Interface set up")
device.Up()
}
- if event&TUNEventDown != 0 {
+ if event&TUNEventDown != 0 && device.isUp.Get() {
logInfo.Println("Interface set down")
- device.Up()
+ device.Down()
}
}
}
device.SetPrivateKey(sk)
case "listen_port":
+
+ // parse port number
+
port, err := strconv.ParseUint(value, 10, 16)
if err != nil {
logError.Println("Failed to parse listen_port:", err)
return &IPCError{Code: ipcErrorInvalid}
}
+
+ // update port and rebind
+
+ device.mutex.Lock()
+ device.net.mutex.Lock()
+
device.net.port = uint16(port)
- if err := updateBind(device); err != nil {
+ err = unsafeUpdateBind(device)
+
+ device.net.mutex.Unlock()
+ device.mutex.Unlock()
+
+ if err != nil {
logError.Println("Failed to set listen_port:", err)
return &IPCError{Code: ipcErrorPortInUse}
}