enumerator->destroy(enumerator);
}
+/**
+ * Process messages of type STATUS
+ */
+static void process_status(private_ha_sync_dispatcher_t *this,
+ ha_sync_message_t *message)
+{
+ ha_sync_message_attribute_t attribute;
+ ha_sync_message_value_t value;
+ enumerator_t *enumerator;
+ segment_mask_t mask = 0;
+
+ enumerator = message->create_attribute_enumerator(message);
+ while (enumerator->enumerate(enumerator, &attribute, &value))
+ {
+ switch (attribute)
+ {
+ case HA_SYNC_SEGMENT:
+ mask |= SEGMENTS_BIT(value.u16);
+ break;
+ default:
+ break;
+ }
+ }
+ enumerator->destroy(enumerator);
+
+ this->segments->handle_status(this->segments, mask);
+}
+
/**
* Dispatcher job function
*/
case HA_SYNC_SEGMENT_TAKE:
process_segment(this, message, TRUE);
break;
+ case HA_SYNC_STATUS:
+ process_status(this, message);
+ break;
default:
DBG1(DBG_CFG, "received unknown HA sync message type %d",
message->get_type(message));
*
* @param socket socket to pull messages from
* @param segments segments to control based on received messages
+ * @param manager distributed management logic for segment control
* @return dispatcher object
*/
ha_sync_dispatcher_t *ha_sync_dispatcher_create(ha_sync_socket_t *socket,
/**
* Total number of ClusterIP segments
*/
- u_int segment_count;
+ u_int count;
/**
* List of virtual addresses, as host_t*
addr = *(u_int32_t*)host->get_address(host).ptr;
hash = jhash_1word(ntohl(addr), this->initval);
- if ((((u_int64_t)hash * this->segment_count) >> 32) + 1 == segment)
+ if ((((u_int64_t)hash * this->count) >> 32) + 1 == segment)
{
return TRUE;
}
/**
* Mangle IPtable rules for virtual addresses
*/
-static bool mangle_rules(private_ha_sync_kernel_t *this, bool add,
- segment_mask_t active)
+static bool mangle_rules(private_ha_sync_kernel_t *this, bool add)
{
enumerator_t *enumerator;
host_t *host;
host->destroy(host);
continue;
}
- /* iptables insists of a local node specification. We add '1' but drop
- * it afterwards. */
+ /* iptables insists of a local node specification, enable node 1 */
snprintf(buf, sizeof(buf),
"/sbin/iptables -%c INPUT -i %s -d %H -j CLUSTERIP --new "
"--hashmode sourceip --clustermac 01:00:5e:00:00:%2x "
"--total-nodes %d --local-node 1",
- add ? 'A' : 'D', iface, host, mac++, this->segment_count);
+ add ? 'A' : 'D', iface, host, mac++, this->count);
free(iface);
if (system(buf) != 0)
{
if (add)
{
- deactivate(this, 1);
- for (i = 0; i < SEGMENTS_MAX; i++)
+ for (i = 2; i <= this->count; i++)
{
- if (active & SEGMENTS_BIT(i))
- {
- activate(this, i);
- }
+ activate(this, i);
}
}
return TRUE;
*/
static void destroy(private_ha_sync_kernel_t *this)
{
- mangle_rules(this, FALSE, 0);
+ mangle_rules(this, FALSE);
this->virtuals->destroy_offset(this->virtuals, offsetof(host_t, destroy));
free(this);
}
/**
* See header
*/
-ha_sync_kernel_t *ha_sync_kernel_create(u_int count, segment_mask_t active,
- char *virtuals)
+ha_sync_kernel_t *ha_sync_kernel_create(u_int count, char *virtuals)
{
private_ha_sync_kernel_t *this = malloc_thing(private_ha_sync_kernel_t);
+ segment_mask_t active;
+ int i;
this->public.in_segment = (bool(*)(ha_sync_kernel_t*, host_t *host, u_int segment))in_segment;
this->public.activate = (void(*)(ha_sync_kernel_t*, u_int segment))activate;
this->public.destroy = (void(*)(ha_sync_kernel_t*))destroy;
this->initval = 0;
- this->segment_count = count;
+ this->count = count;
this->virtuals = linked_list_create();
parse_virtuals(this, virtuals);
- if (!mangle_rules(this, TRUE, active))
+ if (!mangle_rules(this, TRUE))
{
destroy(this);
return NULL;
* @param active bitmask of initially active segments
* @param virtuals comma separated list of virtual cluster addresses
*/
-ha_sync_kernel_t *ha_sync_kernel_create(u_int count, segment_mask_t active,
- char *virtuals);
+ha_sync_kernel_t *ha_sync_kernel_create(u_int count, char *virtuals);
#endif /* HA_SYNC_KERNEL_ @}*/
HA_SYNC_SEGMENT_DROP,
/** segments the sending node is taking over */
HA_SYNC_SEGMENT_TAKE,
+ /** status with the segments the sending node is currently serving */
+ HA_SYNC_STATUS,
};
/**
free(this);
}
-/**
- * Convert segment string to mask
- */
-static segment_mask_t parse_active(char *active)
-{
- enumerator_t *enumerator;
- u_int segment;
- segment_mask_t mask = 0;
-
- enumerator = enumerator_create_token(active, ",", " ");
- while (enumerator->enumerate(enumerator, &active))
- {
- segment = atoi(active);
- if (segment > 0 && segment < SEGMENTS_MAX)
- {
- mask |= SEGMENTS_BIT(segment);
- }
- }
- enumerator->destroy(enumerator);
-
- return mask;
-}
-
/*
* see header file
*/
{
private_ha_sync_plugin_t *this;
char *local, *remote, *secret, *virtuals;
- segment_mask_t active;
u_int count;
bool fifo;
"charon.plugins.ha_sync.fifo_interface", FALSE);
count = min(SEGMENTS_MAX, lib->settings->get_int(lib->settings,
"charon.plugins.ha_sync.segment_count", 1));
- active = parse_active(lib->settings->get_str(lib->settings,
- "charon.plugins.ha_sync.active_segments", "1"));
if (!local || !remote)
{
DBG1(DBG_CFG, "HA sync config misses local/remote address");
free(this);
return NULL;
}
- this->kernel = ha_sync_kernel_create(count, active, virtuals);
+ this->kernel = ha_sync_kernel_create(count, virtuals);
if (!this->kernel)
{
this->socket->destroy(this->socket);
this->tunnel = ha_sync_tunnel_create(local, remote, secret);
}
this->segments = ha_sync_segments_create(this->socket, this->kernel,
- this->tunnel, count, active);
+ this->tunnel, local, remote, count);
if (fifo)
{
this->ctl = ha_sync_ctl_create(this->segments);
#include <utils/mutex.h>
#include <utils/linked_list.h>
+#include <processing/jobs/callback_job.h>
typedef struct private_ha_sync_segments_t private_ha_sync_segments_t;
/**
* Total number of ClusterIP segments
*/
- u_int segment_count;
+ u_int count;
/**
* mask of active segments
*/
segment_mask_t active;
+
+ /**
+ * Are we the master node handling segment assignement?
+ */
+ bool master;
};
/**
int i;
bool first = TRUE;
- for (i = 0; i < this->segment_count; i++)
+ for (i = 1; i <= this->count; i++)
{
- if (this->active & 0x01 << i)
+ if (this->active & SEGMENTS_BIT(i))
{
if (first)
{
{
pos += snprintf(pos, buf + sizeof(buf) - pos, ",");
}
- pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i+1);
+ pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i);
}
}
DBG1(DBG_CFG, "HA sync segment %d %sactivated, now active: %s",
{
ike_sa_t *ike_sa;
enumerator_t *enumerator;
- u_int i, limit;
+ u_int i, from, to;
this->lock->write_lock(this->lock);
- if (segment == 0 || segment <= this->segment_count)
+ if (segment == 0 || segment <= this->count)
{
if (segment)
{ /* loop once for single segment ... */
- limit = segment + 1;
+ from = to = segment;
}
else
- { /* or segment_count times for all segments */
- limit = this->segment_count;
+ { /* or count times for all segments */
+ from = 1;
+ to = this->count;
}
enumerator = charon->ike_sa_manager->create_enumerator(charon->ike_sa_manager);
while (enumerator->enumerate(enumerator, &ike_sa))
{
continue;
}
- for (i = segment; i < limit; i++)
+ for (i = from; i <= to; i++)
{
if (this->kernel->in_segment(this->kernel,
ike_sa->get_other_host(ike_sa), i))
}
}
enumerator->destroy(enumerator);
- for (i = segment; i < limit; i++)
+ for (i = from; i <= to; i++)
{
if (enable)
{
}
}
}
-
log_segments(this, enable, segment);
}
list = linked_list_create();
this->lock->read_lock(this->lock);
- if (segment > 0 && segment <= this->segment_count && (this->active & mask))
+ if (segment > 0 && segment <= this->count && (this->active & mask))
{
this->active &= ~mask;
{
int i;
- for (i = 0; i < SEGMENTS_MAX; i++)
+ for (i = 1; i <= this->count; i++)
{
if (this->active & SEGMENTS_BIT(i))
{
return TRUE;
}
+/**
+ * Implementation of ha_sync_segments_t.handle_status
+ */
+static void handle_status(private_ha_sync_segments_t *this, segment_mask_t mask)
+{
+ segment_mask_t missing, overlap;
+ int i, active = 0;
+
+ this->lock->read_lock(this->lock);
+ missing = ~(this->active | mask);
+ overlap = this->active & mask;
+ for (i = 1; i <= this->count; i++)
+ {
+ if (this->active & SEGMENTS_BIT(i))
+ {
+ active++;
+ }
+ }
+ this->lock->unlock(this->lock);
+
+ /* Activate any missing segment. The master will disable overlapping
+ * segments if both nodes activate the missing segments simultaneously. */
+ for (i = 1; i <= this->count; i++)
+ {
+ if (missing & SEGMENTS_BIT(i))
+ {
+ DBG1(DBG_CFG, "HA segment %d was not handled", i);
+ activate(this, i, TRUE);
+ }
+ }
+ if (this->master && overlap)
+ {
+ /* Disable overlapping segment on one node, controlled by master */
+ for (i = 1; i <= this->count; i++)
+ {
+ if (overlap & SEGMENTS_BIT(i))
+ {
+ DBG1(DBG_CFG, "HA segment %d handled twice", i);
+ if (active > this->count)
+ {
+ deactivate(this, i, TRUE);
+ active--;
+ }
+ else
+ {
+ activate(this, i, TRUE);
+ active++;
+ }
+ }
+ }
+ }
+}
+
+/**
+ * Send a status message with our active segments
+ */
+static job_requeue_t send_status(private_ha_sync_segments_t *this)
+{
+ ha_sync_message_t *message;
+ int i;
+
+ message = ha_sync_message_create(HA_SYNC_STATUS);
+
+ for (i = 1; i <= this->count; i++)
+ {
+ if (this->active & SEGMENTS_BIT(i))
+ {
+ message->add_attribute(message, HA_SYNC_SEGMENT, i);
+ }
+ }
+
+ this->socket->push(this->socket, message);
+
+ /* schedule next invocation */
+ charon->scheduler->schedule_job_ms(charon->scheduler, (job_t*)
+ callback_job_create((callback_job_cb_t)
+ send_status, this, NULL, NULL),
+ 1000);
+
+ return JOB_REQUEUE_NONE;
+}
+
/**
* Implementation of ha_sync_segments_t.destroy.
*/
* See header
*/
ha_sync_segments_t *ha_sync_segments_create(ha_sync_socket_t *socket,
- ha_sync_kernel_t *kernel,
- ha_sync_tunnel_t *tunnel,
- u_int count, segment_mask_t active)
+ ha_sync_kernel_t *kernel, ha_sync_tunnel_t *tunnel,
+ char *local, char *remote, u_int count)
{
private_ha_sync_segments_t *this = malloc_thing(private_ha_sync_segments_t);
+ int i;
memset(&this->public.listener, 0, sizeof(listener_t));
this->public.listener.alert = (bool(*)(listener_t*, ike_sa_t *, alert_t, va_list))alert_hook;
this->public.activate = (void(*)(ha_sync_segments_t*, u_int segment,bool))activate;
this->public.deactivate = (void(*)(ha_sync_segments_t*, u_int segment,bool))deactivate;
this->public.resync = (void(*)(ha_sync_segments_t*, u_int segment))resync;
+ this->public.handle_status = (void(*)(ha_sync_segments_t*, segment_mask_t mask))handle_status;
this->public.destroy = (void(*)(ha_sync_segments_t*))destroy;
this->socket = socket;
this->tunnel = tunnel;
this->kernel = kernel;
this->lock = rwlock_create(RWLOCK_TYPE_DEFAULT);
- this->active = active;
- this->segment_count = count;
+ this->count = count;
+ this->master = strcmp(local, remote) > 0;
+
+ /* initially all segments are active */
+ this->active = 0;
+ for (i = 1; i <= count; i++)
+ {
+ this->active |= SEGMENTS_BIT(i);
+ }
+
+ send_status(this);
return &this->public;
}
*/
void (*resync)(ha_sync_segments_t *this, u_int segment);
+ /**
+ * Handle a status message from the remote node.
+ *
+ * @param mask segments the remote node is serving actively
+ */
+ void (*handle_status)(ha_sync_segments_t *this, segment_mask_t mask);
+
/**
* Destroy a ha_sync_segments_t.
*/
* @return segment object
*/
ha_sync_segments_t *ha_sync_segments_create(ha_sync_socket_t *socket,
- ha_sync_kernel_t *kernel,
- ha_sync_tunnel_t *tunnel,
- u_int count, segment_mask_t active);
+ ha_sync_kernel_t *kernel, ha_sync_tunnel_t *tunnel,
+ char *local, char *remote, u_int count);
#endif /* HA_SYNC_SEGMENTS_ @}*/
job = callback_job_create((callback_job_cb_t)send_message,
data, (void*)job_data_destroy, NULL);
charon->processor->queue_job(charon->processor, (job_t*)job);
+ sched_yield();
}
/**