# Start-up: strictures, debug sink, command-line options and host selection.
# Only 'refs' and 'vars' strictures are enabled ('subs' is not).
3 use strict qw(refs vars);
# Debug output is discarded by default; DEBUG is re-opened onto STDOUT
# further down when -D was given.
18 open DEBUG, ">/dev/null" or die $!;
# Process leading arguments that look like options: "-" plus at least one
# more character.
# NOTE(review): the substitutions below act on $_; the statement loading the
# current argument into $_ is not visible here -- confirm.
20 while (@ARGV && $ARGV[0] =~ m/^\-./) {
# -hHOST : add HOST to the set of hosts whose log lines are processed.
24 if (s/^-h//) { $dohosts{$_}=1; last; }
# -OPREFIX : prefix used when building .rrd file paths.
25 elsif (s/^-O//) { $outpfx=$_; last; }
# -sSEP : separator placed between host and peer in .rrd file paths.
26 elsif (s/^-s//) { $sep=$_; last; }
# -iN : interval; N is 1 to 6 digits with no leading zero, and because the
# pattern is anchored at the end it must be the last thing in its argument.
27 elsif (s/^-i([1-9]\d{0,5})$//) { $interval= $1; }
# -D : raise the debug level.  "-D" is rewritten to "-", presumably so that
# further flags bundled in the same argument are examined next -- confirm.
28 elsif (s/^-D/-/) { $debug++; }
29 else { die "bad option $_"; }
# The local node name (from uname) is always included in the host set.
34 my ($sysname, $nodename) = POSIX::uname();
35 die unless defined $nodename;
36 $dohosts{$nodename}= 1;
# The final argument is the file to follow; the arguments left in @ARGV are
# handled separately as static files.
40 my $totail= pop @ARGV;
# With -D, debug output goes to STDOUT instead of /dev/null.
42 if ($debug) { open DEBUG, ">& STDOUT" or die $!; }
# Per-direction RRD layout.
# NOTE(review): the enclosing hash (apparently %details, keyed by 'in' and
# 'out') and its Step/Xff entries are not visible here -- confirm.
#
# DstArguments is appended after "DS:<field>:ABSOLUTE:" when a file is
# created; in rrdtool's DS syntax that position is heartbeat:min:max.
50 DstArguments => "7200:0:U",
# Each archive is [ total span in seconds, resolution in seconds ].
52 Archives => [ [ 3600*4, 60 ], # 4hr, 1min resolution
53 [ 3600*25, 180 ], # 25h, 3min resolution
54 [ 86400*14*5, 3600 ], # 14wks, 1hr resolution
55 [ 86400*370*2, 3600*24 ] ], # 2yr+, 1day resolution
# Data-source names for inbound statistics; the same list, in the same order,
# is used for the update template and for the per-connection counters.
58 our @fields_in= qw(seconds accepted refused rejected duplicate
59 accepted_size duplicate_size);
61 Fields => \@fields_in,
# Data-source names for outbound statistics.
65 our @fields_out= qw(missing offered deferred
66 accepted unwanted rejected body_missing);
68 Fields => \@fields_out,
# State of the log record currently being handled; the log-parsing loop
# assigns these and the handler subs read them.
73 our ($time,$host,$peer,$conn,$stats);
# Body of the routine that creates a new RRD file for one direction.
# NOTE(review): the sub header is not visible; callers invoke
# create_rrd($inout, $path) -- confirm this is that sub.
#   $inout - key into %details selecting the layout
#   $path  - file name of the RRD to create
76 my ($inout, $path) = @_;
78 my $details= $details{$inout};
# Leading arguments: file, start time one year back, base step.
80 my @sargs= ($path, '--start','now-1y', '--step',$details->{Step});
# One ABSOLUTE data source per configured field.
82 push @largs, "DS:$_:ABSOLUTE:$details->{DstArguments}"
83 foreach @{ $details{$inout}{Fields} };
# One AVERAGE archive per [span, resolution] pair: steps per row is
# resolution/Step, number of rows is span/resolution.
84 foreach (@{ $details->{Archives} }) {
85 my ($whole,$reso) = @$_;
86 my $steps= $reso / $details->{Step};
87 my $rows= $whole / $reso;
88 push @largs, "RRA:AVERAGE:$details->{Xff}:$steps:$rows";
90 print DEBUG join(" \\\n ", "creating @sargs", @largs),"\n";
91 RRDs::create(@sargs,@largs);
# NOTE(review): $err is presumably taken from RRDs::error on a line not
# visible here -- confirm.
93 die "$err [@sargs @largs]" if defined $err;
# get_rrd_info($rrdupdate, $path)
# Reads the metadata of the existing RRD file $path and records its
# last-update timestamp in $rrdupdate->{DoneUpto}.
# Dies if the info call yields nothing or reports no (or a zero) last_update.
96 sub get_rrd_info ($$) {
97 my ($rrdupdate, $path) = @_;
98 my $h= RRDs::info($path);
# NOTE(review): $! is included in the message alongside RRDs::error; whether
# errno is meaningful after a failed RRDs::info is not established here.
99 die "$path $! ".(RRDs::error) unless $h;
100 die "$path ?" unless $h->{'last_update'};
101 $rrdupdate->{DoneUpto}= $h->{'last_update'};
# find_or_create_rrd($inout)
# Locates the RRD file for the current $host/$peer pair in the given
# direction, creating it when it does not exist yet.
# NOTE(review): construction of $rrd, the else branch and the return value
# are not visible here; callers use the result as a hash reference.
104 sub find_or_create_rrd ($) {
# File name: <outpfx><host><sep><peer>_<inout>.rrd
107 Path => "${outpfx}${host}${sep}${peer}_${inout}.rrd",
# Existing file: pick up its last-update time.
109 if (stat $rrd->{Path}) {
110 get_rrd_info($rrd, $rrd->{Path});
# stat failed: any error other than "no such file" is fatal; otherwise
# create a fresh RRD.
112 $!==&ENOENT or die "$rrd->{Path} $!";
113 create_rrd($inout, $rrd->{Path});
# RRD records with a pending (not yet written) update, and the log timestamp
# those pending values belong to.
120 our @rrd_blockedupdates;
121 our $rrd_blockedupdate_time;

# update_rrd($inout, $vals)
# Queues one set of values for the RRD of the current $host/$peer in the
# given direction; nothing is written here, the values are flushed later by
# actually_update_rrds().
#   $inout - 'in' or 'out'
#   $vals  - array ref of values ordered like $details{$inout}{Fields};
#            'U' marks an unknown value
# NOTE(review): the if/else lines around the cache lookup and the merge are
# not visible here; the branch structure described below is assumed.
123 sub update_rrd ($$) {
124 my ($inout,$vals) = @_;
# Per host/peer/direction cache of RRD records.
126 my $rrd= $rrds{$host,$peer,$inout};
# Cache miss (presumably): find or create the file and remember the update
# template, the field names joined with ':'.
128 $rrd= $rrds{$host,$peer,$inout}= find_or_create_rrd($inout);
129 $rrd->{Template}= join ':', @{ $details{$inout}{Fields} };
# Ignore data not newer than what the file already holds.
131 return if $time <= $rrd->{DoneUpto};
# If this RRD already has values pending, add the new ones element-wise;
# an unknown on either side makes the sum unknown.
133 my $blocked= $rrd->{BlockedUpdate};
134 if (defined $blocked) {
135 for (my $ix=0; $ix<@$vals; $ix++) {
136 my $old= $blocked->[$ix];
137 my $new= $vals->[$ix];
138 $blocked->[$ix]= ($old eq 'U' || $new eq 'U') ? 'U' : $old + $new;
# Otherwise (presumably) start a new pending update for this RRD.
142 $rrd->{BlockedUpdate}= $vals;
143 $rrd_blockedupdate_time= $time;
144 push @rrd_blockedupdates, $rrd;
# actually_update_rrds()
# Flushes all pending updates to their RRD files.  Does nothing when no
# update is pending, or while the current record's $time still equals the
# pending timestamp (so values sharing a timestamp can still be merged).
# Dies if an RRDs error is reported for an update.
147 sub actually_update_rrds () {
148 return unless defined $rrd_blockedupdate_time;
149 return if $time == $rrd_blockedupdate_time;
# Drain the queue, clearing each record's pending values as we go.
151 while (my $rrd= shift @rrd_blockedupdates) {
152 my $vals= $rrd->{BlockedUpdate};
154 delete $rrd->{BlockedUpdate};
# Arguments: file, --template <fields>, "<timestamp>:<v1>:<v2>:..."
156 my @args= ($rrd->{Path}, '--template',$rrd->{Template},
157 join(':',$rrd_blockedupdate_time,@$vals));
158 print DEBUG "update @args\n" if $debug>=2;
# NOTE(review): the update call itself (presumably RRDs::update(@args)) is on
# a line not visible here -- confirm.
160 my $err= RRDs::error;
161 die "$err [@args]" if defined $err;
# Nothing pending any more.
164 $rrd_blockedupdate_time= undef;
# inbound_connected()
# Handles a new inbound connection: its last-seen counters start at zero,
# one entry per field in @fields_in, keyed by host, peer and connection id.
169 sub inbound_connected () {
170 print DEBUG "inbound $time connected $host $peer $conn\n" if $debug>=2;
171 $in_conns{$host,$peer,$conn} = [ (0) x @fields_in ];
# inbound_closed()
# Handles a closed inbound connection: forgets its last-seen counters.
173 sub inbound_closed () {
174 print DEBUG "inbound $time closed $host $peer $conn\n" if $debug>=2;
175 delete $in_conns{$host,$peer,$conn};
# inbound_stats()
# Turns the statistics text of an inbound connection into per-field values
# relative to the counters last seen for that connection, and queues them
# via update_rrd('in', ...).
# NOTE(review): the lines that load $_ (presumably from $stats), declare
# %s/@v, fetch $this and store the new counters are not visible here.
177 sub inbound_stats () {
# Join two-word field names: a single space between lowercase letters
# becomes '_' (e.g. "accepted size" -> "accepted_size").
179 s/(?<=[a-z]) (?=[a-z])/_/g;
180 my $hpc= $in_conns{$host,$peer,$conn};
# Connection not seen before (presumably): log it and start from undefined
# counters, so the values produced for it are unknown.
182 print DEBUG "inbound $time UNKNOWN $host $peer $conn $stats\n";
183 $in_conns{$host,$peer,$conn}= $hpc= [ (undef) x @fields_in ];
185 print DEBUG "inbound $time stats $host $peer $conn $stats\n"
# Consume "name number" pairs from the front of the text into %s.
189 while (s/^([a-z_]+) (\d+)\s//) { $s{$1}= $2; }
191 foreach my $f (@fields_in) {
193 if (!defined $this) {
# scalar(@v) is the index of the field being handled, so this picks the
# previously stored counter for the same field.
198 my $last= $hpc->[@v];
# Value is the increase since the last report, or 'U' with no baseline.
200 push @v, defined($last) ? $this - $last : 'U';
202 update_rrd('in',\@v);
# outbound_stats()
# Extracts key=value statistics from an outbound report and queues them via
# update_rrd('out', ...); fields absent from the report become 'U'.
# NOTE(review): the lines that load $_ (presumably from $stats), declare
# %s/@v and fetch $this are not visible here.
205 sub outbound_stats () {
206 print DEBUG "outbound $time stats $host $peer $stats\n" if $debug>=2;
# The occurrence of "missing" that is followed by "=<digits> (" is renamed
# to "body_missing", keeping it distinct from the plain "missing" field
# (both names appear in @fields_out).
208 s/missing(?=\=\d+ \()/body_missing/;
# Repeatedly pull out one space-delimited " key=digits " item into %s.
211 while (s/ ([a-z]\w+)\=(\d+) / /) { $s{$1}= $2; }
213 foreach my $f (@fields_out) {
215 push @v, defined($this) ? $this : 'U';
217 update_rrd('out',\@v);
# Reads syslog records from $object and dispatches the recognised ones.
# NOTE(review): the enclosing sub header and the calls made inside the
# matching branches are not visible here.
222 my $parser= new Parse::Syslog $object, repeat=>0, arrayref=>1;
# Peer names: digits, lowercase letters, '.' and '-'.  Connection ids: 1 to 6
# digits with no leading zero.
223 my $host_re= '[-.0-9a-z]+';
224 my $conn_re= '[1-9]\d{0,5}';
225 my ($process,$pid,$msg,$cc,$sl);
226 while ($sl= $parser->next) {
# Each record is an array ref: time, host, process, pid, message.
227 ($time,$host,$process,$pid,$msg) = @$sl;
# Flush pending RRD updates on every record, before the host filter, so a
# change of timestamp is noticed even on records that get skipped.
228 actually_update_rrds();
229 next unless exists $dohosts{$host};
230 print DEBUG "logfile ",
231 join("|", map { defined($_) ? $_ : "<undef>" } @$sl), "\n"
# innd lines logged without a pid: connection start, or a closed/checkpoint
# line carrying statistics.
233 if ($process eq 'innd' && !defined $pid) {
234 if (($peer,$conn) = $msg =~
235 m/^($host_re) connected ($conn_re)(?: streaming allowed)?$/) {
237 } elsif (($peer,$conn,$cc,$stats) = $msg =~
238 m/^($host_re):($conn_re) (closed|checkpoint) (seconds .*)$/) {
# Only a 'closed' line ends the connection; a checkpoint keeps it open.
240 inbound_closed() if $cc eq 'closed';
# innduct lines: "<peer>| notice: completed|processed <word> read=..."
242 } elsif ($process eq 'innduct') {
243 if (($peer,$stats) = $msg =~
244 m/^($host_re)\| notice: (?:completed|processed) \S+ (read=.*)$/) {
251 #seconds (\d+) accepted (\d+) refused (\d+) rejected (\d+) duplicate (\d+) accepted size (\d+) duplicate size (\d+)
# Main driver: first work through the static log files named on the command
# line, then follow the file given as the last argument.
# NOTE(review): the call that processes each opened handle and the else
# between the two branches are not visible here.
254 foreach my $staticpath (@ARGV) {
# Compressed logs are read through a gunzip pipe; the list form of open
# passes the path as a separate argument after "--".
255 if ($staticpath =~ m/\.gz$/) {
256 my $fh= new IO::Handle;
257 open $fh, '-|', 'gunzip', '-c', '--', $staticpath or die $!;
259 !$fh->error or die "$staticpath $!";
# $! and $? are cleared first so the message reflects only this close
# (for a pipe, close also reports the child's exit status in $?).
260 $!=0;$?=0; close $fh or die "$staticpath $! $?";
# Plain files are opened directly for reading.
262 my $fh= new IO::File $staticpath, '<' or die $!;
264 !$fh->error or die "$staticpath $!";
265 close $fh or die "$staticpath $!";
# An empty file-to-follow argument means: static files only, finish here.
269 exit 0 if $totail eq '';
# Follow the live log.
# NOTE(review): per File::Tail, a negative tail value should make it start
# by returning the whole existing file -- confirm against its documentation.
271 my $tailer= new File::Tail name=>$totail,
272 interval=>$interval, adjustafter=>2, ignore_nonexistant=>1, tail=>-1