]> git.ipfire.org Git - thirdparty/libvirt.git/commitdiff
Dynamically adjust worker threads in daemon
authorDaniel P. Berrange <berrange@redhat.com>
Tue, 20 Jan 2009 19:27:11 +0000 (19:27 +0000)
committerDaniel P. Berrange <berrange@redhat.com>
Tue, 20 Jan 2009 19:27:11 +0000 (19:27 +0000)
ChangeLog
qemud/qemud.c
qemud/qemud.h

index 0855957e6b67edaffaa1f0632b8c8e0c8bb5f45a..9f7ae1f0b20cb8f2734b0d4e6510562f04ed5754 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,8 @@
+Tue Jan 20 19:26:53 GMT 2009 Daniel P. Berrange <berrange@redhat.com>
+
+       * qemud/qemud.c, qemud/qemud.h: Dynamic spawn/cleanup threads
+       for processing RPC calls as number of clients changes
+
 Tue Jan 20 19:24:53 GMT 2009 Daniel P. Berrange <berrange@redhat.com>
 
        * qemud/qemud.c, qemud/qemud.h, qemud/remote.c: Allow the
index 21cecf23970f603c17f7dbb99039e5f72430da88..d60cb3526cf5e2b8666803d39e7082f1d825e87b 100644 (file)
@@ -167,7 +167,7 @@ static void sig_handler(int sig, siginfo_t * siginfo,
 
 static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
 static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-
+static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
 
 void
 qemudClientMessageQueuePush(struct qemud_client_message **queue,
@@ -1247,6 +1247,20 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
 
     server->clients[server->nclients++] = client;
 
+    if (server->nclients > server->nactiveworkers &&
+        server->nactiveworkers < server->nworkers) {
+        int i;
+        for (i = 0 ; i < server->nworkers ; i++) {
+            if (!server->workers[i].hasThread) {
+                if (qemudStartWorker(server, &server->workers[i]) < 0)
+                    return -1;
+                server->nactiveworkers++;
+                break;
+            }
+        }
+    }
+
+
     return 0;
 
  cleanup:
@@ -1302,19 +1316,28 @@ static struct qemud_client *qemudPendingJob(struct qemud_server *server)
 
 static void *qemudWorker(void *data)
 {
-    struct qemud_server *server = data;
+    struct qemud_worker *worker = data;
+    struct qemud_server *server = worker->server;
 
     while (1) {
         struct qemud_client *client = NULL;
         struct qemud_client_message *reply;
 
         virMutexLock(&server->lock);
-        while ((client = qemudPendingJob(server)) == NULL) {
+        while (((client = qemudPendingJob(server)) == NULL) &&
+               !worker->quitRequest) {
             if (virCondWait(&server->job, &server->lock) < 0) {
                 virMutexUnlock(&server->lock);
                 return NULL;
             }
         }
+        if (worker->quitRequest) {
+            if (client)
+                virMutexUnlock(&client->lock);
+            virMutexUnlock(&server->lock);
+            return NULL;
+        }
+        worker->processingCall = 1;
         virMutexUnlock(&server->lock);
 
         /* We own a locked client now... */
@@ -1341,7 +1364,38 @@ static void *qemudWorker(void *data)
 
         client->refs--;
         virMutexUnlock(&client->lock);
+
+        virMutexLock(&server->lock);
+        worker->processingCall = 0;
+        virMutexUnlock(&server->lock);
+    }
+}
+
+static int qemudStartWorker(struct qemud_server *server,
+                            struct qemud_worker *worker) {
+    pthread_attr_t attr;
+    pthread_attr_init(&attr);
+    /* We want to join workers, so don't detach them */
+    /*pthread_attr_setdetachstate(&attr, 1);*/
+
+    if (worker->hasThread)
+        return -1;
+
+    worker->server = server;
+    worker->hasThread = 1;
+    worker->quitRequest = 0;
+    worker->processingCall = 0;
+
+    if (pthread_create(&worker->thread,
+                       &attr,
+                       qemudWorker,
+                       worker) != 0) {
+        worker->hasThread = 0;
+        worker->server = NULL;
+        return -1;
     }
+
+    return 0;
 }
 
 
@@ -1940,21 +1994,19 @@ static int qemudRunLoop(struct qemud_server *server) {
 
     virMutexLock(&server->lock);
 
-    server->nworkers = min_workers;
+    if (min_workers > max_workers)
+        max_workers = min_workers;
+
+    server->nworkers = max_workers;
     if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) {
         VIR_ERROR0(_("Failed to allocate workers"));
         return -1;
     }
 
-    for (i = 0 ; i < server->nworkers ; i++) {
-        pthread_attr_t attr;
-        pthread_attr_init(&attr);
-        pthread_attr_setdetachstate(&attr, 1);
-
-        pthread_create(&server->workers[i],
-                       &attr,
-                       qemudWorker,
-                       server);
+    for (i = 0 ; i < min_workers ; i++) {
+        if (qemudStartWorker(server, &server->workers[i]) < 0)
+            goto cleanup;
+        server->nactiveworkers++;
     }
 
     for (;;) {
@@ -2000,6 +2052,26 @@ static int qemudRunLoop(struct qemud_server *server) {
             }
         }
 
+        /* If number of active workers exceeds both the min_workers
+         * threshold and the number of clients, then kill some
+         * off */
+        for (i = 0 ; (i < server->nworkers &&
+                      server->nactiveworkers > server->nclients &&
+                      server->nactiveworkers > min_workers) ; i++) {
+
+            if (server->workers[i].hasThread &&
+                !server->workers[i].processingCall) {
+                server->workers[i].quitRequest = 1;
+
+                virCondBroadcast(&server->job);
+                virMutexUnlock(&server->lock);
+                pthread_join(server->workers[i].thread, NULL);
+                virMutexLock(&server->lock);
+                server->workers[i].hasThread = 0;
+                server->nactiveworkers--;
+            }
+        }
+
         /* Unregister any timeout that's active, since we
          * just had an event processed
          */
@@ -2015,11 +2087,18 @@ static int qemudRunLoop(struct qemud_server *server) {
         }
     }
 
+cleanup:
     for (i = 0 ; i < server->nworkers ; i++) {
-        pthread_t thread = server->workers[i];
+        if (!server->workers[i].hasThread)
+            continue;
+
+        server->workers[i].quitRequest = 1;
+        virCondBroadcast(&server->job);
+
         virMutexUnlock(&server->lock);
-        pthread_join(thread, NULL);
+        pthread_join(server->workers[i].thread, NULL);
         virMutexLock(&server->lock);
+        server->workers[i].hasThread = 0;
     }
     VIR_FREE(server->workers);
 
index 9a2ff80f23189c550fdca18c71f8090ba9fc5db7..7938f89ed8213797d712be2988621b32c536531a 100644 (file)
@@ -157,13 +157,24 @@ struct qemud_socket {
     struct qemud_socket *next;
 };
 
+struct qemud_worker {
+    pthread_t thread;
+    int hasThread :1;
+    int processingCall :1;
+    int quitRequest : 1;
+
+    /* back-pointer to our server */
+    struct qemud_server *server;
+};
+
 /* Main server state */
 struct qemud_server {
     virMutex lock;
     virCond job;
 
     int nworkers;
-    pthread_t *workers;
+    int nactiveworkers;
+    struct qemud_worker *workers;
     int nsockets;
     struct qemud_socket *sockets;
     int nclients;