our ($time,$host,$peer,$conn,$stats);
-sub path_rrd ($) {
- my ($inout) = @_;
- return "${outpfx}${host}${sep}${peer}_${inout}.rrd";
-}
+sub create_rrd ($$$) {
+ my ($inout, $path, $fields) = @_;
-sub perhaps_create_rrd ($$) {
- my ($inout, $fields) = @_;
- my $path= path_rrd($inout);
- return if stat $path;
- $!==&ENOENT or die "$path $!";
my $details= $details{$inout};
my @sargs= ($path, '--start','now-1y', '--step',$details->{Step});
die "$err [@sargs @largs]" if defined $err;
}
-sub update_rrd ($$$) {
- my ($inout,$tmpl,$vals) = @_;
- my $path= path_rrd($inout);
- my @args= ($path, '--template',$tmpl, join(':',$time,@$vals));
- print DEBUG "update @args\n";
- RRDs::update(@args);
- my $err= RRDs::error;
- die "$err [@args]" if defined $err;
+sub get_rrd_info ($$) {
+ my ($rrdupdate, $path) = @_;
+ my $h= RRDs::info($path);
+ die "$path $! ".(RRDs::error) unless $h;
+ die "$path ?" unless $h->{'last_update'};
+ $rrdupdate->{DoneUpto}= $h->{'last_update'};
+}
+
+sub find_or_create_rrd ($$) {
+ my ($inout, $fields) = @_;
+ my $rrd= {
+ Path => "${outpfx}${host}${sep}${peer}_${inout}.rrd",
+ };
+ if (stat $rrd->{Path}) {
+ get_rrd_info($rrd, $rrd->{Path});
+ } else {
+ $!==&ENOENT or die "$rrd->{Path} $!";
+ create_rrd($inout, $rrd->{Path}, $fields);
+ $rrd->{DoneUpto}= 0;
+ }
+ return $rrd;
+}
+
+our %rrds;
+our @rrd_blockedupdates;
+our $rrd_blockedupdate_time;
+
+sub update_rrd ($$$$) {
+ my ($inout,$tmpl,$vals,$fields) = @_;
+
+ my $rrd= $rrds{$host,$peer,$inout};
+ if (!$rrd) {
+ $rrd= $rrds{$host,$peer,$inout}= find_or_create_rrd($inout, $fields);
+ $rrd->{Template}= $tmpl;
+ }
+ return if $time <= $rrd->{DoneUpto};
+
+ my $blocked= $rrd->{BlockedUpdate};
+ if (defined $blocked) {
+ for (my $ix=0; $ix<@$vals; $ix++) {
+ my $old= $blocked->[$ix];
+ my $new= $vals->[$ix];
+ $blocked->[$ix]= ($old eq 'U' || $new eq 'U') ? 'U' : $old + $new;
+ }
+ return;
+ }
+ $rrd->{BlockedUpdate}= $vals;
+ $rrd_blockedupdate_time= $time;
+ push @rrd_blockedupdates, $rrd;
+}
+
+sub actually_update_rrds () {
+ return unless defined $rrd_blockedupdate_time;
+ return if $time == $rrd_blockedupdate_time;
+
+ while (my $rrd= shift @rrd_blockedupdates) {
+ my $vals= $rrd->{BlockedUpdate};
+ next unless $vals;
+ delete $rrd->{BlockedUpdate};
+
+ my @args= ($rrd->{Path}, '--template',$rrd->{Template},
+ join(':',$rrd_blockedupdate_time,@$vals));
+ print DEBUG "update @args\n";
+ RRDs::update(@args);
+ my $err= RRDs::error;
+ die "$err [@args]" if defined $err;
+ }
+
+ $rrd_blockedupdate_time= undef;
}
our %in_conns;
sub inbound_connected () {
- #print "$host $peer $conn START\n";
- perhaps_create_rrd('in',\@fields_in);
+ print DEBUG "inbound connected $host $peer $conn\n";
$in_conns{$host,$peer,$conn} = [ (0) x @fields_in ];
}
sub inbound_closed () {
- #print "$host $peer $conn STOP\n";
+ print DEBUG "inbound closed $host $peer $conn\n";
delete $in_conns{$host,$peer,$conn};
}
sub inbound_stats () {
s/(?<=[a-z]) (?=[a-z])/_/g;
my $hpc= $in_conns{$host,$peer,$conn};
if (!$hpc) {
- perhaps_create_rrd('in',\@fields_in);
+ print DEBUG "inbound UNKNOWN $host $peer $conn $stats\n";
$in_conns{$host,$peer,$conn}= $hpc= [ (undef) x @fields_in ];
+ } else {
+ print DEBUG "inbound stats $host $peer $conn $stats\n";
}
while (s/^([a-z_]+) (\d+)\s//) { $s{$1}= $2; }
my @v;
$hpc->[@v]= $this;
push @v, defined($last) ? $this - $last : 'U';
}
- update_rrd('in',$tmpl_in,\@v);
+ update_rrd('in',$tmpl_in,\@v,\@fields_in);
}
sub outbound_stats () {
my ($process,$pid,$msg,$cc,$sl);
while ($sl= $parser->next) {
($time,$host,$process,$pid,$msg) = @$sl;
+ actually_update_rrds();
next unless exists $dohosts{$host};
#print join("|", map { defined($_) ? $_ : "<undef>" } @$sl), "\n";
if ($process eq 'innd' && !defined $pid) {
- if (($peer,$conn) = $msg =~ m/^($host_re) connected ($conn_re)$/) {
+ if (($peer,$conn) = $msg =~
+ m/^($host_re) connected ($conn_re)(?: streaming allowed)?$/) {
inbound_connected()
} elsif (($peer,$conn,$cc,$stats) = $msg =~
m/^($host_re):($conn_re) (closed|checkpoint) (seconds .*)$/) {
foreach my $staticpath (@ARGV) {
if ($staticpath =~ m/\.gz$/) {
my $fh= new IO::Handle;
- open $fh, '-|', 'gunzip', '--', $staticpath or die $!;
+ open $fh, '-|', 'gunzip', '-c', '--', $staticpath or die $!;
run($fh);
!$fh->error or die "$staticpath $!";
$!=0;$?=0; close $fh or die "$staticpath $! $?";