chiark / gitweb /
new invoke script
[rrd-graphs.git] / newstailer
1 #!/usr/bin/perl -w
2
3 # killall newstailer
4 # with-lock-ex -f data/news/lock sh -xc \
5 # "rm data/news/*.rrd; ./newstailer -Odata/news/ -D \
6 # /var/log/news/news.notice.{6,5,4,3,2,1}.gz /var/log/news/news.notice{.0,} ''"
7
8 use strict qw(refs vars);
9 use POSIX;
10
11 use IO::Handle;
12 use IO::File;
13 use File::Tail;
14 use Parse::Syslog;
15 use RRDs;
16
17 our %dohosts;
18 our $outpfx= './';
19 our $sep= '_';
20 our $debug= 0;
21 our $interval= 30;
22
23 open DEBUG, ">/dev/null" or die $!;
24
25 while (@ARGV && $ARGV[0] =~ m/^\-./) {
26     $_= shift @ARGV;
27     last if $_ eq '--';
28     while (m/^\-./) {
29         if (s/^-h//) { $dohosts{$_}=1; last; }
30         elsif (s/^-O//) { $outpfx=$_; last; }
31         elsif (s/^-s//) { $sep=$_; last; }
32         elsif (s/^-i([1-9]\d{0,5})$//) { $interval= $1; }
33         elsif (s/^-D/-/) { $debug++; }
34         else { die "bad option $_"; }
35     }
36 }
37
38 if (!keys %dohosts) {
39     my ($sysname, $nodename) = POSIX::uname();
40     die unless defined $nodename;
41     $dohosts{$nodename}= 1;
42 }
43
44 die unless @ARGV;
45 my $totail= pop @ARGV;
46
47 if ($debug) { open DEBUG, ">& STDOUT" or die $!; }
48
49 our %details;
50
51
52 our @detail_defaults=
53     (
54      Step => 60,
55      DstArguments => "7200:0:U",
56      Xff => 0.5,
57      Archives => [ [ 3600*4,            60 ],   # 4hr, 1min resolution
58                    [ 3600*25,          180 ],   # 25h, 3min resolution
59                    [ 86400*8,          600 ],   #  8d, 10min resolution
60                    [ 86400*7*14,    3600*2 ],   # 14wks, 2hr resolution
61                    [ 86400*366,     3600*6 ],   # 1yr, 6hr resolution
62                    [ 86400*366*3,   3600*24 ] ], # 3yr, 1d resolution
63      );
64
65 our @fields_in= qw(seconds accepted refused rejected duplicate
66                    accepted_size duplicate_size);
67 $details{'in'}= {
68     Fields => \@fields_in,
69     @detail_defaults
70 };
71
72 our @fields_out= qw(missing offered deferred
73                     accepted unwanted rejected body_missing);
74 $details{'out'}= {
75     Fields => \@fields_out,
76     @detail_defaults
77 };
78
79
80 our ($time,$host,$peer,$conn,$stats);
81
82 sub create_rrd ($$) {
83     my ($inout, $path) = @_;
84
85     my $details= $details{$inout};
86
87     my @sargs= ($path, '--start','now-1y', '--step',$details->{Step});
88     my @largs;
89     push @largs, "DS:$_:ABSOLUTE:$details->{DstArguments}"
90         foreach @{ $details{$inout}{Fields} };
91     foreach (@{ $details->{Archives} }) {
92         my ($whole,$reso) = @$_;
93         my $steps= $reso / $details->{Step};
94         my $rows= $whole / $reso;
95         push @largs, "RRA:AVERAGE:$details->{Xff}:$steps:$rows";
96     } 
97     print DEBUG join("            \\\n  ", "creating @sargs", @largs),"\n";
98     RRDs::create(@sargs,@largs);
99     my $err= RRDs::error;
100     die "$err [@sargs @largs]" if defined $err;
101 }
102
103 sub get_rrd_info ($$) {
104     my ($rrdupdate, $path) = @_;
105     my $h= RRDs::info($path);
106     die "$path $! ".(RRDs::error) unless $h;
107     die "$path ?" unless $h->{'last_update'};
108     $rrdupdate->{DoneUpto}= $h->{'last_update'};
109 }
110
111 sub find_or_create_rrd ($) {
112     my ($inout) = @_;
113     my $rrd= {
114         Path => "${outpfx}${host}${sep}${peer}_${inout}.rrd",
115     };
116     if (stat $rrd->{Path}) {
117         get_rrd_info($rrd, $rrd->{Path});
118     } else {
119         $!==&ENOENT or die "$rrd->{Path} $!";
120         create_rrd($inout, $rrd->{Path});
121         $rrd->{DoneUpto}= 0;
122     }
123     return $rrd;
124 }
125
126 our %rrds;
127 our @rrd_blockedupdates;
128 our $rrd_blockedupdate_time;
129
130 sub update_rrd ($$) {
131     my ($inout,$vals) = @_;
132
133     my $rrd= $rrds{$host,$peer,$inout};
134     if (!$rrd) {
135         $rrd= $rrds{$host,$peer,$inout}= find_or_create_rrd($inout);
136         $rrd->{Template}= join ':', @{ $details{$inout}{Fields} };
137     }
138     return if $time <= $rrd->{DoneUpto};
139
140     my $blocked= $rrd->{BlockedUpdate};
141     if (defined $blocked) {
142         for (my $ix=0; $ix<@$vals; $ix++) {
143             my $old= $blocked->[$ix];
144             my $new= $vals->[$ix];
145             $blocked->[$ix]= ($old eq 'U' || $new eq 'U') ? 'U' : $old + $new;
146         }
147         return;
148     }
149     $rrd->{BlockedUpdate}= $vals;
150     $rrd_blockedupdate_time= $time;
151     push @rrd_blockedupdates, $rrd;
152 }
153
154 sub actually_update_rrds () {
155     return unless defined $rrd_blockedupdate_time;
156     return if $time == $rrd_blockedupdate_time;
157
158     while (my $rrd= shift @rrd_blockedupdates) {
159         my $vals= $rrd->{BlockedUpdate};
160         next unless $vals;
161         delete $rrd->{BlockedUpdate};
162
163         my @args= ($rrd->{Path}, '--template',$rrd->{Template},
164                    join(':',$rrd_blockedupdate_time,@$vals));
165         print DEBUG "update @args\n" if $debug>=2;
166         RRDs::update(@args);
167         my $err= RRDs::error;
168         die "$err [@args]" if defined $err;
169     }
170
171     $rrd_blockedupdate_time= undef;
172 }
173
174 our %in_conns;
175
176 sub inbound_connected () {
177     print DEBUG "inbound $time connected $host $peer $conn\n" if $debug>=2;
178     $in_conns{$host,$peer,$conn} = [ (0) x @fields_in ];
179 }
180 sub inbound_closed () {
181     print DEBUG "inbound $time closed $host $peer $conn\n" if $debug>=2;
182     delete $in_conns{$host,$peer,$conn};
183 }
184 sub inbound_stats () {
185     $_= $stats.' ';
186     s/(?<=[a-z]) (?=[a-z])/_/g;
187     my $hpc= $in_conns{$host,$peer,$conn};
188     if (!$hpc) {
189         print DEBUG "inbound $time UNKNOWN $host $peer $conn $stats\n";
190         $in_conns{$host,$peer,$conn}= $hpc= [ (undef) x @fields_in ];
191     } else {
192         print DEBUG "inbound $time stats $host $peer $conn $stats\n"
193             if $debug>=2;
194     }
195     my %s;
196     while (s/^([a-z_]+) (\d+)\s//) { $s{$1}= $2; }
197     my @v;
198     foreach my $f (@fields_in) {
199         my $this= $s{$f};
200         if (!defined $this) {
201             delete $hpc->[@v];
202             push @v, 'U';
203             next;
204         }
205         my $last= $hpc->[@v];
206         $hpc->[@v]= $this;
207         push @v, defined($last) ? $this - $last : 'U';
208     }
209     update_rrd('in',\@v);
210 }
211
212 sub outbound_stats () {
213     print DEBUG "outbound $time stats $host $peer $stats\n" if $debug>=2;
214     $_= " $stats ";
215     s/missing(?=\=\d+ \()/body_missing/;
216     s/\([^()]*\)/ /;
217     my %s;
218     while (s/ ([a-z]\w+)\=(\d+) / /) { $s{$1}= $2; }
219     my @v;
220     foreach my $f (@fields_out) {
221         my $this= $s{$f};
222         push @v, defined($this) ? $this : 'U';
223     }
224     update_rrd('out',\@v);
225 }
226
227 sub run ($) {
228     my ($object) = @_;
229     my $parser= new Parse::Syslog $object, repeat=>0, arrayref=>1;
230     my $host_re= '[-.0-9a-z]+';
231     my $conn_re= '[1-9]\d{0,5}';
232     my ($process,$pid,$msg,$cc,$sl);
233     while ($sl= $parser->next) {
234         ($time,$host,$process,$pid,$msg) = @$sl;
235         actually_update_rrds();
236         next unless exists $dohosts{$host};
237         print DEBUG "logfile ",
238             join("|", map { defined($_) ? $_ : "<undef>" } @$sl), "\n"
239                 if $debug>=3;
240         if ($process eq 'innd' && !defined $pid) {
241             if (($peer,$conn) = $msg =~
242                 m/^($host_re) connected ($conn_re)(?: streaming allowed)?$/) {
243                 inbound_connected()
244             } elsif (($peer,$conn,$cc,$stats) = $msg =~
245      m/^($host_re):($conn_re) (closed|checkpoint) (seconds .*)$/) {
246                 inbound_stats();
247                 inbound_closed() if $cc eq 'closed';
248             }
249         } elsif ($process eq 'innduct') {
250             if (($peer,$stats) = $msg =~
251      m/^($host_re)\| notice: (?:completed|processed) \S+ (read=.*)$/) {
252                 outbound_stats();
253             }
254         }
255     }
256 }
257
258 #seconds (\d+) accepted (\d+) refused (\d+) rejected (\d+) duplicate (\d+) accepted size (\d+) duplicate size (\d+) 
259
260         
261 foreach my $staticpath (@ARGV) {
262     if ($staticpath =~ m/\.gz$/) {
263         my $fh= new IO::Handle;
264         open $fh, '-|', 'gunzip', '-c', '--', $staticpath or die $!;
265         run($fh);
266         !$fh->error or die "$staticpath $!";
267         $!=0;$?=0; close $fh or die "$staticpath $! $?";
268     } else {
269         my $fh= new IO::File $staticpath, '<' or die $!;
270         run($staticpath);
271         !$fh->error or die "$staticpath $!";
272         close $fh or die "$staticpath $!";
273     }
274 }
275
276 exit 0 if $totail eq '';
277
278 my $tailer= new File::Tail name=>$totail,
279     interval=>$interval, adjustafter=>2, ignore_nonexistant=>1, tail=>-1
280     or die "$totail $!";
281
282 run($tailer);
283
284 die "huh?";