chiark / gitweb /
WIP
[innduct.git] / backends / innduct.c
1 /*
2  * Four files full of
3  *    token article
4  *
5  *   site.name_ductlock        lock taken out by innduct
6  * F site.name                 written by innd
7  * D site.name_duct            moved aside by innduct, for flush
8  *   site.name_deferwork       431'd articles, still being written
9  *   site.name_defergo_<inum>  431'd articles, ready for innxmit
10  *   site.name_deferlock       lock taken out by innxmit wrapper
11  *
12  *
13  *
14  * OVERALL STATES:
15  *
16  *                                                   START
17  *   ,-->--.                                           |
18  *   |     |                                         stat D
19  *   |     |                                         /   |
20  *   |     |                                  ENOENT/    |exists
21  *   |     V                            <----------'     |
22  *   |  Normal                                         stat F
23  *   |   F: innd writing, duct reading                  /|\
24  *   |   D: ENOENT                                     / | \
25  *   |     |                                          /  |  \
26  *   |     |  duct decides time to flush      same   /   |   |
27  *   |     |  duct makes hardlink             as D  /    |   |
28  *   |     |                                       /     |   |
29  *   |     V                            <---------'      |   |
30  *   |  Hardlinked                                       |   |
31  *   |   F == D: innd writing, duct reading              |   |
32  *   ^     |                                             |   |
33  *   |     |  duct unlinks F                            /    |
34  *   |     V                                  ENOENT   /     |
35  *   |  Moved                            <------------'      |
36  *   |   F: ENOENT                                           |
37  *   |   D: innd writing, duct reading                       |
38  *   |     |                                                 |
39  *   |     |  duct flushes feed                              |
40  *   |     |   (others can too, harmlessly)                  |
41  *   |     V                                                 |
42  *   |  Separated                          <-----------------'
43  *   |   F: innd writing                        different to D
44  *   |   D: duct reading
45  *   |     |
46  *   |     V  duct completes processing of D
47  *   |     |  duct unlinks D
48  *   |     |
49  *   `--<--'
50  *
51  */
52
53 static int max_connections, max_queue_per_conn;
54 static int connection_setup_timeout, port, try_stream;
55 static const char *remote_host;
56
57 #define ISNODE(T)    T *next, *back;
58 #define LIST(T)      struct { T *head, *tail, *tailpred; int count; }
59
60 #define NODE(n) ((struct node*)&(n)->head)
61
62 #define LIST_ADDHEAD(l,n)                                               \
63  (list_addhead((struct list*)&(l), NODE((n))), (void)(l).count++)
64 #define LIST_ADDTAIL(l,n)                                               \
65  (list_addtail((struct list*)&(l), NODE((n))), (void)(l).count++)
66
67 #define LIST_REMHEAD(l)                                                   \
68  ((l).count ? ((l).count--, (void*)list_remhead((struct list*)&(l))) : 0)
69 #define LIST_REMTAIL(l)                                                   \
70  ((l).count ? ((l).count--, (void*)list_remtail((struct list*)&(l))) : 0)
71 #define LIST_REMOVE(l,n)                        \
72  (list_remove(NODE((n))), (void)(l).count--)
73 #define LIST_INSERT(l,n,pred) \
74  (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++)
75
76 struct Article {
77   char *mid;
78   int midlen;
79   int checked;
80 };
81
82 #define CONNBUFSZ 16384
83
84 #define CN "<%d> "
85
86 typedef struct Conn Conn;
87
88 typedef enum {
89   Malloc, Const, Artdata;
90 } XmitKind;
91 typedef struct {
92   XmitKind kind;
93   union {
94     char *malloc_tofree;
95   } info;
96 } XmitDetails;
97
98 struct Conn {
99   ISNODE(Conn);
100   int fd, max_queue, stream;
101   LIST(Article) queue; /* not yet told peer, or said TAKETHIS */
102   LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */
103   Article send; /* partially transmitted */
104   struct iovec *xmit;
105   XmitDetails *xmitd;
106   int xmitu;
107 };
108
109
110 #define CHILD_ESTATUS_STREAM   4
111 #define CHILD_ESTATUS_NOSTREAM 5
112
113 static int since_connect_attempt;
114 static int nconns;
115 static LIST(Conn) idle, working, full;
116
117 static LIST(Article) *queue;
118
119 static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
120
121
122 /*========== making new connections ==========*/
123
124 static int connecting_sockets[2]= {-1,-1};
125 static pid_t connecting_child;
126
127 static void report_child_status(const char *what, int status) {
128   if (WIFEXITED(status)) {
129     int es= WEXITSTATUS(status);
130     if (es)
131       warn("%s: child died with error exit status %d",es);
132   } else if (WIFSIGNALED(status)) {
133     int sig= WTERMSIG(status);
134     const char *sigstr= strsignal(sig);
135     const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
136     if (sigstr)
137       warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
138     else
139       warn("%s: child died due to unknown fatal signal %d%s",
140            what, sig, coredump);
141   } else {
142     warn("%s: child died with unknown wait status %d", status);
143   }
144 }
145
146 static void connect_attempt_discard(void) {
147   if (connecting_sockets[0]) {
148     cancel_fd(loop, connecting_sockets[0], OOP_READ);
149     cancel_fd(loop, connecting_sockets[0], OOP_EXCEPTION);
150   }
151   perhaps_close(&connecting_sockets[0]);
152   perhaps_close(&connecting_sockets[1]);
153
154   if (connecting_child) {
155     int status;
156     r= kill(connecting_child, SIGKILL);
157     if (r) sysdie("cannot kill connect child");
158
159     pid_t got= waitpid(connecting_child, &status, WNOHANG);
160     if (got==-1) sysdie("cannot reap connect child");
161
162     if (!(WIFEXITED(status) ||
163           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) {
164       report_child_status("connect"
165     }
166     connecting_child= 0;
167   }
168 }
169
170 #define PREP_DECL_MSG_CMSG(msg)                 \
171   struct msghdr msg;                            \
172   memset(&msg,0,sizeof(msg));                   \
173   char msg##cbuf[CMSG_SPACE(sizeof(fd))];       \
174   msg.msg_control= msg##cbuf;                   \
175   msg.msg_controllen= sizeof(msg##cbuf);
176
177 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
178   Conn *conn= 0;
179
180   conn= xcalloc(sizeof(*conn));
181   
182   DECL_MSG_CMSG(msg);
183   struct cmsghdr *h= 0;
184   ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
185   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
186   if (!h) {
187     int status;
188     pid_t got= waitpid(connecting_child, &status, WNOHANG);
189     if (got != -1) {
190       assert(got==connecting_child);
191       connecting_child= 0;
192       if (WIFEXITED(status) &&
193           (WEXITSTATUS(status) != 0
194            WEXITSTATUS(status) != CHILD_ESTATUS_STREAM &&
195            WEXITSTATUS(status) != CHILD_ESTATUS_NOSTREAM)) {
196         /* child already reported the problem */
197       } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) {
198         warn("connect: connection attempt timed out");
199       } else if (!WIFEXITED(status)) {
200         report_child_status("connect", status);
201         /* that's probably the root cause then */
202       }
203     } else {
204       /* child is still running apparently, report the socket problem */
205       if (rs < 0)
206         syswarn("connect: read from child socket failed");
207       else if (e == OOP_EXCEPTIONN)
208         warn("connect: unexpected exception on child socket");
209       else
210         warn("connect: unexpected EOF on child socket");
211     }
212     goto x;
213   }
214
215 #define CHK(field, val)                                                   \
216   if (h->cmsg_##field != val) {                                           \
217     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \
218     goto x;                                                               \
219   }
220   CHK(level, SOL_SOCKET);
221   CHK(type,  SCM_RIGHTS);
222   CHK(len,   CMSG_LEN(sizeof(conn-b>fd)));
223 #undef CHK
224
225   if (CMSG_NXTHDR,&msg,h) { die("connect: child sent many cmsgs"); goto x; }
226
227   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
228
229   pid_t got= waitpid(connecting_child, &status, 0);
230   if (got==-1) sysdie("connect: real wait for child");
231   assert(got == connecting_child);
232   connecting_child= 0;
233
234   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
235   int es= WEXITSTATUS(status);
236   switch (es) {
237   case CHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
238   case CHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
239   default:
240     die("connect: child gave unexpected exit status %d", es);
241   }
242
243   set nonblocking;
244
245   /* Phew! */
246   LIST_ADDHEAD(idle, conn);
247   notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
248   connect_attempt_discard();
249   process_queue();
250   return 0;
251
252  x:
253   if (conn) {
254     perhaps_close(&conn->fd);
255     free(conn);
256   }
257   connect_attempt_discard();
258 }
259
260 static void connect_start() {
261   assert(!connecting_sockets[0]);
262   assert(!connecting_sockets[1]);
263   assert(!connecting_child);
264
265   notice("starting connection attempt");
266
267   r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets);
268   if (r) { syswarn("connect: cannot create socketpair for child"); goto x; }
269
270   connecting_child= fork();
271   if (connecting_child==-1) { syswarn("connect: cannot fork"); goto x; }
272
273   if (!connecting_child) {
274     FILE *cn_from, *cn_to;
275     char buf[NNTP_STRLEN+100];
276     int exitstatus= CHILD_ESTATUS_NOSTREAM;
277
278     put sigpipe back;
279     close unwanted fds;
280
281     r= close(connecting_sockets[0]);
282     if (r) sysdie("connect: close parent socket in child");
283
284     alarm(connection_setup_timeout);
285     if (NNTPconnect(remote_host, port, &cn_from, &cn_to, buf) < 0) {
286       if (buf[0]) {
287         sanitise_inplace(buf);
288         die("connect: rejected: %s", buf);
289       } else {
290         sysdie("connect: connection attempt failed");
291       }
292     }
293     if (NNTPsendpassword(remote_host, cn_from, cn_to) < 0)
294       sysdie("connect: authentication failed");
295     if (try_stream) {
296       if (fputs("MODE STREAM\r\n", cn_to) ||
297           fflush(cn_to))
298         sysdie("connect: could not send MODE STREAM");
299       buf[sizeof(buf)-1]= 0;
300       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
301         if (ferror(cn_from))
302           sysdie("connect: could not read response to MODE STREAM");
303         else
304           die("connect: connection close in response to MODE STREAM");
305       }
306       int l= strlen(buf);
307       assert(l>=1);
308       if (buf[-1]!='\n') {
309         sanitise_inplace(buf);
310         die("connect: response to MODE STREAM is too long: %.100s...",
311             remote_host, buf);
312       }
313       l--;  if (l>0 && buf[1-]=='\r') l--;
314       buf[l]= 0;
315       char *ep;
316       int rcode= strtoul(buf,&ep,10);
317       if (ep != buf[3]) {
318         sanitise_inplace(buf);
319         die("connect: bad response to MODE STREAM: %.50s", buf);
320       }
321       switch (rcode) {
322       case 203:
323         exitstatus= CHILD_ESTATUS_STREAM;
324         break;
325       case 480:
326       case 500:
327         break;
328       default:
329         sanitise_inplace(buf);
330         warn("connect: unexpected response to MODE STREAM: %.50s", buf);
331         exitstatus= 2;
332         break;
333       }
334     }
335     int fd= fileno(cn_from);
336
337     PREP_DECL_MSG_CMSG(msg);
338     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
339     cmsg->cmsg_level= SOL_SOCKET;
340     cmsg->cmsg_type=  SCM_RIGHTS;
341     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
342     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
343
344     msg.msg_controllen= cmsg->cmsg_len;
345     r= sendmsg(connecting_sockets[1], &msg, 0);
346     if (r) sysdie("sendmsg failed for new connection");
347
348     _exit(exitstatus);
349   }
350
351   r= close(connecting_sockets[1]);  connecting_sockets[1]= 0;
352   if (r) sysdie("connect: close child socket in parent");
353
354   loop->on_fd(loop, connecting_sockets[0], OOP_READ,      connchild_event, 0);
355   loop->on_fd(loop, connecting_sockets[0], OOP_EXCEPTION, connchild_event, 0);
356   return OOP_CONTINUE;
357
358  x:
359   connect_attempt_discard();
360 }
361
362 /*========== overall control of article flow ==========*/
363  
364 static void check_master_queue(void) {
365   if (!queue.count)
366     return;
367
368   if (working.head) {
369     conn_assign_one_article(&working);
370   } else if (idle.head) {
371     conn_assign_one_article(&idle);
372   } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
373              !connecting_child && !connect_delay) {
374     connect_delay= reconnect_delay_periods;
375     connect_start();
376   }
377
378
379 static int conn_total_queued_articles(Conn *conn) {
380   return conn->sent.count + !!conn->send + conn->queue.count;
381 }
382  
383 static void conn_assign_one_article(LIST(Conn) *connlist) {
384   Conn *conn= connlist->head;
385
386   LIST_REMOVE(*connlist, conn);
387   Article *art= LIST_REMHEAD(queue);
388   LIST_ADDTAIL(conn->queue, art);
389   LIST_ADD(*conn_determine_right_list(conn), conn);
390   
391   check_conn_work(conn);
392 }
393
394 static LIST(Conn) *conn_determine_right_list(Conn *conn) { 
395   int inqueue= conn_total_queued_articles(conn);
396   assert(inqueue <= max_queue);
397   if (inqueue == 0) return &idle;
398   if (inqueue == conn->max_queue) return &full;
399   return &working;
400 }
401
402 static void check_conn_work(Conn *conn)  {
403   void *rp;
404   for (;;) {
405     conn_make_some_xmits(conn);
406
407     void *rp= conn_write_some_xmits(conn);
408     if (!rp) {
409       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
410       return;
411     } else if (rp==OOP_CONTINUE) {
412       loop->on_fd(loop, conn->fd, OOP_WRITE;)
413  else if (rp==OOP_HALT) {
414       return;
415     }
416     
417
418       if (
419
420     while (
421     
422 }
423  
424 /*========== article transmission ==========*/
425
426 static void *conn_writeable() {
427   for (;;) {
428     
429     if (!conn->xmitu) {
430       perhaps_transmit_on(conn);
431       if (!conn->xmitu) {
432         unlink from readable;
433         break;
434       }
435     }
436
437
438 static void *conn_write_some_xmits(Conn *conn) {
439   /* return values:
440    *      0:            nothing more to write, no need to call us again
441    *      OOP_CONTINUE: more to write but fd not writeable
442    *      OOP_HALT:     disaster, have destroyed conn
443    */
444   for (;;) {
445     int count= conn->xmitu;
446     if (!count) return 0;
447   
448     if (count > IOV_MAX) count= IOV_MAX;
449     ssize_t rs= writev(conn->fd, conn->xmit, count);
450     if (rs < 0) {
451       if (errno == EAGAIN) return OOP_CONTINUE;
452       syswarn(CN "write failed", conn->fd);
453       conn_failed(conn);
454       return OOP_HALT;
455     }
456     assert(rs > 0);
457
458     for (done=0; rs && done<xmitu; done++) {
459       struct iovec *vp= &conn->xmit[done];
460       XmitDetails *dp= &conn->xmitd[done];
461       if (rs > vp->iov_len) {
462         rs -= vp->iov_len;
463         xmit_free(dp);
464       } else {
465         vp->iov_base += rs;
466         vp->iov_len -= rs;
467       }
468     }
469     int newu= conn->xmitu - done;
470     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
471     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
472     conn->xmitu= newu;
473   }
474 }
475
476 static void conn_make_some_xmits(Conn *conn) {
477   for (;;) {
478     if (conn->send) {
479       do something about this article text;
480       continue;
481     }
482
483     if (conn->xmitu+3 > conn->xmita)
484       /* no space for a CHECK even */
485       break;
486
487     Article *art= LIST_REMHEAD(queue);
488     if (!art) break;
489
490     if (art->checked || conn->nocheck) {
491       if (conn->stream) {
492         XMIT_LITERAL("TAKETHIS ");
493         xmit_noalloc(art->mid, art->midlen);
494         XMIT_LITERAL("\r\n");
495       } else {
496         /* we got 235 from IHAVE */
497       }
498       conn->send= art;
499     } else {
500       if (conn->stream)
501         XMIT_LITERAL("IHAVE ");
502       else
503         XMIT_LITERAL("CHECK ");
504       xmit_noalloc(art->mid, art->midlen);
505       XMIT_LITERAL("\r\n");
506       LIST_ADDTAIL(conn->sent, art);
507     }
508   }
509 }
510  
511     
512     if (conn->queue.head) {
513       if (conn->queue.checked || conn->nocheck) {
514         
515
516       && conn->xmitu+3 <= xmita) {
517       if (
518       XMIT("
519     if (conn->xmitu < xmita
520       
521
522   if (!queue
523   
524
525
526
527  
528     int circ_used= circ_write - circ_read;
529     if (circ_used < 0) circ_used += CONNBUFSZ;
530     writeable_moredata(conn, CONNBUFSZ-1 - circ_used);
531
532     if (conn->circ_read == conn->circ_write)
533       return OOP_CONTINUE;
534
535     struct iovec iov[2];
536     int niov= 1;
537     iov[0].iov_base= conn->circ_buf + conn->circ_read;
538     if (conn->circ_read > conn->circ_write) { /* wrapped */
539       iov[0].iov_len= CONNBUFSZ - conn->circ_read;
540       iov[1].iov_base= conn->circ_buf;
541       iov[1].iov_len= conn->circ_write;
542       if (niov[1].iov_len) niov= 2;
543     } else {
544       iov[0].iov_len= conn->circ_write - conn->circ_read;
545     }
546     ssize_t rs= writev(conn->fd, &iov, niov);
547     if (rs < 0) {
548       
549     }
550     assert(rs > 0);
551
552     conn->circ_read += rs;
553     if (conn->circ_read > CONNBUFSZ)
554       conn->circ_read -= CONNBUFSZ;
555   }
556 }
557
558
559 main {
560   ignore sigpipe;
561 };