smtp_server_connection_check_pipeline(struct smtp_server_connection *conn)
{
unsigned int pipeline = conn->command_queue_count;
+ struct smtp_server_command *cmd;
if (conn->command_queue_tail != NULL) {
i_assert(pipeline > 0);
pipeline, conn->set.max_pipelined_commands);
return FALSE;
}
+
+ cmd = conn->command_queue_head;
+ while (cmd != NULL) {
+ if (cmd->pipeline_blocked)
+ return FALSE;
+ cmd = cmd->next;
+ }
return TRUE;
}
void smtp_server_connection_input_resume(struct smtp_server_connection *conn)
{
struct smtp_server_command *cmd;
- bool cmd_locked = FALSE;
if (conn->conn.io == NULL) {
/* Only resume when we actually can */
if (conn->input_locked || conn->input_broken ||
conn->disconnected)
return;
- if (!smtp_server_connection_check_pipeline(conn))
- return;
+ if (!smtp_server_connection_check_pipeline(conn)) {
+ i_assert(conn->command_queue_tail != NULL);
+ if (!conn->command_queue_tail->pipeline_blocked ||
+ !smtp_server_connection_pending_command_data(conn))
+ return;
+ }
/* Is queued command still blocking input? */
cmd = conn->command_queue_head;
while (cmd != NULL) {
- if (cmd->input_locked || cmd->pipeline_blocked) {
- cmd_locked = TRUE;
- break;
- }
+ if (cmd->input_locked)
+ return;
cmd = cmd->next;
}
- if (cmd_locked)
- return;
/* Restore input handler */
connection_input_resume(&conn->conn);
}
static void
-smtp_server_connection_handle_input(struct smtp_server_connection *conn)
+smtp_server_connection_handle_input(struct smtp_server_connection *conn,
+ bool pipeline_blocked)
{
struct smtp_server_command *pending_command;
enum smtp_command_parse_error error_code;
ret = 1;
while (!conn->closing && !conn->input_locked && ret != 0) {
while ((ret = smtp_command_parse_next(
- conn->smtp_parser, &cmd_name, &cmd_params,
- &error_code, &error)) > 0) {
+ conn->smtp_parser, pipeline_blocked,
+ &cmd_name, &cmd_params, &error_code, &error)) > 0) {
if (pending_command != NULL) {
/* Previous command is now fully read and ready
pending_command = NULL;
}
+ if (pipeline_blocked) {
+ e_debug(conn->event,
+ "Discarded remaining data for previous command");
+ break;
+ }
+
e_debug(conn->event, "Received new command: %s %s",
cmd_name, cmd_params);
if (conn->disconnected)
return;
- if (conn->input_broken || conn->closing) {
+ if (conn->input_broken || conn->closing || pipeline_blocked) {
smtp_server_connection_input_halt(conn);
return;
}
ssl_iostream_is_handshaked(conn->ssl_iostream)))
smtp_server_connection_ready(conn);
+ bool pipeline_blocked = FALSE;
if (!smtp_server_connection_check_pipeline(conn)) {
- smtp_server_connection_input_halt(conn);
- return;
+ /* Check whether the last command in the queue is blocking the
+ pipeline and whether it still has pending input data. In that
+ case we read/discard that first to possibly unblock the
+ pipeline. */
+ if (conn->command_queue_tail == NULL ||
+ !conn->command_queue_tail->pipeline_blocked ||
+ !smtp_server_connection_pending_command_data(conn)) {
+ smtp_server_connection_input_halt(conn);
+ return;
+ }
+ pipeline_blocked = TRUE;
}
smtp_server_connection_ref(conn);
if (conn->callbacks != NULL &&
conn->callbacks->conn_cmd_input_pre != NULL)
conn->callbacks->conn_cmd_input_pre(conn->context);
- smtp_server_connection_handle_input(conn);
+ smtp_server_connection_handle_input(conn, pipeline_blocked);
if (conn->callbacks != NULL &&
conn->callbacks->conn_cmd_input_post != NULL)
conn->callbacks->conn_cmd_input_post(conn->context);