]> git.ipfire.org Git - thirdparty/squid.git/blame - src/log/TcpLogger.cc
Boilerplate: update copyright blurbs on src/
[thirdparty/squid.git] / src / log / TcpLogger.cc
CommitLineData
bbc27441
AJ
1/*
2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
fb0c2f17
NH
9#include "squid.h"
10#include "comm.h"
fb0c2f17 11#include "comm/Connection.h"
602d9612 12#include "comm/ConnOpener.h"
fb0c2f17
NH
13#include "comm/Loops.h"
14#include "comm/Write.h"
15#include "fde.h"
16#include "globals.h" // for shutting_down
17#include "log/CustomLog.h"
18#include "log/File.h"
19#include "log/TcpLogger.h"
20#include "MemBlob.h"
21#include "Parsing.h"
22#include "SquidConfig.h"
23#include "SquidTime.h"
24
25// a single I/O buffer should be large enough to store any access.log record
26const size_t Log::TcpLogger::IoBufSize = 2*MAX_URL;
27
28// We need at least two buffers because when we write the first buffer,
29// we have to use the second buffer to accumulate new entries.
30const size_t Log::TcpLogger::BufferCapacityMin = 2*Log::TcpLogger::IoBufSize;
31
32#define MY_DEBUG_SECTION 50 /* Log file handling */
33
34CBDATA_NAMESPACED_CLASS_INIT(Log, TcpLogger);
35
36Log::TcpLogger::TcpLogger(size_t bufCap, bool dieOnErr, Ip::Address them):
b4bb4694
A
37 AsyncJob("TcpLogger"),
38 dieOnError(dieOnErr),
39 bufferCapacity(bufCap),
40 bufferedSize(0),
41 flushDebt(0),
42 quitOnEmpty(false),
43 reconnectScheduled(false),
44 writeScheduled(false),
45 conn(NULL),
46 remote(them),
47 connectFailures(0),
48 drops(0)
fb0c2f17
NH
49{
50 if (bufferCapacity < BufferCapacityMin) {
51 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT,
52 "WARNING: tcp:" << remote << " logger configured buffer " <<
53 "size " << bufferCapacity << " is smaller than the " <<
54 BufferCapacityMin << "-byte" << " minimum. " <<
55 "Using the minimum instead.");
56 bufferCapacity = BufferCapacityMin;
57 }
58}
59
60Log::TcpLogger::~TcpLogger()
61{
62 // make sure Comm::Write does not have our buffer pointer
63 assert(!writeScheduled);
64}
65
66void
67Log::TcpLogger::start()
68{
d4c669ca 69 doConnect();
fb0c2f17
NH
70}
71
72bool
73Log::TcpLogger::doneAll() const
74{
75 debugs(MY_DEBUG_SECTION, 5, "quitOnEmpty: " << quitOnEmpty <<
76 " buffered: " << bufferedSize <<
77 " conn: " << conn << ' ' << connectFailures);
78
79 // we do not quit unless we are told that we may
80 if (!quitOnEmpty)
81 return false;
82
83 /* We were asked to quit after we are done writing buffers. Are we done? */
84
85 // If we have records but are failing to connect, quit. Otherwise, we may
86 // be trying to connect forever due to a [since fixed] misconfiguration!
87 const bool failingToConnect = !conn && connectFailures;
88 if (bufferedSize && !failingToConnect)
89 return false;
90
91 return AsyncJob::doneAll();
92}
93
94void
95Log::TcpLogger::swanSong()
96{
97 disconnect(); // optional: refcounting should close/delete conn eventually
98 AsyncJob::swanSong();
99}
100
101void
102Log::TcpLogger::endGracefully()
103{
104 // job call protection must end our job if we are done logging current bufs
105 assert(inCall != NULL);
106 quitOnEmpty = true;
107 flush();
108}
109
110void
111Log::TcpLogger::flush()
112{
113 flushDebt = bufferedSize;
114 writeIfNeeded();
115}
116
117void
118Log::TcpLogger::logRecord(const char *buf, const size_t len)
119{
120 appendRecord(buf, len);
121 writeIfNeeded();
122}
123
124/// starts writing if and only if it is time to write accumulated records
125void
126Log::TcpLogger::writeIfNeeded()
127{
128 // write if an earlier flush command forces us to write or
129 // if we have filled at least one I/O buffer
130 if (flushDebt > 0 || buffers.size() > 1)
131 writeIfPossible();
132}
133
134/// starts writing if possible
135void Log::TcpLogger::writeIfPossible()
136{
137 debugs(MY_DEBUG_SECTION, 7, "guards: " << (!writeScheduled) <<
138 (bufferedSize > 0) << (conn != NULL) <<
139 (conn != NULL && !fd_table[conn->fd].closing()) << " buffered: " <<
140 bufferedSize << '/' << buffers.size());
141
142 // XXX: Squid shutdown sequence starts closing our connection before
143 // calling LogfileClose, leading to loss of log records during shutdown.
144 if (!writeScheduled && bufferedSize > 0 && conn != NULL &&
b4bb4694 145 !fd_table[conn->fd].closing()) {
fb0c2f17
NH
146 debugs(MY_DEBUG_SECTION, 5, "writing first buffer");
147
148 typedef CommCbMemFunT<TcpLogger, CommIoCbParams> WriteDialer;
149 AsyncCall::Pointer callback = JobCallback(MY_DEBUG_SECTION, 5, WriteDialer, this, Log::TcpLogger::writeDone);
150 const MemBlob::Pointer &buffer = buffers.front();
151 Comm::Write(conn, buffer->mem, buffer->size, callback, NULL);
152 writeScheduled = true;
153 }
154}
155
156/// whether len more bytes can be buffered
157bool
158Log::TcpLogger::canFit(const size_t len) const
159{
160 // TODO: limit reporting frequency in addition to reporting only changes
161
162 if (bufferedSize+len <= bufferCapacity) {
163 if (drops) {
164 // We can get here if a shorter record accidentally fits after we
165 // started dropping records. When that happens, the following
166 // DBG_IMPORTANT message will mislead admin into thinking that
167 // the problem was resolved (for a brief period of time, until
168 // another record comes in and overflows the buffer). It is
169 // difficult to prevent this without also creating the opposite
170 // problem: A huge record that does not fit and is dropped blocks
171 // subsequent regular records from being buffered until we write.
172 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
b4bb4694
A
173 " logger stops dropping records after " << drops << " drops" <<
174 "; current buffer use: " << (bufferedSize+len) <<
175 " out of " << bufferCapacity << " bytes");
fb0c2f17
NH
176 }
177 return true;
178 }
179
180 if (!drops || dieOnError) {
181 debugs(MY_DEBUG_SECTION,
182 dieOnError ? DBG_CRITICAL : DBG_IMPORTANT,
183 "tcp:" << remote << " logger " << bufferCapacity << "-byte " <<
184 "buffer overflowed; cannot fit " <<
185 (bufferedSize+len-bufferCapacity) << " bytes");
186 }
187
188 if (dieOnError)
189 fatal("tcp logger buffer overflowed");
190
191 if (!drops) {
192 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
193 " logger starts dropping records.");
194 }
195
196 return false;
197}
198
199/// buffer a record that might exceed IoBufSize
200void
201Log::TcpLogger::appendRecord(const char *record, const size_t len)
202{
203 // they should not happen, but to be safe, let's protect drop start/stop
204 // monitoring algorithm from empty records (which can never be dropped)
205 if (!len)
206 return;
207
208 if (!canFit(len)) {
209 ++drops;
210 return;
211 }
212
213 drops = 0;
214 // append without spliting buf, unless it exceeds IoBufSize
215 for (size_t off = 0; off < len; off += IoBufSize)
216 appendChunk(record + off, min(len - off, IoBufSize));
217}
218
219/// buffer a record chunk without splitting it across buffers
220void
221Log::TcpLogger::appendChunk(const char *chunk, const size_t len)
222{
223 Must(len <= IoBufSize);
224 // add a buffer if there is not one that can accomodate len bytes
225 bool addBuffer = buffers.empty() ||
226 (buffers.back()->size+len > IoBufSize);
227 // also add a buffer if there is only one and that one is being written
228 addBuffer = addBuffer || (writeScheduled && buffers.size() == 1);
229
230 if (addBuffer) {
231 buffers.push_back(new MemBlob(IoBufSize));
232 debugs(MY_DEBUG_SECTION, 7, "added buffer #" << buffers.size());
233 }
234
235 Must(!buffers.empty());
236 buffers.back()->append(chunk, len);
237 bufferedSize += len;
238}
239
240/// starts [re]connecting to the remote logger
241void
d4c669ca 242Log::TcpLogger::doConnect()
fb0c2f17
NH
243{
244 if (shutting_down)
245 return;
246
247 debugs(MY_DEBUG_SECTION, 3, "connecting");
248 Must(!conn);
249
250 Comm::ConnectionPointer futureConn = new Comm::Connection;
251 futureConn->remote = remote;
4dd643d5
AJ
252 futureConn->local.setAnyAddr();
253 if (futureConn->remote.isIPv4())
254 futureConn->local.setIPv4();
fb0c2f17
NH
255
256 typedef CommCbMemFunT<TcpLogger, CommConnectCbParams> Dialer;
257 AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone);
258 AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2));
259}
260
261/// Comm::ConnOpener callback
262void
263Log::TcpLogger::connectDone(const CommConnectCbParams &params)
264{
c8407295 265 if (params.flag != Comm::OK) {
fb0c2f17
NH
266 const double delay = 0.5; // seconds
267 if (connectFailures++ % 100 == 0) {
268 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
269 " logger connection attempt #" << connectFailures <<
270 " failed. Will keep trying every " << delay << " seconds.");
271 }
272
273 if (!reconnectScheduled) {
274 reconnectScheduled = true;
275 eventAdd("Log::TcpLogger::DelayedReconnect",
276 Log::TcpLogger::DelayedReconnect,
277 new Pointer(this), 0.5, 0, false);
278 }
279 } else {
280 if (connectFailures > 0) {
281 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
282 " logger connectivity restored after " <<
283 (connectFailures+1) << " attempts.");
284 connectFailures = 0;
285 }
286
287 Must(!conn);
288 conn = params.conn;
289
290 Must(!closer);
291 typedef CommCbMemFunT<TcpLogger, CommCloseCbParams> Closer;
292 closer = JobCallback(MY_DEBUG_SECTION, 4, Closer, this, Log::TcpLogger::handleClosure);
293 comm_add_close_handler(conn->fd, closer);
294
295 writeIfNeeded();
296 }
297}
298
299// XXX: Needed until eventAdd() starts accepting Async calls directly.
300/// Log::TcpLogger::delayedReconnect() wrapper.
301void
302Log::TcpLogger::DelayedReconnect(void *data)
303{
304 Pointer *ptr = static_cast<Pointer*>(data);
305 assert(ptr);
306 if (TcpLogger *logger = ptr->valid()) {
307 // Get back inside AsyncJob protections by scheduling another call.
308 typedef NullaryMemFunT<TcpLogger> Dialer;
309 AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer,
310 logger,
311 Log::TcpLogger::delayedReconnect);
312 ScheduleCallHere(call);
313 }
314 delete ptr;
315}
316
317/// "sleep a little before trying to connect again" event callback
318void
b4bb4694
A
319Log::TcpLogger::delayedReconnect()
320{
fb0c2f17
NH
321 Must(reconnectScheduled);
322 Must(!conn);
323 reconnectScheduled = false;
d4c669ca 324 doConnect();
fb0c2f17
NH
325}
326
327/// Comm::Write callback
328void
329Log::TcpLogger::writeDone(const CommIoCbParams &io)
330{
331 writeScheduled = false;
c8407295 332 if (io.flag == Comm::ERR_CLOSING) {
fb0c2f17
NH
333 debugs(MY_DEBUG_SECTION, 7, "closing");
334 // do nothing here -- our comm_close_handler will be called to clean up
c8407295 335 } else if (io.flag != Comm::OK) {
fb0c2f17
NH
336 debugs(MY_DEBUG_SECTION, 2, "write failure: " << xstrerr(io.xerrno));
337 // keep the first buffer (the one we failed to write)
338 disconnect();
d4c669ca 339 doConnect();
fb0c2f17
NH
340 } else {
341 debugs(MY_DEBUG_SECTION, 5, "write successful");
342
343 Must(!buffers.empty()); // we had a buffer to write
344 const MemBlob::Pointer &written = buffers.front();
345 const size_t writtenSize = static_cast<size_t>(written->size);
346 // and we wrote the whole buffer
16b1fd57 347 Must(io.size == writtenSize);
fb0c2f17
NH
348 Must(bufferedSize >= writtenSize);
349 bufferedSize -= writtenSize;
350
351 buffers.pop_front();
352
353 if (flushDebt > io.size)
354 flushDebt -= io.size;
355 else
356 flushDebt = 0; // wrote everything we owed (or more)
357
358 writeIfNeeded();
359 }
360}
361
b4bb4694 362/// This is our comm_close_handler. It is called when some external force
fb0c2f17
NH
363/// (e.g., reconfigure or shutdown) is closing the connection (rather than us).
364void
365Log::TcpLogger::handleClosure(const CommCloseCbParams &io)
366{
367 assert(inCall != NULL);
368 closer = NULL;
369 conn = NULL;
370 // in all current use cases, we should not try to reconnect
371 mustStop("Log::TcpLogger::handleClosure");
372}
373
374/// close our connection now, without flushing
375void
376Log::TcpLogger::disconnect()
377{
378 if (conn != NULL) {
379 if (closer != NULL) {
380 comm_remove_close_handler(conn->fd, closer);
381 closer = NULL;
382 }
383 conn->close();
384 conn = NULL;
385 }
386}
387
b4bb4694 388/// Converts Logfile into a pointer to a valid TcpLogger job or,
fb0c2f17
NH
389/// if the logger job has quit, into a nill pointer
390Log::TcpLogger *
391Log::TcpLogger::StillLogging(Logfile *lf)
392{
393 if (Pointer *pptr = static_cast<Pointer*>(lf->data))
394 return pptr->get(); // may be nil
395 return NULL;
396}
397
398void
399Log::TcpLogger::Flush(Logfile * lf)
400{
401 if (TcpLogger *logger = StillLogging(lf))
402 logger->flush();
403}
404
405void
406Log::TcpLogger::WriteLine(Logfile * lf, const char *buf, size_t len)
407{
408 if (TcpLogger *logger = StillLogging(lf))
409 logger->logRecord(buf, len);
410}
411
412void
413Log::TcpLogger::StartLine(Logfile * lf)
414{
415}
416
417void
418Log::TcpLogger::EndLine(Logfile * lf)
419{
420 if (!Config.onoff.buffered_logs)
421 Flush(lf);
422}
423
424void
425Log::TcpLogger::Rotate(Logfile * lf)
426{
427}
428
429void
430Log::TcpLogger::Close(Logfile * lf)
431{
432 if (TcpLogger *logger = StillLogging(lf)) {
433 debugs(50, 3, "Closing " << logger);
434 typedef NullaryMemFunT<TcpLogger> Dialer;
435 Dialer dialer(logger, &Log::TcpLogger::endGracefully);
436 AsyncCall::Pointer call = asyncCall(50, 3, "Log::TcpLogger::endGracefully", dialer);
437 ScheduleCallHere(call);
438 }
439 delete static_cast<Pointer*>(lf->data);
440 lf->data = NULL;
441}
442
443/*
444 * This code expects the path to be //host:port
445 */
446int
447Log::TcpLogger::Open(Logfile * lf, const char *path, size_t bufsz, int fatalFlag)
448{
449 assert(!StillLogging(lf));
450 debugs(5, 3, "Tcp Open called");
451
452 Ip::Address addr;
453
454 if (strncmp(path, "//", 2) == 0)
455 path += 2;
456 char *strAddr = xstrdup(path);
457 if (!GetHostWithPort(strAddr, &addr)) {
458 if (lf->flags.fatal) {
459 fatalf("Invalid TCP logging address '%s'\n", lf->path);
460 } else {
461 debugs(50, DBG_IMPORTANT, "Invalid TCP logging address '" << lf->path << "'");
462 safe_free(strAddr);
463 return FALSE;
464 }
465 }
466 safe_free(strAddr);
467
468 TcpLogger *logger = new TcpLogger(bufsz, fatalFlag, addr);
469 lf->data = new Pointer(logger);
470 lf->f_close = &Close;
471 lf->f_linewrite = &WriteLine;
472 lf->f_linestart = &StartLine;
473 lf->f_lineend = &EndLine;
474 lf->f_flush = &Flush;
475 lf->f_rotate = &Rotate;
476 AsyncJob::Start(logger);
477
478 return 1;
479}