chiark / gitweb /
debug level 1 shows creation and unknown stuff only
[rrd-graphs.git] / newstailer
1 #!/usr/bin/perl -w
2
3 use strict qw(refs vars);
4 use POSIX;
5
6 use IO::Handle;
7 use IO::File;
8 use File::Tail;
9 use Parse::Syslog;
10 use RRDs;
11
12 our %dohosts;
13 our $outpfx= './';
14 our $sep= '_';
15 our $debug= 0;
16 our $interval= 30;
17
18 open DEBUG, ">/dev/null" or die $!;
19
20 while (@ARGV && $ARGV[0] =~ m/^\-./) {
21     $_= shift @ARGV;
22     last if $_ eq '--';
23     while (m/^\-./) {
24         if (s/^-h//) { $dohosts{$_}=1; last; }
25         elsif (s/^-O//) { $outpfx=$_; last; }
26         elsif (s/^-s//) { $sep=$_; last; }
27         elsif (s/^-i([1-9]\d{0,5})$//) { $interval= $1; }
28         elsif (s/^-D/-/) { $debug++; }
29         else { die "bad option $_"; }
30     }
31 }
32
33 if (!keys %dohosts) {
34     my ($sysname, $nodename) = POSIX::uname();
35     die unless defined $nodename;
36     $dohosts{$nodename}= 1;
37 }
38
39 die unless @ARGV;
40 my $totail= pop @ARGV;
41
42 if ($debug) { open DEBUG, ">& STDERR" or die $!; }
43
44 our @fields_in= qw(seconds accepted refused rejected duplicate
45                    accepted_size duplicate_size);
46 our $tmpl_in= join ':',@fields_in;
47 our %details= (
48     'in' => {
49         Step => 60,
50         DstArguments => "7200:0:U",
51         Xff => 0.5,
52         Archives => [ [ 3600*4,           60 ],   # 4hr, 1min resolution
53                       [ 3600*25,         180 ],   # 25h, 3min resolution
54                       [ 86400*14*5,      3600 ],  # 14wks, 1hr resolution
55                       [ 86400*370*2, 3600*24 ] ], # 2yr+, 1day resolution
56     }
57 );
58
59 our ($time,$host,$peer,$conn,$stats);
60
61 sub create_rrd ($$$) {
62     my ($inout, $path, $fields) = @_;
63
64     my $details= $details{$inout};
65
66     my @sargs= ($path, '--start','now-1y', '--step',$details->{Step});
67     my @largs;
68     push @largs, "DS:$_:ABSOLUTE:$details->{DstArguments}" foreach @$fields;
69     foreach (@{ $details->{Archives} }) {
70         my ($whole,$reso) = @$_;
71         my $steps= $reso / $details->{Step};
72         my $rows= $whole / $reso;
73         push @largs, "RRA:AVERAGE:$details->{Xff}:$steps:$rows";
74     } 
75     print DEBUG join("            \\\n  ", "creating @sargs", @largs),"\n";
76     RRDs::create(@sargs,@largs);
77     my $err= RRDs::error;
78     die "$err [@sargs @largs]" if defined $err;
79 }
80
81 sub get_rrd_info ($$) {
82     my ($rrdupdate, $path) = @_;
83     my $h= RRDs::info($path);
84     die "$path $! ".(RRDs::error) unless $h;
85     die "$path ?" unless $h->{'last_update'};
86     $rrdupdate->{DoneUpto}= $h->{'last_update'};
87 }
88
89 sub find_or_create_rrd ($$) {
90     my ($inout, $fields) = @_;
91     my $rrd= {
92         Path => "${outpfx}${host}${sep}${peer}_${inout}.rrd",
93     };
94     if (stat $rrd->{Path}) {
95         get_rrd_info($rrd, $rrd->{Path});
96     } else {
97         $!==&ENOENT or die "$rrd->{Path} $!";
98         create_rrd($inout, $rrd->{Path}, $fields);
99         $rrd->{DoneUpto}= 0;
100     }
101     return $rrd;
102 }
103
104 our %rrds;
105 our @rrd_blockedupdates;
106 our $rrd_blockedupdate_time;
107
108 sub update_rrd ($$$$) {
109     my ($inout,$tmpl,$vals,$fields) = @_;
110
111     my $rrd= $rrds{$host,$peer,$inout};
112     if (!$rrd) {
113         $rrd= $rrds{$host,$peer,$inout}= find_or_create_rrd($inout, $fields);
114         $rrd->{Template}= $tmpl;
115     }
116     return if $time <= $rrd->{DoneUpto};
117
118     my $blocked= $rrd->{BlockedUpdate};
119     if (defined $blocked) {
120         for (my $ix=0; $ix<@$vals; $ix++) {
121             my $old= $blocked->[$ix];
122             my $new= $vals->[$ix];
123             $blocked->[$ix]= ($old eq 'U' || $new eq 'U') ? 'U' : $old + $new;
124         }
125         return;
126     }
127     $rrd->{BlockedUpdate}= $vals;
128     $rrd_blockedupdate_time= $time;
129     push @rrd_blockedupdates, $rrd;
130 }
131
132 sub actually_update_rrds () {
133     return unless defined $rrd_blockedupdate_time;
134     return if $time == $rrd_blockedupdate_time;
135
136     while (my $rrd= shift @rrd_blockedupdates) {
137         my $vals= $rrd->{BlockedUpdate};
138         next unless $vals;
139         delete $rrd->{BlockedUpdate};
140
141         my @args= ($rrd->{Path}, '--template',$rrd->{Template},
142                    join(':',$rrd_blockedupdate_time,@$vals));
143         print DEBUG "update @args\n" if $debug>=2;
144         RRDs::update(@args);
145         my $err= RRDs::error;
146         die "$err [@args]" if defined $err;
147     }
148
149     $rrd_blockedupdate_time= undef;
150 }
151
152 our %in_conns;
153
154 sub inbound_connected () {
155     print DEBUG "inbound connected $host $peer $conn\n" if $debug>=2;
156     $in_conns{$host,$peer,$conn} = [ (0) x @fields_in ];
157 }
158 sub inbound_closed () {
159     print DEBUG "inbound closed $host $peer $conn\n" if $debug>=2;
160     delete $in_conns{$host,$peer,$conn};
161 }
162 sub inbound_stats () {
163     $_= $stats.' ';
164     my %s;
165     s/(?<=[a-z]) (?=[a-z])/_/g;
166     my $hpc= $in_conns{$host,$peer,$conn};
167     if (!$hpc) {
168         print DEBUG "inbound UNKNOWN $host $peer $conn $stats\n";
169         $in_conns{$host,$peer,$conn}= $hpc= [ (undef) x @fields_in ];
170     } else {
171         print DEBUG "inbound stats $host $peer $conn $stats\n" if $debug>=2;
172     }
173     while (s/^([a-z_]+) (\d+)\s//) { $s{$1}= $2; }
174     my @v;
175     foreach my $f (@fields_in) {
176         my $this= $s{$f};
177         if (!defined $this) {
178             delete $hpc->[@v];
179             push @v, 'U';
180             next;
181         }
182         my $last= $hpc->[@v];
183         $hpc->[@v]= $this;
184         push @v, defined($last) ? $this - $last : 'U';
185     }
186     update_rrd('in',$tmpl_in,\@v,\@fields_in);
187 }
188
189 sub outbound_stats () {
190     print "$host $peer OUT $stats\n";
191 }
192
193 sub run ($) {
194     my ($object) = @_;
195     my $parser= new Parse::Syslog $object, repeat=>0, arrayref=>1;
196     my $host_re= '[-.0-9a-z]+';
197     my $conn_re= '[1-9]\d{0,5}';
198     my ($process,$pid,$msg,$cc,$sl);
199     while ($sl= $parser->next) {
200         ($time,$host,$process,$pid,$msg) = @$sl;
201         actually_update_rrds();
202         next unless exists $dohosts{$host};
203         #print join("|", map { defined($_) ? $_ : "<undef>" } @$sl), "\n";
204         if ($process eq 'innd' && !defined $pid) {
205             if (($peer,$conn) = $msg =~
206                 m/^($host_re) connected ($conn_re)(?: streaming allowed)?$/) {
207                 inbound_connected()
208             } elsif (($peer,$conn,$cc,$stats) = $msg =~
209             m/^($host_re):($conn_re) (closed|checkpoint) (seconds .*)$/) {
210                 inbound_stats();
211                 inbound_closed() if $cc eq 'closed';
212             }
213         } elsif ($process eq 'innduct') {
214             if (($peer,$stats) = $msg =~
215                 m/^($host_re)\| (?:completed|processed) \S+ (read=.*)$/) {
216                 outbound_stats();
217             }
218         }
219     }
220 }
221
222 #seconds (\d+) accepted (\d+) refused (\d+) rejected (\d+) duplicate (\d+) accepted size (\d+) duplicate size (\d+) 
223
224         
225 foreach my $staticpath (@ARGV) {
226     if ($staticpath =~ m/\.gz$/) {
227         my $fh= new IO::Handle;
228         open $fh, '-|', 'gunzip', '-c', '--', $staticpath or die $!;
229         run($fh);
230         !$fh->error or die "$staticpath $!";
231         $!=0;$?=0; close $fh or die "$staticpath $! $?";
232     } else {
233         my $fh= new IO::File $staticpath, '<' or die $!;
234         run($staticpath);
235         !$fh->error or die "$staticpath $!";
236         close $fh or die "$staticpath $!";
237     }
238 }
239
240 exit 0 if $totail eq '';
241
242 my $tailer= new File::Tail name=>$totail,
243     interval=>$interval, adjustafter=>2, ignore_nonexistant=>1, tail=>-1
244     or die "$totail $!";
245
246 run($tailer);
247
248 die "huh?";