chiark / gitweb /
Cleaner subprocess shutdow in trackdb_deinit().
[disorder] / clients / rtpmon.c
1 /*
2  * This file is part of DisOrder.
3  * Copyright (C) 2009 Richard Kettlewell
4  *
5  * This program is free software: you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation, either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * 
15  * You should have received a copy of the GNU General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18 /** @file clients/rtpmon.c
19  * @brief RTP monitor
20  *
21  * This progam monitors the rate at which data arrives by RTP and
22  * constantly display it.  It is intended for debugging only.
23  *
24  * TODO de-dupe with playrtp.
25  */
26 #include "common.h"
27
28 #include <getopt.h>
29 #include <sys/socket.h>
30 #include <sys/types.h>
31 #include <netdb.h>
32 #include <netinet/in.h>
33 #include <sys/time.h>
34 #include <unistd.h>
35 #include <locale.h>
36 #include <errno.h>
37 #include <stdlib.h>
38 #include <sys/uio.h>
39
40 #include "syscalls.h"
41 #include "timeval.h"
42 #include "mem.h"
43 #include "log.h"
44 #include "version.h"
45 #include "addr.h"
46 #include "configuration.h"
47 #include "rtp.h"
48
49 /** @brief Record of one packet */
50 struct entry {
51   /** @brief When packet arrived */
52   struct timeval when;
53
54   /** @brief Serial number of first sample */
55   uint32_t serial;
56 };
57
58 /** @brief Bytes per frame */
59 static unsigned bpf = 4;
60
61 /** @brief Frame serial number */
62 static uint32_t serial;
63
64 /** @brief Size of ring buffer */
65 #define RINGSIZE 131072
66
67 /** @brief Ring buffer */
68 static struct entry ring[RINGSIZE];
69
70 /** @brief Where new packets join the ring */
71 static unsigned ringtail;
72
73 static const struct option options[] = {
74   { "help", no_argument, 0, 'h' },
75   { "version", no_argument, 0, 'V' },
76   { "bpf", required_argument, 0, 'b' },
77   { 0, 0, 0, 0 }
78 };
79
80 static void help(void) {
81   xprintf("Usage:\n"
82           "  rtpmon [OPTIONS] [ADDRESS] PORT\n"
83           "Options:\n"
84           "  --bpf, -b               Bytes/frame (default 4)\n"
85           "  --help, -h              Display usage message\n"
86           "  --version, -V           Display version number\n"
87           );
88   xfclose(stdout);
89   exit(0);
90 }
91
92 /** @brief Compute the rate by sampling at two points in the ring buffer */
93 static double rate(unsigned earlier, unsigned later) {
94   const uint32_t frames = ring[later].serial - ring[earlier].serial;
95   const int64_t us = tvsub_us(ring[later].when, ring[earlier].when);
96
97   if(us)  
98     return 1000000.0 * frames / us;
99   else
100     return 0.0;
101 }
102
103 /** @brief Called to say we received some bytes
104  * @param when When we received them
105  * @param n How many frames of audio data we received
106  */
107 static void frames(const struct timeval *when, size_t n) {
108   const time_t prev = ring[(ringtail - 1) % RINGSIZE].when.tv_sec;
109
110   ring[ringtail].when = *when;
111   ring[ringtail].serial = serial;
112   serial += n;
113   ringtail = (ringtail + 1) % RINGSIZE;
114   // Report once a second
115   if(prev != when->tv_sec) {
116     if(printf("%8.2f  %8.2f  %8.2f  %8.2f  %8.2f  %8.2f  %8.2f\n",
117               rate((ringtail - RINGSIZE / 128) % RINGSIZE,
118                    (ringtail - 1) % RINGSIZE),
119               rate((ringtail - RINGSIZE / 64) % RINGSIZE,
120                    (ringtail - 1) % RINGSIZE),
121               rate((ringtail - RINGSIZE / 32) % RINGSIZE,
122                    (ringtail - 1) % RINGSIZE),
123               rate((ringtail - RINGSIZE / 16) % RINGSIZE,
124                    (ringtail - 1) % RINGSIZE),
125               rate((ringtail - RINGSIZE / 8) % RINGSIZE,
126                    (ringtail - 1) % RINGSIZE),
127               rate((ringtail - RINGSIZE / 4) % RINGSIZE,
128                    (ringtail - 1) % RINGSIZE),
129               rate((ringtail - RINGSIZE / 2) % RINGSIZE,
130                    (ringtail - 1) % RINGSIZE)) < 0
131        || fflush(stdout) < 0)
132       fatal(errno, "stdout");
133   }
134 }
135
136 int main(int argc, char **argv) {
137   int n;
138   struct addrinfo *res;
139   struct stringlist sl;
140   struct ip_mreq mreq;
141   struct ipv6_mreq mreq6;
142   int rtpfd;
143   char *sockname;
144   int is_multicast;
145   union any_sockaddr {
146     struct sockaddr sa;
147     struct sockaddr_in in;
148     struct sockaddr_in6 in6;
149   };
150   union any_sockaddr mgroup;
151
152   static const struct addrinfo prefs = {
153     .ai_flags = AI_PASSIVE,
154     .ai_family = PF_INET,
155     .ai_socktype = SOCK_DGRAM,
156     .ai_protocol = IPPROTO_UDP
157   };
158   static const int one = 1;
159
160   mem_init();
161   if(!setlocale(LC_CTYPE, "")) 
162     fatal(errno, "error calling setlocale");
163   while((n = getopt_long(argc, argv, "hVb:", options, 0)) >= 0) {
164     switch(n) {
165     case 'h': help();
166     case 'V': version("rtpmon");
167     case 'b': bpf = atoi(optarg); break;
168     default: fatal(0, "invalid option");
169     }
170   }
171   argc -= optind;
172   argv += optind;
173   switch(argc) {
174   case 1:
175   case 2:
176     /* Use command-line ADDRESS+PORT or just PORT */
177     sl.n = argc;
178     sl.s = argv;
179     break;
180   default:
181     fatal(0, "usage: rtpmon [OPTIONS] [ADDRESS] PORT");
182   }
183   if(!(res = get_address(&sl, &prefs, &sockname)))
184     exit(1);
185   /* Create the socket */
186   if((rtpfd = socket(res->ai_family,
187                      res->ai_socktype,
188                      res->ai_protocol)) < 0)
189     fatal(errno, "error creating socket");
190   /* Allow multiple listeners */
191   xsetsockopt(rtpfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof one);
192   is_multicast = multicast(res->ai_addr);
193   /* The multicast and unicast/broadcast cases are different enough that they
194    * are totally split.  Trying to find commonality between them causes more
195    * trouble that it's worth. */
196   if(is_multicast) {
197     /* Stash the multicast group address */
198     memcpy(&mgroup, res->ai_addr, res->ai_addrlen);
199     switch(res->ai_addr->sa_family) {
200     case AF_INET:
201       mgroup.in.sin_port = 0;
202       break;
203     case AF_INET6:
204       mgroup.in6.sin6_port = 0;
205       break;
206     default:
207       fatal(0, "unsupported family %d", (int)res->ai_addr->sa_family);
208     }
209     /* Bind to to the multicast group address */
210     if(bind(rtpfd, res->ai_addr, res->ai_addrlen) < 0)
211       fatal(errno, "error binding socket to %s", format_sockaddr(res->ai_addr));
212     /* Add multicast group membership */
213     switch(mgroup.sa.sa_family) {
214     case PF_INET:
215       mreq.imr_multiaddr = mgroup.in.sin_addr;
216       mreq.imr_interface.s_addr = 0;      /* use primary interface */
217       if(setsockopt(rtpfd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
218                     &mreq, sizeof mreq) < 0)
219         fatal(errno, "error calling setsockopt IP_ADD_MEMBERSHIP");
220       break;
221     case PF_INET6:
222       mreq6.ipv6mr_multiaddr = mgroup.in6.sin6_addr;
223       memset(&mreq6.ipv6mr_interface, 0, sizeof mreq6.ipv6mr_interface);
224       if(setsockopt(rtpfd, IPPROTO_IPV6, IPV6_JOIN_GROUP,
225                     &mreq6, sizeof mreq6) < 0)
226         fatal(errno, "error calling setsockopt IPV6_JOIN_GROUP");
227       break;
228     default:
229       fatal(0, "unsupported address family %d", res->ai_family);
230     }
231     /* Report what we did */
232     info("listening on %s multicast group %s",
233          format_sockaddr(res->ai_addr), format_sockaddr(&mgroup.sa));
234   } else {
235     /* Bind to 0/port */
236     switch(res->ai_addr->sa_family) {
237     case AF_INET: {
238       struct sockaddr_in *in = (struct sockaddr_in *)res->ai_addr;
239       
240       memset(&in->sin_addr, 0, sizeof (struct in_addr));
241       if(bind(rtpfd, res->ai_addr, res->ai_addrlen) < 0)
242         fatal(errno, "error binding socket to 0.0.0.0 port %d",
243               ntohs(in->sin_port));
244       break;
245     }
246     case AF_INET6: {
247       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)res->ai_addr;
248       
249       memset(&in6->sin6_addr, 0, sizeof (struct in6_addr));
250       break;
251     }
252     default:
253       fatal(0, "unsupported family %d", (int)res->ai_addr->sa_family);
254     }
255     if(bind(rtpfd, res->ai_addr, res->ai_addrlen) < 0)
256       fatal(errno, "error binding socket to %s", format_sockaddr(res->ai_addr));
257     /* Report what we did */
258     info("listening on %s", format_sockaddr(res->ai_addr));
259   }
260   for(;;) {
261     struct rtp_header header;
262     char buffer[4096];
263     struct iovec iov[2];
264     struct timeval when;
265
266     iov[0].iov_base = &header;
267     iov[0].iov_len = sizeof header;
268     iov[1].iov_base = buffer;
269     iov[1].iov_len = sizeof buffer;
270     n = readv(rtpfd, iov, 2);
271     gettimeofday(&when, 0);
272     if(n < 0) {
273       switch(errno) {
274       case EINTR:
275         continue;
276       default:
277         fatal(errno, "error reading from socket");
278       }
279     }
280     if((size_t)n <= sizeof (struct rtp_header)) {
281       info("ignored a short packet");
282       continue;
283     }
284     frames(&when, (n - sizeof header) / bpf);
285   }
286 }
287
288 /*
289 Local Variables:
290 c-basic-offset:2
291 comment-column:40
292 fill-column:79
293 indent-tabs-mode:nil
294 End:
295 */