chiark / gitweb /
input seems to work
[rrd-graphs.git] / newstailer
1 #!/usr/bin/perl -w
2
3 use strict qw(refs vars);
4 use POSIX;
5
6 use IO::Handle;
7 use IO::File;
8 use File::Tail;
9 use Parse::Syslog;
10 use RRDs;
11
12 our %dohosts;
13 our $outpfx= './';
14 our $sep= '_';
15 our $debug= 0;
16 our $interval= 30;
17
18 open DEBUG, ">/dev/null" or die $!;
19
20 while (@ARGV && $ARGV[0] =~ m/^\-./) {
21     $_= shift @ARGV;
22     last if $_ eq '--';
23     while (m/^\-./) {
24         if (s/^-h//) { $dohosts{$_}=1; last; }
25         elsif (s/^-O//) { $outpfx=$_; last; }
26         elsif (s/^-s//) { $sep=$_; last; }
27         elsif (s/^-i([1-9]\d{0,5})$//) { $interval= $1; }
28         elsif (s/^-D/-/) { $debug++; }
29         else { die "bad option $_"; }
30     }
31 }
32
33 if (!keys %dohosts) {
34     my ($sysname, $nodename) = POSIX::uname();
35     die unless defined $nodename;
36     $dohosts{$nodename}= 1;
37 }
38
39 die unless @ARGV;
40 my $totail= pop @ARGV;
41
42 if ($debug) { open DEBUG, ">& STDOUT" or die $!; }
43
44 our %details;
45
46
47 our @detail_defaults=
48     (
49      Step => 60,
50      DstArguments => "7200:0:U",
51      Xff => 0.5,
52      Archives => [ [ 3600*4,           60 ],   # 4hr, 1min resolution
53                    [ 3600*25,         180 ],   # 25h, 3min resolution
54                    [ 86400*14*5,      3600 ],  # 14wks, 1hr resolution
55                    [ 86400*370*2, 3600*24 ] ], # 2yr+, 1day resolution
56      );
57
58 our @fields_in= qw(seconds accepted refused rejected duplicate
59                    accepted_size duplicate_size);
60 $details{'in'}= {
61     Fields => \@fields_in,
62     @detail_defaults
63 };
64
65 our @fields_out= qw(missing offered deferred
66                     accepted unwanted rejected body_missing);
67 $details{'out'}= {
68     Fields => \@fields_out,
69     @detail_defaults
70 };
71
72
73 our ($time,$host,$peer,$conn,$stats);
74
75 sub create_rrd ($$) {
76     my ($inout, $path) = @_;
77
78     my $details= $details{$inout};
79
80     my @sargs= ($path, '--start','now-1y', '--step',$details->{Step});
81     my @largs;
82     push @largs, "DS:$_:ABSOLUTE:$details->{DstArguments}"
83         foreach @{ $details{$inout}{Fields} };
84     foreach (@{ $details->{Archives} }) {
85         my ($whole,$reso) = @$_;
86         my $steps= $reso / $details->{Step};
87         my $rows= $whole / $reso;
88         push @largs, "RRA:AVERAGE:$details->{Xff}:$steps:$rows";
89     } 
90     print DEBUG join("            \\\n  ", "creating @sargs", @largs),"\n";
91     RRDs::create(@sargs,@largs);
92     my $err= RRDs::error;
93     die "$err [@sargs @largs]" if defined $err;
94 }
95
96 sub get_rrd_info ($$) {
97     my ($rrdupdate, $path) = @_;
98     my $h= RRDs::info($path);
99     die "$path $! ".(RRDs::error) unless $h;
100     die "$path ?" unless $h->{'last_update'};
101     $rrdupdate->{DoneUpto}= $h->{'last_update'};
102 }
103
104 sub find_or_create_rrd ($) {
105     my ($inout) = @_;
106     my $rrd= {
107         Path => "${outpfx}${host}${sep}${peer}_${inout}.rrd",
108     };
109     if (stat $rrd->{Path}) {
110         get_rrd_info($rrd, $rrd->{Path});
111     } else {
112         $!==&ENOENT or die "$rrd->{Path} $!";
113         create_rrd($inout, $rrd->{Path});
114         $rrd->{DoneUpto}= 0;
115     }
116     return $rrd;
117 }
118
119 our %rrds;
120 our @rrd_blockedupdates;
121 our $rrd_blockedupdate_time;
122
123 sub update_rrd ($$) {
124     my ($inout,$vals) = @_;
125
126     my $rrd= $rrds{$host,$peer,$inout};
127     if (!$rrd) {
128         $rrd= $rrds{$host,$peer,$inout}= find_or_create_rrd($inout);
129         $rrd->{Template}= join ':', @{ $details{$inout}{Fields} };
130     }
131     return if $time <= $rrd->{DoneUpto};
132
133     my $blocked= $rrd->{BlockedUpdate};
134     if (defined $blocked) {
135         for (my $ix=0; $ix<@$vals; $ix++) {
136             my $old= $blocked->[$ix];
137             my $new= $vals->[$ix];
138             $blocked->[$ix]= ($old eq 'U' || $new eq 'U') ? 'U' : $old + $new;
139         }
140         return;
141     }
142     $rrd->{BlockedUpdate}= $vals;
143     $rrd_blockedupdate_time= $time;
144     push @rrd_blockedupdates, $rrd;
145 }
146
147 sub actually_update_rrds () {
148     return unless defined $rrd_blockedupdate_time;
149     return if $time == $rrd_blockedupdate_time;
150
151     while (my $rrd= shift @rrd_blockedupdates) {
152         my $vals= $rrd->{BlockedUpdate};
153         next unless $vals;
154         delete $rrd->{BlockedUpdate};
155
156         my @args= ($rrd->{Path}, '--template',$rrd->{Template},
157                    join(':',$rrd_blockedupdate_time,@$vals));
158         print DEBUG "update @args\n" if $debug>=2;
159         RRDs::update(@args);
160         my $err= RRDs::error;
161         die "$err [@args]" if defined $err;
162     }
163
164     $rrd_blockedupdate_time= undef;
165 }
166
167 our %in_conns;
168
169 sub inbound_connected () {
170     print DEBUG "inbound $time connected $host $peer $conn\n" if $debug>=2;
171     $in_conns{$host,$peer,$conn} = [ (0) x @fields_in ];
172 }
173 sub inbound_closed () {
174     print DEBUG "inbound $time closed $host $peer $conn\n" if $debug>=2;
175     delete $in_conns{$host,$peer,$conn};
176 }
177 sub inbound_stats () {
178     $_= $stats.' ';
179     s/(?<=[a-z]) (?=[a-z])/_/g;
180     my $hpc= $in_conns{$host,$peer,$conn};
181     if (!$hpc) {
182         print DEBUG "inbound $time UNKNOWN $host $peer $conn $stats\n";
183         $in_conns{$host,$peer,$conn}= $hpc= [ (undef) x @fields_in ];
184     } else {
185         print DEBUG "inbound $time stats $host $peer $conn $stats\n"
186             if $debug>=2;
187     }
188     my %s;
189     while (s/^([a-z_]+) (\d+)\s//) { $s{$1}= $2; }
190     my @v;
191     foreach my $f (@fields_in) {
192         my $this= $s{$f};
193         if (!defined $this) {
194             delete $hpc->[@v];
195             push @v, 'U';
196             next;
197         }
198         my $last= $hpc->[@v];
199         $hpc->[@v]= $this;
200         push @v, defined($last) ? $this - $last : 'U';
201     }
202     update_rrd('in',\@v);
203 }
204
205 sub outbound_stats () {
206     print DEBUG "outbound $time stats $host $peer $stats\n" if $debug>=2;
207     $_= " $stats ";
208     s/missing(?=\=\d+ \()/body_missing/;
209     s/\([^()]*\)/ /;
210     my %s;
211     while (s/ ([a-z]\w+)\=(\d+) / /) { $s{$1}= $2; }
212     my @v;
213     foreach my $f (@fields_out) {
214         my $this= $s{$f};
215         push @v, defined($this) ? $this : 'U';
216     }
217     update_rrd('out',\@v);
218 }
219
220 sub run ($) {
221     my ($object) = @_;
222     my $parser= new Parse::Syslog $object, repeat=>0, arrayref=>1;
223     my $host_re= '[-.0-9a-z]+';
224     my $conn_re= '[1-9]\d{0,5}';
225     my ($process,$pid,$msg,$cc,$sl);
226     while ($sl= $parser->next) {
227         ($time,$host,$process,$pid,$msg) = @$sl;
228         actually_update_rrds();
229         next unless exists $dohosts{$host};
230         print DEBUG "logfile ",
231             join("|", map { defined($_) ? $_ : "<undef>" } @$sl), "\n"
232                 if $debug>=3;
233         if ($process eq 'innd' && !defined $pid) {
234             if (($peer,$conn) = $msg =~
235                 m/^($host_re) connected ($conn_re)(?: streaming allowed)?$/) {
236                 inbound_connected()
237             } elsif (($peer,$conn,$cc,$stats) = $msg =~
238      m/^($host_re):($conn_re) (closed|checkpoint) (seconds .*)$/) {
239                 inbound_stats();
240                 inbound_closed() if $cc eq 'closed';
241             }
242         } elsif ($process eq 'innduct') {
243             if (($peer,$stats) = $msg =~
244      m/^($host_re)\| notice: (?:completed|processed) \S+ (read=.*)$/) {
245                 outbound_stats();
246             }
247         }
248     }
249 }
250
251 #seconds (\d+) accepted (\d+) refused (\d+) rejected (\d+) duplicate (\d+) accepted size (\d+) duplicate size (\d+) 
252
253         
254 foreach my $staticpath (@ARGV) {
255     if ($staticpath =~ m/\.gz$/) {
256         my $fh= new IO::Handle;
257         open $fh, '-|', 'gunzip', '-c', '--', $staticpath or die $!;
258         run($fh);
259         !$fh->error or die "$staticpath $!";
260         $!=0;$?=0; close $fh or die "$staticpath $! $?";
261     } else {
262         my $fh= new IO::File $staticpath, '<' or die $!;
263         run($staticpath);
264         !$fh->error or die "$staticpath $!";
265         close $fh or die "$staticpath $!";
266     }
267 }
268
269 exit 0 if $totail eq '';
270
271 my $tailer= new File::Tail name=>$totail,
272     interval=>$interval, adjustafter=>2, ignore_nonexistant=>1, tail=>-1
273     or die "$totail $!";
274
275 run($tailer);
276
277 die "huh?";