# Only the 'refs' and 'vars' strictures are enabled; 'subs' is left off.
3 use strict qw(refs vars);
# DEBUG starts out pointing at /dev/null; it is redirected to STDERR below
# once option parsing has established that $debug is set.
18 open DEBUG, ">/dev/null" or die $!;
# Option parsing: keep going while the next argument looks like "-x...".
20 while (@ARGV && $ARGV[0] =~ m/^\-./) {
# The substitutions below act on $_.  -h, -O and -s take whatever is left
# of $_ after the flag as their value and then leave the enclosing loop.
24 if (s/^-h//) { $dohosts{$_}=1; last; }
25 elsif (s/^-O//) { $outpfx=$_; last; }
26 elsif (s/^-s//) { $sep=$_; last; }
# -i takes an interval of 1 to 6 digits with no leading zero.
27 elsif (s/^-i([1-9]\d{0,5})$//) { $interval= $1; }
# -D bumps the debug level and leaves a bare "-" at the front of $_
# (presumably so further flags in the same argument can follow -- TODO confirm).
28 elsif (s/^-D/-/) { $debug++; }
29 else { die "bad option $_"; }
# The local node name reported by POSIX::uname is added to %dohosts, the set
# of hosts whose log entries are processed by the parsing loop.
34 my ($sysname, $nodename) = POSIX::uname();
35 die unless defined $nodename;
36 $dohosts{$nodename}= 1;
# The final command-line argument is the file to follow with File::Tail; the
# arguments left in @ARGV are read once as static log files.
40 my $totail= pop @ARGV;
42 if ($debug) { open DEBUG, ">& STDERR" or die $!; }
# Names of the inbound counters, in the order used for the RRD data sources;
# $tmpl_in is the same list joined with ':' and is passed as the update
# template by inbound_stats.
44 our @fields_in= qw(seconds accepted refused rejected duplicate
45 accepted_size duplicate_size);
46 our $tmpl_in= join ':',@fields_in;
# Interpolated by create_rrd into each definition as
# DS:<name>:ABSOLUTE:<DstArguments>.
50 DstArguments => "7200:0:U",
# Each archive entry is [ total span in seconds, resolution in seconds ];
# create_rrd converts these into RRA step and row counts.
52 Archives => [ [ 3600*4, 60 ], # 4hr, 1min resolution
53 [ 3600*25, 180 ], # 25h, 3min resolution
54 [ 86400*14*5, 3600 ], # 14wks, 1hr resolution
55 [ 86400*370*2, 3600*24 ] ], # 2yr+, 1day resolution
# State for the log entry currently being processed: set by the syslog
# parsing loop and read by the handler subs below.
59 our ($time,$host,$peer,$conn,$stats);
# create_rrd($inout, $path, $fields)
#
# Creates a new RRD file at $path with RRDs::create, using the settings in
# $details{$inout}.
#   $inout  - key into %details selecting Step, DstArguments, Xff, Archives
#   $path   - filename of the RRD to create
#   $fields - array ref of data source names
# Dies, quoting the full argument list, when $err is defined.
61 sub create_rrd ($$$) {
62 my ($inout, $path, $fields) = @_;
64 my $details= $details{$inout};
# The start time is set a year in the past (presumably so that older log
# entries can still be inserted -- TODO confirm).
66 my @sargs= ($path, '--start','now-1y', '--step',$details->{Step});
# One ABSOLUTE data source per field name.
68 push @largs, "DS:$_:ABSOLUTE:$details->{DstArguments}" foreach @$fields;
# One AVERAGE archive per Archives entry.
69 foreach (@{ $details->{Archives} }) {
70 my ($whole,$reso) = @$_;
# Number of base steps consolidated into one archive point.
71 my $steps= $reso / $details->{Step};
# Number of points needed to cover the whole span at that resolution.
72 my $rows= $whole / $reso;
73 push @largs, "RRA:AVERAGE:$details->{Xff}:$steps:$rows";
75 print DEBUG join(" \\\n ", "creating @sargs", @largs),"\n";
76 RRDs::create(@sargs,@largs);
78 die "$err [@sargs @largs]" if defined $err;
# get_rrd_info($rrdupdate, $path)
#
# Reads the metadata of the existing RRD at $path with RRDs::info and stores
# its 'last_update' value in $rrdupdate->{DoneUpto}.
#   $rrdupdate - hash ref that receives the DoneUpto entry
#   $path      - filename of the RRD to inspect
# Dies if RRDs::info returns nothing, or if 'last_update' is missing or false.
81 sub get_rrd_info ($$) {
82 my ($rrdupdate, $path) = @_;
83 my $h= RRDs::info($path);
84 die "$path $! ".(RRDs::error) unless $h;
85 die "$path ?" unless $h->{'last_update'};
# update_rrd ignores samples whose time is not later than DoneUpto.
86 $rrdupdate->{DoneUpto}= $h->{'last_update'};
# find_or_create_rrd($inout, $fields)
#
# Locates the RRD file for the current $host/$peer pair and direction $inout,
# creating it when it does not exist yet.
#   $inout  - direction tag; used in the filename and passed to create_rrd
#   $fields - array ref of data source names, passed to create_rrd
# NOTE(review): update_rrd uses the result as the RRD record hash (it sets
# Template on it and reads DoneUpto from it) -- confirm the return value.
89 sub find_or_create_rrd ($$) {
90 my ($inout, $fields) = @_;
# Filename: <outpfx><host><sep><peer>_<inout>.rrd
92 Path => "${outpfx}${host}${sep}${peer}_${inout}.rrd",
# Existing file: pick up its last-update time.
94 if (stat $rrd->{Path}) {
95 get_rrd_info($rrd, $rrd->{Path});
# Any errno other than ENOENT is fatal; only then is the file created.
97 $!==&ENOENT or die "$rrd->{Path} $!";
98 create_rrd($inout, $rrd->{Path}, $fields);
# RRD records that have a pending (not yet written) update, and the log
# timestamp those pending values belong to.  update_rrd fills these in and
# actually_update_rrds drains them.
105 our @rrd_blockedupdates;
106 our $rrd_blockedupdate_time;
# update_rrd($inout, $tmpl, $vals, $fields)
#
# Queues one set of values for the RRD belonging to the current
# $host/$peer pair and direction $inout.  Nothing is written here; the
# values are held on the RRD record until actually_update_rrds runs.
#   $inout  - direction tag, part of the cache key and of the RRD filename
#   $tmpl   - update template string, stored as the record's Template
#   $vals   - array ref of values, each a number or the string 'U'
#   $fields - array ref of data source names, used if the RRD must be created
108 sub update_rrd ($$$$) {
109 my ($inout,$tmpl,$vals,$fields) = @_;
# RRD records are cached in %rrds under the key (host, peer, inout).
111 my $rrd= $rrds{$host,$peer,$inout};
113 $rrd= $rrds{$host,$peer,$inout}= find_or_create_rrd($inout, $fields);
114 $rrd->{Template}= $tmpl;
# Ignore samples that are not newer than what the RRD already holds.
116 return if $time <= $rrd->{DoneUpto};
# If values are already pending for this RRD, add the new ones to them
# element by element; an unknown ('U') on either side makes the sum unknown.
118 my $blocked= $rrd->{BlockedUpdate};
119 if (defined $blocked) {
120 for (my $ix=0; $ix<@$vals; $ix++) {
121 my $old= $blocked->[$ix];
122 my $new= $vals->[$ix];
123 $blocked->[$ix]= ($old eq 'U' || $new eq 'U') ? 'U' : $old + $new;
# Record $vals as the pending update, note its timestamp and queue the RRD
# (presumably reached only when nothing was pending yet -- TODO confirm).
127 $rrd->{BlockedUpdate}= $vals;
128 $rrd_blockedupdate_time= $time;
129 push @rrd_blockedupdates, $rrd;
# actually_update_rrds()
#
# Processes the updates queued by update_rrd.  Takes no arguments; works on
# @rrd_blockedupdates and $rrd_blockedupdate_time.  Dies, quoting the
# argument list, if RRDs::error reports an error.
132 sub actually_update_rrds () {
# Nothing is pending.
133 return unless defined $rrd_blockedupdate_time;
# Still on the same timestamp as the pending values, so they are left queued
# where further calls to update_rrd can add to them.
134 return if $time == $rrd_blockedupdate_time;
# Drain the queue, clearing each record's pending values as it is handled.
136 while (my $rrd= shift @rrd_blockedupdates) {
137 my $vals= $rrd->{BlockedUpdate};
139 delete $rrd->{BlockedUpdate};
# Arguments: path, the template, and "<timestamp>:<v1>:<v2>:..." using the
# timestamp the pending values were recorded for.
141 my @args= ($rrd->{Path}, '--template',$rrd->{Template},
142 join(':',$rrd_blockedupdate_time,@$vals));
143 print DEBUG "update @args\n" if $debug>=2;
145 my $err= RRDs::error;
146 die "$err [@args]" if defined $err;
# Mark that nothing is pending any more.
149 $rrd_blockedupdate_time= undef;
# inbound_connected()
#
# Handles a new inbound connection identified by the globals $host, $peer
# and $conn: stores a baseline of zeros, one per entry of @fields_in, in
# %in_conns.  inbound_stats later subtracts this baseline from reported totals.
154 sub inbound_connected () {
155 print DEBUG "inbound connected $host $peer $conn\n" if $debug>=2;
156 $in_conns{$host,$peer,$conn} = [ (0) x @fields_in ];
# inbound_closed()
#
# Forgets the stored counters for the connection identified by the globals
# $host, $peer and $conn.
158 sub inbound_closed () {
159 print DEBUG "inbound closed $host $peer $conn\n" if $debug>=2;
160 delete $in_conns{$host,$peer,$conn};
# inbound_stats()
#
# Handles a statistics report for the inbound connection identified by the
# globals $host, $peer and $conn.  For each field in @fields_in it pushes
# onto @v either the reported value minus the stored previous value, or 'U',
# and passes @v to update_rrd as an 'in' sample.
162 sub inbound_stats () {
# Replace a single space between two lowercase letters with '_', so that a
# two-word name such as "accepted size" becomes "accepted_size".  This acts
# on $_ (presumably a copy of $stats -- TODO confirm).
165 s/(?<=[a-z]) (?=[a-z])/_/g;
166 my $hpc= $in_conns{$host,$peer,$conn};
# A connection that was never seen connecting gets a baseline of undefs.
168 print DEBUG "inbound UNKNOWN $host $peer $conn $stats\n";
169 $in_conns{$host,$peer,$conn}= $hpc= [ (undef) x @fields_in ];
171 print DEBUG "inbound stats $host $peer $conn $stats\n" if $debug>=2;
# Consume leading "<name> <digits><whitespace>" pairs into %s.
173 while (s/^([a-z_]+) (\d+)\s//) { $s{$1}= $2; }
175 foreach my $f (@fields_in) {
177 if (!defined $this) {
# @v in scalar context is the number of values pushed so far, which is the
# index of the field being handled; this fetches its previous value.
182 my $last= $hpc->[@v];
# No previous value means the difference is unknown.
184 push @v, defined($last) ? $this - $last : 'U';
186 update_rrd('in',$tmpl_in,\@v,\@fields_in);
# outbound_stats()
#
# Handles an outbound statistics report: prints the current $host, $peer and
# $stats to standard output.
189 sub outbound_stats () {
190 print "$host $peer OUT $stats\n";
# Read syslog entries from $object and dispatch the ones of interest.
# The parser is created with repeat=>0 and arrayref=>1; each entry is
# unpacked below as (time, host, process, pid, message).
195 my $parser= new Parse::Syslog $object, repeat=>0, arrayref=>1;
196 my $host_re= '[-.0-9a-z]+';
# Connection numbers: 1 to 6 digits, no leading zero.
197 my $conn_re= '[1-9]\d{0,5}';
198 my ($process,$pid,$msg,$cc,$sl);
199 while ($sl= $parser->next) {
200 ($time,$host,$process,$pid,$msg) = @$sl;
# Called for every entry, before the host filter, so queued updates are
# handled as soon as the log time moves on, whatever host the entry is from.
201 actually_update_rrds();
202 next unless exists $dohosts{$host};
203 #print join("|", map { defined($_) ? $_ : "<undef>" } @$sl), "\n";
# innd messages that carry no pid.
204 if ($process eq 'innd' && !defined $pid) {
# "<peer> connected <conn>", optionally followed by " streaming allowed".
205 if (($peer,$conn) = $msg =~
206 m/^($host_re) connected ($conn_re)(?: streaming allowed)?$/) {
# "<peer>:<conn> closed|checkpoint seconds ...": the statistics text starts
# at the word "seconds".
208 } elsif (($peer,$conn,$cc,$stats) = $msg =~
209 m/^($host_re):($conn_re) (closed|checkpoint) (seconds .*)$/) {
# Only a 'closed' report ends the connection; a checkpoint does not.
211 inbound_closed() if $cc eq 'closed';
# innduct messages: "<peer>| completed|processed <word> read=...".
213 } elsif ($process eq 'innduct') {
214 if (($peer,$stats) = $msg =~
215 m/^($host_re)\| (?:completed|processed) \S+ (read=.*)$/) {
222 #seconds (\d+) accepted (\d+) refused (\d+) rejected (\d+) duplicate (\d+) accepted size (\d+) duplicate size (\d+)
# Read each remaining command-line path once as a static log file.
225 foreach my $staticpath (@ARGV) {
# Names ending in .gz are read through a "gunzip -c" pipe.  The list form of
# open is used, so the path is passed as its own argument after "--".
226 if ($staticpath =~ m/\.gz$/) {
227 my $fh= new IO::Handle;
228 open $fh, '-|', 'gunzip', '-c', '--', $staticpath or die $!;
# An error flagged on the handle, or a failing close, is fatal.  For a pipe
# the close result also reflects the child's exit status, hence $? in the
# message.
230 !$fh->error or die "$staticpath $!";
231 $!=0;$?=0; close $fh or die "$staticpath $! $?";
# Any other name is opened directly for reading.
233 my $fh= new IO::File $staticpath, '<' or die $!;
235 !$fh->error or die "$staticpath $!";
236 close $fh or die "$staticpath $!";
# With no file to follow, the run ends once the static files are done.
240 exit 0 if $totail eq '';
# Otherwise follow $totail.  NOTE(review): "ignore_nonexistant" is the
# parameter name as spelled by File::Tail; do not "correct" it.
242 my $tailer= new File::Tail name=>$totail,
243 interval=>$interval, adjustafter=>2, ignore_nonexistant=>1, tail=>-1