chiark / gitweb /
Install the READMEs
[innduct.git] / recv.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  recv.c - receiving peer responses and disposing of articles
5  *
6  *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7  * 
8  *  This program is free software: you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation, either version 3 of the License, or
11  *  (at your option) any later version.
12  * 
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  * 
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
20  *
21  *  (I believe that when you compile and link this as part of the inn2
22  *  build, with the Makefile runes I have provided, all the libraries
23  *  and files which end up included in innduct are licence-compatible
24  *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
25  */
26
27 #include "innduct.h"
28
29 /*========== handling responses from peer ==========*/
30
31 const oop_rd_style peer_rd_style= {
32   OOP_RD_DELIM_STRIP, '\n',
33   OOP_RD_NUL_FORBID,
34   OOP_RD_SHORTREC_FORBID
35 };
36
37 void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
38                   const char *errmsg, int errnoval,
39                   const char *data, size_t recsz, void *conn_v) {
40   Conn *conn= conn_v;
41   connfail(conn, "error receiving from peer: %s", errmsg);
42   return OOP_CONTINUE;
43 }
44
45 static Article *article_reply_check(Conn *conn, const char *response,
46                                     int code_indicates_streaming,
47                                     int must_have_sent
48                                         /* 1:yes, -1:no, 0:dontcare */,
49                                     const char *sanitised_response) {
50   Article *art= LIST_HEAD(conn->sent);
51
52   if (!art) {
53     connfail(conn,
54              "peer gave unexpected response when no commands outstanding: %s",
55              sanitised_response);
56     return 0;
57   }
58
59   if (code_indicates_streaming) {
60     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
61     if (!conn->stream) {
62       connfail(conn, "peer gave streaming response code "
63                " to IHAVE or subsequent body: %s", sanitised_response);
64       return 0;
65     }
66     const char *got_mid= response+4;
67     int got_midlen= strcspn(got_mid, " \n\r");
68     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
69       connfail(conn, "peer gave streaming response with syntactically invalid"
70                " messageid: %s", sanitised_response);
71       return 0;
72     }
73     if (got_midlen != art->midlen ||
74         memcmp(got_mid, art->messageid, got_midlen)) {
75       connfail(conn, "peer gave streaming response code to wrong article -"
76                " probable synchronisation problem; we offered: %s;"
77                " peer said: %s",
78                art->messageid, sanitised_response);
79       return 0;
80     }
81   } else {
82     if (conn->stream) {
83       connfail(conn, "peer gave non-streaming response code to"
84                " CHECK/TAKETHIS: %s", sanitised_response);
85       return 0;
86     }
87   }
88
89   if (must_have_sent>0 && art->state < art_Wanted) {
90     connfail(conn, "peer says article accepted but"
91              " we had not sent the body: %s", sanitised_response);
92     return 0;
93   }
94   if (must_have_sent<0 && art->state >= art_Wanted) {
95     connfail(conn, "peer says please sent the article but we just did: %s",
96              sanitised_response);
97     return 0;
98   }
99
100   Article *art_again= LIST_REMHEAD(conn->sent);
101   assert(art_again == art);
102   return art;
103 }
104
105 static void update_nocheck(int accepted) {
106   accept_proportion *= nocheck_decay;
107   accept_proportion += accepted * (1.0 - nocheck_decay);
108   int new_nocheck= accept_proportion >= nocheck_thresh;
109   if (new_nocheck && !nocheck_reported) {
110     notice("entering nocheck mode for the first time");
111     nocheck_reported= 1;
112   } else if (new_nocheck != nocheck) {
113     dbg("nocheck mode %s", new_nocheck ? "start" : "stop");
114   }
115   nocheck= new_nocheck;
116 }
117
118 void article_done(Article *art, int whichcount) {
119   if (whichcount>=0 && !art->missing)
120     art->ipf->counts.results[art->state][whichcount]++;
121
122   if (whichcount == RC_accepted)
123     update_nocheck(1);
124   else if (whichcount == RC_unwanted ||
125            (whichcount == RC_rejected && art->state == art_Unsolicited))
126     update_nocheck(0);
127
128   InputFile *ipf= art->ipf;
129
130   while (art->blanklen) {
131     static const char spaces[]=
132       "                                                                "
133       "                                                                "
134       "                                                                "
135       "                                                                "
136       "                                                                "
137       "                                                                "
138       "                                                                "
139       "                                                                "
140       "                                                                ";
141     int nspaces= sizeof(spaces)-1;
142     int w= art->blanklen;  if (w > nspaces) w= nspaces;
143     int r= pwrite(ipf->fd, spaces, w, art->offset);
144     if (r==-1) {
145       if (errno==EINTR) continue;
146       syscrash("failed to blank entry for %s (length %d at offset %lu) in %s",
147                art->messageid, art->blanklen,
148                (unsigned long)art->offset, ipf->path);
149     }
150     assert(r>=0 && r<=w);
151     art->blanklen -= w;
152     art->offset += w;
153   }
154
155   ipf->inprogress--;
156   assert(ipf->inprogress >= 0);
157   free(art);
158
159   if (!ipf->inprogress && ipf != main_input_file)
160     queue_check_input_done();
161 }
162
163 void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
164                  const char *errmsg, int errnoval,
165                  const char *data, size_t recsz, void *conn_v) {
166   Conn *conn= conn_v;
167
168   if (ev == OOP_RD_EOF) {
169     connfail(conn, "unexpected EOF from peer");
170     return OOP_CONTINUE;
171   }
172   assert(ev == OOP_RD_OK);
173
174   char *sani= sanitise(data,-1);
175
176   char *ep;
177   unsigned long code= strtoul(data, &ep, 10);
178   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
179     connfail(conn, "badly formatted response from peer: %s", sani);
180     return OOP_CONTINUE;
181   }
182
183   int busy= conn_busy(conn);
184
185   if (conn->quitting) {
186     if (code!=205 && code!=400) {
187       connfail(conn, "peer gave unexpected response to QUIT (%s): %s",
188                conn->quitting, sani);
189     } else {
190       LIST_REMOVE(conns,conn);
191       info("C%d (now %d) idle connection closed (%s)",
192              conn->fd, conns.count, conn->quitting);
193       notice_conns_fewer();
194       assert(!busy);
195       conn_dispose(conn);
196     }
197     return OOP_CONTINUE;
198   }
199
200   conn->since_activity= 0;
201   Article *art;
202
203 #define GET_ARTICLE(musthavesent) do{                                         \
204     art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
205     if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
206   }while(0) 
207
208 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{       \
209     code_streaming= (streaming);                                \
210     GET_ARTICLE(musthavesent);                                  \
211     article_done(art, RC_##how);                                \
212     goto dealtwith;                                             \
213   }while(0)
214
215 #define PEERBADMSG(m) do {                                      \
216     connfail(conn, m ": %s", sani);  return OOP_CONTINUE;       \
217   }while(0)
218
219   int code_streaming= 0;
220
221   switch (code) {
222
223   default:  PEERBADMSG("peer sent unexpected message");
224
225   case 400:
226     if (busy)
227       PEERBADMSG("peer timed us out or stopped accepting articles");
228
229     LIST_REMOVE(conns,conn);
230     info("C%d (now %d) idle connection closed by peer",
231          conns.count, conn->fd);
232     notice_conns_fewer();
233     conn_dispose(conn);
234     return OOP_CONTINUE;
235
236   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
237   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
238
239   case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
240   case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
241
242   case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
243   case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
244
245   case 238: /* CHECK says send it */
246     code_streaming= 1;
247   case 335: /* IHAVE says send it */
248     GET_ARTICLE(-1);
249     assert(art->state == art_Unchecked);
250     art->ipf->counts.results[art->state][RC_accepted]++;
251     art->state= art_Wanted;
252     LIST_ADDTAIL(conn->priority, art);
253     break;
254
255   case 431: /* CHECK or TAKETHIS says try later */
256     code_streaming= 1;
257   case 436: /* IHAVE says try later */
258     GET_ARTICLE(0);
259     article_defer(art, RC_deferred);
260     break;
261
262   }
263 dealtwith:
264
265   conn_maybe_write(conn);
266   check_assign_articles();
267   return OOP_CONTINUE;
268 }
269
270