]> git.ipfire.org Git - people/ms/strongswan.git/commitdiff
Automatically segment cluster using periodically sent status messages
authorMartin Willi <martin@strongswan.org>
Mon, 28 Sep 2009 12:31:39 +0000 (14:31 +0200)
committerMartin Willi <martin@revosec.ch>
Wed, 7 Apr 2010 11:55:14 +0000 (13:55 +0200)
src/charon/plugins/ha_sync/ha_sync_dispatcher.c
src/charon/plugins/ha_sync/ha_sync_dispatcher.h
src/charon/plugins/ha_sync/ha_sync_kernel.c
src/charon/plugins/ha_sync/ha_sync_kernel.h
src/charon/plugins/ha_sync/ha_sync_message.h
src/charon/plugins/ha_sync/ha_sync_plugin.c
src/charon/plugins/ha_sync/ha_sync_segments.c
src/charon/plugins/ha_sync/ha_sync_segments.h
src/charon/plugins/ha_sync/ha_sync_socket.c

index 7a79fc907f289bb4e1b4fb1c2857b5b39378855f..f5d3e288f9a27ca0eeeee3a9a276ab20802b9a08 100644 (file)
@@ -606,6 +606,34 @@ static void process_segment(private_ha_sync_dispatcher_t *this,
        enumerator->destroy(enumerator);
 }
 
+/**
+ * Process messages of type STATUS
+ */
+static void process_status(private_ha_sync_dispatcher_t *this,
+                                                  ha_sync_message_t *message)
+{
+       ha_sync_message_attribute_t attribute;
+       ha_sync_message_value_t value;
+       enumerator_t *enumerator;
+       segment_mask_t mask = 0;
+
+       enumerator = message->create_attribute_enumerator(message);
+       while (enumerator->enumerate(enumerator, &attribute, &value))
+       {
+               switch (attribute)
+               {
+                       case HA_SYNC_SEGMENT:
+                               mask |= SEGMENTS_BIT(value.u16);
+                               break;
+                       default:
+                               break;
+               }
+       }
+       enumerator->destroy(enumerator);
+
+       this->segments->handle_status(this->segments, mask);
+}
+
 /**
  * Dispatcher job function
  */
@@ -637,6 +665,9 @@ static job_requeue_t dispatch(private_ha_sync_dispatcher_t *this)
                case HA_SYNC_SEGMENT_TAKE:
                        process_segment(this, message, TRUE);
                        break;
+               case HA_SYNC_STATUS:
+                       process_status(this, message);
+                       break;
                default:
                        DBG1(DBG_CFG, "received unknown HA sync message type %d",
                                 message->get_type(message));
index e9c92b8caf10f37096f004517fb715a40f20bdeb..a34b1f9719025045dde88ac898fb87b541501b6f 100644 (file)
@@ -42,6 +42,7 @@ struct ha_sync_dispatcher_t {
  *
  * @param socket               socket to pull messages from
  * @param segments             segments to control based on received messages
+ * @param manager              distributed management logic for segment control
  * @return                             dispatcher object
  */
 ha_sync_dispatcher_t *ha_sync_dispatcher_create(ha_sync_socket_t *socket,
index caba2b0be3820b9326e89c3d1f8a89011ad4f429..19918202221aa48d9bab319b1fbafc1a994842e6 100644 (file)
@@ -46,7 +46,7 @@ struct private_ha_sync_kernel_t {
        /**
         * Total number of ClusterIP segments
         */
-       u_int segment_count;
+       u_int count;
 
        /**
         * List of virtual addresses, as host_t*
@@ -68,7 +68,7 @@ static bool in_segment(private_ha_sync_kernel_t *this,
                addr = *(u_int32_t*)host->get_address(host).ptr;
                hash = jhash_1word(ntohl(addr), this->initval);
 
-               if ((((u_int64_t)hash * this->segment_count) >> 32) + 1 == segment)
+               if ((((u_int64_t)hash * this->count) >> 32) + 1 == segment)
                {
                        return TRUE;
                }
@@ -128,8 +128,7 @@ static void deactivate(private_ha_sync_kernel_t *this, u_int segment)
 /**
  * Mangle IPtable rules for virtual addresses
  */
-static bool mangle_rules(private_ha_sync_kernel_t *this, bool add,
-                                                segment_mask_t active)
+static bool mangle_rules(private_ha_sync_kernel_t *this, bool add)
 {
        enumerator_t *enumerator;
        host_t *host;
@@ -148,13 +147,12 @@ static bool mangle_rules(private_ha_sync_kernel_t *this, bool add,
                        host->destroy(host);
                        continue;
                }
-               /* iptables insists of a local node specification. We add '1' but drop
-                * it afterwards. */
+               /* iptables insists of a local node specification, enable node 1 */
                snprintf(buf, sizeof(buf),
                                 "/sbin/iptables -%c INPUT -i %s -d %H -j CLUSTERIP --new "
                                 "--hashmode sourceip --clustermac 01:00:5e:00:00:%2x "
                                 "--total-nodes %d --local-node 1",
-                                add ? 'A' : 'D', iface, host, mac++, this->segment_count);
+                                add ? 'A' : 'D', iface, host, mac++, this->count);
                free(iface);
                if (system(buf) != 0)
                {
@@ -165,13 +163,9 @@ static bool mangle_rules(private_ha_sync_kernel_t *this, bool add,
 
        if (add)
        {
-               deactivate(this, 1);
-               for (i = 0; i < SEGMENTS_MAX; i++)
+               for (i = 2; i <= this->count; i++)
                {
-                       if (active & SEGMENTS_BIT(i))
-                       {
-                               activate(this, i);
-                       }
+                       activate(this, i);
                }
        }
        return TRUE;
@@ -207,7 +201,7 @@ static void parse_virtuals(private_ha_sync_kernel_t *this, char *virtual)
  */
 static void destroy(private_ha_sync_kernel_t *this)
 {
-       mangle_rules(this, FALSE, 0);
+       mangle_rules(this, FALSE);
        this->virtuals->destroy_offset(this->virtuals, offsetof(host_t, destroy));
        free(this);
 }
@@ -215,10 +209,11 @@ static void destroy(private_ha_sync_kernel_t *this)
 /**
  * See header
  */
-ha_sync_kernel_t *ha_sync_kernel_create(u_int count, segment_mask_t active,
-                                                                               char *virtuals)
+ha_sync_kernel_t *ha_sync_kernel_create(u_int count, char *virtuals)
 {
        private_ha_sync_kernel_t *this = malloc_thing(private_ha_sync_kernel_t);
+       segment_mask_t active;
+       int i;
 
        this->public.in_segment = (bool(*)(ha_sync_kernel_t*, host_t *host, u_int segment))in_segment;
        this->public.activate = (void(*)(ha_sync_kernel_t*, u_int segment))activate;
@@ -226,12 +221,12 @@ ha_sync_kernel_t *ha_sync_kernel_create(u_int count, segment_mask_t active,
        this->public.destroy = (void(*)(ha_sync_kernel_t*))destroy;
 
        this->initval = 0;
-       this->segment_count = count;
+       this->count = count;
        this->virtuals = linked_list_create();
 
        parse_virtuals(this, virtuals);
 
-       if (!mangle_rules(this, TRUE, active))
+       if (!mangle_rules(this, TRUE))
        {
                destroy(this);
                return NULL;
index 87ee02aec1dc54a87a06b56d31c8b1c68350b7c7..6803e58ea84ff8ec8b570d834ea3d1ffb60fadcd 100644 (file)
@@ -66,7 +66,6 @@ struct ha_sync_kernel_t {
  * @param active               bitmask of initially active segments
  * @param virtuals             comma separated list of virtual cluster addresses
  */
-ha_sync_kernel_t *ha_sync_kernel_create(u_int count, segment_mask_t active,
-                                                                               char *virtuals);
+ha_sync_kernel_t *ha_sync_kernel_create(u_int count, char *virtuals);
 
 #endif /* HA_SYNC_KERNEL_ @}*/
index 75f9b946ed0f6e2a4e1da925ea9a61078cc61cc0..20eb7eab21bfe24b8d4545db08f600af4f7e7f3f 100644 (file)
@@ -55,6 +55,8 @@ enum ha_sync_message_type_t {
        HA_SYNC_SEGMENT_DROP,
        /** segments the sending node is taking over */
        HA_SYNC_SEGMENT_TAKE,
+       /** status with the segments the sending node is currently serving */
+       HA_SYNC_STATUS,
 };
 
 /**
index ff4341e57926028102744b28fd7dfe5add544eb8..5827b39af555d56a5bf69befba420d4701835a50 100644 (file)
@@ -97,29 +97,6 @@ static void destroy(private_ha_sync_plugin_t *this)
        free(this);
 }
 
-/**
- * Convert segment string to mask
- */
-static segment_mask_t parse_active(char *active)
-{
-       enumerator_t *enumerator;
-       u_int segment;
-       segment_mask_t mask = 0;
-
-       enumerator = enumerator_create_token(active, ",", " ");
-       while (enumerator->enumerate(enumerator, &active))
-       {
-               segment = atoi(active);
-               if (segment > 0 && segment < SEGMENTS_MAX)
-               {
-                       mask |= SEGMENTS_BIT(segment);
-               }
-       }
-       enumerator->destroy(enumerator);
-
-       return mask;
-}
-
 /*
  * see header file
  */
@@ -127,7 +104,6 @@ plugin_t *plugin_create()
 {
        private_ha_sync_plugin_t *this;
        char *local, *remote, *secret, *virtuals;
-       segment_mask_t active;
        u_int count;
        bool fifo;
 
@@ -143,8 +119,6 @@ plugin_t *plugin_create()
                                                                "charon.plugins.ha_sync.fifo_interface", FALSE);
        count = min(SEGMENTS_MAX, lib->settings->get_int(lib->settings,
                                                                "charon.plugins.ha_sync.segment_count", 1));
-       active = parse_active(lib->settings->get_str(lib->settings,
-                                                               "charon.plugins.ha_sync.active_segments", "1"));
        if (!local || !remote)
        {
                DBG1(DBG_CFG, "HA sync config misses local/remote address");
@@ -163,7 +137,7 @@ plugin_t *plugin_create()
                free(this);
                return NULL;
        }
-       this->kernel = ha_sync_kernel_create(count, active, virtuals);
+       this->kernel = ha_sync_kernel_create(count, virtuals);
        if (!this->kernel)
        {
                this->socket->destroy(this->socket);
@@ -176,7 +150,7 @@ plugin_t *plugin_create()
                this->tunnel = ha_sync_tunnel_create(local, remote, secret);
        }
        this->segments = ha_sync_segments_create(this->socket, this->kernel,
-                                                                                        this->tunnel, count, active);
+                                                                               this->tunnel, local, remote, count);
        if (fifo)
        {
                this->ctl = ha_sync_ctl_create(this->segments);
index 4d458038c403f7e47051421a537b33a31c4d08f5..002061396246b7a6009fb04593e996ca0cd9ad8d 100644 (file)
@@ -17,6 +17,7 @@
 
 #include <utils/mutex.h>
 #include <utils/linked_list.h>
+#include <processing/jobs/callback_job.h>
 
 typedef struct private_ha_sync_segments_t private_ha_sync_segments_t;
 
@@ -53,12 +54,17 @@ struct private_ha_sync_segments_t {
        /**
         * Total number of ClusterIP segments
         */
-       u_int segment_count;
+       u_int count;
 
        /**
         * mask of active segments
         */
        segment_mask_t active;
+
+       /**
+        * Are we the master node handling segment assignement?
+        */
+       bool master;
 };
 
 /**
@@ -71,9 +77,9 @@ static void log_segments(private_ha_sync_segments_t *this, bool activated,
        int i;
        bool first = TRUE;
 
-       for (i = 0; i < this->segment_count; i++)
+       for (i = 1; i <= this->count; i++)
        {
-               if (this->active & 0x01 << i)
+               if (this->active & SEGMENTS_BIT(i))
                {
                        if (first)
                        {
@@ -83,7 +89,7 @@ static void log_segments(private_ha_sync_segments_t *this, bool activated,
                        {
                                pos += snprintf(pos, buf + sizeof(buf) - pos, ",");
                        }
-                       pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i+1);
+                       pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i);
                }
        }
        DBG1(DBG_CFG, "HA sync segment %d %sactivated, now active: %s",
@@ -98,19 +104,20 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment,
 {
        ike_sa_t *ike_sa;
        enumerator_t *enumerator;
-       u_int i, limit;
+       u_int i, from, to;
 
        this->lock->write_lock(this->lock);
 
-       if (segment == 0 || segment <= this->segment_count)
+       if (segment == 0 || segment <= this->count)
        {
                if (segment)
                {       /* loop once for single segment ... */
-                       limit = segment + 1;
+                       from = to = segment;
                }
                else
-               {       /* or segment_count times for all segments */
-                       limit = this->segment_count;
+               {       /* or count times for all segments */
+                       from = 1;
+                       to = this->count;
                }
                enumerator = charon->ike_sa_manager->create_enumerator(charon->ike_sa_manager);
                while (enumerator->enumerate(enumerator, &ike_sa))
@@ -123,7 +130,7 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment,
                        {
                                continue;
                        }
-                       for (i = segment; i < limit; i++)
+                       for (i = from; i <= to; i++)
                        {
                                if (this->kernel->in_segment(this->kernel,
                                                                                ike_sa->get_other_host(ike_sa), i))
@@ -133,7 +140,7 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment,
                        }
                }
                enumerator->destroy(enumerator);
-               for (i = segment; i < limit; i++)
+               for (i = from; i <= to; i++)
                {
                        if (enable)
                        {
@@ -152,7 +159,6 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment,
                                }
                        }
                }
-
                log_segments(this, enable, segment);
        }
 
@@ -233,7 +239,7 @@ static void resync(private_ha_sync_segments_t *this, u_int segment)
        list = linked_list_create();
        this->lock->read_lock(this->lock);
 
-       if (segment > 0 && segment <= this->segment_count && (this->active & mask))
+       if (segment > 0 && segment <= this->count && (this->active & mask))
        {
                this->active &= ~mask;
 
@@ -290,7 +296,7 @@ static bool alert_hook(private_ha_sync_segments_t *this, ike_sa_t *ike_sa,
        {
                int i;
 
-               for (i = 0; i < SEGMENTS_MAX; i++)
+               for (i = 1; i <= this->count; i++)
                {
                        if (this->active & SEGMENTS_BIT(i))
                        {
@@ -301,6 +307,88 @@ static bool alert_hook(private_ha_sync_segments_t *this, ike_sa_t *ike_sa,
        return TRUE;
 }
 
+/**
+ * Implementation of ha_sync_segments_t.handle_status
+ */
+static void handle_status(private_ha_sync_segments_t *this, segment_mask_t mask)
+{
+       segment_mask_t missing, overlap;
+       int i, active = 0;
+
+       this->lock->read_lock(this->lock);
+       missing = ~(this->active | mask);
+       overlap = this->active & mask;
+       for (i = 1; i <= this->count; i++)
+       {
+               if (this->active & SEGMENTS_BIT(i))
+               {
+                       active++;
+               }
+       }
+       this->lock->unlock(this->lock);
+
+       /* Activate any missing segment. The master will disable overlapping
+        * segments if both nodes activate the missing segments simultaneously. */
+       for (i = 1; i <= this->count; i++)
+       {
+               if (missing & SEGMENTS_BIT(i))
+               {
+                       DBG1(DBG_CFG, "HA segment %d was not handled", i);
+                       activate(this, i, TRUE);
+               }
+       }
+       if (this->master && overlap)
+       {
+               /* Disable overlapping segment on one node, controlled by master */
+               for (i = 1; i <= this->count; i++)
+               {
+                       if (overlap & SEGMENTS_BIT(i))
+                       {
+                               DBG1(DBG_CFG, "HA segment %d handled twice", i);
+                               if (active > this->count)
+                               {
+                                       deactivate(this, i, TRUE);
+                                       active--;
+                               }
+                               else
+                               {
+                                       activate(this, i, TRUE);
+                                       active++;
+                               }
+                       }
+               }
+       }
+}
+
+/**
+ * Send a status message with our active segments
+ */
+static job_requeue_t send_status(private_ha_sync_segments_t *this)
+{
+       ha_sync_message_t *message;
+       int i;
+
+       message = ha_sync_message_create(HA_SYNC_STATUS);
+
+       for (i = 1; i <= this->count; i++)
+       {
+               if (this->active & SEGMENTS_BIT(i))
+               {
+                       message->add_attribute(message, HA_SYNC_SEGMENT, i);
+               }
+       }
+
+       this->socket->push(this->socket, message);
+
+       /* schedule next invocation */
+       charon->scheduler->schedule_job_ms(charon->scheduler, (job_t*)
+                                                                       callback_job_create((callback_job_cb_t)
+                                                                               send_status, this, NULL, NULL),
+                                                                       1000);
+
+       return JOB_REQUEUE_NONE;
+}
+
 /**
  * Implementation of ha_sync_segments_t.destroy.
  */
@@ -314,25 +402,35 @@ static void destroy(private_ha_sync_segments_t *this)
  * See header
  */
 ha_sync_segments_t *ha_sync_segments_create(ha_sync_socket_t *socket,
-                                                                                       ha_sync_kernel_t *kernel,
-                                                                                       ha_sync_tunnel_t *tunnel,
-                                                                                       u_int count, segment_mask_t active)
+                                                       ha_sync_kernel_t *kernel, ha_sync_tunnel_t *tunnel,
+                                                       char *local, char *remote, u_int count)
 {
        private_ha_sync_segments_t *this = malloc_thing(private_ha_sync_segments_t);
+       int i;
 
        memset(&this->public.listener, 0, sizeof(listener_t));
        this->public.listener.alert = (bool(*)(listener_t*, ike_sa_t *, alert_t, va_list))alert_hook;
        this->public.activate = (void(*)(ha_sync_segments_t*, u_int segment,bool))activate;
        this->public.deactivate = (void(*)(ha_sync_segments_t*, u_int segment,bool))deactivate;
        this->public.resync = (void(*)(ha_sync_segments_t*, u_int segment))resync;
+       this->public.handle_status = (void(*)(ha_sync_segments_t*, segment_mask_t mask))handle_status;
        this->public.destroy = (void(*)(ha_sync_segments_t*))destroy;
 
        this->socket = socket;
        this->tunnel = tunnel;
        this->kernel = kernel;
        this->lock = rwlock_create(RWLOCK_TYPE_DEFAULT);
-       this->active = active;
-       this->segment_count = count;
+       this->count = count;
+       this->master = strcmp(local, remote) > 0;
+
+       /* initially all segments are active */
+       this->active = 0;
+       for (i = 1; i <= count; i++)
+       {
+               this->active |= SEGMENTS_BIT(i);
+       }
+
+       send_status(this);
 
        return &this->public;
 }
index cf119a8e02a2d6eb2aba4acfeaca391ff7f8d1fd..5f795db1ed4464077ed4b7ffffc3876b7ef8dd6b 100644 (file)
@@ -79,6 +79,13 @@ struct ha_sync_segments_t {
         */
        void (*resync)(ha_sync_segments_t *this, u_int segment);
 
+       /**
+        * Handle a status message from the remote node.
+        *
+        * @param mask          segments the remote node is serving actively
+        */
+       void (*handle_status)(ha_sync_segments_t *this, segment_mask_t mask);
+
        /**
         * Destroy a ha_sync_segments_t.
         */
@@ -95,8 +102,7 @@ struct ha_sync_segments_t {
  * @return                             segment object
  */
 ha_sync_segments_t *ha_sync_segments_create(ha_sync_socket_t *socket,
-                                                                                       ha_sync_kernel_t *kernel,
-                                                                                       ha_sync_tunnel_t *tunnel,
-                                                                                       u_int count, segment_mask_t active);
+                                                       ha_sync_kernel_t *kernel, ha_sync_tunnel_t *tunnel,
+                                                       char *local, char *remote, u_int count);
 
 #endif /* HA_SYNC_SEGMENTS_ @}*/
index 41e8185281075893ac056555543efb3249f13140..bc010b76d209079117c30550d3b3825fc2bc118b 100644 (file)
@@ -152,6 +152,7 @@ static void push(private_ha_sync_socket_t *this, ha_sync_message_t *message)
        job = callback_job_create((callback_job_cb_t)send_message,
                                                          data, (void*)job_data_destroy, NULL);
        charon->processor->queue_job(charon->processor, (job_t*)job);
+       sched_yield();
 }
 
 /**