chiark / gitweb /
870e0e52ec38e83d0f06d77fdc08190e5e4b4db3
[innduct.git] / recv.c
1 /*========== handling responses from peer ==========*/
2
3 static const oop_rd_style peer_rd_style= {
4   OOP_RD_DELIM_STRIP, '\n',
5   OOP_RD_NUL_FORBID,
6   OOP_RD_SHORTREC_FORBID
7 };
8
9 static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
10                          const char *errmsg, int errnoval,
11                          const char *data, size_t recsz, void *conn_v) {
12   Conn *conn= conn_v;
13   connfail(conn, "error receiving from peer: %s", errmsg);
14   return OOP_CONTINUE;
15 }
16
17 static Article *article_reply_check(Conn *conn, const char *response,
18                                     int code_indicates_streaming,
19                                     int must_have_sent
20                                         /* 1:yes, -1:no, 0:dontcare */,
21                                     const char *sanitised_response) {
22   Article *art= LIST_HEAD(conn->sent);
23
24   if (!art) {
25     connfail(conn,
26              "peer gave unexpected response when no commands outstanding: %s",
27              sanitised_response);
28     return 0;
29   }
30
31   if (code_indicates_streaming) {
32     assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
33     if (!conn->stream) {
34       connfail(conn, "peer gave streaming response code "
35                " to IHAVE or subsequent body: %s", sanitised_response);
36       return 0;
37     }
38     const char *got_mid= response+4;
39     int got_midlen= strcspn(got_mid, " \n\r");
40     if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
41       connfail(conn, "peer gave streaming response with syntactically invalid"
42                " messageid: %s", sanitised_response);
43       return 0;
44     }
45     if (got_midlen != art->midlen ||
46         memcmp(got_mid, art->messageid, got_midlen)) {
47       connfail(conn, "peer gave streaming response code to wrong article -"
48                " probable synchronisation problem; we offered: %s;"
49                " peer said: %s",
50                art->messageid, sanitised_response);
51       return 0;
52     }
53   } else {
54     if (conn->stream) {
55       connfail(conn, "peer gave non-streaming response code to"
56                " CHECK/TAKETHIS: %s", sanitised_response);
57       return 0;
58     }
59   }
60
61   if (must_have_sent>0 && art->state < art_Wanted) {
62     connfail(conn, "peer says article accepted but"
63              " we had not sent the body: %s", sanitised_response);
64     return 0;
65   }
66   if (must_have_sent<0 && art->state >= art_Wanted) {
67     connfail(conn, "peer says please sent the article but we just did: %s",
68              sanitised_response);
69     return 0;
70   }
71
72   Article *art_again= LIST_REMHEAD(conn->sent);
73   assert(art_again == art);
74   return art;
75 }
76
77 static void update_nocheck(int accepted) {
78   accept_proportion *= nocheck_decay;
79   accept_proportion += accepted * (1.0 - nocheck_decay);
80   int new_nocheck= accept_proportion >= nocheck_thresh;
81   if (new_nocheck && !nocheck_reported) {
82     notice("entering nocheck mode for the first time");
83     nocheck_reported= 1;
84   } else if (new_nocheck != nocheck) {
85     dbg("nocheck mode %s", new_nocheck ? "start" : "stop");
86   }
87   nocheck= new_nocheck;
88 }
89
90 static void article_done(Article *art, int whichcount) {
91   if (whichcount>=0 && !art->missing)
92     art->ipf->counts[art->state][whichcount]++;
93
94   if (whichcount == RC_accepted) update_nocheck(1);
95   else if (whichcount == RC_unwanted) update_nocheck(0);
96
97   InputFile *ipf= art->ipf;
98
99   while (art->blanklen) {
100     static const char spaces[]=
101       "                                                                "
102       "                                                                "
103       "                                                                "
104       "                                                                "
105       "                                                                "
106       "                                                                "
107       "                                                                "
108       "                                                                "
109       "                                                                ";
110     int nspaces= sizeof(spaces)-1;
111     int w= art->blanklen;  if (w > nspaces) w= nspaces;
112     int r= pwrite(ipf->fd, spaces, w, art->offset);
113     if (r==-1) {
114       if (errno==EINTR) continue;
115       syscrash("failed to blank entry for %s (length %d at offset %lu) in %s",
116                art->messageid, art->blanklen,
117                (unsigned long)art->offset, ipf->path);
118     }
119     assert(r>=0 && r<=w);
120     art->blanklen -= w;
121     art->offset += w;
122   }
123
124   ipf->inprogress--;
125   assert(ipf->inprogress >= 0);
126   free(art);
127
128   if (!ipf->inprogress && ipf != main_input_file)
129     queue_check_input_done();
130 }
131
132 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
133                         const char *errmsg, int errnoval,
134                         const char *data, size_t recsz, void *conn_v) {
135   Conn *conn= conn_v;
136
137   if (ev == OOP_RD_EOF) {
138     connfail(conn, "unexpected EOF from peer");
139     return OOP_CONTINUE;
140   }
141   assert(ev == OOP_RD_OK);
142
143   char *sani= sanitise(data,-1);
144
145   char *ep;
146   unsigned long code= strtoul(data, &ep, 10);
147   if (ep != data+3 || *ep != ' ' || data[0]=='0') {
148     connfail(conn, "badly formatted response from peer: %s", sani);
149     return OOP_CONTINUE;
150   }
151
152   int busy= conn_busy(conn);
153
154   if (conn->quitting) {
155     if (code!=205 && code!=400) {
156       connfail(conn, "peer gave unexpected response to QUIT (%s): %s",
157                conn->quitting, sani);
158     } else {
159       LIST_REMOVE(conns,conn);
160       notice("C%d (now %d) idle connection closed (%s)",
161              conn->fd, conns.count, conn->quitting);
162       assert(!busy);
163       conn_dispose(conn);
164     }
165     return OOP_CONTINUE;
166   }
167
168   conn->since_activity= 0;
169   Article *art;
170
171 #define GET_ARTICLE(musthavesent) do{                                         \
172     art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
173     if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
174   }while(0) 
175
176 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{       \
177     code_streaming= (streaming);                                \
178     GET_ARTICLE(musthavesent);                                  \
179     article_done(art, RC_##how);                                \
180     goto dealtwith;                                             \
181   }while(0)
182
183 #define PEERBADMSG(m) do {                                      \
184     connfail(conn, m ": %s", sani);  return OOP_CONTINUE;       \
185   }while(0)
186
187   int code_streaming= 0;
188
189   switch (code) {
190
191   default:  PEERBADMSG("peer sent unexpected message");
192
193   case 400:
194     if (busy)
195       PEERBADMSG("peer timed us out or stopped accepting articles");
196
197     LIST_REMOVE(conns,conn);
198     notice("C%d (now %d) idle connection closed by peer",
199            conns.count, conn->fd);
200     conn_dispose(conn);
201     return OOP_CONTINUE;
202
203   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
204   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
205
206   case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
207   case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
208
209   case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
210   case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
211
212   case 238: /* CHECK says send it */
213     code_streaming= 1;
214   case 335: /* IHAVE says send it */
215     GET_ARTICLE(-1);
216     assert(art->state == art_Unchecked);
217     art->ipf->counts[art->state][RC_accepted]++;
218     art->state= art_Wanted;
219     LIST_ADDTAIL(conn->priority, art);
220     break;
221
222   case 431: /* CHECK or TAKETHIS says try later */
223     code_streaming= 1;
224   case 436: /* IHAVE says try later */
225     GET_ARTICLE(0);
226     article_defer(art, RC_deferred);
227     break;
228
229   }
230 dealtwith:
231
232   conn_maybe_write(conn);
233   check_assign_articles();
234   return OOP_CONTINUE;
235 }
236
237