static inline ssize_t sink_write(struct sink *sink, struct log_header hdr,
size_t maxlen, const struct ist msg[], size_t nmsg)
{
- ssize_t sent;
+ ssize_t sent = 0;
- if (unlikely(sink->ctx.dropped > 0)) {
- /* We need to take an exclusive lock so that other producers
- * don't do the same thing at the same time and above all we
- * want to be sure others have finished sending their messages
- * so that the dropped event arrives exactly at the right
- * position.
- */
- HA_RWLOCK_WRLOCK(RING_LOCK, &sink->ctx.lock);
+ if (unlikely(HA_ATOMIC_LOAD(&sink->ctx.dropped) > 0)) {
sent = sink_announce_dropped(sink, hdr);
- HA_RWLOCK_WRUNLOCK(RING_LOCK, &sink->ctx.lock);
if (!sent) {
/* we failed, we don't try to send our log as if it
}
}
- HA_RWLOCK_RDLOCK(RING_LOCK, &sink->ctx.lock);
sent = __sink_write(sink, hdr, maxlen, msg, nmsg);
- HA_RWLOCK_RDUNLOCK(RING_LOCK, &sink->ctx.lock);
fail:
if (unlikely(sent <= 0))
- HA_ATOMIC_INC(&sink->ctx.dropped);
+ HA_ATOMIC_ADD(&sink->ctx.dropped, 2);
return sent;
}
/* address will be filled by the caller if needed */
sink->ctx.fd = -1;
sink->ctx.dropped = 0;
- HA_RWLOCK_INIT(&sink->ctx.lock);
LIST_APPEND(&sink_list, &sink->sink_list);
end:
return sink;
* here with the only difference that we override the log level. This is
* possible since the announce message will be sent from the same context.
*
- * In case of success, the amount of drops is reduced by as much. It's supposed
- * to be called under an exclusive lock on the sink to avoid multiple producers
- * doing the same. On success, >0 is returned, otherwise <=0 on failure.
+ * In case of success, the amount of drops is reduced by as much.
+ * The function ensures that a single thread will do that work at once, other
+ * ones will only report a failure if a thread is dumping, so that no thread
+ * waits. A pair od atomic OR and AND is performed around the code so the
+ * caller would be advised to only call this function AFTER having verified
+ * that sink->ctx.dropped is not zero in order to avoid a memory write. On
+ * success, >0 is returned, otherwise <=0 on failure, indicating that it could
+ * not eliminate the pending drop counter. It may loop up to 10 times trying
+ * to catch up with failing competing threads.
*/
int sink_announce_dropped(struct sink *sink, struct log_header hdr)
{
uint dropped, last_dropped;
struct ist msgvec[1];
uint retries = 10;
+ int ret = 0;
+
+ /* Explanation. ctx.dropped is made of:
+ * bit0 = 1 if dropped dump in progress
+ * bit1..31 = dropped counter
+ * If non-zero there have been some drops. If not &1, it means
+ * nobody's taking care of them and we'll have to, otherwise
+ * another thread is already on them and we can just pass and
+ * count another drop (hence add 2).
+ */
+ dropped = HA_ATOMIC_FETCH_OR(&sink->ctx.dropped, 1);
+ if (dropped & 1) {
+ /* another thread was already on it */
+ goto leave;
+ }
last_dropped = 0;
+ dropped >>= 1;
while (1) {
- while (unlikely((dropped = HA_ATOMIC_LOAD(&sink->ctx.dropped)) > last_dropped) && retries-- > 0) {
+ while (unlikely(dropped > last_dropped) && retries-- > 0) {
/* try to aggregate multiple messages if other threads arrive while
* we're producing the dropped message.
*/
msglen = msg_dropped2 + sizeof(msg_dropped2) - msg;
}
msgvec[0] = ist2(msg, msglen);
+ dropped = HA_ATOMIC_LOAD(&sink->ctx.dropped) >> 1;
}
+
if (!dropped)
break;
hdr.level = LOG_NOTICE; /* override level but keep original log header data */
if (__sink_write(sink, hdr, 0, msgvec, 1) <= 0)
- return 0;
+ goto done;
+
/* success! */
- HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
+ HA_ATOMIC_SUB(&sink->ctx.dropped, dropped << 1);
}
- return 1;
+
+ /* done! */
+ ret = 1;
+done:
+ /* unlock the counter */
+ HA_ATOMIC_AND(&sink->ctx.dropped, ~1);
+leave:
+ return ret;
}
/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */