AC_DEFINE(HAVE_CASSANDRA,, [Build with Cassandra support])
found_sql_drivers="$found_sql_drivers cassandra"
+ AC_CHECK_LIB(cassandra, cass_cluster_set_constant_speculative_execution_policy, [
+ AC_DEFINE(HAVE_CASSANDRA_SPECULATIVE_POLICY, 1, [Cassandra supports speculative execution policy])
+ ])
], [
if test $want_cassandra = yes; then
AC_ERROR([Can't build with Cassandra support: cassandra.h not found])
unsigned int connect_timeout_msecs, request_timeout_msecs;
unsigned int warn_timeout_msecs;
unsigned int heartbeat_interval_secs, idle_timeout_secs;
+ unsigned int execution_retry_interval_msecs, execution_retry_times;
in_port_t port;
CassCluster *cluster;
} else if (strcmp(key, "metrics") == 0) {
i_free(db->metrics_path);
db->metrics_path = i_strdup(value);
+ } else if (strcmp(key, "execution_retry_interval") == 0) {
+ if (settings_get_time_msecs(value, &db->execution_retry_interval_msecs, &error) < 0)
+ i_fatal("cassandra: Invalid execution_retry_interval '%s': %s", value, error);
+#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
+ i_fatal("cassandra: This cassandra version does not support execution_retry_interval");
+#endif
+ } else if (strcmp(key, "execution_retry_times") == 0) {
+ if (str_to_uint(value, &db->execution_retry_times) < 0)
+ i_fatal("cassandra: Invalid execution_retry_times %s", value);
+#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
+ i_fatal("cassandra: This cassandra version does not support execution_retry_times");
+#endif
} else {
i_fatal("cassandra: Unknown connect string: %s", key);
}
cass_cluster_set_connection_heartbeat_interval(db->cluster, db->heartbeat_interval_secs);
if (db->idle_timeout_secs != 0)
cass_cluster_set_connection_idle_timeout(db->cluster, db->idle_timeout_secs);
+#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
+ if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0)
+ cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times);
+#endif
db->session = cass_session_new();
if (db->metrics_path != NULL)
db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
result->statement = cass_statement_new(result->query, 0);
cass_statement_set_consistency(result->statement, result->consistency);
+#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
+ cass_statement_set_is_idempotent(result->statement, cass_true);
+#endif
future = cass_session_execute(db->session, result->statement);
driver_cassandra_set_callback(future, db, query_callback, result);
}