}
}
-func (peer *Peer) elementStopOrFlush(shouldFlush *bool) (stop bool, elemOk bool, elem *QueueInboundElement) {
+func (peer *Peer) receiveElementStopOrFlush(shouldFlush *bool) (stop bool, elemOk bool, elem *QueueInboundElement) {
if !*shouldFlush {
select {
case <-peer.routines.stop:
*shouldFlush = false
err := peer.device.tun.device.Flush()
if err != nil {
- peer.device.log.Error.Printf("Unable to flush packets: %v", err)
+ peer.device.log.Error.Printf("Unable to flush receive packets: %v", err)
}
- return peer.elementStopOrFlush(shouldFlush)
+ return peer.receiveElementStopOrFlush(shouldFlush)
}
}
}
elem = nil
}
- stop, ok, elem = peer.elementStopOrFlush(&shouldFlush)
+ stop, ok, elem = peer.receiveElementStopOrFlush(&shouldFlush)
if stop || !ok {
return
}
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
- err = peer.SendBuffer(packet)
+ err = peer.SendBuffer(packet, true)
if err != nil {
peer.device.log.Error.Println(peer, "- Failed to send handshake initiation", err)
}
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
- err = peer.SendBuffer(packet)
+ err = peer.SendBuffer(packet, true)
if err != nil {
peer.device.log.Error.Println(peer, "- Failed to send handshake response", err)
}
var buff [MessageCookieReplySize]byte
writer := bytes.NewBuffer(buff[:0])
binary.Write(writer, binary.LittleEndian, reply)
- device.net.bind.Send(writer.Bytes(), initiatingElem.endpoint)
+ device.net.bind.Send(writer.Bytes(), initiatingElem.endpoint, true)
if err != nil {
device.log.Error.Println("Failed to send cookie reply:", err)
}
}
}
+func (peer *Peer) sendElementStopOrFlush(shouldFlush *bool) (stop bool, elemOk bool, elem *QueueOutboundElement) {
+ if !*shouldFlush {
+ select {
+ case <-peer.routines.stop:
+ stop = true
+ return
+ case elem, elemOk = <-peer.queue.outbound:
+ return
+ }
+ } else {
+ select {
+ case <-peer.routines.stop:
+ stop = true
+ return
+ case elem, elemOk = <-peer.queue.outbound:
+ return
+ default:
+ *shouldFlush = false
+ err := peer.device.net.bind.Flush()
+ if err != nil {
+ peer.device.log.Error.Printf("Unable to flush send packets: %v", err)
+ }
+ return peer.sendElementStopOrFlush(shouldFlush)
+ }
+ }
+}
+
/* Sequentially reads packets from queue and sends to endpoint
*
* Obs. Single instance per peer.
peer.routines.starting.Done()
+ shouldFlush := false
for {
- select {
-
- case <-peer.routines.stop:
+ stop, ok, elem := peer.sendElementStopOrFlush(&shouldFlush)
+ if stop || !ok {
return
+ }
- case elem, ok := <-peer.queue.outbound:
-
- if !ok {
- return
- }
-
- elem.Lock()
- if elem.IsDropped() {
- device.PutOutboundElement(elem)
- continue
- }
-
- peer.timersAnyAuthenticatedPacketTraversal()
- peer.timersAnyAuthenticatedPacketSent()
+ elem.Lock()
+ if elem.IsDropped() {
+ device.PutOutboundElement(elem)
+ continue
+ }
- // send message and return buffer to pool
+ peer.timersAnyAuthenticatedPacketTraversal()
+ peer.timersAnyAuthenticatedPacketSent()
- err := peer.SendBuffer(elem.packet)
- if len(elem.packet) != MessageKeepaliveSize {
- peer.timersDataSent()
- }
- device.PutMessageBuffer(elem.buffer)
- device.PutOutboundElement(elem)
- if err != nil {
- logError.Println(peer, "- Failed to send data packet", err)
- continue
- }
+ // send message and return buffer to pool
- peer.keepKeyFreshSending()
+ err := peer.SendBuffer(elem.packet, false)
+ if len(elem.packet) != MessageKeepaliveSize {
+ peer.timersDataSent()
}
+ device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
+ if err != nil {
+ logError.Println(peer, "- Failed to send data packet", err)
+ continue
+ } else {
+ shouldFlush = true
+ }
+
+ peer.keepKeyFreshSending()
}
}