+++ /dev/null
-/*
- * Copyright (c) 2010-2013 BitTorrent, Inc.
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- * THE SOFTWARE.
- */
-
-#include <stdio.h>
-#include <assert.h>
-#include <string.h>
-#include <string.h>
-#include <stdlib.h>
-#include <errno.h>
-#include <limits.h> // for UINT_MAX
-
-#include "utp_types.h"
-#include "utp_packedsockaddr.h"
-#include "utp_internal.h"
-#include "utp_hash.h"
-
-#define TIMEOUT_CHECK_INTERVAL 500
-
-// number of bytes to increase max window size by, per RTT. This is
-// scaled down linearly proportional to off_target. i.e. if all packets
-// in one window have 0 delay, window size will increase by this number.
-// Typically it's less. TCP increases one MSS per RTT, which is 1500
-#define MAX_CWND_INCREASE_BYTES_PER_RTT 3000
-#define CUR_DELAY_SIZE 3
-// experiments suggest that a clock skew of 10 ms per 325 seconds
-// is not impossible. Reset delay_base every 13 minutes. The clock
-// skew is dealt with by observing the delay base in the other
-// direction, and adjusting our own upwards if the opposite direction
-// delay base keeps going down
-#define DELAY_BASE_HISTORY 13
-#define MAX_WINDOW_DECAY 100 // ms
-
-#define REORDER_BUFFER_SIZE 32
-#define REORDER_BUFFER_MAX_SIZE 1024
-#define OUTGOING_BUFFER_MAX_SIZE 1024
-
-#define PACKET_SIZE 1435
-
-// this is the minimum max_window value. It can never drop below this
-#define MIN_WINDOW_SIZE 10
-
-// if we receive 4 or more duplicate acks, we resend the packet
-// that hasn't been acked yet
-#define DUPLICATE_ACKS_BEFORE_RESEND 3
-
-// Allow a reception window of at least 3 ack_nrs behind seq_nr
-// A non-SYN packet with an ack_nr difference greater than this is
-// considered suspicious and ignored
-#define ACK_NR_ALLOWED_WINDOW DUPLICATE_ACKS_BEFORE_RESEND
-
-#define RST_INFO_TIMEOUT 10000
-#define RST_INFO_LIMIT 1000
-// 29 seconds determined from measuring many home NAT devices
-#define KEEPALIVE_INTERVAL 29000
-
-
-#define SEQ_NR_MASK 0xFFFF
-#define ACK_NR_MASK 0xFFFF
-#define TIMESTAMP_MASK 0xFFFFFFFF
-
-#define DIV_ROUND_UP(num, denom) ((num + denom - 1) / denom)
-
-// The totals are derived from the following data:
-// 45: IPv6 address including embedded IPv4 address
-// 11: Scope Id
-// 2: Brackets around IPv6 address when port is present
-// 6: Port (including colon)
-// 1: Terminating null byte
-char addrbuf[65];
-#define addrfmt(x, s) x.fmt(s, sizeof(s))
-
-
-#if (defined(__SVR4) && defined(__sun))
- #pragma pack(1)
-#else
- #pragma pack(push,1)
-#endif
-
-
-// these packet sizes are including the uTP header wich
-// is either 20 or 23 bytes depending on version
-#define PACKET_SIZE_EMPTY_BUCKET 0
-#define PACKET_SIZE_EMPTY 23
-#define PACKET_SIZE_SMALL_BUCKET 1
-#define PACKET_SIZE_SMALL 373
-#define PACKET_SIZE_MID_BUCKET 2
-#define PACKET_SIZE_MID 723
-#define PACKET_SIZE_BIG_BUCKET 3
-#define PACKET_SIZE_BIG 1400
-#define PACKET_SIZE_HUGE_BUCKET 4
-
-struct PACKED_ATTRIBUTE PacketFormatV1 {
- // packet_type (4 high bits)
- // protocol version (4 low bits)
- byte ver_type;
- byte version() const { return ver_type & 0xf; }
- byte type() const { return ver_type >> 4; }
- void set_version(byte v) { ver_type = (ver_type & 0xf0) | (v & 0xf); }
- void set_type(byte t) { ver_type = (ver_type & 0xf) | (t << 4); }
-
- // Type of the first extension header
- byte ext;
- // connection ID
- uint16_big connid;
- uint32_big tv_usec;
- uint32_big reply_micro;
- // receive window size in bytes
- uint32_big windowsize;
- // Sequence number
- uint16_big seq_nr;
- // Acknowledgment number
- uint16_big ack_nr;
-};
-
-struct PACKED_ATTRIBUTE PacketFormatAckV1 {
- PacketFormatV1 pf;
- byte ext_next;
- byte ext_len;
- byte acks[4];
-};
-
-#if (defined(__SVR4) && defined(__sun))
- #pragma pack(0)
-#else
- #pragma pack(pop)
-#endif
-
-enum {
- ST_DATA = 0, // Data packet.
- ST_FIN = 1, // Finalize the connection. This is the last packet.
- ST_STATE = 2, // State packet. Used to transmit an ACK with no data.
- ST_RESET = 3, // Terminate connection forcefully.
- ST_SYN = 4, // Connect SYN
- ST_NUM_STATES, // used for bounds checking
-};
-
-static const cstr flagnames[] = {
- "ST_DATA","ST_FIN","ST_STATE","ST_RESET","ST_SYN"
-};
-
-enum CONN_STATE {
- CS_UNINITIALIZED = 0,
- CS_IDLE,
- CS_SYN_SENT,
- CS_SYN_RECV,
- CS_CONNECTED,
- CS_CONNECTED_FULL,
- CS_GOT_FIN,
- CS_DESTROY_DELAY,
- CS_FIN_SENT,
- CS_RESET,
- CS_DESTROY
-};
-
-static const cstr statenames[] = {
- "UNINITIALIZED", "IDLE","SYN_SENT", "SYN_RECV", "CONNECTED","CONNECTED_FULL","GOT_FIN","DESTROY_DELAY","FIN_SENT","RESET","DESTROY"
-};
-
-struct OutgoingPacket {
- size_t length;
- size_t payload;
- uint64 time_sent; // microseconds
- uint transmissions:31;
- bool need_resend:1;
- byte data[1];
-};
-
-struct SizableCircularBuffer {
- // This is the mask. Since it's always a power of 2, adding 1 to this value will return the size.
- size_t mask;
- // This is the elements that the circular buffer points to
- void **elements;
-
- void *get(size_t i) { assert(elements); return elements ? elements[i & mask] : NULL; }
- void put(size_t i, void *data) { assert(elements); elements[i&mask] = data; }
-
- void grow(size_t item, size_t index);
- void ensure_size(size_t item, size_t index) { if (index > mask) grow(item, index); }
- size_t size() { return mask + 1; }
-};
-
-// Item contains the element we want to make space for
-// index is the index in the list.
-void SizableCircularBuffer::grow(size_t item, size_t index)
-{
- // Figure out the new size.
- size_t size = mask + 1;
- do size *= 2; while (index >= size);
-
- // Allocate the new buffer
- void **buf = (void**)calloc(size, sizeof(void*));
-
- size--;
-
- // Copy elements from the old buffer to the new buffer
- for (size_t i = 0; i <= mask; i++) {
- buf[(item - index + i) & size] = get(item - index + i);
- }
-
- // Swap to the newly allocated buffer
- mask = size;
- free(elements);
- elements = buf;
-}
-
-// compare if lhs is less than rhs, taking wrapping
-// into account. if lhs is close to UINT_MAX and rhs
-// is close to 0, lhs is assumed to have wrapped and
-// considered smaller
-bool wrapping_compare_less(uint32 lhs, uint32 rhs, uint32 mask)
-{
- // distance walking from lhs to rhs, downwards
- const uint32 dist_down = (lhs - rhs) & mask;
- // distance walking from lhs to rhs, upwards
- const uint32 dist_up = (rhs - lhs) & mask;
-
- // if the distance walking up is shorter, lhs
- // is less than rhs. If the distance walking down
- // is shorter, then rhs is less than lhs
- return dist_up < dist_down;
-}
-
-struct DelayHist {
- uint32 delay_base;
-
- // this is the history of delay samples,
- // normalized by using the delay_base. These
- // values are always greater than 0 and measures
- // the queuing delay in microseconds
- uint32 cur_delay_hist[CUR_DELAY_SIZE];
- size_t cur_delay_idx;
-
- // this is the history of delay_base. It's
- // a number that doesn't have an absolute meaning
- // only relative. It doesn't make sense to initialize
- // it to anything other than values relative to
- // what's been seen in the real world.
- uint32 delay_base_hist[DELAY_BASE_HISTORY];
- size_t delay_base_idx;
- // the time when we last stepped the delay_base_idx
- uint64 delay_base_time;
-
- bool delay_base_initialized;
-
- void clear(uint64 current_ms)
- {
- delay_base_initialized = false;
- delay_base = 0;
- cur_delay_idx = 0;
- delay_base_idx = 0;
- delay_base_time = current_ms;
- for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
- cur_delay_hist[i] = 0;
- }
- for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
- delay_base_hist[i] = 0;
- }
- }
-
- void shift(const uint32 offset)
- {
- // the offset should never be "negative"
- // assert(offset < 0x10000000);
-
- // increase all of our base delays by this amount
- // this is used to take clock skew into account
- // by observing the other side's changes in its base_delay
- for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
- delay_base_hist[i] += offset;
- }
- delay_base += offset;
- }
-
- void add_sample(const uint32 sample, uint64 current_ms)
- {
- // The two clocks (in the two peers) are assumed not to
- // progress at the exact same rate. They are assumed to be
- // drifting, which causes the delay samples to contain
- // a systematic error, either they are under-
- // estimated or over-estimated. This is why we update the
- // delay_base every two minutes, to adjust for this.
-
- // This means the values will keep drifting and eventually wrap.
- // We can cross the wrapping boundry in two directions, either
- // going up, crossing the highest value, or going down, crossing 0.
-
- // if the delay_base is close to the max value and sample actually
- // wrapped on the other end we would see something like this:
- // delay_base = 0xffffff00, sample = 0x00000400
- // sample - delay_base = 0x500 which is the correct difference
-
- // if the delay_base is instead close to 0, and we got an even lower
- // sample (that will eventually update the delay_base), we may see
- // something like this:
- // delay_base = 0x00000400, sample = 0xffffff00
- // sample - delay_base = 0xfffffb00
- // this needs to be interpreted as a negative number and the actual
- // recorded delay should be 0.
-
- // It is important that all arithmetic that assume wrapping
- // is done with unsigned intergers. Signed integers are not guaranteed
- // to wrap the way unsigned integers do. At least GCC takes advantage
- // of this relaxed rule and won't necessarily wrap signed ints.
-
- // remove the clock offset and propagation delay.
- // delay base is min of the sample and the current
- // delay base. This min-operation is subject to wrapping
- // and care needs to be taken to correctly choose the
- // true minimum.
-
- // specifically the problem case is when delay_base is very small
- // and sample is very large (because it wrapped past zero), sample
- // needs to be considered the smaller
-
- if (!delay_base_initialized) {
- // delay_base being 0 suggests that we haven't initialized
- // it or its history with any real measurements yet. Initialize
- // everything with this sample.
- for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
- // if we don't have a value, set it to the current sample
- delay_base_hist[i] = sample;
- continue;
- }
- delay_base = sample;
- delay_base_initialized = true;
- }
-
- if (wrapping_compare_less(sample, delay_base_hist[delay_base_idx], TIMESTAMP_MASK)) {
- // sample is smaller than the current delay_base_hist entry
- // update it
- delay_base_hist[delay_base_idx] = sample;
- }
-
- // is sample lower than delay_base? If so, update delay_base
- if (wrapping_compare_less(sample, delay_base, TIMESTAMP_MASK)) {
- // sample is smaller than the current delay_base
- // update it
- delay_base = sample;
- }
-
- // this operation may wrap, and is supposed to
- const uint32 delay = sample - delay_base;
- // sanity check. If this is triggered, something fishy is going on
- // it means the measured sample was greater than 32 seconds!
- //assert(delay < 0x2000000);
-
- cur_delay_hist[cur_delay_idx] = delay;
- cur_delay_idx = (cur_delay_idx + 1) % CUR_DELAY_SIZE;
-
- // once every minute
- if (current_ms - delay_base_time > 60 * 1000) {
- delay_base_time = current_ms;
- delay_base_idx = (delay_base_idx + 1) % DELAY_BASE_HISTORY;
- // clear up the new delay base history spot by initializing
- // it to the current sample, then update it
- delay_base_hist[delay_base_idx] = sample;
- delay_base = delay_base_hist[0];
- // Assign the lowest delay in the last 2 minutes to delay_base
- for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
- if (wrapping_compare_less(delay_base_hist[i], delay_base, TIMESTAMP_MASK))
- delay_base = delay_base_hist[i];
- }
- }
- }
-
- uint32 get_value()
- {
- uint32 value = UINT_MAX;
- for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
- value = min<uint32>(cur_delay_hist[i], value);
- }
- // value could be UINT_MAX if we have no samples yet...
- return value;
- }
-};
-
-struct UTPSocket {
- ~UTPSocket();
-
- PackedSockAddr addr;
- utp_context *ctx;
-
- int ida; //for ack socket list
-
- uint16 retransmit_count;
-
- uint16 reorder_count;
- byte duplicate_ack;
-
- // the number of packets in the send queue. Packets that haven't
- // yet been sent count as well as packets marked as needing resend
- // the oldest un-acked packet in the send queue is seq_nr - cur_window_packets
- uint16 cur_window_packets;
-
- // how much of the window is used, number of bytes in-flight
- // packets that have not yet been sent do not count, packets
- // that are marked as needing to be re-sent (due to a timeout)
- // don't count either
- size_t cur_window;
- // maximum window size, in bytes
- size_t max_window;
- // UTP_SNDBUF setting, in bytes
- size_t opt_sndbuf;
- // UTP_RCVBUF setting, in bytes
- size_t opt_rcvbuf;
-
- // this is the target delay, in microseconds
- // for this socket. defaults to 100000.
- size_t target_delay;
-
- // Is a FIN packet in the reassembly buffer?
- bool got_fin:1;
- // Timeout procedure
- bool fast_timeout:1;
-
- // max receive window for other end, in bytes
- size_t max_window_user;
- CONN_STATE state;
- // TickCount when we last decayed window (wraps)
- int64 last_rwin_decay;
-
- // the sequence number of the FIN packet. This field is only set
- // when we have received a FIN, and the flag field has the FIN flag set.
- // it is used to know when it is safe to destroy the socket, we must have
- // received all packets up to this sequence number first.
- uint16 eof_pkt;
-
- // All sequence numbers up to including this have been properly received
- // by us
- uint16 ack_nr;
- // This is the sequence number for the next packet to be sent.
- uint16 seq_nr;
-
- uint16 timeout_seq_nr;
-
- // This is the sequence number of the next packet we're allowed to
- // do a fast resend with. This makes sure we only do a fast-resend
- // once per packet. We can resend the packet with this sequence number
- // or any later packet (with a higher sequence number).
- uint16 fast_resend_seq_nr;
-
- uint32 reply_micro;
-
- uint64 last_got_packet;
- uint64 last_sent_packet;
- uint64 last_measured_delay;
-
- // timestamp of the last time the cwnd was full
- // this is used to prevent the congestion window
- // from growing when we're not sending at capacity
- mutable uint64 last_maxed_out_window;
-
- void *userdata;
-
- // Round trip time
- uint rtt;
- // Round trip time variance
- uint rtt_var;
- // Round trip timeout
- uint rto;
- DelayHist rtt_hist;
- uint retransmit_timeout;
- // The RTO timer will timeout here.
- uint64 rto_timeout;
- // When the window size is set to zero, start this timer. It will send a new packet every 30secs.
- uint64 zerowindow_time;
-
- uint32 conn_seed;
- // Connection ID for packets I receive
- uint32 conn_id_recv;
- // Connection ID for packets I send
- uint32 conn_id_send;
- // Last rcv window we advertised, in bytes
- size_t last_rcv_win;
-
- DelayHist our_hist;
- DelayHist their_hist;
-
- // extension bytes from SYN packet
- byte extensions[8];
-
- // MTU Discovery
- // time when we should restart the MTU discovery
- uint64 mtu_discover_time;
- // ceiling and floor of binary search. last is the mtu size
- // we're currently using
- uint32 mtu_ceiling, mtu_floor, mtu_last;
- // we only ever have a single probe in flight at any given time.
- // this is the sequence number of that probe, and the size of
- // that packet
- uint32 mtu_probe_seq, mtu_probe_size;
-
- // this is the average delay samples, as compared to the initial
- // sample. It's averaged over 5 seconds
- int32 average_delay;
- // this is the sum of all the delay samples
- // we've made recently. The important distinction
- // of these samples is that they are all made compared
- // to the initial sample, this is to deal with
- // wrapping in a simple way.
- int64 current_delay_sum;
- // number of sample ins current_delay_sum
- int current_delay_samples;
- // initialized to 0, set to the first raw delay sample
- // each sample that's added to current_delay_sum
- // is subtracted from the value first, to make it
- // a delay relative to this sample
- uint32 average_delay_base;
- // the next time we should add an average delay
- // sample into average_delay_hist
- uint64 average_sample_time;
- // the estimated clock drift between our computer
- // and the endpoint computer. The unit is microseconds
- // per 5 seconds
- int32 clock_drift;
- // just used for logging
- int32 clock_drift_raw;
-
- SizableCircularBuffer inbuf, outbuf;
-
- #ifdef _DEBUG
- // Public per-socket statistics, returned by utp_get_stats()
- utp_socket_stats _stats;
- #endif
-
- // true if we're in slow-start (exponential growth) phase
- bool slow_start;
-
- // the slow-start threshold, in bytes
- size_t ssthresh;
-
- void log(int level, char const *fmt, ...)
- {
- va_list va;
- char buf[4096], buf2[4096];
-
- va_start(va, fmt);
- vsnprintf(buf, 4096, fmt, va);
- va_end(va);
- buf[4095] = '\0';
-
- snprintf(buf2, 4096, "%p %s %06d %s", this, addrfmt(addr, addrbuf), conn_id_recv, buf);
- buf2[4095] = '\0';
-
- ctx->log(level, this, buf2);
- }
-
- void schedule_ack();
-
- // called every time mtu_floor or mtu_ceiling are adjusted
- void mtu_search_update();
- void mtu_reset();
-
- // Calculates the current receive window
- size_t get_rcv_window()
- {
- // Trim window down according to what's already in buffer.
- const size_t numbuf = utp_call_get_read_buffer_size(this->ctx, this);
- assert((int)numbuf >= 0);
- return opt_rcvbuf > numbuf ? opt_rcvbuf - numbuf : 0;
- }
-
- // Test if we're ready to decay max_window
- // XXX this breaks when spaced by > INT_MAX/2, which is 49
- // days; the failure mode in that case is we do an extra decay
- // or fail to do one when we really shouldn't.
- bool can_decay_win(int64 msec) const
- {
- return (msec - last_rwin_decay) >= MAX_WINDOW_DECAY;
- }
-
- // If we can, decay max window, returns true if we actually did so
- void maybe_decay_win(uint64 current_ms)
- {
- if (can_decay_win(current_ms)) {
- // TCP uses 0.5
- max_window = (size_t)(max_window * .5);
- last_rwin_decay = current_ms;
- if (max_window < MIN_WINDOW_SIZE)
- max_window = MIN_WINDOW_SIZE;
- slow_start = false;
- ssthresh = max_window;
- }
- }
-
- size_t get_header_size() const
- {
- return sizeof(PacketFormatV1);
- }
-
- size_t get_udp_mtu()
- {
- socklen_t len;
- SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
- return utp_call_get_udp_mtu(this->ctx, this, (const struct sockaddr *)&sa, len);
- }
-
- size_t get_udp_overhead()
- {
- socklen_t len;
- SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
- return utp_call_get_udp_overhead(this->ctx, this, (const struct sockaddr *)&sa, len);
- }
-
- size_t get_overhead()
- {
- return get_udp_overhead() + get_header_size();
- }
-
- void send_data(byte* b, size_t length, bandwidth_type_t type, uint32 flags = 0);
-
- void send_ack(bool synack = false);
-
- void send_keep_alive();
-
- static void send_rst(utp_context *ctx,
- const PackedSockAddr &addr, uint32 conn_id_send,
- uint16 ack_nr, uint16 seq_nr);
-
- void send_packet(OutgoingPacket *pkt);
-
- bool is_full(int bytes = -1);
- bool flush_packets();
- void write_outgoing_packet(size_t payload, uint flags, struct utp_iovec *iovec, size_t num_iovecs);
-
- #ifdef _DEBUG
- void check_invariant();
- #endif
-
- void check_timeouts();
- int ack_packet(uint16 seq);
- size_t selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt);
- void selective_ack(uint base, const byte *mask, byte len);
- void apply_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt);
- size_t get_packet_size() const;
-};
-
-void removeSocketFromAckList(UTPSocket *conn)
-{
- if (conn->ida >= 0)
- {
- UTPSocket *last = conn->ctx->ack_sockets[conn->ctx->ack_sockets.GetCount() - 1];
-
- assert(last->ida < (int)(conn->ctx->ack_sockets.GetCount()));
- assert(conn->ctx->ack_sockets[last->ida] == last);
- last->ida = conn->ida;
- conn->ctx->ack_sockets[conn->ida] = last;
- conn->ida = -1;
-
- // Decrease the count
- conn->ctx->ack_sockets.SetCount(conn->ctx->ack_sockets.GetCount() - 1);
- }
-}
-
-static void utp_register_sent_packet(utp_context *ctx, size_t length)
-{
- if (length <= PACKET_SIZE_MID) {
- if (length <= PACKET_SIZE_EMPTY) {
- ctx->context_stats._nraw_send[PACKET_SIZE_EMPTY_BUCKET]++;
- } else if (length <= PACKET_SIZE_SMALL) {
- ctx->context_stats._nraw_send[PACKET_SIZE_SMALL_BUCKET]++;
- } else
- ctx->context_stats._nraw_send[PACKET_SIZE_MID_BUCKET]++;
- } else {
- if (length <= PACKET_SIZE_BIG) {
- ctx->context_stats._nraw_send[PACKET_SIZE_BIG_BUCKET]++;
- } else
- ctx->context_stats._nraw_send[PACKET_SIZE_HUGE_BUCKET]++;
- }
-}
-
-void send_to_addr(utp_context *ctx, const byte *p, size_t len, const PackedSockAddr &addr, int flags = 0)
-{
- socklen_t tolen;
- SOCKADDR_STORAGE to = addr.get_sockaddr_storage(&tolen);
- utp_register_sent_packet(ctx, len);
- utp_call_sendto(ctx, NULL, p, len, (const struct sockaddr *)&to, tolen, flags);
-}
-
-void UTPSocket::schedule_ack()
-{
- if (ida == -1){
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "schedule_ack");
- #endif
- ida = ctx->ack_sockets.Append(this);
- } else {
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "schedule_ack: already in list");
- #endif
- }
-}
-
-void UTPSocket::send_data(byte* b, size_t length, bandwidth_type_t type, uint32 flags)
-{
- // time stamp this packet with local time, the stamp goes into
- // the header of every packet at the 8th byte for 8 bytes :
- // two integers, check packet.h for more
- uint64 time = utp_call_get_microseconds(ctx, this);
-
- PacketFormatV1* b1 = (PacketFormatV1*)b;
- b1->tv_usec = (uint32)time;
- b1->reply_micro = reply_micro;
-
- last_sent_packet = ctx->current_ms;
-
- #ifdef _DEBUG
- _stats.nbytes_xmit += length;
- ++_stats.nxmit;
- #endif
-
- if (ctx->callbacks[UTP_ON_OVERHEAD_STATISTICS]) {
- size_t n;
- if (type == payload_bandwidth) {
- // if this packet carries payload, just
- // count the header as overhead
- type = header_overhead;
- n = get_overhead();
- } else {
- n = length + get_udp_overhead();
- }
- utp_call_on_overhead_statistics(ctx, this, true, n, type);
- }
-#if UTP_DEBUG_LOGGING
- int flags2 = b1->type();
- uint16 seq_nr = b1->seq_nr;
- uint16 ack_nr = b1->ack_nr;
- log(UTP_LOG_DEBUG, "send %s len:%u id:%u timestamp:"I64u" reply_micro:%u flags:%s seq_nr:%u ack_nr:%u",
- addrfmt(addr, addrbuf), (uint)length, conn_id_send, time, reply_micro, flagnames[flags2],
- seq_nr, ack_nr);
-#endif
- send_to_addr(ctx, b, length, addr, flags);
- removeSocketFromAckList(this);
-}
-
-void UTPSocket::send_ack(bool synack)
-{
- PacketFormatAckV1 pfa;
- zeromem(&pfa);
-
- size_t len;
- last_rcv_win = get_rcv_window();
- pfa.pf.set_version(1);
- pfa.pf.set_type(ST_STATE);
- pfa.pf.ext = 0;
- pfa.pf.connid = conn_id_send;
- pfa.pf.ack_nr = ack_nr;
- pfa.pf.seq_nr = seq_nr;
- pfa.pf.windowsize = (uint32)last_rcv_win;
- len = sizeof(PacketFormatV1);
-
- // we never need to send EACK for connections
- // that are shutting down
- if (reorder_count != 0 && state < CS_GOT_FIN) {
- // if reorder count > 0, send an EACK.
- // reorder count should always be 0
- // for synacks, so this should not be
- // as synack
- assert(!synack);
- pfa.pf.ext = 1;
- pfa.ext_next = 0;
- pfa.ext_len = 4;
- uint m = 0;
-
- // reorder count should only be non-zero
- // if the packet ack_nr + 1 has not yet
- // been received
- assert(inbuf.get(ack_nr + 1) == NULL);
- size_t window = min<size_t>(14+16, inbuf.size());
- // Generate bit mask of segments received.
- for (size_t i = 0; i < window; i++) {
- if (inbuf.get(ack_nr + i + 2) != NULL) {
- m |= 1 << i;
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "EACK packet [%u]", ack_nr + i + 2);
- #endif
- }
- }
- pfa.acks[0] = (byte)m;
- pfa.acks[1] = (byte)(m >> 8);
- pfa.acks[2] = (byte)(m >> 16);
- pfa.acks[3] = (byte)(m >> 24);
- len += 4 + 2;
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "Sending EACK %u [%u] bits:[%032b]", ack_nr, conn_id_send, m);
- #endif
- } else {
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "Sending ACK %u [%u]", ack_nr, conn_id_send);
- #endif
- }
-
- send_data((byte*)&pfa, len, ack_overhead);
- removeSocketFromAckList(this);
-}
-
-void UTPSocket::send_keep_alive()
-{
- ack_nr--;
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "Sending KeepAlive ACK %u [%u]", ack_nr, conn_id_send);
- #endif
-
- send_ack();
- ack_nr++;
-}
-
-void UTPSocket::send_rst(utp_context *ctx,
- const PackedSockAddr &addr, uint32 conn_id_send, uint16 ack_nr, uint16 seq_nr)
-{
- PacketFormatV1 pf1;
- zeromem(&pf1);
-
- size_t len;
- pf1.set_version(1);
- pf1.set_type(ST_RESET);
- pf1.ext = 0;
- pf1.connid = conn_id_send;
- pf1.ack_nr = ack_nr;
- pf1.seq_nr = seq_nr;
- pf1.windowsize = 0;
- len = sizeof(PacketFormatV1);
-
-// LOG_DEBUG("%s: Sending RST id:%u seq_nr:%u ack_nr:%u", addrfmt(addr, addrbuf), conn_id_send, seq_nr, ack_nr);
-// LOG_DEBUG("send %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, conn_id_send);
- send_to_addr(ctx, (const byte*)&pf1, len, addr);
-}
-
-void UTPSocket::send_packet(OutgoingPacket *pkt)
-{
- // only count against the quota the first time we
- // send the packet. Don't enforce quota when closing
- // a socket. Only enforce the quota when we're sending
- // at slow rates (max window < packet size)
-
- //size_t max_send = min(max_window, opt_sndbuf, max_window_user);
- time_t cur_time = utp_call_get_milliseconds(this->ctx, this);
-
- if (pkt->transmissions == 0 || pkt->need_resend) {
- cur_window += pkt->payload;
- }
-
- pkt->need_resend = false;
-
- PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
- p1->ack_nr = ack_nr;
- pkt->time_sent = utp_call_get_microseconds(this->ctx, this);
-
- //socklen_t salen;
- //SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&salen);
- bool use_as_mtu_probe = false;
-
- // TODO: this is subject to nasty wrapping issues! Below as well
- if (mtu_discover_time < (uint64)cur_time) {
- // it's time to reset our MTU assupmtions
- // and trigger a new search
- mtu_reset();
- }
-
- // don't use packets that are larger then mtu_ceiling
- // as probes, since they were probably used as probes
- // already and failed, now we need it to fragment
- // just to get it through
- // if seq_nr == 1, the probe would end up being 0
- // which is a magic number representing no-probe
- // that why we don't send a probe for a packet with
- // sequence number 0
- if (mtu_floor < mtu_ceiling
- && pkt->length > mtu_floor
- && pkt->length <= mtu_ceiling
- && mtu_probe_seq == 0
- && seq_nr != 1
- && pkt->transmissions == 0) {
-
- // we've already incremented seq_nr
- // for this packet
- mtu_probe_seq = (seq_nr - 1) & ACK_NR_MASK;
- mtu_probe_size = pkt->length;
- assert(pkt->length >= mtu_floor);
- assert(pkt->length <= mtu_ceiling);
- use_as_mtu_probe = true;
- log(UTP_LOG_MTU, "MTU [PROBE] floor:%d ceiling:%d current:%d"
- , mtu_floor, mtu_ceiling, mtu_probe_size);
- }
-
- pkt->transmissions++;
- send_data((byte*)pkt->data, pkt->length,
- (state == CS_SYN_SENT) ? connect_overhead
- : (pkt->transmissions == 1) ? payload_bandwidth
- : retransmit_overhead, use_as_mtu_probe ? UTP_UDP_DONTFRAG : 0);
-}
-
-bool UTPSocket::is_full(int bytes)
-{
- size_t packet_size = get_packet_size();
- if (bytes < 0) bytes = packet_size;
- else if (bytes > (int)packet_size) bytes = (int)packet_size;
- size_t max_send = min(max_window, opt_sndbuf, max_window_user);
-
- // subtract one to save space for the FIN packet
- if (cur_window_packets >= OUTGOING_BUFFER_MAX_SIZE - 1) {
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "is_full:false cur_window_packets:%d MAX:%d", cur_window_packets, OUTGOING_BUFFER_MAX_SIZE - 1);
- #endif
-
- last_maxed_out_window = ctx->current_ms;
- return true;
- }
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "is_full:%s. cur_window:%u pkt:%u max:%u cur_window_packets:%u max_window:%u"
- , (cur_window + bytes > max_send) ? "true" : "false"
- , cur_window, bytes, max_send, cur_window_packets
- , max_window);
- #endif
-
- if (cur_window + bytes > max_send) {
- last_maxed_out_window = ctx->current_ms;
- return true;
- }
- return false;
-}
-
-bool UTPSocket::flush_packets()
-{
- size_t packet_size = get_packet_size();
-
- // send packets that are waiting on the pacer to be sent
- // i has to be an unsigned 16 bit counter to wrap correctly
- // signed types are not guaranteed to wrap the way you expect
- for (uint16 i = seq_nr - cur_window_packets; i != seq_nr; ++i) {
- OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(i);
- if (pkt == 0 || (pkt->transmissions > 0 && pkt->need_resend == false)) continue;
- // have we run out of quota?
- if (is_full()) return true;
-
- // Nagle check
- // don't send the last packet if we have one packet in-flight
- // and the current packet is still smaller than packet_size.
- if (i != ((seq_nr - 1) & ACK_NR_MASK) ||
- cur_window_packets == 1 ||
- pkt->payload >= packet_size) {
- send_packet(pkt);
- }
- }
- return false;
-}
-
-// @payload: number of bytes to send
-// @flags: either ST_DATA, or ST_FIN
-// @iovec: base address of iovec array
-// @num_iovecs: number of iovecs in array
-void UTPSocket::write_outgoing_packet(size_t payload, uint flags, struct utp_iovec *iovec, size_t num_iovecs)
-{
- // Setup initial timeout timer
- if (cur_window_packets == 0) {
- retransmit_timeout = rto;
- rto_timeout = ctx->current_ms + retransmit_timeout;
- assert(cur_window == 0);
- }
-
- size_t packet_size = get_packet_size();
- do {
- assert(cur_window_packets < OUTGOING_BUFFER_MAX_SIZE);
- assert(flags == ST_DATA || flags == ST_FIN);
-
- size_t added = 0;
-
- OutgoingPacket *pkt = NULL;
-
- if (cur_window_packets > 0) {
- pkt = (OutgoingPacket*)outbuf.get(seq_nr - 1);
- }
-
- const size_t header_size = get_header_size();
- bool append = true;
-
- // if there's any room left in the last packet in the window
- // and it hasn't been sent yet, fill that frame first
- if (payload && pkt && !pkt->transmissions && pkt->payload < packet_size) {
- // Use the previous unsent packet
- added = min(payload + pkt->payload, max<size_t>(packet_size, pkt->payload)) - pkt->payload;
- pkt = (OutgoingPacket*)realloc(pkt,
- (sizeof(OutgoingPacket) - 1) +
- header_size +
- pkt->payload + added);
- outbuf.put(seq_nr - 1, pkt);
- append = false;
- assert(!pkt->need_resend);
- } else {
- // Create the packet to send.
- added = payload;
- pkt = (OutgoingPacket*)malloc((sizeof(OutgoingPacket) - 1) +
- header_size +
- added);
- pkt->payload = 0;
- pkt->transmissions = 0;
- pkt->need_resend = false;
- }
-
- if (added) {
- assert(flags == ST_DATA);
-
- // Fill it with data from the upper layer.
- unsigned char *p = pkt->data + header_size + pkt->payload;
- size_t needed = added;
-
- /*
- while (needed) {
- *p = *(char*)iovec[0].iov_base;
- p++;
- iovec[0].iov_base = (char *)iovec[0].iov_base + 1;
- needed--;
- }
- */
-
- for (size_t i = 0; i < num_iovecs && needed; i++) {
- if (iovec[i].iov_len == 0)
- continue;
-
- size_t num = min<size_t>(needed, iovec[i].iov_len);
- memcpy(p, iovec[i].iov_base, num);
-
- p += num;
-
- iovec[i].iov_len -= num;
- iovec[i].iov_base = (byte*)iovec[i].iov_base + num; // iovec[i].iov_base += num, but without void* pointers
- needed -= num;
- }
-
- assert(needed == 0);
- }
- pkt->payload += added;
- pkt->length = header_size + pkt->payload;
-
- last_rcv_win = get_rcv_window();
-
- PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
- p1->set_version(1);
- p1->set_type(flags);
- p1->ext = 0;
- p1->connid = conn_id_send;
- p1->windowsize = (uint32)last_rcv_win;
- p1->ack_nr = ack_nr;
-
- if (append) {
- // Remember the message in the outgoing queue.
- outbuf.ensure_size(seq_nr, cur_window_packets);
- outbuf.put(seq_nr, pkt);
- p1->seq_nr = seq_nr;
- seq_nr++;
- cur_window_packets++;
- }
-
- payload -= added;
-
- } while (payload);
-
- flush_packets();
-}
-
-#ifdef _DEBUG
-void UTPSocket::check_invariant()
-{
- if (reorder_count > 0) {
- assert(inbuf.get(ack_nr + 1) == NULL);
- }
-
- size_t outstanding_bytes = 0;
- for (int i = 0; i < cur_window_packets; ++i) {
- OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
- if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
- outstanding_bytes += pkt->payload;
- }
- assert(outstanding_bytes == cur_window);
-}
-#endif
-
-void UTPSocket::check_timeouts()
-{
- #ifdef _DEBUG
- check_invariant();
- #endif
-
- // this invariant should always be true
- assert(cur_window_packets == 0 || outbuf.get(seq_nr - cur_window_packets));
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "CheckTimeouts timeout:%d max_window:%u cur_window:%u "
- "state:%s cur_window_packets:%u",
- (int)(rto_timeout - ctx->current_ms), (uint)max_window, (uint)cur_window,
- statenames[state], cur_window_packets);
- #endif
-
- if (state != CS_DESTROY) flush_packets();
-
- switch (state) {
- case CS_SYN_SENT:
- case CS_SYN_RECV:
- case CS_CONNECTED_FULL:
- case CS_CONNECTED:
- case CS_FIN_SENT: {
-
- // Reset max window...
- if ((int)(ctx->current_ms - zerowindow_time) >= 0 && max_window_user == 0) {
- max_window_user = PACKET_SIZE;
- }
-
- if ((int)(ctx->current_ms - rto_timeout) >= 0
- && rto_timeout > 0) {
-
- bool ignore_loss = false;
-
- if (cur_window_packets == 1
- && ((seq_nr - 1) & ACK_NR_MASK) == mtu_probe_seq
- && mtu_probe_seq != 0) {
- // we only had a single outstanding packet that timed out, and it was the probe
- mtu_ceiling = mtu_probe_size - 1;
- mtu_search_update();
- // this packet was most likely dropped because the packet size being
- // too big and not because congestion. To accelerate the binary search for
- // the MTU, resend immediately and don't reset the window size
- ignore_loss = true;
- log(UTP_LOG_MTU, "MTU [PROBE-TIMEOUT] floor:%d ceiling:%d current:%d"
- , mtu_floor, mtu_ceiling, mtu_last);
- }
- // we dropepd the probe, clear these fields to
- // allow us to send a new one
- mtu_probe_seq = mtu_probe_size = 0;
- log(UTP_LOG_MTU, "MTU [TIMEOUT]");
-
- /*
- OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);
-
- // If there were a lot of retransmissions, force recomputation of round trip time
- if (pkt->transmissions >= 4)
- rtt = 0;
- */
-
- // Increase RTO
- const uint new_timeout = ignore_loss ? retransmit_timeout : retransmit_timeout * 2;
-
- // They initiated the connection but failed to respond before the rto.
- // A malicious client can also spoof the destination address of a ST_SYN bringing us to this state.
- // Kill the connection and do not notify the upper layer
- if (state == CS_SYN_RECV) {
- state = CS_DESTROY;
- utp_call_on_error(ctx, this, UTP_ETIMEDOUT);
- return;
- }
-
- // We initiated the connection but the other side failed to respond before the rto
- if (retransmit_count >= 4 || (state == CS_SYN_SENT && retransmit_count >= 2)) {
- // 4 consecutive transmissions have timed out. Kill it. If we
- // haven't even connected yet, give up after only 2 consecutive
- // failed transmissions.
- if (state == CS_FIN_SENT)
- state = CS_DESTROY;
- else
- state = CS_RESET;
- utp_call_on_error(ctx, this, UTP_ETIMEDOUT);
- return;
- }
-
- retransmit_timeout = new_timeout;
- rto_timeout = ctx->current_ms + new_timeout;
-
- if (!ignore_loss) {
- // On Timeout
- duplicate_ack = 0;
-
- int packet_size = get_packet_size();
-
- if ((cur_window_packets == 0) && ((int)max_window > packet_size)) {
- // we don't have any packets in-flight, even though
- // we could. This implies that the connection is just
- // idling. No need to be aggressive about resetting the
- // congestion window. Just let it decay by a 3:rd.
- // don't set it any lower than the packet size though
- max_window = max(max_window * 2 / 3, size_t(packet_size));
- } else {
- // our delay was so high that our congestion window
- // was shrunk below one packet, preventing us from
- // sending anything for one time-out period. Now, reset
- // the congestion window to fit one packet, to start over
- // again
- max_window = packet_size;
- slow_start = true;
- }
- }
-
- // every packet should be considered lost
- for (int i = 0; i < cur_window_packets; ++i) {
- OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
- if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
- pkt->need_resend = true;
- assert(cur_window >= pkt->payload);
- cur_window -= pkt->payload;
- }
-
- if (cur_window_packets > 0) {
- retransmit_count++;
- // used in parse_log.py
- log(UTP_LOG_NORMAL, "Packet timeout. Resend. seq_nr:%u. timeout:%u "
- "max_window:%u cur_window_packets:%d"
- , seq_nr - cur_window_packets, retransmit_timeout
- , (uint)max_window, int(cur_window_packets));
-
- fast_timeout = true;
- timeout_seq_nr = seq_nr;
-
- OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);
- assert(pkt);
-
- // Re-send the packet.
- send_packet(pkt);
- }
- }
-
- // Mark the socket as writable. If the cwnd has grown, or if the number of
- // bytes in-flight is lower than cwnd, we need to make the socket writable again
- // in case it isn't
- if (state == CS_CONNECTED_FULL && !is_full()) {
- state = CS_CONNECTED;
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "Socket writable. max_window:%u cur_window:%u packet_size:%u",
- (uint)max_window, (uint)cur_window, (uint)get_packet_size());
- #endif
- utp_call_on_state_change(this->ctx, this, UTP_STATE_WRITABLE);
- }
-
- if (state >= CS_CONNECTED && state < CS_GOT_FIN) {
- if ((int)(ctx->current_ms - last_sent_packet) >= KEEPALIVE_INTERVAL) {
- send_keep_alive();
- }
- }
- break;
- }
-
- // Close?
- case CS_GOT_FIN:
- case CS_DESTROY_DELAY:
- if ((int)(ctx->current_ms - rto_timeout) >= 0) {
- state = (state == CS_DESTROY_DELAY) ? CS_DESTROY : CS_RESET;
- if (cur_window_packets > 0) {
- utp_call_on_error(ctx, this, UTP_ECONNRESET);
- }
- }
- break;
- // prevent warning
- case CS_UNINITIALIZED:
- case CS_IDLE:
- case CS_RESET:
- case CS_DESTROY:
- break;
- }
-}
-
-// this should be called every time we change mtu_floor or mtu_ceiling
-void UTPSocket::mtu_search_update()
-{
- assert(mtu_floor <= mtu_ceiling);
-
- // binary search
- mtu_last = (mtu_floor + mtu_ceiling) / 2;
-
- // enable a new probe to be sent
- mtu_probe_seq = mtu_probe_size = 0;
-
- // if the floor and ceiling are close enough, consider the
- // MTU binary search complete. We set the current value
- // to floor since that's the only size we know can go through
- // also set the ceiling to floor to terminate the searching
- if (mtu_ceiling - mtu_floor <= 16) {
- mtu_last = mtu_floor;
- log(UTP_LOG_MTU, "MTU [DONE] floor:%d ceiling:%d current:%d"
- , mtu_floor, mtu_ceiling, mtu_last);
- mtu_ceiling = mtu_floor;
- assert(mtu_floor <= mtu_ceiling);
- // Do another search in 30 minutes
- mtu_discover_time = utp_call_get_milliseconds(this->ctx, this) + 30 * 60 * 1000;
- }
-}
-
-void UTPSocket::mtu_reset()
-{
- mtu_ceiling = get_udp_mtu();
- // Less would not pass TCP...
- mtu_floor = 576;
- log(UTP_LOG_MTU, "MTU [RESET] floor:%d ceiling:%d current:%d"
- , mtu_floor, mtu_ceiling, mtu_last);
- assert(mtu_floor <= mtu_ceiling);
- mtu_discover_time = utp_call_get_milliseconds(this->ctx, this) + 30 * 60 * 1000;
-}
-
-// returns:
-// 0: the packet was acked.
-// 1: it means that the packet had already been acked
-// 2: the packet has not been sent yet
-int UTPSocket::ack_packet(uint16 seq)
-{
- OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq);
-
- // the packet has already been acked (or not sent)
- if (pkt == NULL) {
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "got ack for:%u (already acked, or never sent)", seq);
- #endif
-
- return 1;
- }
-
- // can't ack packets that haven't been sent yet!
- if (pkt->transmissions == 0) {
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "got ack for:%u (never sent, pkt_size:%u need_resend:%u)",
- seq, (uint)pkt->payload, pkt->need_resend);
- #endif
-
- return 2;
- }
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "got ack for:%u (pkt_size:%u need_resend:%u)",
- seq, (uint)pkt->payload, pkt->need_resend);
- #endif
-
- outbuf.put(seq, NULL);
-
- // if we never re-sent the packet, update the RTT estimate
- if (pkt->transmissions == 1) {
- // Estimate the round trip time.
- const uint32 ertt = (uint32)((utp_call_get_microseconds(this->ctx, this) - pkt->time_sent) / 1000);
- if (rtt == 0) {
- // First round trip time sample
- rtt = ertt;
- rtt_var = ertt / 2;
- // sanity check. rtt should never be more than 6 seconds
-// assert(rtt < 6000);
- } else {
- // Compute new round trip times
- const int delta = (int)rtt - ertt;
- rtt_var = rtt_var + (int)(abs(delta) - rtt_var) / 4;
- rtt = rtt - rtt/8 + ertt/8;
- // sanity check. rtt should never be more than 6 seconds
-// assert(rtt < 6000);
- rtt_hist.add_sample(ertt, ctx->current_ms);
- }
- rto = max<uint>(rtt + rtt_var * 4, 1000);
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "rtt:%u avg:%u var:%u rto:%u",
- ertt, rtt, rtt_var, rto);
- #endif
-
- }
- retransmit_timeout = rto;
- rto_timeout = ctx->current_ms + rto;
- // if need_resend is set, this packet has already
- // been considered timed-out, and is not included in
- // the cur_window anymore
- if (!pkt->need_resend) {
- assert(cur_window >= pkt->payload);
- cur_window -= pkt->payload;
- }
- free(pkt);
- retransmit_count = 0;
- return 0;
-}
-
-// count the number of bytes that were acked by the EACK header
-size_t UTPSocket::selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt)
-{
- if (cur_window_packets == 0) return 0;
-
- size_t acked_bytes = 0;
- int bits = len * 8;
- uint64 now = utp_call_get_microseconds(this->ctx, this);
-
- do {
- uint v = base + bits;
-
- // ignore bits that haven't been sent yet
- // see comment in UTPSocket::selective_ack
- if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1))
- continue;
-
- // ignore bits that represents packets we haven't sent yet
- // or packets that have already been acked
- OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
- if (!pkt || pkt->transmissions == 0)
- continue;
-
- // Count the number of segments that were successfully received past it.
- if (bits >= 0 && mask[bits>>3] & (1 << (bits & 7))) {
- assert((int)(pkt->payload) >= 0);
- acked_bytes += pkt->payload;
- if (pkt->time_sent < now)
- min_rtt = min<int64>(min_rtt, now - pkt->time_sent);
- else
- min_rtt = min<int64>(min_rtt, 50000);
- continue;
- }
- } while (--bits >= -1);
- return acked_bytes;
-}
-
-enum { MAX_EACK = 128 };
-
-void UTPSocket::selective_ack(uint base, const byte *mask, byte len)
-{
- if (cur_window_packets == 0) return;
-
- // the range is inclusive [0, 31] bits
- int bits = len * 8 - 1;
-
- int count = 0;
-
- // resends is a stack of sequence numbers we need to resend. Since we
- // iterate in reverse over the acked packets, at the end, the top packets
- // are the ones we want to resend
- int resends[MAX_EACK];
- int nr = 0;
-
-#if UTP_DEBUG_LOGGING
- char bitmask[1024] = {0};
- int counter = bits;
- for (int i = 0; i <= bits; ++i) {
- bool bit_set = counter >= 0 && mask[counter>>3] & (1 << (counter & 7));
- bitmask[i] = bit_set ? '1' : '0';
- --counter;
- }
-
- log(UTP_LOG_DEBUG, "Got EACK [%s] base:%u", bitmask, base);
-#endif
-
- do {
- // we're iterating over the bits from higher sequence numbers
- // to lower (kind of in reverse order, wich might not be very
- // intuitive)
- uint v = base + bits;
-
- // ignore bits that haven't been sent yet
- // and bits that fall below the ACKed sequence number
- // this can happen if an EACK message gets
- // reordered and arrives after a packet that ACKs up past
- // the base for thie EACK message
-
- // this is essentially the same as:
- // if v >= seq_nr || v <= seq_nr - cur_window_packets
- // but it takes wrapping into account
-
- // if v == seq_nr the -1 will make it wrap. if v > seq_nr
- // it will also wrap (since it will fall further below 0)
- // and be > cur_window_packets.
- // if v == seq_nr - cur_window_packets, the result will be
- // seq_nr - (seq_nr - cur_window_packets) - 1
- // == seq_nr - seq_nr + cur_window_packets - 1
- // == cur_window_packets - 1 which will be caught by the
- // test. If v < seq_nr - cur_window_packets the result will grow
- // fall furhter outside of the cur_window_packets range.
-
- // sequence number space:
- //
- // rejected < accepted > rejected
- // <============+--------------+============>
- // ^ ^
- // | |
- // (seq_nr-wnd) seq_nr
-
- if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1))
- continue;
-
- // this counts as a duplicate ack, even though we might have
- // received an ack for this packet previously (in another EACK
- // message for instance)
- bool bit_set = bits >= 0 && mask[bits>>3] & (1 << (bits & 7));
-
- // if this packet is acked, it counts towards the duplicate ack counter
- if (bit_set) count++;
-
- // ignore bits that represents packets we haven't sent yet
- // or packets that have already been acked
- OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
- if (!pkt || pkt->transmissions == 0) {
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "skipping %u. pkt:%08x transmissions:%u %s",
- v, pkt, pkt?pkt->transmissions:0, pkt?"(not sent yet?)":"(already acked?)");
- #endif
- continue;
- }
-
- // Count the number of segments that were successfully received past it.
- if (bit_set) {
- // the selective ack should never ACK the packet we're waiting for to decrement cur_window_packets
- assert((v & outbuf.mask) != ((seq_nr - cur_window_packets) & outbuf.mask));
- ack_packet(v);
- continue;
- }
-
- // Resend segments
- // if count is less than our re-send limit, we haven't seen enough
- // acked packets in front of this one to warrant a re-send.
- // if count == 0, we're still going through the tail of zeroes
- if (((v - fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE &&
- count >= DUPLICATE_ACKS_BEFORE_RESEND) {
- // resends is a stack, and we're mostly interested in the top of it
- // if we're full, just throw away the lower half
- if (nr >= MAX_EACK - 2) {
- memmove(resends, &resends[MAX_EACK/2], MAX_EACK/2 * sizeof(resends[0]));
- nr -= MAX_EACK / 2;
- }
- resends[nr++] = v;
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "no ack for %u", v);
- #endif
-
- } else {
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u",
- v, count, duplicate_ack, fast_resend_seq_nr);
- #endif
- }
- } while (--bits >= -1);
-
- if (((base - 1 - fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE &&
- count >= DUPLICATE_ACKS_BEFORE_RESEND) {
- // if we get enough duplicate acks to start
- // resending, the first packet we should resend
- // is base-1
- resends[nr++] = (base - 1) & ACK_NR_MASK;
-
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "no ack for %u", (base - 1) & ACK_NR_MASK);
- #endif
-
- } else {
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u",
- base - 1, count, duplicate_ack, fast_resend_seq_nr);
- #endif
- }
-
- bool back_off = false;
- int i = 0;
- while (nr > 0) {
- uint v = resends[--nr];
- // don't consider the tail of 0:es to be lost packets
- // only unacked packets with acked packets after should
- // be considered lost
- OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
-
- // this may be an old (re-ordered) packet, and some of the
- // packets in here may have been acked already. In which
- // case they will not be in the send queue anymore
- if (!pkt) continue;
-
- // used in parse_log.py
- log(UTP_LOG_NORMAL, "Packet %u lost. Resending", v);
-
- // On Loss
- back_off = true;
-
- #ifdef _DEBUG
- ++_stats.rexmit;
- #endif
-
- send_packet(pkt);
- fast_resend_seq_nr = (v + 1) & ACK_NR_MASK;
-
- // Re-send max 4 packets.
- if (++i >= 4) break;
- }
-
- if (back_off)
- maybe_decay_win(ctx->current_ms);
-
- duplicate_ack = count;
-}
-
-void UTPSocket::apply_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt)
-{
- // the delay can never be greater than the rtt. The min_rtt
- // variable is the RTT in microseconds
-
- assert(min_rtt >= 0);
- int32 our_delay = min<uint32>(our_hist.get_value(), uint32(min_rtt));
- assert(our_delay != INT_MAX);
- assert(our_delay >= 0);
-
- utp_call_on_delay_sample(this->ctx, this, our_delay / 1000);
-
- // This test the connection under heavy load from foreground
- // traffic. Pretend that our delays are very high to force the
- // connection to use sub-packet size window sizes
- //our_delay *= 4;
-
- // target is microseconds
- int target = target_delay;
- if (target <= 0) target = 100000;
-
- // this is here to compensate for very large clock drift that affects
- // the congestion controller into giving certain endpoints an unfair
- // share of the bandwidth. We have an estimate of the clock drift
- // (clock_drift). The unit of this is microseconds per 5 seconds.
- // empirically, a reasonable cut-off appears to be about 200000
- // (which is pretty high). The main purpose is to compensate for
- // people trying to "cheat" uTP by making their clock run slower,
- // and this definitely catches that without any risk of false positives
- // if clock_drift < -200000 start applying a penalty delay proportional
- // to how far beoynd -200000 the clock drift is
- int32 penalty = 0;
- if (clock_drift < -200000) {
- penalty = (-clock_drift - 200000) / 7;
- our_delay += penalty;
- }
-
- double off_target = target - our_delay;
-
- // this is the same as:
- //
- // (min(off_target, target) / target) * (bytes_acked / max_window) * MAX_CWND_INCREASE_BYTES_PER_RTT
- //
- // so, it's scaling the max increase by the fraction of the window this ack represents, and the fraction
- // of the target delay the current delay represents.
- // The min() around off_target protects against crazy values of our_delay, which may happen when th
- // timestamps wraps, or by just having a malicious peer sending garbage. This caps the increase
- // of the window size to MAX_CWND_INCREASE_BYTES_PER_RTT per rtt.
- // as for large negative numbers, this direction is already capped at the min packet size further down
- // the min around the bytes_acked protects against the case where the window size was recently
- // shrunk and the number of acked bytes exceeds that. This is considered no more than one full
- // window, in order to keep the gain within sane boundries.
-
- assert(bytes_acked > 0);
- double window_factor = (double)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked);
-
- double delay_factor = off_target / target;
- double scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT * window_factor * delay_factor;
-
- // since MAX_CWND_INCREASE_BYTES_PER_RTT is a cap on how much the window size (max_window)
- // may increase per RTT, we may not increase the window size more than that proportional
- // to the number of bytes that were acked, so that once one window has been acked (one rtt)
- // the increase limit is not exceeded
- // the +1. is to allow for floating point imprecision
- assert(scaled_gain <= 1. + MAX_CWND_INCREASE_BYTES_PER_RTT * (double)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked));
-
- if (scaled_gain > 0 && ctx->current_ms - last_maxed_out_window > 1000) {
- // if it was more than 1 second since we tried to send a packet
- // and stopped because we hit the max window, we're most likely rate
- // limited (which prevents us from ever hitting the window size)
- // if this is the case, we cannot let the max_window grow indefinitely
- scaled_gain = 0;
- }
-
- size_t ledbat_cwnd = (max_window + scaled_gain < MIN_WINDOW_SIZE) ? MIN_WINDOW_SIZE : (size_t)(max_window + scaled_gain);
-
- if (slow_start) {
- size_t ss_cwnd = (size_t)(max_window + window_factor*get_packet_size());
- if (ss_cwnd > ssthresh) {
- slow_start = false;
- } else if (our_delay > target*0.9) {
- // even if we're a little under the target delay, we conservatively
- // discontinue the slow start phase
- slow_start = false;
- ssthresh = max_window;
- } else {
- max_window = max(ss_cwnd, ledbat_cwnd);
- }
- } else {
- max_window = ledbat_cwnd;
- }
-
-
- // make sure that the congestion window is below max
- // make sure that we don't shrink our window too small
- max_window = clamp<size_t>(max_window, MIN_WINDOW_SIZE, opt_sndbuf);
-
- // used in parse_log.py
- log(UTP_LOG_NORMAL, "actual_delay:%u our_delay:%d their_delay:%u off_target:%d max_window:%u "
- "delay_base:%u delay_sum:%d target_delay:%d acked_bytes:%u cur_window:%u "
- "scaled_gain:%f rtt:%u rate:%u wnduser:%u rto:%u timeout:%d get_microseconds:"I64u" "
- "cur_window_packets:%u packet_size:%u their_delay_base:%u their_actual_delay:%u "
- "average_delay:%d clock_drift:%d clock_drift_raw:%d delay_penalty:%d current_delay_sum:"I64u
- "current_delay_samples:%d average_delay_base:%d last_maxed_out_window:"I64u" opt_sndbuf:%d "
- "current_ms:"I64u"",
- actual_delay, our_delay / 1000, their_hist.get_value() / 1000,
- int(off_target / 1000), uint(max_window), uint32(our_hist.delay_base),
- int((our_delay + their_hist.get_value()) / 1000), int(target / 1000), uint(bytes_acked),
- (uint)(cur_window - bytes_acked), (float)(scaled_gain), rtt,
- (uint)(max_window * 1000 / (rtt_hist.delay_base?rtt_hist.delay_base:50)),
- (uint)max_window_user, rto, (int)(rto_timeout - ctx->current_ms),
- utp_call_get_microseconds(this->ctx, this), cur_window_packets, (uint)get_packet_size(),
- their_hist.delay_base, their_hist.delay_base + their_hist.get_value(),
- average_delay, clock_drift, clock_drift_raw, penalty / 1000,
- current_delay_sum, current_delay_samples, average_delay_base,
- uint64(last_maxed_out_window), int(opt_sndbuf), uint64(ctx->current_ms));
-}
-
-static void utp_register_recv_packet(UTPSocket *conn, size_t len)
-{
- #ifdef _DEBUG
- ++conn->_stats.nrecv;
- conn->_stats.nbytes_recv += len;
- #endif
-
- if (len <= PACKET_SIZE_MID) {
- if (len <= PACKET_SIZE_EMPTY) {
- conn->ctx->context_stats._nraw_recv[PACKET_SIZE_EMPTY_BUCKET]++;
- } else if (len <= PACKET_SIZE_SMALL) {
- conn->ctx->context_stats._nraw_recv[PACKET_SIZE_SMALL_BUCKET]++;
- } else
- conn->ctx->context_stats._nraw_recv[PACKET_SIZE_MID_BUCKET]++;
- } else {
- if (len <= PACKET_SIZE_BIG) {
- conn->ctx->context_stats._nraw_recv[PACKET_SIZE_BIG_BUCKET]++;
- } else
- conn->ctx->context_stats._nraw_recv[PACKET_SIZE_HUGE_BUCKET]++;
- }
-}
-
-// returns the max number of bytes of payload the uTP
-// connection is allowed to send
-size_t UTPSocket::get_packet_size() const
-{
- int header_size = sizeof(PacketFormatV1);
- size_t mtu = mtu_last ? mtu_last : mtu_ceiling;
- return mtu - header_size;
-}
-
-// Process an incoming packet
-// syn is true if this is the first packet received. It will cut off parsing
-// as soon as the header is done
-size_t utp_process_incoming(UTPSocket *conn, const byte *packet, size_t len, bool syn = false)
-{
- utp_register_recv_packet(conn, len);
-
- conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, conn);
-
- const PacketFormatV1 *pf1 = (PacketFormatV1*)packet;
- const byte *packet_end = packet + len;
-
- uint16 pk_seq_nr = pf1->seq_nr;
- uint16 pk_ack_nr = pf1->ack_nr;
- uint8 pk_flags = pf1->type();
-
- if (pk_flags >= ST_NUM_STATES) return 0;
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Got %s. seq_nr:%u ack_nr:%u state:%s timestamp:"I64u" reply_micro:%u"
- , flagnames[pk_flags], pk_seq_nr, pk_ack_nr, statenames[conn->state]
- , uint64(pf1->tv_usec), (uint32)(pf1->reply_micro));
- #endif
-
- // mark receipt time
- uint64 time = utp_call_get_microseconds(conn->ctx, conn);
-
- // window packets size is used to calculate a minimum
- // permissible range for received acks. connections with acks falling
- // out of this range are dropped
- const uint16 curr_window = max<uint16>(conn->cur_window_packets + ACK_NR_ALLOWED_WINDOW, ACK_NR_ALLOWED_WINDOW);
-
- // ignore packets whose ack_nr is invalid. This would imply a spoofed address
- // or a malicious attempt to attach the uTP implementation.
- // acking a packet that hasn't been sent yet!
- // SYN packets have an exception, since there are no previous packets
- if ((pk_flags != ST_SYN || conn->state != CS_SYN_RECV) &&
- (wrapping_compare_less(conn->seq_nr - 1, pk_ack_nr, ACK_NR_MASK)
- || wrapping_compare_less(pk_ack_nr, conn->seq_nr - 1 - curr_window, ACK_NR_MASK))) {
-#if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Invalid ack_nr: %u. our seq_nr: %u last unacked: %u"
- , pk_ack_nr, conn->seq_nr, (conn->seq_nr - conn->cur_window_packets) & ACK_NR_MASK);
-#endif
- return 0;
- }
-
- // RSTs are handled earlier, since the connid matches the send id not the recv id
- assert(pk_flags != ST_RESET);
-
- // TODO: maybe send a ST_RESET if we're in CS_RESET?
-
- const byte *selack_ptr = NULL;
-
- // Unpack UTP packet options
- // Data pointer
- const byte *data = (const byte*)pf1 + conn->get_header_size();
- if (conn->get_header_size() > len) {
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Invalid packet size (less than header size)");
- #endif
-
- return 0;
- }
- // Skip the extension headers
- uint extension = pf1->ext;
- if (extension != 0) {
- do {
- // Verify that the packet is valid.
- data += 2;
-
- if ((int)(packet_end - data) < 0 || (int)(packet_end - data) < data[-1]) {
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Invalid len of extensions");
- #endif
-
- return 0;
- }
-
- switch(extension) {
- case 1: // Selective Acknowledgment
- selack_ptr = data;
- break;
- case 2: // extension bits
- if (data[-1] != 8) {
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Invalid len of extension bits header");
- #endif
-
- return 0;
- }
- memcpy(conn->extensions, data, 8);
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "got extension bits:%02x%02x%02x%02x%02x%02x%02x%02x",
- conn->extensions[0], conn->extensions[1], conn->extensions[2], conn->extensions[3],
- conn->extensions[4], conn->extensions[5], conn->extensions[6], conn->extensions[7]);
- #endif
- }
- extension = data[-2];
- data += data[-1];
- } while (extension);
- }
-
- if (conn->state == CS_SYN_SENT) {
- // if this is a syn-ack, initialize our ack_nr
- // to match the sequence number we got from
- // the other end
- conn->ack_nr = (pk_seq_nr - 1) & SEQ_NR_MASK;
- }
-
- conn->last_got_packet = conn->ctx->current_ms;
-
- if (syn) {
- return 0;
- }
-
- // seqnr is the number of packets past the expected
- // packet this is. ack_nr is the last acked, seq_nr is the
- // current. Subtracring 1 makes 0 mean "this is the next
- // expected packet".
- const uint seqnr = (pk_seq_nr - conn->ack_nr - 1) & SEQ_NR_MASK;
-
- // Getting an invalid sequence number?
- if (seqnr >= REORDER_BUFFER_MAX_SIZE) {
- if (seqnr >= (SEQ_NR_MASK + 1) - REORDER_BUFFER_MAX_SIZE && pk_flags != ST_STATE) {
- conn->schedule_ack();
- }
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, " Got old Packet/Ack (%u/%u)=%u"
- , pk_seq_nr, conn->ack_nr, seqnr);
- #endif
- return 0;
- }
-
- // Process acknowledgment
- // acks is the number of packets that was acked
- int acks = (pk_ack_nr - (conn->seq_nr - 1 - conn->cur_window_packets)) & ACK_NR_MASK;
-
- // this happens when we receive an old ack nr
- if (acks > conn->cur_window_packets) acks = 0;
-
- // if we get the same ack_nr as in the last packet
- // increase the duplicate_ack counter, otherwise reset
- // it to 0
- if (conn->cur_window_packets > 0) {
- if (pk_ack_nr == ((conn->seq_nr - conn->cur_window_packets - 1) & ACK_NR_MASK)
- && conn->cur_window_packets > 0) {
- ++conn->duplicate_ack;
- if (conn->duplicate_ack == DUPLICATE_ACKS_BEFORE_RESEND && conn->mtu_probe_seq) {
- // It's likely that the probe was rejected due to its size, but we haven't got an
- // ICMP report back yet
- if (pk_ack_nr == ((conn->mtu_probe_seq - 1) & ACK_NR_MASK)) {
- conn->mtu_ceiling = conn->mtu_probe_size - 1;
- conn->mtu_search_update();
- conn->log(UTP_LOG_MTU, "MTU [DUPACK] floor:%d ceiling:%d current:%d"
- , conn->mtu_floor, conn->mtu_ceiling, conn->mtu_last);
- } else {
- // A non-probe was blocked before our probe.
- // Can't conclude much, send a new probe
- conn->mtu_probe_seq = conn->mtu_probe_size = 0;
- }
- }
- } else {
- conn->duplicate_ack = 0;
- }
-
- // TODO: if duplicate_ack == DUPLICATE_ACK_BEFORE_RESEND
- // and fast_resend_seq_nr <= ack_nr + 1
- // resend ack_nr + 1
- // also call maybe_decay_win()
- }
-
- // figure out how many bytes were acked
- size_t acked_bytes = 0;
-
- // the minimum rtt of all acks
- // this is the upper limit on the delay we get back
- // from the other peer. Our delay cannot exceed
- // the rtt of the packet. If it does, clamp it.
- // this is done in apply_ledbat_ccontrol()
- int64 min_rtt = INT64_MAX;
-
- uint64 now = utp_call_get_microseconds(conn->ctx, conn);
-
- for (int i = 0; i < acks; ++i) {
- int seq = (conn->seq_nr - conn->cur_window_packets + i) & ACK_NR_MASK;
- OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(seq);
- if (pkt == 0 || pkt->transmissions == 0) continue;
- assert((int)(pkt->payload) >= 0);
- acked_bytes += pkt->payload;
- if (conn->mtu_probe_seq && seq == conn->mtu_probe_seq) {
- conn->mtu_floor = conn->mtu_probe_size;
- conn->mtu_search_update();
- conn->log(UTP_LOG_MTU, "MTU [ACK] floor:%d ceiling:%d current:%d"
- , conn->mtu_floor, conn->mtu_ceiling, conn->mtu_last);
- }
-
- // in case our clock is not monotonic
- if (pkt->time_sent < now)
- min_rtt = min<int64>(min_rtt, now - pkt->time_sent);
- else
- min_rtt = min<int64>(min_rtt, 50000);
- }
-
- // count bytes acked by EACK
- if (selack_ptr != NULL) {
- acked_bytes += conn->selective_ack_bytes((pk_ack_nr + 2) & ACK_NR_MASK,
- selack_ptr, selack_ptr[-1], min_rtt);
- }
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "acks:%d acked_bytes:%u seq_nr:%d cur_window:%u cur_window_packets:%u relative_seqnr:%u max_window:%u min_rtt:%u rtt:%u",
- acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets,
- seqnr, (uint)conn->max_window, (uint)(min_rtt / 1000), conn->rtt);
- #endif
-
- uint64 p = pf1->tv_usec;
-
- conn->last_measured_delay = conn->ctx->current_ms;
-
- // get delay in both directions
- // record the delay to report back
- const uint32 their_delay = (uint32)(p == 0 ? 0 : time - p);
- conn->reply_micro = their_delay;
- uint32 prev_delay_base = conn->their_hist.delay_base;
- if (their_delay != 0) conn->their_hist.add_sample(their_delay, conn->ctx->current_ms);
-
- // if their new delay base is less than their previous one
- // we should shift our delay base in the other direction in order
- // to take the clock skew into account
- if (prev_delay_base != 0 &&
- wrapping_compare_less(conn->their_hist.delay_base, prev_delay_base, TIMESTAMP_MASK)) {
- // never adjust more than 10 milliseconds
- if (prev_delay_base - conn->their_hist.delay_base <= 10000) {
- conn->our_hist.shift(prev_delay_base - conn->their_hist.delay_base);
- }
- }
-
- const uint32 actual_delay = (uint32(pf1->reply_micro)==INT_MAX?0:uint32(pf1->reply_micro));
-
- // if the actual delay is 0, it means the other end
- // hasn't received a sample from us yet, and doesn't
- // know what it is. We can't update out history unless
- // we have a true measured sample
- prev_delay_base = conn->our_hist.delay_base;
- if (actual_delay != 0) {
- conn->our_hist.add_sample(actual_delay, conn->ctx->current_ms);
-
- // this is keeping an average of the delay samples
- // we've recevied within the last 5 seconds. We sum
- // all the samples and increase the count in order to
- // calculate the average every 5 seconds. The samples
- // are based off of the average_delay_base to deal with
- // wrapping counters.
- if (conn->average_delay_base == 0) conn->average_delay_base = actual_delay;
- int64 average_delay_sample = 0;
- // distance walking from lhs to rhs, downwards
- const uint32 dist_down = conn->average_delay_base - actual_delay;
- // distance walking from lhs to rhs, upwards
- const uint32 dist_up = actual_delay - conn->average_delay_base;
-
- if (dist_down > dist_up) {
-// assert(dist_up < INT_MAX / 4);
- // average_delay_base < actual_delay, we should end up
- // with a positive sample
- average_delay_sample = dist_up;
- } else {
-// assert(-int64(dist_down) < INT_MAX / 4);
- // average_delay_base >= actual_delay, we should end up
- // with a negative sample
- average_delay_sample = -int64(dist_down);
- }
- conn->current_delay_sum += average_delay_sample;
- ++conn->current_delay_samples;
-
- if (conn->ctx->current_ms > conn->average_sample_time) {
-
- int32 prev_average_delay = conn->average_delay;
-
- assert(conn->current_delay_sum / conn->current_delay_samples < INT_MAX);
- assert(conn->current_delay_sum / conn->current_delay_samples > -INT_MAX);
- // write the new average
- conn->average_delay = (int32)(conn->current_delay_sum / conn->current_delay_samples);
- // each slot represents 5 seconds
- conn->average_sample_time += 5000;
-
- conn->current_delay_sum = 0;
- conn->current_delay_samples = 0;
-
- // this makes things very confusing when logging the average delay
-//#if !g_log_utp
- // normalize the average samples
- // since we're only interested in the slope
- // of the curve formed by the average delay samples,
- // we can cancel out the actual offset to make sure
- // we won't have problems with wrapping.
- int min_sample = min(prev_average_delay, conn->average_delay);
- int max_sample = max(prev_average_delay, conn->average_delay);
-
- // normalize around zero. Try to keep the min <= 0 and max >= 0
- int adjust = 0;
- if (min_sample > 0) {
- // adjust all samples (and the baseline) down by min_sample
- adjust = -min_sample;
- } else if (max_sample < 0) {
- // adjust all samples (and the baseline) up by -max_sample
- adjust = -max_sample;
- }
- if (adjust) {
- conn->average_delay_base -= adjust;
- conn->average_delay += adjust;
- prev_average_delay += adjust;
- }
-//#endif
-
- // update the clock drift estimate
- // the unit is microseconds per 5 seconds
- // what we're doing is just calculating the average of the
- // difference between each slot. Since each slot is 5 seconds
- // and the timestamps unit are microseconds, we'll end up with
- // the average slope across our history. If there is a consistent
- // trend, it will show up in this value
-
- //int64 slope = 0;
- int32 drift = conn->average_delay - prev_average_delay;
-
- // clock_drift is a rolling average
- conn->clock_drift = (int64(conn->clock_drift) * 7 + drift) / 8;
- conn->clock_drift_raw = drift;
- }
- }
-
- // if our new delay base is less than our previous one
- // we should shift the other end's delay base in the other
- // direction in order to take the clock skew into account
- // This is commented out because it creates bad interactions
- // with our adjustment in the other direction. We don't really
- // need our estimates of the other peer to be very accurate
- // anyway. The problem with shifting here is that we're more
- // likely shift it back later because of a low latency. This
- // second shift back would cause us to shift our delay base
- // which then get's into a death spiral of shifting delay bases
-/* if (prev_delay_base != 0 &&
- wrapping_compare_less(conn->our_hist.delay_base, prev_delay_base)) {
- // never adjust more than 10 milliseconds
- if (prev_delay_base - conn->our_hist.delay_base <= 10000) {
- conn->their_hist.Shift(prev_delay_base - conn->our_hist.delay_base);
- }
- }
-*/
-
- // if the delay estimate exceeds the RTT, adjust the base_delay to
- // compensate
- assert(min_rtt >= 0);
- if (int64(conn->our_hist.get_value()) > min_rtt) {
- conn->our_hist.shift((uint32)(conn->our_hist.get_value() - min_rtt));
- }
-
- // only apply the congestion controller on acks
- // if we don't have a delay measurement, there's
- // no point in invoking the congestion control
- if (actual_delay != 0 && acked_bytes >= 1)
- conn->apply_ccontrol(acked_bytes, actual_delay, min_rtt);
-
- // sanity check, the other end should never ack packets
- // past the point we've sent
- if (acks <= conn->cur_window_packets) {
- conn->max_window_user = pf1->windowsize;
-
- // If max user window is set to 0, then we startup a timer
- // That will reset it to 1 after 15 seconds.
- if (conn->max_window_user == 0)
- // Reset max_window_user to 1 every 15 seconds.
- conn->zerowindow_time = conn->ctx->current_ms + 15000;
-
- // Respond to connect message
- // Switch to CONNECTED state.
- // If this is an ack and we're in still handshaking
- // transition over to the connected state.
-
- // Incoming connection completion
- if (pk_flags == ST_DATA && conn->state == CS_SYN_RECV) {
- conn->state = CS_CONNECTED;
- }
-
- // Outgoing connection completion
- if (pk_flags == ST_STATE && conn->state == CS_SYN_SENT) {
- conn->state = CS_CONNECTED;
-
- // If the user has defined the ON_CONNECT callback, use that to
- // notify the user that the socket is now connected. If ON_CONNECT
- // has not been defined, notify the user via ON_STATE_CHANGE.
- if (conn->ctx->callbacks[UTP_ON_CONNECT])
- utp_call_on_connect(conn->ctx, conn);
- else
- utp_call_on_state_change(conn->ctx, conn, UTP_STATE_CONNECT);
-
- // We've sent a fin, and everything was ACKed (including the FIN),
- // it's safe to destroy the socket. cur_window_packets == acks
- // means that this packet acked all the remaining packets that
- // were in-flight.
- } else if (conn->state == CS_FIN_SENT && conn->cur_window_packets == acks) {
- conn->state = CS_DESTROY;
- }
-
- // Update fast resend counter
- if (wrapping_compare_less(conn->fast_resend_seq_nr
- , (pk_ack_nr + 1) & ACK_NR_MASK, ACK_NR_MASK))
- conn->fast_resend_seq_nr = (pk_ack_nr + 1) & ACK_NR_MASK;
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "fast_resend_seq_nr:%u", conn->fast_resend_seq_nr);
- #endif
-
- for (int i = 0; i < acks; ++i) {
- int ack_status = conn->ack_packet(conn->seq_nr - conn->cur_window_packets);
- // if ack_status is 0, the packet was acked.
- // if acl_stauts is 1, it means that the packet had already been acked
- // if it's 2, the packet has not been sent yet
- // We need to break this loop in the latter case. This could potentially
- // happen if we get an ack_nr that does not exceed what we have stuffed
- // into the outgoing buffer, but does exceed what we have sent
- if (ack_status == 2) {
- #ifdef _DEBUG
- OutgoingPacket* pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets);
- assert(pkt->transmissions == 0);
- #endif
-
- break;
- }
- conn->cur_window_packets--;
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "decementing cur_window_packets:%u", conn->cur_window_packets);
- #endif
-
- }
-
- #ifdef _DEBUG
- if (conn->cur_window_packets == 0)
- assert(conn->cur_window == 0);
- #endif
-
- // packets in front of this may have been acked by a
- // selective ack (EACK). Keep decreasing the window packet size
- // until we hit a packet that is still waiting to be acked
- // in the send queue
- // this is especially likely to happen when the other end
- // has the EACK send bug older versions of uTP had
- while (conn->cur_window_packets > 0 && !conn->outbuf.get(conn->seq_nr - conn->cur_window_packets)) {
- conn->cur_window_packets--;
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "decementing cur_window_packets:%u", conn->cur_window_packets);
- #endif
-
- }
-
- #ifdef _DEBUG
- if (conn->cur_window_packets == 0)
- assert(conn->cur_window == 0);
- #endif
-
- // this invariant should always be true
- assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets));
-
- // flush Nagle
- if (conn->cur_window_packets == 1) {
- OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - 1);
- // do we still have quota?
- if (pkt->transmissions == 0) {
- conn->send_packet(pkt);
- }
- }
-
- // Fast timeout-retry
- if (conn->fast_timeout) {
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Fast timeout %u,%u,%u?", (uint)conn->cur_window, conn->seq_nr - conn->timeout_seq_nr, conn->timeout_seq_nr);
- #endif
-
- // if the fast_resend_seq_nr is not pointing to the oldest outstanding packet, it suggests that we've already
- // resent the packet that timed out, and we should leave the fast-timeout mode.
- if (((conn->seq_nr - conn->cur_window_packets) & ACK_NR_MASK) != conn->fast_resend_seq_nr) {
- conn->fast_timeout = false;
- } else {
- // resend the oldest packet and increment fast_resend_seq_nr
- // to not allow another fast resend on it again
- OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets);
- if (pkt && pkt->transmissions > 0) {
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Packet %u fast timeout-retry.", conn->seq_nr - conn->cur_window_packets);
- #endif
-
- #ifdef _DEBUG
- ++conn->_stats.fastrexmit;
- #endif
-
- conn->fast_resend_seq_nr++;
- conn->send_packet(pkt);
- }
- }
- }
- }
-
- // Process selective acknowledgent
- if (selack_ptr != NULL) {
- conn->selective_ack(pk_ack_nr + 2, selack_ptr, selack_ptr[-1]);
- }
-
- // this invariant should always be true
- assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets));
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "acks:%d acked_bytes:%u seq_nr:%u cur_window:%u cur_window_packets:%u ",
- acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets);
- #endif
-
- // In case the ack dropped the current window below
- // the max_window size, Mark the socket as writable
- if (conn->state == CS_CONNECTED_FULL && !conn->is_full()) {
- conn->state = CS_CONNECTED;
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Socket writable. max_window:%u cur_window:%u packet_size:%u",
- (uint)conn->max_window, (uint)conn->cur_window, (uint)conn->get_packet_size());
- #endif
- utp_call_on_state_change(conn->ctx, conn, UTP_STATE_WRITABLE);
- }
-
- if (pk_flags == ST_STATE) {
- // This is a state packet only.
- return 0;
- }
-
- // The connection is not in a state that can accept data?
- if (conn->state != CS_CONNECTED &&
- conn->state != CS_CONNECTED_FULL &&
- conn->state != CS_FIN_SENT) {
- return 0;
- }
-
- // Is this a finalize packet?
- if (pk_flags == ST_FIN && !conn->got_fin) {
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Got FIN eof_pkt:%u", pk_seq_nr);
- #endif
-
- conn->got_fin = true;
- conn->eof_pkt = pk_seq_nr;
- // at this point, it is possible for the
- // other end to have sent packets with
- // sequence numbers higher than seq_nr.
- // if this is the case, our reorder_count
- // is out of sync. This case is dealt with
- // when we re-order and hit the eof_pkt.
- // we'll just ignore any packets with
- // sequence numbers past this
- }
-
- // Getting an in-order packet?
- if (seqnr == 0) {
- size_t count = packet_end - data;
- if (count > 0 && conn->state != CS_FIN_SENT) {
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Got Data len:%u (rb:%u)", (uint)count, (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
- #endif
-
- // Post bytes to the upper layer
- utp_call_on_read(conn->ctx, conn, data, count);
- }
- conn->ack_nr++;
-
- // Check if the next packet has been received too, but waiting
- // in the reorder buffer.
- for (;;) {
-
- if (conn->got_fin && conn->eof_pkt == conn->ack_nr) {
- if (conn->state != CS_FIN_SENT) {
- conn->state = CS_GOT_FIN;
- conn->rto_timeout = conn->ctx->current_ms + min<uint>(conn->rto * 3, 60);
-
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Posting EOF");
- #endif
-
- utp_call_on_state_change(conn->ctx, conn, UTP_STATE_EOF);
- }
-
- // if the other end wants to close, ack
- conn->send_ack();
-
- // reorder_count is not necessarily 0 at this point.
- // even though it is most of the time, the other end
- // may have sent packets with higher sequence numbers
- // than what later end up being eof_pkt
- // since we have received all packets up to eof_pkt
- // just ignore the ones after it.
- conn->reorder_count = 0;
- }
-
- // Quick get-out in case there is nothing to reorder
- if (conn->reorder_count == 0)
- break;
-
- // Check if there are additional buffers in the reorder buffers
- // that need delivery.
- byte *p = (byte*)conn->inbuf.get(conn->ack_nr+1);
- if (p == NULL)
- break;
- conn->inbuf.put(conn->ack_nr+1, NULL);
- count = *(uint*)p;
- if (count > 0 && conn->state != CS_FIN_SENT) {
- // Pass the bytes to the upper layer
- utp_call_on_read(conn->ctx, conn, p + sizeof(uint), count);
- }
- conn->ack_nr++;
-
- // Free the element from the reorder buffer
- free(p);
- assert(conn->reorder_count > 0);
- conn->reorder_count--;
- }
-
- conn->schedule_ack();
- } else {
- // Getting an out of order packet.
- // The packet needs to be remembered and rearranged later.
-
- // if we have received a FIN packet, and the EOF-sequence number
- // is lower than the sequence number of the packet we just received
- // something is wrong.
- if (conn->got_fin && pk_seq_nr > conn->eof_pkt) {
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Got an invalid packet sequence number, past EOF "
- "reorder_count:%u len:%u (rb:%u)",
- conn->reorder_count, (uint)(packet_end - data), (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
- #endif
- return 0;
- }
-
- // if the sequence number is entirely off the expected
- // one, just drop it. We can't allocate buffer space in
- // the inbuf entirely based on untrusted input
- if (seqnr > 0x3ff) {
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "0x%08x: Got an invalid packet sequence number, too far off "
- "reorder_count:%u len:%u (rb:%u)",
- conn->reorder_count, (uint)(packet_end - data), (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
- #endif
- return 0;
- }
-
- // we need to grow the circle buffer before we
- // check if the packet is already in here, so that
- // we don't end up looking at an older packet (since
- // the indices wraps around).
- conn->inbuf.ensure_size(pk_seq_nr + 1, seqnr + 1);
-
- // Has this packet already been received? (i.e. a duplicate)
- // If that is the case, just discard it.
- if (conn->inbuf.get(pk_seq_nr) != NULL) {
- #ifdef _DEBUG
- ++conn->_stats.nduprecv;
- #endif
-
- return 0;
- }
-
- // Allocate memory to fit the packet that needs to re-ordered
- byte *mem = (byte*)malloc((packet_end - data) + sizeof(uint));
- *(uint*)mem = (uint)(packet_end - data);
- memcpy(mem + sizeof(uint), data, packet_end - data);
-
- // Insert into reorder buffer and increment the count
- // of # of packets to be reordered.
- // we add one to seqnr in order to leave the last
- // entry empty, that way the assert in send_ack
- // is valid. we have to add one to seqnr too, in order
- // to make the circular buffer grow around the correct
- // point (which is conn->ack_nr + 1).
- assert(conn->inbuf.get(pk_seq_nr) == NULL);
- assert((pk_seq_nr & conn->inbuf.mask) != ((conn->ack_nr+1) & conn->inbuf.mask));
- conn->inbuf.put(pk_seq_nr, mem);
- conn->reorder_count++;
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "0x%08x: Got out of order data reorder_count:%u len:%u (rb:%u)",
- conn->reorder_count, (uint)(packet_end - data), (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
- #endif
-
- conn->schedule_ack();
- }
-
- return (size_t)(packet_end - data);
-}
-
-inline byte UTP_Version(PacketFormatV1 const* pf)
-{
- return (pf->type() < ST_NUM_STATES && pf->ext < 3 ? pf->version() : 0);
-}
-
-UTPSocket::~UTPSocket()
-{
- #if UTP_DEBUG_LOGGING
- log(UTP_LOG_DEBUG, "Killing socket");
- #endif
-
- utp_call_on_state_change(ctx, this, UTP_STATE_DESTROYING);
-
- if (ctx->last_utp_socket == this) {
- ctx->last_utp_socket = NULL;
- }
-
- // Remove object from the global hash table
- UTPSocketKeyData* kd = ctx->utp_sockets->Delete(UTPSocketKey(addr, conn_id_recv));
- assert(kd);
-
- // remove the socket from ack_sockets if it was there also
- removeSocketFromAckList(this);
-
- // Free all memory occupied by the socket object.
- for (size_t i = 0; i <= inbuf.mask; i++) {
- free(inbuf.elements[i]);
- }
- for (size_t i = 0; i <= outbuf.mask; i++) {
- free(outbuf.elements[i]);
- }
- // TODO: The circular buffer should have a destructor
- free(inbuf.elements);
- free(outbuf.elements);
-}
-
-void UTP_FreeAll(struct UTPSocketHT *utp_sockets) {
- utp_hash_iterator_t it;
- UTPSocketKeyData* keyData;
- while ((keyData = utp_sockets->Iterate(it))) {
- delete keyData->socket;
- }
-}
-
-void utp_initialize_socket( utp_socket *conn,
- const struct sockaddr *addr,
- socklen_t addrlen,
- bool need_seed_gen,
- uint32 conn_seed,
- uint32 conn_id_recv,
- uint32 conn_id_send)
-{
- PackedSockAddr psaddr = PackedSockAddr((const SOCKADDR_STORAGE*)addr, addrlen);
-
- if (need_seed_gen) {
- do {
- conn_seed = utp_call_get_random(conn->ctx, conn);
- // we identify v1 and higher by setting the first two bytes to 0x0001
- conn_seed &= 0xffff;
- } while (conn->ctx->utp_sockets->Lookup(UTPSocketKey(psaddr, conn_seed)));
-
- conn_id_recv += conn_seed;
- conn_id_send += conn_seed;
- }
-
- conn->state = CS_IDLE;
- conn->conn_seed = conn_seed;
- conn->conn_id_recv = conn_id_recv;
- conn->conn_id_send = conn_id_send;
- conn->addr = psaddr;
- conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, NULL);
- conn->last_got_packet = conn->ctx->current_ms;
- conn->last_sent_packet = conn->ctx->current_ms;
- conn->last_measured_delay = conn->ctx->current_ms + 0x70000000;
- conn->average_sample_time = conn->ctx->current_ms + 5000;
- conn->last_rwin_decay = conn->ctx->current_ms - MAX_WINDOW_DECAY;
-
- conn->our_hist.clear(conn->ctx->current_ms);
- conn->their_hist.clear(conn->ctx->current_ms);
- conn->rtt_hist.clear(conn->ctx->current_ms);
-
- // initialize MTU floor and ceiling
- conn->mtu_reset();
- conn->mtu_last = conn->mtu_ceiling;
-
- conn->ctx->utp_sockets->Add(UTPSocketKey(conn->addr, conn->conn_id_recv))->socket = conn;
-
- // we need to fit one packet in the window when we start the connection
- conn->max_window = conn->get_packet_size();
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "UTP socket initialized");
- #endif
-}
-
-utp_socket* utp_create_socket(utp_context *ctx)
-{
- assert(ctx);
- if (!ctx) return NULL;
-
- UTPSocket *conn = new UTPSocket; // TODO: UTPSocket should have a constructor
-
- conn->state = CS_UNINITIALIZED;
- conn->ctx = ctx;
- conn->userdata = NULL;
- conn->reorder_count = 0;
- conn->duplicate_ack = 0;
- conn->timeout_seq_nr = 0;
- conn->last_rcv_win = 0;
- conn->got_fin = false;
- conn->fast_timeout = false;
- conn->rtt = 0;
- conn->retransmit_timeout = 0;
- conn->rto_timeout = 0;
- conn->zerowindow_time = 0;
- conn->average_delay = 0;
- conn->current_delay_samples = 0;
- conn->cur_window = 0;
- conn->eof_pkt = 0;
- conn->last_maxed_out_window = 0;
- conn->mtu_probe_seq = 0;
- conn->mtu_probe_size = 0;
- conn->current_delay_sum = 0;
- conn->average_delay_base = 0;
- conn->retransmit_count = 0;
- conn->rto = 3000;
- conn->rtt_var = 800;
- conn->seq_nr = 1;
- conn->ack_nr = 0;
- conn->max_window_user = 255 * PACKET_SIZE;
- conn->cur_window_packets = 0;
- conn->fast_resend_seq_nr = conn->seq_nr;
- conn->target_delay = ctx->target_delay;
- conn->reply_micro = 0;
- conn->opt_sndbuf = ctx->opt_sndbuf;
- conn->opt_rcvbuf = ctx->opt_rcvbuf;
- conn->slow_start = true;
- conn->ssthresh = conn->opt_sndbuf;
- conn->clock_drift = 0;
- conn->clock_drift_raw = 0;
- conn->outbuf.mask = 15;
- conn->inbuf.mask = 15;
- conn->outbuf.elements = (void**)calloc(16, sizeof(void*));
- conn->inbuf.elements = (void**)calloc(16, sizeof(void*));
- conn->ida = -1; // set the index of every new socket in ack_sockets to
- // -1, which also means it is not in ack_sockets yet
-
- memset(conn->extensions, 0, sizeof(conn->extensions));
-
- #ifdef _DEBUG
- memset(&conn->_stats, 0, sizeof(utp_socket_stats));
- #endif
-
- return conn;
-}
-
-int utp_context_set_option(utp_context *ctx, int opt, int val)
-{
- assert(ctx);
- if (!ctx) return -1;
-
- switch (opt) {
- case UTP_LOG_NORMAL:
- ctx->log_normal = val ? true : false;
- return 0;
-
- case UTP_LOG_MTU:
- ctx->log_mtu = val ? true : false;
- return 0;
-
- case UTP_LOG_DEBUG:
- ctx->log_debug = val ? true : false;
- return 0;
-
- case UTP_TARGET_DELAY:
- ctx->target_delay = val;
- return 0;
-
- case UTP_SNDBUF:
- assert(val >= 1);
- ctx->opt_sndbuf = val;
- return 0;
-
- case UTP_RCVBUF:
- assert(val >= 1);
- ctx->opt_rcvbuf = val;
- return 0;
- }
- return -1;
-}
-
-int utp_context_get_option(utp_context *ctx, int opt)
-{
- assert(ctx);
- if (!ctx) return -1;
-
- switch (opt) {
- case UTP_LOG_NORMAL: return ctx->log_normal ? 1 : 0;
- case UTP_LOG_MTU: return ctx->log_mtu ? 1 : 0;
- case UTP_LOG_DEBUG: return ctx->log_debug ? 1 : 0;
- case UTP_TARGET_DELAY: return ctx->target_delay;
- case UTP_SNDBUF: return ctx->opt_sndbuf;
- case UTP_RCVBUF: return ctx->opt_rcvbuf;
- }
- return -1;
-}
-
-
-int utp_setsockopt(UTPSocket* conn, int opt, int val)
-{
- assert(conn);
- if (!conn) return -1;
-
- switch (opt) {
-
- case UTP_SNDBUF:
- assert(val >= 1);
- conn->opt_sndbuf = val;
- return 0;
-
- case UTP_RCVBUF:
- assert(val >= 1);
- conn->opt_rcvbuf = val;
- return 0;
-
- case UTP_TARGET_DELAY:
- conn->target_delay = val;
- return 0;
- }
-
- return -1;
-}
-
-int utp_getsockopt(UTPSocket* conn, int opt)
-{
- assert(conn);
- if (!conn) return -1;
-
- switch (opt) {
- case UTP_SNDBUF: return conn->opt_sndbuf;
- case UTP_RCVBUF: return conn->opt_rcvbuf;
- case UTP_TARGET_DELAY: return conn->target_delay;
- }
-
- return -1;
-}
-
-// Try to connect to a specified host.
-int utp_connect(utp_socket *conn, const struct sockaddr *to, socklen_t tolen)
-{
- assert(conn);
- if (!conn) return -1;
-
- assert(conn->state == CS_UNINITIALIZED);
- if (conn->state != CS_UNINITIALIZED) {
- conn->state = CS_DESTROY;
- return -1;
- }
-
- utp_initialize_socket(conn, to, tolen, true, 0, 0, 1);
-
- assert(conn->cur_window_packets == 0);
- assert(conn->outbuf.get(conn->seq_nr) == NULL);
- assert(sizeof(PacketFormatV1) == 20);
-
- conn->state = CS_SYN_SENT;
- conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, conn);
-
- // Create and send a connect message
-
- // used in parse_log.py
- conn->log(UTP_LOG_NORMAL, "UTP_Connect conn_seed:%u packet_size:%u (B) "
- "target_delay:%u (ms) delay_history:%u "
- "delay_base_history:%u (minutes)",
- conn->conn_seed, PACKET_SIZE, conn->target_delay / 1000,
- CUR_DELAY_SIZE, DELAY_BASE_HISTORY);
-
- // Setup initial timeout timer.
- conn->retransmit_timeout = 3000;
- conn->rto_timeout = conn->ctx->current_ms + conn->retransmit_timeout;
- conn->last_rcv_win = conn->get_rcv_window();
-
- // if you need compatibiltiy with 1.8.1, use this. it increases attackability though.
- //conn->seq_nr = 1;
- conn->seq_nr = utp_call_get_random(conn->ctx, conn);
-
- // Create the connect packet.
- const size_t header_size = sizeof(PacketFormatV1);
-
- OutgoingPacket *pkt = (OutgoingPacket*)malloc(sizeof(OutgoingPacket) - 1 + header_size);
- PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
-
- memset(p1, 0, header_size);
- // SYN packets are special, and have the receive ID in the connid field,
- // instead of conn_id_send.
- p1->set_version(1);
- p1->set_type(ST_SYN);
- p1->ext = 0;
- p1->connid = conn->conn_id_recv;
- p1->windowsize = (uint32)conn->last_rcv_win;
- p1->seq_nr = conn->seq_nr;
- pkt->transmissions = 0;
- pkt->length = header_size;
- pkt->payload = 0;
-
- /*
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Sending connect %s [%u].",
- addrfmt(conn->addr, addrbuf), conn_seed);
- #endif
- */
-
- // Remember the message in the outgoing queue.
- conn->outbuf.ensure_size(conn->seq_nr, conn->cur_window_packets);
- conn->outbuf.put(conn->seq_nr, pkt);
- conn->seq_nr++;
- conn->cur_window_packets++;
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "incrementing cur_window_packets:%u", conn->cur_window_packets);
- #endif
-
- conn->send_packet(pkt);
- return 0;
-}
-
-// Returns 1 if the UDP payload was recognized as a UTP packet, or 0 if it was not
-int utp_process_udp(utp_context *ctx, const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
-{
- assert(ctx);
- if (!ctx) return 0;
-
- assert(buffer);
- if (!buffer) return 0;
-
- assert(to);
- if (!to) return 0;
-
- const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);
-
- if (len < sizeof(PacketFormatV1)) {
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "recv %s len:%u too small", addrfmt(addr, addrbuf), (uint)len);
- #endif
- return 0;
- }
-
- const PacketFormatV1 *pf1 = (PacketFormatV1*)buffer;
- const byte version = UTP_Version(pf1);
- const uint32 id = uint32(pf1->connid);
-
- if (version != 1) {
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "recv %s len:%u version:%u unsupported version", addrfmt(addr, addrbuf), (uint)len, version);
- #endif
-
- return 0;
- }
-
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "recv %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, id);
- ctx->log(UTP_LOG_DEBUG, NULL, "recv id:%u seq_nr:%u ack_nr:%u", id, (uint)pf1->seq_nr, (uint)pf1->ack_nr);
- #endif
-
- const byte flags = pf1->type();
-
- if (flags == ST_RESET) {
- // id is either our recv id or our send id
- // if it's our send id, and we initiated the connection, our recv id is id + 1
- // if it's our send id, and we did not initiate the connection, our recv id is id - 1
- // we have to check every case
-
- UTPSocketKeyData* keyData;
- if ( (keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id))) ||
- ((keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id + 1))) && keyData->socket->conn_id_send == id) ||
- ((keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id - 1))) && keyData->socket->conn_id_send == id))
- {
- UTPSocket* conn = keyData->socket;
-
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "recv RST for existing connection");
- #endif
-
- if (conn->state == CS_FIN_SENT)
- conn->state = CS_DESTROY;
- else
- conn->state = CS_RESET;
-
- utp_call_on_overhead_statistics(conn->ctx, conn, false, len + conn->get_udp_overhead(), close_overhead);
- const int err = (conn->state == CS_SYN_SENT) ? UTP_ECONNREFUSED : UTP_ECONNRESET;
- utp_call_on_error(conn->ctx, conn, err);
- }
- else {
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "recv RST for unknown connection");
- #endif
- }
- return 1;
- }
- else if (flags != ST_SYN) {
- UTPSocket* conn = NULL;
-
- if (ctx->last_utp_socket && ctx->last_utp_socket->addr == addr && ctx->last_utp_socket->conn_id_recv == id) {
- conn = ctx->last_utp_socket;
- } else {
- UTPSocketKeyData* keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id));
- if (keyData) {
- conn = keyData->socket;
- ctx->last_utp_socket = conn;
- }
- }
-
- if (conn) {
-
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "recv processing");
- #endif
-
- const size_t read = utp_process_incoming(conn, buffer, len);
- utp_call_on_overhead_statistics(conn->ctx, conn, false, (len - read) + conn->get_udp_overhead(), header_overhead);
- return 1;
- }
- }
-
- // We have not found a matching utp_socket, and this isn't a SYN. Reject it.
- const uint32 seq_nr = pf1->seq_nr;
- if (flags != ST_SYN) {
- ctx->current_ms = utp_call_get_milliseconds(ctx, NULL);
-
- for (size_t i = 0; i < ctx->rst_info.GetCount(); i++) {
- if ((ctx->rst_info[i].connid == id) &&
- (ctx->rst_info[i].addr == addr) &&
- (ctx->rst_info[i].ack_nr == seq_nr))
- {
- ctx->rst_info[i].timestamp = ctx->current_ms;
-
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "recv not sending RST to non-SYN (stored)");
- #endif
-
- return 1;
- }
- }
-
- if (ctx->rst_info.GetCount() > RST_INFO_LIMIT) {
-
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "recv not sending RST to non-SYN (limit at %u stored)", (uint)ctx->rst_info.GetCount());
- #endif
-
- return 1;
- }
-
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "recv send RST to non-SYN (%u stored)", (uint)ctx->rst_info.GetCount());
- #endif
-
- RST_Info &r = ctx->rst_info.Append();
- r.addr = addr;
- r.connid = id;
- r.ack_nr = seq_nr;
- r.timestamp = ctx->current_ms;
-
- UTPSocket::send_rst(ctx, addr, id, seq_nr, utp_call_get_random(ctx, NULL));
- return 1;
- }
-
- if (ctx->callbacks[UTP_ON_ACCEPT]) {
-
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "Incoming connection from %s", addrfmt(addr, addrbuf));
- #endif
-
- UTPSocketKeyData* keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id + 1));
- if (keyData) {
-
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "rejected incoming connection, connection already exists");
- #endif
-
- return 1;
- }
-
- if (ctx->utp_sockets->GetCount() > 3000) {
-
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "rejected incoming connection, too many uTP sockets %d", ctx->utp_sockets->GetCount());
- #endif
-
- return 1;
- }
- // true means yes, block connection. false means no, don't block.
- if (utp_call_on_firewall(ctx, to, tolen)) {
-
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "rejected incoming connection, firewall callback returned true");
- #endif
-
- return 1;
- }
-
- // Create a new UTP socket to handle this new connection
- UTPSocket *conn = utp_create_socket(ctx);
- utp_initialize_socket(conn, to, tolen, false, id, id+1, id);
- conn->ack_nr = seq_nr;
- conn->seq_nr = utp_call_get_random(ctx, NULL);
- conn->fast_resend_seq_nr = conn->seq_nr;
- conn->state = CS_SYN_RECV;
-
- const size_t read = utp_process_incoming(conn, buffer, len, true);
-
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "recv send connect ACK");
- #endif
-
- conn->send_ack(true);
-
- utp_call_on_accept(ctx, conn, to, tolen);
-
- // we report overhead after on_accept(), because the callbacks are setup now
- utp_call_on_overhead_statistics(conn->ctx, conn, false, (len - read) + conn->get_udp_overhead(), header_overhead); // SYN
- utp_call_on_overhead_statistics(conn->ctx, conn, true, conn->get_overhead(), ack_overhead); // SYNACK
- }
- else {
-
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "rejected incoming connection, UTP_ON_ACCEPT callback not set");
- #endif
-
- }
-
- return 1;
-}
-
-// Called by utp_process_icmp_fragmentation() and utp_process_icmp_error() below
-static UTPSocket* parse_icmp_payload(utp_context *ctx, const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
-{
- assert(ctx);
- if (!ctx) return NULL;
-
- assert(buffer);
- if (!buffer) return NULL;
-
- assert(to);
- if (!to) return NULL;
-
- const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);
-
- // ICMP packets are only required to quote the first 8 bytes of the layer4
- // payload. The UDP payload is 8 bytes, and the UTP header is another 20
- // bytes. So, in order to find the entire UTP header, we need the ICMP
- // packet to quote 28 bytes.
- if (len < sizeof(PacketFormatV1)) {
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "Ignoring ICMP from %s: runt length %d", addrfmt(addr, addrbuf), len);
- #endif
- return NULL;
- }
-
- const PacketFormatV1 *pf = (PacketFormatV1*)buffer;
- const byte version = UTP_Version(pf);
- const uint32 id = uint32(pf->connid);
-
- if (version != 1) {
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "Ignoring ICMP from %s: not UTP version 1", addrfmt(addr, addrbuf));
- #endif
- return NULL;
- }
-
- UTPSocketKeyData* keyData;
-
- if ( (keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id))) ||
- ((keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id + 1))) && keyData->socket->conn_id_send == id) ||
- ((keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id - 1))) && keyData->socket->conn_id_send == id))
- {
- return keyData->socket;
- }
-
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "Ignoring ICMP from %s: No matching connection found for id %u", addrfmt(addr, addrbuf), id);
- #endif
- return NULL;
-}
-
-// Should be called when an ICMP Type 3, Code 4 packet (fragmentation needed) is received, to adjust the MTU
-//
-// Returns 1 if the UDP payload (delivered in the ICMP packet) was recognized as a UTP packet, or 0 if it was not
-//
-// @ctx: utp_context
-// @buf: Contents of the original UDP payload, which the ICMP packet quoted. *Not* the ICMP packet itself.
-// @len: buffer length
-// @to: destination address of the original UDP pakcet
-// @tolen: address length
-// @next_hop_mtu:
-int utp_process_icmp_fragmentation(utp_context *ctx, const byte* buffer, size_t len, const struct sockaddr *to, socklen_t tolen, uint16 next_hop_mtu)
-{
- UTPSocket* conn = parse_icmp_payload(ctx, buffer, len, to, tolen);
- if (!conn) return 0;
-
- // Constrain the next_hop_mtu to sane values. It might not be initialized or sent properly
- if (next_hop_mtu >= 576 && next_hop_mtu < 0x2000) {
- conn->mtu_ceiling = min<uint32>(next_hop_mtu, conn->mtu_ceiling);
- conn->mtu_search_update();
- // this is something of a speecial case, where we don't set mtu_last
- // to the value in between the floor and the ceiling. We can update the
- // floor, because there might be more network segments after the one
- // that sent this ICMP with smaller MTUs. But we want to test this
- // MTU size first. If the next probe gets through, mtu_floor is updated
- conn->mtu_last = conn->mtu_ceiling;
- } else {
- // Otherwise, binary search. At this point we don't actually know
- // what size the packet that failed was, and apparently we can't
- // trust the next hop mtu either. It seems reasonably conservative
- // to just lower the ceiling. This should not happen on working networks
- // anyway.
- conn->mtu_ceiling = (conn->mtu_floor + conn->mtu_ceiling) / 2;
- conn->mtu_search_update();
- }
-
- conn->log(UTP_LOG_MTU, "MTU [ICMP] floor:%d ceiling:%d current:%d", conn->mtu_floor, conn->mtu_ceiling, conn->mtu_last);
- return 1;
-}
-
-// Should be called when an ICMP message is received that should tear down the connection.
-//
-// Returns 1 if the UDP payload (delivered in the ICMP packet) was recognized as a UTP packet, or 0 if it was not
-//
-// @ctx: utp_context
-// @buf: Contents of the original UDP payload, which the ICMP packet quoted. *Not* the ICMP packet itself.
-// @len: buffer length
-// @to: destination address of the original UDP pakcet
-// @tolen: address length
-int utp_process_icmp_error(utp_context *ctx, const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
-{
- UTPSocket* conn = parse_icmp_payload(ctx, buffer, len, to, tolen);
- if (!conn) return 0;
-
- const int err = (conn->state == CS_SYN_SENT) ? UTP_ECONNREFUSED : UTP_ECONNRESET;
- const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);
-
- switch(conn->state) {
- // Don't pass on errors for idle/closed connections
- case CS_IDLE:
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "ICMP from %s in state CS_IDLE, ignoring", addrfmt(addr, addrbuf));
- #endif
- return 1;
-
- case CS_FIN_SENT:
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "ICMP from %s in state CS_FIN_SENT, setting state to CS_DESTROY and causing error %d", addrfmt(addr, addrbuf), err);
- #endif
- conn->state = CS_DESTROY;
- break;
-
- default:
- #if UTP_DEBUG_LOGGING
- ctx->log(UTP_LOG_DEBUG, NULL, "ICMP from %s, setting state to CS_RESET and causing error %d", addrfmt(addr, addrbuf), err);
- #endif
- conn->state = CS_RESET;
- break;
- }
-
- utp_call_on_error(conn->ctx, conn, err);
- return 1;
-}
-
-// Write bytes to the UTP socket. Returns the number of bytes written.
-// 0 indicates the socket is no longer writable, -1 indicates an error
-ssize_t utp_writev(utp_socket *conn, struct utp_iovec *iovec_input, size_t num_iovecs)
-{
- static utp_iovec iovec[UTP_IOV_MAX];
-
- assert(conn);
- if (!conn) return -1;
-
- assert(iovec_input);
- if (!iovec_input) return -1;
-
- assert(num_iovecs);
- if (!num_iovecs) return -1;
-
- if (num_iovecs > UTP_IOV_MAX)
- num_iovecs = UTP_IOV_MAX;
-
- memcpy(iovec, iovec_input, sizeof(struct utp_iovec)*num_iovecs);
-
- size_t bytes = 0;
- size_t sent = 0;
- for (size_t i = 0; i < num_iovecs; i++)
- bytes += iovec[i].iov_len;
-
- #if UTP_DEBUG_LOGGING
- size_t param = bytes;
- #endif
-
- if (conn->state != CS_CONNECTED) {
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "UTP_Write %u bytes = false (not CS_CONNECTED)", (uint)bytes);
- #endif
- return 0;
- }
-
- conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, conn);
-
- // don't send unless it will all fit in the window
- size_t packet_size = conn->get_packet_size();
- size_t num_to_send = min<size_t>(bytes, packet_size);
- while (!conn->is_full(num_to_send)) {
- // Send an outgoing packet.
- // Also add it to the outgoing of packets that have been sent but not ACKed.
-
- bytes -= num_to_send;
- sent += num_to_send;
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Sending packet. seq_nr:%u ack_nr:%u wnd:%u/%u/%u rcv_win:%u size:%u cur_window_packets:%u",
- conn->seq_nr, conn->ack_nr,
- (uint)(conn->cur_window + num_to_send),
- (uint)conn->max_window, (uint)conn->max_window_user,
- (uint)conn->last_rcv_win, num_to_send,
- conn->cur_window_packets);
- #endif
- conn->write_outgoing_packet(num_to_send, ST_DATA, iovec, num_iovecs);
- num_to_send = min<size_t>(bytes, packet_size);
-
- if (num_to_send == 0) {
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "UTP_Write %u bytes = true", (uint)param);
- #endif
- return sent;
- }
- }
-
- bool full = conn->is_full();
- if (full) {
- // mark the socket as not being writable.
- conn->state = CS_CONNECTED_FULL;
- }
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "UTP_Write %u bytes = %s", (uint)bytes, full ? "false" : "true");
- #endif
-
- // returns whether or not the socket is still writable
- // if the congestion window is not full, we can still write to it
- //return !full;
- return sent;
-}
-
-void utp_read_drained(utp_socket *conn)
-{
- assert(conn);
- if (!conn) return;
-
- assert(conn->state != CS_UNINITIALIZED);
- if (conn->state == CS_UNINITIALIZED) return;
-
- const size_t rcvwin = conn->get_rcv_window();
-
- if (rcvwin > conn->last_rcv_win) {
- // If last window was 0 send ACK immediately, otherwise should set timer
- if (conn->last_rcv_win == 0) {
- conn->send_ack();
- } else {
- conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, conn);
- conn->schedule_ack();
- }
- }
-}
-
-// Should be called each time the UDP socket is drained
-void utp_issue_deferred_acks(utp_context *ctx)
-{
- assert(ctx);
- if (!ctx) return;
-
- for (size_t i = 0; i < ctx->ack_sockets.GetCount(); i++) {
- UTPSocket *conn = ctx->ack_sockets[i];
- conn->send_ack();
- i--;
- }
-}
-
-// Should be called every 500ms
-void utp_check_timeouts(utp_context *ctx)
-{
- assert(ctx);
- if (!ctx) return;
-
- ctx->current_ms = utp_call_get_milliseconds(ctx, NULL);
-
- if (ctx->current_ms - ctx->last_check < TIMEOUT_CHECK_INTERVAL)
- return;
-
- ctx->last_check = ctx->current_ms;
-
- for (size_t i = 0; i < ctx->rst_info.GetCount(); i++) {
- if ((int)(ctx->current_ms - ctx->rst_info[i].timestamp) >= RST_INFO_TIMEOUT) {
- ctx->rst_info.MoveUpLast(i);
- i--;
- }
- }
- if (ctx->rst_info.GetCount() != ctx->rst_info.GetAlloc()) {
- ctx->rst_info.Compact();
- }
-
- utp_hash_iterator_t it;
- UTPSocketKeyData* keyData;
- while ((keyData = ctx->utp_sockets->Iterate(it))) {
- UTPSocket *conn = keyData->socket;
- conn->check_timeouts();
-
- // Check if the object was deleted
- if (conn->state == CS_DESTROY) {
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "Destroying");
- #endif
- delete conn;
- }
- }
-}
-
-int utp_getpeername(utp_socket *conn, struct sockaddr *addr, socklen_t *addrlen)
-{
- assert(addr);
- if (!addr) return -1;
-
- assert(addrlen);
- if (!addrlen) return -1;
-
- assert(conn);
- if (!conn) return -1;
-
- assert(conn->state != CS_UNINITIALIZED);
- if (conn->state == CS_UNINITIALIZED) return -1;
-
- socklen_t len;
- const SOCKADDR_STORAGE sa = conn->addr.get_sockaddr_storage(&len);
- *addrlen = min(len, *addrlen);
- memcpy(addr, &sa, *addrlen);
- return 0;
-}
-
-int utp_get_delays(UTPSocket *conn, uint32 *ours, uint32 *theirs, uint32 *age)
-{
- assert(conn);
- if (!conn) return -1;
-
- assert(conn->state != CS_UNINITIALIZED);
- if (conn->state == CS_UNINITIALIZED) {
- if (ours) *ours = 0;
- if (theirs) *theirs = 0;
- if (age) *age = 0;
- return -1;
- }
-
- if (ours) *ours = conn->our_hist.get_value();
- if (theirs) *theirs = conn->their_hist.get_value();
- if (age) *age = (uint32)(conn->ctx->current_ms - conn->last_measured_delay);
- return 0;
-}
-
-// Close the UTP socket.
-// It is not valid for the upper layer to refer to socket after it is closed.
-// Data will keep to try being delivered after the close.
-void utp_close(UTPSocket *conn)
-{
- assert(conn);
- if (!conn) return;
-
- assert(conn->state != CS_UNINITIALIZED
- && conn->state != CS_DESTROY_DELAY
- && conn->state != CS_FIN_SENT
- && conn->state != CS_DESTROY);
-
- #if UTP_DEBUG_LOGGING
- conn->log(UTP_LOG_DEBUG, "UTP_Close in state:%s", statenames[conn->state]);
- #endif
-
- switch(conn->state) {
- case CS_CONNECTED:
- case CS_CONNECTED_FULL:
- conn->state = CS_FIN_SENT;
- conn->write_outgoing_packet(0, ST_FIN, NULL, 0);
- break;
-
- case CS_SYN_SENT:
- conn->rto_timeout = utp_call_get_milliseconds(conn->ctx, conn) + min<uint>(conn->rto * 2, 60);
- // fall through
- case CS_GOT_FIN:
- conn->state = CS_DESTROY_DELAY;
- break;
- case CS_SYN_RECV:
- // fall through
- default:
- conn->state = CS_DESTROY;
- break;
- }
-}
-
-utp_context* utp_get_context(utp_socket *socket) {
- assert(socket);
- return socket ? socket->ctx : NULL;
-}
-
-void* utp_set_userdata(utp_socket *socket, void *userdata) {
- assert(socket);
- if (socket) socket->userdata = userdata;
- return socket ? socket->userdata : NULL;
-}
-
-void* utp_get_userdata(utp_socket *socket) {
- assert(socket);
- return socket ? socket->userdata : NULL;
-}
-
-void struct_utp_context::log(int level, utp_socket *socket, char const *fmt, ...)
-{
- switch (level) {
- case UTP_LOG_NORMAL: if (!log_normal) return;
- case UTP_LOG_MTU: if (!log_mtu) return;
- case UTP_LOG_DEBUG: if (!log_debug) return;
- }
-
- va_list va;
- char buf[4096];
-
- va_start(va, fmt);
- vsnprintf(buf, 4096, fmt, va);
- buf[4095] = '\0';
- va_end(va);
-
- utp_call_log(this, socket, (const byte *)buf);
-}
-
-utp_socket_stats* utp_get_stats(utp_socket *socket)
-{
- #ifdef _DEBUG
- assert(socket);
- if (!socket) return NULL;
- socket->_stats.mtu_guess = socket->mtu_last ? socket->mtu_last : socket->mtu_ceiling;
- return &socket->_stats;
- #else
- return NULL;
- #endif
-}