edge_connection_t *conn,
crypt_path_t *layer_hint,
relay_header_t *rh);
+static void set_block_state_for_streams(edge_connection_t *stream_list,
+ int block, streamid_t stream_id);
/** Stats: how many relay cells have originated at this hop, or have
* been relayed onward (not recognized at this hop)?
chan->num_p_circuits = 0;
}
-/** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
+/**
+ * Called when a circuit becomes blocked or unblocked due to the channel
+ * cell queue.
+ *
+ * Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
* every edge connection that is using <b>circ</b> to write to <b>chan</b>,
* and start or stop reading as appropriate.
- *
- * If <b>stream_id</b> is nonzero, block only the edge connection whose
- * stream_id matches it.
- *
- * Returns the number of streams whose status we changed.
*/
-static int
-set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
- int block, streamid_t stream_id)
+static void
+set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block)
{
edge_connection_t *edge = NULL;
- int n = 0;
if (circ->n_chan == chan) {
- circ->streams_blocked_on_n_chan = block;
+ circ->circuit_blocked_on_n_chan = block;
if (CIRCUIT_IS_ORIGIN(circ))
edge = TO_ORIGIN_CIRCUIT(circ)->p_streams;
} else {
- circ->streams_blocked_on_p_chan = block;
+ circ->circuit_blocked_on_p_chan = block;
tor_assert(!CIRCUIT_IS_ORIGIN(circ));
edge = TO_OR_CIRCUIT(circ)->n_streams;
}
- for (; edge; edge = edge->next_stream) {
+ set_block_state_for_streams(edge, block, 0);
+}
+
+/**
+ * Helper function to block or unblock streams in a stream list.
+ *
+ * If <b>stream_id</id> is 0, apply the <b>block</b> state to all streams
+ * in the stream list. If it is non-zero, only apply to that specific stream.
+ */
+static void
+set_block_state_for_streams(edge_connection_t *stream_list, int block,
+ streamid_t stream_id)
+{
+ for (edge_connection_t *edge = stream_list; edge; edge = edge->next_stream) {
connection_t *conn = TO_CONN(edge);
if (stream_id && edge->stream_id != stream_id)
continue;
- if (edge->edge_blocked_on_circ != block) {
- ++n;
- edge->edge_blocked_on_circ = block;
- }
-
if (!conn->read_event) {
/* This connection is a placeholder for something; probably a DNS
* request. It can't actually stop or start reading.*/
connection_start_reading(conn);
}
}
-
- return n;
}
/** Extract the command from a packed cell. */
destroy_cell_queue_t *destroy_queue=NULL;
circuit_t *circ;
or_circuit_t *or_circ;
- int streams_blocked;
+ int circ_blocked;
packed_cell_t *cell;
/* Get the cmux */
if (circ->n_chan == chan) {
queue = &circ->n_chan_cells;
- streams_blocked = circ->streams_blocked_on_n_chan;
+ circ_blocked = circ->circuit_blocked_on_n_chan;
} else {
or_circ = TO_OR_CIRCUIT(circ);
tor_assert(or_circ->p_chan == chan);
queue = &TO_OR_CIRCUIT(circ)->p_chan_cells;
- streams_blocked = circ->streams_blocked_on_p_chan;
+ circ_blocked = circ->circuit_blocked_on_p_chan;
}
/* Circuitmux told us this was active, so it should have cells.
/* Is the cell queue low enough to unblock all the streams that are waiting
* to write to this circuit? */
- if (streams_blocked && queue->n <= cell_queue_lowwatermark())
- set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
+ if (circ_blocked && queue->n <= cell_queue_lowwatermark())
+ set_circuit_blocked_on_chan(circ, chan, 0); /* unblock streams */
/* If n_flushed < max still, loop around and pick another circuit */
}
streamid_t fromstream)
{
or_circuit_t *orcirc = NULL;
+ edge_connection_t *stream_list = NULL;
cell_queue_t *queue;
int32_t max_queue_size;
- int streams_blocked;
+ int circ_blocked;
int exitward;
if (circ->marked_for_close)
return;
exitward = (direction == CELL_DIRECTION_OUT);
if (exitward) {
queue = &circ->n_chan_cells;
- streams_blocked = circ->streams_blocked_on_n_chan;
+ circ_blocked = circ->circuit_blocked_on_n_chan;
max_queue_size = max_circuit_cell_queue_size_out;
+ if (CIRCUIT_IS_ORIGIN(circ))
+ stream_list = TO_ORIGIN_CIRCUIT(circ)->p_streams;
} else {
orcirc = TO_OR_CIRCUIT(circ);
queue = &orcirc->p_chan_cells;
- streams_blocked = circ->streams_blocked_on_p_chan;
+ circ_blocked = circ->circuit_blocked_on_p_chan;
max_queue_size = max_circuit_cell_queue_size;
+ stream_list = TO_OR_CIRCUIT(circ)->n_streams;
}
if (PREDICT_UNLIKELY(queue->n >= max_queue_size)) {
return;
}
- /* If we have too many cells on the circuit, we should stop reading from
- * the edge streams for a while. */
- if (!streams_blocked && queue->n >= cell_queue_highwatermark())
- set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */
+ /* If we have too many cells on the circuit, note that it should
+ * be blocked from new cells. */
+ if (!circ_blocked && queue->n >= cell_queue_highwatermark())
+ set_circuit_blocked_on_chan(circ, chan, 1);
- if (streams_blocked && fromstream) {
- /* This edge connection is apparently not blocked; block it. */
- set_streams_blocked_on_circ(circ, chan, 1, fromstream);
+ if (circ_blocked && fromstream) {
+ /* This edge connection is apparently not blocked; this can happen for
+ * new streams on a blocked circuit, for their CONNECTED response.
+ * block it now. */
+ set_block_state_for_streams(stream_list, 1, fromstream);
}
update_circuit_on_cmux(circ, direction);
circuit_queue_streams_are_blocked(circuit_t *circ)
{
if (CIRCUIT_IS_ORIGIN(circ)) {
- return circ->streams_blocked_on_n_chan;
+ return circ->circuit_blocked_on_n_chan;
} else {
- return circ->streams_blocked_on_p_chan;
+ return circ->circuit_blocked_on_p_chan;
}
}