1 /* $Id: innxmit.c 6716 2004-05-16 20:26:56Z rra $
3 ** Transmit articles to remote site.
4 ** Modified for NNTP streaming: 1996-01-03 Jerry Aguirre
9 #include "portable/socket.h"
10 #include "portable/time.h"
20 /* Needed on AIX 4.1 to get fd_set and friends. */
21 #ifdef HAVE_SYS_SELECT_H
22 # include <sys/select.h>
25 #include "inn/history.h"
26 #include "inn/innconf.h"
27 #include "inn/messages.h"
29 #include "inn/timer.h"
36 #define OUTPUT_BUFFER_SIZE (16 * 1024)
38 /* Streaming extensions to NNTP. This extension removes the lock-step
39 ** limitation of conventional NNTP. Article transfer is several times
40 ** faster. Negotiated and falls back to old mode if receiver refuses.
43 /* max number of articles that can be streamed ahead */
46 /* Send "takethis" without "check" if this many articles were
51 /* typical number of articles to stream */
52 /* must be able to fopen this many articles */
53 #define STNBUFL (STNBUF/2)
55 /* number of retries before requeueing to disk */
58 struct stbufs { /* for each article we are procesing */
59 char *st_fname; /* file name */
60 char *st_id; /* message ID */
61 int st_retry; /* retry count */
62 int st_age; /* age count */
63 ARTHANDLE *art; /* arthandle to read article contents */
64 int st_hash; /* hash value to speed searches */
65 long st_size; /* article size */
67 static struct stbufs stbuf[STNBUF]; /* we keep track of this many articles */
68 static int stnq; /* current number of active entries in stbuf */
69 static long stnofail; /* Count of consecutive successful sends */
71 static int TryStream = true; /* Should attempt stream negotation? */
72 static int CanStream = false; /* Result of stream negotation */
73 static int DoCheck = true; /* Should check before takethis? */
74 static char modestream[] = "mode stream";
75 static char modeheadfeed[] = "mode headfeed";
76 static long retries = 0;
77 static int logRejects = false ; /* syslog the 437 responses. */
82 ** Syslog formats - collected together so they remain consistent
85 "%s stats offered %lu accepted %lu refused %lu rejected %lu missing %lu accsize %.0f rejsize %.0f";
86 static char STAT2[] = "%s times user %.3f system %.3f elapsed %.3f";
87 static char GOT_BADCOMMAND[] = "%s rejected %s %s";
88 static char REJECTED[] = "%s rejected %s (%s) %s";
89 static char REJ_STREAM[] = "%s rejected (%s) %s";
90 static char CANT_CONNECT[] = "%s connect failed %s";
91 static char CANT_AUTHENTICATE[] = "%s authenticate failed %s";
92 static char IHAVE_FAIL[] = "%s ihave failed %s";
94 static char CANT_FINDIT[] = "%s can't find %s";
95 static char CANT_PARSEIT[] = "%s can't parse ID %s";
96 static char UNEXPECTED[] = "%s unexpected response code %s";
101 static bool AlwaysRewrite;
103 static bool DoRequeue = true;
105 static bool STATprint;
106 static bool HeadersFeed;
107 static char *BATCHname;
108 static char *BATCHtemp;
109 static char *REMhost;
110 static double STATbegin;
111 static double STATend;
112 static FILE *BATCHfp;
113 static int FromServer;
115 static struct history *History;
116 static QIOSTATE *BATCHqp;
117 static sig_atomic_t GotAlarm;
118 static sig_atomic_t GotInterrupt;
119 static sig_atomic_t JMPyes;
120 static jmp_buf JMPwhere;
121 static char *REMbuffer;
122 static char *REMbuffptr;
123 static char *REMbuffend;
124 static unsigned long STATaccepted;
125 static unsigned long STAToffered;
126 static unsigned long STATrefused;
127 static unsigned long STATrejected;
128 static unsigned long STATmissing;
129 static double STATacceptedsize;
130 static double STATrejectedsize;
134 static ARTHANDLE *article_open(const char *path, const char *id);
135 static void article_free(ARTHANDLE *);
139 ** Return true if the history file has the article expired.
142 Expired(char *MessageID) {
143 return !HISlookup(History, MessageID, NULL, NULL, NULL, NULL);
148 ** Flush and reset the site's output buffer. Return false on error.
155 if (REMbuffptr == REMbuffer) return true; /* nothing buffered */
156 i = xwrite(ToServer, REMbuffer, (int)(REMbuffptr - REMbuffer));
157 REMbuffptr = REMbuffer;
158 return i < 0 ? false : true;
162 ** Return index to entry matching this message ID. Else return -1.
163 ** The hash is to speed up the search.
167 stindex(char *MessageID, int hash) {
170 for (i = 0; i < STNBUF; i++) { /* linear search for ID */
171 if ((stbuf[i].st_id) && (stbuf[i].st_id[0])
172 && (stbuf[i].st_hash == hash)) {
175 if (strcasecmp(MessageID, stbuf[i].st_id)) continue;
177 /* left of '@' is case sensitive */
178 for (n = 0; (MessageID[n] != '@') && (MessageID[n] != '\0'); n++) ;
179 if (strncmp(MessageID, stbuf[i].st_id, n)) continue;
180 else break; /* found a match */
183 if (i >= STNBUF) i = -1; /* no match found ? */
187 /* stidhash(): calculate a hash value for message IDs to speed comparisons */
189 stidhash(char *MessageID) {
194 for (p = MessageID + 1; *p && (*p != '>'); p++) {
196 if (isascii((int)*p) && isupper((int)*p)) {
205 /* stalloc(): save path, ID, and qp into one of the streaming mode entries */
207 stalloc(char *Article, char *MessageID, ARTHANDLE *art, int hash) {
210 for (i = 0; i < STNBUF; i++) {
211 if ((!stbuf[i].st_fname) || (stbuf[i].st_fname[0] == '\0')) break;
213 if (i >= STNBUF) { /* stnq says not full but can not find unused */
214 syslog(L_ERROR, "stalloc: Internal error");
217 if ((int)strlen(Article) >= SPOOLNAMEBUFF) {
218 syslog(L_ERROR, "stalloc: filename longer than %d", SPOOLNAMEBUFF);
221 /* allocate buffers on first use.
222 ** If filename ever is longer than SPOOLNAMEBUFF then code will abort.
223 ** If ID is ever longer than NNTP_STRLEN then other code would break.
225 if (!stbuf[i].st_fname)
226 stbuf[i].st_fname = xmalloc(SPOOLNAMEBUFF);
228 stbuf[i].st_id = xmalloc(NNTP_STRLEN);
229 strlcpy(stbuf[i].st_fname, Article, SPOOLNAMEBUFF);
230 strlcpy(stbuf[i].st_id, MessageID, NNTP_STRLEN);
232 stbuf[i].st_hash = hash;
233 stbuf[i].st_retry = 0;
239 /* strel(): release for reuse one of the streaming mode entries */
243 article_free(stbuf[i].art);
246 if (stbuf[i].st_id) stbuf[i].st_id[0] = '\0';
247 if (stbuf[i].st_fname) stbuf[i].st_fname[0] = '\0';
252 ** Send a line to the server, adding the dot escape and \r\n.
255 REMwrite(char *p, int i, bool escdot) {
258 /* Buffer too full? */
259 if (REMbuffend - REMbuffptr < i + 3) {
262 if (REMbuffend - REMbuffer < i + 3) {
263 /* Line too long -- grow buffer. */
265 REMbuffer = xrealloc(REMbuffer, size);
266 REMbuffend = &REMbuffer[size];
270 /* Dot escape, text of the line, line terminator. */
271 if (escdot && (*p == '.'))
273 memcpy(REMbuffptr, p, i);
275 *REMbuffptr++ = '\r';
276 *REMbuffptr++ = '\n';
283 ** Print transfer statistics, clean up, and exit.
288 static char QUIT[] = "quit";
293 REMwrite(QUIT, strlen(QUIT), false);
296 STATend = TMRnow_double();
297 if (GetResourceUsage(&usertime, &systime) < 0) {
303 printf(STAT1, REMhost, STAToffered, STATaccepted, STATrefused,
304 STATrejected, STATmissing, STATacceptedsize, STATrejectedsize);
306 printf(STAT2, REMhost, usertime, systime, STATend - STATbegin);
310 syslog(L_NOTICE, STAT1, REMhost, STAToffered, STATaccepted, STATrefused,
311 STATrejected, STATmissing, STATacceptedsize, STATrejectedsize);
312 syslog(L_NOTICE, STAT2, REMhost, usertime, systime, STATend - STATbegin);
314 syslog(L_NOTICE, "%s %lu Streaming retries", REMhost, retries);
316 if (BATCHfp != NULL && unlink(BATCHtemp) < 0 && errno != ENOENT)
317 syswarn("cannot remove %s", BATCHtemp);
327 ** Close the batchfile and the temporary file, and rename the temporary
328 ** to be the batchfile.
333 /* Close the files, rename the temporary. */
339 || fflush(BATCHfp) == EOF
340 || fclose(BATCHfp) == EOF) {
342 syswarn("cannot close %s", BATCHtemp);
345 if (rename(BATCHtemp, BATCHname) < 0) {
346 syswarn("cannot rename %s", BATCHtemp);
353 ** Requeue an article, opening the temp file if we have to. If we get
354 ** a file write error, exit so that the original input is left alone.
357 Requeue(const char *Article, const char *MessageID)
361 /* Temp file already open? */
362 if (BATCHfp == NULL) {
363 fd = mkstemp(BATCHtemp);
365 syswarn("cannot create a temporary file");
368 BATCHfp = fdopen(fd, "w");
369 if (BATCHfp == NULL) {
370 syswarn("cannot open %s", BATCHtemp);
375 /* Called only to get the file open? */
379 if (MessageID != NULL)
380 fprintf(BATCHfp, "%s %s\n", Article, MessageID);
382 fprintf(BATCHfp, "%s\n", Article);
383 if (fflush(BATCHfp) == EOF || ferror(BATCHfp)) {
384 syswarn("cannot requeue %s", Article);
391 ** Requeue an article then copy the rest of the batch file out.
394 RequeueRestAndExit(char *Article, char *MessageID) {
398 && STATaccepted == 0 && STATrejected == 0 && STATrefused == 0
399 && STATmissing == 0) {
400 warn("nothing sent -- leaving batchfile alone");
404 warn("rewriting batch file and exiting");
405 if (CanStream) { /* streaming mode has a buffer of articles */
408 for (i = 0; i < STNBUF; i++) { /* requeue unacknowledged articles */
409 if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) {
411 fprintf(stderr, "stbuf[%d]= %s, %s\n",
412 i, stbuf[i].st_fname, stbuf[i].st_id);
413 Requeue(stbuf[i].st_fname, stbuf[i].st_id);
414 if (Article == stbuf[i].st_fname) Article = NULL;
415 strel(i); /* release entry */
419 Requeue(Article, MessageID);
422 if ((p = QIOread(BATCHqp)) == NULL) {
423 if (QIOtoolong(BATCHqp)) {
424 warn("skipping long line in %s", BATCHname);
428 if (QIOerror(BATCHqp)) {
429 syswarn("cannot read %s", BATCHname);
437 if (fprintf(BATCHfp, "%s\n", p) == EOF
438 || ferror(BATCHfp)) {
439 syswarn("cannot requeue %s", p);
450 ** Clean up the NNTP escapes from a line.
453 REMclean(char *buff) {
456 if ((p = strchr(buff, '\r')) != NULL)
458 if ((p = strchr(buff, '\n')) != NULL)
461 /* The dot-escape is only in text, not command responses. */
467 ** Read a line of input, with timeout. Also handle \r\n-->\n mapping
468 ** and the dot escape. Return true if okay, *or we got interrupted.*
471 REMread(char *start, int size) {
473 static char buffer[BUFSIZ];
486 for (p = start, end = &start[size - 1]; ; ) {
488 /* Fill the buffer. */
491 FD_SET(FromServer, &rmask);
494 i = select(FromServer + 1, &rmask, NULL, NULL, &t);
502 if (i == 0 || !FD_ISSET(FromServer, &rmask))
504 count = read(FromServer, buffer, sizeof buffer);
512 /* Process next character. */
521 /* We know we got \n; if previous char was \r, turn it into \n. */
522 if (p > start && p < end && p[-1] == '\r')
526 /* Handle the dot escape. */
528 if (p[1] == '\n' && p[2] == '\0')
531 for (q = &start[1]; (*p++ = *q++) != '\0'; )
539 ** Handle the interrupt.
542 Interrupted(char *Article, char *MessageID) {
544 RequeueRestAndExit(Article, MessageID);
549 ** Returns the length of the headers.
552 HeadersLen(ARTHANDLE *art, int *iscmsg) {
556 /* from nnrpd/article.c ARTsendmmap() */
557 for (p = art->data; p < (art->data + art->len); p++) {
561 if (lastchar == '\n') {
566 if (*(p + 1) == 'C' && strncasecmp(p + 1, "Control: ", 9) == 0)
571 return (p - art->data);
576 ** Send a whole article to the server.
579 REMsendarticle(char *Article, char *MessageID, ARTHANDLE *art) {
580 char buff[NNTP_STRLEN];
588 int len = HeadersLen(art, &iscmsg);
590 vec[0].iov_base = (char *) art->data;
591 vec[0].iov_len = len;
592 /* Add 14 bytes, which maybe will be the length of the Bytes header */
593 snprintf(buf, sizeof(buf), "Bytes: %lu\r\n",
594 (unsigned long) art->len + 14);
595 vec[1].iov_base = buf;
596 vec[1].iov_len = strlen(buf);
598 vec[2].iov_base = (char *) art->data + len;
599 vec[2].iov_len = art->len - len;
601 vec[2].iov_base = (char *) "\r\n.\r\n";
604 if (xwritev(ToServer, vec, 3) < 0)
607 if (xwrite(ToServer, art->data, art->len) < 0)
610 Interrupted(Article, MessageID);
612 fprintf(stderr, "> [ article %lu ]\n", (unsigned long) art->len);
613 fprintf(stderr, "> .\n");
616 if (CanStream) return true; /* streaming mode does not wait for ACK */
618 /* What did the remote site say? */
619 if (!REMread(buff, (int)sizeof buff)) {
620 syswarn("no reply after sending %s", Article);
624 Interrupted(Article, MessageID);
626 fprintf(stderr, "< %s", buff);
628 /* Parse the reply. */
629 switch (atoi(buff)) {
631 warn("unknown reply after %s -- %s", Article, buff);
633 Requeue(Article, MessageID);
635 case NNTP_BAD_COMMAND_VAL:
636 case NNTP_SYNTAX_VAL:
637 case NNTP_ACCESS_VAL:
638 /* The receiving server is likely confused...no point in continuing */
639 syslog(L_FATAL, GOT_BADCOMMAND, REMhost, MessageID, REMclean(buff));
640 RequeueRestAndExit(Article, MessageID);
642 case NNTP_RESENDIT_VAL:
643 case NNTP_GOODBYE_VAL:
644 Requeue(Article, MessageID);
646 case NNTP_TOOKIT_VAL:
648 STATacceptedsize += (double)art->len;
650 case NNTP_REJECTIT_VAL:
652 syslog(L_NOTICE, REJECTED, REMhost,
653 MessageID, Article, REMclean(buff));
655 STATrejectedsize += (double)art->len;
659 /* Article sent, or we requeued it. */
665 ** Get the Message-ID header from an open article.
668 GetMessageID(ARTHANDLE *art) {
670 static int buffsize = 0;
673 p = wire_findheader(art->data, art->len, "Message-ID");
676 for (q = p; q < art->data + art->len; q++) {
677 if (*q == '\r' || *q == '\n')
680 if (q == art->data + art->len)
682 if (buffsize < q - p) {
684 buff = xmalloc(q - p + 1);
686 buff = xrealloc(buff, q - p + 1);
689 memcpy(buff, p, q - p);
696 ** Mark that we got interrupted.
699 CATCHinterrupt(int s) {
702 /* Let two interrupts kill us. */
708 ** Mark that the alarm went off.
711 CATCHalarm(int s UNUSED)
715 longjmp(JMPwhere, 1);
718 /* check articles in streaming NNTP mode
719 ** return true on failure.
723 char buff[NNTP_STRLEN];
725 /* send "check <ID>" to the other system */
726 snprintf(buff, sizeof(buff), "check %s", stbuf[i].st_id);
727 if (!REMwrite(buff, (int)strlen(buff), false)) {
728 syswarn("cannot check article");
733 if (stbuf[i].st_retry)
734 fprintf(stderr, "> %s (retry %d)\n", buff, stbuf[i].st_retry);
736 fprintf(stderr, "> %s\n", buff);
739 Interrupted(stbuf[i].st_fname, stbuf[i].st_id);
741 /* That all. Response is checked later by strlisten() */
745 /* Send article in "takethis <id> streaming NNTP mode.
746 ** return true on failure.
750 char buff[NNTP_STRLEN];
753 warn("internal error: null article for %s in takethis",
757 /* send "takethis <ID>" to the other system */
758 snprintf(buff, sizeof(buff), "takethis %s", stbuf[i].st_id);
759 if (!REMwrite(buff, (int)strlen(buff), false)) {
760 syswarn("cannot send takethis");
764 fprintf(stderr, "> %s\n", buff);
766 Interrupted((char *)0, (char *)0);
767 if (!REMsendarticle(stbuf[i].st_fname, stbuf[i].st_id, stbuf[i].art))
769 stbuf[i].st_size = stbuf[i].art->len;
770 article_free(stbuf[i].art); /* should not need file again */
771 stbuf[i].art = 0; /* so close to free descriptor */
773 /* That all. Response is checked later by strlisten() */
778 /* listen for responses. Process acknowledgments to remove items from
779 ** the queue. Also sends the articles on request. Returns true on error.
780 ** return true on failure.
788 char buff[NNTP_STRLEN];
792 if (!REMread(buff, (int)sizeof buff)) {
793 syswarn("no reply to check");
797 Interrupted((char *)0, (char *)0);
799 fprintf(stderr, "< %s", buff);
801 /* Parse the reply. */
803 /* Skip the 1XX informational messages */
804 if ((resp >= 100) && (resp < 200)) continue;
805 switch (resp) { /* first time is to verify it */
806 case NNTP_ERR_GOTID_VAL:
807 case NNTP_OK_SENDID_VAL:
808 case NNTP_OK_RECID_VAL:
809 case NNTP_ERR_FAILID_VAL:
810 case NNTP_RESENDID_VAL:
811 if ((id = strchr(buff, '<')) != NULL) {
813 if (p) *(p+1) = '\0';
815 i = stindex(id, hash); /* find table entry */
816 if (i < 0) { /* should not happen */
817 syslog(L_NOTICE, CANT_FINDIT, REMhost, REMclean(buff));
818 return (true); /* can't find it! */
821 syslog(L_NOTICE, CANT_PARSEIT, REMhost, REMclean(buff));
825 case NNTP_GOODBYE_VAL:
826 /* Most likely out of space -- no point in continuing. */
827 syslog(L_NOTICE, IHAVE_FAIL, REMhost, REMclean(buff));
831 syslog(L_NOTICE, UNEXPECTED, REMhost, REMclean(buff));
833 fprintf(stderr, "Unknown reply \"%s\"",
837 switch (resp) { /* now we take some action */
838 case NNTP_RESENDID_VAL: /* remote wants it later */
839 /* try again now because time has passed */
840 if (stbuf[i].st_retry < STNRETRY) {
841 if (check(i)) return true;
844 } else { /* requeue to disk for later */
845 Requeue(stbuf[i].st_fname, stbuf[i].st_id);
846 strel(i); /* release entry */
849 case NNTP_ERR_GOTID_VAL: /* remote doesn't want it */
850 strel(i); /* release entry */
855 case NNTP_OK_SENDID_VAL: /* remote wants article */
856 if (takethis(i)) return true;
860 case NNTP_OK_RECID_VAL: /* remote received it OK */
861 STATacceptedsize += (double) stbuf[i].st_size;
862 strel(i); /* release entry */
866 case NNTP_ERR_FAILID_VAL:
867 STATrejectedsize += (double) stbuf[i].st_size;
869 syslog(L_NOTICE, REJ_STREAM, REMhost,
870 stbuf[i].st_fname, REMclean(buff));
871 /* XXXXX Caution THERE BE DRAGONS, I don't think this logs properly
872 The message ID is returned in the peer response... so this is redundant
873 stbuf[i].st_id, stbuf[i].st_fname, REMclean(buff)); */
874 strel(i); /* release entry */
885 ** Print a usage message and exit.
890 die("Usage: innxmit [-acdHlprs] [-t#] [-T#] host file");
895 ** Open an article. If the argument is a token, retrieve the article via
896 ** the storage API. Otherwise, open the file and fake up an ARTHANDLE for
897 ** it. Only fill in those fields that we'll need. Articles not retrieved
898 ** via the storage API will have a type of TOKEN_EMPTY.
901 article_open(const char *path, const char *id)
910 token = TextToToken(path);
911 article = SMretrieve(token, RETR_ALL);
912 if (article == NULL) {
913 if (SMerrno == SMERR_NOENT || SMerrno == SMERR_UNINIT)
916 warn("requeue %s: %s", path, SMerrorstr);
923 fd = open(path, O_RDONLY);
926 if (fstat(fd, &st) < 0) {
927 syswarn("requeue %s", path);
931 article = xmalloc(sizeof(ARTHANDLE));
932 article->type = TOKEN_EMPTY;
933 article->len = st.st_size;
934 data = xmalloc(article->len);
935 if (xread(fd, data, article->len) < 0) {
936 syswarn("requeue %s", path);
944 p = memchr(data, '\n', article->len);
945 if (p == NULL || p == data) {
946 warn("requeue %s: cannot find headers", path);
953 p = ToWireFmt(data, article->len, (size_t *)&length);
956 article->len = length;
958 article->data = data;
965 ** Free an article, using the type field to determine whether to free it
966 ** via the storage API.
969 article_free(ARTHANDLE *article)
971 if (article->type == TOKEN_EMPTY) {
972 free((char *)article->data);
975 SMfreearticle(article);
979 int main(int ac, char *av[]) {
980 static char SKIPPING[] = "Skipping \"%s\" --%s?\n";
989 RETSIGTYPE (*old)(int) = NULL;
990 unsigned int ConnectTimeout;
991 unsigned int TotalTimeout;
992 int port = NNTP_PORT;
996 openlog("innxmit", L_OPENLOG_FLAGS | LOG_PID, LOG_INN_PROG);
997 message_program_name = "innxmit";
1000 if (!innconf_read(NULL))
1009 while ((i = getopt(ac, av, "lacdHprst:T:vP:")) != EOF)
1015 port = atoi(optarg);
1018 AlwaysRewrite = true;
1033 AlwaysRewrite = true;
1043 ConnectTimeout = atoi(optarg);
1046 TotalTimeout = atoi(optarg);
1055 /* Parse arguments; host and filename. */
1061 if (chdir(innconf->patharticles) < 0)
1062 sysdie("cannot cd to %s", innconf->patharticles);
1065 if (!SMsetup(SM_PREOPEN,(void *)&val))
1066 die("cannot set up the storage manager");
1068 die("cannot initialize the storage manager: %s", SMerrorstr);
1070 /* Open the batch file and lock others out. */
1071 if (BATCHname[0] != '/') {
1072 BATCHname = concatpath(innconf->pathoutgoing, av[1]);
1074 if (((i = open(BATCHname, O_RDWR)) < 0) || ((BATCHqp = QIOfdopen(i)) == NULL)) {
1075 syswarn("cannot open %s", BATCHname);
1079 if (!inn_lock_file(QIOfileno(BATCHqp), INN_LOCK_WRITE, true)) {
1080 #if defined(EWOULDBLOCK)
1081 if (errno == EWOULDBLOCK) {
1085 #endif /* defined(EWOULDBLOCK) */
1086 syswarn("cannot lock %s", BATCHname);
1091 /* Get a temporary name in the same directory as the batch file. */
1092 p = strrchr(BATCHname, '/');
1094 BATCHtemp = concatpath(BATCHname, "bchXXXXXX");
1097 /* Set up buffer used by REMwrite. */
1098 REMbuffer = xmalloc(OUTPUT_BUFFER_SIZE);
1099 REMbuffend = &REMbuffer[OUTPUT_BUFFER_SIZE];
1100 REMbuffptr = REMbuffer;
1103 STATbegin = TMRnow_double();
1106 /* Open a connection to the remote server. */
1107 if (ConnectTimeout) {
1109 old = xsignal(SIGALRM, CATCHalarm);
1110 if (setjmp(JMPwhere)) {
1111 warn("cannot connect to %s: timed out", REMhost);
1116 alarm(ConnectTimeout);
1118 if (NNTPconnect(REMhost, port, &From, &To, buff) < 0 || GotAlarm) {
1120 warn("cannot connect to %s: %s", REMhost,
1121 buff[0] ? REMclean(buff) : strerror(errno));
1123 syslog(L_NOTICE, CANT_CONNECT, REMhost, "timeout");
1125 syslog(L_NOTICE, CANT_CONNECT, REMhost,
1126 buff[0] ? REMclean(buff) : strerror(i));
1131 fprintf(stderr, "< %s\n", REMclean(buff));
1132 if (NNTPsendpassword(REMhost, From, To) < 0 || GotAlarm) {
1134 syswarn("cannot authenticate with %s", REMhost);
1135 syslog(L_ERROR, CANT_AUTHENTICATE,
1136 REMhost, GotAlarm ? "timeout" : strerror(i));
1137 /* Don't send quit; we want the remote to print a message. */
1141 if (ConnectTimeout) {
1143 xsignal(SIGALRM, old);
1147 /* We no longer need standard I/O. */
1148 FromServer = fileno(From);
1149 ToServer = fileno(To);
1152 if (!REMwrite(modestream, (int)strlen(modestream), false)) {
1153 syswarn("cannot negotiate %s", modestream);
1156 fprintf(stderr, ">%s\n", modestream);
1157 /* Does he understand mode stream? */
1158 if (!REMread(buff, (int)sizeof buff)) {
1159 syswarn("no reply to %s", modestream);
1162 fprintf(stderr, "< %s", buff);
1164 /* Parse the reply. */
1165 switch (atoi(buff)) {
1167 warn("unknown reply to %s -- %s", modestream, buff);
1170 case NNTP_OK_STREAM_VAL: /* YES! */
1173 case NNTP_AUTH_NEEDED_VAL: /* authentication refusal */
1174 case NNTP_BAD_COMMAND_VAL: /* normal refusal */
1180 for (i = 0; i < STNBUF; i++) { /* reset buffers */
1181 stbuf[i].st_fname = 0;
1189 if (!REMwrite(modeheadfeed, strlen(modeheadfeed), false))
1190 syswarn("cannot negotiate %s", modeheadfeed);
1192 fprintf(stderr, ">%s\n", modeheadfeed);
1193 if (!REMread(buff, sizeof buff)) {
1194 syswarn("no reply to %s", modeheadfeed);
1197 fprintf(stderr, "< %s", buff);
1199 /* Parse the reply. */
1200 switch (atoi(buff)) {
1201 case 250: /* YES! */
1203 case NNTP_BAD_COMMAND_VAL: /* normal refusal */
1204 die("%s not allowed -- %s", modeheadfeed, buff);
1206 die("unknown reply to %s -- %s", modeheadfeed, buff);
1212 /* Set up signal handlers. */
1213 xsignal(SIGHUP, CATCHinterrupt);
1214 xsignal(SIGINT, CATCHinterrupt);
1215 xsignal(SIGTERM, CATCHinterrupt);
1216 xsignal(SIGPIPE, SIG_IGN);
1218 xsignal(SIGALRM, CATCHalarm);
1219 alarm(TotalTimeout);
1222 path = concatpath(innconf->pathdb, _PATH_HISTORY);
1223 History = HISopen(path, innconf->hismethod, HIS_RDONLY);
1226 /* Main processing loop. */
1227 GotInterrupt = false;
1229 for (Article = NULL, MessageID = NULL; ; ) {
1232 /* Don't resend the current article. */
1233 RequeueRestAndExit((char *)NULL, (char *)NULL);
1236 Interrupted(Article, MessageID);
1238 if ((Article = QIOread(BATCHqp)) == NULL) {
1239 if (QIOtoolong(BATCHqp)) {
1240 warn("skipping long line in %s", BATCHname);
1244 if (QIOerror(BATCHqp)) {
1245 syswarn("cannot read %s", BATCHname);
1249 /* Normal EOF -- we're done. */
1255 /* Ignore blank lines. */
1256 if (*Article == '\0')
1259 /* Split the line into possibly two fields. */
1260 if (Article[0] == '/'
1261 && Article[strlen(innconf->patharticles)] == '/'
1262 && strncmp(Article, innconf->patharticles, strlen(innconf->patharticles)) == 0)
1263 Article += strlen(innconf->patharticles) + 1;
1264 if ((MessageID = strchr(Article, ' ')) != NULL) {
1265 *MessageID++ = '\0';
1266 if (*MessageID != '<'
1267 || (p = strrchr(MessageID, '>')) == NULL
1269 warn("ignoring line %s %s...", Article, MessageID);
1274 if (*Article == '\0') {
1276 warn("empty file name for %s in %s", MessageID, BATCHname);
1278 warn("empty file name, no message ID in %s", BATCHname);
1279 /* We could do a history lookup. */
1283 if (Purging && MessageID != NULL && !Expired(MessageID)) {
1284 Requeue(Article, MessageID);
1288 /* Drop articles with a message ID longer than NNTP_MSGID_MAXLEN to
1289 avoid overrunning buffers and throwing the server on the
1290 receiving end a blow from behind. */
1291 if (MessageID != NULL && strlen(MessageID) > NNTP_MSGID_MAXLEN) {
1292 warn("dropping article in %s: long message ID %s", BATCHname,
1297 art = article_open(Article, MessageID);
1303 Requeue(Article, MessageID);
1307 /* Get the Message-ID from the article if we need to. */
1308 if (MessageID == NULL) {
1309 if ((MessageID = GetMessageID(art)) == NULL) {
1310 warn(SKIPPING, Article, "no message ID");
1316 Interrupted(Article, MessageID);
1318 /* Offer the article. */
1323 hash = stidhash(MessageID);
1324 if (stindex(MessageID, hash) >= 0) { /* skip duplicates in queue */
1326 fprintf(stderr, "Skipping duplicate ID %s\n",
1331 /* This code tries to optimize by sending a burst of "check"
1332 * commands before flushing the buffer. This should result
1333 * in several being sent in one packet reducing the network
1336 if (DoCheck && (stnofail < STNC)) lim = STNBUF;
1338 if (stnq >= lim) { /* need to empty a buffer */
1339 while (stnq >= STNBUFL) { /* or several */
1341 RequeueRestAndExit(Article, MessageID);
1345 /* save new article in the buffer */
1346 i = stalloc(Article, MessageID, art, hash);
1349 RequeueRestAndExit(Article, MessageID);
1351 if (DoCheck && (stnofail < STNC)) {
1353 RequeueRestAndExit((char *)NULL, (char *)NULL);
1358 RequeueRestAndExit((char *)NULL, (char *)NULL);
1361 /* check for need to resend any IDs */
1362 for (i = 0; i < STNBUF; i++) {
1363 if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) {
1364 if (stbuf[i].st_age++ > stnq) {
1365 /* This should not happen but just in case ... */
1366 if (stbuf[i].st_retry < STNRETRY) {
1367 if (check(i)) /* resend check */
1368 RequeueRestAndExit((char *)NULL, (char *)NULL);
1370 stbuf[i].st_retry++;
1371 stbuf[i].st_age = 0;
1372 } else { /* requeue to disk for later */
1373 Requeue(stbuf[i].st_fname, stbuf[i].st_id);
1374 strel(i); /* release entry */
1379 continue; /* next article */
1381 snprintf(buff, sizeof(buff), "ihave %s", MessageID);
1382 if (!REMwrite(buff, (int)strlen(buff), false)) {
1383 syswarn("cannot offer article");
1385 RequeueRestAndExit(Article, MessageID);
1389 fprintf(stderr, "> %s\n", buff);
1391 Interrupted(Article, MessageID);
1393 /* Does he want it? */
1394 if (!REMread(buff, (int)sizeof buff)) {
1395 syswarn("no reply to ihave");
1397 RequeueRestAndExit(Article, MessageID);
1400 Interrupted(Article, MessageID);
1402 fprintf(stderr, "< %s", buff);
1404 /* Parse the reply. */
1405 switch (atoi(buff)) {
1407 warn("unknown reply to %s -- %s", Article, buff);
1409 Requeue(Article, MessageID);
1411 case NNTP_BAD_COMMAND_VAL:
1412 case NNTP_SYNTAX_VAL:
1413 case NNTP_ACCESS_VAL:
1414 /* The receiving server is likely confused...no point in continuing */
1415 syslog(L_FATAL, GOT_BADCOMMAND, REMhost, MessageID, REMclean(buff));
1416 RequeueRestAndExit(Article, MessageID);
1418 case NNTP_AUTH_NEEDED_VAL:
1419 case NNTP_RESENDIT_VAL:
1420 case NNTP_GOODBYE_VAL:
1421 /* Most likely out of space -- no point in continuing. */
1422 syslog(L_NOTICE, IHAVE_FAIL, REMhost, REMclean(buff));
1423 RequeueRestAndExit(Article, MessageID);
1425 case NNTP_SENDIT_VAL:
1426 if (!REMsendarticle(Article, MessageID, art))
1427 RequeueRestAndExit(Article, MessageID);
1429 case NNTP_HAVEIT_VAL:
1432 #if defined(NNTP_SENDIT_LATER)
1433 case NNTP_SENDIT_LATER_VAL:
1434 Requeue(Article, MessageID);
1436 #endif /* defined(NNTP_SENDIT_LATER) */
1441 if (CanStream) { /* need to wait for rest of ACKs */
1444 RequeueRestAndExit((char *)NULL, (char *)NULL);
1449 if (BATCHfp != NULL)
1450 /* We requeued something, so close the temp file. */
1452 else if (unlink(BATCHname) < 0 && errno != ENOENT)
1453 syswarn("cannot remove %s", BATCHtemp);