From: Olivier Houchard Date: Mon, 29 Apr 2019 16:52:06 +0000 (+0200) Subject: BUG/MEDIUM: port_range: Make the ring buffer lock-free. X-Git-Tag: v2.0-dev3~144 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=07425de71777b688e77a9c70a7088c13e66e41e9;p=thirdparty%2Fhaproxy.git BUG/MEDIUM: port_range: Make the ring buffer lock-free. Port range uses a ring buffer, and unfortunately, when making haproxy multithreaded, it's been overlooked, and the ring buffer is not thread-safe. When specifying a source range, 2 or more threads could pick the same port, and of course only one of them could use the port, the others would always fail the connection. To fix this, make it a lock-free ring buffer. This is easier than usual because we know the ring buffer can never be full. This should be backported to 1.8 and 1.9. --- diff --git a/include/proto/port_range.h b/include/proto/port_range.h index 8c63faca92..c651fa862f 100644 --- a/include/proto/port_range.h +++ b/include/proto/port_range.h @@ -24,18 +24,24 @@ #include +#define GET_NEXT_OFF(range, off) ((off) == (range)->size - 1 ? 0 : (off) + 1) + /* return an available port from range , or zero if none is left */ static inline int port_range_alloc_port(struct port_range *range) { int ret; + int get; + int put; - if (!range->avail) - return 0; - ret = range->ports[range->get]; - range->get++; - if (range->get >= range->size) - range->get = 0; - range->avail--; + get = _HA_ATOMIC_LOAD(&range->get); + do { + /* barrier ot make sure get is loaded before put */ + __ha_barrier_atomic_load(); + put = _HA_ATOMIC_LOAD(&range->put_t); + if (unlikely(put == get)) + return 0; + ret = range->ports[get]; + } while (!(_HA_ATOMIC_CAS(&range->get, &get, GET_NEXT_OFF(range, get)))); return ret; } @@ -45,14 +51,30 @@ static inline int port_range_alloc_port(struct port_range *range) */ static inline void port_range_release_port(struct port_range *range, int port) { + int put; + if (!port || !range) return; - range->ports[range->put] = port; - range->avail++; - range->put++; - if (range->put >= range->size) - range->put = 0; + put = range->put_h; + /* put_h is reserved for producers, so that they can each get a + * free slot, put_t is what is used by consumers to know if there's + * elements available or not + */ + /* First reserve or slot, we know the ring buffer can't be full, + * as we will only ever release port we allocated before + */ + while (!(_HA_ATOMIC_CAS(&range->put_h, &put, GET_NEXT_OFF(range, put)))); + _HA_ATOMIC_STORE(&range->ports[put], port); + /* Barrier to make sure the new port is visible before we change put_t */ + __ha_barrier_atomic_store(); + /* Wait until all the threads that got a slot before us are done */ + while ((volatile int)range->put_t != put) + __ha_compiler_barrier(); + /* Let the world know we're done, and any potential consumer they + * can use that port. + */ + _HA_ATOMIC_STORE(&range->put_t, GET_NEXT_OFF(range, put)); } /* return a new initialized port range of N ports. The ports are not @@ -62,8 +84,10 @@ static inline struct port_range *port_range_alloc_range(int n) { struct port_range *ret; ret = calloc(1, sizeof(struct port_range) + - n * sizeof(((struct port_range *)0)->ports[0])); - ret->size = ret->avail = n; + (n + 1) * sizeof(((struct port_range *)0)->ports[0])); + ret->size = n + 1; + /* Start at the first free element */ + ret->put_h = ret->put_t = n; return ret; } diff --git a/include/types/port_range.h b/include/types/port_range.h index 1d010f77ae..33455d2dd1 100644 --- a/include/types/port_range.h +++ b/include/types/port_range.h @@ -25,8 +25,7 @@ #include struct port_range { - int size, get, put; /* range size, and get/put positions */ - int avail; /* number of available ports left */ + int size, get, put_h, put_t; /* range size, and get/put positions */ uint16_t ports[0]; /* array of ports, in host byte order */ };