Input data should look something like this::
- [2013-08-07 15:17:34] FAX[23479] res_fax.c: FLOW T.38 Rx 5: IFP c0 01 ...
+ [2013-08-07 15:17:34] FAX[23479] res_fax.c: FLOW T.38 Rx 5: IFP c0 ...
Output data will look like a valid pcap file ;-)
or the git master branch: https://github.com/SIPp/sipp
-Author: Walter Doekes, OSSO B.V. (2013,2015,2016)
+Author: Walter Doekes, OSSO B.V. (2013,2015,2016,2019)
License: Public Domain
'''
from base64 import b16decode
+from collections import namedtuple
from datetime import datetime, timedelta
from re import search
from time import mktime
from struct import pack
+import os
import sys
EMPTY_RECOVERY = False
+IFP = namedtuple('IFP', 'date seqno data') # datetime, int, bytearray
+
+
def n2b(text):
- return b16decode(text.replace(' ', '').replace('\n', '').upper())
+ """
+ Convert "aa bb cc" to bytearray('\xaa\xbb\xcc').
+ """
+ return bytearray(
+ b16decode(text.replace(' ', '').replace('\n', '').upper()))
+
+
+class SkipPacket(Exception):
+ pass
class FaxPcap(object):
- PCAP_PREAMBLE = n2b('d4 c3 b2 a1 02 00 04 00'
- '00 00 00 00 00 00 00 00'
- 'ff ff 00 00 71 00 00 00')
+ PCAP_PREAMBLE = n2b(
+ 'd4 c3 b2 a1 02 00 04 00'
+ '00 00 00 00 00 00 00 00'
+ 'ff ff 00 00 71 00 00 00')
def __init__(self, outfile):
self.outfile = outfile
self.date = None
- self.dateoff = timedelta(seconds=0)
self.seqno = None
self.udpseqno = 128
self.prev_data = None
# Only do this if at pos 0?
- self.outfile.write(self.PCAP_PREAMBLE)
-
- def data2packet(self, date, udpseqno, seqno, data, prev_data):
- sum16 = '\x43\x21' # checksum is irrelevant for sipp sending
-
- new_prev = data # without seqno..
- data = '%s%s' % (pack('>H', seqno), data)
- if prev_data:
- if LOSSY and (seqno % 3) == 2:
- return '', new_prev
- if EMPTY_RECOVERY:
- # struct ast_frame f[16], we have room for a few
- # packets.
- packets = 14
- data += '\x00%c%s%s' % (
- chr(packets + 1), '\x00' * packets, prev_data)
- else:
- # Add 1 previous packet, without the seqno.
- data += '\x00\x01' + prev_data
- kwargs = {'udpseqno': pack('>H', udpseqno), 'sum16': sum16}
+ def add(self, ifp):
+ """
+ Add the IFP packet.
+
+ T.38 basic format of UDPTL payload section with redundancy:
+
+ UDPTL_SEQNO
+ - 2 sequence number (big endian)
+ UDPTL_PRIMARY_PAYLOAD (T30?)
+ - 1 subpacket length (excluding this byte)
+ - 1 type of message (e.g. 0xd0 for data(?))
+ - 1 items in data field (e.g. 0x01)
+ - 2 length of data (big endian)
+ - N data
+ RECOVERY (optional)
+ - 2 count of previous seqno packets (big endian)
+ - N UDPTL_PRIMARY_PAYLOAD of (seqno-1)
+ - N UDPTL_PRIMARY_PAYLOAD of (seqno-2)
+ - ...
+ """
+ # First packet?
+ if self.seqno is None:
+ # Add preamble.
+ self._add_preamble()
+ # Start a second late (optional).
+ self._add_garbage(ifp.date)
- kwargs['data'] = data
- kwargs['lenb16'] = pack('>H', len(kwargs['data']) + 8)
- udp = '\x00\x01\x00\x02%(lenb16)s%(sum16)s%(data)s' % kwargs
+ # Set sequence, and fill with missing leading zeroes.
+ self.seqno = 0
+ for i in range(ifp.seqno):
+ self.add(IFP(date=ifp.date, seqno=i, data=bytearray([0])))
- kwargs['data'] = udp
- kwargs['lenb16'] = pack('>H', len(kwargs['data']) + 20)
- ip = ('\x45\xb8%(lenb16)s%(udpseqno)s\x00\x00\xf9\x11%(sum16)s\x01'
- '\x01\x01\x01\x02\x02\x02\x02%(data)s') % kwargs
+ # Auto-increasing dates
+ if self.date is None or ifp.date > self.date:
+ self.date = ifp.date
+ elif ifp.date < self.date.replace(microsecond=0):
+ assert False, 'More packets than expected in 1s? {!r}/{!r}'.format(
+ ifp.date, self.date)
+ else:
+ self.date += timedelta(microseconds=9000)
- kwargs['data'] = ip
- frame = ('\x00\x00\x00\x01\x00\x06\x00\x30\x48\xb1\x1c\x34\x00\x00'
- '\x08\x00%(data)s') % kwargs
+ # Add packet.
+ self.seqno = ifp.seqno
+ try:
+ self.outfile.write(self._make_packet(ifp.data))
+ except SkipPacket:
+ pass
- kwargs['data'] = frame
- sec = mktime(date.timetuple())
- msec = date.microsecond
- datalen = len(kwargs['data'])
- kwargs['pre'] = pack('<IIII', sec, msec, datalen, datalen)
- packet = '%(pre)s%(data)s' % kwargs
+ def _add_preamble(self):
+ self.outfile.write(self.PCAP_PREAMBLE)
- return (packet, new_prev)
+ def _add_garbage(self, date):
+ if self.date is None or date > self.date:
+ self.date = date
- def add(self, date, seqno, data):
- if self.seqno is None:
- self.seqno = 0
- for i in range(seqno):
- # In case the first zeroes were dropped, add them.
- self.add(date, i, '\x00')
- assert seqno == self.seqno, '%s != %s' % (seqno, self.seqno)
+ self.seqno = 0xffff
+ self.outfile.write(self._make_packet(
+ bytearray(b'GARBAGE'), is_ifp=False))
- # Data is prepended by len(data).
- data = chr(len(data)) + data
+ def _make_packet(self, ifp_data, is_ifp=True):
+ sum16 = bytearray(b'\x43\x21') # the OS fixes the checksums for us
- # Auto-increasing dates
- if self.date is None or date > self.date:
- # print 'date is larger', date, self.date
- self.date = date
- elif (date < self.date.replace(microsecond=0)):
- assert False, ('We increased too fast.. decrease delta: %r/%r' %
- (date, self.date))
+ data = bytearray()
+ if is_ifp:
+ data.append(len(ifp_data)) # length
+ data.extend(ifp_data) # data
+ self.prev_data, prev_data = data[:], self.prev_data
else:
- self.date += timedelta(microseconds=9000)
+ data.extend(ifp_data)
+ prev_data = None
- print(seqno, '\t', self.date + self.dateoff)
+ if prev_data:
+ if LOSSY and (self.seqno % 3) == 2:
+ self.udpseqno += 1
+ raise SkipPacket()
- # Make packet.
- packet, prev_data = self.data2packet(self.date + self.dateoff,
- self.udpseqno, self.seqno,
- data, self.prev_data)
- self.outfile.write(packet)
+ if EMPTY_RECOVERY:
+ # struct ast_frame f[16], we have room for a few
+ # packets.
+ packets = 14
+ data.extend([0, packets + 1] + [0] * packets)
+ data.extend(prev_data)
+ else:
+ # Add 1 previous packet, without the seqno.
+ data.extend([0, 1])
+ data.extend(prev_data)
+
+ # Wrap it in UDP
+ udp = bytearray(
+ b'\x00\x01\x00\x02%(len)s%(sum16)s%(seqno)s%(data)s' % {
+ b'len': pack('>H', len(data) + 10),
+ b'sum16': sum16,
+ b'seqno': pack('>H', self.seqno),
+ b'data': data})
+
+ # Wrap it in IP
+ ip = bytearray(
+ b'\x45\xb8%(len)s%(udpseqno)s\x00\x00\xf9\x11%(sum16)s'
+ b'\x01\x01\x01\x01\x02\x02\x02\x02%(udp)s' % {
+ b'len': pack('>H', len(udp) + 20),
+ b'udpseqno': pack('>H', self.udpseqno),
+ b'sum16': sum16,
+ b'udp': udp})
+
+ # Wrap it in Ethernet
+ ethernet = bytearray(
+ b'\x00\x00\x00\x01\x00\x06\x00\x30\x48\xb1\x1c\x34\x00\x00'
+ b'\x08\x00%(ip)s' % {b'ip': ip})
+
+ # Wrap it in a pcap packet
+ packet = bytearray(b'%(prelude)s%(ethernet)s' % {
+ b'prelude': pack(
+ '<IIII', int(mktime(self.date.timetuple())),
+ self.date.microsecond, len(ethernet), len(ethernet)),
+ b'ethernet': ethernet})
# Increase values.
self.udpseqno += 1
- self.seqno += 1
- self.prev_data = prev_data
- def add_garbage(self, date):
- if self.date is None or date > self.date:
- self.date = date
+ return packet
- packet, ignored = self.data2packet(self.date, self.udpseqno,
- 0xffff, 'GARBAGE', '')
- self.udpseqno += 1
- self.outfile.write(packet)
+class SpandspLog:
+ def __init__(self, fp):
+ self._fp = fp
+
+ def __iter__(self):
+ r"""
+ Looks for lines line:
+ [2013-08-07 15:17:34] FAX[23479] res_fax.c: \
+ FLOW T.38 Rx 5: IFP c0 01 80 00 00 ff
-with open(sys.argv[1], 'r') as infile:
- with open(sys.argv[2], 'wb') as outfile:
- first = True
- p = FaxPcap(outfile)
- # p.add(datetime.now(), 0, n2b('06'))
- # p.add(datetime.now(), 1, n2b('c0 01 80 00 00 ff'))
+ And yields:
- for lineno, line in enumerate(infile):
- # Look for lines like:
- # [2013-08-07 15:17:34] FAX[23479] res_fax.c: \
- # FLOW T.38 Rx 5: IFP c0 01 80 00 00 ff
+ IFP(date=..., seqno=..., data=...)
+ """
+ prev_seqno = None
+
+ for lineno, line in enumerate(self._fp):
if 'FLOW T.38 Rx' not in line:
continue
if 'IFP' not in line:
assert match
data = n2b(match.groups()[0])
- # Have the file start a second early.
- if first:
- p.add_garbage(date)
- first = False
-
- # Add the packets.
- #
- # T.38 basic format of UDPTL payload section with redundancy:
- #
- # UDPTL_SEQNO
- # - 2 sequence number (big endian)
- # UDPTL_PRIMARY_PAYLOAD (T30?)
- # - 1 subpacket length (excluding this byte)
- # - 1 type of message (e.g. 0xd0 for data(?))
- # - 1 items in data field (e.g. 0x01)
- # - 2 length of data (big endian)
- # - N data
- # RECOVERY (optional)
- # - 2 count of previous seqno packets (big endian)
- # - N UDPTL_PRIMARY_PAYLOAD of (seqno-1)
- # - N UDPTL_PRIMARY_PAYLOAD of (seqno-2)
- # - ...
- #
- p.add(date, seqno, data)
+ if prev_seqno is not None:
+ # Expected all sequence numbers. But you can safely disable
+ # this check.
+ assert seqno == prev_seqno + 1, '%s+1 != %s' % (
+ seqno, prev_seqno)
+ pass
+ prev_seqno = seqno
+
+ yield IFP(date=date, seqno=seqno, data=data)
+
+
+def main(logname, pcapname):
+ with open(sys.argv[1], 'r') as infile:
+ log = SpandspLog(infile)
+
+ # with open(sys.argv[2], 'xb') as outfile: # py3 exclusive write, bin
+ create_or_fail = os.O_CREAT | os.O_EXCL | os.O_WRONLY
+ try:
+ fd = os.open(sys.argv[2], create_or_fail, 0o600)
+ except Exception:
+ raise
+ else:
+ with os.fdopen(fd, 'wb') as outfile:
+ pcap = FaxPcap(outfile)
+ for data in log:
+ pcap.add(data)
+
+
+if __name__ == '__main__':
+ if len(sys.argv) != 3:
+ sys.stderr.write('Usage: {} LOGFILE PCAP\n'.format(sys.argv[0]))
+ sys.exit(1)
+
+ main(sys.argv[1], sys.argv[2])