chiark / gitweb /
appears to be able to do in
authorIan Jackson <ian@liberator.relativity.greenend.org.uk>
Sun, 27 Jun 2010 17:00:56 +0000 (18:00 +0100)
committerIan Jackson <ian@liberator.relativity.greenend.org.uk>
Sun, 27 Jun 2010 17:00:56 +0000 (18:00 +0100)
newstailer

index 2b482e70fe6fda00010f65ba08e528c347ce4923..6fcbc89b9b9fa274f983d370c956718b2b712180 100755 (executable)
@@ -56,16 +56,9 @@ our %details= (
 
 our ($time,$host,$peer,$conn,$stats);
 
-sub path_rrd ($) {
-    my ($inout) = @_;
-    return "${outpfx}${host}${sep}${peer}_${inout}.rrd";
-}
+sub create_rrd ($$$) {
+    my ($inout, $path, $fields) = @_;
 
-sub perhaps_create_rrd ($$) {
-    my ($inout, $fields) = @_;
-    my $path= path_rrd($inout);
-    return if stat $path;
-    $!==&ENOENT or die "$path $!";
     my $details= $details{$inout};
 
     my @sargs= ($path, '--start','now-1y', '--step',$details->{Step});
@@ -83,25 +76,85 @@ sub perhaps_create_rrd ($$) {
     die "$err [@sargs @largs]" if defined $err;
 }
 
-sub update_rrd ($$$) {
-    my ($inout,$tmpl,$vals) = @_;
-    my $path= path_rrd($inout);
-    my @args= ($path, '--template',$tmpl, join(':',$time,@$vals));
-    print DEBUG "update @args\n";
-    RRDs::update(@args);
-    my $err= RRDs::error;
-    die "$err [@args]" if defined $err;
+sub get_rrd_info ($$) {
+    my ($rrdupdate, $path) = @_;
+    my $h= RRDs::info($path);
+    die "$path $! ".(RRDs::error) unless $h;
+    die "$path ?" unless $h->{'last_update'};
+    $rrdupdate->{DoneUpto}= $h->{'last_update'};
+}
+
+sub find_or_create_rrd ($$) {
+    my ($inout, $fields) = @_;
+    my $rrd= {
+       Path => "${outpfx}${host}${sep}${peer}_${inout}.rrd",
+    };
+    if (stat $rrd->{Path}) {
+       get_rrd_info($rrd, $rrd->{Path});
+    } else {
+       $!==&ENOENT or die "$rrd->{Path} $!";
+       create_rrd($inout, $rrd->{Path}, $fields);
+       $rrd->{DoneUpto}= 0;
+    }
+    return $rrd;
+}
+
+our %rrds;
+our @rrd_blockedupdates;
+our $rrd_blockedupdate_time;
+
+sub update_rrd ($$$$) {
+    my ($inout,$tmpl,$vals,$fields) = @_;
+
+    my $rrd= $rrds{$host,$peer,$inout};
+    if (!$rrd) {
+       $rrd= $rrds{$host,$peer,$inout}= find_or_create_rrd($inout, $fields);
+       $rrd->{Template}= $tmpl;
+    }
+    return if $time <= $rrd->{DoneUpto};
+
+    my $blocked= $rrd->{BlockedUpdate};
+    if (defined $blocked) {
+       for (my $ix=0; $ix<@$vals; $ix++) {
+           my $old= $blocked->[$ix];
+           my $new= $vals->[$ix];
+           $blocked->[$ix]= ($old eq 'U' || $new eq 'U') ? 'U' : $old + $new;
+       }
+       return;
+    }
+    $rrd->{BlockedUpdate}= $vals;
+    $rrd_blockedupdate_time= $time;
+    push @rrd_blockedupdates, $rrd;
+}
+
+sub actually_update_rrds () {
+    return unless defined $rrd_blockedupdate_time;
+    return if $time == $rrd_blockedupdate_time;
+
+    while (my $rrd= shift @rrd_blockedupdates) {
+       my $vals= $rrd->{BlockedUpdate};
+       next unless $vals;
+       delete $rrd->{BlockedUpdate};
+
+       my @args= ($rrd->{Path}, '--template',$rrd->{Template},
+                  join(':',$rrd_blockedupdate_time,@$vals));
+       print DEBUG "update @args\n";
+       RRDs::update(@args);
+       my $err= RRDs::error;
+       die "$err [@args]" if defined $err;
+    }
+
+    $rrd_blockedupdate_time= undef;
 }
 
 our %in_conns;
 
 sub inbound_connected () {
-    #print "$host $peer $conn START\n";
-    perhaps_create_rrd('in',\@fields_in);
+    print DEBUG "inbound connected $host $peer $conn\n";
     $in_conns{$host,$peer,$conn} = [ (0) x @fields_in ];
 }
 sub inbound_closed () {
-    #print "$host $peer $conn STOP\n";
+    print DEBUG "inbound closed $host $peer $conn\n";
     delete $in_conns{$host,$peer,$conn};
 }
 sub inbound_stats () {
@@ -110,8 +163,10 @@ sub inbound_stats () {
     s/(?<=[a-z]) (?=[a-z])/_/g;
     my $hpc= $in_conns{$host,$peer,$conn};
     if (!$hpc) {
-       perhaps_create_rrd('in',\@fields_in);
+       print DEBUG "inbound UNKNOWN $host $peer $conn $stats\n";
        $in_conns{$host,$peer,$conn}= $hpc= [ (undef) x @fields_in ];
+    } else {
+       print DEBUG "inbound stats $host $peer $conn $stats\n";
     }
     while (s/^([a-z_]+) (\d+)\s//) { $s{$1}= $2; }
     my @v;
@@ -126,7 +181,7 @@ sub inbound_stats () {
        $hpc->[@v]= $this;
        push @v, defined($last) ? $this - $last : 'U';
     }
-    update_rrd('in',$tmpl_in,\@v);
+    update_rrd('in',$tmpl_in,\@v,\@fields_in);
 }
 
 sub outbound_stats () {
@@ -141,10 +196,12 @@ sub run ($) {
     my ($process,$pid,$msg,$cc,$sl);
     while ($sl= $parser->next) {
        ($time,$host,$process,$pid,$msg) = @$sl;
+       actually_update_rrds();
        next unless exists $dohosts{$host};
        #print join("|", map { defined($_) ? $_ : "<undef>" } @$sl), "\n";
        if ($process eq 'innd' && !defined $pid) {
-           if (($peer,$conn) = $msg =~ m/^($host_re) connected ($conn_re)$/) {
+           if (($peer,$conn) = $msg =~
+               m/^($host_re) connected ($conn_re)(?: streaming allowed)?$/) {
                inbound_connected()
            } elsif (($peer,$conn,$cc,$stats) = $msg =~
             m/^($host_re):($conn_re) (closed|checkpoint) (seconds .*)$/) {
@@ -166,7 +223,7 @@ sub run ($) {
 foreach my $staticpath (@ARGV) {
     if ($staticpath =~ m/\.gz$/) {
        my $fh= new IO::Handle;
-       open $fh, '-|', 'gunzip', '--', $staticpath or die $!;
+       open $fh, '-|', 'gunzip', '-c', '--', $staticpath or die $!;
        run($fh);
        !$fh->error or die "$staticpath $!";
        $!=0;$?=0; close $fh or die "$staticpath $! $?";