handshake chan QueueHandshakeElement
}
signal struct {
- stop chan struct{}
+ stop Signal
}
underLoadUntil atomic.Value
ratelimiter Ratelimiter
func NewDevice(tun TUNDevice, logger *Logger) *Device {
device := new(Device)
-
device.mutex.Lock()
defer device.mutex.Unlock()
// prepare signals
- device.signal.stop = make(chan struct{})
+ device.signal.stop = NewSignal()
// prepare net
go device.RoutineDecryption()
go device.RoutineHandshake()
}
+
go device.RoutineReadFromTUN()
go device.RoutineTUNEventReader()
go device.ratelimiter.RoutineGarbageCollector(device.signal.stop)
+
return device
}
}
device.log.Info.Println("Closing device")
device.RemoveAllPeers()
- close(device.signal.stop)
- closeBind(device)
+ device.signal.stop.Broadcast()
device.tun.device.Close()
+ closeBind(device)
}
-func (device *Device) WaitChannel() chan struct{} {
- return device.signal.stop
+func (device *Device) Wait() chan struct{} {
+ return device.signal.stop.Wait()
}
"strconv"
)
+import _ "net/http/pprof"
+import "net/http"
+import "log"
+
const (
ExitSetupSuccess = 0
ExitSetupFailed = 1
func main() {
+ go func() {
+ log.Println(http.ListenAndServe("localhost:6060", nil))
+ }()
+
// parse arguments
var foreground bool
errs := make(chan error)
term := make(chan os.Signal)
- wait := device.WaitChannel()
uapi, err := UAPIListen(interfaceName, fileUAPI)
signal.Notify(term, os.Interrupt)
select {
- case <-wait:
case <-term:
case <-errs:
+ case <-device.Wait():
}
// clean up
import (
"sync/atomic"
- "time"
)
-/* We use int32 as atomic bools
- * (since booleans are not natively supported by sync/atomic)
- */
+/* Atomic Boolean */
+
const (
AtomicFalse = int32(iota)
AtomicTrue
atomic.StoreInt32(&a.flag, flag)
}
+/* Integer manipulation */
+
func toInt32(n uint32) int32 {
mask := uint32(1 << 31)
return int32(-(n & mask) + (n & ^mask))
}
return a
}
-
-func signalSend(c chan struct{}) {
- select {
- case c <- struct{}{}:
- default:
- }
-}
-
-func signalClear(c chan struct{}) {
- select {
- case <-c:
- default:
- }
-}
-
-func timerStop(timer *time.Timer) {
- if !timer.Stop() {
- select {
- case <-timer.C:
- default:
- }
- }
-}
-
-func NewStoppedTimer() *time.Timer {
- timer := time.NewTimer(time.Hour)
- timerStop(timer)
- return timer
-}
rate.mutex.Unlock()
}
-func (rate *Ratelimiter) RoutineGarbageCollector(stop chan struct{}) {
+func (rate *Ratelimiter) RoutineGarbageCollector(stop Signal) {
timer := time.NewTimer(time.Second)
for {
select {
- case <-stop:
+ case <-stop.Wait():
return
case <-timer.C:
rate.GarbageCollectEntries()
}
}
+/* Receives incoming datagrams for the device
+ *
+ * Every time the bind is updated a new routine is started for
+ * IPv4 and IPv6 (separately)
+ */
func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) {
logDebug := device.log.Debug
device.addToDecryptionQueue(device.queue.decryption, elem)
device.addToInboundQueue(peer.queue.inbound, elem)
buffer = device.GetMessageBuffer()
+
continue
// otherwise it is a fixed size & handshake related packet
for {
select {
- case <-device.signal.stop:
+ case <-device.signal.stop.Wait():
logDebug.Println("Routine, decryption worker, stopped")
return
}
}
-/* Handles incomming packets related to handshake
+/* Handles incoming packets related to handshake
*/
func (device *Device) RoutineHandshake() {
for {
select {
case elem = <-device.queue.handshake:
- case <-device.signal.stop:
+ case <-device.signal.stop.Wait():
return
}
continue
}
- // handle handshake initation/response content
+ // handle handshake initiation/response content
switch elem.msgType {
case MessageInitiationType:
peer := device.ConsumeMessageInitiation(&msg)
if peer == nil {
logInfo.Println(
- "Recieved invalid initiation message from",
+ "Received invalid initiation message from",
elem.endpoint.DstToString(),
)
continue
peer.endpoint = elem.endpoint
peer.mutex.Unlock()
- logDebug.Println("Received handshake initation from", peer)
+ logDebug.Println("Received handshake initiation from", peer)
peer.TimerEphemeralKeyCreated()
src := elem.packet[IPv4offsetSrc : IPv4offsetSrc+net.IPv4len]
if device.routingTable.LookupIPv4(src) != peer {
logInfo.Println(
- "IPv4 packet with unallowed source address from",
+ "IPv4 packet with disallowed source address from",
peer.String(),
)
continue
src := elem.packet[IPv6offsetSrc : IPv6offsetSrc+net.IPv6len]
if device.routingTable.LookupIPv6(src) != peer {
logInfo.Println(
- "IPv6 packet with unallowed source address from",
+ "IPv6 packet with disallowed source address from",
peer.String(),
)
continue
"time"
)
-/* Handles outbound flow
+/* Outbound flow
*
* 1. TUN queue
* 2. Routing (sequential)
* 4. Encryption (parallel)
* 5. Transmission (sequential)
*
- * The order of packets (per peer) is maintained.
- * The functions in this file occure (roughly) in the order packets are processed.
- */
-
-/* The sequential consumers will attempt to take the lock,
+ * The functions in this file occur (roughly) in the order in
+ * which the packets are processed.
+ *
+ * Locking, Producers and Consumers
+ *
+ * The order of packets (per peer) must be maintained,
+ * but encryption of packets happen out-of-order:
+ *
+ * The sequential consumers will attempt to take the lock,
* workers release lock when they have completed work (encryption) on the packet.
*
* If the element is inserted into the "encryption queue",
- * the content is preceeded by enough "junk" to contain the transport header
+ * the content is preceded by enough "junk" to contain the transport header
* (to allow the construction of transport messages in-place)
*/
+
type QueueOutboundElement struct {
dropped int32
mutex sync.Mutex
peer = device.routingTable.LookupIPv6(dst)
default:
- logDebug.Println("Receieved packet with unknown IP version")
+ logDebug.Println("Received packet with unknown IP version")
}
if peer == nil {
// fetch next element
select {
- case <-device.signal.stop:
+ case <-device.signal.stop.Wait():
logDebug.Println("Routine, encryption worker, stopped")
return
s.enabled.Set(true)
}
+/* Unblock exactly one listener
+ */
func (s *Signal) Send() {
if s.enabled.Get() {
select {
}
}
+/* Clear the signal if already fired
+ */
func (s Signal) Clear() {
select {
case <-s.C:
}
}
+/* Unblocks all listeners (forever)
+ */
func (s Signal) Broadcast() {
- close(s.C) // unblocks all selectors
+ close(s.C)
}
+/* Wait for the signal
+ */
func (s Signal) Wait() chan struct{} {
return s.C
}
\r
/* Called when a new authenticated message has been received\r
*\r
- * NOTE: Not thread safe (called by sequential receiver)\r
+ * NOTE: Not thread safe, but called by sequential receiver!\r
*/\r
func (peer *Peer) KeepKeyFreshReceiving() {\r
if peer.timer.sendLastMinuteHandshake {\r
* same way as those created by the "net" functions.
* Here the IPs are slices of either 4 or 16 byte (not always 16)
*
- * Syncronization done seperatly
+ * Synchronization done separately
* See: routing.go
- *
- * TODO: Better commenting
*/
type Trie struct {
}
/* Finds length of matching prefix
- * TODO: Make faster
+ *
+ * TODO: Only use during insertion (xor + prefix mask for lookup)
+ * Check out
+ * prefix_matches(struct allowedips_node *node, const u8 *key, u8 bits)
+ * https://git.zx2c4.com/WireGuard/commit/?h=jd/precomputed-prefix-match
*
* Assumption:
* len(ip1) == len(ip2)
return node
}
- // walk recursivly
+ // walk recursively
node.child[0] = node.child[0].RemovePeer(p)
node.child[1] = node.child[1].RemovePeer(p)