3 * tailing reliable realtime streaming feeder for inn
5 * Copyright Ian Jackson <ijackson@chiark.greenend.org.uk>
6 * and contributors; see LICENCE.txt.
7 * SPDX-License-Identifier: GPL-3.0-or-later
22 #include "inn/innconf.h"
23 #include "inn/messages.h"
26 #include <sys/types.h>
29 #include <sys/socket.h>
51 /*----- general definitions, probably best not changed -----*/
/* Process exit status values (24..27).
 * NOTE(review): judging by the names only, the first two are reported by
 * the connecting child (peer offers / does not offer streaming) and the
 * last two by the inndcomm child -- confirm against the code that forks
 * and reaps those children. */
53 #define CONNCHILD_ESTATUS_STREAM 24
54 #define CONNCHILD_ESTATUS_NOSTREAM 25
56 #define INNDCOMMCHILD_ESTATUS_FAIL 26
57 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
/* Size bound for one feed file line: NNTP_MAXLEN_MSGID plus twice
 * sizeof(TOKEN) plus 10.  The result has type size_t (because of sizeof).
 * NOTE(review): presumably the "*2" allows for a textual encoding of the
 * token and the 10 is slack for separators -- confirm. */
59 #define MAX_LINE_FEEDFILE (NNTP_MAXLEN_MSGID + sizeof(TOKEN)*2 + 10)
/* Numeric limit for CLI commands.
 * NOTE(review): presumably a maximum length in bytes -- confirm in cli.c. */
60 #define MAX_CLI_COMMAND 1000
/* VA: declares a va_list called `al' and starts it after a parameter that
 * must be named `fmt'.  It expands to a declaration plus a statement with
 * no trailing semicolon, so it is written as `VA;' inside a variadic
 * function. */
62 #define VA va_list al; va_start(al,fmt)
/* PRINTF(f,a): GCC printf-format attribute.  f is the 1-based index of the
 * format-string parameter, a the index of the first variadic argument
 * (0 is used below for functions that take a va_list). */
63 #define PRINTF(f,a) __attribute__((__format__(printf,f,a)))
/* NORET_PRINTF(f,a): as PRINTF, and additionally marks the function as
 * never returning. */
64 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
/* NORET: marks a function as never returning. */
65 #define NORET __attribute__((__noreturn__))
/* NEW(ptr): assigns to ptr the result of zxmalloc() for one object of the
 * type ptr points to.  ptr is expanded twice but sizeof does not evaluate
 * its operand.
 * NOTE(review): zxmalloc is only declared in this header; presumably it
 * returns zeroed memory and does not return on failure -- confirm. */
67 #define NEW(ptr) ((ptr)= zxmalloc(sizeof(*(ptr))))
/* NEW_DECL(type,ptr): declares `type ptr' and initialises it the same way;
 * `type' must therefore be a pointer type. */
68 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
/* DUMPV(fmt,pfx,v): prints " <v>=<value>" using format fmt to a FILE*
 * that must be in scope under the name `f'.  #v supplies the printed name;
 * pfx is pasted textually in front of v to form the value expression (it
 * may be empty or something like `obj->').  The expansion already ends
 * in a semicolon. */
70 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
/* FOR_LIST_NODE(nodevar, list): loop header which sets nodevar to
 * LIST_HEAD(list) and then steps it with LIST_NEXT until it becomes null.
 * The successor is fetched from nodevar after the loop body has run, so
 * the body must leave nodevar pointing at a node that is still linked. */
72 #define FOR_LIST_NODE(nodevar, list) \
73 for ((nodevar)=LIST_HEAD(list); (nodevar); (nodevar)=LIST_NEXT((nodevar)))
/* FOR_CONN(conn): FOR_LIST_NODE over the global list `conns'. */
75 #define FOR_CONN(conn) FOR_LIST_NODE(conn, conns)
77 /*----- doubly linked lists -----*/
/* ISNODE(T): expands to the member declaration `struct node list_node'
 * (no trailing semicolon); the argument T is not used in the expansion. */
79 #define ISNODE(T) struct node list_node
82 union { struct list li; T *for_type; } u; \
/* NODE(n): yields &n->list_node, after asserting that list_node sits at
 * the very start of *n (its address equals n).  n is expanded more than
 * once, so it should be free of side effects. */
86 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
/* LIST_CHECKCANHAVENODE(l,n): compares n with l's `u.for_type' pointer and
 * discards the result.  Its only purpose is to make the compiler diagnose
 * a node pointer whose type does not match the list's element type. */
88 #define LIST_CHECKCANHAVENODE(l,n) \
89 ((void)((n) == ((l).u.for_type))) /* just for the type check */
91 #define LIST_ADDSOMEHOW(l,n,list_addsomehow) \
92 ( LIST_CHECKCANHAVENODE(l,n), \
93 list_addsomehow(&(l).u.li, NODE((n))), \
97 #define LIST_REMSOMEHOW(l,list_remsomehow) \
98 ( (typeof((l).u.for_type)) \
101 list_remsomehow(&(l).u.li) ) \
/* Add/remove at either end of list l.  Each is LIST_ADDSOMEHOW or
 * LIST_REMSOMEHOW with the matching list_addhead / list_addtail /
 * list_remhead / list_remtail primitive plugged in. */
107 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
108 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
109 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
110 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
/* LIST_INIT(l): sets l.count to 0 and initialises the embedded struct list
 * with list_new(). */
112 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
/* LIST_HEAD(l): result of list_head() on l's embedded list, cast to the
 * list's element pointer type.  The embedded list is named explicitly
 * (&(l).u.li, as in LIST_INIT) rather than reached by casting &(l), so an
 * object that is not one of these list types is rejected at compile time. */
113 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head(&(l).u.li)))
/* LIST_NEXT(n) / LIST_BACK(n): result of list_succ() / list_pred() on n's
 * node, cast back to n's own pointer type. */
114 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
115 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
117 #define LIST_REMOVE(l,n) \
118 ( LIST_CHECKCANHAVENODE(l,n), \
119 list_remove(NODE((n))), \
123 #define LIST_INSERT(l,n,pred) \
124 ( LIST_CHECKCANHAVENODE(l,n), \
125 LIST_CHECKCANHAVENODE(l,pred), \
126 list_insert((struct list*)&(l), NODE((n)), NODE((pred))), \
130 /*----- type predeclarations -----*/
/* Forward typedefs, so that the structure definitions and prototypes in
 * the rest of this header can refer to these types by their short names
 * before (or without) seeing the full definitions. */
132 typedef struct Conn Conn;
133 typedef struct Article Article;
134 typedef struct InputFile InputFile;
135 typedef struct XmitDetails XmitDetails;
136 typedef struct Filemon_Perfile Filemon_Perfile;
/* NOTE(review): naming an enum type before its definition is not ISO C;
 * this line depends on the compiler accepting it as an extension. */
137 typedef enum StateMachineState StateMachineState;
138 typedef struct CliCommand CliCommand;
144 /*----- configuration options -----*/
/* Declarations only: the storage for these configuration variables is
 * defined in another translation unit.
 * NOTE(review): individual meanings and defaults are not visible from
 * this header -- see the option parsing code before relying on them. */
146 extern const char *sitename, *remote_host;
147 extern const char *feedfile, *path_run, *path_cli, *path_cli_dir;
148 extern int quiet_multiple, interactive, try_filemon, try_stream, port;
149 extern const char *inndconffile;
151 extern int max_connections, max_queue_per_conn, target_max_feedfile_size;
152 extern int period_seconds, filepoll_seconds, max_queue_per_ipf;
153 extern int connection_setup_timeout, inndcomm_flush_timeout;
155 extern double nocheck_thresh;
156 extern double nocheck_decay;
158 /* all these are initialised to seconds, and converted to periods in main */
159 extern int reconnect_delay_periods;
160 extern int flushfail_retry_periods;
161 extern int backlog_retry_minperiods;
162 extern int backlog_spontrescan_periods;
163 extern int spontaneous_flush_periods;
164 extern int max_separated_periods;
165 extern int need_activity_periods;
166 extern int stats_log_periods;
167 extern int lowvol_thresh;
168 extern int lowvol_periods;
170 extern double max_bad_data_ratio;
171 extern int max_bad_data_initial;
174 /*----- article states, and statistics -----*/
176 typedef enum { /* in queue in conn->sent */
177 art_Unchecked, /* not checked, not sent checking */
178 art_Wanted, /* checked, wanted sent body as requested */
179 art_Unsolicited, /* - sent body without check */
/* Table of constant strings naming the article states.
 * NOTE(review): presumably indexed by the art_* enumerators -- confirm at
 * the definition. */
182 extern const char *const artstate_names[]; /* xmit.c */
184 #define RESULT_COUNTS(RCS,RCN) \
/* printf format fragment with four %d conversions: a total followed by
 * three components labelled id, bod and nc. */
193 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
/* Argument list matching RCI_TRIPLE_FMT_BASE.  Expands to four
 * comma-separated int expressions: first the sum of the art_Unchecked,
 * art_Wanted and art_Unsolicited entries of `counts', then those three
 * entries individually in that order.  `x' is pasted textually after each
 * counts[...] (for example a further subscript).  The expansion is not
 * parenthesised as a whole because it is meant to be spliced into a call's
 * argument list. */
194 #define RCI_TRIPLE_VALS_BASE(counts,x) \
195 counts[art_Unchecked] x \
196 + counts[art_Wanted] x \
197 + counts[art_Unsolicited] x, \
198 counts[art_Unchecked] x \
199 , counts[art_Wanted] x \
200 , counts[art_Unsolicited] x
203 #define RC_INDEX(x) RC_##x,
204 RESULT_COUNTS(RC_INDEX, RC_INDEX)
209 read_ok, read_blank, read_err, nooffer_missing,
213 /*----- transmission buffers -----*/
229 /*----- core operational data structure types -----*/
232 int results[art_MaxState][RCI_max];
237 /* This is also an instance of struct oop_readable */
238 struct oop_readable readable; /* first */
239 oop_readable_call *readable_callback;
240 void *readable_callback_user;
243 Filemon_Perfile *filemon;
245 oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
247 int skippinglong, paused, fake_readable;
250 long inprogress; /* includes queue.count and also articles in conns */
251 long autodefer; /* -1 means not doing autodefer */
270 int fd; /* may be 0, meaning closed (during construction/destruction) */
271 oop_read *rd; /* likewise */
272 int oopwriting; /* since on_fd is not idempotent */
273 int max_queue, stream;
274 const char *quitting;
275 int since_activity; /* periods */
276 ArticleList waiting; /* not yet told peer */
277 ArticleList priority; /* peer says send it now */
278 ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
279 struct iovec xmit[CONNIOVS];
280 XmitDetails xmitd[CONNIOVS];
284 #define SMS_LIST(X) \
292 enum StateMachineState {
293 #define SMS_DEF_ENUM(s) sm_##s,
294 SMS_LIST(SMS_DEF_ENUM)
/* Table of strings naming the state machine states.
 * NOTE(review): presumably indexed by StateMachineState -- confirm at the
 * definition. */
297 extern const char *sms_names[];
299 /*========== function declarations ==========*/
301 /*----- help.c -----*/
/* Helper functions declared for the whole program.  Only the prototypes
 * are visible here; anything said below beyond the signatures is a
 * NOTE(review) assumption to be checked against help.c. */

/* Both take a printf-style format (checked by the compiler) and are
 * declared as never returning. */
303 void syscrash(const char *fmt, ...) NORET_PRINTF(1,2);
304 void crash(const char *fmt, ...) NORET_PRINTF(1,2);
/* printf-style message functions that do return. */
305 void info(const char *fmt, ...) PRINTF(1,2);
306 void dbg(const char *fmt, ...) PRINTF(1,2);
/* va_list form: the format string is parameter 4; the arguments are not
 * compiler-checked (PRINTF's second argument is 0). */
308 void logv(int sysloglevel, const char *pfx, int errnoval,
309 const char *fmt, va_list al) PRINTF(4,0);
/* printf-style functions returning char*.
 * NOTE(review): presumably a freshly allocated string that the caller must
 * free -- confirm. */
311 char *mvasprintf(const char *fmt, va_list al) PRINTF(1,0);
312 char *masprintf(const char *fmt, ...) PRINTF(1,2);
/* File descriptor helpers.  The *_perhaps variants take a pointer to the
 * descriptor rather than its value. */
314 int close_perhaps(int *fd);
315 void xclose(int fd, const char *what, const char *what2);
316 void xclose_perhaps(int *fd, const char *what, const char *what2);
317 pid_t xfork(const char *what); /* also runs postfork in child */
318 pid_t xfork_bare(const char *what);
/* Registration / cancellation of an oop_call_fd callback for fd. */
320 void on_fd_read_except(int fd, oop_call_fd callback);
321 void cancel_fd_read_except(int fd);
/* Child process status handling. */
323 void report_child_status(const char *what, int status);
324 int xwaitpid(pid_t *pid, const char *what);
/* Allocator used by the NEW / NEW_DECL macros. */
326 void *zxmalloc(size_t sz);
327 void xunlink(const char *path, const char *what);
/* Signal helpers; raise_default is declared as never returning. */
329 void xsigaction(int signo, const struct sigaction *sa);
330 void xsigsetdefault(int signo);
331 void raise_default(int signo) NORET;
/* xgettimeofday stores its result through tv_r. */
333 void xgettimeofday(struct timeval *tv_r);
334 void xsetnonblock(int fd, int nonb);
336 void check_isreg(const struct stat *stab, const char *path,
/* fstat wrappers that store their result through stab_r.  `what' (and
 * `path' for the _isreg variant) are caller-supplied strings.
 * NOTE(review): presumably used only in diagnostics, with failure being
 * fatal as for the other x* helpers -- confirm in help.c. */
338 void xfstat(int fd, struct stat *stab_r, const char *what);
339 void xfstat_isreg(int fd, struct stat *stab_r,
340 const char *path, const char *what);
341 void xlstat_isreg(const char *path, struct stat *stab,
342 int *enoent_r /* 0 means ENOENT is fatal */,
/* Compares two stat results; neither argument is modified.
 * NOTE(review): presumably nonzero means both describe the same file
 * (device and inode equal) -- confirm. */
344 int samefile(const struct stat *a, const struct stat *b);
/* Returns a char* derived from `input' and `len'.
 * NOTE(review): ownership of the result and the meaning of a negative len
 * are not visible here -- check help.c before use. */
346 char *sanitise(const char *input, int len);
348 static inline int isewouldblock(int errnoval) {
349 return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
/* X-macro list of log wrappers.  INNLOGSETS(M) expands M once per entry
 * with three arguments: a function-name suffix (die, warn, notice, debug),
 * a prefix string, and a syslog priority (LOG_ERR, LOG_WARNING,
 * LOG_NOTICE, LOG_DEBUG). */
352 #define INNLOGSETS(INNLOGSET) \
353 INNLOGSET(die, "fatal", LOG_ERR) \
354 INNLOGSET(warn, "warning", LOG_WARNING) \
355 INNLOGSET(notice, "notice", LOG_NOTICE) \
356 INNLOGSET(debug, "debug", LOG_DEBUG)
357 #define INNLOGSET_DECLARE(fn, pfx, sysloglevel) \
358 void duct_log_##fn(int l, const char *fmt, va_list al, int errval) \
360 INNLOGSETS(INNLOGSET_DECLARE)
362 /*----- duct.c -----*/
367 /*----- cli.c -----*/
/* NOTE(review): judging by the name, attaches the command-line interface
 * to standard input/output -- confirm in cli.c. */
370 void cli_stdio(void);
372 /*----- conn.c -----*/
/* Connection management.  Only prototypes are visible here; behaviour
 * notes marked NOTE(review) are inferred from names and need checking
 * against conn.c. */
374 void conn_closefd(Conn *conn, const char *msgprefix);
375 void check_idle_conns(void);
/* NOTE(review): presumably nonzero while conn still has work outstanding. */
376 int conn_busy(Conn *conn);
377 void conn_dispose(Conn *conn);
/* Failure reporting for one connection: printf-style message, in va_list
 * and variadic forms (the format string is parameter 2 in both). */
379 void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
380 void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3);
382 void notice_conns_more(const char *new_kind);
383 void notice_conns_fewer(void);
384 void notice_conns_stats(void);
/* NOTE(review): presumably nonzero when a new connection attempt may be
 * started now. */
386 int allow_connect_start(void);
387 void connect_start(void);
389 /*----- defer.c -----*/
/* Backlog and defer file handling; all take no arguments and return
 * nothing, so they act on program-wide state.
 * NOTE(review): which globals they touch is not visible here -- see
 * defer.c. */
391 void poll_backlog_file(void);
392 void search_backlog_file(void);
393 void open_defer(void);
394 void close_defer(void);
396 /*----- filemon.c -----*/
/* File monitoring method interface.
 * NOTE(review): the meaning of filemon_method_init's return value is not
 * visible here -- confirm in filemon.c. */
398 int filemon_method_init(void);
/* Writes information about the monitoring method to f. */
399 void filemon_method_dump_info(FILE *f);
/* Start / stop monitoring for one input file. */
401 void filemon_start(InputFile *ipf);
402 void filemon_stop(InputFile *ipf);
404 /*----- infile.c -----*/
/* Reading control for one input file. */
408 void inputfile_reading_start(InputFile *ipf);
409 void inputfile_reading_stop(InputFile *ipf);
410 void inputfile_reading_pause(InputFile *ipf);
411 void inputfile_reading_resume(InputFile *ipf);
412 /* pause and resume are idempotent, and no-op if not done _reading_start */
/* Opening and closing.  close_input_file leaves freeing the InputFile to
 * the caller (see its trailing comment).
 * NOTE(review): whether open_input_file can return a null pointer is not
 * visible here -- confirm in infile.c. */
414 InputFile *open_input_file(const char *path);
415 void close_input_file(InputFile *ipf); /* does not free */
/* NOTE(review): returns char*; presumably a description for debug output,
 * ownership to be confirmed. */
416 char *dbg_report_ipf(InputFile *ipf);
418 void tailing_make_readable(InputFile *ipf);
420 /*----- recv.c -----*/
/* Read style object and the two callbacks of type oop_rd_call defined in
 * recv.c; the second line declares functions through a function typedef. */
422 extern const oop_rd_style peer_rd_style;
423 oop_rd_call peer_rd_ok, peer_rd_err;
/* NOTE(review): whichcount presumably selects the result counter to
 * increment for this article -- confirm in recv.c. */
425 void article_done(Article *art, int whichcount);
427 /*----- statemc.c -----*/
/* State machine interface.  Only prototypes are visible here; see
 * statemc.c for behaviour. */
429 void statemc_check_flushing_done(void);
430 void statemc_check_backlog_done(void);
/* Flag of type sig_atomic_t.
 * NOTE(review): presumably set from a signal handler -- confirm. */
432 extern sig_atomic_t terminate_sig_flag;
433 void statemc_period_poll(void);
434 void statemc_lock(void);
435 void init_signals(void);
436 void statemc_init(void);
437 void showstats(void);
/* SMS(newstate, periods, why): calls statemc_setstate() with the
 * enumerator sm_<newstate>, the given period count, the state's name as a
 * string (#newstate) for logging, and the reason string. */
439 #define SMS(newstate, periods, why) \
440 (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
441 void statemc_setstate(StateMachineState newsms, int periods,
442 const char *forlog, const char *why);
444 void statemc_start_flush(const char *why); /* Normal => Flushing */
445 void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
446 int trigger_flush_ok(const char *why /* 0 means timeout */);
447 /* => Flushing,FLUSHING, ret 1; or ret 0 */
449 void preterminate(void);
451 /*----- xmit.c -----*/
/* Article queueing and transmission.  Only prototypes are visible here;
 * see xmit.c for behaviour. */
453 void inputfile_queue_check_expired(InputFile *ipf);
454 void check_assign_articles(void);
455 void queue_check_input_done(void);
456 void check_reading_pause_resume(InputFile *ipf);
458 void conn_maybe_write(Conn *conn);
459 void conn_make_some_xmits(Conn *conn);
/* NOTE(review): returns void*; presumably an event-loop continuation
 * value -- confirm in xmit.c. */
460 void *conn_write_some_xmits(Conn *conn);
462 void xmit_free(XmitDetails *d);
/* The inline comments on the parameters state the preconditions on where
 * the article currently lives. */
464 int article_check_expired(Article *art /* must be queued, not conn */);
465 void article_autodefer(InputFile *ipf, Article *art);
466 void article_defer(Article *art /* not on a queue */, int whichcount);
467 void autodefer_input_file(InputFile *ipf);
469 /*----- external linkage for debug/dump only -----*/
/* Per the section heading, these are visible outside their defining files
 * only so that debug/dump code can print them.
 * NOTE(review): presumably the pids are 0 when no such child exists --
 * confirm at the definitions. */
471 extern pid_t connecting_child;
472 extern int connecting_fdpass_sock;
473 extern pid_t inndcomm_child;
475 /*========== general operational variables ==========*/
/* Program-wide state, declared here and defined elsewhere.
 * NOTE(review): the defining file and the invariants of each variable are
 * not visible from this header. */
478 extern oop_source *loop;
/* The list that FOR_CONN iterates over. */
479 extern ConnList conns;
480 extern char *path_lock, *path_flushing, *path_defer, *path_dump;
481 extern char *globpat_backlog;
482 extern pid_t self_pid;
/* NOTE(review): lowvol_perperiod with lowvol_circptr looks like a circular
 * buffer of per-period counts summed in lowvol_total -- confirm. */
483 extern int *lowvol_perperiod;
484 extern int lowvol_circptr;
485 extern int lowvol_total; /* does not include current period */
486 extern int until_stats_log;
/* Current state machine state (type StateMachineState). */
489 extern StateMachineState sms;
490 extern int until_flush;
491 extern InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
492 extern Counts backlog_counts;
493 extern int backlog_counts_report;
495 extern int until_connect, until_backlog_nextscan;
496 extern double accept_proportion;
497 extern int nocheck, nocheck_reported, in_child;
500 extern int simulate_flush;
501 extern int logv_use_syslog;
502 extern const char *logv_prefix;