#!/usr/bin/perl -w
#
# newstailer: read INN (innd) and innduct syslog lines and feed the
# per-peer statistics they contain into RRD files, one RRD per
# (host, peer, direction).
#
# Usage:
#   newstailer [-hHOST]... [-OOUTPFX] [-sSEP] [-iINTERVAL] [-D]... [--]
#              [STATICFILE...] TOTAIL
#
# Each STATICFILE (gunzipped on the fly if its name ends in .gz) is
# processed once, in order.  The last argument is then followed with
# File::Tail; pass '' as the last argument to exit after the static files.
#
# Example (rebuild everything from rotated logs):
#
# killall newstailer
# with-lock-ex -f data/news/lock sh -xc \
#   "rm data/news/*.rrd; ./newstailer -Odata/news/ -D \
#    /var/log/news/news.notice.{6,5,4,3,2,1}.gz /var/log/news/news.notice{.0,} ''"

use strict qw(refs vars);

use POSIX;
use IO::Handle;
use IO::File;
use File::Tail;
use Parse::Syslog;
use RRDs;

our %dohosts;           # syslog hostnames whose lines we process (-h)
our $outpfx= './';      # prefix for RRD file paths (-O)
our $sep= '_';          # separator between host and peer in RRD names (-s)
our $debug= 0;          # debug level; each -D increments it
our $interval= 30;      # File::Tail polling interval (-i)

# DEBUG goes nowhere unless -D is given (it is re-opened onto STDOUT below).
open DEBUG, '>', '/dev/null' or die $!;

# Hand-rolled option parsing.  Options taking a value consume the rest
# of the argument (-hfoo, -Odir/, -sSEP); -D may be bundled (-DD).
while (@ARGV && $ARGV[0] =~ m/^\-./) {
    $_= shift @ARGV;
    last if $_ eq '--';
    while (m/^\-./) {
        if (s/^-h//) {
            $dohosts{$_}=1;
            last;
        } elsif (s/^-O//) {
            $outpfx=$_;
            last;
        } elsif (s/^-s//) {
            $sep=$_;
            last;
        } elsif (s/^-i([1-9]\d{0,5})$//) {
            $interval= $1;
        } elsif (s/^-D/-/) {
            # Leave a leading '-' so that further bundled flags are parsed.
            $debug++;
        } else {
            die "bad option $_";
        }
    }
}

# Default to this machine's own node name if no -h was given.
if (!keys %dohosts) {
    my ($sysname, $nodename) = POSIX::uname();
    die unless defined $nodename;
    $dohosts{$nodename}= 1;
}

# The final argument is the file to tail ('' means "do not tail").
die unless @ARGV;
my $totail= pop @ARGV;

if ($debug) {
    open DEBUG, '>&', \*STDOUT or die $!;
}

# Per-direction RRD layout, keyed by 'in' / 'out'.
our %details;

our @detail_defaults=
    (
     Step => 60,
     DstArguments => "7200:0:U",   # appended to each DS definition
     Xff => 0.5,
     Archives =>
     # Each entry is [ total span in seconds, resolution in seconds ].
     [ [ 3600*4,       60      ],    # 4hr, 1min resolution
       [ 3600*25,      180     ],    # 25h, 3min resolution
       [ 86400*8,      600     ],    # 8d, 10min resolution
       [ 86400*7*14,   3600*2  ],    # 14wks, 2hr resolution
       [ 86400*366,    3600*6  ],    # 1yr, 6hr resolution
       [ 86400*366*3,  3600*24 ] ],  # 3yr, 1d resolution
     );

our @fields_in= qw(seconds accepted refused rejected duplicate
                   accepted_size duplicate_size);
$details{'in'}= { Fields => \@fields_in, @detail_defaults };

our @fields_out= qw(missing offered deferred accepted
                    unwanted rejected body_missing);
$details{'out'}= { Fields => \@fields_out, @detail_defaults };

# State of the log line currently being processed; set by run() and
# read by the subs below.
our ($time,$host,$peer,$conn,$stats);

# create_rrd($inout, $path)
#
# Create a new RRD at $path with one ABSOLUTE data source per field of
# $details{$inout} and one AVERAGE archive per entry in Archives.
# Dies if RRDs::create reports an error.
sub create_rrd ($$) {
    my ($inout, $path) = @_;
    my $cfg= $details{$inout};

    my @sargs= ($path, '--start','now-1y', '--step',$cfg->{Step});

    my @largs;
    push @largs, "DS:$_:ABSOLUTE:$cfg->{DstArguments}"
        foreach @{ $cfg->{Fields} };

    foreach my $archive (@{ $cfg->{Archives} }) {
        my ($whole,$reso) = @$archive;
        my $steps= $reso / $cfg->{Step};    # primary points per row
        my $rows= $whole / $reso;           # rows kept in this archive
        push @largs, "RRA:AVERAGE:$cfg->{Xff}:$steps:$rows";
    }

    print DEBUG join(" \\\n ", "creating @sargs", @largs),"\n";
    RRDs::create(@sargs,@largs);
    my $err= RRDs::error;
    die "$err [@sargs @largs]" if defined $err;
}

# get_rrd_info($rrdupdate, $path)
#
# Record the existing RRD's last_update time in $rrdupdate->{DoneUpto}.
# Dies if the RRD cannot be read or reports no last_update.
sub get_rrd_info ($$) {
    my ($rrdupdate, $path) = @_;
    my $h= RRDs::info($path);
    die "$path $! ".(RRDs::error) unless $h;
    die "$path ?" unless $h->{'last_update'};
    $rrdupdate->{DoneUpto}= $h->{'last_update'};
}

# find_or_create_rrd($inout)
#
# Return a hashref { Path, DoneUpto } for the RRD belonging to the
# current $host/$peer and the given direction, creating the file if it
# does not exist yet (in which case DoneUpto is 0).
sub find_or_create_rrd ($) {
    my ($inout) = @_;

    my $rrd= {
        Path => "${outpfx}${host}${sep}${peer}_${inout}.rrd",
    };
    if (stat $rrd->{Path}) {
        get_rrd_info($rrd, $rrd->{Path});
    } else {
        # Only "no such file" is acceptable; anything else is fatal.
        $!==ENOENT() or die "$rrd->{Path} $!";
        create_rrd($inout, $rrd->{Path});
        $rrd->{DoneUpto}= 0;
    }

    return $rrd;
}

our %rrds;                      # cache: {$host,$peer,$inout} => rrd hashref
our @rrd_blockedupdates;        # RRDs with a queued (not yet written) update
our $rrd_blockedupdate_time;    # timestamp the queued updates belong to

# update_rrd($inout, $vals)
#
# Queue the values in @$vals (one per field, number or 'U') for the RRD
# of the current $host/$peer.  Lines not newer than the RRD's DoneUpto
# are ignored.  If that RRD already has a queued update, the new values
# are added to it field by field; 'U' on either side yields 'U'.
# Nothing is written here; see actually_update_rrds().
sub update_rrd ($$) {
    my ($inout,$vals) = @_;

    my $rrd= $rrds{$host,$peer,$inout};
    if (!$rrd) {
        $rrd= $rrds{$host,$peer,$inout}= find_or_create_rrd($inout);
        $rrd->{Template}= join ':', @{ $details{$inout}{Fields} };
    }

    return if $time <= $rrd->{DoneUpto};

    my $blocked= $rrd->{BlockedUpdate};
    if (defined $blocked) {
        for my $ix (0 .. $#$vals) {
            my $old= $blocked->[$ix];
            my $new= $vals->[$ix];
            $blocked->[$ix]=
                ($old eq 'U' || $new eq 'U') ? 'U' : $old + $new;
        }
        return;
    }

    $rrd->{BlockedUpdate}= $vals;
    $rrd_blockedupdate_time= $time;
    push @rrd_blockedupdates, $rrd;
}

# actually_update_rrds()
#
# Write all queued updates to their RRDs, stamped with the time they
# were queued at.  Does nothing while the current line's $time still
# equals that timestamp, so that all samples sharing a timestamp are
# merged into a single RRD update.  Dies on any RRDs::update error.
#
# NOTE(review): this is only called from run(), so updates queued for
# the very last timestamp of the input are never written when the
# script exits after the static files.  Presumably intentional (a later
# tail run can then reprocess that second) -- confirm.
sub actually_update_rrds () {
    return unless defined $rrd_blockedupdate_time;
    return if $time == $rrd_blockedupdate_time;

    while (my $rrd= shift @rrd_blockedupdates) {
        my $vals= $rrd->{BlockedUpdate};
        next unless $vals;
        delete $rrd->{BlockedUpdate};

        my @args= ($rrd->{Path}, '--template',$rrd->{Template},
                   join(':',$rrd_blockedupdate_time,@$vals));
        print DEBUG "update @args\n" if $debug>=2;
        RRDs::update(@args);
        my $err= RRDs::error;
        die "$err [@args]" if defined $err;
    }
    $rrd_blockedupdate_time= undef;
}

# Last counter values seen on each open inbound connection, keyed by
# {$host,$peer,$conn}; each value is an arrayref parallel to @fields_in.
our %in_conns;

# A new inbound connection starts with all counters at zero.
sub inbound_connected () {
    print DEBUG "inbound $time connected $host $peer $conn\n" if $debug>=2;
    $in_conns{$host,$peer,$conn} = [ (0) x @fields_in ];
}

# Forget the counters of a closed inbound connection.
sub inbound_closed () {
    print DEBUG "inbound $time closed $host $peer $conn\n" if $debug>=2;
    delete $in_conns{$host,$peer,$conn};
}

# inbound_stats()
#
# Parse the "name value name value ..." text in $stats, and queue for
# the 'in' RRD the difference between each field's value and the value
# previously seen on this connection.  A field is reported as 'U' when
# it is missing from the line or when no previous value is known (e.g.
# the connection's start was never seen).
sub inbound_stats () {
    my $text= $stats.' ';
    # Join two-word field names: "accepted size" -> "accepted_size".
    $text =~ s/(?<=[a-z]) (?=[a-z])/_/g;

    my $hpc= $in_conns{$host,$peer,$conn};
    if (!$hpc) {
        print DEBUG "inbound $time UNKNOWN $host $peer $conn $stats\n";
        $in_conns{$host,$peer,$conn}= $hpc= [ (undef) x @fields_in ];
    } else {
        print DEBUG "inbound $time stats $host $peer $conn $stats\n" if $debug>=2;
    }

    my %s;
    while ($text =~ s/^([a-z_]+) (\d+)\s//) {
        $s{$1}= $2;
    }

    my @v;
    for my $ix (0 .. $#fields_in) {
        my $this= $s{ $fields_in[$ix] };
        if (!defined $this) {
            # Value unknown this time: forget the baseline too, so the
            # next report for this field yields 'U' rather than a bogus
            # difference.
            $hpc->[$ix]= undef;
            push @v, 'U';
            next;
        }
        my $last= $hpc->[$ix];
        $hpc->[$ix]= $this;
        push @v, defined($last) ? $this - $last : 'U';
    }
    update_rrd('in',\@v);
}

# outbound_stats()
#
# Parse the "name=value ..." text in $stats and queue the values for
# the 'out' RRD; fields absent from the line are reported as 'U'.
sub outbound_stats () {
    print DEBUG "outbound $time stats $host $peer $stats\n" if $debug>=2;

    my $text= " $stats ";
    # A "missing=N" that is directly followed by a parenthesised group
    # is recorded as body_missing; the first such group is then dropped.
    $text =~ s/missing(?=\=\d+ \()/body_missing/;
    $text =~ s/\([^()]*\)/ /;

    my %s;
    while ($text =~ s/ ([a-z]\w+)\=(\d+) / /) {
        $s{$1}= $2;
    }

    my @v= map { defined($s{$_}) ? $s{$_} : 'U' } @fields_out;
    update_rrd('out',\@v);
}

# run($object)
#
# Feed every syslog line from $object (anything Parse::Syslog->new
# accepts: the callers pass an IO::Handle, IO::File or File::Tail
# object) through the inbound/outbound handlers.  Returns when the
# parser runs out of lines.
sub run ($) {
    my ($object) = @_;

    my $parser= Parse::Syslog->new($object, repeat=>0, arrayref=>1);
    my $host_re= '[-.0-9a-z]+';
    my $conn_re= '[1-9]\d{0,5}';
    my ($process,$pid,$msg,$cc,$sl);

    while ($sl= $parser->next) {
        ($time,$host,$process,$pid,$msg) = @$sl;

        # Flush before the host filter: any line with a new timestamp
        # releases the updates queued under the previous one.
        actually_update_rrds();
        next unless exists $dohosts{$host};

        print DEBUG "logfile ",
            join("|", map { defined($_) ? $_ : "" } @$sl), "\n"
            if $debug>=3;

        if ($process eq 'innd' && !defined $pid) {
            if (($peer,$conn) = $msg =~
                m/^($host_re) connected ($conn_re)(?: streaming allowed)?$/) {
                inbound_connected()
            } elsif (($peer,$conn,$cc,$stats) = $msg =~
                     m/^($host_re):($conn_re) (closed|checkpoint) (seconds .*)$/) {
                inbound_stats();
                inbound_closed() if $cc eq 'closed';
            }
        } elsif ($process eq 'innduct') {
            if (($peer,$stats) = $msg =~
                m/^($host_re)\| notice: (?:completed|processed) \S+ (read=.*)$/) {
                outbound_stats();
            }
        }
    }
}

#seconds (\d+) accepted (\d+) refused (\d+) rejected (\d+) duplicate (\d+) accepted size (\d+) duplicate size (\d+)

# Process the static (already complete) log files first.
foreach my $staticpath (@ARGV) {
    if ($staticpath =~ m/\.gz$/) {
        my $fh= IO::Handle->new;
        open $fh, '-|', 'gunzip', '-c', '--', $staticpath or die $!;
        run($fh);
        !$fh->error or die "$staticpath $!";
        # Clear both so that the message on failure shows only what
        # close itself set (errno and/or gunzip's wait status).
        $!=0; $?=0;
        close $fh or die "$staticpath $! $?";
    } else {
        my $fh= IO::File->new($staticpath, '<') or die "$staticpath $!";
        # Parse from the handle we opened, so that the error and close
        # checks below apply to the handle that was actually read.
        run($fh);
        !$fh->error or die "$staticpath $!";
        close $fh or die "$staticpath $!";
    }
}

exit 0 if $totail eq '';

# Then follow the live log.  run() is not expected to return here.
my $tailer= File::Tail->new(name=>$totail,
                            interval=>$interval,
                            adjustafter=>2,
                            ignore_nonexistant=>1,
                            tail=>-1)
    or die "$totail $!";

run($tailer);

die "huh?";