#!/usr/bin/perl -w
# Program for updating an rrd with info from innduct and inn logs
# Needs to be run once inside a lock
# rrd-graphs/newstailer - part of rrd-graphs, a tool for online graphs
# Copyright 2010, 2012 Ian Jackson
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
# killall newstailer
# with-lock-ex -f data/news/lock sh -xc \
# "rm data/news/*.rrd; ./newstailer -Odata/news/ -D \
# /var/log/news/news.notice.{6,5,4,3,2,1}.gz /var/log/news/news.notice{.0,} ''"
use strict qw(refs vars);
use POSIX;
use IO::Handle;
use IO::File;
use File::Tail;
use Parse::Syslog;
use RRDs;
our %dohosts;
our $outpfx= './';
our $sep= '_';
our $debug= 0;
our $interval= 30;
open DEBUG, ">/dev/null" or die $!;
while (@ARGV && $ARGV[0] =~ m/^\-./) {
$_= shift @ARGV;
last if $_ eq '--';
while (m/^\-./) {
if (s/^-h//) { $dohosts{$_}=1; last; }
elsif (s/^-O//) { $outpfx=$_; last; }
elsif (s/^-s//) { $sep=$_; last; }
elsif (s/^-i([1-9]\d{0,5})$//) { $interval= $1; }
elsif (s/^-D/-/) { $debug++; }
else { die "bad option $_"; }
}
}
if (!keys %dohosts) {
my ($sysname, $nodename) = POSIX::uname();
die unless defined $nodename;
$dohosts{$nodename}= 1;
}
die unless @ARGV;
my $totail= pop @ARGV;
if ($debug) { open DEBUG, ">& STDOUT" or die $!; }
our %details;
our @detail_defaults=
(
Step => 60,
DstArguments => "7200:0:U",
Xff => 0.5,
Archives => [ [ 3600*4, 60 ], # 4hr, 1min resolution
[ 3600*25, 180 ], # 25h, 3min resolution
[ 86400*8, 600 ], # 8d, 10min resolution
[ 86400*7*14, 3600*2 ], # 14wks, 2hr resolution
[ 86400*366, 3600*6 ], # 1yr, 6hr resolution
[ 86400*366*3, 3600*24 ] ], # 3yr, 1d resolution
);
our @fields_in= qw(seconds accepted refused rejected duplicate
accepted_size duplicate_size);
$details{'in'}= {
Fields => \@fields_in,
@detail_defaults
};
our @fields_out= qw(missing offered deferred
accepted unwanted rejected body_missing);
$details{'out'}= {
Fields => \@fields_out,
@detail_defaults
};
our ($time,$host,$peer,$conn,$stats);
sub create_rrd ($$) {
my ($inout, $path) = @_;
my $details= $details{$inout};
my @sargs= ($path, '--start','now-1y', '--step',$details->{Step});
my @largs;
push @largs, "DS:$_:ABSOLUTE:$details->{DstArguments}"
foreach @{ $details{$inout}{Fields} };
foreach (@{ $details->{Archives} }) {
my ($whole,$reso) = @$_;
my $steps= $reso / $details->{Step};
my $rows= $whole / $reso;
push @largs, "RRA:AVERAGE:$details->{Xff}:$steps:$rows";
}
print DEBUG join(" \\\n ", "creating @sargs", @largs),"\n";
RRDs::create(@sargs,@largs);
my $err= RRDs::error;
die "$err [@sargs @largs]" if defined $err;
}
sub get_rrd_info ($$) {
my ($rrdupdate, $path) = @_;
my $h= RRDs::info($path);
die "$path $! ".(RRDs::error) unless $h;
die "$path ?" unless $h->{'last_update'};
$rrdupdate->{DoneUpto}= $h->{'last_update'};
}
sub find_or_create_rrd ($) {
my ($inout) = @_;
my $rrd= {
Path => "${outpfx}${host}${sep}${peer}_${inout}.rrd",
};
if (stat $rrd->{Path}) {
get_rrd_info($rrd, $rrd->{Path});
} else {
$!==&ENOENT or die "$rrd->{Path} $!";
create_rrd($inout, $rrd->{Path});
$rrd->{DoneUpto}= 0;
}
return $rrd;
}
our %rrds;
our @rrd_blockedupdates;
our $rrd_blockedupdate_time;
sub update_rrd ($$) {
my ($inout,$vals) = @_;
my $rrd= $rrds{$host,$peer,$inout};
if (!$rrd) {
$rrd= $rrds{$host,$peer,$inout}= find_or_create_rrd($inout);
$rrd->{Template}= join ':', @{ $details{$inout}{Fields} };
}
return if $time <= $rrd->{DoneUpto};
my $blocked= $rrd->{BlockedUpdate};
if (defined $blocked) {
for (my $ix=0; $ix<@$vals; $ix++) {
my $old= $blocked->[$ix];
my $new= $vals->[$ix];
$blocked->[$ix]= ($old eq 'U' || $new eq 'U') ? 'U' : $old + $new;
}
return;
}
$rrd->{BlockedUpdate}= $vals;
$rrd_blockedupdate_time= $time;
push @rrd_blockedupdates, $rrd;
}
sub actually_update_rrds () {
return unless defined $rrd_blockedupdate_time;
return if $time == $rrd_blockedupdate_time;
while (my $rrd= shift @rrd_blockedupdates) {
my $vals= $rrd->{BlockedUpdate};
next unless $vals;
delete $rrd->{BlockedUpdate};
my @args= ($rrd->{Path}, '--template',$rrd->{Template},
join(':',$rrd_blockedupdate_time,@$vals));
print DEBUG "update @args\n" if $debug>=2;
RRDs::update(@args);
my $err= RRDs::error;
die "$err [@args]" if defined $err;
}
$rrd_blockedupdate_time= undef;
}
our %in_conns;
sub inbound_connected () {
print DEBUG "inbound $time connected $host $peer $conn\n" if $debug>=2;
$in_conns{$host,$peer,$conn} = [ (0) x @fields_in ];
}
sub inbound_closed () {
print DEBUG "inbound $time closed $host $peer $conn\n" if $debug>=2;
delete $in_conns{$host,$peer,$conn};
}
sub inbound_stats () {
$_= $stats.' ';
s/(?<=[a-z]) (?=[a-z])/_/g;
my $hpc= $in_conns{$host,$peer,$conn};
if (!$hpc) {
print DEBUG "inbound $time UNKNOWN $host $peer $conn $stats\n";
$in_conns{$host,$peer,$conn}= $hpc= [ (undef) x @fields_in ];
} else {
print DEBUG "inbound $time stats $host $peer $conn $stats\n"
if $debug>=2;
}
my %s;
while (s/^([a-z_]+) (\d+)\s//) { $s{$1}= $2; }
my @v;
foreach my $f (@fields_in) {
my $this= $s{$f};
if (!defined $this) {
delete $hpc->[@v];
push @v, 'U';
next;
}
my $last= $hpc->[@v];
$hpc->[@v]= $this;
push @v, defined($last) ? $this - $last : 'U';
}
update_rrd('in',\@v);
}
sub outbound_stats () {
print DEBUG "outbound $time stats $host $peer $stats\n" if $debug>=2;
$_= " $stats ";
s/missing(?=\=\d+ \()/body_missing/;
s/\([^()]*\)/ /;
my %s;
while (s/ ([a-z]\w+)\=(\d+) / /) { $s{$1}= $2; }
my @v;
foreach my $f (@fields_out) {
my $this= $s{$f};
push @v, defined($this) ? $this : 'U';
}
update_rrd('out',\@v);
}
sub run ($) {
my ($object) = @_;
my $parser= new Parse::Syslog $object, repeat=>0, arrayref=>1;
my $host_re= '[-.0-9a-z]+';
my $conn_re= '[1-9]\d{0,5}';
my ($process,$pid,$msg,$cc,$sl);
while ($sl= $parser->next) {
($time,$host,$process,$pid,$msg) = @$sl;
actually_update_rrds();
next unless exists $dohosts{$host};
print DEBUG "logfile ",
join("|", map { defined($_) ? $_ : "" } @$sl), "\n"
if $debug>=3;
if ($process eq 'innd' && !defined $pid) {
if (($peer,$conn) = $msg =~
m/^($host_re) connected ($conn_re)(?: streaming allowed)?$/) {
inbound_connected()
} elsif (($peer,$conn,$cc,$stats) = $msg =~
m/^($host_re):($conn_re) (closed|checkpoint) (seconds .*)$/) {
inbound_stats();
inbound_closed() if $cc eq 'closed';
}
} elsif ($process eq 'innduct') {
if (($peer,$stats) = $msg =~
m/^($host_re)\| notice: (?:completed|processed) \S+ (read=.*)$/) {
outbound_stats();
}
}
}
}
#seconds (\d+) accepted (\d+) refused (\d+) rejected (\d+) duplicate (\d+) accepted size (\d+) duplicate size (\d+)
foreach my $staticpath (@ARGV) {
if ($staticpath =~ m/\.gz$/) {
my $fh= new IO::Handle;
open $fh, '-|', 'gunzip', '-c', '--', $staticpath or die $!;
run($fh);
!$fh->error or die "$staticpath $!";
$!=0;$?=0; close $fh or die "$staticpath $! $?";
} else {
my $fh= new IO::File $staticpath, '<' or die $!;
run($staticpath);
!$fh->error or die "$staticpath $!";
close $fh or die "$staticpath $!";
}
}
exit 0 if $totail eq '';
my $tailer= new File::Tail name=>$totail,
interval=>$interval, adjustafter=>2, ignore_nonexistant=>1, tail=>-1
or die "$totail $!";
run($tailer);
die "huh?";