chiark / gitweb /
WIP before new iov queue
[inn-innduct.git] / backends / innduct.c
1 /*
2  * Four files full of
3  *    token article
4  *
5  *   site.name_ductlock        lock taken out by innduct
6  * F site.name                 written by innd
7  * D site.name_duct            moved aside by innduct, for flush
8  *   site.name_deferwork       431'd articles, still being written
9  *   site.name_defergo_<inum>  431'd articles, ready for innxmit
10  *   site.name_deferlock       lock taken out by innxmit wrapper
11  *
12  *
13  *
14  * OVERALL STATES:
15  *
16  *                                                   START
17  *   ,-->--.                                           |
18  *   |     |                                         stat D
19  *   |     |                                         /   |
20  *   |     |                                  ENOENT/    |exists
21  *   |     V                            <----------'     |
22  *   |  Normal                                         stat F
23  *   |   F: innd writing, duct reading                  /|\
24  *   |   D: ENOENT                                     / | \
25  *   |     |                                          /  |  \
26  *   |     |  duct decides time to flush      same   /   |   |
27  *   |     |  duct makes hardlink             as D  /    |   |
28  *   |     |                                       /     |   |
29  *   |     V                            <---------'      |   |
30  *   |  Hardlinked                                       |   |
31  *   |   F == D: innd writing, duct reading              |   |
32  *   ^     |                                             |   |
33  *   |     |  duct unlinks F                            /    |
34  *   |     V                                  ENOENT   /     |
35  *   |  Moved                            <------------'      |
36  *   |   F: ENOENT                                           |
37  *   |   D: innd writing, duct reading                       |
38  *   |     |                                                 |
39  *   |     |  duct flushes feed                              |
40  *   |     |   (others can too, harmlessly)                  |
41  *   |     V                                                 |
42  *   |  Separated                          <-----------------'
43  *   |   F: innd writing                        different to D
44  *   |   D: duct reading
45  *   |     |
46  *   |     V  duct completes processing of D
47  *   |     |  duct unlinks D
48  *   |     |
49  *   `--<--'
50  *
51  */
52
53 static int max_connections, max_queue_per_conn;
54 static int connection_setup_timeout, port, try_stream;
55 static const char *remote_host;
56
57 #define ISNODE(T)    T *next, *back;
58 #define LIST(T)      struct { T *head, *tail, *tailpred; int count; }
59
60 #define NODE(n) ((struct node*)&(n)->head)
61
62 #define LIST_ADDHEAD(l,n)                                               \
63  (list_addhead((struct list*)&(l), NODE((n))), (void)(l).count++)
64 #define LIST_ADDTAIL(l,n)                                               \
65  (list_addtail((struct list*)&(l), NODE((n))), (void)(l).count++)
66
67 #define LIST_REMHEAD(l)                                                   \
68  ((l).count ? ((l).count--, (void*)list_remhead((struct list*)&(l))) : 0)
69 #define LIST_REMTAIL(l)                                                   \
70  ((l).count ? ((l).count--, (void*)list_remtail((struct list*)&(l))) : 0)
71 #define LIST_REMOVE(l,n)                        \
72  (list_remove(NODE((n))), (void)(l).count--)
73 #define LIST_INSERT(l,n,pred) \
74  (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++)
75
76 struct Article {
77   char *mid;
78   int midlen;
79   int nocheck; /* also used when CHECK says yes please */
80 };
81
82 #define CONNBUFSZ 16384
83
84 #define CN "<%d> "
85
86 typedef struct Conn Conn;
87 struct Conn {
88   ISNODE(Conn);
89   int fd, max_queue, stream;
90   LIST(Article) queue;
91   Article *tosend; /* points into queue */
92   char circ_buf[CONNBUFSZ];
93   unsigned circ_read, circ_write;
94 };
95
96
97 #define CHILD_ESTATUS_STREAM   4
98 #define CHILD_ESTATUS_NOSTREAM 5
99
100 static int since_connect_attempt;
101 static int nconns;
102 static LIST(Conn) idle, working, full;
103
104 static LIST(Article) *queue;
105
106 static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
107
108
109 /*========== making new connections ==========*/
110
111 static int connecting_sockets[2]= {-1,-1};
112 static pid_t connecting_child;
113
114 static void report_child_status(const char *what, int status) {
115   if (WIFEXITED(status)) {
116     int es= WEXITSTATUS(status);
117     if (es)
118       warn("%s: child died with error exit status %d",es);
119   } else if (WIFSIGNALED(status)) {
120     int sig= WTERMSIG(status);
121     const char *sigstr= strsignal(sig);
122     const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
123     if (sigstr)
124       warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
125     else
126       warn("%s: child died due to unknown fatal signal %d%s",
127            what, sig, coredump);
128   } else {
129     warn("%s: child died with unknown wait status %d", status);
130   }
131 }
132
133 static void connect_attempt_discard(void) {
134   if (connecting_sockets[0]) {
135     cancel_fd(loop, connecting_sockets[0], OOP_READ);
136     cancel_fd(loop, connecting_sockets[0], OOP_EXCEPTION);
137   }
138   perhaps_close(&connecting_sockets[0]);
139   perhaps_close(&connecting_sockets[1]);
140
141   if (connecting_child) {
142     int status;
143     r= kill(connecting_child, SIGKILL);
144     if (r) sysdie("cannot kill connect child");
145
146     pid_t got= waitpid(connecting_child, &status, WNOHANG);
147     if (got==-1) sysdie("cannot reap connect child");
148
149     if (!(WIFEXITED(status) ||
150           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) {
151       report_child_status("connect"
152     }
153     connecting_child= 0;
154   }
155 }
156
157 #define PREP_DECL_MSG_CMSG(msg)                 \
158   struct msghdr msg;                            \
159   memset(&msg,0,sizeof(msg));                   \
160   char msg##cbuf[CMSG_SPACE(sizeof(fd))];       \
161   msg.msg_control= msg##cbuf;                   \
162   msg.msg_controllen= sizeof(msg##cbuf);
163
164 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
165   Conn *conn= 0;
166
167   conn= xcalloc(sizeof(*conn));
168   
169   DECL_MSG_CMSG(msg);
170   struct cmsghdr *h= 0;
171   ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
172   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
173   if (!h) {
174     int status;
175     pid_t got= waitpid(connecting_child, &status, WNOHANG);
176     if (got != -1) {
177       assert(got==connecting_child);
178       connecting_child= 0;
179       if (WIFEXITED(status) &&
180           (WEXITSTATUS(status) != 0
181            WEXITSTATUS(status) != CHILD_ESTATUS_STREAM &&
182            WEXITSTATUS(status) != CHILD_ESTATUS_NOSTREAM)) {
183         /* child already reported the problem */
184       } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) {
185         warn("connect: connection attempt timed out");
186       } else if (!WIFEXITED(status)) {
187         report_child_status("connect", status);
188         /* that's probably the root cause then */
189       }
190     } else {
191       /* child is still running apparently, report the socket problem */
192       if (rs < 0)
193         syswarn("connect: read from child socket failed");
194       else if (e == OOP_EXCEPTIONN)
195         warn("connect: unexpected exception on child socket");
196       else
197         warn("connect: unexpected EOF on child socket");
198     }
199     goto x;
200   }
201
202 #define CHK(field, val)                                                   \
203   if (h->cmsg_##field != val) {                                           \
204     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \
205     goto x;                                                               \
206   }
207   CHK(level, SOL_SOCKET);
208   CHK(type,  SCM_RIGHTS);
209   CHK(len,   CMSG_LEN(sizeof(conn-b>fd)));
210 #undef CHK
211
212   if (CMSG_NXTHDR,&msg,h) { die("connect: child sent many cmsgs"); goto x; }
213
214   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
215
216   pid_t got= waitpid(connecting_child, &status, 0);
217   if (got==-1) sysdie("connect: real wait for child");
218   assert(got == connecting_child);
219   connecting_child= 0;
220
221   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
222   int es= WEXITSTATUS(status);
223   switch (es) {
224   case CHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
225   case CHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
226   default:
227     die("connect: child gave unexpected exit status %d", es);
228   }
229
230   set nonblocking;
231
232   /* Phew! */
233   LIST_ADDHEAD(idle, conn);
234   notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
235   connect_attempt_discard();
236   process_queue();
237   return 0;
238
239  x:
240   if (conn) {
241     perhaps_close(&conn->fd);
242     free(conn);
243   }
244   connect_attempt_discard();
245 }
246
247 static void connect_start() {
248   assert(!connecting_sockets[0]);
249   assert(!connecting_sockets[1]);
250   assert(!connecting_child);
251
252   notice("starting connection attempt");
253
254   r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets);
255   if (r) { syswarn("connect: cannot create socketpair for child"); goto x; }
256
257   connecting_child= fork();
258   if (connecting_child==-1) { syswarn("connect: cannot fork"); goto x; }
259
260   if (!connecting_child) {
261     FILE *cn_from, *cn_to;
262     char buf[NNTP_STRLEN+100];
263     int exitstatus= CHILD_ESTATUS_NOSTREAM;
264
265     put sigpipe back;
266     close unwanted fds;
267
268     r= close(connecting_sockets[0]);
269     if (r) sysdie("connect: close parent socket in child");
270
271     alarm(connection_setup_timeout);
272     if (NNTPconnect(remote_host, port, &cn_from, &cn_to, buf) < 0) {
273       if (buf[0]) {
274         sanitise_inplace(buf);
275         die("connect: rejected: %s", buf);
276       } else {
277         sysdie("connect: connection attempt failed");
278       }
279     }
280     if (NNTPsendpassword(remote_host, cn_from, cn_to) < 0)
281       sysdie("connect: authentication failed");
282     if (try_stream) {
283       if (fputs("MODE STREAM\r\n", cn_to) ||
284           fflush(cn_to))
285         sysdie("connect: could not send MODE STREAM");
286       buf[sizeof(buf)-1]= 0;
287       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
288         if (ferror(cn_from))
289           sysdie("connect: could not read response to MODE STREAM");
290         else
291           die("connect: connection close in response to MODE STREAM");
292       }
293       int l= strlen(buf);
294       assert(l>=1);
295       if (buf[-1]!='\n') {
296         sanitise_inplace(buf);
297         die("connect: response to MODE STREAM is too long: %.100s...",
298             remote_host, buf);
299       }
300       l--;  if (l>0 && buf[1-]=='\r') l--;
301       buf[l]= 0;
302       char *ep;
303       int rcode= strtoul(buf,&ep,10);
304       if (ep != buf[3]) {
305         sanitise_inplace(buf);
306         die("connect: bad response to MODE STREAM: %.50s", buf);
307       }
308       switch (rcode) {
309       case 203:
310         exitstatus= CHILD_ESTATUS_STREAM;
311         break;
312       case 480:
313       case 500:
314         break;
315       default:
316         sanitise_inplace(buf);
317         warn("connect: unexpected response to MODE STREAM: %.50s", buf);
318         exitstatus= 2;
319         break;
320       }
321     }
322     int fd= fileno(cn_from);
323
324     PREP_DECL_MSG_CMSG(msg);
325     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
326     cmsg->cmsg_level= SOL_SOCKET;
327     cmsg->cmsg_type=  SCM_RIGHTS;
328     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
329     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
330
331     msg.msg_controllen= cmsg->cmsg_len;
332     r= sendmsg(connecting_sockets[1], &msg, 0);
333     if (r) sysdie("sendmsg failed for new connection");
334
335     _exit(exitstatus);
336   }
337
338   r= close(connecting_sockets[1]);  connecting_sockets[1]= 0;
339   if (r) sysdie("connect: close child socket in parent");
340
341   loop->on_fd(loop, connecting_sockets[0], OOP_READ,      connchild_event, 0);
342   loop->on_fd(loop, connecting_sockets[0], OOP_EXCEPTION, connchild_event, 0);
343   return OOP_CONTINUE;
344
345  x:
346   connect_attempt_discard();
347 }
348
349 /*========== overall control of article flow ==========*/
350  
351 static void process_queue() {
352   if (!queue.count)
353     return;
354
355   if (working.head) {
356     transmit(working.head);
357   } else if (idle.head) {
358     transmit(idle.head);
359   } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
360              !connecting_child && !connect_delay) {
361     connect_delay= reconnect_delay_periods;
362     connect_start();
363   }
364
365  
366 /*========== article transmission ==========*/
367
368 static void *conn_writeable() {
369   for (;;) {
370     int circ_used= circ_write - circ_read;
371     if (circ_used < 0) circ_used += CONNBUFSZ;
372     writeable_moredata(conn, CONNBUFSZ-1 - circ_used);
373
374     if (conn->circ_read == conn->circ_write)
375       return OOP_CONTINUE;
376
377     struct iovec iov[2];
378     int niov= 1;
379     iov[0].iov_base= conn->circ_buf + conn->circ_read;
380     if (conn->circ_read > conn->circ_write) { /* wrapped */
381       iov[0].iov_len= CONNBUFSZ - conn->circ_read;
382       iov[1].iov_base= conn->circ_buf;
383       iov[1].iov_len= conn->circ_write;
384       if (niov[1].iov_len) niov= 2;
385     } else {
386       iov[0].iov_len= conn->circ_write - conn->circ_read;
387     }
388     ssize_t rs= writev(conn->fd, &iov, niov);
389     if (rs < 0) {
390       if (errno == EAGAIN) return OOP_CONTINUE;
391       syswarn(CN "write failed", conn->fd);
392       conn_failed(conn);
393       return OOP_CONTINUE;
394     }
395     assert(rs > 0);
396
397     conn->circ_read += rs;
398     if (conn->circ_read > CONNBUFSZ)
399       conn->circ_read -= CONNBUFSZ;
400   }
401 }
402
403
404  
405 static void transmit(Conn *conn) {
406   assert(conn->queue.count < max_queue);
407   
408
409 main {
410   ignore sigpipe;
411 };