chiark / gitweb /
Group counters into struct
[innduct.git] / recv.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  recv.c - receiving peer responses and disposing of articles
5  *
6  *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7  * 
8  *  This program is free software: you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation, either version 3 of the License, or
11  *  (at your option) any later version.
12  * 
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  * 
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
20  *
21  *  (I believe that when you compile and link this as part of the inn2
22  *  build, with the Makefile runes I have provided, all the libraries
23  *  and files which end up included in innduct are licence-compatible
24  *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
25  */
26
27 #include "innduct.h"
28
29 /*========== handling responses from peer ==========*/
30
31 const oop_rd_style peer_rd_style= {
32   OOP_RD_DELIM_STRIP, '\n',
33   OOP_RD_NUL_FORBID,
34   OOP_RD_SHORTREC_FORBID
35 };
36
37 void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
38                   const char *errmsg, int errnoval,
39                   const char *data, size_t recsz, void *conn_v) {
40   Conn *conn= conn_v;
41   connfail(conn, "error receiving from peer: %s", errmsg);
42   return OOP_CONTINUE;
43 }
44
45 static Article *article_reply_check(Conn *conn, const char *response,
46                                     int code_indicates_streaming,
47                                     int must_have_sent
48                                         /* 1:yes, -1:no, 0:dontcare */,
49                                     const char *sanitised_response) {
50   Article *art= LIST_HEAD(conn->sent);
51
52   if (!art) {
53     connfail(conn,
54              "peer gave unexpected response when no commands outstanding: %s",
55              sanitised_response);
56     return 0;
57   }
58
59   if (code_indicates_streaming) {
60     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
61     if (!conn->stream) {
62       connfail(conn, "peer gave streaming response code "
63                " to IHAVE or subsequent body: %s", sanitised_response);
64       return 0;
65     }
66     const char *got_mid= response+4;
67     int got_midlen= strcspn(got_mid, " \n\r");
68     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
69       connfail(conn, "peer gave streaming response with syntactically invalid"
70                " messageid: %s", sanitised_response);
71       return 0;
72     }
73     if (got_midlen != art->midlen ||
74         memcmp(got_mid, art->messageid, got_midlen)) {
75       connfail(conn, "peer gave streaming response code to wrong article -"
76                " probable synchronisation problem; we offered: %s;"
77                " peer said: %s",
78                art->messageid, sanitised_response);
79       return 0;
80     }
81   } else {
82     if (conn->stream) {
83       connfail(conn, "peer gave non-streaming response code to"
84                " CHECK/TAKETHIS: %s", sanitised_response);
85       return 0;
86     }
87   }
88
89   if (must_have_sent>0 && art->state < art_Wanted) {
90     connfail(conn, "peer says article accepted but"
91              " we had not sent the body: %s", sanitised_response);
92     return 0;
93   }
94   if (must_have_sent<0 && art->state >= art_Wanted) {
95     connfail(conn, "peer says please sent the article but we just did: %s",
96              sanitised_response);
97     return 0;
98   }
99
100   Article *art_again= LIST_REMHEAD(conn->sent);
101   assert(art_again == art);
102   return art;
103 }
104
105 static void update_nocheck(int accepted) {
106   accept_proportion *= nocheck_decay;
107   accept_proportion += accepted * (1.0 - nocheck_decay);
108   int new_nocheck= accept_proportion >= nocheck_thresh;
109   if (new_nocheck && !nocheck_reported) {
110     notice("entering nocheck mode for the first time");
111     nocheck_reported= 1;
112   } else if (new_nocheck != nocheck) {
113     dbg("nocheck mode %s", new_nocheck ? "start" : "stop");
114   }
115   nocheck= new_nocheck;
116 }
117
118 void article_done(Article *art, int whichcount) {
119   if (whichcount>=0 && !art->missing)
120     art->ipf->counts.counts[art->state][whichcount]++;
121
122   if (whichcount == RC_accepted) update_nocheck(1);
123   else if (whichcount == RC_unwanted) update_nocheck(0);
124
125   InputFile *ipf= art->ipf;
126
127   while (art->blanklen) {
128     static const char spaces[]=
129       "                                                                "
130       "                                                                "
131       "                                                                "
132       "                                                                "
133       "                                                                "
134       "                                                                "
135       "                                                                "
136       "                                                                "
137       "                                                                ";
138     int nspaces= sizeof(spaces)-1;
139     int w= art->blanklen;  if (w > nspaces) w= nspaces;
140     int r= pwrite(ipf->fd, spaces, w, art->offset);
141     if (r==-1) {
142       if (errno==EINTR) continue;
143       syscrash("failed to blank entry for %s (length %d at offset %lu) in %s",
144                art->messageid, art->blanklen,
145                (unsigned long)art->offset, ipf->path);
146     }
147     assert(r>=0 && r<=w);
148     art->blanklen -= w;
149     art->offset += w;
150   }
151
152   ipf->inprogress--;
153   assert(ipf->inprogress >= 0);
154   free(art);
155
156   if (!ipf->inprogress && ipf != main_input_file)
157     queue_check_input_done();
158 }
159
160 void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
161                  const char *errmsg, int errnoval,
162                  const char *data, size_t recsz, void *conn_v) {
163   Conn *conn= conn_v;
164
165   if (ev == OOP_RD_EOF) {
166     connfail(conn, "unexpected EOF from peer");
167     return OOP_CONTINUE;
168   }
169   assert(ev == OOP_RD_OK);
170
171   char *sani= sanitise(data,-1);
172
173   char *ep;
174   unsigned long code= strtoul(data, &ep, 10);
175   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
176     connfail(conn, "badly formatted response from peer: %s", sani);
177     return OOP_CONTINUE;
178   }
179
180   int busy= conn_busy(conn);
181
182   if (conn->quitting) {
183     if (code!=205 && code!=400) {
184       connfail(conn, "peer gave unexpected response to QUIT (%s): %s",
185                conn->quitting, sani);
186     } else {
187       LIST_REMOVE(conns,conn);
188       notice("C%d (now %d) idle connection closed (%s)",
189              conn->fd, conns.count, conn->quitting);
190       assert(!busy);
191       conn_dispose(conn);
192     }
193     return OOP_CONTINUE;
194   }
195
196   conn->since_activity= 0;
197   Article *art;
198
199 #define GET_ARTICLE(musthavesent) do{                                         \
200     art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
201     if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
202   }while(0) 
203
204 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{       \
205     code_streaming= (streaming);                                \
206     GET_ARTICLE(musthavesent);                                  \
207     article_done(art, RC_##how);                                \
208     goto dealtwith;                                             \
209   }while(0)
210
211 #define PEERBADMSG(m) do {                                      \
212     connfail(conn, m ": %s", sani);  return OOP_CONTINUE;       \
213   }while(0)
214
215   int code_streaming= 0;
216
217   switch (code) {
218
219   default:  PEERBADMSG("peer sent unexpected message");
220
221   case 400:
222     if (busy)
223       PEERBADMSG("peer timed us out or stopped accepting articles");
224
225     LIST_REMOVE(conns,conn);
226     notice("C%d (now %d) idle connection closed by peer",
227            conns.count, conn->fd);
228     conn_dispose(conn);
229     return OOP_CONTINUE;
230
231   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
232   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
233
234   case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
235   case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
236
237   case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
238   case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
239
240   case 238: /* CHECK says send it */
241     code_streaming= 1;
242   case 335: /* IHAVE says send it */
243     GET_ARTICLE(-1);
244     assert(art->state == art_Unchecked);
245     art->ipf->counts.counts[art->state][RC_accepted]++;
246     art->state= art_Wanted;
247     LIST_ADDTAIL(conn->priority, art);
248     break;
249
250   case 431: /* CHECK or TAKETHIS says try later */
251     code_streaming= 1;
252   case 436: /* IHAVE says try later */
253     GET_ARTICLE(0);
254     article_defer(art, RC_deferred);
255     break;
256
257   }
258 dealtwith:
259
260   conn_maybe_write(conn);
261   check_assign_articles();
262   return OOP_CONTINUE;
263 }
264
265