q := &autodrainingInboundQueue{
c: make(chan *QueueInboundElement, QueueInboundSize),
}
- runtime.SetFinalizer(q, func(q *autodrainingInboundQueue) {
- for {
- select {
- case elem := <-q.c:
- elem.Lock()
- device.PutMessageBuffer(elem.buffer)
- device.PutInboundElement(elem)
- default:
- return
- }
- }
- })
+ runtime.SetFinalizer(q, device.flushInboundQueue)
return q
}
+func (device *Device) flushInboundQueue(q *autodrainingInboundQueue) {
+ for {
+ select {
+ case elem := <-q.c:
+ elem.Lock()
+ device.PutMessageBuffer(elem.buffer)
+ device.PutInboundElement(elem)
+ default:
+ return
+ }
+ }
+}
+
type autodrainingOutboundQueue struct {
c chan *QueueOutboundElement
}
q := &autodrainingOutboundQueue{
c: make(chan *QueueOutboundElement, QueueOutboundSize),
}
- runtime.SetFinalizer(q, func(q *autodrainingOutboundQueue) {
- for {
- select {
- case elem := <-q.c:
- elem.Lock()
- device.PutMessageBuffer(elem.buffer)
- device.PutOutboundElement(elem)
- default:
- return
- }
- }
- })
+ runtime.SetFinalizer(q, device.flushOutboundQueue)
return q
}
+
+func (device *Device) flushOutboundQueue(q *autodrainingOutboundQueue) {
+ for {
+ select {
+ case elem := <-q.c:
+ elem.Lock()
+ device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
+ default:
+ return
+ }
+ }
+}
peer.timersStart()
+ device.flushInboundQueue(peer.queue.inbound)
+ device.flushOutboundQueue(peer.queue.outbound)
go peer.RoutineSequentialSender()
go peer.RoutineSequentialReceiver()