chiark / gitweb /
better grouping of news entries
[rrd-graphs.git] / newstailer
1 #!/usr/bin/perl -w
2
3 # Program for updating an rrd with info from innduct and inn logs
4 # Needs to be run once inside a lock
5
6 # rrd-graphs/newstailer - part of rrd-graphs, a tool for online graphs
7 # Copyright 2010, 2012 Ian Jackson
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Affero General Public License as
11 # published by the Free Software Foundation, either version 3 of the
12 # License, or (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU Affero General Public License for more details.
18 #
19 # You should have received a copy of the GNU Affero General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21
22
23 # killall newstailer
24 # with-lock-ex -f data/news/lock sh -xc \
25 # "rm data/news/*.rrd; ./newstailer -Odata/news/ -D \
26 # /var/log/news/news.notice.{6,5,4,3,2,1}.gz /var/log/news/news.notice{.0,} ''"
27
28 use strict qw(refs vars);
29 use POSIX;
30
31 use IO::Handle;
32 use IO::File;
33 use File::Tail;
34 use Parse::Syslog;
35 use RRDs;
36
37 our %dohosts;
38 our $outpfx= './';
39 our $sep= '_';
40 our $debug= 0;
41 our $interval= 30;
42
43 open DEBUG, ">/dev/null" or die $!;
44
45 while (@ARGV && $ARGV[0] =~ m/^\-./) {
46     $_= shift @ARGV;
47     last if $_ eq '--';
48     while (m/^\-./) {
49         if (s/^-h//) { $dohosts{$_}=1; last; }
50         elsif (s/^-O//) { $outpfx=$_; last; }
51         elsif (s/^-s//) { $sep=$_; last; }
52         elsif (s/^-i([1-9]\d{0,5})$//) { $interval= $1; }
53         elsif (s/^-D/-/) { $debug++; }
54         else { die "bad option $_"; }
55     }
56 }
57
58 if (!keys %dohosts) {
59     my ($sysname, $nodename) = POSIX::uname();
60     die unless defined $nodename;
61     $dohosts{$nodename}= 1;
62 }
63
64 die unless @ARGV;
65 my $totail= pop @ARGV;
66
67 if ($debug) { open DEBUG, ">& STDOUT" or die $!; }
68
69 our %details;
70
71
72 our @detail_defaults=
73     (
74      Step => 60,
75      DstArguments => "7200:0:U",
76      Xff => 0.5,
77      Archives => [ [ 3600*4,            60 ],   # 4hr, 1min resolution
78                    [ 3600*25,          180 ],   # 25h, 3min resolution
79                    [ 86400*8,          600 ],   #  8d, 10min resolution
80                    [ 86400*7*14,    3600*2 ],   # 14wks, 2hr resolution
81                    [ 86400*366,     3600*6 ],   # 1yr, 6hr resolution
82                    [ 86400*366*3,   3600*24 ] ], # 3yr, 1d resolution
83      );
84
85 our @fields_in= qw(seconds accepted refused rejected duplicate
86                    accepted_size duplicate_size);
87 $details{'in'}= {
88     Fields => \@fields_in,
89     @detail_defaults
90 };
91
92 our @fields_out= qw(missing offered deferred
93                     accepted unwanted rejected body_missing);
94 $details{'out'}= {
95     Fields => \@fields_out,
96     @detail_defaults
97 };
98
99
100 our ($time,$host,$peer,$conn,$stats);
101
102 sub create_rrd ($$) {
103     my ($inout, $path) = @_;
104
105     my $details= $details{$inout};
106
107     my @sargs= ($path, '--start','now-1y', '--step',$details->{Step});
108     my @largs;
109     push @largs, "DS:$_:ABSOLUTE:$details->{DstArguments}"
110         foreach @{ $details{$inout}{Fields} };
111     foreach (@{ $details->{Archives} }) {
112         my ($whole,$reso) = @$_;
113         my $steps= $reso / $details->{Step};
114         my $rows= $whole / $reso;
115         push @largs, "RRA:AVERAGE:$details->{Xff}:$steps:$rows";
116     } 
117     print DEBUG join("            \\\n  ", "creating @sargs", @largs),"\n";
118     RRDs::create(@sargs,@largs);
119     my $err= RRDs::error;
120     die "$err [@sargs @largs]" if defined $err;
121 }
122
123 sub get_rrd_info ($$) {
124     my ($rrdupdate, $path) = @_;
125     my $h= RRDs::info($path);
126     die "$path $! ".(RRDs::error) unless $h;
127     die "$path ?" unless $h->{'last_update'};
128     $rrdupdate->{DoneUpto}= $h->{'last_update'};
129 }
130
131 sub find_or_create_rrd ($) {
132     my ($inout) = @_;
133     my $rrd= {
134         Path => "${outpfx}${host}${sep}${peer}_${inout}.rrd",
135     };
136     if (stat $rrd->{Path}) {
137         get_rrd_info($rrd, $rrd->{Path});
138     } else {
139         $!==&ENOENT or die "$rrd->{Path} $!";
140         create_rrd($inout, $rrd->{Path});
141         $rrd->{DoneUpto}= 0;
142     }
143     return $rrd;
144 }
145
146 our %rrds;
147 our @rrd_blockedupdates;
148 our $rrd_blockedupdate_time;
149
150 sub update_rrd ($$) {
151     my ($inout,$vals) = @_;
152
153     my $rrd= $rrds{$host,$peer,$inout};
154     if (!$rrd) {
155         $rrd= $rrds{$host,$peer,$inout}= find_or_create_rrd($inout);
156         $rrd->{Template}= join ':', @{ $details{$inout}{Fields} };
157     }
158     return if $time <= $rrd->{DoneUpto};
159
160     my $blocked= $rrd->{BlockedUpdate};
161     if (defined $blocked) {
162         for (my $ix=0; $ix<@$vals; $ix++) {
163             my $old= $blocked->[$ix];
164             my $new= $vals->[$ix];
165             $blocked->[$ix]= ($old eq 'U' || $new eq 'U') ? 'U' : $old + $new;
166         }
167         return;
168     }
169     $rrd->{BlockedUpdate}= $vals;
170     $rrd_blockedupdate_time= $time;
171     push @rrd_blockedupdates, $rrd;
172 }
173
174 sub actually_update_rrds () {
175     return unless defined $rrd_blockedupdate_time;
176     return if $time == $rrd_blockedupdate_time;
177
178     while (my $rrd= shift @rrd_blockedupdates) {
179         my $vals= $rrd->{BlockedUpdate};
180         next unless $vals;
181         delete $rrd->{BlockedUpdate};
182
183         my @args= ($rrd->{Path}, '--template',$rrd->{Template},
184                    join(':',$rrd_blockedupdate_time,@$vals));
185         print DEBUG "update @args\n" if $debug>=2;
186         RRDs::update(@args);
187         my $err= RRDs::error;
188         die "$err [@args]" if defined $err;
189     }
190
191     $rrd_blockedupdate_time= undef;
192 }
193
194 our %in_conns;
195
196 sub inbound_connected () {
197     print DEBUG "inbound $time connected $host $peer $conn\n" if $debug>=2;
198     $in_conns{$host,$peer,$conn} = [ (0) x @fields_in ];
199 }
200 sub inbound_closed () {
201     print DEBUG "inbound $time closed $host $peer $conn\n" if $debug>=2;
202     delete $in_conns{$host,$peer,$conn};
203 }
204 sub inbound_stats () {
205     $_= $stats.' ';
206     s/(?<=[a-z]) (?=[a-z])/_/g;
207     my $hpc= $in_conns{$host,$peer,$conn};
208     if (!$hpc) {
209         print DEBUG "inbound $time UNKNOWN $host $peer $conn $stats\n";
210         $in_conns{$host,$peer,$conn}= $hpc= [ (undef) x @fields_in ];
211     } else {
212         print DEBUG "inbound $time stats $host $peer $conn $stats\n"
213             if $debug>=2;
214     }
215     my %s;
216     while (s/^([a-z_]+) (\d+)\s//) { $s{$1}= $2; }
217     my @v;
218     foreach my $f (@fields_in) {
219         my $this= $s{$f};
220         if (!defined $this) {
221             delete $hpc->[@v];
222             push @v, 'U';
223             next;
224         }
225         my $last= $hpc->[@v];
226         $hpc->[@v]= $this;
227         push @v, defined($last) ? $this - $last : 'U';
228     }
229     update_rrd('in',\@v);
230 }
231
232 sub outbound_stats () {
233     print DEBUG "outbound $time stats $host $peer $stats\n" if $debug>=2;
234     $_= " $stats ";
235     s/missing(?=\=\d+ \()/body_missing/;
236     s/\([^()]*\)/ /;
237     my %s;
238     while (s/ ([a-z]\w+)\=(\d+) / /) { $s{$1}= $2; }
239     my @v;
240     foreach my $f (@fields_out) {
241         my $this= $s{$f};
242         push @v, defined($this) ? $this : 'U';
243     }
244     update_rrd('out',\@v);
245 }
246
247 sub run ($) {
248     my ($object) = @_;
249     my $parser= new Parse::Syslog $object, repeat=>0, arrayref=>1;
250     my $host_re= '[-.0-9a-z]+';
251     my $conn_re= '[1-9]\d{0,5}';
252     my ($process,$pid,$msg,$cc,$sl);
253     while ($sl= $parser->next) {
254         ($time,$host,$process,$pid,$msg) = @$sl;
255         actually_update_rrds();
256         next unless exists $dohosts{$host};
257         print DEBUG "logfile ",
258             join("|", map { defined($_) ? $_ : "<undef>" } @$sl), "\n"
259                 if $debug>=3;
260         if ($process eq 'innd' && !defined $pid) {
261             if (($peer,$conn) = $msg =~
262                 m/^($host_re) connected ($conn_re)(?: streaming allowed)?$/) {
263                 inbound_connected()
264             } elsif (($peer,$conn,$cc,$stats) = $msg =~
265      m/^($host_re):($conn_re) (closed|checkpoint) (seconds .*)$/) {
266                 inbound_stats();
267                 inbound_closed() if $cc eq 'closed';
268             }
269         } elsif ($process eq 'innduct') {
270             if (($peer,$stats) = $msg =~
271      m/^($host_re)\| notice: (?:completed|processed) \S+ (read=.*)$/) {
272                 outbound_stats();
273             }
274         }
275     }
276 }
277
278 #seconds (\d+) accepted (\d+) refused (\d+) rejected (\d+) duplicate (\d+) accepted size (\d+) duplicate size (\d+) 
279
280         
281 foreach my $staticpath (@ARGV) {
282     if ($staticpath =~ m/\.gz$/) {
283         my $fh= new IO::Handle;
284         open $fh, '-|', 'gunzip', '-c', '--', $staticpath or die $!;
285         run($fh);
286         !$fh->error or die "$staticpath $!";
287         $!=0;$?=0; close $fh or die "$staticpath $! $?";
288     } else {
289         my $fh= new IO::File $staticpath, '<' or die $!;
290         run($staticpath);
291         !$fh->error or die "$staticpath $!";
292         close $fh or die "$staticpath $!";
293     }
294 }
295
296 exit 0 if $totail eq '';
297
298 my $tailer= new File::Tail name=>$totail,
299     interval=>$interval, adjustafter=>2, ignore_nonexistant=>1, tail=>-1
300     or die "$totail $!";
301
302 run($tailer);
303
304 die "huh?";