From bae5c928c16c32644eb69eeabd4d661e083a6915 Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Sat, 1 Sep 2012 17:44:39 +0000 Subject: [PATCH] Add latency plugin. --- collecty/ping.py | 323 +++++++++++++++++++++++++++++++++++ collecty/plugins/__init__.py | 2 + collecty/plugins/latency.py | 133 +++++++++++++++ 3 files changed, 458 insertions(+) create mode 100644 collecty/ping.py create mode 100644 collecty/plugins/latency.py diff --git a/collecty/ping.py b/collecty/ping.py new file mode 100644 index 0000000..7982b6a --- /dev/null +++ b/collecty/ping.py @@ -0,0 +1,323 @@ +#!/usr/bin/python + +from __future__ import division + +import array +import math +import os +import random +import select +import socket +import struct +import sys +import time + +ICMP_TYPE_ECHO_REPLY = 0 +ICMP_TYPE_ECHO_REQUEST = 8 +ICMP_MAX_RECV = 2048 + +MAX_SLEEP = 1000 + +class PingError(Exception): + msg = None + + +class PingResolveError(PingError): + msg = "Could not resolve hostname" + + +class Ping(object): + def __init__(self, destination, timeout=1000, packet_size=56): + self.destination = self._resolve(destination) + self.timeout = timeout + self.packet_size = packet_size + + self.id = os.getpid() & 0xffff # XXX ? Is this a good idea? + + self.seq_number = 0 + + # Number of sent packets. + self.send_count = 0 + + # Save the delay of all responses. + self.times = [] + + def run(self, count=None, deadline=None): + while True: + delay = self.do() + + self.seq_number += 1 + + if count and self.seq_number >= count: + break + + if deadline and self.total_time >= deadline: + break + + if delay == None: + delay = 0 + + if MAX_SLEEP > delay: + time.sleep((MAX_SLEEP - delay) / 1000) + + def do(self): + s = None + try: + # Open a socket for ICMP communication. + s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp")) + + # Send one packet. + send_time = self.send_icmp_echo_request(s) + + # Increase number of sent packets (even if it could not be sent). + self.send_count += 1 + + # If the packet could not be sent, we may stop here. + if send_time is None: + return + + # Wait for the reply. + receive_time, packet_size, ip, ip_header, icmp_header = self.receive_icmp_echo_reply(s) + + finally: + # Close the socket. + if s: + s.close() + + # If a packet has been received... + if receive_time: + delay = (receive_time - send_time) * 1000 + self.times.append(delay) + + return delay + + def send_icmp_echo_request(self, s): + # Header is type (8), code (8), checksum (16), id (16), sequence (16) + checksum = 0 + + # Create a header with checksum == 0. + header = struct.pack("!BBHHH", ICMP_TYPE_ECHO_REQUEST, 0, + checksum, self.id, self.seq_number) + + # Get some bytes for padding. + padding = os.urandom(self.packet_size) + + # Calculate the checksum for header + padding data. + checksum = self._calculate_checksum(header + padding) + + # Rebuild the header with the new checksum. + header = struct.pack("!BBHHH", ICMP_TYPE_ECHO_REQUEST, 0, + checksum, self.id, self.seq_number) + + # Build the packet. + packet = header + padding + + # Save the time when the packet has been sent. + send_time = time.time() + + # Send the packet. + try: + s.sendto(packet, (self.destination, 0)) + except socket.error, (errno, msg): + if errno == 1: # Operation not permitted + # The packet could not be sent, probably because of + # wrong firewall settings. + return + + return send_time + + def receive_icmp_echo_reply(self, s): + timeout = self.timeout / 1000.0 + + # Wait until the reply packet arrived or until we hit timeout. + while True: + select_start = time.time() + + inputready, outputready, exceptready = select.select([s], [], [], timeout) + select_duration = (time.time() - select_start) + + if inputready == []: # Timeout + return None, 0, 0, 0, 0 + + # Save the time when the packet has been received. + receive_time = time.time() + + # Read the packet from the socket. + packet_data, address = s.recvfrom(ICMP_MAX_RECV) + + # Parse the ICMP header. + icmp_header = self._header2dict( + ["type", "code", "checksum", "packet_id", "seq_number"], + "!BBHHH", packet_data[20:28] + ) + + # This is the reply to our packet if the ID matches. + if icmp_header["packet_id"] == self.id: + # Parse the IP header. + ip_header = self._header2dict( + ["version", "type", "length", "id", "flags", + "ttl", "protocol", "checksum", "src_ip", "dst_ip"], + "!BBHHHBBHII", packet_data[:20] + ) + + packet_size = len(packet_data) - 28 + ip = socket.inet_ntoa(struct.pack("!I", ip_header["src_ip"])) + + return receive_time, packet_size, ip, ip_header, icmp_header + + # Check if the timeout has already been hit. + timeout = timeout - select_duration + if timeout <= 0: + return None, 0, 0, 0, 0 + + def _header2dict(self, names, struct_format, data): + """ + Unpack tghe raw received IP and ICMP header informations to a dict + """ + unpacked_data = struct.unpack(struct_format, data) + return dict(zip(names, unpacked_data)) + + def _calculate_checksum(self, source_string): + if len(source_string) % 2: + source_string += "\x00" + + converted = array.array("H", source_string) + if sys.byteorder == "big": + converted.byteswap() + + val = sum(converted) + + # Truncate val to 32 bits (a variance from ping.c, which uses signed + # integers, but overflow is unlinkely in ping). + val &= 0xffffffff + + # Add high 16 bits to low 16 bits. + val = (val >> 16) + (val & 0xffff) + + # Add carry from above (if any). + val += (val >> 16) + + # Invert and truncate to 16 bits. + answer = ~val & 0xffff + + return socket.htons(answer) + + def _resolve(self, host): + """ + Resolve host. + """ + if self._is_valid_ipv4_address(host): + return host + + try: + return socket.gethostbyname(host) + except PingResolvError: + raise PingResolveError + + def _is_valid_ipv4_address(self, addr): + """ + Check addr to be a valid IPv4 address. + """ + parts = addr.split(".") + + if not len(parts) == 4: + return False + + for part in parts: + try: + number = int(part) + except ValueError: + return False + + if number > 255: + return False + + return True + + @property + def receive_count(self): + """ + The number of received packets. + """ + return len(self.times) + + @property + def total_time(self): + """ + The total time of all roundtrips. + """ + try: + return sum(self.times) + except ValueError: + return + + @property + def min_time(self): + """ + The smallest roundtrip time. + """ + try: + return min(self.times) + except ValueError: + return + + @property + def max_time(self): + """ + The biggest roundtrip time. + """ + try: + return max(self.times) + except ValueError: + return + + @property + def avg_time(self): + """ + Calculate the average response time. + """ + try: + return self.total_time / self.receive_count + except ZeroDivisionError: + return + + @property + def variance(self): + """ + Calculate the variance of all roundtrips. + """ + if self.avg_time is None: + return + + var = 0 + + for t in self.times: + var += (t - self.avg_time) ** 2 + + var /= self.receive_count + return var + + @property + def stddev(self): + """ + Standard deviation of all roundtrips. + """ + return math.sqrt(self.variance) + + @property + def loss(self): + """ + Outputs the percentage of dropped packets. + """ + dropped = self.send_count - self.receive_count + + return dropped / self.send_count + + +if __name__ == "__main__": + p = Ping("ping.ipfire.org") + p.run(count=5) + + print "Min/Avg/Max/Stddev: %.2f/%.2f/%.2f/%.2f" % \ + (p.min_time, p.avg_time, p.max_time, p.stddev) + print "Sent/Recv/Loss: %d/%d/%.2f" % (p.send_count, p.receive_count, p.loss) diff --git a/collecty/plugins/__init__.py b/collecty/plugins/__init__.py index 1e85485..172c9e1 100644 --- a/collecty/plugins/__init__.py +++ b/collecty/plugins/__init__.py @@ -24,6 +24,7 @@ from base import Timer import cpu import entropy import interface +import latency import loadavg import memory @@ -31,6 +32,7 @@ data_sources = [ cpu.DataSourceCPU, entropy.DataSourceEntropy, interface.DataSourceInterface, + latency.DataSourceLatency, loadavg.DataSourceLoadAvg, memory.DataSourceMemory, ] diff --git a/collecty/plugins/latency.py b/collecty/plugins/latency.py new file mode 100644 index 0000000..9b55bb9 --- /dev/null +++ b/collecty/plugins/latency.py @@ -0,0 +1,133 @@ +#!/usr/bin/python +############################################################################### +# # +# collecty - A system statistics collection daemon for IPFire # +# Copyright (C) 2012 IPFire development team # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +############################################################################### + +import collecty.ping + +import base + +from ..i18n import _ + +PING_HOSTS = [ + "ping.ipfire.org", +] + +class GraphTemplateLatency(base.GraphTemplate): + name = "latency" + + @property + def rrd_graph(self): + return [ + "DEF:latency=%(file)s:latency:AVERAGE", + "DEF:latency_loss=%(file)s:latency_loss:AVERAGE", + "DEF:latency_stddev=%(file)s:latency_stddev:AVERAGE", + + # Compute loss in percentage. + "CDEF:latency_ploss=latency_loss,100,*", + + # Compute standard deviation. + "CDEF:stddev1=latency,latency_stddev,+", + "CDEF:stddev2=latency,latency_stddev,-", + + "CDEF:l005=latency_ploss,0,5,LIMIT,UN,UNKN,INF,IF", + "CDEF:l010=latency_ploss,5,10,LIMIT,UN,UNKN,INF,IF", + "CDEF:l025=latency_ploss,10,25,LIMIT,UN,UNKN,INF,IF", + "CDEF:l050=latency_ploss,25,50,LIMIT,UN,UNKN,INF,IF", + "CDEF:l100=latency_ploss,50,100,LIMIT,UN,UNKN,INF,IF", + + "AREA:l005#ffffff:%s" % _("0-5%%"), + "AREA:l010#000000:%s" % _("5-10%%"), + "AREA:l025#ff0000:%s" % _("10-25%%"), + "AREA:l050#00ff00:%s" % _("25-50%%"), + "AREA:l100#0000ff:%s" % _("50-100%%") + "\\n", + + "LINE1:stddev1#00660088", + "LINE1:stddev2#00660088", + + "LINE3:latency#ff0000:%s" % _("Latency"), + "VDEF:latencymin=latency,MINIMUM", + "VDEF:latencymax=latency,MAXIMUM", + "VDEF:latencyavg=latency,AVERAGE", + "GPRINT:latencymax:%12s\:" % _("Maximum") + " %6.2lf", + "GPRINT:latencymin:%12s\:" % _("Minimum") + " %6.2lf", + "GPRINT:latencyavg:%12s\:" % _("Average") + " %6.2lf\\n", + + "LINE1:latencyavg#000000:%s" % _("Average latency"), + ] + + @property + def rrd_graph_args(self): + return [ + "--title", _("Latency to %(host)s"), + "--vertical-label", _("Milliseconds"), + + "--lower-limit", "0", "--rigid", + ] + + +class DataSourceLatency(base.DataSource): + name = "latency" + description = "Latency (ICMP ping) Data Source" + + templates = [GraphTemplateLatency,] + + rrd_schema = [ + "DS:latency:GAUGE:0:U", + "DS:latency_loss:GAUGE:0:100", + "DS:latency_stddev:GAUGE:0:U", + ] + + @property + def id(self): + return "-".join((self.name, self.host)) + + @classmethod + def autocreate(cls, collecty, **kwargs): + ret = [] + for host in PING_HOSTS: + ds = cls(collecty, host=host, **kwargs) + ret.append(ds) + + return ret + + def init(self, **kwargs): + self.host = kwargs.get("host") + assert self.host + + @property + def deadline(self): + return self.interval - 10 + + def read(self): + # Send up to five ICMP echo requests. + try: + ping = collecty.ping.Ping(destination=self.host, timeout=20000) + ping.run(count=5, deadline=self.deadline) + + except collecty.ping.PingError, e: + self.log.warning(_("Could not run latency check for %(host)s: %(msg)s") \ + % { "host" : self.host, "msg" : e.msg }) + return + + return ":".join(( + "%.10f" % ping.avg_time, + "%.10f" % ping.loss, + "%.10f" % ping.stddev, + )) -- 2.39.2