From 250daaf8fea43981ac2ff5e8c814a6bca05ea6af Mon Sep 17 00:00:00 2001 From: Stephan Bosch Date: Thu, 8 Aug 2019 00:47:34 +0200 Subject: [PATCH] lib-json: json-istream - Add support for reading string values as an input stream --- src/lib-json/json-istream.c | 237 +++++++++++++++++++ src/lib-json/json-istream.h | 29 +++ src/lib-json/test-json-istream.c | 393 ++++++++++++++++++++++++++++++- 3 files changed, 656 insertions(+), 3 deletions(-) diff --git a/src/lib-json/json-istream.c b/src/lib-json/json-istream.c index 17ca3e90b7..f4017775a4 100644 --- a/src/lib-json/json-istream.c +++ b/src/lib-json/json-istream.c @@ -18,6 +18,8 @@ struct json_istream { unsigned int read_node_level; unsigned int skip_nodes; + struct istream *value_stream, *seekable_stream; + char *error; bool opened:1; @@ -28,8 +30,12 @@ struct json_istream { bool end_of_list:1; /* Encountered the end of current array/object */ bool end_of_input:1; /* Encountered end of input */ bool skip_to_end:1; /* Skip to the end of the JSON text */ + bool deref_value:1; /* Value (stream) needs to be dereferenced */ }; +static void json_istream_dereference_value(struct json_istream *stream); +static int json_istream_consume_value_stream(struct json_istream *stream); + /* * Parser callbacks */ @@ -96,6 +102,8 @@ void json_istream_unref(struct json_istream **_stream) if (--stream->refcount > 0) return; + json_istream_dereference_value(stream); + json_parser_deinit(&stream->parser); i_free(stream->error); i_free(stream); @@ -108,6 +116,8 @@ void json_istream_destroy(struct json_istream **_stream) if (stream == NULL) return; + json_istream_dereference_value(stream); + json_istream_close(stream); json_istream_unref(_stream); } @@ -115,6 +125,8 @@ void json_istream_destroy(struct json_istream **_stream) void json_istream_close(struct json_istream *stream) { stream->closed = TRUE; + if (stream->value_stream != NULL) + i_stream_close(stream->value_stream); } bool json_istream_is_closed(struct json_istream *stream) @@ -371,6 +383,10 @@ json_istream_parse_value(void *context, void *parent_context ATTR_UNUSED, stream->node.type = type; stream->node.value = *value; stream->node_parsed = TRUE; + if (value->content_type == JSON_CONTENT_TYPE_STREAM) { + stream->value_stream = value->content.stream; + i_stream_ref(stream->value_stream); + } json_parser_interrupt(stream->parser); } @@ -378,6 +394,23 @@ json_istream_parse_value(void *context, void *parent_context ATTR_UNUSED, * */ +static void json_istream_dereference_value(struct json_istream *stream) +{ + if (stream->deref_value) { + stream->deref_value = FALSE; + /* These streams have destroy callbacks that guarantee that no + stale pointer can remain in the JSON istream. */ + if (stream->seekable_stream != NULL) { + struct istream *seekable_stream = + stream->seekable_stream; + i_stream_unref(&seekable_stream); + } else if (stream->value_stream != NULL) { + i_stream_unref(&stream->value_stream); + } + json_parser_disable_string_stream(stream->parser); + } +} + int json_istream_read(struct json_istream *stream, struct json_node *node_r) { const char *error; @@ -397,6 +430,10 @@ int json_istream_read(struct json_istream *stream, struct json_node *node_r) *node_r = stream->node; return 1; } + json_istream_dereference_value(stream); + ret = json_istream_consume_value_stream(stream); + if (ret <= 0) + return ret; ret = json_parse_more(stream->parser, &error); if (ret < 0) { json_istream_set_error(stream, error); @@ -447,6 +484,7 @@ static void json_istream_next_node(struct json_istream *stream) void json_istream_skip(struct json_istream *stream) { + json_istream_dereference_value(stream); json_istream_next_node(stream); } @@ -487,6 +525,10 @@ int json_istream_read_object_member(struct json_istream *stream, *name_r = NULL; return 1; } + json_istream_dereference_value(stream); + ret = json_istream_consume_value_stream(stream); + if (ret <= 0) + return ret; stream->read_member = TRUE; ret = json_parse_more(stream->parser, &error); stream->read_member = FALSE; @@ -582,3 +624,198 @@ int json_istream_walk(struct json_istream *stream, struct json_node *node_r) *node_r = node; return 1; } + +/* + * Stream values + */ + +static void json_istream_drop_seekable_stream(struct json_istream *stream) +{ + stream->deref_value = FALSE; + stream->value_stream = NULL; + stream->seekable_stream = NULL; + json_parser_disable_string_stream(stream->parser); +} + +static void json_istream_drop_value_stream(struct json_istream *stream) +{ + if (stream->deref_value) { + stream->deref_value = FALSE; + if (stream->seekable_stream != NULL) { + i_stream_remove_destroy_callback( + stream->seekable_stream, + json_istream_drop_seekable_stream); + i_stream_unref(&stream->seekable_stream); + } + } + stream->value_stream = NULL; + stream->seekable_stream = NULL; +} + +static void json_istream_consumed_value_stream(struct json_istream *stream) +{ + json_istream_dereference_value(stream); + if (stream->seekable_stream != NULL) { + i_stream_remove_destroy_callback( + stream->seekable_stream, + json_istream_drop_seekable_stream); + } + if (stream->value_stream != NULL) { + i_stream_remove_destroy_callback( + stream->value_stream, + json_istream_drop_value_stream); + } + stream->value_stream = NULL; + stream->seekable_stream = NULL; + json_parser_disable_string_stream(stream->parser); +} + +static int json_istream_consume_value_stream(struct json_istream *stream) +{ + struct istream *input = stream->seekable_stream; + const unsigned char *data; + uoff_t v_offset; + size_t size; + int ret; + + if (input == NULL) + return 1; + if (!i_stream_have_bytes_left(stream->seekable_stream)) { + json_istream_consumed_value_stream(stream); + return 1; + } + + v_offset = input->v_offset; + i_stream_seek(input, stream->value_stream->v_offset); + while ((ret = i_stream_read_more(input, &data, &size)) > 0) + i_stream_skip(input, size); + i_stream_seek(input, v_offset); + if (ret == 0) + return ret; + + if (input->stream_errno != 0) { + json_istream_set_error(stream, + t_strdup_printf("read(%s) failed: %s", + i_stream_get_name(input), + i_stream_get_error(input))); + return -1; + } + i_assert(stream->value_stream == NULL || + !i_stream_have_bytes_left(stream->value_stream)); + i_assert(stream->seekable_stream == NULL || + !i_stream_have_bytes_left(stream->seekable_stream)); + json_istream_consumed_value_stream(stream); + return 1; +} + +static void +json_istream_handle_stream(struct json_istream *stream, + const char *temp_path_prefix, + size_t max_buffer_size, + struct json_node *node) +{ + if (node->value.content_type == JSON_CONTENT_TYPE_STREAM) { + if (temp_path_prefix != NULL) { + struct istream *input[2] = { NULL, NULL }; + + i_assert(stream->value_stream != NULL); + i_assert(stream->seekable_stream == NULL); + i_assert(!stream->deref_value); + + input[0] = stream->value_stream; + stream->seekable_stream = i_stream_create_seekable_path( + input, max_buffer_size, temp_path_prefix); + i_stream_unref(&input[0]); + node->value.content.stream = stream->seekable_stream; + i_stream_set_name(stream->seekable_stream, + "(seekable JSON string)"); + + i_stream_add_destroy_callback( + stream->value_stream, + json_istream_drop_value_stream, stream); + i_stream_add_destroy_callback( + stream->seekable_stream, + json_istream_drop_seekable_stream, stream); + } + stream->deref_value = TRUE; + } +} + +int json_istream_read_stream(struct json_istream *stream, + size_t threshold, size_t max_buffer_size, + const char *temp_path_prefix, + struct json_node *node_r) +{ + int ret; + + if (stream->closed) + return -1; + + if (stream->node_parsed) { + if (node_r != NULL) + *node_r = stream->node; + if (node_r->value.content_type == JSON_CONTENT_TYPE_STREAM && + stream->seekable_stream != NULL) + node_r->value.content.stream = stream->seekable_stream; + return 1; + } + + json_parser_enable_string_stream(stream->parser, threshold, + max_buffer_size); + ret = json_istream_read(stream, node_r); + if (ret <= 0 ) { + json_parser_disable_string_stream(stream->parser); + return ret; + } + + json_istream_handle_stream(stream, temp_path_prefix, max_buffer_size, + node_r); + return 1; +} + +int json_istream_read_next_stream(struct json_istream *stream, + size_t threshold, size_t max_buffer_size, + const char *temp_path_prefix, + struct json_node *node_r) +{ + int ret; + + ret = json_istream_read_stream(stream, threshold, max_buffer_size, + temp_path_prefix, node_r); + if (ret <= 0) + return ret; + json_istream_next_node(stream); + return 1; +} + +int json_istream_walk_stream(struct json_istream *stream, + size_t threshold, size_t max_buffer_size, + const char *temp_path_prefix, + struct json_node *node_r) +{ + int ret; + + if (stream->closed) + return -1; + + if (stream->node_parsed) { + if (node_r != NULL) + *node_r = stream->node; + if (node_r->value.content_type == JSON_CONTENT_TYPE_STREAM && + stream->seekable_stream != NULL) + node_r->value.content.stream = stream->seekable_stream; + return 1; + } + + json_parser_enable_string_stream(stream->parser, threshold, + max_buffer_size); + ret = json_istream_walk(stream, node_r); + if (ret <= 0 ) { + json_parser_disable_string_stream(stream->parser); + return ret; + } + + json_istream_handle_stream(stream, temp_path_prefix, max_buffer_size, + node_r); + return 1; +} diff --git a/src/lib-json/json-istream.h b/src/lib-json/json-istream.h index 5d85079777..0bbfd3fded 100644 --- a/src/lib-json/json-istream.h +++ b/src/lib-json/json-istream.h @@ -143,4 +143,33 @@ void json_istream_ascend_to(struct json_istream *stream, int json_istream_walk(struct json_istream *stream, struct json_node *node_r); +/* Equivalent to json_istream_read(), but reads strings bigger than + `threshold' octets as an istream with `max_buffer_size'. When + `temp_path_prefix' is not NULL, the returned stream is made seekable and + can be read at a later time. + */ +int json_istream_read_stream(struct json_istream *stream, + size_t threshold, size_t max_buffer_size, + const char *temp_path_prefix, + struct json_node *node_r); +/* Equivalent to json_istream_read_next(), but reads strings bigger than + `threshold' octets as an istream with `max_buffer_size'. When + `temp_path_prefix' is not NULL, the returned stream is made seekable and + can be read at a later time. + */ +int json_istream_read_next_stream(struct json_istream *stream, + size_t threshold, size_t max_buffer_size, + const char *temp_path_prefix, + struct json_node *node_r); + +/* Equivalent to json_istream_walk(), but reads strings bigger than + `threshold' octets as an istream with `max_buffer_size'. When + `temp_path_prefix' is not NULL, the returned stream is made seekable and + can be read at a later time. + */ +int json_istream_walk_stream(struct json_istream *stream, + size_t threshold, size_t max_buffer_size, + const char *temp_path_prefix, + struct json_node *node_r); + #endif diff --git a/src/lib-json/test-json-istream.c b/src/lib-json/test-json-istream.c index 185d9e8e49..f80d9411a3 100644 --- a/src/lib-json/test-json-istream.c +++ b/src/lib-json/test-json-istream.c @@ -2353,6 +2353,279 @@ static void test_json_istream_finish(void) i_stream_unref(&input); } +/* + * Test: read stream + */ + +static void test_json_istream_read_stream(void) +{ + struct istream *input, *val_input; + struct json_istream *jinput; + const char *str_text, *text; + struct json_node jnode; + unsigned int pos, text_len, state; + string_t *buffer; + int ret = 0; + + buffer = str_new(default_pool, 256); + + str_texttext = "[\"012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789\"]"; + text_len = strlen(text); + + input = test_istream_create_data(text, text_len); + jinput = json_istream_create(input, 0, NULL, 0); + + test_begin("json istream read stream (array)"); + + pos = 0; state = 0; ret = 0; + while (ret >= 0 && state <= 2) { + if (pos <= text_len) + pos++; + test_istream_set_size(input, pos); + switch (state) { + case 0: + ret = json_istream_descend(jinput, &jnode); + if (ret == 0) + continue; + if (ret < 0) + break; + test_assert(json_node_is_array(&jnode)); + state++; + break; + case 1: + ret = json_istream_read_stream( + jinput, 0, IO_BLOCK_SIZE, + "/tmp/dovecot-test-json.", &jnode); + if (ret == 0) + continue; + if (ret < 0) + break; + test_assert(json_node_is_string(&jnode)); + test_assert(jnode.value.content_type == + JSON_CONTENT_TYPE_STREAM); + test_assert(jnode.value.content.stream != NULL); + val_input = jnode.value.content.stream; + i_stream_ref(val_input); + json_istream_ascend(jinput); + state++; + break; + case 2: + ret = json_istream_read(jinput, &jnode); + if (ret == 0) + continue; + if (ret < 0) + break; + state++; + break; + } + } + test_assert(state == 2); + + if (!test_has_failed()) { + const unsigned char *data; + size_t size; + + while ((ret = i_stream_read_more(val_input, + &data, &size)) > 0) { + buffer_append(buffer, data, size); + i_stream_skip(val_input, size); + } + if (ret < 0) { + test_assert(!i_stream_have_bytes_left(val_input)); + i_stream_unref(&val_input); + ret = 0; + } + } + test_out_quiet("stream output", strcmp(str_c(buffer), str_text) == 0); + test_json_read_success(&jinput); + + test_end(); + + json_istream_unref(&jinput); + i_stream_unref(&input); + + str_truncate(buffer, 0); + + text = "[[{\"data\": \"012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789" + "012345678901234567890123456789\"}, \"frop\"]]"; + text_len = strlen(text); + + input = test_istream_create_data(text, text_len); + jinput = json_istream_create(input, 0, NULL, 0); + + test_begin("json istream read stream (object)"); + + pos = 0; state = 0; ret = 0; + while (ret >= 0 && state <= 8) { + if (pos <= text_len) + pos++; + test_istream_set_size(input, pos); + switch (state) { + case 0: + ret = json_istream_descend(jinput, &jnode); + if (ret == 0) + continue; + if (ret < 0) + break; + i_assert(json_node_is_array(&jnode)); + state++; + break; + case 1: + ret = json_istream_descend(jinput, &jnode); + if (ret == 0) + continue; + if (ret < 0) + break; + i_assert(json_node_is_array(&jnode)); + state++; + break; + case 2: + ret = json_istream_descend(jinput, &jnode); + if (ret == 0) + continue; + if (ret < 0) + break; + i_assert(json_node_is_object(&jnode)); + state++; + break; + case 3: + ret = json_istream_read_stream( + jinput, 0, IO_BLOCK_SIZE, + "/tmp/dovecot-test-json.", &jnode); + if (ret == 0) + continue; + if (ret < 0) + break; + test_assert(json_node_is_string(&jnode)); + test_assert(jnode.value.content_type == + JSON_CONTENT_TYPE_STREAM); + test_assert(jnode.value.content.stream != NULL); + val_input = jnode.value.content.stream; + i_stream_ref(val_input); + json_istream_skip(jinput); + state++; + break; + case 4: + ret = json_istream_read_next(jinput, &jnode); + if (ret == 0) + continue; + if (ret < 0) + break; + i_assert(json_node_is_object_end(&jnode)); + json_istream_ascend(jinput); + state++; + break; + case 5: + ret = json_istream_read_next(jinput, &jnode); + if (ret == 0) + continue; + if (ret < 0) + break; + i_assert(json_node_is_string(&jnode)); + state++; + break; + case 6: + ret = json_istream_read_next(jinput, &jnode); + if (ret == 0) + continue; + if (ret < 0) + break; + i_assert(json_node_is_array_end(&jnode)); + json_istream_ascend(jinput); + state++; + break; + case 7: + ret = json_istream_read_next(jinput, &jnode); + if (ret == 0) + continue; + if (ret < 0) + break; + i_assert(json_node_is_array_end(&jnode)); + json_istream_ascend(jinput); + state++; + break; + case 8: + ret = json_istream_read_next(jinput, &jnode); + if (ret == 0) + continue; + if (ret < 0) + break; + state++; + break; + } + } + test_assert(state == 8); + + if (!test_has_failed()) { + const unsigned char *data; + size_t size; + + while ((ret = i_stream_read_more(val_input, + &data, &size)) > 0) { + buffer_append(buffer, data, size); + i_stream_skip(val_input, size); + } + if (ret < 0) { + test_assert(!i_stream_have_bytes_left(val_input)); + i_stream_unref(&val_input); + ret = 0; + } + } + test_out_quiet("stream output", strcmp(str_c(buffer), str_text) == 0); + test_json_read_success(&jinput); + + test_end(); + + json_istream_unref(&jinput); + i_stream_unref(&input); + + str_free(&buffer); +} + /* * Test: tokens */ @@ -2376,6 +2649,7 @@ static const char test_json_tokens_input[] = " \"sub4\":0.456e-789" "}," "\"key9\": \"foo\\\\\\\"\\b\\f\\n\\r\\t\\u0001\\u10ff\"," + "\"key10\": \"foo\\\\\\\"\\b\\f\\n\\r\\t\\u0001\\u10ff\"," "\"key11\": []," "\"key12\": [ \"foo\" , 5.24,[true],{\"aobj\":[]}]," "\"key13\": \"\\ud801\\udc37\"," @@ -2460,6 +2734,12 @@ static struct json_node test_json_tokens_output[] = { .content_type = JSON_CONTENT_TYPE_STRING, .content = { .str = "foo\\\"\b\f\n\r\t\001\xe1\x83\xbf" } }, + }, { + .name = "key10", .type = JSON_TYPE_STRING, + .value = { + .content_type = JSON_CONTENT_TYPE_STREAM, + .content = { + .str = "foo\\\"\b\f\n\r\t\001\xe1\x83\xbf" } }, }, { .name = "key11", .type = JSON_TYPE_ARRAY, .value = { @@ -2535,10 +2815,30 @@ static struct json_node test_json_tokens_output[] = { } }; +static int +stream_read_value(struct istream **input, const char **value_r) +{ + const unsigned char *data; + size_t size; + ssize_t ret; + + while ((ret = i_stream_read(*input)) > 0) ; + if (ret == 0) + return 0; + i_assert(ret == -1); + if ((*input)->stream_errno != 0) + return -1; + + data = i_stream_get_data(*input, &size); + *value_r = t_strndup(data, size); + i_stream_unref(input); + return 1; +} + static void test_json_istream_tokens(bool full_size) { struct json_istream *jinput; - struct istream *input; + struct istream *input, *jsoninput = NULL; struct json_node jnode; const char *value; unsigned int i, pos, json_input_len = strlen(test_json_tokens_input); @@ -2565,12 +2865,30 @@ static void test_json_istream_tokens(bool full_size) json_istream_ignore(jinput, 1); pos++; continue; - } else { + } else if (pos == ntokens || + test_output->value.content_type != + JSON_CONTENT_TYPE_STREAM) { ret = json_istream_walk(jinput, &jnode); if (ret > 0 && test_output->value.content_type == JSON_CONTENT_TYPE_STRING) value = jnode.value.content.str; + } else { + if (jsoninput != NULL) + ret = 1; + else { + ret = json_istream_read_next_stream( + jinput, 0, 1024, NULL, &jnode); + if (ret > 0 && + json_node_get_stream( + &jnode, &jsoninput) < 0) + ret = -1; + } + + if (ret > 0 && jsoninput != NULL) { + ret = stream_read_value(&jsoninput, + &value); + } } if (ret <= 0) break; @@ -2808,7 +3126,7 @@ static void test_json_istream_skip_object_fields(void) static void test_json_istream_error(void) { - struct istream *input, *err_input; + struct istream *input, *err_input, *val_input; struct json_istream *jinput; const char *text, *error; struct json_node jnode; @@ -2945,6 +3263,74 @@ static void test_json_istream_error(void) json_istream_unref(&jinput); i_stream_unref(&input); + + /* bad string stream */ + text = "\"\xed\xa2\xab <-- encoded surrogate half\""; + text_len = strlen(text); + + input = i_stream_create_from_data(text, text_len); + jinput = json_istream_create(input, 0, NULL, 0); + + test_begin("json istream error - bad string stream"); + + ret = json_istream_read_stream(jinput, 0, 16, NULL, &jnode); + error = json_istream_get_error(jinput); + test_out_reason("read failure", (ret < 0 && error != NULL), error); + + test_end(); + + json_istream_unref(&jinput); + i_stream_unref(&input); + + /* bad string seekable stream */ + text = "\"\xed\xa2\xab <-- encoded surrogate half\""; + text_len = strlen(text); + + input = i_stream_create_from_data(text, text_len); + jinput = json_istream_create(input, 0, NULL, 0); + + test_begin("json istream error - bad seekable string stream"); + + ret = json_istream_read_stream(jinput, 0, IO_BLOCK_SIZE, + "/tmp/dovecot-test-json.", &jnode); + ret = json_istream_read(jinput, &jnode); + error = json_istream_get_error(jinput); + test_out_reason("read failure", (ret < 0 && error != NULL), error); + + test_end(); + + json_istream_unref(&jinput); + i_stream_unref(&input); + + /* string stream with bad end */ + text = "\"bladiebladiebladiebladiebladiebladiebladiebla \xed\xa2\xab\""; + text_len = strlen(text); + + input = i_stream_create_from_data(text, text_len); + jinput = json_istream_create(input, 0, NULL, 0); + + test_begin("json istream error - string stream with bad end"); + + ret = json_istream_read_stream(jinput, 0, 16, + "/tmp/dovecot-test-json.", &jnode); + test_out_reason_quiet("read success", ret > 0, + json_istream_get_error(jinput)); + test_assert(json_node_is_string(&jnode)); + test_assert(jnode.value.content_type == JSON_CONTENT_TYPE_STREAM); + test_assert(jnode.value.content.stream != NULL); + val_input = jnode.value.content.stream; + if (val_input != NULL) + i_stream_ref(val_input); + json_istream_skip(jinput); + ret = json_istream_read(jinput, &jnode); + error = json_istream_get_error(jinput); + test_out_reason("read failure", (ret < 0 && error != NULL), error); + + test_end(); + + i_stream_unref(&val_input); + json_istream_unref(&jinput); + i_stream_unref(&input); } /* @@ -2961,6 +3347,7 @@ int main(int argc, char *argv[]) test_json_istream_read_buffer, test_json_istream_read_trickle, test_json_istream_finish, + test_json_istream_read_stream, test_json_istream_tokens_buffer, test_json_istream_tokens_trickle, test_json_istream_skip_array, -- 2.47.3