chiark / gitweb /
debugging for thing that crashed
[innduct.git] / backends / innxmit.c
1 /*  $Id: innxmit.c 6716 2004-05-16 20:26:56Z rra $
2 **
3 **  Transmit articles to remote site.
4 **  Modified for NNTP streaming: 1996-01-03 Jerry Aguirre
5 */
6
7 #include "config.h"
8 #include "clibrary.h"
9 #include "portable/socket.h"
10 #include "portable/time.h"
11 #include <ctype.h>
12 #include <errno.h>
13 #include <fcntl.h>
14 #include <setjmp.h>
15 #include <signal.h>
16 #include <syslog.h>
17 #include <sys/stat.h>
18 #include <sys/uio.h>
19
20 /* Needed on AIX 4.1 to get fd_set and friends. */
21 #ifdef HAVE_SYS_SELECT_H
22 # include <sys/select.h>
23 #endif
24
25 #include "inn/history.h"
26 #include "inn/innconf.h"
27 #include "inn/messages.h"
28 #include "inn/qio.h"
29 #include "inn/timer.h"
30 #include "inn/wire.h"
31 #include "libinn.h"
32 #include "nntp.h"
33 #include "paths.h"
34 #include "storage.h"
35
36 #define OUTPUT_BUFFER_SIZE      (16 * 1024)
37
38 /* Streaming extensions to NNTP.  This extension removes the lock-step
39 ** limitation of conventional NNTP.  Article transfer is several times
40 ** faster.  Negotiated and falls back to old mode if receiver refuses.
41 */
42
43 /* max number of articles that can be streamed ahead */
44 #define STNBUF 32
45
46 /* Send "takethis" without "check" if this many articles were
47 ** accepted in a row.
48 */
49 #define STNC 16
50
51 /* typical number of articles to stream  */
52 /* must be able to fopen this many articles */
53 #define STNBUFL (STNBUF/2)
54
55 /* number of retries before requeueing to disk */
56 #define STNRETRY 5
57
58 struct stbufs {         /* for each article we are procesing */
59         char *st_fname;         /* file name */
60         char *st_id;            /* message ID */
61         int   st_retry;         /* retry count */
62         int   st_age;           /* age count */
63         ARTHANDLE *art;         /* arthandle to read article contents */
64         int   st_hash;          /* hash value to speed searches */
65         long  st_size;          /* article size */
66 };
67 static struct stbufs stbuf[STNBUF]; /* we keep track of this many articles */
68 static int stnq;        /* current number of active entries in stbuf */
69 static long stnofail;   /* Count of consecutive successful sends */
70
71 static int TryStream = true;    /* Should attempt stream negotation? */
72 static int CanStream = false;   /* Result of stream negotation */
73 static int DoCheck   = true;    /* Should check before takethis? */
74 static char modestream[] = "mode stream";
75 static char modeheadfeed[] = "mode headfeed";
76 static long retries = 0;
77 static int logRejects = false ;  /* syslog the 437 responses. */
78
79
80
81 /*
82 ** Syslog formats - collected together so they remain consistent
83 */
84 static char     STAT1[] =
85         "%s stats offered %lu accepted %lu refused %lu rejected %lu missing %lu accsize %.0f rejsize %.0f";
86 static char     STAT2[] = "%s times user %.3f system %.3f elapsed %.3f";
87 static char     GOT_BADCOMMAND[] = "%s rejected %s %s";
88 static char     REJECTED[] = "%s rejected %s (%s) %s";
89 static char     REJ_STREAM[] = "%s rejected (%s) %s";
90 static char     CANT_CONNECT[] = "%s connect failed %s";
91 static char     CANT_AUTHENTICATE[] = "%s authenticate failed %s";
92 static char     IHAVE_FAIL[] = "%s ihave failed %s";
93
94 static char     CANT_FINDIT[] = "%s can't find %s";
95 static char     CANT_PARSEIT[] = "%s can't parse ID %s";
96 static char     UNEXPECTED[] = "%s unexpected response code %s";
97
98 /*
99 **  Global variables.
100 */
101 static bool             AlwaysRewrite;
102 static bool             Debug;
103 static bool             DoRequeue = true;
104 static bool             Purging;
105 static bool             STATprint;
106 static bool             HeadersFeed;
107 static char             *BATCHname;
108 static char             *BATCHtemp;
109 static char             *REMhost;
110 static double           STATbegin;
111 static double           STATend;
112 static FILE             *BATCHfp;
113 static int              FromServer;
114 static int              ToServer;
115 static struct history   *History;
116 static QIOSTATE         *BATCHqp;
117 static sig_atomic_t     GotAlarm;
118 static sig_atomic_t     GotInterrupt;
119 static sig_atomic_t     JMPyes;
120 static jmp_buf          JMPwhere;
121 static char             *REMbuffer;
122 static char             *REMbuffptr;
123 static char             *REMbuffend;
124 static unsigned long    STATaccepted;
125 static unsigned long    STAToffered;
126 static unsigned long    STATrefused;
127 static unsigned long    STATrejected;
128 static unsigned long    STATmissing;
129 static double           STATacceptedsize;
130 static double           STATrejectedsize;
131
132
133 /* Prototypes. */
134 static ARTHANDLE *article_open(const char *path, const char *id);
135 static void article_free(ARTHANDLE *);
136
137
138 /*
139 **  Return true if the history file has the article expired.
140 */
141 static bool
142 Expired(char *MessageID) {
143     return !HISlookup(History, MessageID, NULL, NULL, NULL, NULL);
144 }
145
146
147 /*
148 **  Flush and reset the site's output buffer.  Return false on error.
149 */
150 static bool
151 REMflush(void)
152 {
153     int         i;
154
155     if (REMbuffptr == REMbuffer) return true; /* nothing buffered */
156     i = xwrite(ToServer, REMbuffer, (int)(REMbuffptr - REMbuffer));
157     REMbuffptr = REMbuffer;
158     return i < 0 ? false : true;
159 }
160
161 /*
162 **  Return index to entry matching this message ID.  Else return -1.
163 **  The hash is to speed up the search.
164 **  the protocol.
165 */
166 static int
167 stindex(char *MessageID, int hash) {
168     int i;
169
170     for (i = 0; i < STNBUF; i++) { /* linear search for ID */
171         if ((stbuf[i].st_id) && (stbuf[i].st_id[0])
172          && (stbuf[i].st_hash == hash)) {
173             int n;
174
175             if (strcasecmp(MessageID, stbuf[i].st_id)) continue;
176
177             /* left of '@' is case sensitive */
178             for (n = 0; (MessageID[n] != '@') && (MessageID[n] != '\0'); n++) ;
179             if (strncmp(MessageID, stbuf[i].st_id, n)) continue;
180             else break; /* found a match */
181         }
182     }
183     if (i >= STNBUF) i = -1;  /* no match found ? */
184     return (i);
185 }
186
187 /* stidhash(): calculate a hash value for message IDs to speed comparisons */
188 static int
189 stidhash(char *MessageID) {
190     char        *p;
191     int         hash;
192
193     hash = 0;
194     for (p = MessageID + 1; *p && (*p != '>'); p++) {
195         hash <<= 1;
196         if (isascii((int)*p) && isupper((int)*p)) {
197             hash += tolower(*p);
198         } else {
199             hash += *p;
200         }
201     }
202     return hash;
203 }
204
205 /* stalloc(): save path, ID, and qp into one of the streaming mode entries */
206 static int
207 stalloc(char *Article, char *MessageID, ARTHANDLE *art, int hash) {
208     int i;
209
210     for (i = 0; i < STNBUF; i++) {
211         if ((!stbuf[i].st_fname) || (stbuf[i].st_fname[0] == '\0')) break;
212     }
213     if (i >= STNBUF) { /* stnq says not full but can not find unused */
214         syslog(L_ERROR, "stalloc: Internal error");
215         return (-1);
216     }
217     if ((int)strlen(Article) >= SPOOLNAMEBUFF) {
218         syslog(L_ERROR, "stalloc: filename longer than %d", SPOOLNAMEBUFF);
219         return (-1);
220     }
221     /* allocate buffers on first use.
222     ** If filename ever is longer than SPOOLNAMEBUFF then code will abort.
223     ** If ID is ever longer than NNTP_STRLEN then other code would break.
224     */
225     if (!stbuf[i].st_fname)
226         stbuf[i].st_fname = xmalloc(SPOOLNAMEBUFF);
227     if (!stbuf[i].st_id)
228         stbuf[i].st_id = xmalloc(NNTP_STRLEN);
229     strlcpy(stbuf[i].st_fname, Article, SPOOLNAMEBUFF);
230     strlcpy(stbuf[i].st_id, MessageID, NNTP_STRLEN);
231     stbuf[i].art = art;
232     stbuf[i].st_hash = hash;
233     stbuf[i].st_retry = 0;
234     stbuf[i].st_age = 0;
235     stnq++;
236     return i;
237 }
238
239 /* strel(): release for reuse one of the streaming mode entries */
240 static void
241 strel(int i) {
242         if (stbuf[i].art) {
243             article_free(stbuf[i].art);
244             stbuf[i].art = NULL;
245         }
246         if (stbuf[i].st_id) stbuf[i].st_id[0] = '\0';
247         if (stbuf[i].st_fname) stbuf[i].st_fname[0] = '\0';
248         stnq--;
249 }
250
251 /*
252 **  Send a line to the server, adding the dot escape and \r\n.
253 */
254 static bool
255 REMwrite(char *p, int i, bool escdot) {
256     int size;
257
258     /* Buffer too full? */
259     if (REMbuffend - REMbuffptr < i + 3) {
260         if (!REMflush())
261             return false;
262         if (REMbuffend - REMbuffer < i + 3) {
263             /* Line too long -- grow buffer. */
264             size = i * 2;
265             REMbuffer = xrealloc(REMbuffer, size);
266             REMbuffend = &REMbuffer[size];
267         }
268     }
269
270     /* Dot escape, text of the line, line terminator. */
271     if (escdot && (*p == '.'))
272         *REMbuffptr++ = '.';
273     memcpy(REMbuffptr, p, i);
274     REMbuffptr += i;
275     *REMbuffptr++ = '\r';
276     *REMbuffptr++ = '\n';
277
278     return true;
279 }
280
281
282 /*
283 **  Print transfer statistics, clean up, and exit.
284 */
285 static void
286 ExitWithStats(int x)
287 {
288     static char         QUIT[] = "quit";
289     double              usertime;
290     double              systime;
291
292     if (!Purging) {
293         REMwrite(QUIT, strlen(QUIT), false);
294         REMflush();
295     }
296     STATend = TMRnow_double();
297     if (GetResourceUsage(&usertime, &systime) < 0) {
298         usertime = 0;
299         systime = 0;
300     }
301
302     if (STATprint) {
303         printf(STAT1, REMhost, STAToffered, STATaccepted, STATrefused,
304                 STATrejected, STATmissing, STATacceptedsize, STATrejectedsize);
305         printf("\n");
306         printf(STAT2, REMhost, usertime, systime, STATend - STATbegin);
307         printf("\n");
308     }
309
310     syslog(L_NOTICE, STAT1, REMhost, STAToffered, STATaccepted, STATrefused,
311                 STATrejected, STATmissing, STATacceptedsize, STATrejectedsize);
312     syslog(L_NOTICE, STAT2, REMhost, usertime, systime, STATend - STATbegin);
313     if (retries)
314         syslog(L_NOTICE, "%s %lu Streaming retries", REMhost, retries);
315
316     if (BATCHfp != NULL && unlink(BATCHtemp) < 0 && errno != ENOENT)
317         syswarn("cannot remove %s", BATCHtemp);
318     sleep(1);
319     SMshutdown();
320     HISclose(History);
321     exit(x);
322     /* NOTREACHED */
323 }
324
325
326 /*
327 **  Close the batchfile and the temporary file, and rename the temporary
328 **  to be the batchfile.
329 */
330 static void
331 CloseAndRename(void)
332 {
333     /* Close the files, rename the temporary. */
334     if (BATCHqp) {
335         QIOclose(BATCHqp);
336         BATCHqp = NULL;
337     }
338     if (ferror(BATCHfp)
339      || fflush(BATCHfp) == EOF
340      || fclose(BATCHfp) == EOF) {
341         unlink(BATCHtemp);
342         syswarn("cannot close %s", BATCHtemp);
343         ExitWithStats(1);
344     }
345     if (rename(BATCHtemp, BATCHname) < 0) {
346         syswarn("cannot rename %s", BATCHtemp);
347         ExitWithStats(1);
348     }
349 }
350
351
352 /*
353 **  Requeue an article, opening the temp file if we have to.  If we get
354 **  a file write error, exit so that the original input is left alone.
355 */
356 static void
357 Requeue(const char *Article, const char *MessageID)
358 {
359     int fd;
360
361     /* Temp file already open? */
362     if (BATCHfp == NULL) {
363         fd = mkstemp(BATCHtemp);
364         if (fd < 0) {
365             syswarn("cannot create a temporary file");
366             ExitWithStats(1);
367         }
368         BATCHfp = fdopen(fd, "w");
369         if (BATCHfp == NULL) {
370             syswarn("cannot open %s", BATCHtemp);
371             ExitWithStats(1);
372         }
373     }
374
375     /* Called only to get the file open? */
376     if (Article == NULL)
377         return;
378
379     if (MessageID != NULL)
380         fprintf(BATCHfp, "%s %s\n", Article, MessageID);
381     else
382         fprintf(BATCHfp, "%s\n", Article);
383     if (fflush(BATCHfp) == EOF || ferror(BATCHfp)) {
384         syswarn("cannot requeue %s", Article);
385         ExitWithStats(1);
386     }
387 }
388
389
390 /*
391 **  Requeue an article then copy the rest of the batch file out.
392 */
393 static void
394 RequeueRestAndExit(char *Article, char *MessageID) {
395     char        *p;
396
397     if (!AlwaysRewrite
398      && STATaccepted == 0 && STATrejected == 0 && STATrefused == 0
399      && STATmissing == 0) {
400         warn("nothing sent -- leaving batchfile alone");
401         ExitWithStats(1);
402     }
403
404     warn("rewriting batch file and exiting");
405     if (CanStream) {    /* streaming mode has a buffer of articles */
406         int i;
407
408         for (i = 0; i < STNBUF; i++) {    /* requeue unacknowledged articles */
409             if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) {
410                 if (Debug)
411                     fprintf(stderr, "stbuf[%d]= %s, %s\n",
412                             i, stbuf[i].st_fname, stbuf[i].st_id);
413                 Requeue(stbuf[i].st_fname, stbuf[i].st_id);
414                 if (Article == stbuf[i].st_fname) Article = NULL;
415                 strel(i); /* release entry */
416             }
417         }
418     }
419     Requeue(Article, MessageID);
420
421     for ( ; BATCHqp; ) {
422         if ((p = QIOread(BATCHqp)) == NULL) {
423             if (QIOtoolong(BATCHqp)) {
424                 warn("skipping long line in %s", BATCHname);
425                 QIOread(BATCHqp);
426                 continue;
427             }
428             if (QIOerror(BATCHqp)) {
429                 syswarn("cannot read %s", BATCHname);
430                 ExitWithStats(1);
431             }
432
433             /* Normal EOF. */
434             break;
435         }
436
437         if (fprintf(BATCHfp, "%s\n", p) == EOF
438          || ferror(BATCHfp)) {
439             syswarn("cannot requeue %s", p);
440             ExitWithStats(1);
441         }
442     }
443
444     CloseAndRename();
445     ExitWithStats(1);
446 }
447
448
449 /*
450 **  Clean up the NNTP escapes from a line.
451 */
452 static char *
453 REMclean(char *buff) {
454     char        *p;
455
456     if ((p = strchr(buff, '\r')) != NULL)
457         *p = '\0';
458     if ((p = strchr(buff, '\n')) != NULL)
459         *p = '\0';
460
461     /* The dot-escape is only in text, not command responses. */
462     return buff;
463 }
464
465
466 /*
467 **  Read a line of input, with timeout.  Also handle \r\n-->\n mapping
468 **  and the dot escape.  Return true if okay, *or we got interrupted.*
469 */
470 static bool
471 REMread(char *start, int size) {
472     static int          count;
473     static char         buffer[BUFSIZ];
474     static char         *bp;
475     char                *p;
476     char                *q;
477     char                *end;
478     struct timeval      t;
479     fd_set              rmask;
480     int                 i;
481     char                c;
482
483     if (!REMflush())
484         return false;
485
486     for (p = start, end = &start[size - 1]; ; ) {
487         if (count == 0) {
488             /* Fill the buffer. */
489     Again:
490             FD_ZERO(&rmask);
491             FD_SET(FromServer, &rmask);
492             t.tv_sec = 10 * 60;
493             t.tv_usec = 0;
494             i = select(FromServer + 1, &rmask, NULL, NULL, &t);
495             if (GotInterrupt)
496                 return true;
497             if (i < 0) {
498                 if (errno == EINTR)
499                     goto Again;
500                 return false;
501             }
502             if (i == 0 || !FD_ISSET(FromServer, &rmask))
503                 return false;
504             count = read(FromServer, buffer, sizeof buffer);
505             if (GotInterrupt)
506                 return true;
507             if (count <= 0)
508                 return false;
509             bp = buffer;
510         }
511
512         /* Process next character. */
513         count--;
514         c = *bp++;
515         if (c == '\n')
516             break;
517         if (p < end)
518             *p++ = c;
519     }
520
521     /* We know we got \n; if previous char was \r, turn it into \n. */
522     if (p > start && p < end && p[-1] == '\r')
523         p[-1] = '\n';
524     *p = '\0';
525
526     /* Handle the dot escape. */
527     if (*p == '.') {
528         if (p[1] == '\n' && p[2] == '\0')
529             /* EOF. */
530             return false;
531         for (q = &start[1]; (*p++ = *q++) != '\0'; )
532             continue;
533     }
534     return true;
535 }
536
537
538 /*
539 **  Handle the interrupt.
540 */
541 static void
542 Interrupted(char *Article, char *MessageID) {
543     warn("interrupted");
544     RequeueRestAndExit(Article, MessageID);
545 }
546
547
548 /*
549 **  Returns the length of the headers.
550 */
551 static int
552 HeadersLen(ARTHANDLE *art, int *iscmsg) {
553     const char  *p;
554     char        lastchar = -1;
555
556     /* from nnrpd/article.c ARTsendmmap() */
557     for (p = art->data; p < (art->data + art->len); p++) {
558         if (*p == '\r')
559             continue;
560         if (*p == '\n') {
561             if (lastchar == '\n') {
562                 if (*(p-1) == '\r')
563                     p--;
564                 break;
565             }
566             if (*(p + 1) == 'C' && strncasecmp(p + 1, "Control: ", 9) == 0)
567                 *iscmsg = 1;
568         }
569         lastchar = *p;
570     }
571     return (p - art->data);
572 }
573
574
575 /*
576 **  Send a whole article to the server.
577 */
578 static bool
579 REMsendarticle(char *Article, char *MessageID, ARTHANDLE *art) {
580     char        buff[NNTP_STRLEN];
581
582     if (!REMflush())
583         return false;
584     if (HeadersFeed) {
585         struct iovec vec[3];
586         char buf[20];
587         int iscmsg = 0;
588         int len = HeadersLen(art, &iscmsg);
589
590         vec[0].iov_base = (char *) art->data;
591         vec[0].iov_len = len;
592         /* Add 14 bytes, which maybe will be the length of the Bytes header */
593         snprintf(buf, sizeof(buf), "Bytes: %lu\r\n",
594                  (unsigned long) art->len + 14);
595         vec[1].iov_base = buf;
596         vec[1].iov_len = strlen(buf);
597         if (iscmsg) {
598             vec[2].iov_base = (char *) art->data + len;
599             vec[2].iov_len = art->len - len;
600         } else {
601             vec[2].iov_base = (char *) "\r\n.\r\n";
602             vec[2].iov_len = 5;
603         }
604         if (xwritev(ToServer, vec, 3) < 0)
605             return false;
606     } else
607         if (xwrite(ToServer, art->data, art->len) < 0)
608             return false;
609     if (GotInterrupt)
610         Interrupted(Article, MessageID);
611     if (Debug) {
612         fprintf(stderr, "> [ article %lu ]\n", (unsigned long) art->len);
613         fprintf(stderr, "> .\n");
614     }
615
616     if (CanStream) return true; /* streaming mode does not wait for ACK */
617
618     /* What did the remote site say? */
619     if (!REMread(buff, (int)sizeof buff)) {
620         syswarn("no reply after sending %s", Article);
621         return false;
622     }
623     if (GotInterrupt)
624         Interrupted(Article, MessageID);
625     if (Debug)
626         fprintf(stderr, "< %s", buff);
627
628     /* Parse the reply. */
629     switch (atoi(buff)) {
630     default:
631         warn("unknown reply after %s -- %s", Article, buff);
632         if (DoRequeue)
633             Requeue(Article, MessageID);
634         break;
635     case NNTP_BAD_COMMAND_VAL:
636     case NNTP_SYNTAX_VAL:
637     case NNTP_ACCESS_VAL:
638         /* The receiving server is likely confused...no point in continuing */
639         syslog(L_FATAL, GOT_BADCOMMAND, REMhost, MessageID, REMclean(buff));
640         RequeueRestAndExit(Article, MessageID);
641         /* NOTREACHED */
642     case NNTP_RESENDIT_VAL:
643     case NNTP_GOODBYE_VAL:
644         Requeue(Article, MessageID);
645         break;
646     case NNTP_TOOKIT_VAL:
647         STATaccepted++;
648         STATacceptedsize += (double)art->len;
649         break;
650     case NNTP_REJECTIT_VAL:
651         if (logRejects)
652             syslog(L_NOTICE, REJECTED, REMhost,
653                    MessageID, Article, REMclean(buff));
654         STATrejected++;
655         STATrejectedsize += (double)art->len;
656         break;
657     }
658
659     /* Article sent, or we requeued it. */
660     return true;
661 }
662 \f
663
664 /*
665 **  Get the Message-ID header from an open article.
666 */
667 static char *
668 GetMessageID(ARTHANDLE *art) {
669     static char *buff;
670     static int  buffsize = 0;
671     const char  *p, *q;
672
673     p = wire_findheader(art->data, art->len, "Message-ID");
674     if (p == NULL)
675         return NULL;
676     for (q = p; q < art->data + art->len; q++) {
677         if (*q == '\r' || *q == '\n')
678             break;
679     }
680     if (q == art->data + art->len)
681         return NULL;
682     if (buffsize < q - p) {
683         if (buffsize == 0)
684             buff = xmalloc(q - p + 1);
685         else
686             buff = xrealloc(buff, q - p + 1);
687         buffsize = q - p;
688     }
689     memcpy(buff, p, q - p);
690     buff[q - p] = '\0';
691     return buff;
692 }
693 \f
694
695 /*
696 **  Mark that we got interrupted.
697 */
698 static RETSIGTYPE
699 CATCHinterrupt(int s) {
700     GotInterrupt = true;
701
702     /* Let two interrupts kill us. */
703     xsignal(s, SIG_DFL);
704 }
705
706
707 /*
708 **  Mark that the alarm went off.
709 */
710 static RETSIGTYPE
711 CATCHalarm(int s UNUSED)
712 {
713     GotAlarm = true;
714     if (JMPyes)
715         longjmp(JMPwhere, 1);
716 }
717
718 /* check articles in streaming NNTP mode
719 ** return true on failure.
720 */
721 static bool
722 check(int i) {
723     char        buff[NNTP_STRLEN];
724
725     /* send "check <ID>" to the other system */
726     snprintf(buff, sizeof(buff), "check %s", stbuf[i].st_id);
727     if (!REMwrite(buff, (int)strlen(buff), false)) {
728         syswarn("cannot check article");
729         return true;
730     }
731     STAToffered++;
732     if (Debug) {
733         if (stbuf[i].st_retry)
734             fprintf(stderr, "> %s (retry %d)\n", buff, stbuf[i].st_retry);
735         else
736             fprintf(stderr, "> %s\n", buff);
737     }
738     if (GotInterrupt)
739         Interrupted(stbuf[i].st_fname, stbuf[i].st_id);
740
741     /* That all.  Response is checked later by strlisten() */
742     return false;
743 }
744
745 /* Send article in "takethis <id> streaming NNTP mode.
746 ** return true on failure.
747 */
748 static bool
749 takethis(int i) {
750     char        buff[NNTP_STRLEN];
751
752     if (!stbuf[i].art) {
753         warn("internal error: null article for %s in takethis",
754              stbuf[i].st_fname);
755         return true;
756     }
757     /* send "takethis <ID>" to the other system */
758     snprintf(buff, sizeof(buff), "takethis %s", stbuf[i].st_id);
759     if (!REMwrite(buff, (int)strlen(buff), false)) {
760         syswarn("cannot send takethis");
761         return true;
762     }
763     if (Debug)
764         fprintf(stderr, "> %s\n", buff);
765     if (GotInterrupt)
766         Interrupted((char *)0, (char *)0);
767     if (!REMsendarticle(stbuf[i].st_fname, stbuf[i].st_id, stbuf[i].art))
768         return true;
769     stbuf[i].st_size = stbuf[i].art->len;
770     article_free(stbuf[i].art); /* should not need file again */
771     stbuf[i].art = 0;           /* so close to free descriptor */
772     stbuf[i].st_age = 0;
773     /* That all.  Response is checked later by strlisten() */
774     return false;
775 }
776
777
778 /* listen for responses.  Process acknowledgments to remove items from
779 ** the queue.  Also sends the articles on request.  Returns true on error.
780 ** return true on failure.
781 */
782 static bool
783 strlisten(void)
784 {
785     int         resp;
786     int         i;
787     char        *id, *p;
788     char        buff[NNTP_STRLEN];
789     int         hash;
790
791     while(true) {
792         if (!REMread(buff, (int)sizeof buff)) {
793             syswarn("no reply to check");
794             return true;
795         }
796         if (GotInterrupt)
797             Interrupted((char *)0, (char *)0);
798         if (Debug)
799             fprintf(stderr, "< %s", buff);
800
801         /* Parse the reply. */
802         resp =  atoi(buff);
803         /* Skip the 1XX informational messages */
804         if ((resp >= 100) && (resp < 200)) continue;
805         switch (resp) { /* first time is to verify it */
806         case NNTP_ERR_GOTID_VAL:
807         case NNTP_OK_SENDID_VAL:
808         case NNTP_OK_RECID_VAL:
809         case NNTP_ERR_FAILID_VAL:
810         case NNTP_RESENDID_VAL:
811             if ((id = strchr(buff, '<')) != NULL) {
812                 p = strchr(id, '>');
813                 if (p) *(p+1) = '\0';
814                 hash = stidhash(id);
815                 i = stindex(id, hash);  /* find table entry */
816                 if (i < 0) { /* should not happen */
817                     syslog(L_NOTICE, CANT_FINDIT, REMhost, REMclean(buff));
818                     return (true); /* can't find it! */
819                 }
820             } else {
821                 syslog(L_NOTICE, CANT_PARSEIT, REMhost, REMclean(buff));
822                 return (true);
823             }
824             break;
825         case NNTP_GOODBYE_VAL:
826             /* Most likely out of space -- no point in continuing. */
827             syslog(L_NOTICE, IHAVE_FAIL, REMhost, REMclean(buff));
828             return true;
829             /* NOTREACHED */
830         default:
831             syslog(L_NOTICE, UNEXPECTED, REMhost, REMclean(buff));
832             if (Debug)
833                 fprintf(stderr, "Unknown reply \"%s\"",
834                                                     buff);
835             return (true);
836         }
837         switch (resp) { /* now we take some action */
838         case NNTP_RESENDID_VAL: /* remote wants it later */
839             /* try again now because time has passed */
840             if (stbuf[i].st_retry < STNRETRY) {
841                 if (check(i)) return true;
842                 stbuf[i].st_retry++;
843                 stbuf[i].st_age = 0;
844             } else { /* requeue to disk for later */
845                 Requeue(stbuf[i].st_fname, stbuf[i].st_id);
846                 strel(i); /* release entry */
847             }
848             break;
849         case NNTP_ERR_GOTID_VAL:        /* remote doesn't want it */
850             strel(i); /* release entry */
851             STATrefused++;
852             stnofail = 0;
853             break;
854                 
855         case NNTP_OK_SENDID_VAL:        /* remote wants article */
856             if (takethis(i)) return true;
857             stnofail++;
858             break;
859
860         case NNTP_OK_RECID_VAL: /* remote received it OK */
861             STATacceptedsize += (double) stbuf[i].st_size;
862             strel(i); /* release entry */
863             STATaccepted++;
864             break;
865                 
866         case NNTP_ERR_FAILID_VAL:
867             STATrejectedsize += (double) stbuf[i].st_size;
868             if (logRejects)
869                 syslog(L_NOTICE, REJ_STREAM, REMhost,
870                     stbuf[i].st_fname, REMclean(buff));
871 /* XXXXX Caution THERE BE DRAGONS, I don't think this logs properly
872    The message ID is returned in the peer response... so this is redundant
873                     stbuf[i].st_id, stbuf[i].st_fname, REMclean(buff)); */
874             strel(i); /* release entry */
875             STATrejected++;
876             stnofail = 0;
877             break;
878         }
879         break;
880     }
881     return (false);
882 }
883
884 /*
885 **  Print a usage message and exit.
886 */
887 static void
888 Usage(void)
889 {
890     die("Usage: innxmit [-acdHlprs] [-t#] [-T#] host file");
891 }
892
893
894 /*
895 **  Open an article.  If the argument is a token, retrieve the article via
896 **  the storage API.  Otherwise, open the file and fake up an ARTHANDLE for
897 **  it.  Only fill in those fields that we'll need.  Articles not retrieved
898 **  via the storage API will have a type of TOKEN_EMPTY.
899 */
900 static ARTHANDLE *
901 article_open(const char *path, const char *id)
902 {
903     TOKEN token;
904     ARTHANDLE *article;
905     int fd, length;
906     struct stat st;
907     char *p;
908
909     if (IsToken(path)) {
910         token = TextToToken(path);
911         article = SMretrieve(token, RETR_ALL);
912         if (article == NULL) {
913             if (SMerrno == SMERR_NOENT || SMerrno == SMERR_UNINIT)
914                 STATmissing++;
915             else {
916                 warn("requeue %s: %s", path, SMerrorstr);
917                 Requeue(path, id);
918             }
919         }
920         return article;
921     } else {
922         char *data;
923         fd = open(path, O_RDONLY);
924         if (fd < 0)
925             return NULL;
926         if (fstat(fd, &st) < 0) {
927             syswarn("requeue %s", path);
928             Requeue(path, id);
929             return NULL;
930         }
931         article = xmalloc(sizeof(ARTHANDLE));
932         article->type = TOKEN_EMPTY;
933         article->len = st.st_size;
934         data = xmalloc(article->len);
935         if (xread(fd, data, article->len) < 0) {
936             syswarn("requeue %s", path);
937             free(data);
938             free(article);
939             close(fd);
940             Requeue(path, id);
941             return NULL;
942         }
943         close(fd);
944         p = memchr(data, '\n', article->len);
945         if (p == NULL || p == data) {
946             warn("requeue %s: cannot find headers", path);
947             free(data);
948             free(article);
949             Requeue(path, id);
950             return NULL;
951         }
952         if (p[-1] != '\r') {
953             p = ToWireFmt(data, article->len, (size_t *)&length);
954             free(data);
955             data = p;
956             article->len = length;
957         }
958         article->data = data;
959         return article;
960     }
961 }
962
963
964 /*
965 **  Free an article, using the type field to determine whether to free it
966 **  via the storage API.
967 */
968 static void
969 article_free(ARTHANDLE *article)
970 {
971     if (article->type == TOKEN_EMPTY) {
972         free((char *)article->data);
973         free(article);
974     } else
975         SMfreearticle(article);
976 }
977
978
979 int main(int ac, char *av[]) {
980     static char         SKIPPING[] = "Skipping \"%s\" --%s?\n";
981     int                 i;
982     char                *p;
983     ARTHANDLE           *art;
984     FILE                *From;
985     FILE                *To;
986     char                buff[8192+128];
987     char                *Article;
988     char                *MessageID;
989     RETSIGTYPE          (*old)(int) = NULL;
990     unsigned int        ConnectTimeout;
991     unsigned int        TotalTimeout;
992     int                 port = NNTP_PORT;
993     bool                val;
994     char                *path;
995
996     openlog("innxmit", L_OPENLOG_FLAGS | LOG_PID, LOG_INN_PROG);
997     message_program_name = "innxmit";
998
999     /* Set defaults. */
1000     if (!innconf_read(NULL))
1001         exit(1);
1002
1003     ConnectTimeout = 0;
1004     TotalTimeout = 0;
1005     
1006     umask(NEWSUMASK);
1007
1008     /* Parse JCL. */
1009     while ((i = getopt(ac, av, "lacdHprst:T:vP:")) != EOF)
1010         switch (i) {
1011         default:
1012             Usage();
1013             /* NOTREACHED */
1014         case 'P':
1015             port = atoi(optarg);
1016             break;
1017         case 'a':
1018             AlwaysRewrite = true;
1019             break;
1020         case 'c':
1021             DoCheck = false;
1022             break;
1023         case 'd':
1024             Debug = true;
1025             break;
1026         case 'H':
1027             HeadersFeed = true;
1028             break;
1029         case 'l':
1030             logRejects = true ;
1031             break ;
1032         case 'p':
1033             AlwaysRewrite = true;
1034             Purging = true;
1035             break;
1036         case 'r':
1037             DoRequeue = false;
1038             break;
1039         case 's':
1040             TryStream = false;
1041             break;
1042         case 't':
1043             ConnectTimeout = atoi(optarg);
1044             break;
1045         case 'T':
1046             TotalTimeout = atoi(optarg);
1047             break;
1048         case 'v':
1049             STATprint = true;
1050             break;
1051         }
1052     ac -= optind;
1053     av += optind;
1054
1055     /* Parse arguments; host and filename. */
1056     if (ac != 2)
1057         Usage();
1058     REMhost = av[0];
1059     BATCHname = av[1];
1060
1061     if (chdir(innconf->patharticles) < 0)
1062         sysdie("cannot cd to %s", innconf->patharticles);
1063
1064     val = true;
1065     if (!SMsetup(SM_PREOPEN,(void *)&val))
1066         die("cannot set up the storage manager");
1067     if (!SMinit())
1068         die("cannot initialize the storage manager: %s", SMerrorstr);
1069
1070     /* Open the batch file and lock others out. */
1071     if (BATCHname[0] != '/') {
1072         BATCHname = concatpath(innconf->pathoutgoing, av[1]);
1073     }
1074     if (((i = open(BATCHname, O_RDWR)) < 0) || ((BATCHqp = QIOfdopen(i)) == NULL)) {
1075         syswarn("cannot open %s", BATCHname);
1076         SMshutdown();
1077         exit(1);
1078     }
1079     if (!inn_lock_file(QIOfileno(BATCHqp), INN_LOCK_WRITE, true)) {
1080 #if     defined(EWOULDBLOCK)
1081         if (errno == EWOULDBLOCK) {
1082             SMshutdown();
1083             exit(0);
1084         }
1085 #endif  /* defined(EWOULDBLOCK) */
1086         syswarn("cannot lock %s", BATCHname);
1087         SMshutdown();
1088         exit(1);
1089     }
1090
1091     /* Get a temporary name in the same directory as the batch file. */
1092     p = strrchr(BATCHname, '/');
1093     *p = '\0';
1094     BATCHtemp = concatpath(BATCHname, "bchXXXXXX");
1095     *p = '/';
1096
1097     /* Set up buffer used by REMwrite. */
1098     REMbuffer = xmalloc(OUTPUT_BUFFER_SIZE);
1099     REMbuffend = &REMbuffer[OUTPUT_BUFFER_SIZE];
1100     REMbuffptr = REMbuffer;
1101
1102     /* Start timing. */
1103     STATbegin = TMRnow_double();
1104
1105     if (!Purging) {
1106         /* Open a connection to the remote server. */
1107         if (ConnectTimeout) {
1108             GotAlarm = false;
1109             old = xsignal(SIGALRM, CATCHalarm);
1110             if (setjmp(JMPwhere)) {
1111                 warn("cannot connect to %s: timed out", REMhost);
1112                 SMshutdown();
1113                 exit(1);
1114             }
1115             JMPyes = true;
1116             alarm(ConnectTimeout);
1117         }
1118         if (NNTPconnect(REMhost, port, &From, &To, buff) < 0 || GotAlarm) {
1119             i = errno;
1120             warn("cannot connect to %s: %s", REMhost,
1121                  buff[0] ? REMclean(buff) : strerror(errno));
1122             if (GotAlarm)
1123                 syslog(L_NOTICE, CANT_CONNECT, REMhost, "timeout");
1124             else 
1125                 syslog(L_NOTICE, CANT_CONNECT, REMhost,
1126                     buff[0] ? REMclean(buff) : strerror(i));
1127             SMshutdown();
1128             exit(1);
1129         }
1130         if (Debug)
1131             fprintf(stderr, "< %s\n", REMclean(buff));
1132         if (NNTPsendpassword(REMhost, From, To) < 0 || GotAlarm) {
1133             i = errno;
1134             syswarn("cannot authenticate with %s", REMhost);
1135             syslog(L_ERROR, CANT_AUTHENTICATE,
1136                 REMhost, GotAlarm ? "timeout" : strerror(i));
1137             /* Don't send quit; we want the remote to print a message. */
1138             SMshutdown();
1139             exit(1);
1140         }
1141         if (ConnectTimeout) {
1142             alarm(0);
1143             xsignal(SIGALRM, old);
1144             JMPyes = false;
1145         }
1146
1147         /* We no longer need standard I/O. */
1148         FromServer = fileno(From);
1149         ToServer = fileno(To);
1150
1151         if (TryStream) {
1152             if (!REMwrite(modestream, (int)strlen(modestream), false)) {
1153                 syswarn("cannot negotiate %s", modestream);
1154             }
1155             if (Debug)
1156                 fprintf(stderr, ">%s\n", modestream);
1157             /* Does he understand mode stream? */
1158             if (!REMread(buff, (int)sizeof buff)) {
1159                 syswarn("no reply to %s", modestream);
1160             } else {
1161                 if (Debug)
1162                     fprintf(stderr, "< %s", buff);
1163
1164                 /* Parse the reply. */
1165                 switch (atoi(buff)) {
1166                 default:
1167                     warn("unknown reply to %s -- %s", modestream, buff);
1168                     CanStream = false;
1169                     break;
1170                 case NNTP_OK_STREAM_VAL:        /* YES! */
1171                     CanStream = true;
1172                     break;
1173                 case NNTP_AUTH_NEEDED_VAL: /* authentication refusal */
1174                 case NNTP_BAD_COMMAND_VAL: /* normal refusal */
1175                     CanStream = false;
1176                     break;
1177                 }
1178             }
1179             if (CanStream) {
1180                 for (i = 0; i < STNBUF; i++) { /* reset buffers */
1181                     stbuf[i].st_fname = 0;
1182                     stbuf[i].st_id = 0;
1183                     stbuf[i].art = 0;
1184                 }
1185                 stnq = 0;
1186             }
1187         }
1188         if (HeadersFeed) {
1189             if (!REMwrite(modeheadfeed, strlen(modeheadfeed), false))
1190                 syswarn("cannot negotiate %s", modeheadfeed);
1191             if (Debug)
1192                 fprintf(stderr, ">%s\n", modeheadfeed);
1193             if (!REMread(buff, sizeof buff)) {
1194                 syswarn("no reply to %s", modeheadfeed);
1195             } else {
1196                 if (Debug)
1197                     fprintf(stderr, "< %s", buff);
1198
1199                 /* Parse the reply. */
1200                 switch (atoi(buff)) {
1201                 case 250:               /* YES! */
1202                     break;
1203                 case NNTP_BAD_COMMAND_VAL: /* normal refusal */
1204                     die("%s not allowed -- %s", modeheadfeed, buff);
1205                 default:
1206                     die("unknown reply to %s -- %s", modeheadfeed, buff);
1207                 }
1208             }
1209         }
1210     }
1211
1212     /* Set up signal handlers. */
1213     xsignal(SIGHUP, CATCHinterrupt);
1214     xsignal(SIGINT, CATCHinterrupt);
1215     xsignal(SIGTERM, CATCHinterrupt);
1216     xsignal(SIGPIPE, SIG_IGN);
1217     if (TotalTimeout) {
1218         xsignal(SIGALRM, CATCHalarm);
1219         alarm(TotalTimeout);
1220     }
1221
1222     path = concatpath(innconf->pathdb, _PATH_HISTORY);
1223     History = HISopen(path, innconf->hismethod, HIS_RDONLY);
1224     free(path);
1225
1226     /* Main processing loop. */
1227     GotInterrupt = false;
1228     GotAlarm = false;
1229     for (Article = NULL, MessageID = NULL; ; ) {
1230         if (GotAlarm) {
1231             warn("timed out");
1232             /* Don't resend the current article. */
1233             RequeueRestAndExit((char *)NULL, (char *)NULL);
1234         }
1235         if (GotInterrupt)
1236             Interrupted(Article, MessageID);
1237
1238         if ((Article = QIOread(BATCHqp)) == NULL) {
1239             if (QIOtoolong(BATCHqp)) {
1240                 warn("skipping long line in %s", BATCHname);
1241                 QIOread(BATCHqp);
1242                 continue;
1243             }
1244             if (QIOerror(BATCHqp)) {
1245                 syswarn("cannot read %s", BATCHname);
1246                 ExitWithStats(1);
1247             }
1248
1249             /* Normal EOF -- we're done. */
1250             QIOclose(BATCHqp);
1251             BATCHqp = NULL;
1252             break;
1253         }
1254
1255         /* Ignore blank lines. */
1256         if (*Article == '\0')
1257             continue;
1258
1259         /* Split the line into possibly two fields. */
1260         if (Article[0] == '/'
1261          && Article[strlen(innconf->patharticles)] == '/'
1262          && strncmp(Article, innconf->patharticles, strlen(innconf->patharticles)) == 0)
1263             Article += strlen(innconf->patharticles) + 1;
1264         if ((MessageID = strchr(Article, ' ')) != NULL) {
1265             *MessageID++ = '\0';
1266             if (*MessageID != '<'
1267                 || (p = strrchr(MessageID, '>')) == NULL
1268                 || *++p != '\0') {
1269                 warn("ignoring line %s %s...", Article, MessageID);
1270                 continue;
1271             }
1272         }
1273
1274         if (*Article == '\0') {
1275             if (MessageID)
1276                 warn("empty file name for %s in %s", MessageID, BATCHname);
1277             else
1278                 warn("empty file name, no message ID in %s", BATCHname);
1279             /* We could do a history lookup. */
1280             continue;
1281         }
1282
1283         if (Purging && MessageID != NULL && !Expired(MessageID)) {
1284             Requeue(Article, MessageID);
1285             continue;
1286         }
1287
1288         /* Drop articles with a message ID longer than NNTP_MSGID_MAXLEN to
1289            avoid overrunning buffers and throwing the server on the
1290            receiving end a blow from behind. */
1291         if (MessageID != NULL && strlen(MessageID) > NNTP_MSGID_MAXLEN) {
1292             warn("dropping article in %s: long message ID %s", BATCHname,
1293                  MessageID);
1294             continue;
1295         }
1296
1297         art = article_open(Article, MessageID);
1298         if (art == NULL)
1299             continue;
1300
1301         if (Purging) {
1302             article_free(art);
1303             Requeue(Article, MessageID);
1304             continue;
1305         }
1306
1307         /* Get the Message-ID from the article if we need to. */
1308         if (MessageID == NULL) {
1309             if ((MessageID = GetMessageID(art)) == NULL) {
1310                 warn(SKIPPING, Article, "no message ID");
1311                 article_free(art);
1312                 continue;
1313             }
1314         }
1315         if (GotInterrupt)
1316             Interrupted(Article, MessageID);
1317
1318         /* Offer the article. */
1319         if (CanStream) {
1320             int lim;
1321             int hash;
1322
1323             hash = stidhash(MessageID);
1324             if (stindex(MessageID, hash) >= 0) { /* skip duplicates in queue */
1325                 if (Debug)
1326                     fprintf(stderr, "Skipping duplicate ID %s\n",
1327                                                             MessageID);
1328                 article_free(art);
1329                 continue;
1330             }
1331             /* This code tries to optimize by sending a burst of "check"
1332              * commands before flushing the buffer.  This should result
1333              * in several being sent in one packet reducing the network
1334              * overhead.
1335              */
1336             if (DoCheck && (stnofail < STNC)) lim = STNBUF;
1337             else                              lim = STNBUFL;
1338             if (stnq >= lim) { /* need to empty a buffer */
1339                 while (stnq >= STNBUFL) { /* or several */
1340                     if (strlisten()) {
1341                         RequeueRestAndExit(Article, MessageID);
1342                     }
1343                 }
1344             }
1345             /* save new article in the buffer */
1346             i = stalloc(Article, MessageID, art, hash);
1347             if (i < 0) {
1348                 article_free(art);
1349                 RequeueRestAndExit(Article, MessageID);
1350             }
1351             if (DoCheck && (stnofail < STNC)) {
1352                 if (check(i)) {
1353                     RequeueRestAndExit((char *)NULL, (char *)NULL);
1354                 }
1355             } else {
1356                 STAToffered++ ;
1357                 if (takethis(i)) {
1358                     RequeueRestAndExit((char *)NULL, (char *)NULL);
1359                 }
1360             }
1361             /* check for need to resend any IDs */
1362             for (i = 0; i < STNBUF; i++) {
1363                 if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) {
1364                     if (stbuf[i].st_age++ > stnq) {
1365                         /* This should not happen but just in case ... */
1366                         if (stbuf[i].st_retry < STNRETRY) {
1367                             if (check(i)) /* resend check */
1368                                 RequeueRestAndExit((char *)NULL, (char *)NULL);
1369                             retries++;
1370                             stbuf[i].st_retry++;
1371                             stbuf[i].st_age = 0;
1372                         } else { /* requeue to disk for later */
1373                             Requeue(stbuf[i].st_fname, stbuf[i].st_id);
1374                             strel(i); /* release entry */
1375                         }
1376                     }
1377                 }
1378             }
1379             continue; /* next article */
1380         }
1381         snprintf(buff, sizeof(buff), "ihave %s", MessageID);
1382         if (!REMwrite(buff, (int)strlen(buff), false)) {
1383             syswarn("cannot offer article");
1384             article_free(art);
1385             RequeueRestAndExit(Article, MessageID);
1386         }
1387         STAToffered++;
1388         if (Debug)
1389             fprintf(stderr, "> %s\n", buff);
1390         if (GotInterrupt)
1391             Interrupted(Article, MessageID);
1392
1393         /* Does he want it? */
1394         if (!REMread(buff, (int)sizeof buff)) {
1395             syswarn("no reply to ihave");
1396             article_free(art);
1397             RequeueRestAndExit(Article, MessageID);
1398         }
1399         if (GotInterrupt)
1400             Interrupted(Article, MessageID);
1401         if (Debug)
1402             fprintf(stderr, "< %s", buff);
1403
1404         /* Parse the reply. */
1405         switch (atoi(buff)) {
1406         default:
1407             warn("unknown reply to %s -- %s", Article, buff);
1408             if (DoRequeue)
1409                 Requeue(Article, MessageID);
1410             break;
1411         case NNTP_BAD_COMMAND_VAL:
1412         case NNTP_SYNTAX_VAL:
1413         case NNTP_ACCESS_VAL:
1414             /* The receiving server is likely confused...no point in continuing */
1415             syslog(L_FATAL, GOT_BADCOMMAND, REMhost, MessageID, REMclean(buff));
1416             RequeueRestAndExit(Article, MessageID);
1417             /* NOTREACHED */
1418         case NNTP_AUTH_NEEDED_VAL:
1419         case NNTP_RESENDIT_VAL:
1420         case NNTP_GOODBYE_VAL:
1421             /* Most likely out of space -- no point in continuing. */
1422             syslog(L_NOTICE, IHAVE_FAIL, REMhost, REMclean(buff));
1423             RequeueRestAndExit(Article, MessageID);
1424             /* NOTREACHED */
1425         case NNTP_SENDIT_VAL:
1426             if (!REMsendarticle(Article, MessageID, art))
1427                 RequeueRestAndExit(Article, MessageID);
1428             break;
1429         case NNTP_HAVEIT_VAL:
1430             STATrefused++;
1431             break;
1432 #if     defined(NNTP_SENDIT_LATER)
1433         case NNTP_SENDIT_LATER_VAL:
1434             Requeue(Article, MessageID);
1435             break;
1436 #endif  /* defined(NNTP_SENDIT_LATER) */
1437         }
1438
1439         article_free(art);
1440     }
1441     if (CanStream) { /* need to wait for rest of ACKs */
1442         while (stnq > 0) {
1443             if (strlisten()) {
1444                 RequeueRestAndExit((char *)NULL, (char *)NULL);
1445             }
1446         }
1447     }
1448
1449     if (BATCHfp != NULL)
1450         /* We requeued something, so close the temp file. */
1451         CloseAndRename();
1452     else if (unlink(BATCHname) < 0 && errno != ENOENT)
1453         syswarn("cannot remove %s", BATCHtemp);
1454     ExitWithStats(0);
1455     /* NOTREACHED */
1456     return 0;
1457 }