chiark / gitweb /
copyright notices and licences
[rrd-graphs.git] / newstailer
index 3ff6b72140d056b24e11f9b258cc75cd754fca6b..8fffccdb46e15e1630845c4e46d0cd12e07c65c1 100755 (executable)
@@ -1,5 +1,30 @@
 #!/usr/bin/perl -w
 
+# Program for updating an rrd with info from innduct and inn logs
+# Needs to be run once inside a lock
+
+# rrd-graphs/newstailer - part of rrd-graphs, a tool for online graphs
+# Copyright 2010, 2012 Ian Jackson
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+# killall newstailer
+# with-lock-ex -f data/news/lock sh -xc \
+# "rm data/news/*.rrd; ./newstailer -Odata/news/ -D \
+# /var/log/news/news.notice.{6,5,4,3,2,1}.gz /var/log/news/news.notice{.0,} ''"
+
 use strict qw(refs vars);
 use POSIX;
 
@@ -7,37 +32,271 @@ use IO::Handle;
 use IO::File;
 use File::Tail;
 use Parse::Syslog;
+use RRDs;
 
-die unless @ARGV;
-die if $ARGV[0] =~ m/^\-/;
+our %dohosts;
+our $outpfx= './';
+our $sep= '_';
+our $debug= 0;
+our $interval= 30;
+
+open DEBUG, ">/dev/null" or die $!;
+
+while (@ARGV && $ARGV[0] =~ m/^\-./) {
+    $_= shift @ARGV;
+    last if $_ eq '--';
+    while (m/^\-./) {
+       if (s/^-h//) { $dohosts{$_}=1; last; }
+       elsif (s/^-O//) { $outpfx=$_; last; }
+       elsif (s/^-s//) { $sep=$_; last; }
+       elsif (s/^-i([1-9]\d{0,5})$//) { $interval= $1; }
+       elsif (s/^-D/-/) { $debug++; }
+       else { die "bad option $_"; }
+    }
+}
+
+if (!keys %dohosts) {
+    my ($sysname, $nodename) = POSIX::uname();
+    die unless defined $nodename;
+    $dohosts{$nodename}= 1;
+}
 
+die unless @ARGV;
 my $totail= pop @ARGV;
 
+if ($debug) { open DEBUG, ">& STDOUT" or die $!; }
+
+our %details;
+
+
+our @detail_defaults=
+    (
+     Step => 60,
+     DstArguments => "7200:0:U",
+     Xff => 0.5,
+     Archives => [ [ 3600*4,            60 ],   # 4hr, 1min resolution
+                  [ 3600*25,          180 ],   # 25h, 3min resolution
+                  [ 86400*8,          600 ],   #  8d, 10min resolution
+                  [ 86400*7*14,    3600*2 ],   # 14wks, 2hr resolution
+                  [ 86400*366,     3600*6 ],   # 1yr, 6hr resolution
+                  [ 86400*366*3,   3600*24 ] ], # 3yr, 1d resolution
+     );
+
+our @fields_in= qw(seconds accepted refused rejected duplicate
+                  accepted_size duplicate_size);
+$details{'in'}= {
+    Fields => \@fields_in,
+    @detail_defaults
+};
+
+our @fields_out= qw(missing offered deferred
+                   accepted unwanted rejected body_missing);
+$details{'out'}= {
+    Fields => \@fields_out,
+    @detail_defaults
+};
+
+
+our ($time,$host,$peer,$conn,$stats);
+
+sub create_rrd ($$) {
+    my ($inout, $path) = @_;
+
+    my $details= $details{$inout};
+
+    my @sargs= ($path, '--start','now-1y', '--step',$details->{Step});
+    my @largs;
+    push @largs, "DS:$_:ABSOLUTE:$details->{DstArguments}"
+       foreach @{ $details{$inout}{Fields} };
+    foreach (@{ $details->{Archives} }) {
+       my ($whole,$reso) = @$_;
+       my $steps= $reso / $details->{Step};
+       my $rows= $whole / $reso;
+       push @largs, "RRA:AVERAGE:$details->{Xff}:$steps:$rows";
+    } 
+    print DEBUG join("            \\\n  ", "creating @sargs", @largs),"\n";
+    RRDs::create(@sargs,@largs);
+    my $err= RRDs::error;
+    die "$err [@sargs @largs]" if defined $err;
+}
+
+sub get_rrd_info ($$) {
+    my ($rrdupdate, $path) = @_;
+    my $h= RRDs::info($path);
+    die "$path $! ".(RRDs::error) unless $h;
+    die "$path ?" unless $h->{'last_update'};
+    $rrdupdate->{DoneUpto}= $h->{'last_update'};
+}
+
+sub find_or_create_rrd ($) {
+    my ($inout) = @_;
+    my $rrd= {
+       Path => "${outpfx}${host}${sep}${peer}_${inout}.rrd",
+    };
+    if (stat $rrd->{Path}) {
+       get_rrd_info($rrd, $rrd->{Path});
+    } else {
+       $!==&ENOENT or die "$rrd->{Path} $!";
+       create_rrd($inout, $rrd->{Path});
+       $rrd->{DoneUpto}= 0;
+    }
+    return $rrd;
+}
+
+our %rrds;
+our @rrd_blockedupdates;
+our $rrd_blockedupdate_time;
+
+sub update_rrd ($$) {
+    my ($inout,$vals) = @_;
+
+    my $rrd= $rrds{$host,$peer,$inout};
+    if (!$rrd) {
+       $rrd= $rrds{$host,$peer,$inout}= find_or_create_rrd($inout);
+       $rrd->{Template}= join ':', @{ $details{$inout}{Fields} };
+    }
+    return if $time <= $rrd->{DoneUpto};
+
+    my $blocked= $rrd->{BlockedUpdate};
+    if (defined $blocked) {
+       for (my $ix=0; $ix<@$vals; $ix++) {
+           my $old= $blocked->[$ix];
+           my $new= $vals->[$ix];
+           $blocked->[$ix]= ($old eq 'U' || $new eq 'U') ? 'U' : $old + $new;
+       }
+       return;
+    }
+    $rrd->{BlockedUpdate}= $vals;
+    $rrd_blockedupdate_time= $time;
+    push @rrd_blockedupdates, $rrd;
+}
+
+sub actually_update_rrds () {
+    return unless defined $rrd_blockedupdate_time;
+    return if $time == $rrd_blockedupdate_time;
+
+    while (my $rrd= shift @rrd_blockedupdates) {
+       my $vals= $rrd->{BlockedUpdate};
+       next unless $vals;
+       delete $rrd->{BlockedUpdate};
+
+       my @args= ($rrd->{Path}, '--template',$rrd->{Template},
+                  join(':',$rrd_blockedupdate_time,@$vals));
+       print DEBUG "update @args\n" if $debug>=2;
+       RRDs::update(@args);
+       my $err= RRDs::error;
+       die "$err [@args]" if defined $err;
+    }
+
+    $rrd_blockedupdate_time= undef;
+}
+
+our %in_conns;
+
+sub inbound_connected () {
+    print DEBUG "inbound $time connected $host $peer $conn\n" if $debug>=2;
+    $in_conns{$host,$peer,$conn} = [ (0) x @fields_in ];
+}
+sub inbound_closed () {
+    print DEBUG "inbound $time closed $host $peer $conn\n" if $debug>=2;
+    delete $in_conns{$host,$peer,$conn};
+}
+sub inbound_stats () {
+    $_= $stats.' ';
+    s/(?<=[a-z]) (?=[a-z])/_/g;
+    my $hpc= $in_conns{$host,$peer,$conn};
+    if (!$hpc) {
+       print DEBUG "inbound $time UNKNOWN $host $peer $conn $stats\n";
+       $in_conns{$host,$peer,$conn}= $hpc= [ (undef) x @fields_in ];
+    } else {
+       print DEBUG "inbound $time stats $host $peer $conn $stats\n"
+           if $debug>=2;
+    }
+    my %s;
+    while (s/^([a-z_]+) (\d+)\s//) { $s{$1}= $2; }
+    my @v;
+    foreach my $f (@fields_in) {
+       my $this= $s{$f};
+       if (!defined $this) {
+           delete $hpc->[@v];
+           push @v, 'U';
+           next;
+       }
+       my $last= $hpc->[@v];
+       $hpc->[@v]= $this;
+       push @v, defined($last) ? $this - $last : 'U';
+    }
+    update_rrd('in',\@v);
+}
+
+sub outbound_stats () {
+    print DEBUG "outbound $time stats $host $peer $stats\n" if $debug>=2;
+    $_= " $stats ";
+    s/missing(?=\=\d+ \()/body_missing/;
+    s/\([^()]*\)/ /;
+    my %s;
+    while (s/ ([a-z]\w+)\=(\d+) / /) { $s{$1}= $2; }
+    my @v;
+    foreach my $f (@fields_out) {
+       my $this= $s{$f};
+       push @v, defined($this) ? $this : 'U';
+    }
+    update_rrd('out',\@v);
+}
+
 sub run ($) {
     my ($object) = @_;
     my $parser= new Parse::Syslog $object, repeat=>0, arrayref=>1;
-    while (my $sl= $parser->next) {
-       print join("|", @$sl), "\n";
+    my $host_re= '[-.0-9a-z]+';
+    my $conn_re= '[1-9]\d{0,5}';
+    my ($process,$pid,$msg,$cc,$sl);
+    while ($sl= $parser->next) {
+       ($time,$host,$process,$pid,$msg) = @$sl;
+       actually_update_rrds();
+       next unless exists $dohosts{$host};
+       print DEBUG "logfile ",
+           join("|", map { defined($_) ? $_ : "<undef>" } @$sl), "\n"
+               if $debug>=3;
+       if ($process eq 'innd' && !defined $pid) {
+           if (($peer,$conn) = $msg =~
+               m/^($host_re) connected ($conn_re)(?: streaming allowed)?$/) {
+               inbound_connected()
+           } elsif (($peer,$conn,$cc,$stats) = $msg =~
+     m/^($host_re):($conn_re) (closed|checkpoint) (seconds .*)$/) {
+               inbound_stats();
+               inbound_closed() if $cc eq 'closed';
+           }
+       } elsif ($process eq 'innduct') {
+           if (($peer,$stats) = $msg =~
+     m/^($host_re)\| notice: (?:completed|processed) \S+ (read=.*)$/) {
+               outbound_stats();
+           }
+       }
     }
 }
 
+#seconds (\d+) accepted (\d+) refused (\d+) rejected (\d+) duplicate (\d+) accepted size (\d+) duplicate size (\d+) 
+
+       
 foreach my $staticpath (@ARGV) {
     if ($staticpath =~ m/\.gz$/) {
        my $fh= new IO::Handle;
-       open $fh, '-|', 'gunzip', '--', $staticpath or die $!;
+       open $fh, '-|', 'gunzip', '-c', '--', $staticpath or die $!;
        run($fh);
        !$fh->error or die "$staticpath $!";
        $!=0;$?=0; close $fh or die "$staticpath $! $?";
     } else {
-       my $fh= new IO::File '<', $staticpath or die $!;
+       my $fh= new IO::File $staticpath, '<' or die $!;
        run($staticpath);
        !$fh->error or die "$staticpath $!";
        close $fh or die "$staticpath $!";
     }
 }
 
+exit 0 if $totail eq '';
+
 my $tailer= new File::Tail name=>$totail,
-    interval=>60, adjustafter=>2, ignore_nonexistant=>1, tail=>-1
+    interval=>$interval, adjustafter=>2, ignore_nonexistant=>1, tail=>-1
     or die "$totail $!";
 
 run($tailer);