From: Stephan Bosch Date: Fri, 22 Nov 2019 18:55:48 +0000 (+0100) Subject: stats: stats-service-openmetrics - Make composition of response payload scalable. X-Git-Tag: 2.3.11.2~242 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=a2851c580a5a9cb941cd8248f30f731f9229b15e;p=thirdparty%2Fdovecot%2Fcore.git stats: stats-service-openmetrics - Make composition of response payload scalable. Use the new HTTP server response payload ostream API. --- diff --git a/src/stats/stats-service-openmetrics.c b/src/stats/stats-service-openmetrics.c index e5714e0651..45d7ce142c 100644 --- a/src/stats/stats-service-openmetrics.c +++ b/src/stats/stats-service-openmetrics.c @@ -45,6 +45,8 @@ struct openmetrics_request_sub_metric { }; struct openmetrics_request { + struct ostream *output; + enum openmetrics_request_state state; struct stats_metrics_iter *stats_iter; const struct metric *metric; @@ -447,6 +449,26 @@ openmetrics_export_metric_body(struct openmetrics_request *req, string_t *out, openmetrics_export_metric_value(req, out, req->metric, timestamp); } +static int +openmetrics_send_buffer(struct openmetrics_request *req, buffer_t *buffer) +{ + ssize_t sent; + + if (buffer->used == 0) + return 1; + + sent = o_stream_send(req->output, buffer->data, buffer->used); + if (sent < 0) + return -1; + + /* Max buffer size is enormous */ + i_assert((size_t)sent == buffer->used); + + if (o_stream_get_buffer_used_size(req->output) >= IO_BLOCK_SIZE) + return 0; + return 1; +} + static void openmetrics_export_next(struct openmetrics_request *req) { /* Determine what to export next. */ @@ -532,6 +554,14 @@ static void openmetrics_export_continue(struct openmetrics_request *req, } } +static void openmetrics_handle_write_error(struct openmetrics_request *req) +{ + i_info("openmetrics: write(%s) failed: %s", + o_stream_get_name(req->output), + o_stream_get_error(req->output)); + o_stream_destroy(&req->output); +} + static void openmetrics_request_deinit(struct openmetrics_request *req) { stats_metrics_iterate_deinit(&req->stats_iter); @@ -539,24 +569,78 @@ static void openmetrics_request_deinit(struct openmetrics_request *req) array_free(&req->sub_metric_stack); } -static void -openmetrics_export(struct openmetrics_request *req, - struct http_server_response *resp) +static int openmetrics_export(struct openmetrics_request *req) { - string_t *out = t_str_new(2048); int64_t timestamp; + string_t *out; + int ret; + ret = o_stream_flush(req->output); + if (ret < 0) { + openmetrics_handle_write_error(req); + return -1; + } + if (ret == 0) { + /* Output stream buffer needs to be flushed further */ + return 0; + } + + if (req->state == OPENMETRICS_REQUEST_STATE_FINISHED) { + /* All metrics were exported already, so we can finish the + HTTP request now. */ + o_stream_destroy(&req->output); + return 1; + } + + /* Record timestamp for metrics export */ i_assert(ioloop_timeval.tv_usec < 1000000); timestamp = ((int64_t)ioloop_timeval.tv_sec * 1000 + (int64_t)ioloop_timeval.tv_usec / 1000); - while (req->state != OPENMETRICS_REQUEST_STATE_FINISHED) + /* Export metrics into a string buffer and write that buffer to the + output stream after each (sub-)metric, so that the string buffer + stays small. The output stream buffer can grow bigger, but writing is + stopped for later resumption when the output stream buffer has grown + beyond an optimal size. */ + out = t_str_new(1024); + for (;;) { + str_truncate(out, 0); + openmetrics_export_continue(req, out, timestamp); + ret = openmetrics_send_buffer(req, out); + if (ret < 0) { + openmetrics_handle_write_error(req); + return -1; + } + if (req->state == OPENMETRICS_REQUEST_STATE_FINISHED) { + /* Finished export of metrics, but the output stream + buffer may still contain data. */ + break; + } + if (ret == 0) { + /* Output stream buffer is filled up beyond the optimal + size; wait until we can write more. */ + return ret; + } + } + + /* Cleanup everything except the output stream */ openmetrics_request_deinit(req); - http_server_response_set_payload_data( - resp, str_data(out), str_len(out)); + /* Finished; flush output */ + ret = o_stream_finish(req->output); + if (ret < 0) { + openmetrics_handle_write_error(req); + return -1; + } + return ret; +} + +static void openmetrics_request_destroy(struct openmetrics_request *req) +{ + o_stream_destroy(&req->output); + openmetrics_request_deinit(req); } static void @@ -566,7 +650,8 @@ stats_service_openmetrics_request(void *context ATTR_UNUSED, { const struct http_request *hreq = http_server_request_get(hsreq); struct http_server_response *hsresp; - struct openmetrics_request req; + struct openmetrics_request *req; + pool_t pool; if (strcmp(hreq->method, "OPTIONS") == 0) { hsresp = http_server_response_create(hsreq, 200, "OK"); @@ -583,16 +668,23 @@ stats_service_openmetrics_request(void *context ATTR_UNUSED, return; } + pool = http_server_request_get_pool(hsreq); + req = p_new(pool, struct openmetrics_request, 1); + + http_server_request_set_destroy_callback( + hsreq, openmetrics_request_destroy, req); + hsresp = http_server_response_create(hsreq, 200, "OK"); http_server_response_add_header( hsresp, "Content-Type", "text/plain; version="OPENMETRICS_CONTENT_VERSION"; " "charset=utf-8"); - i_zero(&req); - openmetrics_export(&req, hsresp); + req->output = http_server_response_get_payload_output( + hsresp, SIZE_MAX, FALSE); - http_server_response_submit(hsresp); + o_stream_set_flush_callback(req->output, openmetrics_export, req); + o_stream_set_flush_pending(req->output, TRUE); } void stats_service_openmetrics_init(void)