pthread_cond_broadcast(&cond->cond);
}
+int
+tor_threadlocal_init(tor_threadlocal_t *threadlocal)
+{
+ int err = pthread_key_create(&threadlocal->key, NULL);
+ return err ? -1 : 0;
+}
+
+void
+tor_threadlocal_destroy(tor_threadlocal_t *threadlocal)
+{
+ pthread_key_delete(threadlocal->key);
+ memset(threadlocal, 0, sizeof(tor_threadlocal_t));
+}
+
+void *
+tor_threadlocal_get(tor_threadlocal_t *threadlocal)
+{
+ return pthread_getspecific(threadlocal->key);
+}
+
+void
+tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value)
+{
+ int err = pthread_setspecific(threadlocal->key, value);
+ tor_assert(err == 0);
+}
+
/** Set up common structures for use by threading. */
void
tor_threads_init(void)
int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags);
void alert_sockets_close(alert_sockets_t *socks);
+typedef struct tor_threadlocal_s {
+#ifdef _WIN32
+ DWORD index;
+#else
+ pthread_key_t key;
+#endif
+} tor_threadlocal_t;
+
+int tor_threadlocal_init(tor_threadlocal_t *threadlocal);
+void tor_threadlocal_destroy(tor_threadlocal_t *threadlocal);
+void *tor_threadlocal_get(tor_threadlocal_t *threadlocal);
+void tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value);
+
#endif
tor_cond_signal_impl(cond, 1);
}
+int
+tor_threadlocal_init(tor_threadlocal_t *threadlocal)
+{
+ threadlocal->index = TlsAlloc();
+ return (threadlocal->index == TLS_OUT_OF_INDEXES) ? -1 : 0;
+}
+
+void
+tor_threadlocal_destroy(tor_threadlocal_t *threadlocal)
+{
+ TlsFree(threadlocal->index);
+ memset(threadlocal, 0, sizeof(tor_threadlocal_t));
+}
+
+void *
+tor_threadlocal_get(tor_threadlocal_t *threadlocal)
+{
+ return TlsGetValue(threadlocal->index);
+}
+
+void
+tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value)
+{
+ BOOL ok = TlsSetValue(threadlocal->index, value);
+ tor_assert(ok);
+}
+
int
tor_cond_wait(tor_cond_t *cond, tor_mutex_t *lock_, const struct timeval *tv)
{
char *msg;
} queued_event_t;
-/** If this is greater than 0, we don't allow new events to be queued.
- * XXXX This should be thread-local. */
-static int block_event_queue = 0;
+/** Pointer to int. If this is greater than 0, we don't allow new events to be
+ * queued. */
+static tor_threadlocal_t block_event_queue;
/** Holds a smartlist of queued_event_t objects that may need to be sent
* to one or more controllers */
if (queued_control_events_lock == NULL) {
queued_control_events_lock = tor_mutex_new();
+ tor_threadlocal_init(&block_event_queue);
}
}
+static int *
+get_block_event_queue(void)
+{
+ int *val = tor_threadlocal_get(&block_event_queue);
+ if (PREDICT_UNLIKELY(val == NULL)) {
+ val = tor_malloc_zero(sizeof(int));
+ tor_threadlocal_set(&block_event_queue, val);
+ }
+ return val;
+}
+
/** Helper: inserts an event on the list of events queued to be sent to
* one or more controllers, and schedules the events to be flushed if needed.
*
return;
}
- queued_event_t *ev = tor_malloc(sizeof(*ev));
- ev->event = event;
- ev->msg = msg;
-
- tor_mutex_acquire(queued_control_events_lock);
- if (block_event_queue) { /* XXXX This should be thread-specific. */
- tor_mutex_release(queued_control_events_lock);
+ int *block_event_queue = get_block_event_queue();
+ if (*block_event_queue) {
tor_free(msg);
- tor_free(ev);
return;
}
+ queued_event_t *ev = tor_malloc(sizeof(*ev));
+ ev->event = event;
+ ev->msg = msg;
+
/* No queueing an event while queueing an event */
- ++block_event_queue;
+ ++*block_event_queue;
+ tor_mutex_acquire(queued_control_events_lock);
tor_assert(queued_control_events);
smartlist_add(queued_control_events, ev);
flush_queued_event_pending = 1;
}
- --block_event_queue;
-
tor_mutex_release(queued_control_events_lock);
+ --*block_event_queue;
+
/* We just put an event on the queue; mark the queue to be
* flushed. We only do this from the main thread for now; otherwise,
* we'd need to incur locking overhead in Libevent or use a socket.
smartlist_t *controllers = smartlist_new();
smartlist_t *queued_events;
+ int *block_event_queue = get_block_event_queue();
+ ++*block_event_queue;
+
tor_mutex_acquire(queued_control_events_lock);
/* No queueing an event while flushing events. */
- ++block_event_queue;
flush_queued_event_pending = 0;
queued_events = queued_control_events;
queued_control_events = smartlist_new();
smartlist_free(queued_events);
smartlist_free(controllers);
- tor_mutex_acquire(queued_control_events_lock);
- --block_event_queue;
- tor_mutex_release(queued_control_events_lock);
+ --*block_event_queue;
}
/** Libevent callback: Flushes pending events to controllers that are
static void thread_test_func_(void* _s) ATTR_NORETURN;
/** How many iterations have the threads in the unit test run? */
-static int t1_count = 0, t2_count = 0;
+static tor_threadlocal_t count;
/** Helper function for threading unit tests: This function runs in a
* subthread. It grabs its own mutex (start1 or start2) to make sure that it
thread_test_func_(void* _s)
{
char *s = _s;
- int i, *count;
+ int i;
tor_mutex_t *m;
char buf[64];
char **cp;
+ int *mycount = tor_malloc_zero(sizeof(int));
+ tor_threadlocal_set(&count, mycount);
if (!strcmp(s, "thread 1")) {
m = thread_test_start1_;
cp = &thread1_name_;
- count = &t1_count;
thread_fn_tid1 = tor_get_thread_id();
} else {
m = thread_test_start2_;
cp = &thread2_name_;
- count = &t2_count;
thread_fn_tid2 = tor_get_thread_id();
}
for (i=0; i<10000; ++i) {
tor_mutex_acquire(thread_test_mutex_);
strmap_set(thread_test_strmap_, "last to run", *cp);
- ++*count;
tor_mutex_release(thread_test_mutex_);
+ int *tls_count = tor_threadlocal_get(&count);
+ tor_assert(tls_count == mycount);
+ ++*tls_count;
}
tor_mutex_acquire(thread_test_mutex_);
strmap_set(thread_test_strmap_, s, *cp);
tv.tv_usec=100*1000;
#endif
(void) arg;
+ tt_int_op(tor_threadlocal_init(&count), OP_EQ, 0);
set_main_thread();
tor_mutex_free(thread_test_mutex_);
if (timedout) {
- printf("\nTimed out: %d %d", t1_count, t2_count);
tt_assert(strmap_get(thread_test_strmap_, "thread 1"));
tt_assert(strmap_get(thread_test_strmap_, "thread 2"));
tt_assert(!timedout);