chiark / gitweb /
532f0981c22b4d40271f19db0e2bda8b6b24def2
[innduct.git] / cli.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  cli.c - command and control connections
5  *
6  *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7  * 
8  *  This program is free software: you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation, either version 3 of the License, or
11  *  (at your option) any later version.
12  * 
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  * 
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
20  *
21  *  (I believe that when you compile and link this as part of the inn2
22  *  build, with the Makefile runes I have provided, all the libraries
23  *  and files which end up included in innduct are licence-compatible
24  *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
25  */
26
27 #include "innduct.h"
28
29 /*========== command and control (CLI) connections ==========*/
30
31 static int cli_master;
32
33 typedef struct CliConn CliConn;
34 struct CliConn {
35   void (*destroy)(CliConn*);
36   int fd;
37   oop_read *rd;
38   FILE *out;
39   union {
40     struct sockaddr sa;
41     struct sockaddr_un un;
42   } sa;
43   socklen_t salen;
44 };
45
46 static const oop_rd_style cli_rd_style= {
47   OOP_RD_DELIM_STRIP, '\n',
48   OOP_RD_NUL_FORBID,
49   OOP_RD_SHORTREC_FORBID
50 };
51
52 static void cli_destroy(CliConn *cc) {
53   cc->destroy(cc);
54 }
55
56 static void cli_checkouterr(CliConn *cc /* may destroy*/) {
57   if (ferror(cc->out) | fflush(cc->out)) {
58     info("CTRL%d write error %s", cc->fd, strerror(errno));
59     cli_destroy(cc);
60   }
61 }
62
63 static void cli_prompt(CliConn *cc /* may destroy*/) {
64   fprintf(cc->out, "%s| ", sitename);
65   cli_checkouterr(cc);
66 }
67
68 struct CliCommand {
69   const char *cmd;
70   void (*f)(CliConn *cc, const CliCommand *ccmd,
71             const char *arg, size_t argsz);
72   void *xdata;
73   int xval;
74 };
75
76 static const CliCommand cli_commands[];
77
78 #define CCMD(wh)                                                \
79   static void ccmd_##wh(CliConn *cc, const CliCommand *c,       \
80                         const char *arg, size_t argsz)
81
82 CCMD(help) {
83   fputs("commands:\n", cc->out);
84   const CliCommand *ccmd;
85   for (ccmd=cli_commands; ccmd->cmd; ccmd++)
86     fprintf(cc->out, " %s\n", ccmd->cmd);
87   fputs("NB: permissible arguments are not shown above."
88         "  Not all commands listed are safe.  See innduct(8).\n", cc->out);
89 }
90
91 CCMD(flush) {
92   int ok= trigger_flush_ok("manual request");
93   if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]);
94 }
95
96 CCMD(stop) {
97   preterminate();
98   notice("terminating (CTRL%d)",cc->fd);
99   raise_default(SIGTERM);
100   abort();
101 }
102
103 CCMD(logstats) { showstats(); }
104
105 CCMD(dump);
106 CCMD(dumphere);
107
108 /* messing with our head: */
109 CCMD(period) { period(); }
110 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
111 CCMD(setint) { *(int*)c->xdata= c->xval; }
112 CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
113
114 static const CliCommand cli_commands[]= {
115   { "h",             ccmd_help      },
116   { "flush",         ccmd_flush     },
117   { "stop",          ccmd_stop      },
118   { "logstats",      ccmd_logstats  },
119   { "dump q",        ccmd_dump, 0,0 },
120   { "dump a",        ccmd_dump, 0,1 },
121   { "show",          ccmd_dumphere  },
122
123   { "p",             ccmd_period    },
124
125 #define POKES(cmd,func)                                                 \
126   { cmd "flush",     func,           &until_flush,             1 },     \
127   { cmd "conn",      func,           &until_connect,           0 },     \
128   { cmd "blscan",    func,           &until_backlog_nextscan,  0 },
129 POKES("next ", ccmd_setint)
130 POKES("prod ", ccmd_setint_period)
131
132   { "pretend flush", ccmd_setintarg, &simulate_flush             },
133   { "wedge blscan",  ccmd_setint,    &until_backlog_nextscan, -1 },
134   { 0 }
135 };
136
137 static void *cli_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
138                        const char *errmsg, int errnoval,
139                        const char *data, size_t recszu, void *cc_v) {
140   CliConn *cc= cc_v;
141
142   if (!data) {
143     info("CTRL%d closed", cc->fd);
144     cc->destroy(cc);
145     return OOP_CONTINUE;
146   }
147
148   if (recszu == 0) goto prompt;
149   assert(recszu <= INT_MAX);
150   int recsz= recszu;
151
152   const CliCommand *ccmd;
153   for (ccmd=cli_commands; ccmd->cmd; ccmd++) {
154     int l= strlen(ccmd->cmd);
155     if (recsz < l) continue;
156     if (recsz > l && data[l] != ' ') continue;
157     if (memcmp(data, ccmd->cmd, l)) continue;
158
159     int argl= (int)recsz - (l+1); 
160     ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl);
161     goto prompt;
162   }
163
164   fputs("unknown command; h for help\n", cc->out);
165
166  prompt:
167   cli_prompt(cc);
168   return OOP_CONTINUE;
169 }
170
171 static void *cli_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
172                         const char *errmsg, int errnoval,
173                         const char *data, size_t recsz, void *cc_v) {
174   CliConn *cc= cc_v;
175   
176   info("CTRL%d read error %s", cc->fd, errmsg);
177   cc->destroy(cc);
178   return OOP_CONTINUE;
179 }
180
181 static int cli_conn_startup(CliConn *cc /* may destroy*/,
182                                 const char *how) {
183   cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0);
184   if (!cc->rd) { warn("oop_rd_new_fd cli failed"); return -1; }
185
186   int er= oop_rd_read(cc->rd, &cli_rd_style, MAX_CLI_COMMAND,
187                       cli_rd_ok, cc,
188                       cli_rd_err, cc);
189   if (er) { errno= er; syswarn("oop_rd_read cli failed"); return -1; }
190
191   info("CTRL%d %s ready", cc->fd, how);
192   cli_prompt(cc);
193   return 0;
194 }
195
196 static void cli_stdio_destroy(CliConn *cc) {
197   if (cc->rd) {
198     oop_rd_cancel(cc->rd);
199     errno= oop_rd_delete_tidy(cc->rd);
200     if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)");
201   }
202   free(cc);
203 }
204
205 void cli_stdio(void) {
206   NEW_DECL(CliConn *,cc);
207   cc->destroy= cli_stdio_destroy;
208
209   cc->fd= 0;
210   cc->out= stdout;
211   int r= cli_conn_startup(cc,"stdio");
212   if (r) cc->destroy(cc);
213 }
214
215 static void cli_accepted_destroy(CliConn *cc) {
216   if (cc->rd) {
217     oop_rd_cancel(cc->rd);
218     oop_rd_delete_kill(cc->rd);
219   }
220   if (cc->out) { fclose(cc->out); cc->fd=0; }
221   close_perhaps(&cc->fd);
222   free(cc);
223 }
224
225 static void *cli_master_readable(oop_source *lp, int master,
226                                  oop_event ev, void *u) {
227   NEW_DECL(CliConn *,cc);
228   cc->destroy= cli_accepted_destroy;
229
230   cc->salen= sizeof(cc->sa);
231   cc->fd= accept(master, &cc->sa.sa, &cc->salen);
232   if (cc->fd<0) { syswarn("error accepting cli connection"); goto x; }
233
234   cc->out= fdopen(cc->fd, "w");
235   if (!cc->out) { syswarn("error fdopening accepted cli connection"); goto x; }
236
237   int r= cli_conn_startup(cc, "accepted");
238   if (r) goto x;
239
240   return OOP_CONTINUE;
241
242  x:
243   cc->destroy(cc);
244   return OOP_CONTINUE;
245 }
246
247 #define NOCLI(...) do{                                          \
248     syswarn("no cli listener, because failed to " __VA_ARGS__); \
249     goto nocli;                                                 \
250   }while(0)
251
252 void cli_init(void) {
253   union {
254     struct sockaddr sa;
255     struct sockaddr_un un;
256   } sa;
257
258   memset(&sa,0,sizeof(sa));
259   int maxlen= sizeof(sa.un.sun_path);
260
261   if (!path_cli) {
262     info("control command line disabled");
263     return;
264   }
265
266   int pathlen= strlen(path_cli);
267   if (pathlen > maxlen) {
268     warn("no cli listener, because cli socket path %s too long (%d>%d)",
269          path_cli, pathlen, maxlen);
270     return;
271   }
272
273   if (path_cli_dir) {
274     int r= mkdir(path_cli_dir, 0700);
275     if (r && errno!=EEXIST)
276       NOCLI("create cli socket directory %s", path_cli_dir);
277   }
278
279   int r= unlink(path_cli);
280   if (r && errno!=ENOENT)
281     NOCLI("remove old cli socket %s", path_cli);
282
283   cli_master= socket(PF_UNIX, SOCK_STREAM, 0);
284   if (cli_master<0) NOCLI("create new cli master socket");
285
286   int sl= pathlen + offsetof(struct sockaddr_un, sun_path);
287   sa.un.sun_family= AF_UNIX;
288   memcpy(sa.un.sun_path, path_cli, pathlen);
289
290   r= bind(cli_master, &sa.sa, sl);
291   if (r) NOCLI("bind to cli socket path %s", sa.un.sun_path);
292
293   r= listen(cli_master, 5);
294   if (r) NOCLI("listen to cli master socket");
295
296   xsetnonblock(cli_master, 1);
297
298   loop->on_fd(loop, cli_master, OOP_READ, cli_master_readable, 0);
299   info("cli ready, listening on %s", path_cli);
300
301   return;
302
303  nocli:
304   xclose_perhaps(&cli_master, "cli master",0);
305   return;
306 }
307
308 /*========== dumping state ==========*/
309
310 static void dump_article_list(FILE *f, const CliCommand *c,
311                               const ArticleList *al) {
312   fprintf(f, " count=%d\n", al->count);
313   if (!c->xval) return;
314   
315   int i; Article *art;
316   for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
317     fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
318     DUMPV("%p", art->,ipf);
319     DUMPV("%d", art->,missing);
320     DUMPV("%lu", (unsigned long)art->,offset);
321     DUMPV("%d", art->,blanklen);
322     DUMPV("%d", art->,midlen);
323     fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
324   }
325 }
326
327 static void dump_counts_events(FILE *f, const Counts *counts) {
328   DUMPV("%d", counts->,events[read_ok]);
329   DUMPV("%d", counts->,events[read_blank]);
330   DUMPV("%d", counts->,events[read_err]);
331   DUMPV("%d", counts->,events[nooffer_missing]);
332 }
333
334 static void dump_counts_results(FILE *f, const Counts *counts,
335                                 const char *wh1, const char *wh2) {
336   ArtState state; const char *const *statename;
337   for (state=0, statename=artstate_names; *statename; state++,statename++) {
338 #define RC_DUMP_FMT(x) " " #x "=%d"
339 #define RC_DUMP_VAL(x) ,counts->results[state][RC_##x]
340     fprintf(f,"%s%s counts %-11s"
341             RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
342             wh1,wh2, *statename
343             RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
344   }
345 }
346
347 static void dump_input_file(FILE *f, const CliCommand *c,
348                             InputFile *ipf, const char *wh) {
349   char *dipf= dbg_report_ipf(ipf);
350   fprintf(f,"input %s %s", wh, dipf);
351   free(dipf);
352   if (ipf) dump_counts_events(f, &ipf->counts);
353   fprintf(f,"\n");
354   if (ipf) {
355     dump_counts_results(f, &ipf->counts, "input ",wh);
356     fprintf(f,"input %s queue", wh);
357     dump_article_list(f,c,&ipf->queue);
358   }
359 }
360
361 static void dumpinfo(const CliCommand *c, FILE *f) {
362   int i;
363   fprintf(f,"general");
364   DUMPV("%s", sms_names,[sms]);
365   DUMPV("%d", ,until_flush);
366   DUMPV("%ld", (long),self_pid);
367   DUMPV("%p", , defer);
368   DUMPV("%d", , until_connect);
369   DUMPV("%d", , until_backlog_nextscan);
370   DUMPV("%d", , simulate_flush);
371   fprintf(f,"\nnocheck");
372   DUMPV("%#.10f", , accept_proportion);
373   DUMPV("%d", , nocheck);
374   DUMPV("%d", , nocheck_reported);
375   fprintf(f,"\n");
376
377   fprintf(f,"special");
378   DUMPV("%ld", (long),connecting_child);
379   DUMPV("%d", , connecting_fdpass_sock);
380   DUMPV("%d", , cli_master);
381   fprintf(f,"\n");
382
383   fprintf(f,"lowvol");
384   DUMPV("%d", , lowvol_circptr);
385   DUMPV("%d", , lowvol_total);
386   fprintf(f,":");
387   for (i=0; i<lowvol_periods; i++) {
388     fprintf(f," ");
389     if (i==lowvol_circptr) fprintf(f,"*");
390     fprintf(f,"%d",lowvol_perperiod[i]);
391   }
392   fprintf(f,"\n");
393
394   fprintf(f,"filemon ");
395   filemon_method_dump_info(f);
396
397   dump_input_file(f,c, main_input_file,     "main"    );
398   dump_input_file(f,c, flushing_input_file, "flushing");
399   dump_input_file(f,c, backlog_input_file,  "backlog" );
400   if (backlog_counts_report) {
401     fprintf(f,"completed backlogs");
402     dump_counts_events(f, &backlog_counts);
403     fprintf(f,"\n");
404     dump_counts_results(f, &backlog_counts, "completed backlogs","");
405   }
406
407   fprintf(f,"conns count=%d\n", conns.count);
408
409   Conn *conn;
410   FOR_CONN(conn) {
411
412     fprintf(f,"C%d",conn->fd);
413     DUMPV("%p",conn->,rd);             DUMPV("%d",conn->,max_queue);
414     DUMPV("%d",conn->,stream);         DUMPV("\"%s\"",conn->,quitting);
415     DUMPV("%d",conn->,since_activity);
416     fprintf(f,"\n");
417
418     fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
419     fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
420     fprintf(f,"C%d sent",    conn->fd); dump_article_list(f,c,&conn->sent);
421
422     fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
423     for (i=0; i<conn->xmitu; i++) {
424       const struct iovec *iv= &conn->xmit[i];
425       const XmitDetails *xd= &conn->xmitd[i];
426       char *dinfo;
427       switch (xd->kind) {
428       case xk_Const:    dinfo= masprintf("Const");                 break;
429       case xk_Artdata:  dinfo= masprintf("A%p", xd->info.sm_art);  break;
430       default:
431         abort();
432       }
433       fprintf(f," #%03d %-11s l=%zd %s\n", i, dinfo, iv->iov_len,
434               sanitise(iv->iov_base, iv->iov_len));
435       free(dinfo);
436     }
437   }
438
439   fprintf(f,"paths");
440   DUMPV("%s", , feedfile);
441   DUMPV("%s", , path_cli);
442   DUMPV("%s", , path_lock);
443   DUMPV("%s", , path_flushing);
444   DUMPV("%s", , path_defer);
445   DUMPV("%s", , path_dump);
446   DUMPV("%s", , globpat_backlog);
447   fprintf(f,"\n");
448 }
449
450 CCMD(dump) {
451   fprintf(cc->out, "dumping state to %s\n", path_dump);
452   FILE *f= fopen(path_dump, "w");
453   if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
454   dumpinfo(c,f);
455   if (!!ferror(f) + !!fclose(f)) {
456     fprintf(cc->out, "failed: write: %s\n", strerror(errno));
457     return;
458   }
459 }
460
461 CCMD(dumphere) {
462   dumpinfo(c,cc->out);
463   fprintf(cc->out, ".\n");
464 }