]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
Implement read throttling when writes backlog
authorOndřej Kuzník <okuznik@symas.com>
Fri, 29 Mar 2019 12:56:24 +0000 (12:56 +0000)
committerOndřej Kuzník <okuznik@symas.com>
Tue, 17 Nov 2020 17:58:15 +0000 (17:58 +0000)
Reject operations in such a case with LDAP_BUSY. If read_event feature
is on, just stop reading from the connection. However this could still
result in deadlocks in reasonable situations. Need to figure out better
ways to make it safe and still protect ourselves.

servers/lloadd/client.c
servers/lloadd/config.c
servers/lloadd/connection.c
servers/lloadd/daemon.c
servers/lloadd/extended.c
servers/lloadd/lload.h
servers/lloadd/upstream.c

index f3e2870b666f955cabf656bf1bceb7f104636035..19c15af30e82e8543d5c7537dde9e4e998a104e9 100644 (file)
@@ -266,6 +266,11 @@ handle_one_request( LloadConnection *c )
                         0 );
                 return LDAP_SUCCESS;
             }
+            if ( c->c_io_state & LLOAD_C_READ_PAUSE ) {
+                operation_send_reject( op, LDAP_BUSY,
+                        "writing side backlogged, please keep reading", 0 );
+                return LDAP_SUCCESS;
+            }
             if ( op->o_tag == LDAP_REQ_EXTENDED ) {
                 handler = request_extended;
             } else {
index e9ea23185077aa9e809f1f64667ade370a315337..638c167b70f5bdcd1d38f3662b6217f9c5db7e03 100644 (file)
@@ -1856,6 +1856,7 @@ config_feature( ConfigArgs *c )
         { BER_BVC("vc"), LLOAD_FEATURE_VC },
 #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
         { BER_BVC("proxyauthz"), LLOAD_FEATURE_PROXYAUTHZ },
+        { BER_BVC("read_pause"), LLOAD_FEATURE_PAUSE },
         { BER_BVNULL, 0 }
     };
     slap_mask_t mask = 0;
index 0bcbc73fcbe429fe9f001e5794e60ea4e96820f9..3bfa2d882000058708f4a3a8f3227c066e59f449 100644 (file)
@@ -102,6 +102,10 @@ handle_pdus( void *ctx, void *arg )
         c->c_currentber = ber;
 
         checked_lock( &c->c_io_mutex );
+        if ( (lload_features & LLOAD_FEATURE_PAUSE) &&
+                (c->c_io_state & LLOAD_C_READ_PAUSE) ) {
+            goto pause;
+        }
         tag = ber_get_next( c->c_sb, &len, ber );
         checked_unlock( &c->c_io_mutex );
         if ( tag != LDAP_TAG_MESSAGE ) {
@@ -135,10 +139,18 @@ handle_pdus( void *ctx, void *arg )
         assert( IS_ALIVE( c, c_refcnt ) );
     }
 
-    event_add( c->c_read_event, c->c_read_timeout );
-    Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
-            "re-enabled read event on connid=%lu\n",
-            c->c_connid );
+    checked_lock( &c->c_io_mutex );
+    if ( !(lload_features & LLOAD_FEATURE_PAUSE) ||
+            !(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
+        event_add( c->c_read_event, c->c_read_timeout );
+        Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
+                "re-enabled read event on connid=%lu\n",
+                c->c_connid );
+    }
+pause:
+    c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
+    checked_unlock( &c->c_io_mutex );
+
 done:
     RELEASE_REF( c, c_refcnt, c->c_destroy );
     epoch_leave( epoch );
@@ -160,6 +172,7 @@ connection_read_cb( evutil_socket_t s, short what, void *arg )
     ber_tag_t tag;
     ber_len_t len;
     epoch_t epoch;
+    int pause;
 
     if ( !IS_ALIVE( c, c_live ) ) {
         event_del( c->c_read_event );
@@ -199,7 +212,9 @@ connection_read_cb( evutil_socket_t s, short what, void *arg )
     c->c_currentber = ber;
 
     checked_lock( &c->c_io_mutex );
+    assert( !(c->c_io_state & LLOAD_C_READ_HANDOVER) );
     tag = ber_get_next( c->c_sb, &len, ber );
+    pause = c->c_io_state & LLOAD_C_READ_PAUSE;
     checked_unlock( &c->c_io_mutex );
 
     if ( tag != LDAP_TAG_MESSAGE ) {
@@ -229,20 +244,34 @@ connection_read_cb( evutil_socket_t s, short what, void *arg )
             CONNECTION_LOCK_DESTROY(c);
             goto out;
         }
-        event_add( c->c_read_event, c->c_read_timeout );
-        Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
-                "re-enabled read event on connid=%lu\n",
-                c->c_connid );
+        if ( !(lload_features & LLOAD_FEATURE_PAUSE) || !pause ) {
+            event_add( c->c_read_event, c->c_read_timeout );
+            Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
+                    "re-enabled read event on connid=%lu\n",
+                    c->c_connid );
+        }
         goto out;
     }
 
+    checked_lock( &c->c_io_mutex );
+    c->c_io_state |= LLOAD_C_READ_HANDOVER;
+    checked_unlock( &c->c_io_mutex );
     event_del( c->c_read_event );
+
     if ( !lload_conn_max_pdus_per_cycle ||
             ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) {
         /* If we're overloaded or configured as such, process one and resume in
          * the next cycle. */
-        event_add( c->c_read_event, c->c_read_timeout );
-        c->c_pdu_cb( c );
+        int rc = c->c_pdu_cb( c );
+
+        checked_lock( &c->c_io_mutex );
+        c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
+        if ( rc == LDAP_SUCCESS &&
+                ( !(lload_features & LLOAD_FEATURE_PAUSE) ||
+                        !(c->c_io_state & LLOAD_C_READ_PAUSE) ) ) {
+            event_add( c->c_read_event, c->c_read_timeout );
+        }
+        checked_unlock( &c->c_io_mutex );
         goto out;
     }
 
@@ -313,9 +342,28 @@ connection_write_cb( evutil_socket_t s, short what, void *arg )
             CONNECTION_LOCK_DESTROY(c);
             goto done;
         }
+
+        if ( !(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
+            Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
+                    "connection connid=%lu blocked on writing, marking "
+                    "paused\n",
+                    c->c_connid );
+        }
+        c->c_io_state |= LLOAD_C_READ_PAUSE;
+
+        /* TODO: Do not reset write timeout unless we wrote something */
         event_add( c->c_write_event, lload_write_timeout );
     } else {
         c->c_pendingber = NULL;
+        if ( c->c_io_state & LLOAD_C_READ_PAUSE ) {
+            c->c_io_state ^= LLOAD_C_READ_PAUSE;
+            Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
+                    "Unpausing connection connid=%lu\n",
+                    c->c_connid );
+            if ( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ) {
+                event_add( c->c_read_event, c->c_read_timeout );
+            }
+        }
     }
     checked_unlock( &c->c_io_mutex );
 
index 853b380ba51e3c72ce6401a7ddcbb9094e807712..f03e1ab48be425707175477209c587aba79e0d4c 100644 (file)
@@ -1637,6 +1637,8 @@ lload_handle_global_invalidation( LloadChange *change )
          * - ProxyAuthz:
          *   - on: nothing needed
          *   - off: clear c_auth/privileged on each client
+         * - read pause (WIP):
+         *   - nothing needed?
          */
 
         assert( change->target );
@@ -1644,6 +1646,9 @@ lload_handle_global_invalidation( LloadChange *change )
             assert(0);
             feature_diff &= ~LLOAD_FEATURE_VC;
         }
+        if ( feature_diff & LLOAD_FEATURE_PAUSE ) {
+            feature_diff &= ~LLOAD_FEATURE_PAUSE;
+        }
         if ( feature_diff & LLOAD_FEATURE_PROXYAUTHZ ) {
             if ( !(lload_features & LLOAD_FEATURE_PROXYAUTHZ) ) {
                 LloadConnection *c;
index 330523c07647533d61d31b38c06a8406b52adf2d..f8e491f950daf078b29b542d7bbe0c96567ace91 100644 (file)
@@ -89,6 +89,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
     ber_printf( output, "t{tit{ess}}", LDAP_TAG_MESSAGE,
             LDAP_TAG_MSGID, op->o_client_msgid,
             LDAP_RES_EXTENDED, LDAP_SUCCESS, "", "" );
+    c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
     checked_unlock( &c->c_io_mutex );
 
     CONNECTION_LOCK(c);
index 0de410bebe52d4aa5095fa9f5a762333030e1962..72c34511723dcfb8ec5df7b5185152418c92a1a9 100644 (file)
@@ -174,6 +174,7 @@ typedef enum {
     LLOAD_FEATURE_VC = 1 << 0,
 #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
     LLOAD_FEATURE_PROXYAUTHZ = 1 << 1,
+    LLOAD_FEATURE_PAUSE = 1 << 2,
 } lload_features_t;
 
 #ifdef BALANCER_MODULE
@@ -272,7 +273,8 @@ enum sc_state {
     LLOAD_C_CLOSING,     /* closing */
     LLOAD_C_ACTIVE,      /* exclusive operation (tls setup, ...) in progress */
     LLOAD_C_BINDING,     /* binding */
-    LLOAD_C_DYING, /* part-processed dead but someone still holds a reference */
+    LLOAD_C_DYING,       /* part-processed dead waiting to be freed, someone
+                          * might still be observing it */
 };
 enum sc_type {
     LLOAD_C_OPEN = 0,  /* regular connection */
@@ -280,12 +282,22 @@ enum sc_type {
     LLOAD_C_BIND, /* connection used to handle bind client requests if VC not enabled */
     LLOAD_C_PRIVILEGED, /* connection can override proxyauthz control */
 };
+enum sc_io_state {
+    LLOAD_C_OPERATIONAL = 0,        /* all is good */
+    LLOAD_C_READ_HANDOVER = 1 << 0, /* A task to process PDUs is scheduled or
+                                     * running, do not re-enable c_read_event */
+    LLOAD_C_READ_PAUSE = 1 << 1,    /* We want to pause reading until the client
+                                     * has sufficiently caught up with what we
+                                     * sent */
+};
+
 /*
  * represents a connection from an ldap client/to ldap server
  */
 struct LloadConnection {
     enum sc_state c_state; /* connection state */
     enum sc_type c_type;
+    enum sc_io_state c_io_state;
     ber_socket_t c_fd;
 
 /*
index dd153923b5cc50d3e4b9b2ccce725329da02d389..352adec18a1b637aabcdf5867ea7c427482eb775 100644 (file)
@@ -513,6 +513,9 @@ upstream_bind_cb( LloadConnection *c )
             goto fail;
     }
 
+    checked_lock( &c->c_io_mutex );
+    c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
+    checked_unlock( &c->c_io_mutex );
     event_add( c->c_read_event, c->c_read_timeout );
     ber_free( ber, 1 );
     return -1;
@@ -578,6 +581,9 @@ upstream_bind( void *ctx, void *arg )
         }
 #endif /* HAVE_CYRUS_SASL */
     }
+    /* TODO: can we be paused at this point? Then we'd have to move this line
+     * after connection_write_cb */
+    c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
     checked_unlock( &c->c_io_mutex );
 
     connection_write_cb( -1, 0, c );
@@ -832,11 +838,16 @@ upstream_starttls( LloadConnection *c )
         ber_free( ber, 1 );
         CONNECTION_UNLOCK(c);
 
+        checked_lock( &c->c_io_mutex );
+        c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
+        checked_unlock( &c->c_io_mutex );
+
         return rc;
     }
 
     base = event_get_base( c->c_read_event );
 
+    c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
     event_del( c->c_read_event );
     event_del( c->c_write_event );