From 1957906e9941bf1d181f338202832a8bdc526ab1 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Sun, 27 Jun 2010 18:00:56 +0100 Subject: [PATCH] appears to be able to do in --- newstailer | 105 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 81 insertions(+), 24 deletions(-) diff --git a/newstailer b/newstailer index 2b482e7..6fcbc89 100755 --- a/newstailer +++ b/newstailer @@ -56,16 +56,9 @@ our %details= ( our ($time,$host,$peer,$conn,$stats); -sub path_rrd ($) { - my ($inout) = @_; - return "${outpfx}${host}${sep}${peer}_${inout}.rrd"; -} +sub create_rrd ($$$) { + my ($inout, $path, $fields) = @_; -sub perhaps_create_rrd ($$) { - my ($inout, $fields) = @_; - my $path= path_rrd($inout); - return if stat $path; - $!==&ENOENT or die "$path $!"; my $details= $details{$inout}; my @sargs= ($path, '--start','now-1y', '--step',$details->{Step}); @@ -83,25 +76,85 @@ sub perhaps_create_rrd ($$) { die "$err [@sargs @largs]" if defined $err; } -sub update_rrd ($$$) { - my ($inout,$tmpl,$vals) = @_; - my $path= path_rrd($inout); - my @args= ($path, '--template',$tmpl, join(':',$time,@$vals)); - print DEBUG "update @args\n"; - RRDs::update(@args); - my $err= RRDs::error; - die "$err [@args]" if defined $err; +sub get_rrd_info ($$) { + my ($rrdupdate, $path) = @_; + my $h= RRDs::info($path); + die "$path $! ".(RRDs::error) unless $h; + die "$path ?" unless $h->{'last_update'}; + $rrdupdate->{DoneUpto}= $h->{'last_update'}; +} + +sub find_or_create_rrd ($$) { + my ($inout, $fields) = @_; + my $rrd= { + Path => "${outpfx}${host}${sep}${peer}_${inout}.rrd", + }; + if (stat $rrd->{Path}) { + get_rrd_info($rrd, $rrd->{Path}); + } else { + $!==&ENOENT or die "$rrd->{Path} $!"; + create_rrd($inout, $rrd->{Path}, $fields); + $rrd->{DoneUpto}= 0; + } + return $rrd; +} + +our %rrds; +our @rrd_blockedupdates; +our $rrd_blockedupdate_time; + +sub update_rrd ($$$$) { + my ($inout,$tmpl,$vals,$fields) = @_; + + my $rrd= $rrds{$host,$peer,$inout}; + if (!$rrd) { + $rrd= $rrds{$host,$peer,$inout}= find_or_create_rrd($inout, $fields); + $rrd->{Template}= $tmpl; + } + return if $time <= $rrd->{DoneUpto}; + + my $blocked= $rrd->{BlockedUpdate}; + if (defined $blocked) { + for (my $ix=0; $ix<@$vals; $ix++) { + my $old= $blocked->[$ix]; + my $new= $vals->[$ix]; + $blocked->[$ix]= ($old eq 'U' || $new eq 'U') ? 'U' : $old + $new; + } + return; + } + $rrd->{BlockedUpdate}= $vals; + $rrd_blockedupdate_time= $time; + push @rrd_blockedupdates, $rrd; +} + +sub actually_update_rrds () { + return unless defined $rrd_blockedupdate_time; + return if $time == $rrd_blockedupdate_time; + + while (my $rrd= shift @rrd_blockedupdates) { + my $vals= $rrd->{BlockedUpdate}; + next unless $vals; + delete $rrd->{BlockedUpdate}; + + my @args= ($rrd->{Path}, '--template',$rrd->{Template}, + join(':',$rrd_blockedupdate_time,@$vals)); + print DEBUG "update @args\n"; + RRDs::update(@args); + my $err= RRDs::error; + die "$err [@args]" if defined $err; + } + + $rrd_blockedupdate_time= undef; } our %in_conns; sub inbound_connected () { - #print "$host $peer $conn START\n"; - perhaps_create_rrd('in',\@fields_in); + print DEBUG "inbound connected $host $peer $conn\n"; $in_conns{$host,$peer,$conn} = [ (0) x @fields_in ]; } sub inbound_closed () { - #print "$host $peer $conn STOP\n"; + print DEBUG "inbound closed $host $peer $conn\n"; delete $in_conns{$host,$peer,$conn}; } sub inbound_stats () { @@ -110,8 +163,10 @@ sub inbound_stats () { s/(?<=[a-z]) (?=[a-z])/_/g; my $hpc= $in_conns{$host,$peer,$conn}; if (!$hpc) { - perhaps_create_rrd('in',\@fields_in); + print DEBUG "inbound UNKNOWN $host $peer $conn $stats\n"; $in_conns{$host,$peer,$conn}= $hpc= [ (undef) x @fields_in ]; + } else { + print DEBUG "inbound stats $host $peer $conn $stats\n"; } while (s/^([a-z_]+) (\d+)\s//) { $s{$1}= $2; } my @v; @@ -126,7 +181,7 @@ sub inbound_stats () { $hpc->[@v]= $this; push @v, defined($last) ? $this - $last : 'U'; } - update_rrd('in',$tmpl_in,\@v); + update_rrd('in',$tmpl_in,\@v,\@fields_in); } sub outbound_stats () { @@ -141,10 +196,12 @@ sub run ($) { my ($process,$pid,$msg,$cc,$sl); while ($sl= $parser->next) { ($time,$host,$process,$pid,$msg) = @$sl; + actually_update_rrds(); next unless exists $dohosts{$host}; #print join("|", map { defined($_) ? $_ : "" } @$sl), "\n"; if ($process eq 'innd' && !defined $pid) { - if (($peer,$conn) = $msg =~ m/^($host_re) connected ($conn_re)$/) { + if (($peer,$conn) = $msg =~ + m/^($host_re) connected ($conn_re)(?: streaming allowed)?$/) { inbound_connected() } elsif (($peer,$conn,$cc,$stats) = $msg =~ m/^($host_re):($conn_re) (closed|checkpoint) (seconds .*)$/) { @@ -166,7 +223,7 @@ sub run ($) { foreach my $staticpath (@ARGV) { if ($staticpath =~ m/\.gz$/) { my $fh= new IO::Handle; - open $fh, '-|', 'gunzip', '--', $staticpath or die $!; + open $fh, '-|', 'gunzip', '-c', '--', $staticpath or die $!; run($fh); !$fh->error or die "$staticpath $!"; $!=0;$?=0; close $fh or die "$staticpath $! $?"; -- 2.30.2