static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-
+static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
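+
+/* NB: the qemud.h part of this change is not shown in this hunk; the
+ * code below assumes a per-thread bookkeeping struct along these
+ * lines, plus an 'int nactiveworkers' counter and a fixed-size
+ * 'struct qemud_worker *workers' array in struct qemud_server:
+ *
+ *   struct qemud_worker {
+ *       pthread_t thread;
+ *       int hasThread : 1;
+ *       int processingCall : 1;
+ *       int quitRequest : 1;
+ *       struct qemud_server *server;
+ *   };
+ */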
void
qemudClientMessageQueuePush(struct qemud_client_message **queue,
    server->clients[server->nclients++] = client;
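+    /* If there are now more clients than active workers, and the
+     * pool is not yet at its configured maximum, spawn one more
+     * worker in the first unused slot */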
+    if (server->nclients > server->nactiveworkers &&
+        server->nactiveworkers < server->nworkers) {
+        int i;
+        for (i = 0 ; i < server->nworkers ; i++) {
+            if (!server->workers[i].hasThread) {
+                if (qemudStartWorker(server, &server->workers[i]) < 0)
+                    return -1;
+                server->nactiveworkers++;
+                break;
+            }
+        }
+    }
+
    return 0;
cleanup:
static void *qemudWorker(void *data)
{
-    struct qemud_server *server = data;
+    struct qemud_worker *worker = data;
+    struct qemud_server *server = worker->server;
    while (1) {
        struct qemud_client *client = NULL;
        struct qemud_client_message *reply;
        virMutexLock(&server->lock);
-        while ((client = qemudPendingJob(server)) == NULL) {
+        while (((client = qemudPendingJob(server)) == NULL) &&
+               !worker->quitRequest) {
            if (virCondWait(&server->job, &server->lock) < 0) {
                virMutexUnlock(&server->lock);
                return NULL;
            }
        }
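+        /* A quit request can race with a pending job: if a client
+         * was locked by qemudPendingJob(), release it so another
+         * worker can pick the job up */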
+        if (worker->quitRequest) {
+            if (client)
+                virMutexUnlock(&client->lock);
+            virMutexUnlock(&server->lock);
+            return NULL;
+        }
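+        /* Mark this worker busy, so the shrink logic in the main
+         * loop will not try to reap it in the middle of a call */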
+        worker->processingCall = 1;
        virMutexUnlock(&server->lock);
        /* We own a locked client now... */
        client->refs--;
        virMutexUnlock(&client->lock);
+
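+        /* The call has been processed; retake the server lock just
+         * long enough to mark this worker idle again */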
+        virMutexLock(&server->lock);
+        worker->processingCall = 0;
+        virMutexUnlock(&server->lock);
+    }
+}
+
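+/* Start a thread in the given worker slot; fails if the slot
+ * already has a thread, or if thread creation fails */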
+static int qemudStartWorker(struct qemud_server *server,
+                            struct qemud_worker *worker) {
+    pthread_attr_t attr;
+
+    if (worker->hasThread)
+        return -1;
+
+    pthread_attr_init(&attr);
+    /* We want to join workers, so don't detach them */
+    /*pthread_attr_setdetachstate(&attr, 1);*/
+
+    worker->server = server;
+    worker->hasThread = 1;
+    worker->quitRequest = 0;
+    worker->processingCall = 0;
+
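+    /* Spawn the thread; on failure roll the slot back to unused */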
+    if (pthread_create(&worker->thread,
+                       &attr,
+                       qemudWorker,
+                       worker) != 0) {
+        worker->hasThread = 0;
+        worker->server = NULL;
+        pthread_attr_destroy(&attr);
+        return -1;
+    }
+
+    pthread_attr_destroy(&attr);
+    return 0;
+}
    virMutexLock(&server->lock);
-    server->nworkers = min_workers;
+    if (min_workers > max_workers)
+        max_workers = min_workers;
+
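+    /* Allocate worker slots for the maximum pool size up front;
+     * only min_workers threads are started right away */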
+    server->nworkers = max_workers;
    if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) {
        VIR_ERROR0(_("Failed to allocate workers"));
        return -1;
    }
-    for (i = 0 ; i < server->nworkers ; i++) {
-        pthread_attr_t attr;
-        pthread_attr_init(&attr);
-        pthread_attr_setdetachstate(&attr, 1);
-
-        pthread_create(&server->workers[i],
-                       &attr,
-                       qemudWorker,
-                       server);
+    for (i = 0 ; i < min_workers ; i++) {
+        if (qemudStartWorker(server, &server->workers[i]) < 0)
+            goto cleanup;
+        server->nactiveworkers++;
    }
for (;;) {
}
}
+        /* If the number of active workers exceeds both the min_workers
+         * threshold and the number of clients, then kill some
+         * of them off */
+        for (i = 0 ; (i < server->nworkers &&
+                      server->nactiveworkers > server->nclients &&
+                      server->nactiveworkers > min_workers) ; i++) {
+
+            if (server->workers[i].hasThread &&
+                !server->workers[i].processingCall) {
+                server->workers[i].quitRequest = 1;
+
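+                /* Wake all waiting workers; only the one whose
+                 * quitRequest flag is set will actually exit */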
+                virCondBroadcast(&server->job);
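+                /* Drop the server lock across pthread_join(); the
+                 * exiting worker must re-acquire it inside
+                 * virCondWait() before it can return */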
+                virMutexUnlock(&server->lock);
+                pthread_join(server->workers[i].thread, NULL);
+                virMutexLock(&server->lock);
+                server->workers[i].hasThread = 0;
+                server->nactiveworkers--;
+            }
+        }
+
        /* Unregister any timeout that's active, since we
         * just had an event processed
         */
}
}
+cleanup:
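+    /* Shutdown: ask each remaining worker to quit, then join it */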
    for (i = 0 ; i < server->nworkers ; i++) {
-        pthread_t thread = server->workers[i];
+        if (!server->workers[i].hasThread)
+            continue;
+
+        server->workers[i].quitRequest = 1;
+        virCondBroadcast(&server->job);
+
        virMutexUnlock(&server->lock);
-        pthread_join(thread, NULL);
+        pthread_join(server->workers[i].thread, NULL);
        virMutexLock(&server->lock);
+        server->workers[i].hasThread = 0;
    }
    VIR_FREE(server->workers);