]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
driver-cassandra: Add support for speculative execution
authorAki Tuomi <aki.tuomi@dovecot.fi>
Tue, 31 Jan 2017 17:43:30 +0000 (19:43 +0200)
committerGitLab <gitlab@git.dovecot.net>
Mon, 13 Feb 2017 18:20:42 +0000 (20:20 +0200)
m4/want_cassandra.m4
src/lib-sql/driver-cassandra.c

index 22159e9fc790c92932d375a03d4185c1eda34bd8..0648d5228e4f132754c4d048e48795e663ec21c0 100644 (file)
@@ -6,6 +6,9 @@ AC_DEFUN([DOVECOT_WANT_CASSANDRA], [
   
                           AC_DEFINE(HAVE_CASSANDRA,, [Build with Cassandra support])
                           found_sql_drivers="$found_sql_drivers cassandra"
+                          AC_CHECK_LIB(cassandra, cass_cluster_set_constant_speculative_execution_policy, [
+                                       AC_DEFINE(HAVE_CASSANDRA_SPECULATIVE_POLICY, 1, [Cassandra supports speculative execution policy])
+                          ])
                   ], [
                     if test $want_cassandra = yes; then
                       AC_ERROR([Can't build with Cassandra support: cassandra.h not found])
index a30eb0e50cc51dc7a2b19b18042055fe1d63167c..0f80113d971827e70297b625ca9047fecb089cc9 100644 (file)
@@ -85,6 +85,7 @@ struct cassandra_db {
        unsigned int connect_timeout_msecs, request_timeout_msecs;
        unsigned int warn_timeout_msecs;
        unsigned int heartbeat_interval_secs, idle_timeout_secs;
+       unsigned int execution_retry_interval_msecs, execution_retry_times;
        in_port_t port;
 
        CassCluster *cluster;
@@ -494,6 +495,18 @@ static void driver_cassandra_parse_connect_string(struct cassandra_db *db,
                } else if (strcmp(key, "metrics") == 0) {
                        i_free(db->metrics_path);
                        db->metrics_path = i_strdup(value);
+               } else if (strcmp(key, "execution_retry_interval") == 0) {
+                       if (settings_get_time_msecs(value, &db->execution_retry_interval_msecs, &error) < 0)
+                               i_fatal("cassandra: Invalid execution_retry_interval '%s': %s", value, error);
+#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
+                       i_fatal("cassandra: This cassandra version does not support execution_retry_interval");
+#endif
+               } else if (strcmp(key, "execution_retry_times") == 0) {
+                       if (str_to_uint(value, &db->execution_retry_times) < 0)
+                               i_fatal("cassandra: Invalid execution_retry_times %s", value);
+#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
+                       i_fatal("cassandra: This cassandra version does not support execution_retry_times");
+#endif
                } else {
                        i_fatal("cassandra: Unknown connect string: %s", key);
                }
@@ -623,6 +636,10 @@ static struct sql_db *driver_cassandra_init_v(const char *connect_string)
                cass_cluster_set_connection_heartbeat_interval(db->cluster, db->heartbeat_interval_secs);
        if (db->idle_timeout_secs != 0)
                cass_cluster_set_connection_idle_timeout(db->cluster, db->idle_timeout_secs);
+#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
+       if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0)
+               cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times);
+#endif
        db->session = cass_session_new();
        if (db->metrics_path != NULL)
                db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
@@ -853,6 +870,9 @@ static void driver_cassandra_result_send_query(struct cassandra_result *result)
        result->statement = cass_statement_new(result->query, 0);
        cass_statement_set_consistency(result->statement, result->consistency);
 
+#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
+       cass_statement_set_is_idempotent(result->statement, cass_true);
+#endif
        future = cass_session_execute(db->session, result->statement);
        driver_cassandra_set_callback(future, db, query_callback, result);
 }