/** Internal insertion to the kv storage from backend */
gboolean
-rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer key, guint keylen,
+rspamd_kv_storage_insert_cache (struct rspamd_kv_storage *storage, gpointer key, guint keylen,
gpointer data, gsize len, gint flags, guint expire, struct rspamd_kv_element **pelt)
{
gint steps = 0;
- struct rspamd_kv_element *elt = *pelt;
+ struct rspamd_kv_element *elt;
+ g_static_rw_lock_writer_lock (&storage->rwlock);
/* Hard limit */
if (storage->max_memory > 0) {
if (len > storage->max_memory) {
msg_info ("<%s>: trying to insert value of length %z while limit is %z", storage->name,
len, storage->max_memory);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return FALSE;
}
/* Now check limits */
while (storage->memory + len > storage->max_memory) {
if (storage->expire) {
- g_static_rw_lock_writer_lock (&storage->rwlock);
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
- g_static_rw_lock_writer_unlock (&storage->rwlock);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
}
if (++steps > MAX_EXPIRE_STEPS) {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
return FALSE;
}
}
/* Insert elt to the cache */
- g_static_rw_lock_writer_lock (&storage->rwlock);
+
elt = storage->cache->insert_func (storage->cache, key, keylen, data, len);
if (elt == NULL) {
/* Copy data */
elt->flags = flags;
elt->expire = expire;
- *pelt = elt;
+
+ if (pelt != NULL) {
+ *pelt = elt;
+ }
/* Insert to the expire */
if (storage->expire) {
glong longval;
/* Hard limit */
+ g_static_rw_lock_writer_lock (&storage->rwlock);
if (storage->max_memory > 0) {
if (len + sizeof (struct rspamd_kv_element) + keylen >= storage->max_memory) {
msg_warn ("<%s>: trying to insert value of length %z while limit is %z", storage->name,
len, storage->max_memory);
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
return FALSE;
}
/* Now check limits */
while (storage->memory + len + keylen > storage->max_memory) {
if (storage->expire) {
- g_static_rw_lock_writer_lock (&storage->rwlock);
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
- g_static_rw_lock_writer_unlock (&storage->rwlock);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
}
if (++steps > MAX_EXPIRE_STEPS) {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
return FALSE;
}
steps = 0;
while (storage->elts > storage->max_elts) {
if (storage->expire) {
- g_static_rw_lock_writer_lock (&storage->rwlock);
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
- g_static_rw_lock_writer_unlock (&storage->rwlock);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
}
if (++steps > MAX_EXPIRE_STEPS) {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
return FALSE;
}
}
/* First try to search it in cache */
- g_static_rw_lock_writer_lock (&storage->rwlock);
+
elt = storage->cache->lookup_func (storage->cache, key, keylen);
if (elt) {
if (storage->expire) {
if (belt) {
/* Put this element into cache */
if ((belt->flags & KV_ELT_INTEGER) != 0) {
- rspamd_kv_storage_insert_internal (storage, ELT_KEY (belt), keylen, ELT_DATA (belt),
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
+ rspamd_kv_storage_insert_cache (storage, ELT_KEY (belt), keylen, ELT_DATA (belt),
belt->size, belt->flags,
belt->expire, &elt);
+ g_static_rw_lock_writer_lock (&storage->rwlock);
}
if ((belt->flags & KV_ELT_DIRTY) == 0) {
g_free (belt);
/* First try to look at cache */
g_static_rw_lock_reader_lock (&storage->rwlock);
elt = storage->cache->lookup_func (storage->cache, key, keylen);
- g_static_rw_lock_reader_unlock (&storage->rwlock);
-
/* Next look at the backend */
if (elt == NULL && storage->backend) {
- g_static_rw_lock_reader_lock (&storage->rwlock);
belt = storage->backend->lookup_func (storage->backend, key, keylen);
- g_static_rw_lock_reader_unlock (&storage->rwlock);
+
if (belt) {
/* Put this element into cache */
- rspamd_kv_storage_insert_internal (storage, ELT_KEY (belt), keylen, ELT_DATA (belt),
- belt->size, belt->flags,
- belt->expire, &elt);
if ((belt->flags & KV_ELT_DIRTY) == 0) {
- g_free (belt);
+ belt->flags |= KV_ELT_NEED_INSERT;
+ return belt;
+ }
+ else {
+ elt = belt;
}
}
}
}
}
+ /* RWlock is still locked */
return elt;
}
}
storage->elts --;
storage->memory -= elt->size;
+ if ((elt->flags & KV_ELT_DIRTY) != 0) {
+ elt->flags |= KV_ELT_NEED_FREE;
+ }
+ else {
+ g_slice_free1 (ELT_SIZE (elt), elt);
+ }
}
+
g_static_rw_lock_writer_unlock (&storage->rwlock);
return elt;
es = arr_data;
*es = elt_size;
memcpy (arr_data, (gchar *)data + sizeof (guint), len);
- if (!rspamd_kv_storage_insert_internal (storage, key, keylen, arr_data, len + sizeof (guint),
+ if (!rspamd_kv_storage_insert_cache (storage, key, keylen, arr_data, len + sizeof (guint),
flags, expire, &elt)) {
g_slice_free1 (len + sizeof (guint), arr_data);
return FALSE;
storage->elts --;
TAILQ_REMOVE (&expire->head, elt, entry);
/* Free memory */
- g_slice_free1 (ELT_SIZE (elt), elt);
+ if ((elt->flags & (KV_ELT_DIRTY|KV_ELT_NEED_INSERT)) != 0) {
+ elt->flags |= KV_ELT_NEED_FREE;
+ }
+ else {
+ g_slice_free1 (ELT_SIZE (elt), elt);
+ }
res = TRUE;
/* Check other elements in this queue */
TAILQ_FOREACH_SAFE (elt, &expire->head, entry, temp) {
storage->cache->steal_func (storage->cache, elt);
TAILQ_REMOVE (&expire->head, elt, entry);
/* Free memory */
- if ((elt->flags & KV_ELT_DIRTY) != 0) {
+ if ((elt->flags & (KV_ELT_DIRTY|KV_ELT_NEED_INSERT)) != 0) {
elt->flags |= KV_ELT_NEED_FREE;
}
else {
storage->cache->steal_func (storage->cache, oldest_elt);
TAILQ_REMOVE (&expire->head, oldest_elt, entry);
/* Free memory */
- if ((oldest_elt->flags & KV_ELT_DIRTY) != 0) {
+ if ((oldest_elt->flags & (KV_ELT_DIRTY|KV_ELT_NEED_INSERT)) != 0) {
oldest_elt->flags |= KV_ELT_NEED_FREE;
}
else {
cur = db->ops_queue->head;
while (cur) {
op = cur->data;
- if (op->op == FILE_OP_DELETE || (op->elt->flags & KV_ELT_NEED_FREE) != 0) {
+ if (op->op == FILE_OP_DELETE || ((op->elt->flags & KV_ELT_NEED_FREE) != 0 &&
+ (op->elt->flags & KV_ELT_NEED_INSERT) == 0)) {
/* Also clean memory */
g_slice_free1 (ELT_SIZE (op->elt), op->elt);
}
+ else {
+ /* Unset dirty flag */
+ op->elt->flags &= ~KV_ELT_DIRTY;
+ }
g_slice_free1 (sizeof (struct file_op), op);
cur = g_list_next (cur);
}
close (fd);
+ elt->flags &= ~(KV_ELT_DIRTY|KV_ELT_NEED_FREE);
+
return elt;
}
rspamd_file_delete (struct rspamd_kv_backend *backend, gpointer key, guint keylen)
{
struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend;
- struct file_op *op;
- struct rspamd_kv_element *elt;
+ gchar filebuf[PATH_MAX];
struct rspamd_kv_element search_elt;
-
- search_elt.keylen = keylen;
- search_elt.p = key;
+ struct file_op *op;
if (!db->initialized) {
return;
}
+ search_elt.keylen = keylen;
+ search_elt.p = key;
+ /* First search in ops queue */
if ((op = g_hash_table_lookup (db->ops_hash, &search_elt)) != NULL) {
op->op = FILE_OP_DELETE;
return;
}
-
- elt = rspamd_file_lookup (backend, key, keylen);
- if (elt == NULL) {
+ /* Get filename */
+ if (!get_file_name (db, key, keylen, filebuf, sizeof (filebuf))) {
return;
}
- op = g_slice_alloc (sizeof (struct file_op));
- op->op = FILE_OP_DELETE;
- op->elt = elt;
- elt->flags |= KV_ELT_DIRTY;
-
- g_queue_push_head (db->ops_queue, op);
- g_hash_table_insert (db->ops_hash, elt, op);
-
- if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
- file_process_queue (backend);
- }
- return;
+ unlink (filebuf);
}
static void
}
if (elt->flags & KV_ELT_INTEGER) {
if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
return FALSE;
}
}
else {
- if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, FALSE)) {
+ if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
return FALSE;
}
}
res = rspamd_dispatcher_write (session->dispather, CRLF,
sizeof (CRLF) - 1, FALSE, TRUE);
}
+ if (!res) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ }
return res;
}
struct kvstorage_session *session = (struct kvstorage_session *) arg;
if (session->elt) {
+
+ if ((session->elt->flags & KV_ELT_NEED_INSERT) != 0) {
+ /* Insert to cache and free element */
+ session->elt->flags &= ~KV_ELT_NEED_INSERT;
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ rspamd_kv_storage_insert_cache (session->cf->storage, ELT_KEY (session->elt),
+ session->elt->keylen, ELT_DATA (session->elt),
+ session->elt->size, session->elt->flags,
+ session->elt->expire, NULL);
+ g_free (session->elt);
+ session->elt = NULL;
+ return TRUE;
+ }
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
session->elt = NULL;
+
}
return TRUE;
sigprocmask (SIG_BLOCK, thr->signals, NULL);
/* Init thread specific events */
thr->ev_base = event_init ();
+
event_set (&thr->bind_ev, thr->worker->cf->listen_sock, EV_READ | EV_PERSIST, thr_accept_socket, (void *)thr);
event_base_set (thr->ev_base, &thr->bind_ev);
event_add (&thr->bind_ev, NULL);