3 * tailing reliable realtime streaming feeder for inn
4 * recv.c - receiving peer responses and disposing of articles
6 * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
8 * This program is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 * (I believe that when you compile and link this as part of the inn2
22 * build, with the Makefile runes I have provided, all the libraries
23 * and files which end up included in innduct are licence-compatible
24 * with GPLv3. If not then please let me know. -Ian Jackson.)
29 /*========== handling responses from peer ==========*/
/* Read style used for data received from the peer: records are
 * delimited by a newline character in OOP_RD_DELIM_STRIP mode, and
 * OOP_RD_SHORTREC_FORBID is selected for short records.
 * NOTE(review): presumably this strips the newline from each record and
 * treats a truncated final record as an error - confirm against the
 * liboop oop_rd_style documentation.  One initialiser between the
 * delimiter and the short-record mode is not visible in this excerpt. */
31 const oop_rd_style peer_rd_style= {
32 OOP_RD_DELIM_STRIP, '\n',
34 OOP_RD_SHORTREC_FORBID
/* Read-error callback for a peer connection; it takes the same
 * parameter list as peer_rd_ok.
 *
 *   errmsg  - description of the read error, included in the failure
 *             message
 *   conn_v  - opaque callback argument
 *
 * The visible statement fails the connection through connfail.
 * NOTE(review): conn is presumably conn_v converted to a Conn pointer,
 * and the return value is produced, on lines that are not visible in
 * this excerpt - confirm. */
37 void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
38 const char *errmsg, int errnoval,
39 const char *data, size_t recsz, void *conn_v) {
41 connfail(conn, "error receiving from peer: %s", errmsg);
/* Check a per-article response from the peer against the article at the
 * head of conn->sent, and take that article off the list if all is well.
 *
 *   conn                      - connection the response arrived on
 *   response                  - raw response line; when a message-id is
 *                               checked it is read from response+4
 *   code_indicates_streaming  - nonzero if the response code is one of
 *                               the streaming (CHECK/TAKETHIS) codes,
 *                               zero if it is an IHAVE-style code
 *   must_have_sent            - 1: the body must already have been sent,
 *                               -1: it must not have been, 0: do not care
 *   sanitised_response        - printable form of response, used only in
 *                               failure messages
 *
 * Every mismatch is reported with connfail.  The caller (see the
 * GET_ARTICLE macro in peer_rd_ok) treats a null result as meaning that
 * this function has already failed the connection.
 * NOTE(review): several conditions and the return statements are not
 * visible in this excerpt; the description covers the visible lines. */
45 static Article *article_reply_check(Conn *conn, const char *response,
46 int code_indicates_streaming,
48 /* 1:yes, -1:no, 0:dontcare */,
49 const char *sanitised_response) {
50 Article *art= LIST_HEAD(conn->sent);
54 "peer gave unexpected response when no commands outstanding: %s",
/* Streaming responses carry the message-id, so it can be compared with
 * the article we believe the peer is answering. */
59 if (code_indicates_streaming) {
60 assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
62 connfail(conn, "peer gave streaming response code "
63 "to IHAVE or subsequent body: %s", sanitised_response);
/* Skip the three-digit code and the space after it (peer_rd_ok has
 * already checked that layout); the message-id runs up to the next
 * space, newline or carriage return. */
66 const char *got_mid= response+4;
67 int got_midlen= strcspn(got_mid, " \n\r");
68 if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
69 connfail(conn, "peer gave streaming response with syntactically invalid"
70 " messageid: %s", sanitised_response);
73 if (got_midlen != art->midlen ||
74 memcmp(got_mid, art->messageid, got_midlen)) {
75 connfail(conn, "peer gave streaming response code to wrong article -"
76 " probable synchronisation problem; we offered: %s;"
78 art->messageid, sanitised_response);
83 connfail(conn, "peer gave non-streaming response code to"
84 " CHECK/TAKETHIS: %s", sanitised_response);
/* Cross-check the response against how far we had got with this
 * article: states from art_Wanted upwards are treated as body sent. */
89 if (must_have_sent>0 && art->state < art_Wanted) {
90 connfail(conn, "peer says article accepted but"
91 " we had not sent the body: %s", sanitised_response);
94 if (must_have_sent<0 && art->state >= art_Wanted) {
95 connfail(conn, "peer says please send the article but we just did: %s",
/* All checks passed: the head of the sent list is the article being
 * answered, so remove it from the list. */
100 Article *art_again= LIST_REMHEAD(conn->sent);
101 assert(art_again == art);
/* Fold one more result into the decaying acceptance average and
 * recompute whether nocheck mode is in force.
 *
 *   accepted  - contribution of this result; it is weighted by
 *               (1.0 - nocheck_decay) after the previous average has
 *               been scaled by nocheck_decay
 *
 * nocheck becomes true when accept_proportion reaches nocheck_thresh.
 * Entering nocheck mode while nocheck_reported is still unset is logged
 * with notice; any other change of mode is logged with dbg.
 * NOTE(review): the statement following the notice call is not visible
 * in this excerpt; presumably it sets nocheck_reported - confirm. */
105 static void update_nocheck(int accepted) {
106 accept_proportion *= nocheck_decay;
107 accept_proportion += accepted * (1.0 - nocheck_decay);
108 int new_nocheck= accept_proportion >= nocheck_thresh;
109 if (new_nocheck && !nocheck_reported) {
110 notice("entering nocheck mode for the first time");
112 } else if (new_nocheck != nocheck) {
113 dbg("nocheck mode %s", new_nocheck ? "start" : "stop");
115 nocheck= new_nocheck;
/* Finish with an article: count the outcome and blank out its entry in
 * the input file it came from.
 *
 *   art         - the article being disposed of
 *   whichcount  - RC_ result index; when it is >= 0 and the article is
 *                 not marked missing, the matching counter in
 *                 counts.results[art->state] of the input file is
 *                 incremented
 *
 * NOTE(review): the actions taken for the RC_accepted and
 * RC_unwanted/RC_rejected branches, the loop bookkeeping, and the
 * release of the article are on lines not visible in this excerpt. */
118 void article_done(Article *art, int whichcount) {
119 if (whichcount>=0 && !art->missing)
120 art->ipf->counts.results[art->state][whichcount]++;
122 if (whichcount == RC_accepted)
124 else if (whichcount == RC_unwanted ||
125 (whichcount == RC_rejected && art->state == art_Unsolicited))
128 InputFile *ipf= art->ipf;
/* Overwrite the entry with spaces, at most one buffer of spaces per
 * pwrite call, for as long as art->blanklen is nonzero. */
130 while (art->blanklen) {
131 static const char spaces[]=
141 int nspaces= sizeof(spaces)-1;
142 int w= art->blanklen; if (w > nspaces) w= nspaces;
143 int r= pwrite(ipf->fd, spaces, w, art->offset);
/* An interrupted write is simply retried; any other failure is passed
 * to syscrash. */
145 if (errno==EINTR) continue;
146 syscrash("failed to blank entry for %s (length %d at offset %lu) in %s",
147 art->messageid, art->blanklen,
148 (unsigned long)art->offset, ipf->path);
150 assert(r>=0 && r<=w);
156 assert(ipf->inprogress >= 0);
/* When an input file other than the main one has nothing left in
 * progress, let the queue logic check whether that file is done. */
159 if (!ipf->inprogress && ipf != main_input_file)
160 queue_check_input_done();
/* Read callback for one complete response line from the peer.
 *
 *   ev      - OOP_RD_EOF fails the connection; anything else must be
 *             OOP_RD_OK (asserted)
 *   data    - the response line; it must start with a three-character
 *             numeric code that does not begin with 0, followed by a
 *             space, otherwise the connection is failed
 *   conn_v  - opaque callback argument
 *
 * If the connection is quitting, only codes 205 and 400 are accepted and
 * the connection is then removed from conns.  Otherwise the code selects
 * the action: per-article codes are matched to the head of the sent
 * queue through article_reply_check and the article is finished,
 * queued for sending, or deferred.  Finally more output is attempted
 * with conn_maybe_write and check_assign_articles is called.
 * NOTE(review): the conn declaration, the switch statement line, the
 * return statements and several conditions are on lines not visible in
 * this excerpt; the description covers the visible lines only. */
163 void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
164 const char *errmsg, int errnoval,
165 const char *data, size_t recsz, void *conn_v) {
168 if (ev == OOP_RD_EOF) {
169 connfail(conn, "unexpected EOF from peer");
172 assert(ev == OOP_RD_OK);
/* Printable copy of the line, used in every diagnostic below. */
174 char *sani= sanitise(data,-1);
/* The numeric parse must consume exactly three characters and be
 * followed by a space. */
177 unsigned long code= strtoul(data, &ep, 10);
178 if (ep != data+3 || *ep != ' ' || data[0]=='0') {
179 connfail(conn, "badly formatted response from peer: %s", sani);
183 int busy= conn_busy(conn);
185 if (conn->quitting) {
186 if (code!=205 && code!=400) {
187 connfail(conn, "peer gave unexpected response to QUIT (%s): %s",
188 conn->quitting, sani);
190 LIST_REMOVE(conns,conn);
191 info("C%d (now %d) idle connection closed (%s)",
192 conn->fd, conns.count, conn->quitting);
193 notice_conns_fewer();
200 conn->since_activity= 0;
/* Helper macros for the response-code cases below:
 *   GET_ARTICLE        fetches and validates the article being answered,
 *                      returning from this function if the check failed
 *   ARTICLE_DEALTWITH  records whether the code is a streaming one, gets
 *                      the article and finishes it with the given RC_
 *                      result
 *   PEERBADMSG         fails the connection with a fixed message plus
 *                      the sanitised response and returns */
203 #define GET_ARTICLE(musthavesent) do{ \
204 art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
205 if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */ \
208 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \
209 code_streaming= (streaming); \
210 GET_ARTICLE(musthavesent); \
211 article_done(art, RC_##how); \
215 #define PEERBADMSG(m) do { \
216 connfail(conn, m ": %s", sani); return OOP_CONTINUE; \
219 int code_streaming= 0;
223 default: PEERBADMSG("peer sent unexpected message");
227 PEERBADMSG("peer timed us out or stopped accepting articles");
/* The arguments follow the order of the format string: the fd of this
 * connection first, then the number of connections remaining, exactly
 * as on the QUIT path above. */
229 LIST_REMOVE(conns,conn);
230 info("C%d (now %d) idle connection closed by peer",
231 conn->fd, conns.count);
232 notice_conns_fewer();
236 case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
237 case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
239 case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
240 case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
242 case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
243 case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
245 case 238: /* CHECK says send it */
247 case 335: /* IHAVE says send it */
/* The peer wants the body: count the offer as accepted, mark the
 * article as wanted and put it on the priority list of this
 * connection. */
249 assert(art->state == art_Unchecked);
250 art->ipf->counts.results[art->state][RC_accepted]++;
251 art->state= art_Wanted;
252 LIST_ADDTAIL(conn->priority, art);
255 case 431: /* CHECK or TAKETHIS says try later */
257 case 436: /* IHAVE says try later */
259 article_defer(art, RC_deferred);
265 conn_maybe_write(conn);
266 check_assign_articles();