]> git.ipfire.org Git - thirdparty/squid.git/blame - src/log/TcpLogger.cc
Source Format Enforcement (#532)
[thirdparty/squid.git] / src / log / TcpLogger.cc
CommitLineData
bbc27441 1/*
77b1029d 2 * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
bbc27441
AJ
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
fb0c2f17
NH
9#include "squid.h"
10#include "comm.h"
fb0c2f17 11#include "comm/Connection.h"
602d9612 12#include "comm/ConnOpener.h"
fb0c2f17
NH
13#include "comm/Loops.h"
14#include "comm/Write.h"
ed6e9fb9 15#include "fatal.h"
fb0c2f17
NH
16#include "fde.h"
17#include "globals.h" // for shutting_down
18#include "log/CustomLog.h"
19#include "log/File.h"
20#include "log/TcpLogger.h"
fb0c2f17 21#include "Parsing.h"
323547b0 22#include "sbuf/MemBlob.h"
fb0c2f17
NH
23#include "SquidConfig.h"
24#include "SquidTime.h"
25
26// a single I/O buffer should be large enough to store any access.log record
27const size_t Log::TcpLogger::IoBufSize = 2*MAX_URL;
28
29// We need at least two buffers because when we write the first buffer,
30// we have to use the second buffer to accumulate new entries.
31const size_t Log::TcpLogger::BufferCapacityMin = 2*Log::TcpLogger::IoBufSize;
32
33#define MY_DEBUG_SECTION 50 /* Log file handling */
34
35CBDATA_NAMESPACED_CLASS_INIT(Log, TcpLogger);
36
37Log::TcpLogger::TcpLogger(size_t bufCap, bool dieOnErr, Ip::Address them):
f53969cc
SM
38 AsyncJob("TcpLogger"),
39 dieOnError(dieOnErr),
40 bufferCapacity(bufCap),
41 bufferedSize(0),
42 flushDebt(0),
43 quitOnEmpty(false),
44 reconnectScheduled(false),
45 writeScheduled(false),
46 conn(NULL),
47 remote(them),
48 connectFailures(0),
49 drops(0)
fb0c2f17
NH
50{
51 if (bufferCapacity < BufferCapacityMin) {
52 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT,
53 "WARNING: tcp:" << remote << " logger configured buffer " <<
54 "size " << bufferCapacity << " is smaller than the " <<
55 BufferCapacityMin << "-byte" << " minimum. " <<
56 "Using the minimum instead.");
57 bufferCapacity = BufferCapacityMin;
58 }
59}
60
61Log::TcpLogger::~TcpLogger()
62{
63 // make sure Comm::Write does not have our buffer pointer
64 assert(!writeScheduled);
65}
66
67void
68Log::TcpLogger::start()
69{
d4c669ca 70 doConnect();
fb0c2f17
NH
71}
72
73bool
74Log::TcpLogger::doneAll() const
75{
76 debugs(MY_DEBUG_SECTION, 5, "quitOnEmpty: " << quitOnEmpty <<
77 " buffered: " << bufferedSize <<
78 " conn: " << conn << ' ' << connectFailures);
79
80 // we do not quit unless we are told that we may
81 if (!quitOnEmpty)
82 return false;
83
84 /* We were asked to quit after we are done writing buffers. Are we done? */
85
86 // If we have records but are failing to connect, quit. Otherwise, we may
87 // be trying to connect forever due to a [since fixed] misconfiguration!
88 const bool failingToConnect = !conn && connectFailures;
89 if (bufferedSize && !failingToConnect)
90 return false;
91
92 return AsyncJob::doneAll();
93}
94
95void
96Log::TcpLogger::swanSong()
97{
98 disconnect(); // optional: refcounting should close/delete conn eventually
99 AsyncJob::swanSong();
100}
101
102void
103Log::TcpLogger::endGracefully()
104{
105 // job call protection must end our job if we are done logging current bufs
106 assert(inCall != NULL);
107 quitOnEmpty = true;
108 flush();
109}
110
111void
112Log::TcpLogger::flush()
113{
114 flushDebt = bufferedSize;
115 writeIfNeeded();
116}
117
118void
119Log::TcpLogger::logRecord(const char *buf, const size_t len)
120{
121 appendRecord(buf, len);
122 writeIfNeeded();
123}
124
125/// starts writing if and only if it is time to write accumulated records
126void
127Log::TcpLogger::writeIfNeeded()
128{
129 // write if an earlier flush command forces us to write or
130 // if we have filled at least one I/O buffer
131 if (flushDebt > 0 || buffers.size() > 1)
132 writeIfPossible();
133}
134
135/// starts writing if possible
136void Log::TcpLogger::writeIfPossible()
137{
138 debugs(MY_DEBUG_SECTION, 7, "guards: " << (!writeScheduled) <<
139 (bufferedSize > 0) << (conn != NULL) <<
140 (conn != NULL && !fd_table[conn->fd].closing()) << " buffered: " <<
141 bufferedSize << '/' << buffers.size());
142
143 // XXX: Squid shutdown sequence starts closing our connection before
144 // calling LogfileClose, leading to loss of log records during shutdown.
145 if (!writeScheduled && bufferedSize > 0 && conn != NULL &&
b4bb4694 146 !fd_table[conn->fd].closing()) {
fb0c2f17
NH
147 debugs(MY_DEBUG_SECTION, 5, "writing first buffer");
148
149 typedef CommCbMemFunT<TcpLogger, CommIoCbParams> WriteDialer;
150 AsyncCall::Pointer callback = JobCallback(MY_DEBUG_SECTION, 5, WriteDialer, this, Log::TcpLogger::writeDone);
151 const MemBlob::Pointer &buffer = buffers.front();
152 Comm::Write(conn, buffer->mem, buffer->size, callback, NULL);
153 writeScheduled = true;
154 }
155}
156
157/// whether len more bytes can be buffered
158bool
159Log::TcpLogger::canFit(const size_t len) const
160{
161 // TODO: limit reporting frequency in addition to reporting only changes
162
163 if (bufferedSize+len <= bufferCapacity) {
164 if (drops) {
165 // We can get here if a shorter record accidentally fits after we
166 // started dropping records. When that happens, the following
167 // DBG_IMPORTANT message will mislead admin into thinking that
168 // the problem was resolved (for a brief period of time, until
169 // another record comes in and overflows the buffer). It is
170 // difficult to prevent this without also creating the opposite
171 // problem: A huge record that does not fit and is dropped blocks
172 // subsequent regular records from being buffered until we write.
173 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
b4bb4694
A
174 " logger stops dropping records after " << drops << " drops" <<
175 "; current buffer use: " << (bufferedSize+len) <<
176 " out of " << bufferCapacity << " bytes");
fb0c2f17
NH
177 }
178 return true;
179 }
180
181 if (!drops || dieOnError) {
182 debugs(MY_DEBUG_SECTION,
183 dieOnError ? DBG_CRITICAL : DBG_IMPORTANT,
184 "tcp:" << remote << " logger " << bufferCapacity << "-byte " <<
185 "buffer overflowed; cannot fit " <<
186 (bufferedSize+len-bufferCapacity) << " bytes");
187 }
188
189 if (dieOnError)
190 fatal("tcp logger buffer overflowed");
191
192 if (!drops) {
193 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
194 " logger starts dropping records.");
195 }
196
197 return false;
198}
199
200/// buffer a record that might exceed IoBufSize
201void
202Log::TcpLogger::appendRecord(const char *record, const size_t len)
203{
204 // they should not happen, but to be safe, let's protect drop start/stop
205 // monitoring algorithm from empty records (which can never be dropped)
206 if (!len)
207 return;
208
209 if (!canFit(len)) {
210 ++drops;
211 return;
212 }
213
214 drops = 0;
215 // append without spliting buf, unless it exceeds IoBufSize
216 for (size_t off = 0; off < len; off += IoBufSize)
217 appendChunk(record + off, min(len - off, IoBufSize));
218}
219
220/// buffer a record chunk without splitting it across buffers
221void
222Log::TcpLogger::appendChunk(const char *chunk, const size_t len)
223{
224 Must(len <= IoBufSize);
225 // add a buffer if there is not one that can accomodate len bytes
226 bool addBuffer = buffers.empty() ||
227 (buffers.back()->size+len > IoBufSize);
228 // also add a buffer if there is only one and that one is being written
229 addBuffer = addBuffer || (writeScheduled && buffers.size() == 1);
230
231 if (addBuffer) {
232 buffers.push_back(new MemBlob(IoBufSize));
233 debugs(MY_DEBUG_SECTION, 7, "added buffer #" << buffers.size());
234 }
235
236 Must(!buffers.empty());
237 buffers.back()->append(chunk, len);
238 bufferedSize += len;
239}
240
241/// starts [re]connecting to the remote logger
242void
d4c669ca 243Log::TcpLogger::doConnect()
fb0c2f17
NH
244{
245 if (shutting_down)
246 return;
247
248 debugs(MY_DEBUG_SECTION, 3, "connecting");
249 Must(!conn);
250
251 Comm::ConnectionPointer futureConn = new Comm::Connection;
252 futureConn->remote = remote;
4dd643d5
AJ
253 futureConn->local.setAnyAddr();
254 if (futureConn->remote.isIPv4())
255 futureConn->local.setIPv4();
fb0c2f17
NH
256
257 typedef CommCbMemFunT<TcpLogger, CommConnectCbParams> Dialer;
258 AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone);
259 AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2));
260}
261
262/// Comm::ConnOpener callback
263void
264Log::TcpLogger::connectDone(const CommConnectCbParams &params)
265{
c8407295 266 if (params.flag != Comm::OK) {
fb0c2f17
NH
267 const double delay = 0.5; // seconds
268 if (connectFailures++ % 100 == 0) {
269 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
270 " logger connection attempt #" << connectFailures <<
271 " failed. Will keep trying every " << delay << " seconds.");
272 }
273
274 if (!reconnectScheduled) {
275 reconnectScheduled = true;
276 eventAdd("Log::TcpLogger::DelayedReconnect",
277 Log::TcpLogger::DelayedReconnect,
278 new Pointer(this), 0.5, 0, false);
279 }
280 } else {
281 if (connectFailures > 0) {
282 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
283 " logger connectivity restored after " <<
284 (connectFailures+1) << " attempts.");
285 connectFailures = 0;
286 }
287
288 Must(!conn);
289 conn = params.conn;
290
291 Must(!closer);
292 typedef CommCbMemFunT<TcpLogger, CommCloseCbParams> Closer;
293 closer = JobCallback(MY_DEBUG_SECTION, 4, Closer, this, Log::TcpLogger::handleClosure);
294 comm_add_close_handler(conn->fd, closer);
295
296 writeIfNeeded();
297 }
298}
299
300// XXX: Needed until eventAdd() starts accepting Async calls directly.
301/// Log::TcpLogger::delayedReconnect() wrapper.
302void
303Log::TcpLogger::DelayedReconnect(void *data)
304{
305 Pointer *ptr = static_cast<Pointer*>(data);
306 assert(ptr);
307 if (TcpLogger *logger = ptr->valid()) {
308 // Get back inside AsyncJob protections by scheduling another call.
309 typedef NullaryMemFunT<TcpLogger> Dialer;
310 AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer,
311 logger,
312 Log::TcpLogger::delayedReconnect);
313 ScheduleCallHere(call);
314 }
315 delete ptr;
316}
317
318/// "sleep a little before trying to connect again" event callback
319void
b4bb4694
A
320Log::TcpLogger::delayedReconnect()
321{
fb0c2f17
NH
322 Must(reconnectScheduled);
323 Must(!conn);
324 reconnectScheduled = false;
d4c669ca 325 doConnect();
fb0c2f17
NH
326}
327
328/// Comm::Write callback
329void
330Log::TcpLogger::writeDone(const CommIoCbParams &io)
331{
332 writeScheduled = false;
c8407295 333 if (io.flag == Comm::ERR_CLOSING) {
fb0c2f17
NH
334 debugs(MY_DEBUG_SECTION, 7, "closing");
335 // do nothing here -- our comm_close_handler will be called to clean up
c8407295 336 } else if (io.flag != Comm::OK) {
fb0c2f17
NH
337 debugs(MY_DEBUG_SECTION, 2, "write failure: " << xstrerr(io.xerrno));
338 // keep the first buffer (the one we failed to write)
339 disconnect();
d4c669ca 340 doConnect();
fb0c2f17
NH
341 } else {
342 debugs(MY_DEBUG_SECTION, 5, "write successful");
343
344 Must(!buffers.empty()); // we had a buffer to write
345 const MemBlob::Pointer &written = buffers.front();
346 const size_t writtenSize = static_cast<size_t>(written->size);
347 // and we wrote the whole buffer
16b1fd57 348 Must(io.size == writtenSize);
fb0c2f17
NH
349 Must(bufferedSize >= writtenSize);
350 bufferedSize -= writtenSize;
351
352 buffers.pop_front();
353
354 if (flushDebt > io.size)
355 flushDebt -= io.size;
356 else
357 flushDebt = 0; // wrote everything we owed (or more)
358
359 writeIfNeeded();
360 }
361}
362
b4bb4694 363/// This is our comm_close_handler. It is called when some external force
fb0c2f17
NH
364/// (e.g., reconfigure or shutdown) is closing the connection (rather than us).
365void
ced8def3 366Log::TcpLogger::handleClosure(const CommCloseCbParams &)
fb0c2f17
NH
367{
368 assert(inCall != NULL);
369 closer = NULL;
370 conn = NULL;
371 // in all current use cases, we should not try to reconnect
372 mustStop("Log::TcpLogger::handleClosure");
373}
374
375/// close our connection now, without flushing
376void
377Log::TcpLogger::disconnect()
378{
379 if (conn != NULL) {
380 if (closer != NULL) {
381 comm_remove_close_handler(conn->fd, closer);
382 closer = NULL;
383 }
384 conn->close();
385 conn = NULL;
386 }
387}
388
b4bb4694 389/// Converts Logfile into a pointer to a valid TcpLogger job or,
fb0c2f17
NH
390/// if the logger job has quit, into a nill pointer
391Log::TcpLogger *
392Log::TcpLogger::StillLogging(Logfile *lf)
393{
394 if (Pointer *pptr = static_cast<Pointer*>(lf->data))
395 return pptr->get(); // may be nil
396 return NULL;
397}
398
399void
400Log::TcpLogger::Flush(Logfile * lf)
401{
402 if (TcpLogger *logger = StillLogging(lf))
403 logger->flush();
404}
405
406void
407Log::TcpLogger::WriteLine(Logfile * lf, const char *buf, size_t len)
408{
409 if (TcpLogger *logger = StillLogging(lf))
410 logger->logRecord(buf, len);
411}
412
413void
ced8def3 414Log::TcpLogger::StartLine(Logfile *)
fb0c2f17
NH
415{
416}
417
418void
419Log::TcpLogger::EndLine(Logfile * lf)
420{
421 if (!Config.onoff.buffered_logs)
422 Flush(lf);
423}
424
425void
efc23871 426Log::TcpLogger::Rotate(Logfile *, const int16_t)
fb0c2f17
NH
427{
428}
429
430void
431Log::TcpLogger::Close(Logfile * lf)
432{
433 if (TcpLogger *logger = StillLogging(lf)) {
434 debugs(50, 3, "Closing " << logger);
435 typedef NullaryMemFunT<TcpLogger> Dialer;
436 Dialer dialer(logger, &Log::TcpLogger::endGracefully);
437 AsyncCall::Pointer call = asyncCall(50, 3, "Log::TcpLogger::endGracefully", dialer);
438 ScheduleCallHere(call);
439 }
440 delete static_cast<Pointer*>(lf->data);
441 lf->data = NULL;
442}
443
444/*
445 * This code expects the path to be //host:port
446 */
447int
448Log::TcpLogger::Open(Logfile * lf, const char *path, size_t bufsz, int fatalFlag)
449{
450 assert(!StillLogging(lf));
451 debugs(5, 3, "Tcp Open called");
452
453 Ip::Address addr;
454
455 if (strncmp(path, "//", 2) == 0)
456 path += 2;
457 char *strAddr = xstrdup(path);
458 if (!GetHostWithPort(strAddr, &addr)) {
459 if (lf->flags.fatal) {
460 fatalf("Invalid TCP logging address '%s'\n", lf->path);
461 } else {
462 debugs(50, DBG_IMPORTANT, "Invalid TCP logging address '" << lf->path << "'");
463 safe_free(strAddr);
464 return FALSE;
465 }
466 }
467 safe_free(strAddr);
468
469 TcpLogger *logger = new TcpLogger(bufsz, fatalFlag, addr);
470 lf->data = new Pointer(logger);
471 lf->f_close = &Close;
472 lf->f_linewrite = &WriteLine;
473 lf->f_linestart = &StartLine;
474 lf->f_lineend = &EndLine;
475 lf->f_flush = &Flush;
476 lf->f_rotate = &Rotate;
477 AsyncJob::Start(logger);
478
479 return 1;
480}
f53969cc 481