]> git.ipfire.org Git - thirdparty/squid.git/blob - src/log/TcpLogger.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / log / TcpLogger.cc
1 #include "squid.h"
2 #include "comm.h"
3 #include "comm/Connection.h"
4 #include "comm/ConnOpener.h"
5 #include "comm/Loops.h"
6 #include "comm/Write.h"
7 #include "fde.h"
8 #include "globals.h" // for shutting_down
9 #include "log/CustomLog.h"
10 #include "log/File.h"
11 #include "log/TcpLogger.h"
12 #include "MemBlob.h"
13 #include "Parsing.h"
14 #include "SquidConfig.h"
15 #include "SquidTime.h"
16
17 // a single I/O buffer should be large enough to store any access.log record
18 const size_t Log::TcpLogger::IoBufSize = 2*MAX_URL;
19
20 // We need at least two buffers because when we write the first buffer,
21 // we have to use the second buffer to accumulate new entries.
22 const size_t Log::TcpLogger::BufferCapacityMin = 2*Log::TcpLogger::IoBufSize;
23
24 #define MY_DEBUG_SECTION 50 /* Log file handling */
25
26 CBDATA_NAMESPACED_CLASS_INIT(Log, TcpLogger);
27
28 Log::TcpLogger::TcpLogger(size_t bufCap, bool dieOnErr, Ip::Address them):
29 AsyncJob("TcpLogger"),
30 dieOnError(dieOnErr),
31 bufferCapacity(bufCap),
32 bufferedSize(0),
33 flushDebt(0),
34 quitOnEmpty(false),
35 reconnectScheduled(false),
36 writeScheduled(false),
37 conn(NULL),
38 remote(them),
39 connectFailures(0),
40 drops(0)
41 {
42 if (bufferCapacity < BufferCapacityMin) {
43 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT,
44 "WARNING: tcp:" << remote << " logger configured buffer " <<
45 "size " << bufferCapacity << " is smaller than the " <<
46 BufferCapacityMin << "-byte" << " minimum. " <<
47 "Using the minimum instead.");
48 bufferCapacity = BufferCapacityMin;
49 }
50 }
51
52 Log::TcpLogger::~TcpLogger()
53 {
54 // make sure Comm::Write does not have our buffer pointer
55 assert(!writeScheduled);
56 }
57
58 void
59 Log::TcpLogger::start()
60 {
61 connect();
62 }
63
64 bool
65 Log::TcpLogger::doneAll() const
66 {
67 debugs(MY_DEBUG_SECTION, 5, "quitOnEmpty: " << quitOnEmpty <<
68 " buffered: " << bufferedSize <<
69 " conn: " << conn << ' ' << connectFailures);
70
71 // we do not quit unless we are told that we may
72 if (!quitOnEmpty)
73 return false;
74
75 /* We were asked to quit after we are done writing buffers. Are we done? */
76
77 // If we have records but are failing to connect, quit. Otherwise, we may
78 // be trying to connect forever due to a [since fixed] misconfiguration!
79 const bool failingToConnect = !conn && connectFailures;
80 if (bufferedSize && !failingToConnect)
81 return false;
82
83 return AsyncJob::doneAll();
84 }
85
86 void
87 Log::TcpLogger::swanSong()
88 {
89 disconnect(); // optional: refcounting should close/delete conn eventually
90 AsyncJob::swanSong();
91 }
92
93 void
94 Log::TcpLogger::endGracefully()
95 {
96 // job call protection must end our job if we are done logging current bufs
97 assert(inCall != NULL);
98 quitOnEmpty = true;
99 flush();
100 }
101
102 void
103 Log::TcpLogger::flush()
104 {
105 flushDebt = bufferedSize;
106 writeIfNeeded();
107 }
108
109 void
110 Log::TcpLogger::logRecord(const char *buf, const size_t len)
111 {
112 appendRecord(buf, len);
113 writeIfNeeded();
114 }
115
116 /// starts writing if and only if it is time to write accumulated records
117 void
118 Log::TcpLogger::writeIfNeeded()
119 {
120 // write if an earlier flush command forces us to write or
121 // if we have filled at least one I/O buffer
122 if (flushDebt > 0 || buffers.size() > 1)
123 writeIfPossible();
124 }
125
126 /// starts writing if possible
127 void Log::TcpLogger::writeIfPossible()
128 {
129 debugs(MY_DEBUG_SECTION, 7, "guards: " << (!writeScheduled) <<
130 (bufferedSize > 0) << (conn != NULL) <<
131 (conn != NULL && !fd_table[conn->fd].closing()) << " buffered: " <<
132 bufferedSize << '/' << buffers.size());
133
134 // XXX: Squid shutdown sequence starts closing our connection before
135 // calling LogfileClose, leading to loss of log records during shutdown.
136 if (!writeScheduled && bufferedSize > 0 && conn != NULL &&
137 !fd_table[conn->fd].closing()) {
138 debugs(MY_DEBUG_SECTION, 5, "writing first buffer");
139
140 typedef CommCbMemFunT<TcpLogger, CommIoCbParams> WriteDialer;
141 AsyncCall::Pointer callback = JobCallback(MY_DEBUG_SECTION, 5, WriteDialer, this, Log::TcpLogger::writeDone);
142 const MemBlob::Pointer &buffer = buffers.front();
143 Comm::Write(conn, buffer->mem, buffer->size, callback, NULL);
144 writeScheduled = true;
145 }
146 }
147
148 /// whether len more bytes can be buffered
149 bool
150 Log::TcpLogger::canFit(const size_t len) const
151 {
152 // TODO: limit reporting frequency in addition to reporting only changes
153
154 if (bufferedSize+len <= bufferCapacity) {
155 if (drops) {
156 // We can get here if a shorter record accidentally fits after we
157 // started dropping records. When that happens, the following
158 // DBG_IMPORTANT message will mislead admin into thinking that
159 // the problem was resolved (for a brief period of time, until
160 // another record comes in and overflows the buffer). It is
161 // difficult to prevent this without also creating the opposite
162 // problem: A huge record that does not fit and is dropped blocks
163 // subsequent regular records from being buffered until we write.
164 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
165 " logger stops dropping records after " << drops << " drops" <<
166 "; current buffer use: " << (bufferedSize+len) <<
167 " out of " << bufferCapacity << " bytes");
168 }
169 return true;
170 }
171
172 if (!drops || dieOnError) {
173 debugs(MY_DEBUG_SECTION,
174 dieOnError ? DBG_CRITICAL : DBG_IMPORTANT,
175 "tcp:" << remote << " logger " << bufferCapacity << "-byte " <<
176 "buffer overflowed; cannot fit " <<
177 (bufferedSize+len-bufferCapacity) << " bytes");
178 }
179
180 if (dieOnError)
181 fatal("tcp logger buffer overflowed");
182
183 if (!drops) {
184 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
185 " logger starts dropping records.");
186 }
187
188 return false;
189 }
190
191 /// buffer a record that might exceed IoBufSize
192 void
193 Log::TcpLogger::appendRecord(const char *record, const size_t len)
194 {
195 // they should not happen, but to be safe, let's protect drop start/stop
196 // monitoring algorithm from empty records (which can never be dropped)
197 if (!len)
198 return;
199
200 if (!canFit(len)) {
201 ++drops;
202 return;
203 }
204
205 drops = 0;
206 // append without spliting buf, unless it exceeds IoBufSize
207 for (size_t off = 0; off < len; off += IoBufSize)
208 appendChunk(record + off, min(len - off, IoBufSize));
209 }
210
211 /// buffer a record chunk without splitting it across buffers
212 void
213 Log::TcpLogger::appendChunk(const char *chunk, const size_t len)
214 {
215 Must(len <= IoBufSize);
216 // add a buffer if there is not one that can accomodate len bytes
217 bool addBuffer = buffers.empty() ||
218 (buffers.back()->size+len > IoBufSize);
219 // also add a buffer if there is only one and that one is being written
220 addBuffer = addBuffer || (writeScheduled && buffers.size() == 1);
221
222 if (addBuffer) {
223 buffers.push_back(new MemBlob(IoBufSize));
224 debugs(MY_DEBUG_SECTION, 7, "added buffer #" << buffers.size());
225 }
226
227 Must(!buffers.empty());
228 buffers.back()->append(chunk, len);
229 bufferedSize += len;
230 }
231
232 /// starts [re]connecting to the remote logger
233 void
234 Log::TcpLogger::connect()
235 {
236 if (shutting_down)
237 return;
238
239 debugs(MY_DEBUG_SECTION, 3, "connecting");
240 Must(!conn);
241
242 Comm::ConnectionPointer futureConn = new Comm::Connection;
243 futureConn->remote = remote;
244 futureConn->local.setAnyAddr();
245 if (futureConn->remote.isIPv4())
246 futureConn->local.setIPv4();
247
248 typedef CommCbMemFunT<TcpLogger, CommConnectCbParams> Dialer;
249 AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone);
250 AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2));
251 }
252
253 /// Comm::ConnOpener callback
254 void
255 Log::TcpLogger::connectDone(const CommConnectCbParams &params)
256 {
257 if (params.flag != COMM_OK) {
258 const double delay = 0.5; // seconds
259 if (connectFailures++ % 100 == 0) {
260 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
261 " logger connection attempt #" << connectFailures <<
262 " failed. Will keep trying every " << delay << " seconds.");
263 }
264
265 if (!reconnectScheduled) {
266 reconnectScheduled = true;
267 eventAdd("Log::TcpLogger::DelayedReconnect",
268 Log::TcpLogger::DelayedReconnect,
269 new Pointer(this), 0.5, 0, false);
270 }
271 } else {
272 if (connectFailures > 0) {
273 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
274 " logger connectivity restored after " <<
275 (connectFailures+1) << " attempts.");
276 connectFailures = 0;
277 }
278
279 Must(!conn);
280 conn = params.conn;
281
282 Must(!closer);
283 typedef CommCbMemFunT<TcpLogger, CommCloseCbParams> Closer;
284 closer = JobCallback(MY_DEBUG_SECTION, 4, Closer, this, Log::TcpLogger::handleClosure);
285 comm_add_close_handler(conn->fd, closer);
286
287 writeIfNeeded();
288 }
289 }
290
291 // XXX: Needed until eventAdd() starts accepting Async calls directly.
292 /// Log::TcpLogger::delayedReconnect() wrapper.
293 void
294 Log::TcpLogger::DelayedReconnect(void *data)
295 {
296 Pointer *ptr = static_cast<Pointer*>(data);
297 assert(ptr);
298 if (TcpLogger *logger = ptr->valid()) {
299 // Get back inside AsyncJob protections by scheduling another call.
300 typedef NullaryMemFunT<TcpLogger> Dialer;
301 AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer,
302 logger,
303 Log::TcpLogger::delayedReconnect);
304 ScheduleCallHere(call);
305 }
306 delete ptr;
307 }
308
309 /// "sleep a little before trying to connect again" event callback
310 void
311 Log::TcpLogger::delayedReconnect()
312 {
313 Must(reconnectScheduled);
314 Must(!conn);
315 reconnectScheduled = false;
316 connect();
317 }
318
319 /// Comm::Write callback
320 void
321 Log::TcpLogger::writeDone(const CommIoCbParams &io)
322 {
323 writeScheduled = false;
324 if (io.flag == COMM_ERR_CLOSING) {
325 debugs(MY_DEBUG_SECTION, 7, "closing");
326 // do nothing here -- our comm_close_handler will be called to clean up
327 } else if (io.flag != COMM_OK) {
328 debugs(MY_DEBUG_SECTION, 2, "write failure: " << xstrerr(io.xerrno));
329 // keep the first buffer (the one we failed to write)
330 disconnect();
331 connect();
332 } else {
333 debugs(MY_DEBUG_SECTION, 5, "write successful");
334
335 Must(!buffers.empty()); // we had a buffer to write
336 const MemBlob::Pointer &written = buffers.front();
337 const size_t writtenSize = static_cast<size_t>(written->size);
338 // and we wrote the whole buffer
339 Must(io.size == writtenSize);
340 Must(bufferedSize >= writtenSize);
341 bufferedSize -= writtenSize;
342
343 buffers.pop_front();
344
345 if (flushDebt > io.size)
346 flushDebt -= io.size;
347 else
348 flushDebt = 0; // wrote everything we owed (or more)
349
350 writeIfNeeded();
351 }
352 }
353
354 /// This is our comm_close_handler. It is called when some external force
355 /// (e.g., reconfigure or shutdown) is closing the connection (rather than us).
356 void
357 Log::TcpLogger::handleClosure(const CommCloseCbParams &io)
358 {
359 assert(inCall != NULL);
360 closer = NULL;
361 conn = NULL;
362 // in all current use cases, we should not try to reconnect
363 mustStop("Log::TcpLogger::handleClosure");
364 }
365
366 /// close our connection now, without flushing
367 void
368 Log::TcpLogger::disconnect()
369 {
370 if (conn != NULL) {
371 if (closer != NULL) {
372 comm_remove_close_handler(conn->fd, closer);
373 closer = NULL;
374 }
375 conn->close();
376 conn = NULL;
377 }
378 }
379
380 /// Converts Logfile into a pointer to a valid TcpLogger job or,
381 /// if the logger job has quit, into a nill pointer
382 Log::TcpLogger *
383 Log::TcpLogger::StillLogging(Logfile *lf)
384 {
385 if (Pointer *pptr = static_cast<Pointer*>(lf->data))
386 return pptr->get(); // may be nil
387 return NULL;
388 }
389
390 void
391 Log::TcpLogger::Flush(Logfile * lf)
392 {
393 if (TcpLogger *logger = StillLogging(lf))
394 logger->flush();
395 }
396
397 void
398 Log::TcpLogger::WriteLine(Logfile * lf, const char *buf, size_t len)
399 {
400 if (TcpLogger *logger = StillLogging(lf))
401 logger->logRecord(buf, len);
402 }
403
404 void
405 Log::TcpLogger::StartLine(Logfile * lf)
406 {
407 }
408
409 void
410 Log::TcpLogger::EndLine(Logfile * lf)
411 {
412 if (!Config.onoff.buffered_logs)
413 Flush(lf);
414 }
415
416 void
417 Log::TcpLogger::Rotate(Logfile * lf)
418 {
419 }
420
421 void
422 Log::TcpLogger::Close(Logfile * lf)
423 {
424 if (TcpLogger *logger = StillLogging(lf)) {
425 debugs(50, 3, "Closing " << logger);
426 typedef NullaryMemFunT<TcpLogger> Dialer;
427 Dialer dialer(logger, &Log::TcpLogger::endGracefully);
428 AsyncCall::Pointer call = asyncCall(50, 3, "Log::TcpLogger::endGracefully", dialer);
429 ScheduleCallHere(call);
430 }
431 delete static_cast<Pointer*>(lf->data);
432 lf->data = NULL;
433 }
434
435 /*
436 * This code expects the path to be //host:port
437 */
438 int
439 Log::TcpLogger::Open(Logfile * lf, const char *path, size_t bufsz, int fatalFlag)
440 {
441 assert(!StillLogging(lf));
442 debugs(5, 3, "Tcp Open called");
443
444 Ip::Address addr;
445
446 if (strncmp(path, "//", 2) == 0)
447 path += 2;
448 char *strAddr = xstrdup(path);
449 if (!GetHostWithPort(strAddr, &addr)) {
450 if (lf->flags.fatal) {
451 fatalf("Invalid TCP logging address '%s'\n", lf->path);
452 } else {
453 debugs(50, DBG_IMPORTANT, "Invalid TCP logging address '" << lf->path << "'");
454 safe_free(strAddr);
455 return FALSE;
456 }
457 }
458 safe_free(strAddr);
459
460 TcpLogger *logger = new TcpLogger(bufsz, fatalFlag, addr);
461 lf->data = new Pointer(logger);
462 lf->f_close = &Close;
463 lf->f_linewrite = &WriteLine;
464 lf->f_linestart = &StartLine;
465 lf->f_lineend = &EndLine;
466 lf->f_flush = &Flush;
467 lf->f_rotate = &Rotate;
468 AsyncJob::Start(logger);
469
470 return 1;
471 }