Description of how this works in `docs/internal/RATELIMITS.ms`.
Notable implementation changes:
- KEEP_SEND_PAUSE/KEEP_SEND_HOLD and KEEP_RECV_PAUSE/KEEP_RECV_HOLD
no longer exist. Pausing is down via blocked the new rlimits.
- KEEP_SEND_TIMED no longer exists. Pausing "100-continue" transfers
is done in the new `Curl_http_perform_pollset()` method.
- HTTP/2 rate limiting implemented via window updates. When
transfer initiaiting connection has a ratelimit, adjust the
initial window size
- HTTP/3 ngtcp2 rate limitin implemnented via ack updates
- HTTP/3 quiche does not seem to support this via its API
- the default progress-meter has been improved for accuracy
in "current speed" results.
pytest speed tests have been improved.
Closes #19384
internals/MULTI-EV.md \
internals/NEW-PROTOCOL.md \
internals/PORTING.md \
+ internals/RATELIMITS.md \
internals/README.md \
internals/SCORECARD.md \
internals/SPLAY.md \
--- /dev/null
+<!--
+Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
+
+SPDX-License-Identifier: curl
+-->
+
+# Rate Limiting Transfers
+
+Rate limiting a transfer means that no more than "n bytes per second"
+shall be sent or received. It can be set individually for both directions
+via `CURLOPT_MAX_RECV_SPEED_LARGE` and `CURLOPT_MAX_SEND_SPEED_LARGE`. These
+options may be adjusted for an ongoing transfer.
+
+### Implementation Base
+
+`ratelimit.[ch]` implements `struct Curl_rlimit` and functions to manage
+such limits. It has the following properties:
+
+* `rate_per_sec`: how many "tokens" can be used per second, 0 for infinite.
+* `tokens`: the currently available tokens to consume
+* `burst_per_sec`: an upper limit on tokens available
+* `ts`: the microsecond timestamp of the last tokens update
+* `spare_us`: elapsed microseconds that have not counted yet for a token update
+* `blocked`: if the limit is blocked
+
+Tokens can be *drained* from an `rlimit`. This reduces `tokens`, even to
+negative values. To enforce the limits, tokens should not be drained
+further when they reach 0, but such things may happen.
+
+An `rlimit`can be asked how long to wait until `tokens` are positive again.
+This is given in milliseconds. When token are available, this wait
+time is 0.
+
+Ideally a user of `rlimit` would consume the available tokens to 0, then
+get a wait times of 1000ms, after which the set rate of tokens has
+regenerated. Rinse and repeat.
+
+Should a user drain twice the amount of the rate, tokens are negative
+and the wait time is 2 seconds. The `spare_us` account for the
+time that has passed for the consumption. When a user takes 250ms to
+consume the rate, the wait time is then 750ms.
+
+When a user drains nothing for two seconds, the available tokens would
+grow to twice the rate, unless a burst rate is set.
+
+Finally, an `rlimit` may be set to `blocked` and later unblocked again.
+A blocked `rlimit` has no tokens available. This works also when the rate
+is unlimited (`rate_per_sec` set to 0).
+
+### Downloads
+
+`rlimit` is in `data->progress.dl.rlimit`. `setopt.c` initializes it whenever
+the application sets `CURLOPT_MAX_RECV_SPEED_LARGE`. This may be done
+in the middle of a transfer.
+
+`rlimit` tokens are drained in the "protocol" client writer. Checks for
+capacity depend on the protocol:
+
+* HTTP and other plain protocols: `transfer.c:sendrecv_dl()` reads only
+up to capacity.
+* HTTP/2: capacity is used to adjust a stream's window size. Since all
+streams start with `64kb`, `rlimit` takes a few seconds to take effect.
+* HTTP/3: ngtcp2 acknowledges stream data according to capacity. It
+keeps track of bytes not acknowledged yet. This has the same effect as HTTP/2
+window sizes.
+
+(The quiche API does not offer control of `ACK`s and `rlimits` for download
+do not work in that backend.)
+
+### Uploads
+
+`rlimit` is in `data->progress.ul.rlimit`. `setopt.c` initializes it whenever
+the application sets `CURLOPT_MAX_SEND_SPEED_LARGE`. This may be done
+in the middle of a transfer.
+
+The upload capacity is checked in `Curl_client_read()` and readers are
+only asked to read bytes up to the `rlimit` capacity. This limits upload
+of data for all protocols in the same way.
+
+### Pause/Unpause
+
+Pausing of up-/downloads sets the corresponding `rlimit` to blocked. Unpausing
+removes that block.
+
+### Suspending transfers
+
+While obeying the `rlimit` for up-/download leads to the desired transfer
+rates, the other issue that needs care is CPU consumption.
+
+`rlimits` are inspected when computing the "pollset" of a transfer. When
+a transfer wants to send, but not send tokens are available, the `POLLOUT`
+is removed from the pollset. Same for receiving.
+
+For a transfer that is, due to `rlimit`, not able to progress, the pollset
+is then empty. No socket events are monitored, no CPU activity
+happens. For paused transfers, this is sufficient.
+
+Draining `rlimit` happens when a transfer is in `PERFORM` state and
+exhausted limits cause the timer `TOOFAST` to be set. When the fires,
+the transfer runs again and `rlimit`s are re-evaluated.
progress.c \
psl.c \
rand.c \
+ ratelimit.c \
rename.c \
request.c \
rtsp.c \
socks.c \
socks_gssapi.c \
socks_sspi.c \
- speedcheck.c \
splay.c \
strcase.c \
strdup.c \
progress.h \
psl.h \
rand.h \
+ ratelimit.h \
rename.h \
request.h \
rtsp.h \
sockaddr.h \
socketpair.h \
socks.h \
- speedcheck.h \
splay.h \
strcase.h \
strdup.h \
/* socket buffer drained, return */
return CURLE_OK;
- if(Curl_pgrsUpdate(data))
- return CURLE_ABORTED_BY_CALLBACK;
+ if(!result)
+ result = Curl_pgrsUpdate(data);
if(result) {
ts->keepon = KEEPON_DONE;
/* read what is there */
CURL_TRC_CF(data, cf, "CONNECT receive");
result = recv_CONNECT_resp(cf, data, ts, &done);
- if(Curl_pgrsUpdate(data)) {
- result = CURLE_ABORTED_BY_CALLBACK;
- goto out;
- }
+ if(!result)
+ result = Curl_pgrsUpdate(data);
/* error or not complete yet. return for more multi-multi */
if(result || !done)
goto out;
/* The real request will follow the CONNECT, reset request partially */
Curl_req_soft_reset(&data->req, data);
Curl_client_reset(data);
- Curl_pgrsSetUploadCounter(data, 0);
- Curl_pgrsSetDownloadCounter(data, 0);
+ Curl_pgrsReset(data);
tunnel_free(cf, data);
}
#include "sendf.h"
#include "escape.h"
#include "file.h"
-#include "speedcheck.h"
#include "multiif.h"
#include "transfer.h"
#include "url.h"
Curl_pgrsSetUploadCounter(data, bytecount);
- if(Curl_pgrsUpdate(data))
- result = CURLE_ABORTED_BY_CALLBACK;
- else
- result = Curl_speedcheck(data, curlx_now());
+ result = Curl_pgrsCheck(data);
}
- if(!result && Curl_pgrsUpdate(data))
- result = CURLE_ABORTED_BY_CALLBACK;
+ if(!result)
+ result = Curl_pgrsUpdate(data);
out:
close(fd);
if(result)
goto out;
- if(Curl_pgrsUpdate(data))
- result = CURLE_ABORTED_BY_CALLBACK;
- else
- result = Curl_speedcheck(data, curlx_now());
+ result = Curl_pgrsCheck(data);
if(result)
goto out;
}
#endif
}
- if(Curl_pgrsUpdate(data))
- result = CURLE_ABORTED_BY_CALLBACK;
+ if(!result)
+ result = Curl_pgrsUpdate(data);
out:
Curl_multi_xfer_buf_release(data, xfer_buf);
#include "sockaddr.h" /* required for Curl_sockaddr_storage */
#include "multiif.h"
#include "url.h"
-#include "speedcheck.h"
#include "curlx/warnless.h"
#include "http_proxy.h"
#include "socks.h"
return CURLE_RECV_ERROR;
}
else if(ev == 0) {
- if(Curl_pgrsUpdate(data))
- return CURLE_ABORTED_BY_CALLBACK;
+ result = Curl_pgrsUpdate(data);
continue; /* just continue in our loop for the timeout duration */
}
}
bool connected = FALSE;
data->req.size = -1; /* make sure this is unknown at this point */
- Curl_pgrsSetUploadCounter(data, 0);
- Curl_pgrsSetDownloadCounter(data, 0);
- Curl_pgrsSetUploadSize(data, -1);
- Curl_pgrsSetDownloadSize(data, -1);
+ Curl_pgrsReset(data);
ftpc->ctl_valid = TRUE; /* starts good */
ZERO_NULL, /* connecting */
ZERO_NULL, /* doing */
ZERO_NULL, /* proto_pollset */
- Curl_http_do_pollset, /* doing_pollset */
+ Curl_http_doing_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */
- ZERO_NULL, /* perform_pollset */
+ Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */
Curl_http_write_resp, /* write_resp */
Curl_http_write_resp_hd, /* write_resp_hd */
NULL, /* connecting */
ZERO_NULL, /* doing */
NULL, /* proto_pollset */
- Curl_http_do_pollset, /* doing_pollset */
+ Curl_http_doing_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */
- ZERO_NULL, /* perform_pollset */
+ Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */
Curl_http_write_resp, /* write_resp */
Curl_http_write_resp_hd, /* write_resp_hd */
/* this returns the socket to wait for in the DO and DOING state for the multi
interface and then we are always _sending_ a request and thus we wait for
the single socket to become writable only */
-CURLcode Curl_http_do_pollset(struct Curl_easy *data,
- struct easy_pollset *ps)
+CURLcode Curl_http_doing_pollset(struct Curl_easy *data,
+ struct easy_pollset *ps)
{
/* write mode */
return Curl_pollset_add_out(data, ps, data->conn->sock[FIRSTSOCKET]);
}
+CURLcode Curl_http_perform_pollset(struct Curl_easy *data,
+ struct easy_pollset *ps)
+{
+ struct connectdata *conn = data->conn;
+ CURLcode result = CURLE_OK;
+
+ if(CURL_WANT_RECV(data)) {
+ result = Curl_pollset_add_in(data, ps, conn->sock[FIRSTSOCKET]);
+ }
+
+ /* on a "Expect: 100-continue" timed wait, do not poll for outgoing */
+ if(!result && Curl_req_want_send(data) && !http_exp100_is_waiting(data)) {
+ result = Curl_pollset_add_out(data, ps, conn->sock[FIRSTSOCKET]);
+ }
+ return result;
+}
+
/*
* Curl_http_done() gets called after a single HTTP request has been
* performed.
struct cr_exp100_ctx *ctx = reader->ctx;
if(ctx->state > EXP100_SEND_DATA) {
ctx->state = EXP100_SEND_DATA;
- data->req.keepon |= KEEP_SEND;
- data->req.keepon &= ~KEEP_SEND_TIMED;
Curl_expire_done(data, EXPIRE_100_TIMEOUT);
}
}
ctx->state = EXP100_AWAITING_CONTINUE;
ctx->start = curlx_now();
Curl_expire(data, data->set.expect_100_timeout, EXPIRE_100_TIMEOUT);
- data->req.keepon &= ~KEEP_SEND;
- data->req.keepon |= KEEP_SEND_TIMED;
*nread = 0;
*eos = FALSE;
return CURLE_OK;
ms = curlx_timediff_ms(curlx_now(), ctx->start);
if(ms < data->set.expect_100_timeout) {
DEBUGF(infof(data, "cr_exp100_read, AWAITING_CONTINUE, not expired"));
- data->req.keepon &= ~KEEP_SEND;
- data->req.keepon |= KEEP_SEND_TIMED;
*nread = 0;
*eos = FALSE;
return CURLE_OK;
{
struct cr_exp100_ctx *ctx = reader->ctx;
ctx->state = premature ? EXP100_FAILED : EXP100_SEND_DATA;
- data->req.keepon &= ~KEEP_SEND_TIMED;
Curl_expire_done(data, EXPIRE_100_TIMEOUT);
}
CURLcode Curl_http(struct Curl_easy *data, bool *done);
CURLcode Curl_http_done(struct Curl_easy *data, CURLcode, bool premature);
CURLcode Curl_http_connect(struct Curl_easy *data, bool *done);
-CURLcode Curl_http_do_pollset(struct Curl_easy *data,
- struct easy_pollset *ps);
+CURLcode Curl_http_doing_pollset(struct Curl_easy *data,
+ struct easy_pollset *ps);
+CURLcode Curl_http_perform_pollset(struct Curl_easy *data,
+ struct easy_pollset *ps);
CURLcode Curl_http_write_resp(struct Curl_easy *data,
const char *buf, size_t blen,
bool is_eos);
#define H2_SETTINGS_IV_LEN 3
#define H2_BINSETTINGS_LEN 80
-static size_t populate_settings(nghttp2_settings_entry *iv,
- struct Curl_easy *data)
-{
- iv[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
- iv[0].value = Curl_multi_max_concurrent_streams(data->multi);
-
- iv[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
- iv[1].value = H2_STREAM_WINDOW_SIZE_INITIAL;
-
- iv[2].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
- iv[2].value = data->multi->push_cb != NULL;
-
- return 3;
-}
-
-static ssize_t populate_binsettings(uint8_t *binsettings,
- struct Curl_easy *data)
-{
- nghttp2_settings_entry iv[H2_SETTINGS_IV_LEN];
- size_t ivlen;
-
- ivlen = populate_settings(iv, data);
- /* this returns number of bytes it wrote or a negative number on error. */
- return nghttp2_pack_settings_payload(binsettings, H2_BINSETTINGS_LEN,
- iv, ivlen);
-}
-
struct cf_h2_ctx {
nghttp2_session *h2;
/* The easy handle used in the current filter call, cleared at return */
struct uint_hash streams; /* hash of `data->mid` to `h2_stream_ctx` */
size_t drain_total; /* sum of all stream's UrlState drain */
+ uint32_t initial_win_size; /* current initial window size (settings) */
uint32_t max_concurrent_streams;
uint32_t goaway_error; /* goaway error code from server */
int32_t remote_max_sid; /* max id processed by server */
}
}
+static uint32_t cf_h2_initial_win_size(struct Curl_easy *data)
+{
+#if NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE
+ /* If the transfer has a rate-limit lower than the default initial
+ * stream window size, use that. It needs to be at least 8k or servers
+ * may be unhappy. */
+ if(data->progress.dl.rlimit.rate_per_step &&
+ (data->progress.dl.rlimit.rate_per_step < H2_STREAM_WINDOW_SIZE_INITIAL))
+ return CURLMAX((uint32_t)data->progress.dl.rlimit.rate_per_step, 8192);
+#endif
+ return H2_STREAM_WINDOW_SIZE_INITIAL;
+}
+
+static size_t populate_settings(nghttp2_settings_entry *iv,
+ struct Curl_easy *data,
+ struct cf_h2_ctx *ctx)
+{
+ iv[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
+ iv[0].value = Curl_multi_max_concurrent_streams(data->multi);
+
+ iv[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
+ iv[1].value = cf_h2_initial_win_size(data);
+ if(ctx)
+ ctx->initial_win_size = iv[1].value;
+ iv[2].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
+ iv[2].value = data->multi->push_cb != NULL;
+
+ return 3;
+}
+
+static ssize_t populate_binsettings(uint8_t *binsettings,
+ struct Curl_easy *data)
+{
+ nghttp2_settings_entry iv[H2_SETTINGS_IV_LEN];
+ size_t ivlen;
+
+ ivlen = populate_settings(iv, data, NULL);
+ /* this returns number of bytes it wrote or a negative number on error. */
+ return nghttp2_pack_settings_payload(binsettings, H2_BINSETTINGS_LEN,
+ iv, ivlen);
+}
+
+static CURLcode cf_h2_update_settings(struct cf_h2_ctx *ctx,
+ uint32_t initial_win_size)
+{
+ nghttp2_settings_entry entry;
+ entry.settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
+ entry.value = initial_win_size;
+ if(nghttp2_submit_settings(ctx->h2, NGHTTP2_FLAG_NONE, &entry, 1))
+ return CURLE_SEND_ERROR;
+ ctx->initial_win_size = initial_win_size;
+ return CURLE_OK;
+}
+
static CURLcode nw_out_flush(struct Curl_cfilter *cf,
struct Curl_easy *data);
static int32_t cf_h2_get_desired_local_win(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
+ curl_off_t avail = Curl_rlimit_avail(&data->progress.dl.rlimit,
+ curlx_now());
+
(void)cf;
- if(data->set.max_recv_speed && data->set.max_recv_speed < INT32_MAX) {
- /* The transfer should only receive `max_recv_speed` bytes per second.
- * We restrict the stream's local window size, so that the server cannot
- * send us "too much" at a time.
- * This gets less precise the higher the latency. */
- return (int32_t)data->set.max_recv_speed;
+ if(avail < CURL_OFF_T_MAX) { /* limit in place */
+ if(avail <= 0)
+ return 0;
+ else if(avail < INT32_MAX)
+ return (int32_t)avail;
}
#ifdef DEBUGBUILD
- else {
+ {
struct cf_h2_ctx *ctx = cf->ctx;
CURL_TRC_CF(data, cf, "stream_win_max=%d", ctx->stream_win_max);
return ctx->stream_win_max;
nghttp2_settings_entry iv[H2_SETTINGS_IV_LEN];
size_t ivlen;
- ivlen = populate_settings(iv, data);
+ ivlen = populate_settings(iv, data, ctx);
rc = nghttp2_submit_settings(ctx->h2, NGHTTP2_FLAG_NONE,
iv, ivlen);
if(rc) {
(void)len;
*pnread = 0;
+ if(!stream->xfer_result)
+ stream->xfer_result = cf_h2_update_local_win(cf, data, stream);
+
if(stream->xfer_result) {
CURL_TRC_CF(data, cf, "[%d] xfer write failed", stream->id);
result = stream->xfer_result;
nghttp2_priority_spec pri_spec;
size_t nwritten;
CURLcode result = CURLE_OK;
+ uint32_t initial_win_size;
*pnwritten = 0;
Curl_dynhds_init(&h2_headers, 0, DYN_HTTP_REQUEST);
if(!nghttp2_session_check_request_allowed(ctx->h2))
CURL_TRC_CF(data, cf, "send request NOT allowed (via nghttp2)");
+ /* Check the initial windows size of the transfer (rate-limits?) and
+ * send an updated settings on changes from previous value. */
+ initial_win_size = cf_h2_initial_win_size(data);
+ if(initial_win_size != ctx->initial_win_size) {
+ result = cf_h2_update_settings(ctx, initial_win_size);
+ if(result)
+ goto out;
+ }
+
switch(data->state.httpreq) {
case HTTPREQ_POST:
case HTTPREQ_POST_FORM:
data->req.size = -1;
/* Set the progress data */
- Curl_pgrsSetUploadCounter(data, 0);
- Curl_pgrsSetDownloadCounter(data, 0);
- Curl_pgrsSetUploadSize(data, -1);
- Curl_pgrsSetDownloadSize(data, -1);
+ Curl_pgrsReset(data);
/* Carry out the perform */
result = imap_perform(data, &connected, dophase_done);
goto quit;
}
- Curl_pgrsSetDownloadCounter(data, 0);
+ Curl_pgrsReset(data);
rc = ldap_search_s(server, ludp->lud_dn,
(curl_ldap_num_t)ludp->lud_scope,
ludp->lud_filter, ludp->lud_attrs, 0, &ldapmsg);
#include "select.h"
#include "curlx/warnless.h"
#include "curlx/wait.h"
-#include "speedcheck.h"
#include "conncache.h"
#include "multihandle.h"
#include "sigpipe.h"
conn->handler->attach(data, conn);
}
+/* adjust pollset for rate limits/pauses */
+static CURLcode multi_adjust_pollset(struct Curl_easy *data,
+ struct easy_pollset *ps)
+{
+ CURLcode result = CURLE_OK;
+
+ if(ps->n) {
+ struct curltime now = curlx_now();
+ bool send_blocked, recv_blocked;
+
+ recv_blocked = (Curl_rlimit_avail(&data->progress.dl.rlimit, now) <= 0);
+ send_blocked = (Curl_rlimit_avail(&data->progress.ul.rlimit, now) <= 0);
+ if(send_blocked || recv_blocked) {
+ int i;
+ for(i = 0; i <= SECONDARYSOCKET; ++i) {
+ curl_socket_t sock = data->conn->sock[i];
+ if(sock == CURL_SOCKET_BAD)
+ continue;
+ if(recv_blocked && Curl_pollset_want_recv(data, ps, sock)) {
+ result = Curl_pollset_remove_in(data, ps, sock);
+ if(result)
+ break;
+ }
+ if(send_blocked && Curl_pollset_want_send(data, ps, sock)) {
+ result = Curl_pollset_remove_out(data, ps, sock);
+ if(result)
+ break;
+ }
+ }
+ }
+
+ /* Not blocked and wanting to receive. If there is data pending
+ * in the connection filters, make transfer run again. */
+ if(!recv_blocked &&
+ ((Curl_pollset_want_recv(data, ps, data->conn->sock[FIRSTSOCKET]) &&
+ Curl_conn_data_pending(data, FIRSTSOCKET)) ||
+ (Curl_pollset_want_recv(data, ps, data->conn->sock[SECONDARYSOCKET]) &&
+ Curl_conn_data_pending(data, SECONDARYSOCKET)))) {
+ CURL_TRC_M(data, "pollset[] has POLLIN, but there is still "
+ "buffered input -> mark as dirty");
+ Curl_multi_mark_dirty(data);
+ }
+ }
+ return result;
+}
+
static CURLcode mstate_connecting_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
- if(data->conn) {
- curl_socket_t sockfd = Curl_conn_get_first_socket(data);
- if(sockfd != CURL_SOCKET_BAD) {
- /* Default is to wait to something from the server */
- return Curl_pollset_change(data, ps, sockfd, CURL_POLL_IN, 0);
- }
+ struct connectdata *conn = data->conn;
+ curl_socket_t sockfd;
+ CURLcode result = CURLE_OK;
+
+ if(Curl_xfer_recv_is_paused(data))
+ return CURLE_OK;
+ /* If a socket is set, receiving is default. If the socket
+ * has not been determined yet (eyeballing), always ask the
+ * connection filters for what to monitor. */
+ sockfd = Curl_conn_get_first_socket(data);
+ if(sockfd != CURL_SOCKET_BAD) {
+ result = Curl_pollset_change(data, ps, sockfd, CURL_POLL_IN, 0);
+ if(!result)
+ result = multi_adjust_pollset(data, ps);
}
- return CURLE_OK;
+ if(!result)
+ result = Curl_conn_adjust_pollset(data, conn, ps);
+ return result;
}
static CURLcode mstate_protocol_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
struct connectdata *conn = data->conn;
- if(conn) {
- curl_socket_t sockfd;
- if(conn->handler->proto_pollset)
- return conn->handler->proto_pollset(data, ps);
- sockfd = conn->sock[FIRSTSOCKET];
+ CURLcode result = CURLE_OK;
+
+ if(conn->handler->proto_pollset)
+ result = conn->handler->proto_pollset(data, ps);
+ else {
+ curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
if(sockfd != CURL_SOCKET_BAD) {
/* Default is to wait to something from the server */
- return Curl_pollset_change(data, ps, sockfd, CURL_POLL_IN, 0);
+ result = Curl_pollset_change(data, ps, sockfd, CURL_POLL_IN, 0);
}
}
- return CURLE_OK;
+ if(!result)
+ result = multi_adjust_pollset(data, ps);
+ if(!result)
+ result = Curl_conn_adjust_pollset(data, conn, ps);
+ return result;
}
static CURLcode mstate_do_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
struct connectdata *conn = data->conn;
- if(conn) {
- if(conn->handler->doing_pollset)
- return conn->handler->doing_pollset(data, ps);
- else if(CONN_SOCK_IDX_VALID(conn->send_idx)) {
- /* Default is that we want to send something to the server */
- return Curl_pollset_add_out(
- data, ps, conn->sock[conn->send_idx]);
- }
+ CURLcode result = CURLE_OK;
+
+ if(conn->handler->doing_pollset)
+ result = conn->handler->doing_pollset(data, ps);
+ else if(CONN_SOCK_IDX_VALID(conn->send_idx)) {
+ /* Default is that we want to send something to the server */
+ result = Curl_pollset_add_out(data, ps, conn->sock[conn->send_idx]);
}
- return CURLE_OK;
+ if(!result)
+ result = multi_adjust_pollset(data, ps);
+ if(!result)
+ result = Curl_conn_adjust_pollset(data, conn, ps);
+ return result;
}
static CURLcode mstate_domore_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
struct connectdata *conn = data->conn;
- if(conn) {
- if(conn->handler->domore_pollset)
- return conn->handler->domore_pollset(data, ps);
- else if(CONN_SOCK_IDX_VALID(conn->send_idx)) {
- /* Default is that we want to send something to the server */
- return Curl_pollset_add_out(
- data, ps, conn->sock[conn->send_idx]);
- }
+ CURLcode result = CURLE_OK;
+
+ if(conn->handler->domore_pollset)
+ result = conn->handler->domore_pollset(data, ps);
+ else if(CONN_SOCK_IDX_VALID(conn->send_idx)) {
+ /* Default is that we want to send something to the server */
+ result = Curl_pollset_add_out(data, ps, conn->sock[conn->send_idx]);
}
- return CURLE_OK;
+ if(!result)
+ result = multi_adjust_pollset(data, ps);
+ if(!result)
+ result = Curl_conn_adjust_pollset(data, conn, ps);
+ return result;
}
static CURLcode mstate_perform_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
struct connectdata *conn = data->conn;
- if(!conn)
- return CURLE_OK;
- else if(conn->handler->perform_pollset)
- return conn->handler->perform_pollset(data, ps);
+ CURLcode result = CURLE_OK;
+
+ if(conn->handler->perform_pollset)
+ result = conn->handler->perform_pollset(data, ps);
else {
/* Default is to obey the data->req.keepon flags for send/recv */
- CURLcode result = CURLE_OK;
if(CURL_WANT_RECV(data) && CONN_SOCK_IDX_VALID(conn->recv_idx)) {
result = Curl_pollset_add_in(
data, ps, conn->sock[conn->recv_idx]);
result = Curl_pollset_add_out(
data, ps, conn->sock[conn->send_idx]);
}
- return result;
}
+ if(!result)
+ result = multi_adjust_pollset(data, ps);
+ if(!result)
+ result = Curl_conn_adjust_pollset(data, conn, ps);
+ return result;
}
/* Initializes `poll_set` with the current socket poll actions needed
* for transfer `data`. */
CURLMcode Curl_multi_pollset(struct Curl_easy *data,
- struct easy_pollset *ps,
- const char *caller)
+ struct easy_pollset *ps)
{
CURLMcode mresult = CURLM_OK;
CURLcode result = CURLE_OK;
- bool expect_sockets = TRUE;
/* If the transfer has no connection, this is fine. Happens when
called via curl_multi_remove_handle() => Curl_multi_ev_assess() =>
case MSTATE_SETUP:
case MSTATE_CONNECT:
/* nothing to poll for yet */
- expect_sockets = FALSE;
break;
case MSTATE_RESOLVING:
result = Curl_resolv_pollset(data, ps);
- /* connection filters are not involved in this phase. It is OK if we get no
- * sockets to wait for. Resolving can wake up from other sources. */
- expect_sockets = FALSE;
break;
case MSTATE_CONNECTING:
case MSTATE_TUNNELING:
- if(!Curl_xfer_recv_is_paused(data)) {
- result = mstate_connecting_pollset(data, ps);
- if(!result)
- result = Curl_conn_adjust_pollset(data, data->conn, ps);
- }
- else
- expect_sockets = FALSE;
+ result = mstate_connecting_pollset(data, ps);
break;
case MSTATE_PROTOCONNECT:
case MSTATE_PROTOCONNECTING:
result = mstate_protocol_pollset(data, ps);
- if(!result)
- result = Curl_conn_adjust_pollset(data, data->conn, ps);
break;
case MSTATE_DO:
case MSTATE_DOING:
result = mstate_do_pollset(data, ps);
- if(!result)
- result = Curl_conn_adjust_pollset(data, data->conn, ps);
break;
case MSTATE_DOING_MORE:
result = mstate_domore_pollset(data, ps);
- if(!result)
- result = Curl_conn_adjust_pollset(data, data->conn, ps);
break;
case MSTATE_DID: /* same as PERFORMING in regard to polling */
case MSTATE_PERFORMING:
result = mstate_perform_pollset(data, ps);
- if(!result)
- result = Curl_conn_adjust_pollset(data, data->conn, ps);
break;
case MSTATE_RATELIMITING:
/* we need to let time pass, ignore socket(s) */
- expect_sockets = FALSE;
break;
case MSTATE_DONE:
case MSTATE_COMPLETED:
case MSTATE_MSGSENT:
/* nothing more to poll for */
- expect_sockets = FALSE;
break;
default:
failf(data, "multi_getsock: unexpected multi state %d", data->mstate);
DEBUGASSERT(0);
- expect_sockets = FALSE;
break;
}
goto out;
}
- /* Unblocked and waiting to receive with buffered input.
- * Make transfer run again at next opportunity. */
- if(!Curl_xfer_is_blocked(data) && !Curl_xfer_is_too_fast(data) &&
- ((Curl_pollset_want_read(data, ps, data->conn->sock[FIRSTSOCKET]) &&
- Curl_conn_data_pending(data, FIRSTSOCKET)) ||
- (Curl_pollset_want_read(data, ps, data->conn->sock[SECONDARYSOCKET]) &&
- Curl_conn_data_pending(data, SECONDARYSOCKET)))) {
- CURL_TRC_M(data, "%s pollset[] has POLLIN, but there is still "
- "buffered input to consume -> mark as dirty", caller);
- Curl_multi_mark_dirty(data);
- }
-
#ifndef CURL_DISABLE_VERBOSE_STRINGS
if(CURL_TRC_M_is_verbose(data)) {
size_t timeout_count = Curl_llist_count(&data->state.timeoutlist);
switch(ps->n) {
case 0:
- CURL_TRC_M(data, "%s pollset[], timeouts=%zu, paused %d/%d (r/w)",
- caller, timeout_count,
+ CURL_TRC_M(data, "pollset[], timeouts=%zu, paused %d/%d (r/w)",
+ timeout_count,
Curl_xfer_send_is_paused(data),
Curl_xfer_recv_is_paused(data));
break;
case 1:
- CURL_TRC_M(data, "%s pollset[fd=%" FMT_SOCKET_T " %s%s], timeouts=%zu",
- caller, ps->sockets[0],
+ CURL_TRC_M(data, "pollset[fd=%" FMT_SOCKET_T " %s%s], timeouts=%zu",
+ ps->sockets[0],
(ps->actions[0] & CURL_POLL_IN) ? "IN" : "",
(ps->actions[0] & CURL_POLL_OUT) ? "OUT" : "",
timeout_count);
break;
case 2:
- CURL_TRC_M(data, "%s pollset[fd=%" FMT_SOCKET_T " %s%s, "
+ CURL_TRC_M(data, "pollset[fd=%" FMT_SOCKET_T " %s%s, "
"fd=%" FMT_SOCKET_T " %s%s], timeouts=%zu",
- caller, ps->sockets[0],
+ ps->sockets[0],
(ps->actions[0] & CURL_POLL_IN) ? "IN" : "",
(ps->actions[0] & CURL_POLL_OUT) ? "OUT" : "",
ps->sockets[1],
timeout_count);
break;
default:
- CURL_TRC_M(data, "%s pollset[fds=%u], timeouts=%zu",
- caller, ps->n, timeout_count);
+ CURL_TRC_M(data, "pollset[fds=%u], timeouts=%zu",
+ ps->n, timeout_count);
break;
}
CURL_TRC_EASY_TIMERS(data);
}
#endif
- if(expect_sockets && !ps->n && data->multi &&
- !Curl_uint_bset_contains(&data->multi->dirty, data->mid) &&
- !Curl_llist_count(&data->state.timeoutlist) &&
- !Curl_cwriter_is_paused(data) && !Curl_creader_is_paused(data) &&
- Curl_conn_is_ip_connected(data, FIRSTSOCKET)) {
- /* We expected sockets for POLL monitoring, but none are set.
- * We are not dirty (and run anyway).
- * We are not waiting on any timer.
- * None of the READ/WRITE directions are paused.
- * We are connected to the server on IP level, at least. */
- infof(data, "WARNING: no socket in pollset or timer, transfer may stall!");
- DEBUGASSERT(0);
- }
out:
return mresult;
}
continue;
}
- Curl_multi_pollset(data, &ps, "curl_multi_fdset");
+ Curl_multi_pollset(data, &ps);
for(i = 0; i < ps.n; i++) {
if(!FDSET_SOCK(ps.sockets[i]))
/* pretend it does not exist */
Curl_uint_bset_remove(&multi->dirty, mid);
continue;
}
- Curl_multi_pollset(data, &ps, "curl_multi_waitfds");
+ Curl_multi_pollset(data, &ps);
need += Curl_waitfds_add_ps(&cwfds, &ps);
}
while(Curl_uint_bset_next(&multi->process, mid, &mid));
Curl_uint_bset_remove(&multi->dirty, mid);
continue;
}
- Curl_multi_pollset(data, &ps, "multi_wait");
+ Curl_multi_pollset(data, &ps);
if(Curl_pollfds_add_ps(&cpfds, &ps)) {
result = CURLM_OUT_OF_MEMORY;
goto out;
}
static CURLcode mspeed_check(struct Curl_easy *data,
- struct curltime *nowp)
+ struct curltime now)
{
timediff_t recv_wait_ms = 0;
timediff_t send_wait_ms = 0;
- /* check if over send speed */
- if(data->set.max_send_speed)
- send_wait_ms = Curl_pgrsLimitWaitTime(&data->progress.ul,
- data->set.max_send_speed,
- *nowp);
-
- /* check if over recv speed */
- if(data->set.max_recv_speed)
- recv_wait_ms = Curl_pgrsLimitWaitTime(&data->progress.dl,
- data->set.max_recv_speed,
- *nowp);
+ /* check if our send/recv limits require idle waits */
+ send_wait_ms = Curl_rlimit_wait_ms(&data->progress.ul.rlimit, now);
+ recv_wait_ms = Curl_rlimit_wait_ms(&data->progress.dl.rlimit, now);
if(send_wait_ms || recv_wait_ms) {
if(data->mstate != MSTATE_RATELIMITING) {
- Curl_ratelimit(data, *nowp);
multistate(data, MSTATE_RATELIMITING);
}
Curl_expire(data, CURLMAX(send_wait_ms, recv_wait_ms), EXPIRE_TOOFAST);
Curl_multi_clear_dirty(data);
+ CURL_TRC_M(data, "[RLIMIT] waiting %" FMT_TIMEDIFF_T "ms",
+ CURLMAX(send_wait_ms, recv_wait_ms));
return CURLE_AGAIN;
}
else if(data->mstate != MSTATE_PERFORMING) {
+ CURL_TRC_M(data, "[RLIMIT] wait over, continue");
multistate(data, MSTATE_PERFORMING);
- Curl_ratelimit(data, *nowp);
}
return CURLE_OK;
}
CURLcode result = *resultp = CURLE_OK;
*stream_errorp = FALSE;
- if(mspeed_check(data, nowp) == CURLE_AGAIN)
+ if(mspeed_check(data, *nowp) == CURLE_AGAIN)
return CURLM_OK;
/* read/write data if it is ready to do so */
}
}
else { /* not errored, not done */
- mspeed_check(data, nowp);
+ *nowp = curlx_now();
+ mspeed_check(data, *nowp);
}
free(newurl);
*resultp = result;
CURLMcode rc = CURLM_OK;
DEBUGASSERT(data->conn);
/* if both rates are within spec, resume transfer */
- if(Curl_pgrsUpdate(data))
- result = CURLE_ABORTED_BY_CALLBACK;
- else
- result = Curl_speedcheck(data, *nowp);
+ result = Curl_pgrsCheck(data);
if(result) {
if(!(data->conn->handler->flags & PROTOPT_DUAL) &&
multi_done(data, result, TRUE);
}
else {
- if(!mspeed_check(data, nowp))
+ if(!mspeed_check(data, *nowp))
rc = CURLM_CALL_MULTI_PERFORM;
}
*resultp = result;
(HTTP/2), or the full connection for older protocols */
bool stream_error = FALSE;
rc = CURLM_OK;
+ /* update at start for continuous increase when looping */
+ *nowp = curlx_now();
if(multi_ischanged(multi, TRUE)) {
CURL_TRC_M(data, "multi changed, check CONNECT_PEND queue");
rc = CURLM_CALL_MULTI_PERFORM;
}
/* if there is still a connection to use, call the progress function */
- else if(data->conn && Curl_pgrsUpdate(data)) {
- /* aborted due to progress callback return code must close the
- connection */
- result = CURLE_ABORTED_BY_CALLBACK;
- streamclose(data->conn, "Aborted by callback");
-
- /* if not yet in DONE state, go there, otherwise COMPLETED */
- multistate(data, (data->mstate < MSTATE_DONE) ?
- MSTATE_DONE : MSTATE_COMPLETED);
- rc = CURLM_CALL_MULTI_PERFORM;
+ else if(data->conn) {
+ result = Curl_pgrsUpdate(data);
+ if(result) {
+ /* aborted due to progress callback return code must close the
+ connection */
+ streamclose(data->conn, "Aborted by callback");
+
+ /* if not yet in DONE state, go there, otherwise COMPLETED */
+ multistate(data, (data->mstate < MSTATE_DONE) ?
+ MSTATE_DONE : MSTATE_COMPLETED);
+ rc = CURLM_CALL_MULTI_PERFORM;
+ }
}
}
}
}
else
- Curl_multi_pollset(data, &ps, "ev assess");
+ Curl_multi_pollset(data, &ps);
last_ps = mev_get_last_pollset(data, conn);
if(!last_ps && ps.n) {
unsigned int Curl_multi_max_concurrent_streams(struct Curl_multi *multi);
CURLMcode Curl_multi_pollset(struct Curl_easy *data,
- struct easy_pollset *ps,
- const char *caller);
+ struct easy_pollset *ps);
/**
* Borrow the transfer buffer from the multi, suitable
#include "sendf.h"
#include "select.h"
#include "progress.h"
-#include "speedcheck.h"
#include "pingpong.h"
#include "multiif.h"
#include "vtls/vtls.h"
if(block) {
/* if we did not wait, we do not have to spend time on this now */
- if(Curl_pgrsUpdate(data))
- result = CURLE_ABORTED_BY_CALLBACK;
- else
- result = Curl_speedcheck(data, curlx_now());
-
+ result = Curl_pgrsCheck(data);
if(result)
return result;
}
data->req.size = -1;
/* Set the progress data */
- Curl_pgrsSetUploadCounter(data, 0);
- Curl_pgrsSetDownloadCounter(data, 0);
- Curl_pgrsSetUploadSize(data, -1);
- Curl_pgrsSetDownloadSize(data, -1);
+ Curl_pgrsReset(data);
/* Carry out the perform */
result = pop3_perform(data, &connected, dophase_done);
#include "sendf.h"
#include "multiif.h"
#include "progress.h"
+#include "transfer.h"
#include "curlx/timeval.h"
/* check rate limits within this many recent milliseconds, at minimum. */
}
#endif
+static void pgrs_speedinit(struct Curl_easy *data)
+{
+ memset(&data->state.keeps_speed, 0, sizeof(struct curltime));
+}
+
+/*
+ * @unittest: 1606
+ */
+UNITTEST CURLcode pgrs_speedcheck(struct Curl_easy *data,
+ struct curltime *pnow)
+{
+ if(!data->set.low_speed_time || !data->set.low_speed_limit ||
+ Curl_xfer_recv_is_paused(data) || Curl_xfer_send_is_paused(data))
+ /* A paused transfer is not qualified for speed checks */
+ return CURLE_OK;
+
+ if((data->progress.current_speed >= 0) && data->set.low_speed_time) {
+ if(data->progress.current_speed < data->set.low_speed_limit) {
+ if(!data->state.keeps_speed.tv_sec)
+ /* under the limit at this moment */
+ data->state.keeps_speed = *pnow;
+ else {
+ /* how long has it been under the limit */
+ timediff_t howlong = curlx_timediff_ms(*pnow, data->state.keeps_speed);
+
+ if(howlong >= data->set.low_speed_time * 1000) {
+ /* too long */
+ failf(data,
+ "Operation too slow. "
+ "Less than %ld bytes/sec transferred the last %ld seconds",
+ data->set.low_speed_limit,
+ data->set.low_speed_time);
+ return CURLE_OPERATION_TIMEDOUT;
+ }
+ }
+ }
+ else
+ /* faster right now */
+ data->state.keeps_speed.tv_sec = 0;
+ }
+
+ if(data->set.low_speed_limit)
+ /* if low speed limit is enabled, set the expire timer to make this
+ connection's speed get checked again in a second */
+ Curl_expire(data, 1000, EXPIRE_SPEEDCHECK);
+
+ return CURLE_OK;
+}
+
/*
New proposed interface, 9th of February 2000:
* hidden */
curl_mfprintf(data->set.err, "\n");
- data->progress.speeder_c = 0; /* reset the progress meter display */
return 0;
}
+void Curl_pgrsReset(struct Curl_easy *data)
+{
+ Curl_pgrsSetUploadCounter(data, 0);
+ Curl_pgrsSetDownloadCounter(data, 0);
+ Curl_pgrsSetUploadSize(data, -1);
+ Curl_pgrsSetDownloadSize(data, -1);
+ data->progress.speeder_c = 0; /* reset speed records */
+ pgrs_speedinit(data);
+}
+
/* reset the known transfer sizes */
void Curl_pgrsResetTransferSizes(struct Curl_easy *data)
{
Curl_pgrsSetUploadSize(data, -1);
}
+void Curl_pgrsRecvPause(struct Curl_easy *data, bool enable)
+{
+ if(!enable) {
+ data->progress.speeder_c = 0; /* reset speed records */
+ pgrs_speedinit(data); /* reset low speed measurements */
+ }
+}
+
/*
*
* Curl_pgrsTimeWas(). Store the timestamp time at the given label.
p->speeder_c = 0; /* reset the progress meter display */
p->start = curlx_now();
p->is_t_startransfer_set = FALSE;
- p->ul.limit.start = p->start;
- p->dl.limit.start = p->start;
- p->ul.limit.start_size = 0;
- p->dl.limit.start_size = 0;
p->dl.cur_size = 0;
p->ul.cur_size = 0;
/* the sizes are unknown at start */
p->dl_size_known = FALSE;
p->ul_size_known = FALSE;
- Curl_ratelimit(data, p->start);
-}
-
-/*
- * This is used to handle speed limits, calculating how many milliseconds to
- * wait until we are back under the speed limit, if needed.
- *
- * The way it works is by having a "starting point" (time & amount of data
- * transferred by then) used in the speed computation, to be used instead of
- * the start of the transfer. This starting point is regularly moved as
- * transfer goes on, to keep getting accurate values (instead of average over
- * the entire transfer).
- *
- * This function takes the current amount of data transferred, the amount at
- * the starting point, the limit (in bytes/s), the time of the starting point
- * and the current time.
- *
- * Returns 0 if no waiting is needed or when no waiting is needed but the
- * starting point should be reset (to current); or the number of milliseconds
- * to wait to get back under the speed limit.
- */
-timediff_t Curl_pgrsLimitWaitTime(struct pgrs_dir *d,
- curl_off_t bytes_per_sec,
- struct curltime now)
-{
- curl_off_t bytes = d->cur_size - d->limit.start_size;
- timediff_t should_ms;
- timediff_t took_ms;
-
- /* no limit or we did not get to any bytes yet */
- if(!bytes_per_sec || !bytes)
- return 0;
-
- /* The time it took us to have `bytes` */
- took_ms = curlx_timediff_ceil_ms(now, d->limit.start);
-
- /* The time it *should* have taken us to have `bytes`
- * when obeying the bytes_per_sec speed_limit. */
- if(bytes < CURL_OFF_T_MAX/1000) {
- /* (1000 * bytes / (bytes / sec)) = 1000 * sec = ms */
- should_ms = (timediff_t) (1000 * bytes / bytes_per_sec);
- }
- else {
- /* large `bytes`, first calc the seconds it should have taken.
- * if that is small enough, convert to milliseconds. */
- should_ms = (timediff_t) (bytes / bytes_per_sec);
- if(should_ms < TIMEDIFF_T_MAX/1000)
- should_ms *= 1000;
- else
- should_ms = TIMEDIFF_T_MAX;
- }
-
- if(took_ms < should_ms) {
- /* when gotten to `bytes` too fast, wait the difference */
- return should_ms - took_ms;
- }
- return 0;
}
/*
data->progress.dl.cur_size = size;
}
-/*
- * Update the timestamp and sizestamp to use for rate limit calculations.
- */
-void Curl_ratelimit(struct Curl_easy *data, struct curltime now)
-{
- /* do not set a new stamp unless the time since last update is long enough */
- if(data->set.max_recv_speed) {
- if(curlx_timediff_ms(now, data->progress.dl.limit.start) >=
- MIN_RATE_LIMIT_PERIOD) {
- data->progress.dl.limit.start = now;
- data->progress.dl.limit.start_size = data->progress.dl.cur_size;
- }
- }
- if(data->set.max_send_speed) {
- if(curlx_timediff_ms(now, data->progress.ul.limit.start) >=
- MIN_RATE_LIMIT_PERIOD) {
- data->progress.ul.limit.start = now;
- data->progress.ul.limit.start_size = data->progress.ul.cur_size;
- }
- }
-}
-
/*
* Set the number of uploaded bytes so far.
*/
}
/* returns TRUE if it is time to show the progress meter */
-static bool progress_calc(struct Curl_easy *data, struct curltime now)
+static bool progress_calc(struct Curl_easy *data, struct curltime *pnow)
{
- bool timetoshow = FALSE;
struct Progress * const p = &data->progress;
+ int i_next, i_oldest, i_latest;
+ timediff_t duration_ms;
+ curl_off_t amount;
/* The time spent so far (from the start) in microseconds */
- p->timespent = curlx_timediff_us(now, p->start);
+ p->timespent = curlx_timediff_us(*pnow, p->start);
p->dl.speed = trspeed(p->dl.cur_size, p->timespent);
p->ul.speed = trspeed(p->ul.cur_size, p->timespent);
- /* Calculations done at most once a second, unless end is reached */
- if(p->lastshow != now.tv_sec) {
- int countindex; /* amount of seconds stored in the speeder array */
- int nowindex = p->speeder_c% CURR_TIME;
- p->lastshow = now.tv_sec;
- timetoshow = TRUE;
-
- /* Let's do the "current speed" thing, with the dl + ul speeds
- combined. Store the speed at entry 'nowindex'. */
- p->speeder[ nowindex ] = p->dl.cur_size + p->ul.cur_size;
-
- /* remember the exact time for this moment */
- p->speeder_time [ nowindex ] = now;
-
- /* advance our speeder_c counter, which is increased every time we get
- here and we expect it to never wrap as 2^32 is a lot of seconds! */
+ if(!p->speeder_c) { /* no previous record exists */
+ p->speed_amount[0] = p->dl.cur_size + p->ul.cur_size;
+ p->speed_time[0] = *pnow;
p->speeder_c++;
-
- /* figure out how many index entries of data we have stored in our speeder
- array. With N_ENTRIES filled in, we have about N_ENTRIES-1 seconds of
- transfer. Imagine, after one second we have filled in two entries,
- after two seconds we have filled in three entries etc. */
- countindex = ((p->speeder_c >= CURR_TIME) ? CURR_TIME : p->speeder_c) - 1;
-
- /* first of all, we do not do this if there is no counted seconds yet */
- if(countindex) {
- int checkindex;
- timediff_t span_ms;
- curl_off_t amount;
-
- /* Get the index position to compare with the 'nowindex' position.
- Get the oldest entry possible. While we have less than CURR_TIME
- entries, the first entry will remain the oldest. */
- checkindex = (p->speeder_c >= CURR_TIME) ? p->speeder_c%CURR_TIME : 0;
-
- /* Figure out the exact time for the time span */
- span_ms = curlx_timediff_ms(now, p->speeder_time[checkindex]);
- if(span_ms == 0)
- span_ms = 1; /* at least one millisecond MUST have passed */
-
- /* Calculate the average speed the last 'span_ms' milliseconds */
- amount = p->speeder[nowindex]- p->speeder[checkindex];
-
- if(amount > (0xffffffff/1000))
- /* the 'amount' value is bigger than would fit in 32 bits if
- multiplied with 1000, so we use the double math for this */
- p->current_speed = (curl_off_t)
- ((double)amount/((double)span_ms/1000.0));
- else
- /* the 'amount' value is small enough to fit within 32 bits even
- when multiplied with 1000 */
- p->current_speed = amount * 1000/span_ms;
+ /* use the overall average at the start */
+ p->current_speed = p->ul.speed + p->dl.speed;
+ p->lastshow = pnow->tv_sec;
+ return TRUE;
+ }
+ /* We have at least one record now. Where to put the next and
+ * where is the latest one? */
+ i_next = p->speeder_c % CURL_SPEED_RECORDS;
+ i_latest = (i_next > 0) ? (i_next - 1) : (CURL_SPEED_RECORDS - 1);
+
+ /* Make a new record only when some time has passed.
+ * Too frequent calls otherwise ruin the history. */
+ if(curlx_timediff_ms(*pnow, p->speed_time[i_latest]) >= 1000) {
+ p->speeder_c++;
+ i_latest = i_next;
+ p->speed_amount[i_latest] = p->dl.cur_size + p->ul.cur_size;
+ p->speed_time[i_latest] = *pnow;
+ }
+ else if(data->req.done) {
+ /* When a transfer is done, and we did not have a current speed
+ * already, update the last record. Otherwise, stay at the speed
+ * we have. The last chunk of data, when rate limiting, would increase
+ * reported speed since it no longer measures a full second. */
+ if(!p->current_speed) {
+ p->speed_amount[i_latest] = p->dl.cur_size + p->ul.cur_size;
+ p->speed_time[i_latest] = *pnow;
}
- else
- /* the first second we use the average */
- p->current_speed = p->ul.speed + p->dl.speed;
+ }
+ else {
+ /* transfer ongoing, wait for more time to pass. */
+ return FALSE;
+ }
- } /* Calculations end */
- return timetoshow;
+ i_oldest = (p->speeder_c < CURL_SPEED_RECORDS) ? 0 :
+ ((i_latest + 1) % CURL_SPEED_RECORDS);
+
+ /* How much we transferred between oldest and current records */
+ amount = p->speed_amount[i_latest]- p->speed_amount[i_oldest];
+ /* How long this took */
+ duration_ms = curlx_timediff_ms(p->speed_time[i_latest],
+ p->speed_time[i_oldest]);
+ if(duration_ms <= 0)
+ duration_ms = 1;
+
+ if(amount > (CURL_OFF_T_MAX/1000)) {
+ /* the 'amount' value is bigger than would fit in 64 bits if
+ multiplied with 1000, so we use the double math for this */
+ p->current_speed = (curl_off_t)
+ (((double)amount * 1000.0)/(double)duration_ms);
+ }
+ else {
+ /* the 'amount' value is small enough to fit within 32 bits even
+ when multiplied with 1000 */
+ p->current_speed = amount * 1000 / duration_ms;
+ }
+
+ if((p->lastshow == pnow->tv_sec) && !data->req.done)
+ return FALSE;
+ p->lastshow = pnow->tv_sec;
+ return TRUE;
}
#ifndef CURL_DISABLE_PROGRESS_METER
* Curl_pgrsUpdate() returns 0 for success or the value returned by the
* progress callback!
*/
-static int pgrsupdate(struct Curl_easy *data, bool showprogress)
+static CURLcode pgrsupdate(struct Curl_easy *data, bool showprogress)
{
if(!data->progress.hide) {
if(data->set.fxferinfo) {
data->progress.ul.cur_size);
Curl_set_in_callback(data, FALSE);
if(result != CURL_PROGRESSFUNC_CONTINUE) {
- if(result)
+ if(result) {
failf(data, "Callback aborted");
- return result;
+ return CURLE_ABORTED_BY_CALLBACK;
+ }
+ return CURLE_OK;
}
}
else if(data->set.fprogress) {
(double)data->progress.ul.cur_size);
Curl_set_in_callback(data, FALSE);
if(result != CURL_PROGRESSFUNC_CONTINUE) {
- if(result)
+ if(result) {
failf(data, "Callback aborted");
- return result;
+ return CURLE_ABORTED_BY_CALLBACK;
+ }
+ return CURLE_OK;
}
}
progress_meter(data);
}
- return 0;
+ return CURLE_OK;
}
-int Curl_pgrsUpdate(struct Curl_easy *data)
+static CURLcode pgrs_update(struct Curl_easy *data, struct curltime *pnow)
{
- struct curltime now = curlx_now(); /* what time is it */
- bool showprogress = progress_calc(data, now);
+ bool showprogress = progress_calc(data, pnow);
return pgrsupdate(data, showprogress);
}
+CURLcode Curl_pgrsUpdate(struct Curl_easy *data)
+{
+ struct curltime now = curlx_now(); /* what time is it */
+ return pgrs_update(data, &now);
+}
+
+CURLcode Curl_pgrsCheck(struct Curl_easy *data)
+{
+ struct curltime now = curlx_now();
+ CURLcode result;
+
+ result = pgrs_update(data, &now);
+ if(!result && !data->req.done)
+ result = pgrs_speedcheck(data, &now);
+ return result;
+}
+
/*
* Update all progress, do not do progress meter/callbacks.
*/
void Curl_pgrsUpdate_nometer(struct Curl_easy *data)
{
struct curltime now = curlx_now(); /* what time is it */
- (void)progress_calc(data, now);
+ (void)progress_calc(data, &now);
}
#include "curlx/timeval.h"
+struct Curl_easy;
typedef enum {
TIMER_NONE,
void Curl_pgrsSetDownloadCounter(struct Curl_easy *data, curl_off_t size);
void Curl_pgrsSetUploadCounter(struct Curl_easy *data, curl_off_t size);
-void Curl_ratelimit(struct Curl_easy *data, struct curltime now);
-int Curl_pgrsUpdate(struct Curl_easy *data);
+
+/* perform progress update, invoking callbacks at intervals */
+CURLcode Curl_pgrsUpdate(struct Curl_easy *data);
+/* perform progress update, no callbacks invoked */
void Curl_pgrsUpdate_nometer(struct Curl_easy *data);
+/* perform progress update with callbacks and speed checks */
+CURLcode Curl_pgrsCheck(struct Curl_easy *data);
+
+/* Inform progress/speedcheck about receive pausing */
+void Curl_pgrsRecvPause(struct Curl_easy *data, bool enable);
+/* Reset sizes and couners for up- and download. */
+void Curl_pgrsReset(struct Curl_easy *data);
+/* Reset sizes for up- and download. */
void Curl_pgrsResetTransferSizes(struct Curl_easy *data);
+
struct curltime Curl_pgrsTime(struct Curl_easy *data, timerid timer);
-timediff_t Curl_pgrsLimitWaitTime(struct pgrs_dir *d,
- curl_off_t speed_limit,
- struct curltime now);
/**
* Update progress timer with the elapsed time from its start to `timestamp`.
* This allows updating timers later and is used by happy eyeballing, where
void Curl_pgrsEarlyData(struct Curl_easy *data, curl_off_t sent);
+#ifdef UNITTESTS
+UNITTEST CURLcode pgrs_speedcheck(struct Curl_easy *data,
+ struct curltime *pnow);
+#endif
+
#endif /* HEADER_CURL_PROGRESS_H */
--- /dev/null
+/***************************************************************************
+ * _ _ ____ _
+ * Project ___| | | | _ \| |
+ * / __| | | | |_) | |
+ * | (__| |_| | _ <| |___
+ * \___|\___/|_| \_\_____|
+ *
+ * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
+ *
+ * This software is licensed as described in the file COPYING, which
+ * you should have received as part of this distribution. The terms
+ * are also available at https://curl.se/docs/copyright.html.
+ *
+ * You may opt to use, copy, modify, merge, publish, distribute and/or sell
+ * copies of the Software, and permit persons to whom the Software is
+ * furnished to do so, under the terms of the COPYING file.
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
+ * KIND, either express or implied.
+ *
+ * SPDX-License-Identifier: curl
+ *
+ ***************************************************************************/
+
+#include "curl_setup.h"
+
+#include "curlx/timeval.h"
+#include "ratelimit.h"
+
+
+#define CURL_US_PER_SEC 1000000
+#define CURL_RLIMIT_MIN_CHUNK (16 * 1024)
+#define CURL_RLIMIT_MAX_STEPS 2 /* 500ms interval */
+
+void Curl_rlimit_init(struct Curl_rlimit *r,
+ curl_off_t rate_per_s,
+ curl_off_t burst_per_s,
+ struct curltime ts)
+{
+ curl_off_t rate_steps;
+
+ DEBUGASSERT(rate_per_s >= 0);
+ DEBUGASSERT(burst_per_s >= rate_per_s || !burst_per_s);
+ r->step_us = CURL_US_PER_SEC;
+ r->rate_per_step = rate_per_s;
+ r->burst_per_step = burst_per_s;
+ /* On rates that are multiples of CURL_RLIMIT_MIN_CHUNK, we reduce
+ * the interval `step_us` from 1 second to smaller steps with at
+ * most CURL_RLIMIT_MAX_STEPS.
+ * Smaller means more CPU, but also more precision. */
+ rate_steps = rate_per_s / CURL_RLIMIT_MIN_CHUNK;
+ rate_steps = CURLMIN(rate_steps, CURL_RLIMIT_MAX_STEPS);
+ if(rate_steps >= 2) {
+ r->step_us /= rate_steps;
+ r->rate_per_step /= rate_steps;
+ r->burst_per_step /= rate_steps;
+ }
+ r->tokens = r->rate_per_step;
+ r->spare_us = 0;
+ r->ts = ts;
+ r->blocked = FALSE;
+}
+
+void Curl_rlimit_start(struct Curl_rlimit *r, struct curltime ts)
+{
+ r->tokens = r->rate_per_step;
+ r->spare_us = 0;
+ r->ts = ts;
+}
+
+bool Curl_rlimit_active(struct Curl_rlimit *r)
+{
+ return (r->rate_per_step > 0) || r->blocked;
+}
+
+bool Curl_rlimit_is_blocked(struct Curl_rlimit *r)
+{
+ return r->blocked;
+}
+
+static void ratelimit_update(struct Curl_rlimit *r,
+ struct curltime ts)
+{
+ timediff_t elapsed_us, elapsed_steps;
+ curl_off_t token_gain;
+
+ DEBUGASSERT(r->rate_per_step);
+ if((r->ts.tv_sec == ts.tv_sec) && (r->ts.tv_usec == ts.tv_usec))
+ return;
+
+ elapsed_us = curlx_timediff_us(ts, r->ts);
+ if(elapsed_us < 0) { /* not going back in time */
+ curl_mfprintf(stderr, "rlimit: neg elapsed time %" FMT_TIMEDIFF_T "us\n",
+ elapsed_us);
+ DEBUGASSERT(0);
+ return;
+ }
+
+ elapsed_us += r->spare_us;
+ if(elapsed_us < r->step_us)
+ return;
+
+ /* we do the update */
+ r->ts = ts;
+ elapsed_steps = elapsed_us / r->step_us;
+ r->spare_us = elapsed_us % r->step_us;
+
+ /* How many tokens did we gain since the last update? */
+ if(r->rate_per_step > (CURL_OFF_T_MAX / elapsed_steps))
+ token_gain = CURL_OFF_T_MAX;
+ else {
+ token_gain = r->rate_per_step * elapsed_steps;
+ }
+
+ /* Limit the token again by the burst rate per second (if set), so we
+ * do not suddenly have a huge number of tokens after inactivity. */
+ r->tokens += token_gain;
+ if(r->burst_per_step && (r->tokens > r->burst_per_step)) {
+ r->tokens = r->burst_per_step;
+ }
+}
+
+curl_off_t Curl_rlimit_avail(struct Curl_rlimit *r,
+ struct curltime ts)
+{
+ if(r->blocked)
+ return 0;
+ else if(r->rate_per_step) {
+ ratelimit_update(r, ts);
+ return r->tokens;
+ }
+ else
+ return CURL_OFF_T_MAX;
+}
+
+void Curl_rlimit_drain(struct Curl_rlimit *r,
+ size_t tokens,
+ struct curltime ts)
+{
+ if(r->blocked || !r->rate_per_step)
+ return;
+
+ ratelimit_update(r, ts);
+#if SIZEOF_CURL_OFF_T <= SIZEOF_SIZE_T
+ if(tokens > CURL_OFF_T_MAX) {
+ r->tokens = CURL_OFF_T_MIN;
+ return;
+ }
+ else
+#endif
+ {
+ curl_off_t val = (curl_off_t)tokens;
+ if((CURL_OFF_T_MIN + val) < r->tokens)
+ r->tokens -= val;
+ else
+ r->tokens = CURL_OFF_T_MIN;
+ }
+}
+
+timediff_t Curl_rlimit_wait_ms(struct Curl_rlimit *r,
+ struct curltime ts)
+{
+ timediff_t wait_us, elapsed_us;
+
+ if(r->blocked || !r->rate_per_step)
+ return 0;
+ ratelimit_update(r, ts);
+ if(r->tokens > 0)
+ return 0;
+
+ /* How much time will it take tokens to become positive again?
+ * Deduct `spare_us` and check against already elapsed time */
+ wait_us = (1 + (-r->tokens / r->rate_per_step)) * r->step_us;
+ wait_us -= r->spare_us;
+
+ elapsed_us = curlx_timediff_us(ts, r->ts);
+ if(elapsed_us >= wait_us)
+ return 0;
+ wait_us -= elapsed_us;
+ return (wait_us + 999) / 1000; /* in milliseconds */
+}
+
+void Curl_rlimit_block(struct Curl_rlimit *r,
+ bool activate,
+ struct curltime ts)
+{
+ if(!activate == !r->blocked)
+ return;
+
+ r->ts = ts;
+ r->blocked = activate;
+ if(!r->blocked) {
+ /* Start rate limiting fresh. The amount of time this was blocked
+ * does not generate extra tokens. */
+ Curl_rlimit_start(r, ts);
+ }
+ else {
+ r->tokens = 0;
+ }
+}
--- /dev/null
+#ifndef HEADER_Curl_rlimit_H
+#define HEADER_Curl_rlimit_H
+/***************************************************************************
+ * _ _ ____ _
+ * Project ___| | | | _ \| |
+ * / __| | | | |_) | |
+ * | (__| |_| | _ <| |___
+ * \___|\___/|_| \_\_____|
+ *
+ * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
+ *
+ * This software is licensed as described in the file COPYING, which
+ * you should have received as part of this distribution. The terms
+ * are also available at https://curl.se/docs/copyright.html.
+ *
+ * You may opt to use, copy, modify, merge, publish, distribute and/or sell
+ * copies of the Software, and permit persons to whom the Software is
+ * furnished to do so, under the terms of the COPYING file.
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
+ * KIND, either express or implied.
+ *
+ * SPDX-License-Identifier: curl
+ *
+ ***************************************************************************/
+
+#include "curlx/timeval.h"
+
+/* This is a rate limiter that provides "tokens" to be consumed
+ * per second with a "burst" rate limitation. Example:
+ * A rate limit of 1 megabyte per second with a burst rate of 1.5MB.
+ * - initially 1 million tokens are available.
+ * - these are drained in the first second.
+ * - checking available tokens before the 2nd second will return 0.
+ * - at/after the 2nd second, 1 million tokens are available again.
+ * - nothing happens for a second, the 1 million tokens would grow
+ * to 2 million, however the burst limit caps those at 1.5 million.
+ * Thus:
+ * - setting "burst" to CURL_OFF_T_MAX would average tokens over the
+ * complete lifetime. E.g. for a download, at the *end* of it, the
+ * average rate from start to finish would be the rate limit.
+ * - setting "burst" to the same value as "rate" would make a
+ * download always try to stay *at/below* the rate and slow times will
+ * not generate extra tokens.
+ * A rate limit can be blocked, causing the available tokens to become
+ * always 0 until unblocked. After unblocking, the rate limiting starts
+ * again with no history of the past.
+ * Finally, a rate limiter with rate 0 will always have CURL_OFF_T_MAX
+ * tokens available, unless blocked.
+ */
+
+struct Curl_rlimit {
+ curl_off_t rate_per_step; /* rate tokens are generated per step us */
+ curl_off_t burst_per_step; /* burst rate of tokens per step us */
+ timediff_t step_us; /* microseconds between token increases */
+ curl_off_t tokens; /* tokens available in the next second */
+ timediff_t spare_us; /* microseconds unaffecting tokens */
+ struct curltime ts; /* time of the last update */
+ BIT(blocked); /* blocking sets available tokens to 0 */
+};
+
+void Curl_rlimit_init(struct Curl_rlimit *r,
+ curl_off_t rate_per_s,
+ curl_off_t burst_per_s,
+ struct curltime ts);
+
+/* Start ratelimiting with the given timestamp. Resets available tokens. */
+void Curl_rlimit_start(struct Curl_rlimit *r, struct curltime ts);
+
+/* How many milliseconds to wait until token are available again. */
+timediff_t Curl_rlimit_wait_ms(struct Curl_rlimit *r,
+ struct curltime ts);
+
+/* Return if rate limiting of tokens is active */
+bool Curl_rlimit_active(struct Curl_rlimit *r);
+bool Curl_rlimit_is_blocked(struct Curl_rlimit *r);
+
+/* Return how many tokens are available to spend, may be negative */
+curl_off_t Curl_rlimit_avail(struct Curl_rlimit *r,
+ struct curltime ts);
+
+/* Drain tokens from the ratelimit, return how many are now available. */
+void Curl_rlimit_drain(struct Curl_rlimit *r,
+ size_t tokens,
+ struct curltime ts);
+
+/* Block/unblock ratelimiting. A blocked ratelimit has 0 tokens available. */
+void Curl_rlimit_block(struct Curl_rlimit *r,
+ bool activate,
+ struct curltime ts);
+
+#endif /* HEADER_Curl_rlimit_H */
{
DEBUGASSERT(!data->req.upload_done);
data->req.upload_done = TRUE;
- data->req.keepon &= ~(KEEP_SEND|KEEP_SEND_TIMED); /* we are done sending */
+ data->req.keepon &= ~KEEP_SEND; /* we are done sending */
Curl_pgrsTime(data, TIMER_POSTRANSFER);
Curl_creader_done(data, data->req.upload_aborted);
* - or request has buffered data to send
* - or transfer connection has pending data to send */
return !data->req.done &&
- (((data->req.keepon & KEEP_SENDBITS) == KEEP_SEND) ||
- !Curl_req_sendbuf_empty(data) ||
- Curl_xfer_needs_flush(data));
+ ((data->req.keepon & KEEP_SEND) ||
+ !Curl_req_sendbuf_empty(data) ||
+ Curl_xfer_needs_flush(data));
}
bool Curl_req_done_sending(struct Curl_easy *data)
if(!data->req.upload_done) {
Curl_bufq_reset(&data->req.sendbuf);
data->req.upload_aborted = TRUE;
- /* no longer KEEP_SEND and KEEP_SEND_PAUSE */
- data->req.keepon &= ~KEEP_SENDBITS;
+ data->req.keepon &= ~KEEP_SEND;
return req_set_upload_done(data);
}
return CURLE_OK;
/* stop receiving and ALL sending as well, including PAUSE and HOLD.
* We might still be paused on receive client writes though, so
* keep those bits around. */
- data->req.keepon &= ~(KEEP_RECV|KEEP_SENDBITS);
+ data->req.keepon &= ~(KEEP_RECV|KEEP_SEND);
return Curl_req_abort_sending(data);
}
BIT(sendbuf_init); /* sendbuf is initialized */
BIT(shutdown); /* request end will shutdown connection */
BIT(shutdown_err_ignore); /* errors in shutdown will not fail request */
+ BIT(reader_started); /* client reads have started */
};
/**
ZERO_NULL, /* proto_pollset */
rtsp_do_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */
- ZERO_NULL, /* perform_pollset */
+ Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */
rtsp_rtp_write_resp, /* write_resp */
rtsp_rtp_write_resp_hd, /* write_resp_hd */
/* if a request-body has been sent off, we make sure this progress is
noted properly */
Curl_pgrsSetUploadCounter(data, data->req.writebytecount);
- if(Curl_pgrsUpdate(data))
- result = CURLE_ABORTED_BY_CALLBACK;
+ result = Curl_pgrsUpdate(data);
}
out:
curlx_dyn_free(&req_buffer);
*pwant_read = *pwant_write = FALSE;
}
-bool Curl_pollset_want_read(struct Curl_easy *data,
+bool Curl_pollset_want_recv(struct Curl_easy *data,
struct easy_pollset *ps,
curl_socket_t sock)
{
}
return FALSE;
}
+
+bool Curl_pollset_want_send(struct Curl_easy *data,
+ struct easy_pollset *ps,
+ curl_socket_t sock)
+{
+ unsigned int i;
+ (void)data;
+ for(i = 0; i < ps->n; ++i) {
+ if((ps->sockets[i] == sock) && (ps->actions[i] & CURL_POLL_OUT))
+ return TRUE;
+ }
+ return FALSE;
+}
#define Curl_pollset_add_in(data, ps, sock) \
Curl_pollset_change((data), (ps), (sock), CURL_POLL_IN, 0)
+#define Curl_pollset_remove_in(data, ps, sock) \
+ Curl_pollset_change((data), (ps), (sock), 0, CURL_POLL_IN)
#define Curl_pollset_add_out(data, ps, sock) \
Curl_pollset_change((data), (ps), (sock), CURL_POLL_OUT, 0)
+#define Curl_pollset_remove_out(data, ps, sock) \
+ Curl_pollset_change((data), (ps), (sock), 0, CURL_POLL_OUT)
#define Curl_pollset_add_inout(data, ps, sock) \
Curl_pollset_change((data), (ps), (sock), \
CURL_POLL_IN|CURL_POLL_OUT, 0)
struct easy_pollset *ps, curl_socket_t sock,
bool *pwant_read, bool *pwant_write);
-/**
- * Return TRUE if the pollset contains socket with CURL_POLL_IN.
- */
-bool Curl_pollset_want_read(struct Curl_easy *data,
+/* TRUE if the pollset contains socket with CURL_POLL_IN. */
+bool Curl_pollset_want_recv(struct Curl_easy *data,
+ struct easy_pollset *ps,
+ curl_socket_t sock);
+/* TRUE if the pollset contains socket with CURL_POLL_OUT. */
+bool Curl_pollset_want_send(struct Curl_easy *data,
struct easy_pollset *ps,
curl_socket_t sock);
static void cl_reset_reader(struct Curl_easy *data)
{
struct Curl_creader *reader = data->req.reader_stack;
+ data->req.reader_started = FALSE;
while(reader) {
data->req.reader_stack = reader->next;
reader->crt->do_close(data, reader);
if(!is_connect && !ctx->started_response) {
Curl_pgrsTime(data, TIMER_STARTTRANSFER);
+ Curl_rlimit_start(&data->progress.dl.rlimit, curlx_now());
ctx->started_response = TRUE;
}
if(result)
return result;
}
+
/* Update stats, write and report progress */
+ Curl_rlimit_drain(&data->progress.dl.rlimit, nwrite, curlx_now());
data->req.bytecount += nwrite;
Curl_pgrsSetDownloadCounter(data, data->req.bytecount);
return result;
DEBUGASSERT(data->req.reader_stack);
}
+ if(!data->req.reader_started) {
+ Curl_rlimit_start(&data->progress.ul.rlimit, curlx_now());
+ data->req.reader_started = TRUE;
+ }
+ if(Curl_rlimit_active(&data->progress.ul.rlimit)) {
+ curl_off_t ul_avail =
+ Curl_rlimit_avail(&data->progress.ul.rlimit, curlx_now());
+ if(ul_avail <= 0) {
+ result = CURLE_OK;
+ *eos = FALSE;
+ goto out;
+ }
+ if(ul_avail < (curl_off_t)blen)
+ blen = (size_t)ul_avail;
+ }
result = Curl_creader_read(data, data->req.reader_stack, buf, blen,
nread, eos);
+ if(!result)
+ Curl_rlimit_drain(&data->progress.ul.rlimit, *nread, curlx_now());
+
+out:
CURL_TRC_READ(data, "client_read(len=%zu) -> %d, nread=%zu, eos=%d",
blen, result, *nread, *eos);
return result;
if(offt < 0)
return CURLE_BAD_FUNCTION_ARGUMENT;
s->max_send_speed = offt;
+ Curl_rlimit_init(&data->progress.ul.rlimit, offt, offt, curlx_now());
break;
case CURLOPT_MAX_RECV_SPEED_LARGE:
/*
if(offt < 0)
return CURLE_BAD_FUNCTION_ARGUMENT;
s->max_recv_speed = offt;
+ Curl_rlimit_init(&data->progress.dl.rlimit, offt, offt, curlx_now());
break;
case CURLOPT_RESUME_FROM_LARGE:
/*
data->req.size = -1;
/* Set the progress data */
- Curl_pgrsSetUploadCounter(data, 0);
- Curl_pgrsSetDownloadCounter(data, 0);
- Curl_pgrsSetUploadSize(data, -1);
- Curl_pgrsSetDownloadSize(data, -1);
+ Curl_pgrsReset(data);
/* Carry out the perform */
result = smtp_perform(data, smtpc, smtp, &connected, dophase_done);
+++ /dev/null
-/***************************************************************************
- * _ _ ____ _
- * Project ___| | | | _ \| |
- * / __| | | | |_) | |
- * | (__| |_| | _ <| |___
- * \___|\___/|_| \_\_____|
- *
- * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
- *
- * This software is licensed as described in the file COPYING, which
- * you should have received as part of this distribution. The terms
- * are also available at https://curl.se/docs/copyright.html.
- *
- * You may opt to use, copy, modify, merge, publish, distribute and/or sell
- * copies of the Software, and permit persons to whom the Software is
- * furnished to do so, under the terms of the COPYING file.
- *
- * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
- * KIND, either express or implied.
- *
- * SPDX-License-Identifier: curl
- *
- ***************************************************************************/
-
-#include "curl_setup.h"
-
-#include <curl/curl.h>
-#include "urldata.h"
-#include "sendf.h"
-#include "transfer.h"
-#include "multiif.h"
-#include "speedcheck.h"
-
-void Curl_speedinit(struct Curl_easy *data)
-{
- memset(&data->state.keeps_speed, 0, sizeof(struct curltime));
-}
-
-/*
- * @unittest: 1606
- */
-CURLcode Curl_speedcheck(struct Curl_easy *data,
- struct curltime now)
-{
- if(Curl_xfer_recv_is_paused(data) || Curl_xfer_send_is_paused(data))
- /* A paused transfer is not qualified for speed checks */
- return CURLE_OK;
-
- if((data->progress.current_speed >= 0) && data->set.low_speed_time) {
- if(data->progress.current_speed < data->set.low_speed_limit) {
- if(!data->state.keeps_speed.tv_sec)
- /* under the limit at this moment */
- data->state.keeps_speed = now;
- else {
- /* how long has it been under the limit */
- timediff_t howlong = curlx_timediff_ms(now, data->state.keeps_speed);
-
- if(howlong >= data->set.low_speed_time * 1000) {
- /* too long */
- failf(data,
- "Operation too slow. "
- "Less than %ld bytes/sec transferred the last %ld seconds",
- data->set.low_speed_limit,
- data->set.low_speed_time);
- return CURLE_OPERATION_TIMEDOUT;
- }
- }
- }
- else
- /* faster right now */
- data->state.keeps_speed.tv_sec = 0;
- }
-
- if(data->set.low_speed_limit)
- /* if low speed limit is enabled, set the expire timer to make this
- connection's speed get checked again in a second */
- Curl_expire(data, 1000, EXPIRE_SPEEDCHECK);
-
- return CURLE_OK;
-}
+++ /dev/null
-#ifndef HEADER_CURL_SPEEDCHECK_H
-#define HEADER_CURL_SPEEDCHECK_H
-/***************************************************************************
- * _ _ ____ _
- * Project ___| | | | _ \| |
- * / __| | | | |_) | |
- * | (__| |_| | _ <| |___
- * \___|\___/|_| \_\_____|
- *
- * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
- *
- * This software is licensed as described in the file COPYING, which
- * you should have received as part of this distribution. The terms
- * are also available at https://curl.se/docs/copyright.html.
- *
- * You may opt to use, copy, modify, merge, publish, distribute and/or sell
- * copies of the Software, and permit persons to whom the Software is
- * furnished to do so, under the terms of the COPYING file.
- *
- * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
- * KIND, either express or implied.
- *
- * SPDX-License-Identifier: curl
- *
- ***************************************************************************/
-
-#include "curl_setup.h"
-
-#include "curlx/timeval.h"
-struct Curl_easy;
-void Curl_speedinit(struct Curl_easy *data);
-CURLcode Curl_speedcheck(struct Curl_easy *data,
- struct curltime now);
-
-#endif /* HEADER_CURL_SPEEDCHECK_H */
}
}
- if(Curl_pgrsUpdate(data)) {
- result = CURLE_ABORTED_BY_CALLBACK;
- break;
+ if(!result) {
+ result = Curl_pgrsUpdate(data);
+ if(result)
+ keepon = FALSE;
}
}
#endif
#include "multiif.h"
#include "url.h"
#include "strcase.h"
-#include "speedcheck.h"
#include "select.h"
#include "escape.h"
#include "curlx/strerr.h"
}
/* Update the progress meter */
- if(Curl_pgrsUpdate(data)) {
+ result = Curl_pgrsUpdate(data);
+ if(result) {
tftp_state_machine(state, TFTP_EVENT_ERROR);
- return CURLE_ABORTED_BY_CALLBACK;
+ return result;
}
}
return result;
/* The multi code does not have this logic for the DOING state so we
provide it for TFTP since it may do the entire transfer in this
state. */
- if(Curl_pgrsUpdate(data))
- result = CURLE_ABORTED_BY_CALLBACK;
- else
- result = Curl_speedcheck(data, curlx_now());
+ result = Curl_pgrsCheck(data);
}
return result;
}
#include "cw-out.h"
#include "transfer.h"
#include "sendf.h"
-#include "speedcheck.h"
#include "progress.h"
#include "http.h"
#include "url.h"
char *buf, *xfer_buf;
size_t blen, xfer_blen;
int maxloops = 10;
- curl_off_t total_received = 0;
bool is_multiplex = FALSE;
bool rcvd_eagain = FALSE;
- bool is_eos = FALSE;
+ bool is_eos = FALSE, rate_limited = FALSE;
result = Curl_multi_xfer_buf_borrow(data, &xfer_buf, &xfer_blen);
if(result)
buf = xfer_buf;
bytestoread = xfer_blen;
- if(bytestoread && data->set.max_recv_speed > 0) {
- /* In case of speed limit on receiving: if this loop already got
- * a quarter of the quota, break out. We want to stutter a bit
- * to keep in the limit, but too small receives will just cost
- * cpu unnecessarily. */
- if(total_received && (total_received >= (data->set.max_recv_speed / 4)))
+ if(bytestoread && Curl_rlimit_active(&data->progress.dl.rlimit)) {
+ curl_off_t dl_avail = Curl_rlimit_avail(&data->progress.dl.rlimit,
+ curlx_now());
+ /* DEBUGF(infof(data, "dl_rlimit, available=%" FMT_OFF_T, dl_avail));
+ */
+ /* In case of rate limited downloads: if this loop already got
+ * data and less than 16k is left in the limit, break out.
+ * We want to stutter a bit to keep in the limit, but too small
+ * receives will just cost cpu unnecessarily. */
+ if(dl_avail <= 0) {
+ rate_limited = TRUE;
break;
- if(data->set.max_recv_speed < (curl_off_t)bytestoread)
- bytestoread = (size_t)data->set.max_recv_speed;
+ }
+ if(dl_avail < (curl_off_t)bytestoread)
+ bytestoread = (size_t)dl_avail;
}
rcvd_eagain = FALSE;
if(k->eos_written) /* already did write this to client, leave */
break;
}
- total_received += blen;
result = Curl_xfer_write_resp(data, buf, blen, is_eos);
if(result || data->req.done)
if((!is_multiplex && data->req.download_done) || is_eos) {
data->req.keepon &= ~KEEP_RECV;
}
- /* if we are PAUSEd or stopped receiving, leave the loop */
- if((k->keepon & KEEP_RECV_PAUSE) || !(k->keepon & KEEP_RECV))
+ /* if we stopped receiving, leave the loop */
+ if(!(k->keepon & KEEP_RECV))
break;
} while(maxloops--);
- if(!is_eos && !Curl_xfer_is_blocked(data) &&
+ if(!is_eos && !rate_limited && CURL_WANT_RECV(data) &&
(!rcvd_eagain || data_pending(data, rcvd_eagain))) {
/* Did not read until EAGAIN/EOS or there is still data pending
* in buffers. Mark as read-again via simulated SELECT results. */
}
/* If we still have writing to do, we check if we have a writable socket. */
- if(Curl_req_want_send(data) || (data->req.keepon & KEEP_SEND_TIMED)) {
+ if(Curl_req_want_send(data)) {
result = sendrecv_ul(data);
if(result)
goto out;
}
- if(Curl_pgrsUpdate(data))
- result = CURLE_ABORTED_BY_CALLBACK;
- else
- result = Curl_speedcheck(data, *nowp);
+ result = Curl_pgrsCheck(data);
if(result)
goto out;
result = CURLE_PARTIAL_FILE;
goto out;
}
- if(Curl_pgrsUpdate(data)) {
- result = CURLE_ABORTED_BY_CALLBACK;
- goto out;
- }
}
/* If there is nothing more to send/recv, the request is done */
- if((k->keepon & (KEEP_RECVBITS|KEEP_SENDBITS)) == 0)
+ if((k->keepon & (KEEP_RECV|KEEP_SEND)) == 0)
data->req.done = TRUE;
+ result = Curl_pgrsUpdate(data);
+
out:
if(result)
DEBUGF(infof(data, "Curl_sendrecv() -> %d", result));
bool Curl_xfer_send_is_paused(struct Curl_easy *data)
{
- return (data->req.keepon & KEEP_SEND_PAUSE);
+ return Curl_rlimit_is_blocked(&data->progress.ul.rlimit);
}
bool Curl_xfer_recv_is_paused(struct Curl_easy *data)
{
- return (data->req.keepon & KEEP_RECV_PAUSE);
+ return Curl_rlimit_is_blocked(&data->progress.dl.rlimit);
}
CURLcode Curl_xfer_pause_send(struct Curl_easy *data, bool enable)
{
CURLcode result = CURLE_OK;
- if(enable) {
- data->req.keepon |= KEEP_SEND_PAUSE;
- }
- else {
- data->req.keepon &= ~KEEP_SEND_PAUSE;
- if(Curl_creader_is_paused(data))
- result = Curl_creader_unpause(data);
- }
+ Curl_rlimit_block(&data->progress.ul.rlimit, enable, curlx_now());
+ if(!enable && Curl_creader_is_paused(data))
+ result = Curl_creader_unpause(data);
return result;
}
CURLcode Curl_xfer_pause_recv(struct Curl_easy *data, bool enable)
{
CURLcode result = CURLE_OK;
- if(enable) {
- data->req.keepon |= KEEP_RECV_PAUSE;
- }
- else {
- data->req.keepon &= ~KEEP_RECV_PAUSE;
- if(Curl_cwriter_is_paused(data))
- result = Curl_cwriter_unpause(data);
- }
+ Curl_rlimit_block(&data->progress.dl.rlimit, enable, curlx_now());
+ if(!enable && Curl_cwriter_is_paused(data))
+ result = Curl_cwriter_unpause(data);
Curl_conn_ev_data_pause(data, enable);
+ Curl_pgrsRecvPause(data, enable);
return result;
}
-
-bool Curl_xfer_is_too_fast(struct Curl_easy *data)
-{
- struct Curl_llist_node *e = Curl_llist_head(&data->state.timeoutlist);
- while(e) {
- struct time_node *n = Curl_node_elem(e);
- e = Curl_node_next(e);
- if(n->eid == EXPIRE_TOOFAST)
- return TRUE;
- }
- return FALSE;
-}
CURLcode Curl_xfer_pause_send(struct Curl_easy *data, bool enable);
CURLcode Curl_xfer_pause_recv(struct Curl_easy *data, bool enable);
-/* Query if transfer has expire timeout TOOFAST set. */
-bool Curl_xfer_is_too_fast(struct Curl_easy *data);
-
#endif /* HEADER_CURL_TRANSFER_H */
#include "select.h"
#include "multiif.h"
#include "easyif.h"
-#include "speedcheck.h"
#include "curlx/warnless.h"
#include "getinfo.h"
#include "pop3.h"
CURLcode Curl_init_do(struct Curl_easy *data, struct connectdata *conn)
{
- /* if this is a pushed stream, we need this: */
CURLcode result;
if(conn) {
result = Curl_req_start(&data->req, data);
if(!result) {
- Curl_speedinit(data);
- Curl_pgrsSetUploadCounter(data, 0);
- Curl_pgrsSetDownloadCounter(data, 0);
+ Curl_pgrsReset(data);
}
return result;
}
#include "curlx/dynbuf.h"
#include "dynhds.h"
#include "request.h"
+#include "ratelimit.h"
#include "netrc.h"
/* On error return, the value of `pnwritten` has no meaning */
#define KEEP_NONE 0
#define KEEP_RECV (1<<0) /* there is or may be data to read */
#define KEEP_SEND (1<<1) /* there is or may be data to write */
-#define KEEP_RECV_HOLD (1<<2) /* when set, no reading should be done but there
- might still be data to read */
-#define KEEP_SEND_HOLD (1<<3) /* when set, no writing should be done but there
- might still be data to write */
-#define KEEP_RECV_PAUSE (1<<4) /* reading is paused */
-#define KEEP_SEND_PAUSE (1<<5) /* writing is paused */
-
-/* KEEP_SEND_TIMED is set when the transfer should attempt sending
- * at timer (or other) events. A transfer waiting on a timer will
- * remove KEEP_SEND to suppress POLLOUTs of the connection.
- * Adding KEEP_SEND_TIMED will then attempt to send whenever the transfer
- * enters the "readwrite" loop, e.g. when a timer fires.
- * This is used in HTTP for 'Expect: 100-continue' waiting. */
-#define KEEP_SEND_TIMED (1<<6)
-
-#define KEEP_RECVBITS (KEEP_RECV | KEEP_RECV_HOLD | KEEP_RECV_PAUSE)
-#define KEEP_SENDBITS (KEEP_SEND | KEEP_SEND_HOLD | KEEP_SEND_PAUSE)
-
-/* transfer wants to send is not PAUSE or HOLD */
-#define CURL_WANT_SEND(data) \
- (((data)->req.keepon & KEEP_SENDBITS) == KEEP_SEND)
-/* transfer receive is not on PAUSE or HOLD */
-#define CURL_WANT_RECV(data) \
- (((data)->req.keepon & KEEP_RECVBITS) == KEEP_RECV)
+
+/* transfer wants to send */
+#define CURL_WANT_SEND(data) ((data)->req.keepon & KEEP_SEND)
+/* transfer wants to receive */
+#define CURL_WANT_RECV(data) ((data)->req.keepon & KEEP_RECV)
#define FIRSTSOCKET 0
#define SECONDARYSOCKET 1
BIT(used_proxy); /* the transfer used a proxy */
};
-struct pgrs_measure {
- struct curltime start; /* when measure started */
- curl_off_t start_size; /* the 'cur_size' the measure started at */
-};
-
struct pgrs_dir {
curl_off_t total_size; /* total expected bytes */
curl_off_t cur_size; /* transferred bytes so far */
curl_off_t speed; /* bytes per second transferred */
- struct pgrs_measure limit;
+ struct Curl_rlimit rlimit; /* speed limiting / pausing */
};
struct Progress {
struct curltime t_startqueue;
struct curltime t_acceptdata;
-#define CURR_TIME (5 + 1) /* 6 entries for 5 seconds */
+#define CURL_SPEED_RECORDS (5 + 1) /* 6 entries for 5 seconds */
- curl_off_t speeder[ CURR_TIME ];
- struct curltime speeder_time[ CURR_TIME ];
+ curl_off_t speed_amount[ CURL_SPEED_RECORDS ];
+ struct curltime speed_time[ CURL_SPEED_RECORDS ];
unsigned char speeder_c;
BIT(hide);
BIT(ul_size_known);
#define QUIC_HANDSHAKE_TIMEOUT (10*NGTCP2_SECONDS)
/* A stream window is the maximum amount we need to buffer for
- * each active transfer. We use HTTP/3 flow control and only ACK
- * when we take things out of the buffer.
+ * each active transfer.
* Chunk size is large enough to take a full DATA frame */
-#define H3_STREAM_WINDOW_SIZE (128 * 1024)
+#define H3_STREAM_WINDOW_SIZE (64 * 1024)
#define H3_STREAM_CHUNK_SIZE (16 * 1024)
#if H3_STREAM_CHUNK_SIZE < NGTCP2_MAX_UDP_PAYLOAD_SIZE
#error H3_STREAM_CHUNK_SIZE smaller than NGTCP2_MAX_UDP_PAYLOAD_SIZE
size_t sendbuf_len_in_flight; /* sendbuf amount "in flight" */
curl_uint64_t error3; /* HTTP/3 stream error code */
curl_off_t upload_left; /* number of request bytes left to upload */
+ uint64_t download_unacked; /* bytes not acknowledged yet */
int status_code; /* HTTP status code */
CURLcode xfer_result; /* result from xfer_resp_write(_hd) */
BIT(resp_hds_complete); /* we have a complete, final response */
s->handshake_timeout = (data->set.connecttimeout > 0) ?
data->set.connecttimeout * NGTCP2_MILLISECONDS : QUIC_HANDSHAKE_TIMEOUT;
s->max_window = 100 * ctx->max_stream_window;
- s->max_stream_window = 10 * ctx->max_stream_window;
+ s->max_stream_window = ctx->max_stream_window;
s->no_pmtud = FALSE;
#ifdef NGTCP2_SETTINGS_V3
/* try ten times the ngtcp2 defaults here for problems with Caddy */
}
}
+static void cf_ngtcp2_ack_stream(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct h3_stream_ctx *stream)
+{
+ struct cf_ngtcp2_ctx *ctx = cf->ctx;
+ struct curltime now = curlx_now();
+ curl_off_t avail;
+ uint64_t ack_len = 0;
+
+ /* How many byte to ack on the stream? */
+
+ /* how much does rate limiting allow us to acknowledge? */
+ avail = Curl_rlimit_avail(&data->progress.dl.rlimit, now);
+ if(avail == CURL_OFF_T_MAX) { /* no rate limit, ack all */
+ ack_len = stream->download_unacked;
+ }
+ else if(avail > 0) {
+ ack_len = CURLMIN(stream->download_unacked, (uint64_t)avail);
+ }
+
+ if(ack_len) {
+ CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] ACK %" PRIu64
+ "/%" PRIu64 " bytes of DATA", stream->id,
+ ack_len, stream->download_unacked);
+ ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream->id, ack_len);
+ stream->download_unacked -= ack_len;
+ }
+}
+
static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
const uint8_t *buf, size_t blen,
void *user_data, void *stream_user_data)
return NGHTTP3_ERR_CALLBACK_FAILURE;
h3_xfer_write_resp(cf, data, stream, (const char *)buf, blen, FALSE);
- if(blen) {
- CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] ACK %zu bytes of DATA",
- stream->id, blen);
- ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream->id, blen);
- ngtcp2_conn_extend_max_offset(ctx->qconn, blen);
- }
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] DATA len=%zu", stream->id, blen);
+
+ ngtcp2_conn_extend_max_offset(ctx->qconn, blen);
+ if(UINT64_MAX - blen < stream->download_unacked)
+ stream->download_unacked = UINT64_MAX; /* unlikely */
+ else
+ stream->download_unacked += blen;
+
+ cf_ngtcp2_ack_stream(cf, data, stream);
return 0;
}
goto out;
}
+ cf_ngtcp2_ack_stream(cf, data, stream);
+
if(cf_progress_ingress(cf, data, &pktx)) {
result = CURLE_RECV_ERROR;
goto out;
#include "../http.h" /* for HTTP proxy tunnel stuff */
#include "ssh.h"
#include "../url.h"
-#include "../speedcheck.h"
#include "../vtls/vtls.h"
#include "../cfilters.h"
#include "../connect.h"
while((sshc->state != SSH_STOP) && !result) {
bool block;
timediff_t left_ms = 1000;
- struct curltime now = curlx_now();
result = myssh_statemach_act(data, sshc, sshp, &block);
if(result)
break;
if(!disconnect) {
- if(Curl_pgrsUpdate(data))
- return CURLE_ABORTED_BY_CALLBACK;
-
- result = Curl_speedcheck(data, now);
+ result = Curl_pgrsCheck(data);
if(result)
break;
sshc->secondCreateDirs = 0; /* reset the create directory attempt state
variable */
- Curl_pgrsSetUploadCounter(data, 0);
- Curl_pgrsSetDownloadCounter(data, 0);
- Curl_pgrsSetUploadSize(data, -1);
- Curl_pgrsSetDownloadSize(data, -1);
+ Curl_pgrsReset(data);
if(conn->handler->protocol & CURLPROTO_SCP)
result = scp_perform(data, &connected, done);
#include "../http.h" /* for HTTP proxy tunnel stuff */
#include "ssh.h"
#include "../url.h"
-#include "../speedcheck.h"
#include "../vtls/vtls.h"
#include "../cfilters.h"
#include "../connect.h"
break;
if(!disconnect) {
- if(Curl_pgrsUpdate(data))
- return CURLE_ABORTED_BY_CALLBACK;
-
- result = Curl_speedcheck(data, now);
+ result = Curl_pgrsCheck(data);
if(result)
break;
sshc->secondCreateDirs = 0; /* reset the create directory attempt state
variable */
- Curl_pgrsSetUploadCounter(data, 0);
- Curl_pgrsSetDownloadCounter(data, 0);
- Curl_pgrsSetUploadSize(data, -1);
- Curl_pgrsSetDownloadSize(data, -1);
+ Curl_pgrsReset(data);
if(conn->handler->protocol & CURLPROTO_SCP)
result = scp_perform(data, &connected, done);
int what;
timediff_t timeout_ms, remaining;
- if(Curl_pgrsUpdate(data)) {
- result = CURLE_ABORTED_BY_CALLBACK;
+ result = Curl_pgrsUpdate(data);
+ if(result)
break;
- }
elapsed = curlx_timediff_ms(curlx_now(), rs->start_time);
if(elapsed >= MAX_RENEG_BLOCK_TIME) {
ZERO_NULL, /* connecting */
ZERO_NULL, /* doing */
ZERO_NULL, /* proto_pollset */
- Curl_http_do_pollset, /* doing_pollset */
+ Curl_http_doing_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */
- ZERO_NULL, /* perform_pollset */
+ Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */
Curl_http_write_resp, /* write_resp */
Curl_http_write_resp_hd, /* write_resp_hd */
NULL, /* connecting */
ZERO_NULL, /* doing */
NULL, /* proto_pollset */
- Curl_http_do_pollset, /* doing_pollset */
+ Curl_http_doing_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */
- ZERO_NULL, /* perform_pollset */
+ Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */
Curl_http_write_resp, /* write_resp */
Curl_http_write_resp_hd, /* write_resp_hd */
test3100 test3101 test3102 test3103 test3104 test3105 \
\
test3200 test3201 test3202 test3203 test3204 test3205 test3206 test3207 test3208 \
-test3209 test3210 test3211 test3212 test3213 test3214 test3215 \
+test3209 test3210 test3211 test3212 test3213 test3214 test3215 test3216 \
test4000 test4001
EXTRA_DIST = $(TESTCASES) DISABLED data-xml1
--- /dev/null
+<testcase>
+<info>
+<keywords>
+unittest
+ratelimit
+</keywords>
+</info>
+
+#
+# Client-side
+<client>
+<features>
+unittest
+</features>
+<name>
+ratelimit unit tests
+</name>
+</client>
+</testcase>
import difflib
import filecmp
import logging
-import math
import os
import re
import sys
-from datetime import timedelta
import pytest
from testenv import Env, CurlClient, LocalClient
count = 1
url = f'https://{env.authority_for(env.domain1, proto)}/data-1m'
curl = CurlClient(env=env)
- speed_limit = 384 * 1024
- min_duration = math.floor((1024 * 1024)/speed_limit)
+ speed_limit = 256 * 1024
r = curl.http_download(urls=[url], alpn_proto=proto, extra_args=[
'--limit-rate', f'{speed_limit}'
])
r.check_response(count=count, http_status=200)
- assert r.duration > timedelta(seconds=min_duration), \
- f'rate limited transfer should take more than {min_duration}s, '\
- f'not {r.duration}'
+ dl_speed = r.stats[0]['speed_download']
+ # speed limit is only exact on long durations. Ideally this transfer
+ # would take 4 seconds, but it may end just after 3 because then
+ # we have downloaded the rest and will not wait for the rate
+ # limit to increase again.
+ assert dl_speed <= ((1024*1024)/3), f'{r.stats[0]}'
# make extreme parallel h2 upgrades, check invalid conn reuse
# before protocol switch has happened
r.check_response(count=count, http_status=200)
assert r.responses[0]['header']['received-length'] == f'{up_len}', f'{r.responses[0]}'
up_speed = r.stats[0]['speed_upload']
- assert (speed_limit * 0.5) <= up_speed <= (speed_limit * 1.5), f'{r.stats[0]}'
+ assert up_speed <= (speed_limit * 1.1), f'{r.stats[0]}'
# speed limited on echo handler
@pytest.mark.parametrize("proto", Env.http_protos())
])
r.check_response(count=count, http_status=200)
up_speed = r.stats[0]['speed_upload']
- assert (speed_limit * 0.5) <= up_speed <= (speed_limit * 1.5), f'{r.stats[0]}'
+ assert up_speed <= (speed_limit * 1.1), f'{r.stats[0]}'
# upload larger data, triggering "Expect: 100-continue" code paths
@pytest.mark.parametrize("proto", ['http/1.1'])
unit1979.c unit1980.c \
unit2600.c unit2601.c unit2602.c unit2603.c unit2604.c unit2605.c \
unit3200.c unit3205.c \
- unit3211.c unit3212.c unit3213.c unit3214.c
+ unit3211.c unit3212.c unit3213.c unit3214.c unit3216.c
***************************************************************************/
#include "unitcheck.h"
-#include "speedcheck.h"
+#include "progress.h"
#include "urldata.h"
static CURLcode t1606_setup(struct Curl_easy **easy)
curl_easy_setopt(easy, CURLOPT_LOW_SPEED_LIMIT, speed_limit);
curl_easy_setopt(easy, CURLOPT_LOW_SPEED_TIME, time_limit);
- Curl_speedinit(easy);
+ Curl_pgrsReset(easy);
do {
/* fake the current transfer speed */
easy->progress.current_speed = speed;
- res = Curl_speedcheck(easy, now);
+ res = pgrs_speedcheck(easy, &now);
if(res)
break;
/* step the time */
--- /dev/null
+/***************************************************************************
+ * _ _ ____ _
+ * Project ___| | | | _ \| |
+ * / __| | | | |_) | |
+ * | (__| |_| | _ <| |___
+ * \___|\___/|_| \_\_____|
+ *
+ * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
+ *
+ * This software is licensed as described in the file COPYING, which
+ * you should have received as part of this distribution. The terms
+ * are also available at https://curl.se/docs/copyright.html.
+ *
+ * You may opt to use, copy, modify, merge, publish, distribute and/or sell
+ * copies of the Software, and permit persons to whom the Software is
+ * furnished to do so, under the terms of the COPYING file.
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
+ * KIND, either express or implied.
+ *
+ * SPDX-License-Identifier: curl
+ *
+ ***************************************************************************/
+#include "unitcheck.h"
+
+#include "ratelimit.h"
+
+static CURLcode test_unit3216(const char *arg)
+{
+ UNITTEST_BEGIN_SIMPLE
+ struct Curl_rlimit r;
+ struct curltime ts;
+
+ /* A ratelimit that is unlimited */
+ ts = curlx_now();
+ Curl_rlimit_init(&r, 0, 0, ts);
+ fail_unless(Curl_rlimit_avail(&r, ts) == CURL_OFF_T_MAX, "inf");
+ Curl_rlimit_drain(&r, 1000000, ts);
+ fail_unless(Curl_rlimit_avail(&r, ts) == CURL_OFF_T_MAX, "drain keep inf");
+ fail_unless(Curl_rlimit_wait_ms(&r, ts) == 0, "inf never waits");
+
+ Curl_rlimit_block(&r, TRUE, ts);
+ fail_unless(Curl_rlimit_avail(&r, ts) == 0, "inf blocked to 0");
+ Curl_rlimit_drain(&r, 1000000, ts);
+ fail_unless(Curl_rlimit_avail(&r, ts) == 0, "blocked inf");
+ Curl_rlimit_block(&r, FALSE, ts);
+ fail_unless(Curl_rlimit_avail(&r, ts) == CURL_OFF_T_MAX,
+ "unblocked unlimited");
+
+ /* A ratelimit that give 10 tokens per second */
+ ts = curlx_now();
+ Curl_rlimit_init(&r, 10, 0, ts);
+ fail_unless(Curl_rlimit_avail(&r, ts) == 10, "initial 10");
+ Curl_rlimit_drain(&r, 5, ts);
+ fail_unless(Curl_rlimit_avail(&r, ts) == 5, "drain to 5");
+ Curl_rlimit_drain(&r, 3, ts);
+ fail_unless(Curl_rlimit_avail(&r, ts) == 2, "drain to 2");
+ ts.tv_usec += 1000; /* 1ms */
+ Curl_rlimit_drain(&r, 3, ts);
+ fail_unless(Curl_rlimit_avail(&r, ts) == -1, "drain to -1");
+ fail_unless(Curl_rlimit_wait_ms(&r, ts) == 999, "wait 999ms");
+ ts.tv_usec += 1000; /* 1ms */
+ fail_unless(Curl_rlimit_wait_ms(&r, ts) == 998, "wait 998ms");
+ ts.tv_sec += 1;
+ fail_unless(Curl_rlimit_avail(&r, ts) == 9, "10 inc per sec");
+ ts.tv_sec += 1;
+ fail_unless(Curl_rlimit_avail(&r, ts) == 19, "10 inc per sec(2)");
+
+ Curl_rlimit_block(&r, TRUE, curlx_now());
+ fail_unless(Curl_rlimit_avail(&r, curlx_now()) == 0, "10 blocked to 0");
+ Curl_rlimit_block(&r, FALSE, curlx_now());
+ fail_unless(Curl_rlimit_avail(&r, curlx_now()) == 10, "unblocked 10");
+
+ /* A ratelimit that give 10 tokens per second, max burst 15/s */
+ ts = curlx_now();
+ Curl_rlimit_init(&r, 10, 15, ts);
+ fail_unless(Curl_rlimit_avail(&r, ts) == 10, "initial 10");
+ Curl_rlimit_drain(&r, 5, ts);
+ fail_unless(Curl_rlimit_avail(&r, ts) == 5, "drain to 5");
+ Curl_rlimit_drain(&r, 3, ts);
+ fail_unless(Curl_rlimit_avail(&r, ts) == 2, "drain to 2");
+ Curl_rlimit_drain(&r, 3, ts);
+ fail_unless(Curl_rlimit_avail(&r, ts) == -1, "drain to -1");
+ ts.tv_sec += 1;
+ fail_unless(Curl_rlimit_avail(&r, ts) == 9, "10 inc per sec");
+ ts.tv_sec += 1;
+ fail_unless(Curl_rlimit_avail(&r, ts) == 15, "10/15 burst limit");
+ ts.tv_sec += 1;
+ fail_unless(Curl_rlimit_avail(&r, ts) == 15, "10/15 burst limit(2)");
+ Curl_rlimit_drain(&r, 15, ts);
+ fail_unless(Curl_rlimit_avail(&r, ts) == 0, "drain to 0");
+ fail_unless(Curl_rlimit_wait_ms(&r, ts) == 1000, "wait 1 sec");
+ ts.tv_usec += 500000; /* half a sec, cheating on second carry */
+ fail_unless(Curl_rlimit_avail(&r, ts) == 0, "0 after 0.5 sec");
+ fail_unless(Curl_rlimit_wait_ms(&r, ts) == 500, "wait 0.5 sec");
+ ts.tv_sec += 1;
+ fail_unless(Curl_rlimit_avail(&r, ts) == 10, "10 after 1.5 sec");
+ fail_unless(Curl_rlimit_wait_ms(&r, ts) == 0, "wait 0");
+ ts.tv_usec += 500000; /* half a sec, cheating on second carry */
+ fail_unless(Curl_rlimit_avail(&r, ts) == 15, "10 after 2 sec");
+
+ UNITTEST_END_SIMPLE
+}