chiark / gitweb /
Check for articles expiring, both in queue heads and reading backlog
authorIan Jackson <ian@liberator.relativity.greenend.org.uk>
Sun, 2 May 2010 23:42:48 +0000 (00:42 +0100)
committerIan Jackson <ian@liberator.relativity.greenend.org.uk>
Sun, 2 May 2010 23:42:48 +0000 (00:42 +0100)
backends/innduct.c

index befc3af70d05d8c7096fabc6eb9c36c8be05af98..c4d42e1300e2f5101005b37b6cc9e37924a98b2c 100644 (file)
@@ -315,7 +315,7 @@ static void statemc_start_flush(const char *why); /* Normal => Flushing */
 static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
 static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */
 
-static void article_done(Conn *conn, Article *art, int whichcount);
+static void article_done(Article *art, int whichcount);
 
 static void check_assign_articles(void);
 static void queue_check_input_done(void);
@@ -1525,6 +1525,27 @@ static void conn_maybe_write(Conn *conn)  {
   }
 }
 
+static int article_check_expired(Article *art /* must be queued, not conn */) {
+  ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
+  if (artdata) { SMfreearticle(artdata); return 0; }
+
+  LIST_REMOVE(art->ipf->queue, art);
+  art->missing= 1;
+  art->ipf->counts[art_Unchecked][RC_missing]++;
+  article_done(art,-1);
+  return 1;
+}
+
+static void inputfile_queue_check_expired(InputFile *ipf) {
+  if (!ipf) return;
+
+  for (;;) {
+    Article *art= LIST_HEAD(ipf->queue);
+    int exp= article_check_expired(art);
+    if (!exp) break;
+  }
+}
+
 /*========== article transmission ==========*/
 
 static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
@@ -1622,7 +1643,7 @@ static void conn_make_some_xmits(Conn *conn) {
          XMIT_LITERAL("\r\n");
          xmit_artbody(conn, artdata);
        } else {
-         article_done(conn, art, -1);
+         article_done(art, -1);
          continue;
        }
       } else {
@@ -1743,7 +1764,7 @@ static void update_nocheck(int accepted) {
   nocheck= new_nocheck;
 }
 
-static void article_done(Conn *conn, Article *art, int whichcount) {
+static void article_done(Article *art, int whichcount) {
   if (!art->missing) art->ipf->counts[art->state][whichcount]++;
 
   if (whichcount == RC_accepted) update_nocheck(1);
@@ -1832,7 +1853,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
 #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{      \
     code_streaming= (streaming);                               \
     GET_ARTICLE(musthavesent);                                 \
-    article_done(conn, art, RC_##how);                         \
+    article_done(art, RC_##how);                               \
     goto dealtwith;                                            \
   }while(0)
 
@@ -1881,7 +1902,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
     if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
        || fflush(defer))
       sysfatal("write to defer file %s",path_defer);
-    article_done(conn, art, RC_deferred);
+    article_done(art, RC_deferred);
     break;
 
   }
@@ -2023,6 +2044,9 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd,
   strcpy(art->messageid, space+1);
   LIST_ADDTAIL(ipf->queue, art);
 
+  if (ipf==backlog_input_file)
+    article_check_expired(art);
+
   if (sms==sm_NORMAL && ipf==main_input_file &&
       ipf->offset >= target_max_feedfile_size)
     statemc_start_flush("feed file size");
@@ -3131,6 +3155,7 @@ static void period(void) {
 
   if (until_connect) until_connect--;
 
+  inputfile_queue_check_expired(backlog_input_file);
   poll_backlog_file();
   if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
   statemc_period_poll();