Asterisk - The Open Source Telephony Project  18.5.0
spandspflow2pcap.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # vim: set ts=8 sw=4 sts=4 et ai tw=79:
3 '''
4 Usage: ./spandspflow2pcap.py SPANDSP_LOG SENDFAX_PCAP
5 
6 Takes a log from Asterisk with SpanDSP, extracts the "received" data
7 and puts it in a pcap file. Use 'fax set debug on' and configure
8 logger.conf to get fax logs.
9 
10 Input data should look something like this::
11 
12  [2013-08-07 15:17:34] FAX[23479] res_fax.c: FLOW T.38 Rx 5: IFP c0 ...
13 
14 Output data will look like a valid pcap file ;-)
15 
16 This allows you to reconstruct received faxes into replayable pcaps.
17 
18 Replaying is expected to be done by SIPp with sipp-sendfax.xml. The
19 SIPp binary used for replaying must have image (fax) support. This means
20 you'll need a version higher than 3.5.0 (unreleased when writing this),
21 or the git master branch: https://github.com/SIPp/sipp
22 
23 
24 Author: Walter Doekes, OSSO B.V. (2013,2015,2016,2019)
25 License: Public Domain
26 '''
27 from base64 import b16decode
28 from collections import namedtuple
29 from datetime import datetime, timedelta
30 from re import search
31 from time import mktime
32 from struct import pack
33 import os
34 import sys
35 
36 
# Debug switch read by FaxPcap._make_packet(): when true, every packet
# that has a predecessor and whose seqno % 3 == 2 is left out of the pcap,
# so the replayed stream has gaps.
LOSSY = False
# Debug switch read by FaxPcap._make_packet(): when true, the recovery
# section announces 15 entries, 14 of them zero-length, followed by the
# previous primary payload (instead of announcing just that one payload).
EMPTY_RECOVERY = False


# One received T.38 IFP packet as parsed from the log.
IFP = namedtuple('IFP', 'date seqno data')  # datetime, int, bytearray
42 
43 
def n2b(text):
    r"""
    Convert "aa bb cc" to bytearray(b'\xaa\xbb\xcc').

    All whitespace in text (spaces, tabs, CR, LF) is ignored and hex
    digits may be upper or lower case. Text that is not an even number of
    hex digits makes b16decode() raise.
    """
    # str.split() without arguments splits on any whitespace run, so the
    # join leaves only the hex digits.
    digits = ''.join(text.split())
    return bytearray(b16decode(digits, casefold=True))
50 
51 
class SkipPacket(Exception):
    """
    Raised by FaxPcap._make_packet() when a packet must not be written.

    FaxPcap.add() catches it and simply writes nothing for that packet.
    """
    pass
54 
55 
class FaxPcap(object):
    """
    Write IFP packets to a binary file object as a pcap capture.

    Every packet is wrapped in a UDPTL/UDP/IP/link-layer frame with fixed
    addresses and ports; only lengths, sequence numbers and timestamps
    vary.
    """

    # pcap global header (24 bytes): little-endian magic, version 2.4,
    # zero timezone/sigfigs, snaplen 0xffff and link type 0x71.
    # NOTE(review): 0x71 (113) should be Linux cooked capture per the
    # pcap link-type registry, matching the 16-byte link header written
    # in _make_packet() -- confirm.
    PCAP_PREAMBLE = n2b(
        'd4 c3 b2 a1 02 00 04 00'
        '00 00 00 00 00 00 00 00'
        'ff ff 00 00 71 00 00 00')

    def __init__(self, outfile):
        """
        Prepare to write to outfile, a file object opened for binary
        writing. Nothing is written until the first add().
        """
        self.outfile = outfile
        self.date = None        # timestamp of the last written packet
        self.seqno = None       # UDPTL seqno; None until the first add()
        self.udpseqno = 128     # written into the IP header, +1 per packet
        self.prev_data = None   # previous primary payload, for recovery

        # Only do this if at pos 0?
        # NOTE(review): open question kept from the original; presumably
        # it is about writing the pcap preamble, which add() does on the
        # first packet -- confirm.

    def add(self, ifp):
        """
        Add the IFP packet.

        ifp needs the attributes date (datetime), seqno (int) and data
        (bytearray), see the IFP namedtuple.

        Raises AssertionError if ifp.date lies in an earlier second than
        the timestamp that was written last.

        T.38 basic format of UDPTL payload section with redundancy:

        UDPTL_SEQNO
        - 2 sequence number (big endian)
        UDPTL_PRIMARY_PAYLOAD (T30?)
        - 1 subpacket length (excluding this byte)
        - 1 type of message (e.g. 0xd0 for data(?))
        - 1 items in data field (e.g. 0x01)
        - 2 length of data (big endian)
        - N data
        RECOVERY (optional)
        - 2 count of previous seqno packets (big endian)
        - N UDPTL_PRIMARY_PAYLOAD of (seqno-1)
        - N UDPTL_PRIMARY_PAYLOAD of (seqno-2)
        - ...
        """
        # First packet?
        if self.seqno is None:
            # Add preamble.
            self._add_preamble()
            # Lead with one throw-away non-IFP datagram.
            self._add_garbage(ifp.date)

            # Set sequence, and fill with missing leading zeroes: if the
            # log starts at seqno N > 0, emit N single-zero-byte packets
            # first. self.seqno is no longer None here, so the recursive
            # calls skip this branch.
            self.seqno = 0
            for i in range(ifp.seqno):
                self.add(IFP(date=ifp.date, seqno=i, data=bytearray([0])))

        # Auto-increasing dates: when the incoming date is not newer than
        # the last written one, advance our own clock by 9ms per packet.
        if self.date is None or ifp.date > self.date:
            self.date = ifp.date
        elif ifp.date < self.date.replace(microsecond=0):
            # Our clock has run into a later second than the incoming
            # date. Raised explicitly so the check survives python -O.
            raise AssertionError(
                'More packets than expected in 1s? {!r}/{!r}'.format(
                    ifp.date, self.date))
        else:
            self.date += timedelta(microseconds=9000)

        # Add packet.
        self.seqno = ifp.seqno
        try:
            self.outfile.write(self._make_packet(ifp.data))
        except SkipPacket:
            # Deliberately dropped by _make_packet(); write nothing.
            pass

    def _add_preamble(self):
        """Write the pcap global header."""
        self.outfile.write(self.PCAP_PREAMBLE)

    def _add_garbage(self, date):
        """
        Write one non-IFP datagram carrying b'GARBAGE' with UDPTL seqno
        0xffff, stamped with date unless a later date was already used.
        """
        if self.date is None or date > self.date:
            self.date = date

        self.seqno = 0xffff
        self.outfile.write(self._make_packet(
            bytearray(b'GARBAGE'), is_ifp=False))

    def _make_packet(self, ifp_data, is_ifp=True):
        """
        Return one complete pcap record (bytearray) carrying ifp_data.

        With is_ifp true, ifp_data is prefixed with its length byte and,
        if there was a previous IFP payload, followed by a recovery
        section containing it. With is_ifp false, ifp_data is sent as-is
        after the seqno. Uses self.seqno and self.date for the UDPTL
        sequence number and the timestamp; increments self.udpseqno.

        Raises SkipPacket when LOSSY is set and this packet is selected
        for dropping.
        """
        sum16 = bytearray(b'\x43\x21')  # the OS fixes the checksums for us

        data = bytearray()
        if is_ifp:
            data.append(len(ifp_data))  # length
            data.extend(ifp_data)       # data
            # Remember this primary payload for the next packet's
            # recovery section and fetch the one stored last time.
            self.prev_data, prev_data = data[:], self.prev_data
        else:
            data.extend(ifp_data)
            prev_data = None

        if prev_data:
            if LOSSY and (self.seqno % 3) == 2:
                # The dropped packet still uses up a udpseqno value.
                self.udpseqno += 1
                raise SkipPacket()

            if EMPTY_RECOVERY:
                # struct ast_frame f[16], we have room for a few
                # packets.
                packets = 14
                # Count of packets + 1, then that many zero length bytes
                # (empty entries) ahead of the real previous payload.
                data.extend([0, packets + 1] + [0] * packets)
                data.extend(prev_data)
            else:
                # Add 1 previous packet, without the seqno.
                data.extend([0, 1])
                data.extend(prev_data)

        # Wrap it in UDP: fixed ports 1 -> 2; the length covers the 8
        # byte UDP header plus the 2 byte UDPTL seqno plus data.
        udp = bytearray(
            b'\x00\x01\x00\x02%(len)s%(sum16)s%(seqno)s%(data)s' % {
                b'len': pack('>H', len(data) + 10),
                b'sum16': sum16,
                b'seqno': pack('>H', self.seqno),
                b'data': data})

        # Wrap it in IP: 20 byte header, 1.1.1.1 -> 2.2.2.2, with
        # udpseqno in the identification field.
        ip = bytearray(
            b'\x45\xb8%(len)s%(udpseqno)s\x00\x00\xf9\x11%(sum16)s'
            b'\x01\x01\x01\x01\x02\x02\x02\x02%(udp)s' % {
                b'len': pack('>H', len(udp) + 20),
                b'udpseqno': pack('>H', self.udpseqno),
                b'sum16': sum16,
                b'udp': udp})

        # Wrap it in Ethernet (a fixed 16 byte link-layer header ending
        # in protocol 0x0800).
        ethernet = bytearray(
            b'\x00\x00\x00\x01\x00\x06\x00\x30\x48\xb1\x1c\x34\x00\x00'
            b'\x08\x00%(ip)s' % {b'ip': ip})

        # Wrap it in a pcap packet: seconds, microseconds, captured
        # length and original length, all little endian.
        packet = bytearray(b'%(prelude)s%(ethernet)s' % {
            b'prelude': pack(
                '<IIII', int(mktime(self.date.timetuple())),
                self.date.microsecond, len(ethernet), len(ethernet)),
            b'ethernet': ethernet})

        # Increase values.
        self.udpseqno += 1

        return packet
191 
192 
class SpandspLog(object):
    """
    Iterable over the received T.38 IFP packets found in a fax debug log.
    """

    def __init__(self, fp):
        """
        Wrap fp, an iterable of text lines such as an open log file.
        """
        self._fp = fp

    def __iter__(self):
        r"""
        Looks for lines like:

            [2013-08-07 15:17:34] FAX[23479] res_fax.c: \
              FLOW T.38 Rx 5: IFP c0 01 80 00 00 ff

        And yields:

            IFP(date=..., seqno=..., data=...)

        Lines without both 'FLOW T.38 Rx' and 'IFP' are ignored. Raises
        AssertionError if a matching line cannot be parsed or if the
        sequence numbers are not consecutive.
        """
        prev_seqno = None

        for line in self._fp:
            if 'FLOW T.38 Rx' not in line:
                continue
            if 'IFP' not in line:
                continue

            match = search(
                r'(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)', line)
            assert match, 'no timestamp in %r' % (line,)
            date = datetime(*[int(i) for i in match.groups()])

            match = search(r'Rx\s*(\d+):', line)
            assert match, 'no sequence number in %r' % (line,)
            seqno = int(match.groups()[0])

            match = search(r': IFP ([0-9a-f ]+)', line)
            assert match, 'no IFP data in %r' % (line,)
            data = n2b(match.groups()[0])

            if prev_seqno is not None:
                # Expected all sequence numbers. But you can safely disable
                # this check.
                assert seqno == prev_seqno + 1, '%s+1 != %s' % (
                    prev_seqno, seqno)
            prev_seqno = seqno

            yield IFP(date=date, seqno=seqno, data=data)
237 
238 
def main(logname, pcapname):
    """
    Convert the fax log file logname into the new pcap file pcapname.

    pcapname must not exist yet: it is created exclusively with mode
    0600, and os.open() raises OSError if the path already exists.
    """
    with open(logname, 'r') as infile:
        log = SpandspLog(infile)

        # with open(pcapname, 'xb') as outfile:  # py3 exclusive write, bin
        create_or_fail = os.O_CREAT | os.O_EXCL | os.O_WRONLY
        fd = os.open(pcapname, create_or_fail, 0o600)
        # fdopen() takes ownership of fd; leaving the with block closes it.
        with os.fdopen(fd, 'wb') as outfile:
            pcap = FaxPcap(outfile)
            # The log is parsed lazily, so infile has to stay open here.
            for data in log:
                pcap.add(data)
254 
255 
if __name__ == '__main__':
    # Exactly two arguments are required: the input log and the pcap file
    # to create.
    if len(sys.argv) != 3:
        sys.stderr.write('Usage: {} LOGFILE PCAP\n'.format(sys.argv[0]))
        sys.exit(1)

    main(sys.argv[1], sys.argv[2])
def main(logname, pcapname)
static int len(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t buflen)
static int replace(struct ast_channel *chan, const char *cmd, char *data, struct ast_str **buf, ssize_t len)
Definition: func_strings.c:790
def __init__(self, outfile)
def _add_garbage(self, date)
static snd_pcm_format_t format
Definition: chan_alsa.c:102
def _make_packet(self, ifp_data, is_ifp=True)