]> git.ipfire.org Git - thirdparty/libvirt.git/commitdiff
Allow concurrent processing of RPC calls in daemon
authorDaniel P. Berrange <berrange@redhat.com>
Tue, 20 Jan 2009 19:25:15 +0000 (19:25 +0000)
committerDaniel P. Berrange <berrange@redhat.com>
Tue, 20 Jan 2009 19:25:15 +0000 (19:25 +0000)
ChangeLog
qemud/libvirtd.aug
qemud/libvirtd.conf
qemud/qemud.c
qemud/qemud.h
qemud/remote.c
qemud/test_libvirtd.aug
src/remote_internal.c

index d12c1b3600bad21759391dbd8869cad3676012b6..0855957e6b67edaffaa1f0632b8c8e0c8bb5f45a 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,12 @@
+Tue Jan 20 19:24:53 GMT 2009 Daniel P. Berrange <berrange@redhat.com>
+
+       * qemud/qemud.c, qemud/qemud.h, qemud/remote.c: Allow the
+       processing of multiple concurrent RPC calls per client
+       connection.
+       * qemud/libvirtd.conf, qemud/libvirtd.aug,
+       qemud/test_libvirtd.aug: Add config param for controlling
+       number of requests per client.
+
 Tue Jan 20 18:16:53 GMT 2009 Daniel P. Berrange <berrange@redhat.com>
 
        * src/xm_internal.c: Fix 2 misleading comments & potential
index 7cfd458efb5787195037a1838854704f9b3be5ac..40acd9363af26ef89fd624f514e5f4bcb01e4618 100644 (file)
@@ -53,6 +53,8 @@ module Libvirtd =
    let processing_entry = int_entry "min_workers"
                         | int_entry "max_workers"
                         | int_entry "max_clients"
+                        | int_entry "max_requests"
+                        | int_entry "max_client_requests"
 
    let logging_entry = int_entry "log_level"
                      | str_entry "log_filters"
index ecb28dcdf3e519a9930f24112aa87318d1f8af80..49320843ba2fab4be2d2cdeab859d06d11ef50d7 100644 (file)
 #min_workers = 5
 #max_workers = 20
 
+# Total global limit on concurrent RPC calls. Should be
+# at least as large as max_workers. Beyond this, RPC requests
+# will be read into memory and queued. This directly impact
+# memory usage, currently each request requires 256 KB of
+# memory. So by default upto 5 MB of memory is used
+#
+# XXX this isn't actually enforced yet, only the per-client
+# limit is used so far
+#max_requests = 20
+
+# Limit on concurrent requests from a single client
+# connection. To avoid one client monopolizing the server
+# this should be a small fraction of the global max_requests
+# and max_workers parameter
+#max_client_requests = 5
+
 #################################################################
 #
 # Logging controls
index 5eec6c0590019b9b4158f0e2e3c833f7f20d7d85..21cecf23970f603c17f7dbb99039e5f72430da88 100644 (file)
@@ -138,6 +138,11 @@ static int min_workers = 5;
 static int max_workers = 20;
 static int max_clients = 20;
 
+/* Total number of 'in-process' RPC calls allowed across all clients */
+static int max_requests = 20;
+/* Total number of 'in-process' RPC calls allowed by a single client*/
+static int max_client_requests = 5;
+
 #define DH_BITS 1024
 
 static sig_atomic_t sig_errors = 0;
@@ -162,9 +167,35 @@ static void sig_handler(int sig, siginfo_t * siginfo,
 
 static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
 static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-static int qemudRegisterClientEvent(struct qemud_server *server,
-                                    struct qemud_client *client,
-                                    int removeFirst);
+
+
+void
+qemudClientMessageQueuePush(struct qemud_client_message **queue,
+                            struct qemud_client_message *msg)
+{
+    struct qemud_client_message *tmp = *queue;
+
+    if (tmp) {
+        while (tmp->next)
+            tmp = tmp->next;
+        tmp->next = msg;
+    } else {
+        *queue = msg;
+    }
+}
+
+static struct qemud_client_message *
+qemudClientMessageQueueServe(struct qemud_client_message **queue)
+{
+    struct qemud_client_message *tmp = *queue;
+
+    if (tmp) {
+        *queue = tmp->next;
+        tmp->next = NULL;
+    }
+
+    return tmp;
+}
 
 static int
 remoteCheckCertFile(const char *type, const char *file)
@@ -1042,6 +1073,8 @@ remoteCheckCertificate (gnutls_session_t session)
 static int
 remoteCheckAccess (struct qemud_client *client)
 {
+    struct qemud_client_message *confirm;
+
     /* Verify client certificate. */
     if (remoteCheckCertificate (client->tlssession) == -1) {
         VIR_ERROR0(_("remoteCheckCertificate: "
@@ -1051,14 +1084,25 @@ remoteCheckAccess (struct qemud_client *client)
                           "is set so the bad certificate is ignored"));
     }
 
+    if (client->tx) {
+        VIR_INFO("%s",
+                 _("client had unexpected data pending tx after access check"));
+        return -1;
+    }
+
+    if (VIR_ALLOC(confirm) < 0)
+        return -1;
+
     /* Checks have succeeded.  Write a '\1' byte back to the client to
      * indicate this (otherwise the socket is abruptly closed).
      * (NB. The '\1' byte is sent in an encrypted record).
      */
-    client->bufferLength = 1;
-    client->bufferOffset = 0;
-    client->buffer[0] = '\1';
-    client->mode = QEMUD_MODE_TX_PACKET;
+    confirm->async = 1;
+    confirm->bufferLength = 1;
+    confirm->bufferOffset = 0;
+    confirm->buffer[0] = '\1';
+
+    client->tx = confirm;
     return 0;
 }
 
@@ -1084,6 +1128,7 @@ int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid) {
 }
 #endif
 
+
 static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket *sock) {
     int fd;
     struct sockaddr_storage addr;
@@ -1099,7 +1144,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
     }
 
     if (server->nclients >= max_clients) {
-        VIR_ERROR0(_("Too many active clients, dropping connection"));
+        VIR_ERROR(_("Too many active clients (%d), dropping connection"), max_clients);
         close(fd);
         return -1;
     }
@@ -1137,6 +1182,12 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
     client->addrlen = addrlen;
     client->server = server;
 
+    /* Prepare one for packet receive */
+    if (VIR_ALLOC(client->rx) < 0)
+        goto cleanup;
+    client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
+
+
 #if HAVE_POLKIT
     /* Only do policy checks for non-root - allow root user
        through with no checks, as a fail-safe - root can easily
@@ -1158,9 +1209,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
 #endif
 
     if (client->type != QEMUD_SOCK_TYPE_TLS) {
-        client->mode = QEMUD_MODE_RX_HEADER;
-        client->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
-
+        /* Plain socket, so prepare to read first message */
         if (qemudRegisterClientEvent (server, client, 0) < 0)
             goto cleanup;
     } else {
@@ -1180,12 +1229,12 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
             if (remoteCheckAccess (client) == -1)
                 goto cleanup;
 
+            /* Handshake & cert check OK,  so prepare to read first message */
             if (qemudRegisterClientEvent(server, client, 0) < 0)
                 goto cleanup;
         } else if (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN) {
-            /* Most likely. */
-            client->mode = QEMUD_MODE_TLS_HANDSHAKE;
-            client->bufferLength = -1;
+            /* Most likely, need to do more handshake data */
+            client->handshake = 1;
 
             if (qemudRegisterClientEvent (server, client, 0) < 0)
                 goto cleanup;
@@ -1204,7 +1253,8 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
     if (client &&
         client->tlssession) gnutls_deinit (client->tlssession);
     close (fd);
-    free (client);
+    VIR_FREE(client->rx);
+    VIR_FREE(client);
     return -1;
 }
 
@@ -1216,8 +1266,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
  * We keep the libvirt connection open until any async
  * jobs have finished, then clean it up elsehwere
  */
-static void qemudDispatchClientFailure(struct qemud_server *server ATTRIBUTE_UNUSED,
-                                       struct qemud_client *client) {
+void qemudDispatchClientFailure(struct qemud_client *client) {
     virEventRemoveHandleImpl(client->watch);
 
     /* Deregister event delivery callback */
@@ -1242,7 +1291,7 @@ static struct qemud_client *qemudPendingJob(struct qemud_server *server)
     int i;
     for (i = 0 ; i < server->nclients ; i++) {
         virMutexLock(&server->clients[i]->lock);
-        if (server->clients[i]->mode == QEMUD_MODE_WAIT_DISPATCH) {
+        if (server->clients[i]->dx) {
             /* Delibrately don't unlock client - caller wants the lock */
             return server->clients[i];
         }
@@ -1256,8 +1305,9 @@ static void *qemudWorker(void *data)
     struct qemud_server *server = data;
 
     while (1) {
-        struct qemud_client *client;
-        int len;
+        struct qemud_client *client = NULL;
+        struct qemud_client_message *reply;
+
         virMutexLock(&server->lock);
         while ((client = qemudPendingJob(server)) == NULL) {
             if (virCondWait(&server->job, &server->lock) < 0) {
@@ -1268,55 +1318,75 @@ static void *qemudWorker(void *data)
         virMutexUnlock(&server->lock);
 
         /* We own a locked client now... */
-        client->mode = QEMUD_MODE_IN_DISPATCH;
         client->refs++;
 
-        if ((len = remoteDispatchClientRequest (server, client)) == 0)
-            qemudDispatchClientFailure(server, client);
+        /* Remove our message from dispatch queue while we use it */
+        reply = qemudClientMessageQueueServe(&client->dx);
+
+        /* This function drops the lock during dispatch,
+         * and re-acquires it before returning */
+        if (remoteDispatchClientRequest (server, client, reply) < 0) {
+            VIR_FREE(reply);
+            qemudDispatchClientFailure(client);
+            client->refs--;
+            virMutexUnlock(&client->lock);
+            continue;
+        }
 
-        /* Set up the output buffer. */
-        client->mode = QEMUD_MODE_TX_PACKET;
-        client->bufferLength = len;
-        client->bufferOffset = 0;
+        /* Put reply on end of tx queue to send out  */
+        qemudClientMessageQueuePush(&client->tx, reply);
 
         if (qemudRegisterClientEvent(server, client, 1) < 0)
-            qemudDispatchClientFailure(server, client);
+            qemudDispatchClientFailure(client);
 
         client->refs--;
         virMutexUnlock(&client->lock);
-        virMutexUnlock(&server->lock);
     }
 }
 
 
-static int qemudClientReadBuf(struct qemud_server *server,
-                              struct qemud_client *client,
-                              char *data, unsigned len) {
-    int ret;
+/*
+ * Read data into buffer using wire decoding (plain or TLS)
+ *
+ * Returns:
+ *   -1 on error or EOF
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static ssize_t qemudClientReadBuf(struct qemud_client *client,
+                                  char *data, ssize_t len) {
+    ssize_t ret;
+
+    if (len < 0) {
+        VIR_ERROR(_("unexpected negative length request %d"), len);
+        qemudDispatchClientFailure(client);
+        return -1;
+    }
 
     /*qemudDebug ("qemudClientRead: len = %d", len);*/
 
     if (!client->tlssession) {
-        if ((ret = read (client->fd, data, len)) <= 0) {
-            if (ret == 0 || errno != EAGAIN) {
-                if (ret != 0)
-                    VIR_ERROR(_("read: %s"), strerror (errno));
-                qemudDispatchClientFailure(server, client);
-            }
+        ret = read (client->fd, data, len);
+        if (ret == -1 && (errno == EAGAIN ||
+                          errno == EINTR))
+            return 0;
+        if (ret <= 0) {
+            if (ret != 0)
+                VIR_ERROR(_("read: %s"), strerror (errno));
+            qemudDispatchClientFailure(client);
             return -1;
         }
     } else {
         ret = gnutls_record_recv (client->tlssession, data, len);
-        if (qemudRegisterClientEvent (server, client, 1) < 0)
-            qemudDispatchClientFailure (server, client);
-        else if (ret <= 0) {
-            if (ret == 0 || (ret != GNUTLS_E_AGAIN &&
-                             ret != GNUTLS_E_INTERRUPTED)) {
-                if (ret != 0)
-                    VIR_ERROR(_("gnutls_record_recv: %s"),
-                              gnutls_strerror (ret));
-                qemudDispatchClientFailure (server, client);
-            }
+
+        if (ret < 0 && (ret == GNUTLS_E_AGAIN ||
+                        ret == GNUTLS_E_INTERRUPTED))
+            return 0;
+        if (ret <= 0) {
+            if (ret != 0)
+                VIR_ERROR(_("gnutls_record_recv: %s"),
+                          gnutls_strerror (ret));
+            qemudDispatchClientFailure(client);
             return -1;
         }
     }
@@ -1324,22 +1394,37 @@ static int qemudClientReadBuf(struct qemud_server *server,
     return ret;
 }
 
-static int qemudClientReadPlain(struct qemud_server *server,
-                                struct qemud_client *client) {
-    int ret;
-    ret = qemudClientReadBuf(server, client,
-                             client->buffer + client->bufferOffset,
-                             client->bufferLength - client->bufferOffset);
-    if (ret < 0)
-        return ret;
-    client->bufferOffset += ret;
-    return 0;
+/*
+ * Read data into buffer without decoding
+ *
+ * Returns:
+ *   -1 on error or EOF
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static ssize_t qemudClientReadPlain(struct qemud_client *client) {
+    ssize_t ret;
+    ret = qemudClientReadBuf(client,
+                             client->rx->buffer + client->rx->bufferOffset,
+                             client->rx->bufferLength - client->rx->bufferOffset);
+    if (ret <= 0)
+        return ret; /* -1 error, 0 eagain */
+
+    client->rx->bufferOffset += ret;
+    return ret;
 }
 
 #if HAVE_SASL
-static int qemudClientReadSASL(struct qemud_server *server,
-                               struct qemud_client *client) {
-    int got, want;
+/*
+ * Read data into buffer decoding with SASL
+ *
+ * Returns:
+ *   -1 on error or EOF
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static ssize_t qemudClientReadSASL(struct qemud_client *client) {
+    ssize_t got, want;
 
     /* We're doing a SSF data read, so now its times to ensure
      * future writes are under SSF too.
@@ -1350,166 +1435,176 @@ static int qemudClientReadSASL(struct qemud_server *server,
 
     /* Need to read some more data off the wire */
     if (client->saslDecoded == NULL) {
+        int ret;
         char encoded[8192];
-        int encodedLen = sizeof(encoded);
-        encodedLen = qemudClientReadBuf(server, client, encoded, encodedLen);
-
-        if (encodedLen < 0)
+        ssize_t encodedLen = sizeof(encoded);
+        encodedLen = qemudClientReadBuf(client, encoded, encodedLen);
+
+        if (encodedLen <= 0)
+            return encodedLen;
+
+        ret = sasl_decode(client->saslconn, encoded, encodedLen,
+                          &client->saslDecoded, &client->saslDecodedLength);
+        if (ret != SASL_OK) {
+            VIR_ERROR(_("failed to decode SASL data %s"),
+                      sasl_errstring(ret, NULL, NULL));
+            qemudDispatchClientFailure(client);
             return -1;
-
-        sasl_decode(client->saslconn, encoded, encodedLen,
-                    &client->saslDecoded, &client->saslDecodedLength);
+        }
 
         client->saslDecodedOffset = 0;
     }
 
     /* Some buffered decoded data to return now */
     got = client->saslDecodedLength - client->saslDecodedOffset;
-    want = client->bufferLength - client->bufferOffset;
+    want = client->rx->bufferLength - client->rx->bufferOffset;
 
     if (want > got)
         want = got;
 
-    memcpy(client->buffer + client->bufferOffset,
+    memcpy(client->rx->buffer + client->rx->bufferOffset,
            client->saslDecoded + client->saslDecodedOffset, want);
     client->saslDecodedOffset += want;
-    client->bufferOffset += want;
+    client->rx->bufferOffset += want;
 
     if (client->saslDecodedOffset == client->saslDecodedLength) {
         client->saslDecoded = NULL;
         client->saslDecodedOffset = client->saslDecodedLength = 0;
     }
 
-    return 0;
+    return want;
 }
 #endif
 
-static int qemudClientRead(struct qemud_server *server,
-                           struct qemud_client *client) {
+/*
+ * Read as much data off wire as possible till we fill our
+ * buffer, or would block on I/O
+ */
+static ssize_t qemudClientRead(struct qemud_client *client) {
 #if HAVE_SASL
     if (client->saslSSF & QEMUD_SASL_SSF_READ)
-        return qemudClientReadSASL(server, client);
+        return qemudClientReadSASL(client);
     else
 #endif
-        return qemudClientReadPlain(server, client);
+        return qemudClientReadPlain(client);
 }
 
 
-static void qemudDispatchClientRead(struct qemud_server *server, struct qemud_client *client) {
-    unsigned int len;
+/*
+ * Read data until we get a complete message to process
+ */
+static void qemudDispatchClientRead(struct qemud_server *server,
+                                    struct qemud_client *client) {
     /*qemudDebug ("qemudDispatchClientRead: mode = %d", client->mode);*/
 
-    switch (client->mode) {
-    case QEMUD_MODE_RX_HEADER: {
-        XDR x;
+readmore:
+    if (qemudClientRead(client) < 0)
+        return; /* Error */
 
-        if (qemudClientRead(server, client) < 0)
-            return; /* Error, or blocking */
+    if (client->rx->bufferOffset < client->rx->bufferLength)
+        return; /* Still not read enough */
 
-        if (client->bufferOffset < client->bufferLength)
-            return; /* Not read enough */
+    /* Either done with length word header */
+    if (client->rx->bufferLength == REMOTE_MESSAGE_HEADER_XDR_LEN) {
+        unsigned int len;
+        XDR x;
 
-        xdrmem_create(&x, client->buffer, client->bufferLength, XDR_DECODE);
+        xdrmem_create(&x, client->rx->buffer, client->rx->bufferLength, XDR_DECODE);
 
         if (!xdr_u_int(&x, &len)) {
             xdr_destroy (&x);
             DEBUG0("Failed to decode packet length");
-            qemudDispatchClientFailure(server, client);
+            qemudDispatchClientFailure(client);
             return;
         }
         xdr_destroy (&x);
 
-        if (len > REMOTE_MESSAGE_MAX) {
-            DEBUG("Packet length %u too large", len);
-            qemudDispatchClientFailure(server, client);
+        if (len < REMOTE_MESSAGE_HEADER_XDR_LEN) {
+            DEBUG("Packet length %u too small", len);
+            qemudDispatchClientFailure(client);
             return;
         }
 
-        /* Length include length of the length field itself, so
-         * check minimum size requirements */
-        if (len <= REMOTE_MESSAGE_HEADER_XDR_LEN) {
-            DEBUG("Packet length %u too small", len);
-            qemudDispatchClientFailure(server, client);
+        /* Length includes the size of the length word itself */
+        len -= REMOTE_MESSAGE_HEADER_XDR_LEN;
+
+        if (len > REMOTE_MESSAGE_MAX) {
+            DEBUG("Packet length %u too large", len);
+            qemudDispatchClientFailure(client);
             return;
         }
 
-        client->mode = QEMUD_MODE_RX_PAYLOAD;
-        client->bufferLength = len - REMOTE_MESSAGE_HEADER_XDR_LEN;
-        client->bufferOffset = 0;
+        /* Prepare to read rest of message */
+        client->rx->bufferLength += len;
 
         if (qemudRegisterClientEvent(server, client, 1) < 0) {
-            qemudDispatchClientFailure(server, client);
+            qemudDispatchClientFailure(client);
             return;
         }
 
-        /* Fall through */
-    }
-
-    case QEMUD_MODE_RX_PAYLOAD: {
-        if (qemudClientRead(server, client) < 0)
-            return; /* Error, or blocking */
-
-        if (client->bufferOffset < client->bufferLength)
-            return; /* Not read enough */
-
-        client->mode = QEMUD_MODE_WAIT_DISPATCH;
-        if (qemudRegisterClientEvent(server, client, 1) < 0)
-            qemudDispatchClientFailure(server, client);
-
-        virCondSignal(&server->job);
-
-        break;
-    }
-
-    case QEMUD_MODE_TLS_HANDSHAKE: {
-        int ret;
-
-        /* Continue the handshake. */
-        ret = gnutls_handshake (client->tlssession);
-        if (ret == 0) {
-            /* Finished.  Next step is to check the certificate. */
-            if (remoteCheckAccess (client) == -1)
-                qemudDispatchClientFailure (server, client);
-            else if (qemudRegisterClientEvent (server, client, 1) < 0)
-                qemudDispatchClientFailure (server, client);
-        } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
-            VIR_ERROR(_("TLS handshake failed: %s"),
-                      gnutls_strerror (ret));
-            qemudDispatchClientFailure (server, client);
+        /* Try and read payload immediately instead of going back
+           into poll() because chances are the data is already
+           waiting for us */
+        goto readmore;
+    } else {
+        /* Move completed message to the end of the dispatch queue */
+        qemudClientMessageQueuePush(&client->dx, client->rx);
+        client->rx = NULL;
+        client->nrequests++;
+
+        /* Possibly need to create another receive buffer */
+        if ((client->nrequests < max_client_requests &&
+             VIR_ALLOC(client->rx) < 0)) {
+            qemudDispatchClientFailure(client);
         } else {
-            if (qemudRegisterClientEvent (server ,client, 1) < 0)
-                qemudDispatchClientFailure (server, client);
+            if (client->rx)
+                client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
+
+            if (qemudRegisterClientEvent(server, client, 1) < 0)
+                qemudDispatchClientFailure(client);
+            else
+                /* Tell one of the workers to get on with it... */
+                virCondSignal(&server->job);
         }
-
-        break;
-    }
-
-    default:
-        DEBUG("Got unexpected data read while in %d mode", client->mode);
-        qemudDispatchClientFailure(server, client);
     }
 }
 
 
-static int qemudClientWriteBuf(struct qemud_server *server,
-                               struct qemud_client *client,
-                               const char *data, int len) {
-    int ret;
+/*
+ * Send a chunk of data using wire encoding (plain or TLS)
+ *
+ * Returns:
+ *   -1 on error
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static ssize_t qemudClientWriteBuf(struct qemud_client *client,
+                                   const char *data, ssize_t len) {
+    ssize_t ret;
+
+    if (len < 0) {
+        VIR_ERROR(_("unexpected negative length request %d"), len);
+        qemudDispatchClientFailure(client);
+        return -1;
+    }
+
     if (!client->tlssession) {
-        if ((ret = safewrite(client->fd, data, len)) == -1) {
+        if ((ret = write(client->fd, data, len)) == -1) {
+            if (errno == EAGAIN || errno == EINTR)
+                return 0;
             VIR_ERROR(_("write: %s"), strerror (errno));
-            qemudDispatchClientFailure(server, client);
+            qemudDispatchClientFailure(client);
             return -1;
         }
     } else {
         ret = gnutls_record_send (client->tlssession, data, len);
-        if (qemudRegisterClientEvent (server, client, 1) < 0)
-            qemudDispatchClientFailure (server, client);
-        else if (ret < 0) {
-            if (ret != GNUTLS_E_INTERRUPTED && ret != GNUTLS_E_AGAIN) {
-                VIR_ERROR(_("gnutls_record_send: %s"), gnutls_strerror (ret));
-                qemudDispatchClientFailure (server, client);
-            }
+        if (ret < 0) {
+            if (ret == GNUTLS_E_INTERRUPTED ||
+                ret == GNUTLS_E_AGAIN)
+                return 0;
+
+            VIR_ERROR(_("gnutls_record_send: %s"), gnutls_strerror (ret));
+            qemudDispatchClientFailure(client);
             return -1;
         }
     }
@@ -1517,42 +1612,62 @@ static int qemudClientWriteBuf(struct qemud_server *server,
 }
 
 
-static int qemudClientWritePlain(struct qemud_server *server,
-                                 struct qemud_client *client) {
-    int ret = qemudClientWriteBuf(server, client,
-                                  client->buffer + client->bufferOffset,
-                                  client->bufferLength - client->bufferOffset);
-    if (ret < 0)
-        return -1;
-    client->bufferOffset += ret;
-    return 0;
+/*
+ * Send client->tx using no encoding
+ *
+ * Returns:
+ *   -1 on error or EOF
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static int qemudClientWritePlain(struct qemud_client *client) {
+    int ret = qemudClientWriteBuf(client,
+                                  client->tx->buffer + client->tx->bufferOffset,
+                                  client->tx->bufferLength - client->tx->bufferOffset);
+    if (ret <= 0)
+        return ret; /* -1 error, 0 = egain */
+    client->tx->bufferOffset += ret;
+    return ret;
 }
 
 
 #if HAVE_SASL
-static int qemudClientWriteSASL(struct qemud_server *server,
-                                struct qemud_client *client) {
+/*
+ * Send client->tx using SASL encoding
+ *
+ * Returns:
+ *   -1 on error
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static int qemudClientWriteSASL(struct qemud_client *client) {
     int ret;
 
     /* Not got any pending encoded data, so we need to encode raw stuff */
     if (client->saslEncoded == NULL) {
-        int err;
-        err = sasl_encode(client->saslconn,
-                          client->buffer + client->bufferOffset,
-                          client->bufferLength - client->bufferOffset,
+        ret = sasl_encode(client->saslconn,
+                          client->tx->buffer + client->tx->bufferOffset,
+                          client->tx->bufferLength - client->tx->bufferOffset,
                           &client->saslEncoded,
                           &client->saslEncodedLength);
 
+        if (ret != SASL_OK) {
+            VIR_ERROR(_("failed to encode SASL data %s"),
+                      sasl_errstring(ret, NULL, NULL));
+            qemudDispatchClientFailure(client);
+            return -1;
+        }
+
         client->saslEncodedOffset = 0;
     }
 
     /* Send some of the encoded stuff out on the wire */
-    ret = qemudClientWriteBuf(server, client,
+    ret = qemudClientWriteBuf(client,
                               client->saslEncoded + client->saslEncodedOffset,
                               client->saslEncodedLength - client->saslEncodedOffset);
 
-    if (ret < 0)
-        return -1;
+    if (ret <= 0)
+        return ret; /* -1 error, 0 == egain */
 
     /* Note how much we sent */
     client->saslEncodedOffset += ret;
@@ -1561,78 +1676,107 @@ static int qemudClientWriteSASL(struct qemud_server *server,
     if (client->saslEncodedOffset == client->saslEncodedLength) {
         client->saslEncoded = NULL;
         client->saslEncodedOffset = client->saslEncodedLength = 0;
-        client->bufferOffset = client->bufferLength;
+
+        /* Mark as complete, so caller detects completion */
+        client->tx->bufferOffset = client->tx->bufferLength;
     }
 
-    return 0;
+    return ret;
 }
 #endif
 
-static int qemudClientWrite(struct qemud_server *server,
-                            struct qemud_client *client) {
+/*
+ * Send as much data in the client->tx as possible
+ *
+ * Returns:
+ *   -1 on error or EOF
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static ssize_t qemudClientWrite(struct qemud_client *client) {
 #if HAVE_SASL
     if (client->saslSSF & QEMUD_SASL_SSF_WRITE)
-        return qemudClientWriteSASL(server, client);
+        return qemudClientWriteSASL(client);
     else
 #endif
-        return qemudClientWritePlain(server, client);
+        return qemudClientWritePlain(client);
 }
 
 
-void
+/*
+ * Process all queued client->tx messages until
+ * we would block on I/O
+ */
+static void
 qemudDispatchClientWrite(struct qemud_server *server,
                          struct qemud_client *client) {
-    switch (client->mode) {
-    case QEMUD_MODE_TX_PACKET: {
-        if (qemudClientWrite(server, client) < 0)
-            return;
+    while (client->tx) {
+        ssize_t ret;
 
-        if (client->bufferOffset == client->bufferLength) {
-            if (client->closing) {
-                qemudDispatchClientFailure (server, client);
+        ret = qemudClientWrite(client);
+        if (ret < 0) {
+            qemudDispatchClientFailure(client);
+            return;
+        }
+        if (ret == 0)
+            return; /* Would block on write EAGAIN */
+
+        if (client->tx->bufferOffset == client->tx->bufferLength) {
+            struct qemud_client_message *reply;
+
+            /* Get finished reply from head of tx queue */
+            reply = qemudClientMessageQueueServe(&client->tx);
+
+            /* If its not an async message, then we have
+             * just completed an RPC request */
+            if (!reply->async)
+                client->nrequests--;
+
+            /* Move record to end of 'rx' ist */
+            if (!client->rx &&
+                client->nrequests < max_client_requests) {
+                /* Reset message record for next RX attempt */
+                client->rx = reply;
+                client->rx->bufferOffset = 0;
+                client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
             } else {
-                /* Done writing, switch back to receive */
-                client->mode = QEMUD_MODE_RX_HEADER;
-                client->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
-                client->bufferOffset = 0;
-
-                if (qemudRegisterClientEvent (server, client, 1) < 0)
-                    qemudDispatchClientFailure (server, client);
+                VIR_FREE(reply);
             }
-        }
-        /* Still writing */
-        break;
-    }
-
-    case QEMUD_MODE_TLS_HANDSHAKE: {
-        int ret;
-
-        /* Continue the handshake. */
-        ret = gnutls_handshake (client->tlssession);
-        if (ret == 0) {
-            /* Finished.  Next step is to check the certificate. */
-            if (remoteCheckAccess (client) == -1)
-                qemudDispatchClientFailure (server, client);
-            else if (qemudRegisterClientEvent (server, client, 1))
-                qemudDispatchClientFailure (server, client);
-        } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
-            VIR_ERROR(_("TLS handshake failed: %s"), gnutls_strerror (ret));
-            qemudDispatchClientFailure (server, client);
-        } else {
-            if (qemudRegisterClientEvent (server, client, 1))
-                qemudDispatchClientFailure (server, client);
-        }
 
-        break;
+            if (client->closing ||
+                qemudRegisterClientEvent (server, client, 1) < 0)
+                 qemudDispatchClientFailure(client);
+         }
     }
+}
 
-    default:
-        DEBUG("Got unexpected data write while in %d mode", client->mode);
-        qemudDispatchClientFailure(server, client);
+static void
+qemudDispatchClientHandshake(struct qemud_server *server,
+                             struct qemud_client *client) {
+    int ret;
+    /* Continue the handshake. */
+    ret = gnutls_handshake (client->tlssession);
+    if (ret == 0) {
+        /* Finished.  Next step is to check the certificate. */
+        if (remoteCheckAccess (client) == -1)
+            qemudDispatchClientFailure(client);
+        else if (qemudRegisterClientEvent (server, client, 1))
+            qemudDispatchClientFailure(client);
+    } else if (ret == GNUTLS_E_AGAIN ||
+               ret == GNUTLS_E_INTERRUPTED) {
+        /* Carry on waiting for more handshake. Update
+           the events just in case handshake data flow
+           direction has changed */
+        if (qemudRegisterClientEvent (server, client, 1))
+            qemudDispatchClientFailure(client);
+    } else {
+        /* Fatal error in handshake */
+        VIR_ERROR(_("TLS handshake failed: %s"),
+                  gnutls_strerror (ret));
+        qemudDispatchClientFailure(client);
     }
 }
 
-
 static void
 qemudDispatchClientEvent(int watch, int fd, int events, void *opaque) {
     struct qemud_server *server = (struct qemud_server *)opaque;
@@ -1642,59 +1786,66 @@ qemudDispatchClientEvent(int watch, int fd, int events, void *opaque) {
     virMutexLock(&server->lock);
 
     for (i = 0 ; i < server->nclients ; i++) {
+        virMutexLock(&server->clients[i]->lock);
         if (server->clients[i]->watch == watch) {
             client = server->clients[i];
             break;
         }
+        virMutexUnlock(&server->clients[i]->lock);
     }
 
+    virMutexUnlock(&server->lock);
+
     if (!client) {
-        virMutexUnlock(&server->lock);
         return;
     }
 
-    virMutexLock(&client->lock);
-    virMutexUnlock(&server->lock);
-
-    if (client->fd != fd)
+    if (client->fd != fd) {
+        virMutexUnlock(&client->lock);
         return;
+    }
+
+    if (events & (VIR_EVENT_HANDLE_WRITABLE |
+                  VIR_EVENT_HANDLE_READABLE)) {
+        if (client->handshake) {
+            qemudDispatchClientHandshake(server, client);
+        } else {
+            if (events & VIR_EVENT_HANDLE_WRITABLE)
+                qemudDispatchClientWrite(server, client);
+            if (events & VIR_EVENT_HANDLE_READABLE)
+                qemudDispatchClientRead(server, client);
+        }
+    }
+
+    /* NB, will get HANGUP + READABLE at same time upon
+     * disconnect */
+    if (events & (VIR_EVENT_HANDLE_ERROR |
+                  VIR_EVENT_HANDLE_HANGUP))
+        qemudDispatchClientFailure(client);
 
-    if (events == VIR_EVENT_HANDLE_WRITABLE)
-        qemudDispatchClientWrite(server, client);
-    else if (events == VIR_EVENT_HANDLE_READABLE)
-        qemudDispatchClientRead(server, client);
-    else
-        qemudDispatchClientFailure(server, client);
     virMutexUnlock(&client->lock);
 }
 
-static int qemudRegisterClientEvent(struct qemud_server *server,
-                                    struct qemud_client *client,
-                                    int update) {
-    int mode;
-    switch (client->mode) {
-    case QEMUD_MODE_TLS_HANDSHAKE:
+int qemudRegisterClientEvent(struct qemud_server *server,
+                             struct qemud_client *client,
+                             int update) {
+    int mode = 0;
+
+    if (client->handshake) {
         if (gnutls_record_get_direction (client->tlssession) == 0)
-            mode = VIR_EVENT_HANDLE_READABLE;
+            mode |= VIR_EVENT_HANDLE_READABLE;
         else
-            mode = VIR_EVENT_HANDLE_WRITABLE;
-        break;
-
-    case QEMUD_MODE_RX_HEADER:
-    case QEMUD_MODE_RX_PAYLOAD:
-        mode = VIR_EVENT_HANDLE_READABLE;
-        break;
-
-    case QEMUD_MODE_TX_PACKET:
-        mode = VIR_EVENT_HANDLE_WRITABLE;
-        break;
-
-    case QEMUD_MODE_WAIT_DISPATCH:
-        mode = 0;
-        break;
+            mode |= VIR_EVENT_HANDLE_WRITABLE;
+    } else {
+        /* If there is a message on the rx queue then
+         * we're wanting more input */
+        if (client->rx)
+            mode |= VIR_EVENT_HANDLE_READABLE;
 
-    default:
-        return -1;
+        /* If there are one or more messages to send back to client,
+           then monitor for writability on socket */
+        if (client->tx)
+            mode |= VIR_EVENT_HANDLE_WRITABLE;
     }
 
     if (update) {
@@ -1760,6 +1911,29 @@ static void qemudInactiveTimer(int timer ATTRIBUTE_UNUSED, void *data) {
     }
 }
 
+static void qemudFreeClient(struct qemud_client *client) {
+    while (client->rx) {
+        struct qemud_client_message *msg
+            = qemudClientMessageQueueServe(&client->rx);
+        VIR_FREE(msg);
+    }
+    while (client->dx) {
+        struct qemud_client_message *msg
+            = qemudClientMessageQueueServe(&client->dx);
+        VIR_FREE(msg);
+    }
+    while (client->tx) {
+        struct qemud_client_message *msg
+            = qemudClientMessageQueueServe(&client->tx);
+        VIR_FREE(msg);
+    }
+
+    if (client->conn)
+        virConnectClose(client->conn);
+    virMutexDestroy(&client->lock);
+    VIR_FREE(client);
+}
+
 static int qemudRunLoop(struct qemud_server *server) {
     int timerid = -1;
     int ret = -1, i;
@@ -1796,8 +1970,11 @@ static int qemudRunLoop(struct qemud_server *server) {
         }
 
         virMutexUnlock(&server->lock);
-        if (qemudOneLoop() < 0)
+        if (qemudOneLoop() < 0) {
+            virMutexLock(&server->lock);
+            DEBUG0("Loop iteration error, exiting\n");
             break;
+        }
         virMutexLock(&server->lock);
 
     reprocess:
@@ -1808,17 +1985,18 @@ static int qemudRunLoop(struct qemud_server *server) {
                 && server->clients[i]->refs == 0;
             virMutexUnlock(&server->clients[i]->lock);
             if (inactive) {
-                if (server->clients[i]->conn)
-                    virConnectClose(server->clients[i]->conn);
-                virMutexDestroy(&server->clients[i]->lock);
-                VIR_FREE(server->clients[i]);
+                qemudFreeClient(server->clients[i]);
                 server->nclients--;
-                if (i < server->nclients) {
+                if (i < server->nclients)
                     memmove(server->clients + i,
                             server->clients + i + 1,
-                            server->nclients - i);
-                    goto reprocess;
+                            sizeof (*server->clients) * (server->nclients - i));
+
+                if (VIR_REALLOC_N(server->clients,
+                                  server->nclients) < 0) {
+                    ; /* ignore */
                 }
+                goto reprocess;
             }
         }
 
@@ -1843,6 +2021,7 @@ static int qemudRunLoop(struct qemud_server *server) {
         pthread_join(thread, NULL);
         virMutexLock(&server->lock);
     }
+    VIR_FREE(server->workers);
 
     free(server->workers);
     virMutexUnlock(&server->lock);
@@ -2223,6 +2402,9 @@ remoteReadConfigFile (struct qemud_server *server, const char *filename)
     GET_CONF_INT (conf, filename, max_workers);
     GET_CONF_INT (conf, filename, max_clients);
 
+    GET_CONF_INT (conf, filename, max_requests);
+    GET_CONF_INT (conf, filename, max_client_requests);
+
     virConfFree (conf);
     return 0;
 
index 12897a12bfc2a616ffa01cc731b4c0857d61988f..9a2ff80f23189c550fdca18c71f8090ba9fc5db7 100644 (file)
 
 #define qemudDebug DEBUG
 
-enum qemud_mode {
-    QEMUD_MODE_RX_HEADER,       /* Receiving the fixed length RPC header data */
-    QEMUD_MODE_RX_PAYLOAD,      /* Receiving the variable length RPC payload data */
-    QEMUD_MODE_WAIT_DISPATCH,   /* Message received, waiting for worker to process */
-    QEMUD_MODE_IN_DISPATCH,     /* RPC call being processed */
-    QEMUD_MODE_TX_PACKET,       /* Transmitting reply to RPC call */
-    QEMUD_MODE_TLS_HANDSHAKE,   /* Performing TLS handshake */
-};
-
 /* Whether we're passing reads & writes through a sasl SSF */
 enum qemud_sasl_ssf {
     QEMUD_SASL_SSF_NONE = 0,
@@ -87,6 +78,16 @@ enum qemud_sock_type {
     QEMUD_SOCK_TYPE_TLS = 2,
 };
 
+struct qemud_client_message {
+    char buffer [REMOTE_MESSAGE_MAX + REMOTE_MESSAGE_HEADER_XDR_LEN];
+    unsigned int bufferLength;
+    unsigned int bufferOffset;
+
+    int async : 1;
+
+    struct qemud_client_message *next;
+};
+
 /* Stores the per-client connection state */
 struct qemud_client {
     virMutex lock;
@@ -97,7 +98,6 @@ struct qemud_client {
     int watch;
     int readonly:1;
     int closing:1;
-    enum qemud_mode mode;
 
     struct sockaddr_storage addr;
     socklen_t addrlen;
@@ -105,6 +105,7 @@ struct qemud_client {
     int type; /* qemud_sock_type */
     gnutls_session_t tlssession;
     int auth;
+    int handshake : 1; /* If we're in progress for TLS handshake */
 #if HAVE_SASL
     sasl_conn_t *saslconn;
     int saslSSF;
@@ -117,12 +118,20 @@ struct qemud_client {
     char *saslUsername;
 #endif
 
-    unsigned int incomingSerial;
-    unsigned int outgoingSerial;
-
-    char buffer [REMOTE_MESSAGE_MAX];
-    unsigned int bufferLength;
-    unsigned int bufferOffset;
+    /* Count of meages in 'dx' or 'tx' queue
+     * ie RPC calls in progress. Does not count
+     * async events which are not used for
+     * throttling calculations */
+    int nrequests;
+    /* Zero or one messages being received. Zero if
+     * nrequests >= max_clients and throttling */
+    struct qemud_client_message *rx;
+    /* Zero or many messages waiting for a worker
+     * to process them */
+    struct qemud_client_message *dx;
+    /* Zero or many messages waiting for transmit
+     * back to client, including async events */
+    struct qemud_client_message *tx;
 
     /* This is only valid if a remote open call has been made on this
      * connection, otherwise it will be NULL.  Also if remote close is
@@ -181,16 +190,20 @@ void qemudLog(int priority, const char *fmt, ...)
 int qemudSetCloseExec(int fd);
 int qemudSetNonBlock(int fd);
 
-unsigned int
+int
 remoteDispatchClientRequest (struct qemud_server *server,
-                             struct qemud_client *client);
+                             struct qemud_client *client,
+                             struct qemud_client_message *req);
 
-void qemudDispatchClientWrite(struct qemud_server *server,
-                             struct qemud_client *client);
+int qemudRegisterClientEvent(struct qemud_server *server,
+                             struct qemud_client *client,
+                             int update);
 
-#if HAVE_POLKIT
-int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid);
-#endif
+void qemudDispatchClientFailure(struct qemud_client *client);
+
+void
+qemudClientMessageQueuePush(struct qemud_client_message **queue,
+                            struct qemud_client_message *msg);
 
 int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
                             virDomainPtr dom,
@@ -198,4 +211,9 @@ int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
                             int detail,
                             void *opaque);
 
+
+#if HAVE_POLKIT
+int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid);
+#endif
+
 #endif
index 25a6f4bd167ae21d3004241d12700801124ea8c2..e41e2ee840d3d6c47f58c2fa455efad5487049e6 100644 (file)
@@ -111,6 +111,7 @@ static const dispatch_data const dispatch_table[] = {
 /* Prototypes */
 static void
 remoteDispatchDomainEventSend (struct qemud_client *client,
+                               struct qemud_client_message *msg,
                                virDomainPtr dom,
                                int event,
                                int detail);
@@ -219,9 +220,10 @@ remoteDispatchConnError (remote_error *rerr,
  * Server object is unlocked
  * Client object is locked
  */
-unsigned int
+int
 remoteDispatchClientRequest (struct qemud_server *server,
-                             struct qemud_client *client)
+                             struct qemud_client *client,
+                             struct qemud_client_message *msg)
 {
     XDR xdr;
     remote_message_header req, rep;
@@ -229,7 +231,8 @@ remoteDispatchClientRequest (struct qemud_server *server,
     dispatch_args args;
     dispatch_ret ret;
     const dispatch_data *data = NULL;
-    int rv = -1, len;
+    int rv = -1;
+    unsigned int len;
     virConnectPtr conn = NULL;
 
     memset(&args, 0, sizeof args);
@@ -237,7 +240,10 @@ remoteDispatchClientRequest (struct qemud_server *server,
     memset(&rerr, 0, sizeof rerr);
 
     /* Parse the header. */
-    xdrmem_create (&xdr, client->buffer, client->bufferLength, XDR_DECODE);
+    xdrmem_create (&xdr,
+                   msg->buffer + REMOTE_MESSAGE_HEADER_XDR_LEN,
+                   msg->bufferLength - REMOTE_MESSAGE_HEADER_XDR_LEN,
+                   XDR_DECODE);
 
     if (!xdr_remote_message_header (&xdr, &req))
         goto fatal_error;
@@ -333,10 +339,10 @@ rpc_error:
     rep.status = rv < 0 ? REMOTE_ERROR : REMOTE_OK;
 
     /* Serialise the return header. */
-    xdrmem_create (&xdr, client->buffer, sizeof client->buffer, XDR_ENCODE);
+    xdrmem_create (&xdr, msg->buffer, sizeof msg->buffer, XDR_ENCODE);
 
     len = 0; /* We'll come back and write this later. */
-    if (!xdr_int (&xdr, &len)) {
+    if (!xdr_u_int (&xdr, &len)) {
         if (rv == 0) xdr_free (data->ret_filter, (char*)&ret);
         goto fatal_error;
     }
@@ -364,17 +370,21 @@ rpc_error:
     if (xdr_setpos (&xdr, 0) == 0)
         goto fatal_error;
 
-    if (!xdr_int (&xdr, &len))
+    if (!xdr_u_int (&xdr, &len))
         goto fatal_error;
 
     xdr_destroy (&xdr);
-    return len;
+
+    msg->bufferLength = len;
+    msg->bufferOffset = 0;
+
+    return 0;
 
 fatal_error:
     /* Seriously bad stuff happened, so we'll kill off this client
        and not send back any RPC error */
     xdr_destroy (&xdr);
-    return 0;
+    return -1;
 }
 
 int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
@@ -386,9 +396,20 @@ int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED,
     struct qemud_client *client = opaque;
     REMOTE_DEBUG("Relaying domain event %d %d", event, detail);
 
-    if(client) {
-        remoteDispatchDomainEventSend (client, dom, event, detail);
-        qemudDispatchClientWrite(client->server,client);
+    if (client) {
+        struct qemud_client_message *ev;
+
+        if (VIR_ALLOC(ev) < 0)
+            return -1;
+
+        virMutexLock(&client->lock);
+
+        remoteDispatchDomainEventSend (client, ev, dom, event, detail);
+
+        if (qemudRegisterClientEvent(client->server, client, 1) < 0)
+            qemudDispatchClientFailure(client);
+
+        virMutexUnlock(&client->lock);
     }
     return 0;
 }
@@ -4202,13 +4223,14 @@ remoteDispatchDomainEventsDeregister (struct qemud_server *server ATTRIBUTE_UNUS
 
 static void
 remoteDispatchDomainEventSend (struct qemud_client *client,
+                               struct qemud_client_message *msg,
                                virDomainPtr dom,
                                int event,
                                int detail)
 {
     remote_message_header rep;
     XDR xdr;
-    int len;
+    unsigned int len;
     remote_domain_event_ret data;
 
     if (!client)
@@ -4222,11 +4244,11 @@ remoteDispatchDomainEventSend (struct qemud_client *client,
     rep.status = REMOTE_OK;
 
     /* Serialise the return header and event. */
-    xdrmem_create (&xdr, client->buffer, sizeof client->buffer, XDR_ENCODE);
+    xdrmem_create (&xdr, msg->buffer, sizeof msg->buffer, XDR_ENCODE);
 
     len = 0; /* We'll come back and write this later. */
-    if (!xdr_int (&xdr, &len)) {
-        /*remoteDispatchError (client, NULL, "%s", _("xdr_int failed (1)"));*/
+    if (!xdr_u_int (&xdr, &len)) {
+        /*remoteDispatchError (client, NULL, "%s", _("xdr_u_int failed (1)"));*/
         xdr_destroy (&xdr);
         return;
     }
@@ -4254,8 +4276,8 @@ remoteDispatchDomainEventSend (struct qemud_client *client,
         return;
     }
 
-    if (!xdr_int (&xdr, &len)) {
-        /*remoteDispatchError (client, NULL, "%s", _("xdr_int failed (2)"));*/
+    if (!xdr_u_int (&xdr, &len)) {
+        /*remoteDispatchError (client, NULL, "%s", _("xdr_u_int failed (2)"));*/
         xdr_destroy (&xdr);
         return;
     }
@@ -4263,9 +4285,10 @@ remoteDispatchDomainEventSend (struct qemud_client *client,
     xdr_destroy (&xdr);
 
     /* Send it. */
-    client->mode = QEMUD_MODE_TX_PACKET;
-    client->bufferLength = len;
-    client->bufferOffset = 0;
+    msg->async = 1;
+    msg->bufferLength = len;
+    msg->bufferOffset = 0;
+    qemudClientMessageQueuePush(&client->tx, msg);
 }
 
 /*----- Helpers. -----*/
index f3c00b7ff587e2d61a8b59166727453a7d7feec9..e2ea363ef6256225f3b302d1d91885227b5b551b 100644 (file)
@@ -246,6 +246,19 @@ max_clients = 20
 # of clients allowed
 min_workers = 5
 max_workers = 20
+
+# Total global limit on concurrent RPC calls. Should be
+# at least as large as max_workers. Beyond this, RPC requests
+# will be read into memory and queued. This directly impact
+# memory usage, currently each request requires 256 KB of
+# memory. So by default upto 5 MB of memory is used
+max_requests = 20
+
+# Limit on concurrent requests from a single client
+# connection. To avoid one client monopolizing the server
+# this should be a small fraction of the global max_requests
+# and max_workers parameter
+max_client_requests = 5
 "
 
    test Libvirtd.lns get conf =
@@ -499,3 +512,16 @@ max_workers = 20
         { "#comment" = "of clients allowed"}
         { "min_workers" = "5" }
         { "max_workers" = "20" }
+       { "#empty" }
+        { "#comment" = "Total global limit on concurrent RPC calls. Should be" }
+        { "#comment" = "at least as large as max_workers. Beyond this, RPC requests" }
+        { "#comment" = "will be read into memory and queued. This directly impact" }
+        { "#comment" = "memory usage, currently each request requires 256 KB of" }
+        { "#comment" = "memory. So by default upto 5 MB of memory is used" }
+        { "max_requests" = "20" }
+       { "#empty" }
+        { "#comment" = "Limit on concurrent requests from a single client" }
+        { "#comment" = "connection. To avoid one client monopolizing the server" }
+        { "#comment" = "this should be a small fraction of the global max_requests" }
+        { "#comment" = "and max_workers parameter" }
+        { "max_client_requests" = "5" }
index 449bafa7e5b098e085ab68d23d917c1c5d6014ba..8c7dd7672b45e505e6d23fcafbf6b38234ec3988 100644 (file)
@@ -5663,13 +5663,13 @@ prepareCall(virConnectPtr conn,
     /* Length must include the length word itself (always encoded in
      * 4 bytes as per RFC 4506).
      */
-    rv->bufferLength += 4;
+    rv->bufferLength += REMOTE_MESSAGE_HEADER_XDR_LEN;
 
     /* Encode the length word. */
-    xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE);
-    if (!xdr_int (&xdr, (int *)&rv->bufferLength)) {
+    xdrmem_create (&xdr, rv->buffer, REMOTE_MESSAGE_HEADER_XDR_LEN, XDR_ENCODE);
+    if (!xdr_u_int (&xdr, &rv->bufferLength)) {
         error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
-               _("xdr_int (length word)"));
+               _("xdr_u_int (length word)"));
         goto error;
     }
     xdr_destroy (&xdr);
@@ -5965,20 +5965,26 @@ static int
 processCallRecvLen(virConnectPtr conn, struct private_data *priv,
                    int in_open) {
     XDR xdr;
-    int len;
+    unsigned int len;
 
     xdrmem_create (&xdr, priv->buffer, priv->bufferLength, XDR_DECODE);
-    if (!xdr_int (&xdr, &len)) {
+    if (!xdr_u_int (&xdr, &len)) {
         error (in_open ? NULL : conn,
-               VIR_ERR_RPC, _("xdr_int (length word, reply)"));
+               VIR_ERR_RPC, _("xdr_u_int (length word, reply)"));
         return -1;
     }
     xdr_destroy (&xdr);
 
+    if (len < REMOTE_MESSAGE_HEADER_XDR_LEN) {
+        error (in_open ? NULL : conn,
+               VIR_ERR_RPC, _("packet received from server too small"));
+        return -1;
+    }
+
     /* Length includes length word - adjust to real length to read. */
-    len -= 4;
+    len -= REMOTE_MESSAGE_HEADER_XDR_LEN;
 
-    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
+    if (len > REMOTE_MESSAGE_MAX) {
         error (in_open ? NULL : conn,
                VIR_ERR_RPC, _("packet received from server too large"));
         return -1;