3 * tailing reliable realtime streaming feeder for inn
5 * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * (I believe that when you compile and link this as part of the inn2
21 * build, with the Makefile runes I have provided, all the libraries
22 * and files which end up included in innduct are licence-compatible
23 * with GPLv3. If not then please let me know. -Ian Jackson.)
38 #include "inn/innconf.h"
39 #include "inn/messages.h"
42 #include <sys/types.h>
45 #include <sys/socket.h>
66 /*----- general definitions, probably best not changed -----*/
68 #define CONNCHILD_ESTATUS_STREAM 24
69 #define CONNCHILD_ESTATUS_NOSTREAM 25
71 #define INNDCOMMCHILD_ESTATUS_FAIL 26
72 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
74 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
75 #define MAX_CLI_COMMAND 1000
77 #define VA va_list al; va_start(al,fmt)
78 #define PRINTF(f,a) __attribute__((__format__(printf,f,a)))
79 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
80 #define NORET __attribute__((__noreturn__))
82 #define NEW(ptr) ((ptr)= zxmalloc(sizeof(*(ptr))))
83 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
85 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
87 #define FOR_LIST_NODE(nodevar, list) \
88 for ((nodevar)=LIST_HEAD(list); (nodevar); (nodevar)=LIST_NEXT((nodevar)))
90 #define FOR_CONN(conn) FOR_LIST_NODE(conn, conns)
92 /*----- doubly linked lists -----*/
94 #define ISNODE(T) struct node list_node
97 union { struct list li; T *for_type; } u; \
101 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
103 #define LIST_CHECKCANHAVENODE(l,n) \
104 ((void)((n) == ((l).u.for_type))) /* just for the type check */
106 #define LIST_ADDSOMEHOW(l,n,list_addsomehow) \
107 ( LIST_CHECKCANHAVENODE(l,n), \
108 list_addsomehow(&(l).u.li, NODE((n))), \
112 #define LIST_REMSOMEHOW(l,list_remsomehow) \
113 ( (typeof((l).u.for_type)) \
116 list_remsomehow(&(l).u.li) ) \
122 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
123 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
124 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
125 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
127 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
128 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
129 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
130 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
132 #define LIST_REMOVE(l,n) \
133 ( LIST_CHECKCANHAVENODE(l,n), \
134 list_remove(NODE((n))), \
138 #define LIST_INSERT(l,n,pred) \
139 ( LIST_CHECKCANHAVENODE(l,n), \
140 LIST_CHECKCANHAVENODE(l,pred), \
141 list_insert((struct list*)&(l), NODE((n)), NODE((pred))), \
145 /*----- type predeclarations -----*/
147 typedef struct Conn Conn;
148 typedef struct Article Article;
149 typedef struct InputFile InputFile;
150 typedef struct XmitDetails XmitDetails;
151 typedef struct Filemon_Perfile Filemon_Perfile;
152 typedef enum StateMachineState StateMachineState;
153 typedef struct CliCommand CliCommand;
159 /*----- configuration options -----*/
161 extern const char *sitename, *remote_host;
162 extern const char *feedfile, *path_run, *path_cli, *path_cli_dir;
163 extern int quiet_multiple, interactive, try_filemon, try_stream, port;
164 extern const char *inndconffile;
166 extern int max_connections, max_queue_per_conn, target_max_feedfile_size;
167 extern int period_seconds, filepoll_seconds, max_queue_per_ipf;
168 extern int connection_setup_timeout, inndcomm_flush_timeout;
170 extern double nocheck_thresh;
171 extern double nocheck_decay;
173 /* all these are initialised to seconds, and converted to periods in main */
174 extern int reconnect_delay_periods;
175 extern int flushfail_retry_periods;
176 extern int backlog_retry_minperiods;
177 extern int backlog_spontrescan_periods;
178 extern int spontaneous_flush_periods;
179 extern int max_separated_periods;
180 extern int need_activity_periods;
181 extern int stats_log_periods;
182 extern int lowvol_thresh;
183 extern int lowvol_periods;
185 extern double max_bad_data_ratio;
186 extern int max_bad_data_initial;
189 /*----- article states, and statistics -----*/
191 typedef enum { /* in queue in conn->sent */
192 art_Unchecked, /* not checked, not sent checking */
193 art_Wanted, /* checked, wanted sent body as requested */
194 art_Unsolicited, /* - sent body without check */
197 extern const char *const artstate_names[]; /* xmit.c */
199 #define RESULT_COUNTS(RCS,RCN) \
208 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
209 #define RCI_TRIPLE_VALS_BASE(counts,x) \
210 counts[art_Unchecked] x \
211 + counts[art_Wanted] x \
212 + counts[art_Unsolicited] x, \
213 counts[art_Unchecked] x \
214 , counts[art_Wanted] x \
215 , counts[art_Unsolicited] x
218 #define RC_INDEX(x) RC_##x,
219 RESULT_COUNTS(RC_INDEX, RC_INDEX)
224 read_ok, read_blank, read_err, nooffer_missing,
228 /*----- transmission buffers -----*/
244 /*----- core operational data structure types -----*/
247 int results[art_MaxState][RCI_max];
252 /* This is also an instance of struct oop_readable */
253 struct oop_readable readable; /* first */
254 oop_readable_call *readable_callback;
255 void *readable_callback_user;
258 Filemon_Perfile *filemon;
260 oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
262 int skippinglong, paused, fake_readable;
265 long inprogress; /* includes queue.count and also articles in conns */
266 long autodefer; /* -1 means not doing autodefer */
285 int fd; /* may be 0, meaning closed (during construction/destruction) */
286 oop_read *rd; /* likewise */
287 int oopwriting; /* since on_fd is not idempotent */
288 int max_queue, stream;
289 const char *quitting;
290 int since_activity; /* periods */
291 ArticleList waiting; /* not yet told peer */
292 ArticleList priority; /* peer says send it now */
293 ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
294 struct iovec xmit[CONNIOVS];
295 XmitDetails xmitd[CONNIOVS];
299 #define SMS_LIST(X) \
307 enum StateMachineState {
308 #define SMS_DEF_ENUM(s) sm_##s,
309 SMS_LIST(SMS_DEF_ENUM)
312 extern const char *sms_names[];
314 /*========== function declarations ==========*/
316 /*----- help.c -----*/
318 void syscrash(const char *fmt, ...) NORET_PRINTF(1,2);
319 void crash(const char *fmt, ...) NORET_PRINTF(1,2);
320 void info(const char *fmt, ...) PRINTF(1,2);
321 void dbg(const char *fmt, ...) PRINTF(1,2);
323 void logv(int sysloglevel, const char *pfx, int errnoval,
324 const char *fmt, va_list al) PRINTF(5,0);
326 char *mvasprintf(const char *fmt, va_list al) PRINTF(1,0);
327 char *masprintf(const char *fmt, ...) PRINTF(1,2);
329 int close_perhaps(int *fd);
330 void xclose(int fd, const char *what, const char *what2);
331 void xclose_perhaps(int *fd, const char *what, const char *what2);
332 pid_t xfork(const char *what); /* also runs postfork in child */
333 pid_t xfork_bare(const char *what);
335 void on_fd_read_except(int fd, oop_call_fd callback);
336 void cancel_fd_read_except(int fd);
338 void report_child_status(const char *what, int status);
339 int xwaitpid(pid_t *pid, const char *what);
341 void *zxmalloc(size_t sz);
342 void xunlink(const char *path, const char *what);
344 void xsigaction(int signo, const struct sigaction *sa);
345 void xsigsetdefault(int signo);
346 void raise_default(int signo) NORET;
348 void xgettimeofday(struct timeval *tv_r);
349 void xsetnonblock(int fd, int nonb);
351 void check_isreg(const struct stat *stab, const char *path,
353 void xfstat(int fd, struct stat *stab_r, const char *what);
354 void xfstat_isreg(int fd, struct stat *stab_r,
355 const char *path, const char *what);
356 void xlstat_isreg(const char *path, struct stat *stab,
357 int *enoent_r /* 0 means ENOENT is fatal */,
359 int samefile(const struct stat *a, const struct stat *b);
361 char *sanitise(const char *input, int len);
363 static inline int isewouldblock(int errnoval) {
364 return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
367 #define INNLOGSETS(INNLOGSET) \
368 INNLOGSET(die, "fatal", LOG_ERR) \
369 INNLOGSET(warn, "warning", LOG_WARNING) \
370 INNLOGSET(notice, "notice", LOG_NOTICE) \
371 INNLOGSET(trace, "trace", LOG_NOTICE)
372 #define INNLOGSET_DECLARE(fn, pfx, sysloglevel) \
373 void duct_log_##fn(int l, const char *fmt, va_list al, int errval) \
375 INNLOGSETS(INNLOGSET_DECLARE)
377 /*----- duct.c -----*/
382 /*----- cli.c -----*/
385 void cli_stdio(void);
387 /*----- conn.c -----*/
389 void conn_closefd(Conn *conn, const char *msgprefix);
390 void check_idle_conns(void);
391 int conn_busy(Conn *conn);
392 void conn_dispose(Conn *conn);
394 void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
395 void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3);
397 void notice_conns_more(const char *new_kind);
398 void notice_conns_fewer(void);
399 void notice_conns_stats(void);
401 int allow_connect_start(void);
402 void connect_start(void);
404 /*----- defer.c -----*/
406 void poll_backlog_file(void);
407 void search_backlog_file(void);
408 void open_defer(void);
409 void close_defer(void);
411 /*----- filemon.c -----*/
413 int filemon_method_init(void);
414 void filemon_method_dump_info(FILE *f);
416 void filemon_start(InputFile *ipf);
417 void filemon_stop(InputFile *ipf);
419 /*----- infile.c -----*/
423 void inputfile_reading_start(InputFile *ipf);
424 void inputfile_reading_stop(InputFile *ipf);
425 void inputfile_reading_pause(InputFile *ipf);
426 void inputfile_reading_resume(InputFile *ipf);
427 /* pause and resume are idempotent, and no-op if not done _reading_start */
429 InputFile *open_input_file(const char *path);
430 void close_input_file(InputFile *ipf); /* does not free */
431 char *dbg_report_ipf(InputFile *ipf);
433 void tailing_make_readable(InputFile *ipf);
435 /*----- recv.c -----*/
437 extern const oop_rd_style peer_rd_style;
438 oop_rd_call peer_rd_ok, peer_rd_err;
440 void article_done(Article *art, int whichcount);
442 /*----- statemc.c -----*/
444 void statemc_check_flushing_done(void);
445 void statemc_check_backlog_done(void);
447 extern sig_atomic_t terminate_sig_flag;
448 void statemc_period_poll(void);
449 void statemc_lock(void);
450 void init_signals(void);
451 void statemc_init(void);
452 void showstats(void);
454 #define SMS(newstate, periods, why) \
455 (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
456 void statemc_setstate(StateMachineState newsms, int periods,
457 const char *forlog, const char *why);
459 void statemc_start_flush(const char *why); /* Normal => Flushing */
460 void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
461 int trigger_flush_ok(const char *why /* 0 means timeout */);
462 /* => Flushing,FLUSHING, ret 1; or ret 0 */
464 void preterminate(void);
466 /*----- xmit.c -----*/
468 void inputfile_queue_check_expired(InputFile *ipf);
469 void check_assign_articles(void);
470 void queue_check_input_done(void);
471 void check_reading_pause_resume(InputFile *ipf);
473 void conn_maybe_write(Conn *conn);
474 void conn_make_some_xmits(Conn *conn);
475 void *conn_write_some_xmits(Conn *conn);
477 void xmit_free(XmitDetails *d);
479 int article_check_expired(Article *art /* must be queued, not conn */);
480 void article_autodefer(InputFile *ipf, Article *art);
481 void article_defer(Article *art /* not on a queue */, int whichcount);
482 void autodefer_input_file(InputFile *ipf);
484 /*----- external linkage for debug/dump only -----*/
486 extern pid_t connecting_child;
487 extern int connecting_fdpass_sock;
488 extern pid_t inndcomm_child;
490 /*========== general operational variables ==========*/
493 extern oop_source *loop;
494 extern ConnList conns;
495 extern char *path_lock, *path_flushing, *path_defer, *path_dump;
496 extern char *globpat_backlog;
497 extern pid_t self_pid;
498 extern int *lowvol_perperiod;
499 extern int lowvol_circptr;
500 extern int lowvol_total; /* does not include current period */
501 extern int until_stats_log;
504 extern StateMachineState sms;
505 extern int until_flush;
506 extern InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
507 extern Counts backlog_counts;
508 extern int backlog_counts_report;
510 extern int until_connect, until_backlog_nextscan;
511 extern double accept_proportion;
512 extern int nocheck, nocheck_reported, in_child;
515 extern int simulate_flush;
516 extern int logv_use_syslog;
517 extern const char *logv_prefix;