From: Aki Tuomi Date: Tue, 31 Jan 2017 17:43:30 +0000 (+0200) Subject: driver-cassandra: Add support for speculative execution X-Git-Tag: 2.3.0.rc1~2128 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=07038d3a12a915e98f794566f56a0ed12e0653eb;p=thirdparty%2Fdovecot%2Fcore.git driver-cassandra: Add support for speculative execution --- diff --git a/m4/want_cassandra.m4 b/m4/want_cassandra.m4 index 22159e9fc7..0648d5228e 100644 --- a/m4/want_cassandra.m4 +++ b/m4/want_cassandra.m4 @@ -6,6 +6,9 @@ AC_DEFUN([DOVECOT_WANT_CASSANDRA], [ AC_DEFINE(HAVE_CASSANDRA,, [Build with Cassandra support]) found_sql_drivers="$found_sql_drivers cassandra" + AC_CHECK_LIB(cassandra, cass_cluster_set_constant_speculative_execution_policy, [ + AC_DEFINE(HAVE_CASSANDRA_SPECULATIVE_POLICY, 1, [Cassandra supports speculative execution policy]) + ]) ], [ if test $want_cassandra = yes; then AC_ERROR([Can't build with Cassandra support: cassandra.h not found]) diff --git a/src/lib-sql/driver-cassandra.c b/src/lib-sql/driver-cassandra.c index a30eb0e50c..0f80113d97 100644 --- a/src/lib-sql/driver-cassandra.c +++ b/src/lib-sql/driver-cassandra.c @@ -85,6 +85,7 @@ struct cassandra_db { unsigned int connect_timeout_msecs, request_timeout_msecs; unsigned int warn_timeout_msecs; unsigned int heartbeat_interval_secs, idle_timeout_secs; + unsigned int execution_retry_interval_msecs, execution_retry_times; in_port_t port; CassCluster *cluster; @@ -494,6 +495,18 @@ static void driver_cassandra_parse_connect_string(struct cassandra_db *db, } else if (strcmp(key, "metrics") == 0) { i_free(db->metrics_path); db->metrics_path = i_strdup(value); + } else if (strcmp(key, "execution_retry_interval") == 0) { + if (settings_get_time_msecs(value, &db->execution_retry_interval_msecs, &error) < 0) + i_fatal("cassandra: Invalid execution_retry_interval '%s': %s", value, error); +#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY + i_fatal("cassandra: This cassandra version does not support execution_retry_interval"); +#endif + } else if (strcmp(key, "execution_retry_times") == 0) { + if (str_to_uint(value, &db->execution_retry_times) < 0) + i_fatal("cassandra: Invalid execution_retry_times %s", value); +#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY + i_fatal("cassandra: This cassandra version does not support execution_retry_times"); +#endif } else { i_fatal("cassandra: Unknown connect string: %s", key); } @@ -623,6 +636,10 @@ static struct sql_db *driver_cassandra_init_v(const char *connect_string) cass_cluster_set_connection_heartbeat_interval(db->cluster, db->heartbeat_interval_secs); if (db->idle_timeout_secs != 0) cass_cluster_set_connection_idle_timeout(db->cluster, db->idle_timeout_secs); +#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY + if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0) + cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times); +#endif db->session = cass_session_new(); if (db->metrics_path != NULL) db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db); @@ -853,6 +870,9 @@ static void driver_cassandra_result_send_query(struct cassandra_result *result) result->statement = cass_statement_new(result->query, 0); cass_statement_set_consistency(result->statement, result->consistency); +#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY + cass_statement_set_is_idempotent(result->statement, cass_true); +#endif future = cass_session_execute(db->session, result->statement); driver_cassandra_set_callback(future, db, query_callback, result); }