]> git.ipfire.org Git - thirdparty/tor.git/commitdiff
Refactor stream blocking due to channel cell queues
authorMike Perry <mikeperry-git@torproject.org>
Fri, 20 Jan 2023 19:14:33 +0000 (19:14 +0000)
committerMike Perry <mikeperry-git@torproject.org>
Thu, 6 Apr 2023 15:57:10 +0000 (15:57 +0000)
Streams can get blocked on a circuit in two ways:
  1. When the circuit package window is full
  2. When the channel's cell queue is too high

Conflux needs to decouple stream blocking from both of these conditions,
because streams can continue on another circuit, even if the primary circuit
is blocked for either of these cases.

However, both conflux and congestion control need to know if the channel's
cell queue hit the highwatermark and is still draining, because this condition
is used by those components, independent of stream state.

Therefore, this commit renames the 'streams_blocked_on_chan' variable to
signify that it refers to the cell queue state, and also refactors the actual
stream blocking bits out, so they can be handled separately if conflux is
present.

src/core/mainloop/mainloop.c
src/core/mainloop/mainloop.h
src/core/or/circuit_st.h
src/core/or/circuituse.c
src/core/or/congestion_control_common.c
src/core/or/edge_connection_st.h
src/core/or/relay.c
src/test/fakecircs.c

index a1ea32220aae843812bd24f3bfffd3a7f65e1c57..e1c9786b2ecf9062c1dfe562b7d7d8640dd4f068 100644 (file)
@@ -497,7 +497,7 @@ connection_watch_events(connection_t *conn, watchable_events_t events)
 
 /** Return true iff <b>conn</b> is listening for read events. */
 int
-connection_is_reading(connection_t *conn)
+connection_is_reading(const connection_t *conn)
 {
   tor_assert(conn);
 
index 98d0b3a058f6b98096ef250fe24a39f336e89f16..64782c131847bf9cca803c099980930f1b7e46a9 100644 (file)
@@ -38,7 +38,7 @@ typedef enum watchable_events {
   WRITE_EVENT=0x04 /**< We want to know when a connection is writable */
 } watchable_events_t;
 void connection_watch_events(connection_t *conn, watchable_events_t events);
-int connection_is_reading(connection_t *conn);
+int connection_is_reading(const connection_t *conn);
 MOCK_DECL(void,connection_stop_reading,(connection_t *conn));
 MOCK_DECL(void,connection_start_reading,(connection_t *conn));
 
index 7f39c9337e921c0cfd03fdab346a5809531a0235..1afb4d4426008fbcccf3e78e6b2acbcf8290cab6 100644 (file)
@@ -88,11 +88,11 @@ struct circuit_t {
   extend_info_t *n_hop;
 
   /** True iff we are waiting for n_chan_cells to become less full before
-   * allowing p_streams to add any more cells. (Origin circuit only.) */
-  unsigned int streams_blocked_on_n_chan : 1;
+   * allowing any more cells on this circuit. (Origin circuit only.) */
+  unsigned int circuit_blocked_on_n_chan : 1;
   /** True iff we are waiting for p_chan_cells to become less full before
-   * allowing n_streams to add any more cells. (OR circuit only.) */
-  unsigned int streams_blocked_on_p_chan : 1;
+   * allowing any more cells on this circuit. (OR circuit only.) */
+  unsigned int circuit_blocked_on_p_chan : 1;
 
   /** True iff we have queued a delete backwards on this circuit, but not put
    * it on the output buffer. */
index 911025297643751cd12f6625068c4dc65b3bc987..25401aea55dcbb480faa11bf91c8960484a975d6 100644 (file)
@@ -63,6 +63,7 @@
 #include "lib/math/fp.h"
 #include "lib/time/tvdiff.h"
 #include "lib/trace/events.h"
+#include "src/core/mainloop/mainloop.h"
 
 #include "core/or/cpath_build_state_st.h"
 #include "feature/dircommon/dir_connection_st.h"
@@ -938,7 +939,7 @@ circuit_log_ancient_one_hop_circuits(int age)
                  c->marked_for_close,
                  c->hold_open_until_flushed ? "" : "not ",
                  conn->edge_has_sent_end ? "" : "not ",
-                 conn->edge_blocked_on_circ ? "Blocked" : "Not blocked");
+                 connection_is_reading(c) ? "Not blocked" : "Blocked");
       if (! c->linked_conn)
         continue;
 
index 920b57cf0039a08d872c99d300b9edc53cab1af1..c7c950d0c8283f0d992652e81d6bf3ba64570acb 100644 (file)
@@ -954,11 +954,11 @@ congestion_control_update_circuit_bdp(congestion_control_t *cc,
   if (CIRCUIT_IS_ORIGIN(circ)) {
     /* origin circs use n_chan */
     chan_q = circ->n_chan_cells.n;
-    blocked_on_chan = circ->streams_blocked_on_n_chan;
+    blocked_on_chan = circ->circuit_blocked_on_n_chan;
   } else {
     /* Both onion services and exits use or_circuit and p_chan */
     chan_q = CONST_TO_OR_CIRCUIT(circ)->p_chan_cells.n;
-    blocked_on_chan = circ->streams_blocked_on_p_chan;
+    blocked_on_chan = circ->circuit_blocked_on_p_chan;
   }
 
   /* If we have no EWMA RTT, it is because monotime has been stalled
index 942991f1391938ec4822541c197a520eec75baf8..22f9040d15302015d89229763fb7cafe106a27a0 100644 (file)
@@ -66,9 +66,6 @@ struct edge_connection_t {
                          * connections.  Set once we've set the stream end,
                          * and check in connection_about_to_close_connection().
                          */
-  /** True iff we've blocked reading until the circuit has fewer queued
-   * cells. */
-  unsigned int edge_blocked_on_circ:1;
 
   /** Unique ID for directory requests; this used to be in connection_t, but
    * that's going away and being used on channels instead.  We still tag
index 38fb560e34ac3707192e0631a6c83f36679d030a..827f0c3e462d3bd060445da466c48b55eef19006 100644 (file)
@@ -122,6 +122,8 @@ static int connection_edge_process_ordered_relay_cell(cell_t *cell,
                                            edge_connection_t *conn,
                                            crypt_path_t *layer_hint,
                                            relay_header_t *rh);
+static void set_block_state_for_streams(edge_connection_t *stream_list,
+                                        int block, streamid_t stream_id);
 
 /** Stats: how many relay cells have originated at this hop, or have
  * been relayed onward (not recognized at this hop)?
@@ -3005,41 +3007,46 @@ channel_unlink_all_circuits(channel_t *chan, smartlist_t *circuits_out)
   chan->num_p_circuits = 0;
 }
 
-/** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
+/**
+ * Called when a circuit becomes blocked or unblocked due to the channel
+ * cell queue.
+ *
+ * Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
  * every edge connection that is using <b>circ</b> to write to <b>chan</b>,
  * and start or stop reading as appropriate.
- *
- * If <b>stream_id</b> is nonzero, block only the edge connection whose
- * stream_id matches it.
- *
- * Returns the number of streams whose status we changed.
  */
-static int
-set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
-                            int block, streamid_t stream_id)
+static void
+set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block)
 {
   edge_connection_t *edge = NULL;
-  int n = 0;
   if (circ->n_chan == chan) {
-    circ->streams_blocked_on_n_chan = block;
+    circ->circuit_blocked_on_n_chan = block;
     if (CIRCUIT_IS_ORIGIN(circ))
       edge = TO_ORIGIN_CIRCUIT(circ)->p_streams;
   } else {
-    circ->streams_blocked_on_p_chan = block;
+    circ->circuit_blocked_on_p_chan = block;
     tor_assert(!CIRCUIT_IS_ORIGIN(circ));
     edge = TO_OR_CIRCUIT(circ)->n_streams;
   }
 
-  for (; edge; edge = edge->next_stream) {
+  set_block_state_for_streams(edge, block, 0);
+}
+
+/**
+ * Helper function to block or unblock streams in a stream list.
+ *
+ * If <b>stream_id</id> is 0, apply the <b>block</b> state to all streams
+ * in the stream list. If it is non-zero, only apply to that specific stream.
+ */
+static void
+set_block_state_for_streams(edge_connection_t *stream_list, int block,
+                            streamid_t stream_id)
+{
+  for (edge_connection_t *edge = stream_list; edge; edge = edge->next_stream) {
     connection_t *conn = TO_CONN(edge);
     if (stream_id && edge->stream_id != stream_id)
       continue;
 
-    if (edge->edge_blocked_on_circ != block) {
-      ++n;
-      edge->edge_blocked_on_circ = block;
-    }
-
     if (!conn->read_event) {
       /* This connection is a placeholder for something; probably a DNS
        * request.  It can't actually stop or start reading.*/
@@ -3055,8 +3062,6 @@ set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
         connection_start_reading(conn);
     }
   }
-
-  return n;
 }
 
 /** Extract the command from a packed cell. */
@@ -3094,7 +3099,7 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
   destroy_cell_queue_t *destroy_queue=NULL;
   circuit_t *circ;
   or_circuit_t *or_circ;
-  int streams_blocked;
+  int circ_blocked;
   packed_cell_t *cell;
 
   /* Get the cmux */
@@ -3134,12 +3139,12 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
 
     if (circ->n_chan == chan) {
       queue = &circ->n_chan_cells;
-      streams_blocked = circ->streams_blocked_on_n_chan;
+      circ_blocked = circ->circuit_blocked_on_n_chan;
     } else {
       or_circ = TO_OR_CIRCUIT(circ);
       tor_assert(or_circ->p_chan == chan);
       queue = &TO_OR_CIRCUIT(circ)->p_chan_cells;
-      streams_blocked = circ->streams_blocked_on_p_chan;
+      circ_blocked = circ->circuit_blocked_on_p_chan;
     }
 
     /* Circuitmux told us this was active, so it should have cells.
@@ -3240,8 +3245,8 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
 
     /* Is the cell queue low enough to unblock all the streams that are waiting
      * to write to this circuit? */
-    if (streams_blocked && queue->n <= cell_queue_lowwatermark())
-      set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
+    if (circ_blocked && queue->n <= cell_queue_lowwatermark())
+      set_circuit_blocked_on_chan(circ, chan, 0); /* unblock streams */
 
     /* If n_flushed < max still, loop around and pick another circuit */
   }
@@ -3346,9 +3351,10 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
                              streamid_t fromstream)
 {
   or_circuit_t *orcirc = NULL;
+  edge_connection_t *stream_list = NULL;
   cell_queue_t *queue;
   int32_t max_queue_size;
-  int streams_blocked;
+  int circ_blocked;
   int exitward;
   if (circ->marked_for_close)
     return;
@@ -3356,13 +3362,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
   exitward = (direction == CELL_DIRECTION_OUT);
   if (exitward) {
     queue = &circ->n_chan_cells;
-    streams_blocked = circ->streams_blocked_on_n_chan;
+    circ_blocked = circ->circuit_blocked_on_n_chan;
     max_queue_size = max_circuit_cell_queue_size_out;
+    if (CIRCUIT_IS_ORIGIN(circ))
+      stream_list = TO_ORIGIN_CIRCUIT(circ)->p_streams;
   } else {
     orcirc = TO_OR_CIRCUIT(circ);
     queue = &orcirc->p_chan_cells;
-    streams_blocked = circ->streams_blocked_on_p_chan;
+    circ_blocked = circ->circuit_blocked_on_p_chan;
     max_queue_size = max_circuit_cell_queue_size;
+    stream_list = TO_OR_CIRCUIT(circ)->n_streams;
   }
 
   if (PREDICT_UNLIKELY(queue->n >= max_queue_size)) {
@@ -3395,14 +3404,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
       return;
   }
 
-  /* If we have too many cells on the circuit, we should stop reading from
-   * the edge streams for a while. */
-  if (!streams_blocked && queue->n >= cell_queue_highwatermark())
-    set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */
+  /* If we have too many cells on the circuit, note that it should
+   * be blocked from new cells. */
+  if (!circ_blocked && queue->n >= cell_queue_highwatermark())
+    set_circuit_blocked_on_chan(circ, chan, 1);
 
-  if (streams_blocked && fromstream) {
-    /* This edge connection is apparently not blocked; block it. */
-    set_streams_blocked_on_circ(circ, chan, 1, fromstream);
+  if (circ_blocked && fromstream) {
+    /* This edge connection is apparently not blocked; this can happen for
+     * new streams on a blocked circuit, for their CONNECTED response.
+     * block it now. */
+    set_block_state_for_streams(stream_list, 1, fromstream);
   }
 
   update_circuit_on_cmux(circ, direction);
@@ -3508,8 +3519,8 @@ static int
 circuit_queue_streams_are_blocked(circuit_t *circ)
 {
   if (CIRCUIT_IS_ORIGIN(circ)) {
-    return circ->streams_blocked_on_n_chan;
+    return circ->circuit_blocked_on_n_chan;
   } else {
-    return circ->streams_blocked_on_p_chan;
+    return circ->circuit_blocked_on_p_chan;
   }
 }
index cca3b4348373d5edd8022fc88233916cb5eed2c4..caeacd84efc52b8abe88f657f0b1509a2cfd6f2d 100644 (file)
@@ -41,8 +41,8 @@ new_fake_orcirc(channel_t *nchan, channel_t *pchan)
   cell_queue_init(&(circ->n_chan_cells));
 
   circ->n_hop = NULL;
-  circ->streams_blocked_on_n_chan = 0;
-  circ->streams_blocked_on_p_chan = 0;
+  circ->circuit_blocked_on_n_chan = 0;
+  circ->circuit_blocked_on_p_chan = 0;
   circ->n_delete_pending = 0;
   circ->p_delete_pending = 0;
   circ->received_destroy = 0;