chiark / gitweb /
Make FOR_LIST_NODE and use it for FOR_CONN
[innduct.git] / recv.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  recv.c - receiving peer responses and disposing of articles
5  *
6  *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7  * 
8  *  This program is free software: you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation, either version 3 of the License, or
11  *  (at your option) any later version.
12  * 
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  * 
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
20  *
21  *  (I believe that when you compile and link this as part of the inn2
22  *  build, with the Makefile runes I have provided, all the libraries
23  *  and files which end up included in innduct are licence-compatible
24  *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
25  */
26
27 #include "innduct.h"
28
29 /*========== handling responses from peer ==========*/
30
31 const oop_rd_style peer_rd_style= {
32   OOP_RD_DELIM_STRIP, '\n',
33   OOP_RD_NUL_FORBID,
34   OOP_RD_SHORTREC_FORBID
35 };
36
37 void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
38                   const char *errmsg, int errnoval,
39                   const char *data, size_t recsz, void *conn_v) {
40   Conn *conn= conn_v;
41   connfail(conn, "error receiving from peer: %s", errmsg);
42   return OOP_CONTINUE;
43 }
44
45 static Article *article_reply_check(Conn *conn, const char *response,
46                                     int code_indicates_streaming,
47                                     int must_have_sent
48                                         /* 1:yes, -1:no, 0:dontcare */,
49                                     const char *sanitised_response) {
50   Article *art= LIST_HEAD(conn->sent);
51
52   if (!art) {
53     connfail(conn,
54              "peer gave unexpected response when no commands outstanding: %s",
55              sanitised_response);
56     return 0;
57   }
58
59   if (code_indicates_streaming) {
60     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
61     if (!conn->stream) {
62       connfail(conn, "peer gave streaming response code "
63                " to IHAVE or subsequent body: %s", sanitised_response);
64       return 0;
65     }
66     const char *got_mid= response+4;
67     int got_midlen= strcspn(got_mid, " \n\r");
68     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
69       connfail(conn, "peer gave streaming response with syntactically invalid"
70                " messageid: %s", sanitised_response);
71       return 0;
72     }
73     if (got_midlen != art->midlen ||
74         memcmp(got_mid, art->messageid, got_midlen)) {
75       connfail(conn, "peer gave streaming response code to wrong article -"
76                " probable synchronisation problem; we offered: %s;"
77                " peer said: %s",
78                art->messageid, sanitised_response);
79       return 0;
80     }
81   } else {
82     if (conn->stream) {
83       connfail(conn, "peer gave non-streaming response code to"
84                " CHECK/TAKETHIS: %s", sanitised_response);
85       return 0;
86     }
87   }
88
89   if (must_have_sent>0 && art->state < art_Wanted) {
90     connfail(conn, "peer says article accepted but"
91              " we had not sent the body: %s", sanitised_response);
92     return 0;
93   }
94   if (must_have_sent<0 && art->state >= art_Wanted) {
95     connfail(conn, "peer says please sent the article but we just did: %s",
96              sanitised_response);
97     return 0;
98   }
99
100   Article *art_again= LIST_REMHEAD(conn->sent);
101   assert(art_again == art);
102   return art;
103 }
104
105 static void update_nocheck(int accepted) {
106   accept_proportion *= nocheck_decay;
107   accept_proportion += accepted * (1.0 - nocheck_decay);
108   int new_nocheck= accept_proportion >= nocheck_thresh;
109   if (new_nocheck && !nocheck_reported) {
110     notice("entering nocheck mode for the first time");
111     nocheck_reported= 1;
112   } else if (new_nocheck != nocheck) {
113     dbg("nocheck mode %s", new_nocheck ? "start" : "stop");
114   }
115   nocheck= new_nocheck;
116 }
117
118 void article_done(Article *art, int whichcount) {
119   if (whichcount>=0 && !art->missing)
120     art->ipf->counts.results[art->state][whichcount]++;
121
122   if (whichcount == RC_accepted) update_nocheck(1);
123   else if (whichcount == RC_unwanted) update_nocheck(0);
124
125   InputFile *ipf= art->ipf;
126
127   while (art->blanklen) {
128     static const char spaces[]=
129       "                                                                "
130       "                                                                "
131       "                                                                "
132       "                                                                "
133       "                                                                "
134       "                                                                "
135       "                                                                "
136       "                                                                "
137       "                                                                ";
138     int nspaces= sizeof(spaces)-1;
139     int w= art->blanklen;  if (w > nspaces) w= nspaces;
140     int r= pwrite(ipf->fd, spaces, w, art->offset);
141     if (r==-1) {
142       if (errno==EINTR) continue;
143       syscrash("failed to blank entry for %s (length %d at offset %lu) in %s",
144                art->messageid, art->blanklen,
145                (unsigned long)art->offset, ipf->path);
146     }
147     assert(r>=0 && r<=w);
148     art->blanklen -= w;
149     art->offset += w;
150   }
151
152   ipf->inprogress--;
153   assert(ipf->inprogress >= 0);
154   free(art);
155
156   if (!ipf->inprogress && ipf != main_input_file)
157     queue_check_input_done();
158 }
159
160 void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
161                  const char *errmsg, int errnoval,
162                  const char *data, size_t recsz, void *conn_v) {
163   Conn *conn= conn_v;
164
165   if (ev == OOP_RD_EOF) {
166     connfail(conn, "unexpected EOF from peer");
167     return OOP_CONTINUE;
168   }
169   assert(ev == OOP_RD_OK);
170
171   char *sani= sanitise(data,-1);
172
173   char *ep;
174   unsigned long code= strtoul(data, &ep, 10);
175   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
176     connfail(conn, "badly formatted response from peer: %s", sani);
177     return OOP_CONTINUE;
178   }
179
180   int busy= conn_busy(conn);
181
182   if (conn->quitting) {
183     if (code!=205 && code!=400) {
184       connfail(conn, "peer gave unexpected response to QUIT (%s): %s",
185                conn->quitting, sani);
186     } else {
187       LIST_REMOVE(conns,conn);
188       info("C%d (now %d) idle connection closed (%s)",
189              conn->fd, conns.count, conn->quitting);
190       notice_conns_fewer();
191       assert(!busy);
192       conn_dispose(conn);
193     }
194     return OOP_CONTINUE;
195   }
196
197   conn->since_activity= 0;
198   Article *art;
199
200 #define GET_ARTICLE(musthavesent) do{                                         \
201     art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
202     if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
203   }while(0) 
204
205 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{       \
206     code_streaming= (streaming);                                \
207     GET_ARTICLE(musthavesent);                                  \
208     article_done(art, RC_##how);                                \
209     goto dealtwith;                                             \
210   }while(0)
211
212 #define PEERBADMSG(m) do {                                      \
213     connfail(conn, m ": %s", sani);  return OOP_CONTINUE;       \
214   }while(0)
215
216   int code_streaming= 0;
217
218   switch (code) {
219
220   default:  PEERBADMSG("peer sent unexpected message");
221
222   case 400:
223     if (busy)
224       PEERBADMSG("peer timed us out or stopped accepting articles");
225
226     LIST_REMOVE(conns,conn);
227     info("C%d (now %d) idle connection closed by peer",
228          conns.count, conn->fd);
229     notice_conns_fewer();
230     conn_dispose(conn);
231     return OOP_CONTINUE;
232
233   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
234   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
235
236   case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
237   case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
238
239   case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
240   case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
241
242   case 238: /* CHECK says send it */
243     code_streaming= 1;
244   case 335: /* IHAVE says send it */
245     GET_ARTICLE(-1);
246     assert(art->state == art_Unchecked);
247     art->ipf->counts.results[art->state][RC_accepted]++;
248     art->state= art_Wanted;
249     LIST_ADDTAIL(conn->priority, art);
250     break;
251
252   case 431: /* CHECK or TAKETHIS says try later */
253     code_streaming= 1;
254   case 436: /* IHAVE says try later */
255     GET_ARTICLE(0);
256     article_defer(art, RC_deferred);
257     break;
258
259   }
260 dealtwith:
261
262   conn_maybe_write(conn);
263   check_assign_articles();
264   return OOP_CONTINUE;
265 }
266
267