3 * tailing reliable realtime streaming feeder for inn
5 * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * (I believe that when you compile and link this as part of the inn2
21 * build, with the Makefile runes I have provided, all the libraries
22 * and files which end up included in innduct are licence-compatible
23 * with GPLv3. If not then please let me know. -Ian Jackson.)
38 #include "inn/innconf.h"
39 #include "inn/messages.h"
42 #include <sys/types.h>
45 #include <sys/socket.h>
66 /*----- general definitions, probably best not changed -----*/
68 #define CONNCHILD_ESTATUS_STREAM 24
69 #define CONNCHILD_ESTATUS_NOSTREAM 25
71 #define INNDCOMMCHILD_ESTATUS_FAIL 26
72 #define INNDCOMMCHILD_ESTATUS_NONESUCH 27
74 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10)
75 #define MAX_CLI_COMMAND 1000
77 #define VA va_list al; va_start(al,fmt)
78 #define PRINTF(f,a) __attribute__((__format__(printf,f,a)))
79 #define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a)))
80 #define NORET __attribute__((__noreturn__))
82 #define NEW(ptr) ((ptr)= zxmalloc(sizeof(*(ptr))))
83 #define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr)))
85 #define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v);
87 #define FOR_CONN(conn) \
88 for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn)))
90 /*----- doubly linked lists -----*/
92 #define ISNODE(T) struct node list_node
95 union { struct list li; T *for_type; } u; \
99 #define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node)
101 #define LIST_CHECKCANHAVENODE(l,n) \
102 ((void)((n) == ((l).u.for_type))) /* just for the type check */
104 #define LIST_ADDSOMEHOW(l,n,list_addsomehow) \
105 ( LIST_CHECKCANHAVENODE(l,n), \
106 list_addsomehow(&(l).u.li, NODE((n))), \
110 #define LIST_REMSOMEHOW(l,list_remsomehow) \
111 ( (typeof((l).u.for_type)) \
114 list_remsomehow(&(l).u.li) ) \
120 #define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead)
121 #define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail)
122 #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead)
123 #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail)
125 #define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li))
126 #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l))))
127 #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n))))
128 #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n))))
130 #define LIST_REMOVE(l,n) \
131 ( LIST_CHECKCANHAVENODE(l,n), \
132 list_remove(NODE((n))), \
136 #define LIST_INSERT(l,n,pred) \
137 ( LIST_CHECKCANHAVENODE(l,n), \
138 LIST_CHECKCANHAVENODE(l,pred), \
139 list_insert((struct list*)&(l), NODE((n)), NODE((pred))), \
143 /*----- type predeclarations -----*/
145 typedef struct Conn Conn;
146 typedef struct Article Article;
147 typedef struct InputFile InputFile;
148 typedef struct XmitDetails XmitDetails;
149 typedef struct Filemon_Perfile Filemon_Perfile;
150 typedef enum StateMachineState StateMachineState;
151 typedef struct CliCommand CliCommand;
157 /*----- configuration options -----*/
158 /* when changing defaults, remember to update the manpage */
160 static const char *sitename, *remote_host;
161 static const char *feedfile, *path_run, *path_cli, *path_cli_dir;
162 static int quiet_multiple=0;
163 static int interactive=0, try_filemon=1;
164 static int try_stream=1;
166 static const char *inndconffile;
168 static int max_connections=10;
169 static int max_queue_per_conn=200;
170 static int target_max_feedfile_size=100000;
171 static int period_seconds=30;
172 static int filepoll_seconds=5;
173 static int max_queue_per_ipf=-1;
175 static int connection_setup_timeout=200;
176 static int inndcomm_flush_timeout=100;
178 static double nocheck_thresh= 95.0; /* converted from percentage by main */
179 static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
181 /* all these are initialised to seconds, and converted to periods in main */
182 static int reconnect_delay_periods=1000;
183 static int flushfail_retry_periods=1000;
184 static int backlog_retry_minperiods=100;
185 static int backlog_spontrescan_periods=300;
186 static int spontaneous_flush_periods=100000;
187 static int max_separated_periods=2000;
188 static int need_activity_periods=1000;
189 static int lowvol_thresh=3;
190 static int lowvol_periods=1000;
192 static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
193 static int max_bad_data_initial= 30;
194 /* in one corrupt 4096-byte block the number of newlines has
195 * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */
198 /*----- statistics -----*/
200 typedef enum { /* in queue in conn->sent */
201 art_Unchecked, /* not checked, not sent checking */
202 art_Wanted, /* checked, wanted sent body as requested */
203 art_Unsolicited, /* - sent body without check */
207 static const char *const artstate_names[]=
208 { "Unchecked", "Wanted", "Unsolicited", 0 };
210 #define RESULT_COUNTS(RCS,RCN) \
219 #define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
220 #define RCI_TRIPLE_VALS_BASE(counts,x) \
221 counts[art_Unchecked] x \
222 + counts[art_Wanted] x \
223 + counts[art_Unsolicited] x, \
224 counts[art_Unchecked] x \
225 , counts[art_Wanted] x \
226 , counts[art_Unsolicited] x
229 #define RC_INDEX(x) RC_##x,
230 RESULT_COUNTS(RC_INDEX, RC_INDEX)
235 /*----- transmission buffers -----*/
251 /*----- core operational data structure types -----*/
254 /* This is also an instance of struct oop_readable */
255 struct oop_readable readable; /* first */
256 oop_readable_call *readable_callback;
257 void *readable_callback_user;
260 Filemon_Perfile *filemon;
262 oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
264 int skippinglong, paused, fake_readable;
267 long inprogress; /* includes queue.count and also articles in conns */
268 long autodefer; /* -1 means not doing autodefer */
270 int counts[art_MaxState][RCI_max];
271 int readcount_ok, readcount_blank, readcount_err, count_nooffer_missing;
286 #define SMS_LIST(X) \
294 enum StateMachineState {
295 #define SMS_DEF_ENUM(s) sm_##s,
296 SMS_LIST(SMS_DEF_ENUM)
299 extern const char *sms_names[];
301 /*========== function declarations ==========*/
303 /*----- help.c -----*/
305 static void syscrash(const char *fmt, ...) NORET_PRINTF(1,2);
306 static void crash(const char *fmt, ...) NORET_PRINTF(1,2);
307 static void info(const char *fmt, ...) PRINTF(1,2);
308 static void dbg(const char *fmt, ...) PRINTF(1,2);
310 void logv(int sysloglevel, const char *pfx, int errnoval,
311 const char *fmt, va_list al) PRINTF(5,0);
313 char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0);
314 char *xasprintf(const char *fmt, ...) PRINTF(1,2);
316 int close_perhaps(int *fd);
317 void xclose(int fd, const char *what, const char *what2);
318 void xclose_perhaps(int *fd, const char *what, const char *what2);
319 pid_t xfork(const char *what);
321 static void on_fd_read_except(int fd, oop_call_fd callback);
322 void cancel_fd_read_except(int fd);
324 void report_child_status(const char *what, int status);
325 int xwaitpid(pid_t *pid, const char *what);
327 void *zxmalloc(size_t sz);
328 void xunlink(const char *path, const char *what);
330 void xsigaction(int signo, const struct sigaction *sa);
331 void xsigsetdefault(int signo);
333 void xgettimeofday(struct timeval *tv_r);
334 void xsetnonblock(int fd, int nonb);
336 void check_isreg(const struct stat *stab, const char *path,
338 void xfstat(int fd, struct stat *stab_r, const char *what);
339 void xfstat_isreg(int fd, struct stat *stab_r,
340 const char *path, const char *what);
341 void xlstat_isreg(const char *path, struct stat *stab,
342 int *enoent_r /* 0 means ENOENT is fatal */,
344 int samefile(const struct stat *a, const struct stat *b);
346 char *sanitise(const char *input, int len);
348 static inline int isewouldblock(int errnoval) {
349 return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
352 /*----- innduct.c -----*/
356 /*----- conn.c -----*/
358 void conn_closefd(Conn *conn, const char *msgprefix);
360 /*----- defer.c -----*/
362 void poll_backlog_file(void);
364 /*----- infile.c -----*/
368 /*----- statemc.c -----*/
370 sig_atomic_t terminate_sig_flag;
372 /*----- xmit.c -----*/
374 void inputfile_queue_check_expired(InputFile *ipf);
376 /*----- external linkage for debug/dump only -----*/
378 pid_t connecting_child;
379 pid_t inndcomm_child;
381 /*========== general operational variables ==========*/
384 extern oop_source *loop;
385 extern ConnList conns;
386 extern char *path_lock, *path_flushing, *path_defer, *path_dump;
387 extern char *globpat_backlog;
388 extern pid_t self_pid;
389 extern int *lowvol_perperiod;
390 extern int lowvol_circptr;
391 extern int lowvol_total; /* does not include current period */
394 extern StateMachineState sms;
395 extern int until_flush;
396 extern InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
398 extern int until_connect, until_backlog_nextscan;
399 extern double accept_proportion;
400 extern int nocheck, nocheck_reported, in_child;
403 extern int simulate_flush;
404 extern int logv_use_syslog;
405 extern const char *logv_prefix;