}
static apr_status_t append_bucket(h2_bucket_beam *beam,
- apr_bucket *b,
+ apr_bucket_brigade *bb,
apr_read_type_e block,
apr_size_t *pspace_left,
apr_off_t *pwritten)
{
+ apr_bucket *b;
const char *data;
apr_size_t len;
- apr_status_t status = APR_SUCCESS;
- int can_beam = 0, check_len;
+ apr_status_t rv = APR_SUCCESS;
+ int can_beam = 0;
(void)block;
if (beam->aborted) {
- return APR_ECONNABORTED;
+ rv = APR_ECONNABORTED;
+ goto cleanup;
}
-
+
+ b = APR_BRIGADE_FIRST(bb);
if (APR_BUCKET_IS_METADATA(b)) {
APR_BUCKET_REMOVE(b);
apr_bucket_setaside(b, beam->pool);
H2_BLIST_INSERT_TAIL(&beam->buckets_to_send, b);
*pwritten += (apr_off_t)b->length;
- return APR_SUCCESS;
+ goto cleanup;
+ }
+ /* non meta bucket */
+
+ /* in case of indeterminate length, we need to read the bucket,
+ * so that it transforms itself into something stable. */
+ if (b->length == ((apr_size_t)-1)) {
+ rv = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
+ if (rv != APR_SUCCESS) goto cleanup;
}
- else if (APR_BUCKET_IS_FILE(b)) {
+
+ if (APR_BUCKET_IS_FILE(b)) {
/* For file buckets the problem is their internal readpool that
* is used on the first read to allocate buffer/mmap.
* Since setting aside a file bucket will de-register the
* transport. */
apr_bucket_file *bf = b->data;
can_beam = !beam->copy_files && (bf->refcount.refcount == 1);
- check_len = !can_beam;
}
else if (bucket_is_mmap(b)) {
can_beam = !beam->copy_files;
- check_len = !can_beam;
- }
- else {
- if (b->length == ((apr_size_t)-1)) {
- const char *data2;
- status = apr_bucket_read(b, &data2, &len, APR_BLOCK_READ);
- if (status != APR_SUCCESS) {
- return status;
- }
- }
- check_len = 1;
- }
-
- if (check_len) {
- if (b->length > *pspace_left) {
- apr_bucket_split(b, *pspace_left);
- }
- *pspace_left -= b->length;
}
- /* The fundamental problem is that reading a sender bucket from
- * a receiver thread is a total NO GO, because the bucket might use
- * its pool/bucket_alloc from a foreign thread and that will
- * corrupt. */
if (b->length == 0) {
apr_bucket_delete(b);
- return APR_SUCCESS;
+ rv = APR_SUCCESS;
+ goto cleanup;
}
- else if (APR_BUCKET_IS_HEAP(b)) {
- /* For heap buckets read from a receiver thread is fine. The
+
+ if (!*pspace_left) {
+ rv = APR_EAGAIN;
+ goto cleanup;
+ }
+
+ /* bucket is accepted and added to beam->buckets_to_send */
+ if (APR_BUCKET_IS_HEAP(b)) {
+ /* For heap buckets, a read from a receiver thread is fine. The
* data will be there and live until the bucket itself is
* destroyed. */
- status = apr_bucket_setaside(b, beam->pool);
- if (status != APR_SUCCESS) goto cleanup;
+ rv = apr_bucket_setaside(b, beam->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
}
else if (can_beam && (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b))) {
- status = apr_bucket_setaside(b, beam->pool);
- if (status != APR_SUCCESS) goto cleanup;
+ rv = apr_bucket_setaside(b, beam->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
}
else {
/* we know of no special shortcut to transfer the bucket to
* another pool without copying. So we make it a heap bucket. */
- status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
- if (status != APR_SUCCESS) goto cleanup;
+ apr_bucket *b2;
+
+ rv = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
+ if (rv != APR_SUCCESS) goto cleanup;
/* this allocates and copies data */
- apr_bucket_heap_make(b, data, len, NULL);
+ b2 = apr_bucket_heap_create(data, len, NULL, bb->bucket_alloc);
+ apr_bucket_delete(b);
+ b = b2;
+ APR_BRIGADE_INSERT_HEAD(bb, b);
}
APR_BUCKET_REMOVE(b);
H2_BLIST_INSERT_TAIL(&beam->buckets_to_send, b);
*pwritten += (apr_off_t)b->length;
+ if (b->length > *pspace_left) {
+ *pspace_left = 0;
+ }
+ else {
+ *pspace_left -= b->length;
+ }
cleanup:
- return status;
+ return rv;
}
apr_status_t h2_beam_send(h2_bucket_beam *beam, conn_rec *from,
apr_read_type_e block,
apr_off_t *pwritten)
{
- apr_bucket *b;
apr_status_t rv = APR_SUCCESS;
apr_size_t space_left = 0;
int was_empty;
space_left = calc_space_left(beam);
while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
- if (!beam->aborted && space_left <= 0) {
+ rv = append_bucket(beam, sender_bb, block, &space_left, pwritten);
+ if (!beam->aborted && APR_EAGAIN == rv) {
+ /* bucket was not added, as beam buffer has no space left.
+ * Trigger event callbacks, so receiver can know there is something
+ * to receive before we do a conditional wait. */
purge_consumed_buckets(beam);
if (was_empty && beam->was_empty_cb) {
beam->was_empty_cb(beam->was_empty_ctx, beam);
}
was_empty = buffer_is_empty(beam);
}
- b = APR_BRIGADE_FIRST(sender_bb);
- rv = append_bucket(beam, b, block, &space_left, pwritten);
}
if (was_empty && beam->was_empty_cb && !buffer_is_empty(beam)) {
assert r.response["body"] == src, f"expected '{src}', got '{r.response['body']}'"
@pytest.mark.parametrize("name", [
- "data-1k", "data-10k", "data-100k", "data-1m"
+ # "data-1k", "data-10k", "data-100k", "data-1m"
+ "data-1m"
])
- def test_h2_004_21(self, env, name):
+ def test_h2_004_21(self, env, name, repeat):
self.nghttp_post_and_verify(env, name, [])
@pytest.mark.parametrize("name", [
"data-1k", "data-10k", "data-100k", "data-1m",
])
- @pytest.mark.skip(reason="FIXME: this fails on rare occasions")
def test_h2_004_22(self, env, name, repeat):
self.nghttp_post_and_verify(env, name, ["--no-content-length"])