]> git.ipfire.org Git - thirdparty/tor.git/commitdiff
Prop#324: Hook up flow control
authorMike Perry <mikeperry-git@torproject.org>
Tue, 10 Aug 2021 21:35:46 +0000 (21:35 +0000)
committerDavid Goulet <dgoulet@torproject.org>
Mon, 4 Oct 2021 14:45:46 +0000 (10:45 -0400)
src/app/main/main.c
src/core/mainloop/connection.c
src/core/mainloop/mainloop.c
src/core/or/or.h
src/core/or/relay.c
src/core/or/sendme.c
src/core/or/sendme.h
src/feature/nodelist/networkstatus.c

index 89564490e6668a9c6d2925ab366bb4dde74fe2e7..0742abe70a07a92ac3c3e24de206433106067ce5 100644 (file)
@@ -27,6 +27,7 @@
 #include "core/or/channel.h"
 #include "core/or/channelpadding.h"
 #include "core/or/circuitpadding.h"
+#include "core/or/congestion_control_flow.h"
 #include "core/or/circuitlist.h"
 #include "core/or/command.h"
 #include "core/or/connection_or.h"
@@ -630,6 +631,7 @@ tor_init(int argc, char *argv[])
    * until we get a consensus */
   channelpadding_new_consensus_params(NULL);
   circpad_new_consensus_params(NULL);
+  flow_control_new_consensus_params(NULL);
 
   /* Initialize circuit padding to defaults+torrc until we get a consensus */
   circpad_machines_init();
index 48bea792ae654d7433a2b3544be3b04741cf1da4..9271a70914fa7ae25009560e255ea3b846befe34 100644 (file)
 #include "feature/nodelist/routerinfo_st.h"
 #include "core/or/socks_request_st.h"
 
+#include "core/or/congestion_control_flow.h"
+
 /**
  * On Windows and Linux we cannot reliably bind() a socket to an
  * address and port if: 1) There's already a socket bound to wildcard
@@ -4594,9 +4596,9 @@ connection_handle_write_impl(connection_t *conn, int force)
       !dont_stop_writing) { /* it's done flushing */
     if (connection_finished_flushing(conn) < 0) {
       /* already marked */
-      return -1;
+      goto err;
     }
-    return 0;
+    goto done;
   }
 
   /* Call even if result is 0, since the global write bucket may
@@ -4606,7 +4608,17 @@ connection_handle_write_impl(connection_t *conn, int force)
   if (n_read > 0 && connection_is_reading(conn))
     connection_consider_empty_read_buckets(conn);
 
+ done:
+  /* If this is an edge connection with congestion control, check to see
+   * if it is time to send an xon */
+  if (conn_uses_flow_control(conn)) {
+    flow_control_decide_xon(TO_EDGE_CONN(conn), n_written);
+  }
+
   return 0;
+
+ err:
+  return -1;
 }
 
 /* DOCDOC connection_handle_write */
index 37b53db92acfd4b2152fff877e2b656b37948743..cd57dea3d4d5bf5c833af4b11c5594811cda9fd3 100644 (file)
@@ -641,6 +641,13 @@ connection_start_reading,(connection_t *conn))
     if (connection_should_read_from_linked_conn(conn))
       connection_start_reading_from_linked_conn(conn);
   } else {
+    if (CONN_IS_EDGE(conn) && TO_EDGE_CONN(conn)->xoff_received) {
+      /* We should not get called here if we're waiting for an XON, but
+       * belt-and-suspenders */
+      log_notice(LD_NET,
+                 "Request to start reading on an edgeconn blocked with XOFF");
+      return;
+    }
     if (event_add(conn->read_event, NULL))
       log_warn(LD_NET, "Error from libevent setting read event state for %d "
                "to watched: %s",
index 99948f26e2274caa19c6a1f061255514168ba18b..ad82130301ca888139629be29d64916418330768 100644 (file)
@@ -210,6 +210,9 @@ struct curve25519_public_key_t;
 #define RELAY_COMMAND_PADDING_NEGOTIATE 41
 #define RELAY_COMMAND_PADDING_NEGOTIATED 42
 
+#define RELAY_COMMAND_XOFF 43
+#define RELAY_COMMAND_XON 44
+
 /* Reasons why an OR connection is closed. */
 #define END_OR_CONN_REASON_DONE           1
 #define END_OR_CONN_REASON_REFUSED        2 /* connection refused */
index e3d41d7bf01792264bf372bc71de37117e927007..0e889eb3483a4c90eadf5d5a04eee9aa5dd8e557 100644 (file)
@@ -98,6 +98,7 @@
 #include "core/or/socks_request_st.h"
 #include "core/or/sendme.h"
 #include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_flow.h"
 
 static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
                                             cell_direction_t cell_direction,
@@ -1739,6 +1740,44 @@ handle_relay_cell_command(cell_t *cell, circuit_t *circ,
         sendme_connection_edge_consider_sending(conn);
       }
 
+      return 0;
+    case RELAY_COMMAND_XOFF:
+      if (!conn) {
+        if (CIRCUIT_IS_ORIGIN(circ)) {
+          origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+          if (relay_crypt_from_last_hop(ocirc, layer_hint) &&
+              connection_half_edge_is_valid_data(ocirc->half_streams,
+                                                rh->stream_id)) {
+            circuit_read_valid_data(ocirc, rh->length);
+          }
+        }
+        return 0;
+      }
+
+      if (circuit_process_stream_xoff(conn, layer_hint, cell)) {
+        if (CIRCUIT_IS_ORIGIN(circ)) {
+          circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length);
+        }
+      }
+      return 0;
+    case RELAY_COMMAND_XON:
+      if (!conn) {
+        if (CIRCUIT_IS_ORIGIN(circ)) {
+          origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+          if (relay_crypt_from_last_hop(ocirc, layer_hint) &&
+              connection_half_edge_is_valid_data(ocirc->half_streams,
+                                                rh->stream_id)) {
+            circuit_read_valid_data(ocirc, rh->length);
+          }
+        }
+        return 0;
+      }
+
+      if (circuit_process_stream_xon(conn, layer_hint, cell)) {
+        if (CIRCUIT_IS_ORIGIN(circ)) {
+          circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length);
+        }
+      }
       return 0;
     case RELAY_COMMAND_END:
       reason = rh->length > 0 ?
@@ -2287,7 +2326,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
   }
 
   /* Handle the stream-level SENDME package window. */
-  if (sendme_note_stream_data_packaged(conn) < 0) {
+  if (sendme_note_stream_data_packaged(conn, length) < 0) {
     connection_stop_reading(TO_CONN(conn));
     log_debug(domain,"conn->package_window reached 0.");
     circuit_consider_stop_edge_reading(circ, cpath_layer);
@@ -2402,7 +2441,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
   /* Activate reading starting from the chosen stream */
   for (conn=chosen_stream; conn; conn = conn->next_stream) {
     /* Start reading for the streams starting from here */
-    if (conn->base_.marked_for_close || conn->package_window <= 0)
+    if (conn->base_.marked_for_close || conn->package_window <= 0 ||
+        conn->xoff_received)
       continue;
     if (!layer_hint || conn->cpath_layer == layer_hint) {
       connection_start_reading(TO_CONN(conn));
@@ -2413,7 +2453,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
   }
   /* Go back and do the ones we skipped, circular-style */
   for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
-    if (conn->base_.marked_for_close || conn->package_window <= 0)
+    if (conn->base_.marked_for_close || conn->package_window <= 0 ||
+        conn->xoff_received)
       continue;
     if (!layer_hint || conn->cpath_layer == layer_hint) {
       connection_start_reading(TO_CONN(conn));
index 900490a89268cde6648c7610c54dc5e03cdc5eaa..ee670f9d5140fd01262ec4e5dc0869065c387053 100644 (file)
@@ -22,6 +22,7 @@
 #include "core/or/relay.h"
 #include "core/or/sendme.h"
 #include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_flow.h"
 #include "feature/nodelist/networkstatus.h"
 #include "lib/ctime/di_ops.h"
 #include "trunnel/sendme_cell.h"
@@ -370,6 +371,10 @@ sendme_connection_edge_consider_sending(edge_connection_t *conn)
 
   int log_domain = TO_CONN(conn)->type == CONN_TYPE_AP ? LD_APP : LD_EXIT;
 
+  /* If we use flow control, we do not send stream sendmes */
+  if (edge_uses_flow_control(conn))
+    goto end;
+
   /* Don't send it if we still have data to deliver. */
   if (connection_outbuf_too_full(TO_CONN(conn))) {
     goto end;
@@ -546,6 +551,12 @@ sendme_process_stream_level(edge_connection_t *conn, circuit_t *circ,
   tor_assert(conn);
   tor_assert(circ);
 
+  if (edge_uses_flow_control(conn)) {
+    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
+           "Congestion control got stream sendme");
+    return -END_CIRC_REASON_TORPROTOCOL;
+  }
+
   /* Don't allow the other endpoint to request more than our maximum (i.e.
    * initial) stream SENDME window worth of data. Well-behaved stock clients
    * will not request more than this max (as per the check in the while loop
@@ -603,7 +614,12 @@ int
 sendme_stream_data_received(edge_connection_t *conn)
 {
   tor_assert(conn);
-  return --conn->deliver_window;
+
+  if (edge_uses_flow_control(conn)) {
+    return flow_control_decide_xoff(conn);
+  } else {
+    return --conn->deliver_window;
+  }
 }
 
 /* Called when a relay DATA cell is packaged on the given circuit. If
@@ -651,10 +667,18 @@ sendme_note_circuit_data_packaged(circuit_t *circ, crypt_path_t *layer_hint)
 /* Called when a relay DATA cell is packaged for the given edge connection
  * conn. Update the package window and return its new value. */
 int
-sendme_note_stream_data_packaged(edge_connection_t *conn)
+sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len)
 {
   tor_assert(conn);
 
+  if (edge_uses_flow_control(conn)) {
+    flow_control_note_sent_data(conn, len);
+    if (conn->xoff_received)
+      return -1;
+    else
+      return 1;
+  }
+
   --conn->package_window;
   log_debug(LD_APP, "Stream package_window now %d.", conn->package_window);
   return conn->package_window;
index c224d0a9213cecda0664c94b9c799fbffd05d0ba..2abec91a91b94754d8fb5805a35d42ee0ca9a5c8 100644 (file)
@@ -33,7 +33,7 @@ int sendme_circuit_data_received(circuit_t *circ, crypt_path_t *layer_hint);
 /* Update package window functions. */
 int sendme_note_circuit_data_packaged(circuit_t *circ,
                                       crypt_path_t *layer_hint);
-int sendme_note_stream_data_packaged(edge_connection_t *conn);
+int sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len);
 
 /* Record cell digest on circuit. */
 void sendme_record_cell_digest_on_circ(circuit_t *circ, crypt_path_t *cpath);
index 7a1e73ef600cf8363e62139d9d551ab4d35983d9..0138dff0331051c72441ca1d0162f2d0aabd081f 100644 (file)
@@ -45,6 +45,7 @@
 #include "core/or/channel.h"
 #include "core/or/channelpadding.h"
 #include "core/or/circuitpadding.h"
+#include "core/or/congestion_control_flow.h"
 #include "core/or/circuitmux.h"
 #include "core/or/circuitmux_ewma.h"
 #include "core/or/circuitstats.h"
@@ -1699,6 +1700,7 @@ notify_after_networkstatus_changes(void)
   channelpadding_new_consensus_params(c);
   circpad_new_consensus_params(c);
   router_new_consensus_params(c);
+  flow_control_new_consensus_params(c);
 
   /* Maintenance of our L2 guard list */
   maintain_layer2_guards();