chiark / gitweb /
2855c412a0f6c2113042c75d1107197b7615b6e0
[inn-innduct.git] / backends / innduct.c
1 /*
2  * Four files full of
3  *    token article
4  *
5  *   site.name_ductlock        lock taken out by innduct
6  * F site.name                 written by innd
7  * D site.name_duct            moved aside by innduct, for flush
8  *   site.name_deferwork       431'd articles, still being written
9  *   site.name_defergo_<inum>  431'd articles, ready for innxmit
10  *   site.name_deferlock       lock taken out by innxmit wrapper
11  *
12  *
13  *
14  * OVERALL STATES:
15  *
16  *                                                   START
17  *   ,-->--.                                           |
18  *   |     |                                         stat D
19  *   |     |                                         /   |
20  *   |     |                                  ENOENT/    |exists
21  *   |     V                            <----------'     |
22  *   |  Normal                                         stat F
23  *   |   F: innd writing, duct reading                  /|\
24  *   |   D: ENOENT                                     / | \
25  *   |     |                                          /  |  \
26  *   |     |  duct decides time to flush      same   /   |   |
27  *   |     |  duct makes hardlink             as D  /    |   |
28  *   |     |                                       /     |   |
29  *   |     V                            <---------'      |   |
30  *   |  Hardlinked                                       |   |
31  *   |   F == D: innd writing, duct reading              |   |
32  *   ^     |                                             |   |
33  *   |     |  duct unlinks F                            /    |
34  *   |     V                                  ENOENT   /     |
35  *   |  Moved                            <------------'      |
36  *   |   F: ENOENT                                           |
37  *   |   D: innd writing, duct reading                       |
38  *   |     |                                                 |
39  *   |     |  duct flushes feed                              |
40  *   |     |   (others can too, harmlessly)                  |
41  *   |     V                                                 |
42  *   |  Separated                          <-----------------'
43  *   |   F: innd writing                        different to D
44  *   |   D: duct reading
45  *   |     |
46  *   |     V  duct completes processing of D
47  *   |     |  duct unlinks D
48  *   |     |
49  *   `--<--'
50  *
51  */
52
53 static int max_connections, max_queue_per_conn;
54 static int connection_setup_timeout, port, try_stream;
55 static const char *remote_host;
56
57 #define ISNODE(T)    T *next, *back;
58 #define LIST(T)      struct { T *head, *tail, *tailpred; int count; }
59
60 #define NODE(n) ((struct node*)&(n)->head)
61
62 #define LIST_ADDHEAD(l,n)                                               \
63  (list_addhead((struct list*)&(l), NODE((n))), (void)(l).count++)
64 #define LIST_ADDTAIL(l,n)                                               \
65  (list_addtail((struct list*)&(l), NODE((n))), (void)(l).count++)
66
67 #define LIST_REMHEAD(l)                                                   \
68  ((l).count ? ((l).count--, (void*)list_remhead((struct list*)&(l))) : 0)
69 #define LIST_REMTAIL(l)                                                   \
70  ((l).count ? ((l).count--, (void*)list_remtail((struct list*)&(l))) : 0)
71 #define LIST_REMOVE(l,n)                        \
72  (list_remove(NODE((n))), (void)(l).count--)
73 #define LIST_INSERT(l,n,pred) \
74  (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++)
75
76 struct Article {
77   char *mid;
78   int midlen;
79   int checked, sentbody;
80 };
81
82 #define CONNIOVS 128
83
84 #define CN "<%d> "
85
86 typedef struct Conn Conn;
87
88 typedef enum {
89   Malloc, Const, Artdata;
90 } XmitKind;
91
92 typedef struct {
93   XmitKind kind;
94   union {
95     char *malloc_tofree;
96     ARTHANDLE *sm_art;
97   } info;
98 } XmitDetails;
99
100 struct Conn {
101   ISNODE(Conn);
102   int fd, max_queue, stream;
103   LIST(Article) queue; /* not yet told peer, or CHECK said send it */
104   LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */
105   struct iovec xmit[CONNIOVS];
106   XmitDetails xmitd[CONNIOVS];
107   int xmitu;
108 };
109
110
111 #define CHILD_ESTATUS_STREAM   4
112 #define CHILD_ESTATUS_NOSTREAM 5
113
114 static int since_connect_attempt;
115 static int nconns;
116 static LIST(Conn) idle, working, full;
117
118 static LIST(Article) *queue;
119
120 static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
121
122
123 /*========== making new connections ==========*/
124
125 static int connecting_sockets[2]= {-1,-1};
126 static pid_t connecting_child;
127
128 static void report_child_status(const char *what, int status) {
129   if (WIFEXITED(status)) {
130     int es= WEXITSTATUS(status);
131     if (es)
132       warn("%s: child died with error exit status %d",es);
133   } else if (WIFSIGNALED(status)) {
134     int sig= WTERMSIG(status);
135     const char *sigstr= strsignal(sig);
136     const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
137     if (sigstr)
138       warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
139     else
140       warn("%s: child died due to unknown fatal signal %d%s",
141            what, sig, coredump);
142   } else {
143     warn("%s: child died with unknown wait status %d", status);
144   }
145 }
146
147 static void connect_attempt_discard(void) {
148   if (connecting_sockets[0]) {
149     cancel_fd(loop, connecting_sockets[0], OOP_READ);
150     cancel_fd(loop, connecting_sockets[0], OOP_EXCEPTION);
151   }
152   perhaps_close(&connecting_sockets[0]);
153   perhaps_close(&connecting_sockets[1]);
154
155   if (connecting_child) {
156     int status;
157     r= kill(connecting_child, SIGKILL);
158     if (r) sysdie("cannot kill connect child");
159
160     pid_t got= waitpid(connecting_child, &status, WNOHANG);
161     if (got==-1) sysdie("cannot reap connect child");
162
163     if (!(WIFEXITED(status) ||
164           (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) {
165       report_child_status("connect"
166     }
167     connecting_child= 0;
168   }
169 }
170
171 #define PREP_DECL_MSG_CMSG(msg)                 \
172   struct msghdr msg;                            \
173   memset(&msg,0,sizeof(msg));                   \
174   char msg##cbuf[CMSG_SPACE(sizeof(fd))];       \
175   msg.msg_control= msg##cbuf;                   \
176   msg.msg_controllen= sizeof(msg##cbuf);
177
178 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
179   Conn *conn= 0;
180
181   conn= xcalloc(sizeof(*conn));
182   
183   DECL_MSG_CMSG(msg);
184   struct cmsghdr *h= 0;
185   ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
186   if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
187   if (!h) {
188     int status;
189     pid_t got= waitpid(connecting_child, &status, WNOHANG);
190     if (got != -1) {
191       assert(got==connecting_child);
192       connecting_child= 0;
193       if (WIFEXITED(status) &&
194           (WEXITSTATUS(status) != 0
195            WEXITSTATUS(status) != CHILD_ESTATUS_STREAM &&
196            WEXITSTATUS(status) != CHILD_ESTATUS_NOSTREAM)) {
197         /* child already reported the problem */
198       } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) {
199         warn("connect: connection attempt timed out");
200       } else if (!WIFEXITED(status)) {
201         report_child_status("connect", status);
202         /* that's probably the root cause then */
203       }
204     } else {
205       /* child is still running apparently, report the socket problem */
206       if (rs < 0)
207         syswarn("connect: read from child socket failed");
208       else if (e == OOP_EXCEPTIONN)
209         warn("connect: unexpected exception on child socket");
210       else
211         warn("connect: unexpected EOF on child socket");
212     }
213     goto x;
214   }
215
216 #define CHK(field, val)                                                   \
217   if (h->cmsg_##field != val) {                                           \
218     die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \
219     goto x;                                                               \
220   }
221   CHK(level, SOL_SOCKET);
222   CHK(type,  SCM_RIGHTS);
223   CHK(len,   CMSG_LEN(sizeof(conn-b>fd)));
224 #undef CHK
225
226   if (CMSG_NXTHDR,&msg,h) { die("connect: child sent many cmsgs"); goto x; }
227
228   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
229
230   pid_t got= waitpid(connecting_child, &status, 0);
231   if (got==-1) sysdie("connect: real wait for child");
232   assert(got == connecting_child);
233   connecting_child= 0;
234
235   if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
236   int es= WEXITSTATUS(status);
237   switch (es) {
238   case CHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
239   case CHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
240   default:
241     die("connect: child gave unexpected exit status %d", es);
242   }
243
244   set nonblocking;
245
246   /* Phew! */
247   LIST_ADDHEAD(idle, conn);
248   notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
249   connect_attempt_discard();
250   check_master_queue();
251   return 0;
252
253  x:
254   if (conn) {
255     perhaps_close(&conn->fd);
256     free(conn);
257   }
258   connect_attempt_discard();
259 }
260
261 static void connect_start() {
262   assert(!connecting_sockets[0]);
263   assert(!connecting_sockets[1]);
264   assert(!connecting_child);
265
266   notice("starting connection attempt");
267
268   r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets);
269   if (r) { syswarn("connect: cannot create socketpair for child"); goto x; }
270
271   connecting_child= fork();
272   if (connecting_child==-1) { syswarn("connect: cannot fork"); goto x; }
273
274   if (!connecting_child) {
275     FILE *cn_from, *cn_to;
276     char buf[NNTP_STRLEN+100];
277     int exitstatus= CHILD_ESTATUS_NOSTREAM;
278
279     put sigpipe back;
280     close unwanted fds;
281
282     r= close(connecting_sockets[0]);
283     if (r) sysdie("connect: close parent socket in child");
284
285     alarm(connection_setup_timeout);
286     if (NNTPconnect(remote_host, port, &cn_from, &cn_to, buf) < 0) {
287       if (buf[0]) {
288         sanitise_inplace(buf);
289         die("connect: rejected: %s", buf);
290       } else {
291         sysdie("connect: connection attempt failed");
292       }
293     }
294     if (NNTPsendpassword(remote_host, cn_from, cn_to) < 0)
295       sysdie("connect: authentication failed");
296     if (try_stream) {
297       if (fputs("MODE STREAM\r\n", cn_to) ||
298           fflush(cn_to))
299         sysdie("connect: could not send MODE STREAM");
300       buf[sizeof(buf)-1]= 0;
301       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
302         if (ferror(cn_from))
303           sysdie("connect: could not read response to MODE STREAM");
304         else
305           die("connect: connection close in response to MODE STREAM");
306       }
307       int l= strlen(buf);
308       assert(l>=1);
309       if (buf[-1]!='\n') {
310         sanitise_inplace(buf);
311         die("connect: response to MODE STREAM is too long: %.100s...",
312             remote_host, buf);
313       }
314       l--;  if (l>0 && buf[1-]=='\r') l--;
315       buf[l]= 0;
316       char *ep;
317       int rcode= strtoul(buf,&ep,10);
318       if (ep != buf[3]) {
319         sanitise_inplace(buf);
320         die("connect: bad response to MODE STREAM: %.50s", buf);
321       }
322       switch (rcode) {
323       case 203:
324         exitstatus= CHILD_ESTATUS_STREAM;
325         break;
326       case 480:
327       case 500:
328         break;
329       default:
330         sanitise_inplace(buf);
331         warn("connect: unexpected response to MODE STREAM: %.50s", buf);
332         exitstatus= 2;
333         break;
334       }
335     }
336     int fd= fileno(cn_from);
337
338     PREP_DECL_MSG_CMSG(msg);
339     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
340     cmsg->cmsg_level= SOL_SOCKET;
341     cmsg->cmsg_type=  SCM_RIGHTS;
342     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
343     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
344
345     msg.msg_controllen= cmsg->cmsg_len;
346     r= sendmsg(connecting_sockets[1], &msg, 0);
347     if (r) sysdie("sendmsg failed for new connection");
348
349     _exit(exitstatus);
350   }
351
352   r= close(connecting_sockets[1]);  connecting_sockets[1]= 0;
353   if (r) sysdie("connect: close child socket in parent");
354
355   loop->on_fd(loop, connecting_sockets[0], OOP_READ,      connchild_event, 0);
356   loop->on_fd(loop, connecting_sockets[0], OOP_EXCEPTION, connchild_event, 0);
357   return OOP_CONTINUE;
358
359  x:
360   connect_attempt_discard();
361 }
362
363 /*========== overall control of article flow ==========*/
364  
365 static void conn_check_work(Conn *conn);
366
367 static void check_master_queue(void) {
368   if (!queue.count)
369     return;
370
371   Conn *last_assigned=0;
372   for (;;) {
373     if (working.head) {
374       conn_assign_one_article(&working, &last_assigned);
375     } else if (idle.head) {
376       conn_assign_one_article(&idle, &last_assigned);
377     } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
378                !connecting_child && !connect_delay) {
379       connect_delay= reconnect_delay_periods;
380       connect_start();
381     } else {
382       break;
383     }
384   }
385   conn_check_work(last_assigned);
386
387  
388 static void conn_assign_one_article(LIST(Conn) *connlist,
389                                     Conn **last_assigned) {
390   Conn *conn= connlist->head;
391
392   LIST_REMOVE(*connlist, conn);
393   Article *art= LIST_REMHEAD(queue);
394   LIST_ADDTAIL(conn->queue, art);
395   LIST_ADD(*conn_determine_right_list(conn), conn);
396
397   /* This slightly odd arrangement is so that we call conn_check_work
398    * once after filling the queue for a new connection in
399    * check_master_queue, rather than for each article. */
400   if (conn != *last_assigned && *last_assigned)
401     conn_check_work(*last_assigned);
402   *last_assigned= conn;
403 }
404
405 static int conn_total_queued_articles(Conn *conn) {
406   return conn->sent.count + conn->queue.count;
407 }
408
409 static LIST(Conn) *conn_determine_right_list(Conn *conn) { 
410   int inqueue= conn_total_queued_articles(conn);
411   assert(inqueue <= max_queue);
412   if (inqueue == 0) return &idle;
413   if (inqueue == conn->max_queue) return &full;
414   return &working;
415 }
416
417 static void *conn_writeable(oop_source *l, int fd, int ev, void *u) {
418   check_conn_work(u);
419 }
420   
421 static void conn_check_work(Conn *conn)  {
422   void *rp= 0;
423   for (;;) {
424     conn_make_some_xmits(conn);
425     if (!conn->xmitu) {
426       loop->cancel_fd(loop, conn->fd, OOP_WRITE);
427       return;
428     }
429
430     void *rp= conn_write_some_xmits(conn);
431     if (rp==OOP_CONTINUE) {
432       loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
433       return;
434     } else if (rp==OOP_HALT) {
435       return;
436     } else if (!rp) {
437       /* transmitted everything */
438     } else {
439       abort();
440     }
441   }
442 }
443  
444 /*========== article transmission ==========*/
445
446 static void *conn_write_some_xmits(Conn *conn) {
447   /* return values:
448    *      0:            nothing more to write, no need to call us again
449    *      OOP_CONTINUE: more to write but fd not writeable
450    *      OOP_HALT:     disaster, have destroyed conn
451    */
452   for (;;) {
453     int count= conn->xmitu;
454     if (!count) return 0;
455   
456     if (count > IOV_MAX) count= IOV_MAX;
457     ssize_t rs= writev(conn->fd, conn->xmit, count);
458     if (rs < 0) {
459       if (errno == EAGAIN) return OOP_CONTINUE;
460       syswarn(CN "write failed", conn->fd);
461       conn_failed(conn);
462       return OOP_HALT;
463     }
464     assert(rs > 0);
465
466     for (done=0; rs && done<xmitu; done++) {
467       struct iovec *vp= &conn->xmit[done];
468       XmitDetails *dp= &conn->xmitd[done];
469       if (rs > vp->iov_len) {
470         rs -= vp->iov_len;
471         xmit_free(dp);
472       } else {
473         vp->iov_base += rs;
474         vp->iov_len -= rs;
475       }
476     }
477     int newu= conn->xmitu - done;
478     memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
479     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
480     conn->xmitu= newu;
481   }
482 }
483
484 static void conn_make_some_xmits(Conn *conn) {
485   for (;;) {
486     if (conn->xmitu+5 > CONNIOVS)
487       break;
488
489     Article *art= LIST_REMHEAD(queue);
490     if (!art) break;
491
492     if (art->checked || conn->nocheck) {
493       /* actually send it */
494
495       ARTHANDLE *artdata= SMretrieve(somehow);
496       
497       if (conn->stream) {
498         if (artdata) {
499           XMIT_LITERAL("TAKETHIS ");
500           xmit_noalloc(art->mid, art->midlen);
501           XMIT_LITERAL("\r\n");
502           xmit_artbody(artdata);
503         }
504       } else {
505         /* we got 235 from IHAVE */
506         if (artdata) {
507           xmit_artbody(artdata);
508         } else {
509           XMIT_LITERAL(".\r\n");
510         }
511       }
512       art->sent= 1;
513       LIST_ADDTAIL(conn->sent, art);
514
515     } else {
516       /* check it */
517       
518       if (conn->stream)
519         XMIT_LITERAL("IHAVE ");
520       else
521         XMIT_LITERAL("CHECK ");
522       xmit_noalloc(art->mid, art->midlen);
523       XMIT_LITERAL("\r\n");
524
525       LIST_ADDTAIL(conn->sent, art);
526     }
527   }
528 }
529  
530     
531     if (conn->queue.head) {
532       if (conn->queue.checked || conn->nocheck) {
533         
534
535       && conn->xmitu+3 <= CONNIOVS) {
536       if (
537       XMIT("
538     if (conn->xmitu < CONNIOVS
539       
540
541   if (!queue
542   
543
544
545
546  
547     int circ_used= circ_write - circ_read;
548     if (circ_used < 0) circ_used += CONNBUFSZ;
549     writeable_moredata(conn, CONNBUFSZ-1 - circ_used);
550
551     if (conn->circ_read == conn->circ_write)
552       return OOP_CONTINUE;
553
554     struct iovec iov[2];
555     int niov= 1;
556     iov[0].iov_base= conn->circ_buf + conn->circ_read;
557     if (conn->circ_read > conn->circ_write) { /* wrapped */
558       iov[0].iov_len= CONNBUFSZ - conn->circ_read;
559       iov[1].iov_base= conn->circ_buf;
560       iov[1].iov_len= conn->circ_write;
561       if (niov[1].iov_len) niov= 2;
562     } else {
563       iov[0].iov_len= conn->circ_write - conn->circ_read;
564     }
565     ssize_t rs= writev(conn->fd, &iov, niov);
566     if (rs < 0) {
567       
568     }
569     assert(rs > 0);
570
571     conn->circ_read += rs;
572     if (conn->circ_read > CONNBUFSZ)
573       conn->circ_read -= CONNBUFSZ;
574   }
575 }
576
577
578 main {
579   ignore sigpipe;
580 };