chiark / gitweb /
changelog: Finalise 2.2
[innduct.git] / recv.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  recv.c - receiving peer responses and disposing of articles
5  *
6  *  Copyright Ian Jackson <ijackson@chiark.greenend.org.uk>
7  *  and contributors; see LICENCE.txt.
8  *  SPDX-License-Identifier: GPL-3.0-or-later
9  */
10
11 #include "innduct.h"
12
13 /*========== handling responses from peer ==========*/
14
15 const oop_rd_style peer_rd_style= {
16   OOP_RD_DELIM_STRIP, '\n',
17   OOP_RD_NUL_FORBID,
18   OOP_RD_SHORTREC_FORBID
19 };
20
21 void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
22                   const char *errmsg, int errnoval,
23                   const char *data, size_t recsz, void *conn_v) {
24   Conn *conn= conn_v;
25   connfail(conn, "error receiving from peer: %s", errmsg);
26   return OOP_CONTINUE;
27 }
28
29 static Article *article_reply_check(Conn *conn, const char *response,
30                                     int code_indicates_streaming,
31                                     int must_have_sent
32                                         /* 1:yes, -1:no, 0:dontcare */,
33                                     const char *sanitised_response) {
34   Article *art= LIST_HEAD(conn->sent);
35
36   if (!art) {
37     connfail(conn,
38              "peer gave unexpected response when no commands outstanding: %s",
39              sanitised_response);
40     return 0;
41   }
42
43   if (code_indicates_streaming) {
44     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
45     if (!conn->stream) {
46       connfail(conn, "peer gave streaming response code "
47                " to IHAVE or subsequent body: %s", sanitised_response);
48       return 0;
49     }
50     const char *got_mid= response+4;
51     int got_midlen= strcspn(got_mid, " \n\r");
52     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
53       connfail(conn, "peer gave streaming response with syntactically invalid"
54                " messageid: %s", sanitised_response);
55       return 0;
56     }
57     if (got_midlen != art->midlen ||
58         memcmp(got_mid, art->messageid, got_midlen)) {
59       connfail(conn, "peer gave streaming response code to wrong article -"
60                " probable synchronisation problem; we offered: %s;"
61                " peer said: %s",
62                art->messageid, sanitised_response);
63       return 0;
64     }
65   } else {
66     if (conn->stream) {
67       connfail(conn, "peer gave non-streaming response code to"
68                " CHECK/TAKETHIS: %s", sanitised_response);
69       return 0;
70     }
71   }
72
73   if (must_have_sent>0 && art->state < art_Wanted) {
74     connfail(conn, "peer says article accepted but"
75              " we had not sent the body: %s", sanitised_response);
76     return 0;
77   }
78   if (must_have_sent<0 && art->state >= art_Wanted) {
79     connfail(conn, "peer says please sent the article but we just did: %s",
80              sanitised_response);
81     return 0;
82   }
83
84   Article *art_again= LIST_REMHEAD(conn->sent);
85   assert(art_again == art);
86   return art;
87 }
88
89 static void update_nocheck(int accepted) {
90   accept_proportion *= nocheck_decay;
91   accept_proportion += accepted * (1.0 - nocheck_decay);
92   int new_nocheck= accept_proportion >= nocheck_thresh;
93   if (new_nocheck && !nocheck_reported) {
94     notice("entering nocheck mode for the first time");
95     nocheck_reported= 1;
96   } else if (new_nocheck != nocheck) {
97     dbg("nocheck mode %s", new_nocheck ? "start" : "stop");
98   }
99   nocheck= new_nocheck;
100 }
101
102 void article_done(Article *art, int whichcount) {
103   if (whichcount>=0 && !art->missing)
104     art->ipf->counts.results[art->state][whichcount]++;
105
106   if (whichcount == RC_accepted)
107     update_nocheck(1);
108   else if (whichcount == RC_unwanted ||
109            (whichcount == RC_rejected && art->state == art_Unsolicited))
110     update_nocheck(0);
111
112   InputFile *ipf= art->ipf;
113
114   while (art->blanklen) {
115     static const char spaces[]=
116       "                                                                "
117       "                                                                "
118       "                                                                "
119       "                                                                "
120       "                                                                "
121       "                                                                "
122       "                                                                "
123       "                                                                "
124       "                                                                ";
125     int nspaces= sizeof(spaces)-1;
126     int w= art->blanklen;  if (w > nspaces) w= nspaces;
127     int r= pwrite(ipf->fd, spaces, w, art->offset);
128     if (r==-1) {
129       if (errno==EINTR) continue;
130       syscrash("failed to blank entry for %s (length %d at offset %lu) in %s",
131                art->messageid, art->blanklen,
132                (unsigned long)art->offset, ipf->path);
133     }
134     assert(r>=0 && r<=w);
135     art->blanklen -= w;
136     art->offset += w;
137   }
138
139   ipf->inprogress--;
140   assert(ipf->inprogress >= 0);
141   free(art);
142
143   if (!ipf->inprogress && ipf != main_input_file)
144     queue_check_input_done();
145 }
146
147 void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
148                  const char *errmsg, int errnoval,
149                  const char *data, size_t recsz, void *conn_v) {
150   Conn *conn= conn_v;
151
152   if (ev == OOP_RD_EOF) {
153     connfail(conn, "unexpected EOF from peer");
154     return OOP_CONTINUE;
155   }
156   assert(ev == OOP_RD_OK);
157
158   char *sani= sanitise(data,-1);
159
160   char *ep;
161   unsigned long code= strtoul(data, &ep, 10);
162   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
163     connfail(conn, "badly formatted response from peer: %s", sani);
164     return OOP_CONTINUE;
165   }
166
167   int busy= conn_busy(conn);
168
169   if (conn->quitting) {
170     if (code!=205 && code!=400) {
171       connfail(conn, "peer gave unexpected response to QUIT (%s): %s",
172                conn->quitting, sani);
173     } else {
174       LIST_REMOVE(conns,conn);
175       info("C%d (now %d) idle connection closed (%s)",
176              conn->fd, conns.count, conn->quitting);
177       notice_conns_fewer();
178       assert(!busy);
179       conn_dispose(conn);
180     }
181     return OOP_CONTINUE;
182   }
183
184   conn->since_activity= 0;
185   Article *art;
186
187 #define GET_ARTICLE(musthavesent) do{                                         \
188     art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
189     if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
190   }while(0) 
191
192 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{       \
193     code_streaming= (streaming);                                \
194     GET_ARTICLE(musthavesent);                                  \
195     article_done(art, RC_##how);                                \
196     goto dealtwith;                                             \
197   }while(0)
198
199 #define PEERBADMSG(m) do {                                      \
200     connfail(conn, m ": %s", sani);  return OOP_CONTINUE;       \
201   }while(0)
202
203   int code_streaming= 0;
204
205   switch (code) {
206
207   default:  PEERBADMSG("peer sent unexpected message");
208
209   case 400:
210     if (busy)
211       PEERBADMSG("peer timed us out or stopped accepting articles");
212
213     LIST_REMOVE(conns,conn);
214     info("C%d (now %d) idle connection closed by peer",
215          conns.count, conn->fd);
216     notice_conns_fewer();
217     conn_dispose(conn);
218     return OOP_CONTINUE;
219
220   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
221   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
222
223   case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
224   case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
225
226   case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
227   case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
228
229   case 238: /* CHECK says send it */
230     code_streaming= 1;
231   case 335: /* IHAVE says send it */
232     GET_ARTICLE(-1);
233     assert(art->state == art_Unchecked);
234     art->ipf->counts.results[art->state][RC_accepted]++;
235     art->state= art_Wanted;
236     LIST_ADDTAIL(conn->priority, art);
237     break;
238
239   case 431: /* CHECK or TAKETHIS says try later */
240     code_streaming= 1;
241   case 436: /* IHAVE says try later */
242     GET_ARTICLE(0);
243     article_defer(art, RC_deferred);
244     break;
245
246   }
247 dealtwith:
248
249   conn_maybe_write(conn);
250   check_assign_articles();
251   return OOP_CONTINUE;
252 }
253
254