chiark / gitweb /
Follow new transport configuration interface. Add parameters for flow
[jog] / txport.c
1 /* -*-c-*-
2  *
3  * $Id: txport.c,v 1.1 2002/01/25 19:34:45 mdw Exp $
4  *
5  * Transport switch glue
6  *
7  * (c) 2001 Mark Wooding
8  */
9
10 /*----- Licensing notice --------------------------------------------------* 
11  *
12  * This file is part of Jog: Programming for a jogging machine.
13  *
14  * Jog is free software; you can redistribute it and/or modify
15  * it under the terms of the GNU General Public License as published by
16  * the Free Software Foundation; either version 2 of the License, or
17  * (at your option) any later version.
18  * 
19  * Jog is distributed in the hope that it will be useful,
20  * but WITHOUT ANY WARRANTY; without even the implied warranty of
21  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  * GNU General Public License for more details.
23  * 
24  * You should have received a copy of the GNU General Public License
25  * along with Jog; if not, write to the Free Software Foundation,
26  * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
27  */
28
29 /*----- Revision history --------------------------------------------------* 
30  *
31  * $Log: txport.c,v $
32  * Revision 1.1  2002/01/25 19:34:45  mdw
33  * Initial revision
34  *
35  */
36
37 /*----- Header files ------------------------------------------------------*/
38
39 #ifdef HAVE_CONFIG_H
40 #  include "config.h"
41 #endif
42
43 #include <errno.h>
44 #include <stdio.h>
45 #include <stdarg.h>
46
47 #include <sys/types.h>
48 #include <sys/time.h>
49 #include <unistd.h>
50 #include <pthread.h>
51
52 #include <mLib/darray.h>
53 #include <mLib/dstr.h>
54 #include <mLib/lbuf.h>
55 #include <mLib/sub.h>
56 #include <mLib/trace.h>
57 #include <mLib/tv.h>
58
59 #include "err.h"
60 #include "txport.h"
61
62 /*----- Global variables --------------------------------------------------*/
63
64 #define TX_LIST 0
65 #include "tx-socket.h"
66 #include "tx-serial-unix.h"
67 txport_ops *txlist = TX_LIST;
68
69 const char *txname = 0;
70 const char *txfile = 0;
71 const char *txconf = 0;
72
73 /*----- Main code ---------------------------------------------------------*/
74
75 /* --- @newline@ --- *
76  *
77  * Arguments:   @char *s@ = pointer to line
78  *              @size_t len@ = length of line
79  *              @void *txv@ = pointer to transport context
80  *
81  * Returns:     ---
82  *
83  * Use:         Adds a line to the list.
84  */
85
86 static void newline(char *s, size_t len, void *txv)
87 {
88   txport *tx = txv;
89   txline *l;
90
91   if (!s)
92     return;
93   l = CREATE(txline);
94   l->s = xmalloc(len + 1);
95   memcpy(l->s, s, len + 1);
96   l->len = len;
97   l->tx = tx;
98   l->next = 0;
99   l->prev = tx->ll_tail;
100   if (tx->ll_tail)
101     tx->ll_tail->next = l;
102   else
103     tx->ll = l;
104   tx->ll_tail = l;
105 }
106
107 /* --- @tx_create@ --- *
108  *
109  * Arguments:   @const char *name@ = name of transport to instantiate
110  *              @const char *file@ = filename for transport
111  *              @const char *config@ = config string
112  *
113  * Returns:     A pointer to the transport context, or null on error.
114  *
115  * Use:         Creates a new transport.
116  */
117
118 txport *tx_create(const char *name, const char *file, const char *config)
119 {
120   txport_ops *o;
121   txport *tx;
122   pthread_attr_t ta;
123   dstr d = DSTR_INIT;
124   size_t len;
125   int e;
126
127   /* --- Look up the transport by name --- */
128
129   if (!name) {
130     o = txlist;
131     goto found;
132   }
133   len = strlen(name);
134   for (o = txlist; o; o = o->next) {
135     if (strncmp(name, o->name, len) == 0)
136       goto found;
137   }
138   err_report(ERR_TXPORT, ERRTX_BADTX, 0, "unknown transport `%s'", name);
139   return (0);
140
141   /* --- Set up the transport block --- */
142
143 found:
144   if (!file) {
145     const struct txfile *fv;
146     for (fv = o->fv; fv->env || fv->name; fv++) {
147       if (fv->env && (file = getenv(fv->env)) == 0)
148         continue;
149       DRESET(&d);
150       if (file)
151         DPUTS(&d, file);
152       if (file && fv->name)
153         DPUTC(&d, '/');
154       if (fv->name)
155         DPUTS(&d, fv->name);
156       break;
157     }
158     file = d.buf;
159   }
160   if (!config)
161     config = o->config;
162   if ((tx = o->create(file, config)) == 0)
163     goto fail_0;
164   tx->ops = o;
165   DA_CREATE(&tx->buf);
166   tx->ll = 0;
167   tx->ll_tail = 0;
168   if ((e = pthread_mutex_init(&tx->mx, 0)) != 0) {
169     err_report(ERR_TXPORT, ERRTX_CREATE, e,
170                "mutex creation failed: %s", strerror(e));
171     goto fail_1;
172   }
173   if ((e = pthread_cond_init(&tx->cv, 0)) != 0) {
174     err_report(ERR_TXPORT, ERRTX_CREATE, e,
175                "condvar creation failed: %s", strerror(e));
176     goto fail_2;
177   }
178   if ((e = pthread_attr_init(&ta)) != 0) {
179     err_report(ERR_TXPORT, ERRTX_CREATE, e,
180                "thread attribute creation failed: %s", strerror(e));
181     goto fail_3;
182   } 
183   if ((e = pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED)) ||
184       (e = pthread_create(&tx->tid, &ta, tx->ops->fetch, tx)) != 0) {
185     err_report(ERR_TXPORT, ERRTX_CREATE, e,
186                "thread creation failed: %s", strerror(e));
187     goto fail_4;
188   }
189   pthread_attr_destroy(&ta);
190   lbuf_init(&tx->lb, newline, tx);
191   tx->lb.delim = '\r';
192   tx->s = TX_READY;
193   DDESTROY(&d);
194   return (tx);
195
196   /* --- Something went wrong --- */
197
198 fail_4:
199   pthread_attr_destroy(&ta);
200 fail_3:
201   pthread_cond_destroy(&tx->cv);
202 fail_2:
203   pthread_mutex_destroy(&tx->mx);
204 fail_1:
205   tx->ops->destroy(tx);
206 fail_0:
207   DDESTROY(&d);
208   return (0);
209 }
210
211 /* --- @tx_write@ --- *
212  *
213  * Arguments:   @txport *tx@ = pointer to transport context
214  *              @const void *p@ = pointer to buffer to write
215  *              @size_t sz@ = size of buffer
216  *
217  * Returns:     Zero if OK, or @-1@ on error.
218  *
219  * Use:         Writes some data to a transport.
220  */
221
222 int tx_write(txport *tx, const void *p, size_t sz)
223 {
224   if (tx->ops->write(tx, p, sz) < 0) {
225     err_report(ERR_TXPORT, ERRTX_WRITE, errno,
226                "error writing to transport: %s", strerror(errno));
227     return (-1);
228   }
229   return (0);
230 }
231
232 /* --- @tx_printf@ --- *
233  *
234  * Arguments:   @txport *tx@ = pointer to transport context
235  *              @const char *p@ = pointer to string to write
236  *
237  * Returns:     The number of characters printed, or @EOF@ on error.
238  *
239  * Use:         Writes a textual message to a transport.
240  */
241
242 int tx_vprintf(txport *tx, const char *p, va_list *ap)
243 {
244   dstr d = DSTR_INIT;
245   int rc;
246
247   dstr_vputf(&d, p, *ap);
248   rc = d.len;
249   rc = tx_write(tx, d.buf, d.len);
250   DDESTROY(&d);
251   return (rc);
252 }
253
254 int tx_printf(txport *tx, const char *p, ...)
255 {
256   va_list ap;
257   int rc;
258
259   va_start(ap, p);
260   rc = tx_vprintf(tx, p, &ap);
261   va_end(ap);
262   return (rc);
263 }
264
265 /* --- @tx_read@, @tx_readx@ --- *
266  *
267  * Arguments:   @txport *tx@ = pointer to transport context
268  *              @unsigned long t@ = time to wait for data (ms)
269  *              @int (*filter)(const char *s, void *p)@ = filtering function
270  *              @void *p@ = pointer argument for filter
271  *
272  * Returns:     A pointer to a line block, which must be freed using
273  *              @tx_freeline@.
274  *
275  * Use:         Fetches a line from the buffer.  Each line is passed to the
276  *              filter function in oldest-to-newest order; the filter
277  *              function returns nonzero to choose a line.  If no suitable
278  *              line is waiting in the raw buffer, the program blocks while
279  *              more data is fetched, until the time limit @t@ is exceeded,
280  *              in which case a null pointer is returned.  A null filter
281  *              function is equivalent to one which always selects its line.
282  */
283
284 txline *tx_readx(txport *tx, unsigned long t,
285                  int (*filter)(const char *s, void *p), void *p)
286 {
287   txline *l, **ll = &tx->ll;
288   int e;
289   struct timeval now, tv;
290   struct timespec ts;
291   unsigned f = 0;
292
293 #define f_lock 1u
294
295   /* --- Get the time to wait until --- */
296
297   if (t != FOREVER) {
298     gettimeofday(&now, 0);
299     tv_addl(&tv, &now, t / 1000, (t % 1000) * 1000);
300     ts.tv_sec = tv.tv_sec;
301     ts.tv_nsec = tv.tv_usec * 1000;
302   }
303
304   /* --- Check for a matching line --- */
305
306 again:
307   for (; *ll; ll = &l->next) {
308     l = *ll;
309     if (!filter || filter(l->s, p))
310       goto done;
311   }
312   l = 0;
313
314   /* --- Lock the buffer --- *
315    *
316    * The following operations require a lock on the buffer, so we obtain that
317    * here.
318    */
319
320   if (!(f & f_lock)) {
321     if ((e = pthread_mutex_lock(&tx->mx)) != 0) {
322       err_report(ERR_TXPORT, ERRTX_READ, e,
323                  "error locking mutex: %s", strerror(errno));
324       goto done;
325     }
326     f |= f_lock;
327   }
328
329   /* --- Push more stuff through the line buffer --- */
330
331 check:
332   if (DA_LEN(&tx->buf)) {
333     trace_block(1u, "incoming data", DA(&tx->buf), DA_LEN(&tx->buf));
334     lbuf_snarf(&tx->lb, DA(&tx->buf), DA_LEN(&tx->buf));
335     DA_SHRINK(&tx->buf, DA_LEN(&tx->buf));
336     goto again;
337   }
338
339   /* --- If nothing else can arrive, give up --- */
340
341   if (tx->s == TX_CLOSE) {
342     lbuf_close(&tx->lb);
343     tx->s = TX_CLOSED;
344     goto again;
345   }
346   if (!t || tx->s == TX_CLOSED)
347     goto done;
348   gettimeofday(&now, 0);
349   if (TV_CMP(&now, >=, &tv))
350     goto done;
351
352   /* --- Wait for some more data to arrive --- */
353
354   if (t == FOREVER)
355     e = pthread_cond_wait(&tx->cv, &tx->mx);
356   else
357     e = pthread_cond_timedwait(&tx->cv, &tx->mx, &ts);
358   if (e && e != ETIMEDOUT && e != EINTR) {
359     err_report(ERR_TXPORT, ERRTX_READ, e,
360                "error waiting on condvar: %s", strerror(errno));
361     goto done;
362   }
363   goto check;
364
365   /* --- Everything is finished --- */
366
367 done:
368   if (f & f_lock)
369     pthread_mutex_unlock(&tx->mx);
370   return (l);
371
372 #undef f_lock
373 }
374
375 txline *tx_read(txport *tx, unsigned long t)
376 {
377   return (tx_readx(tx, t, 0, 0));
378 }
379
380 /* --- @tx_freeline@ --- *
381  *
382  * Arguments:   @txline *l@ = pointer to line
383  *
384  * Returns:     ---
385  *
386  * Use:         Frees a line block.
387  */
388
389 void tx_freeline(txline *l)
390 {
391   txport *tx = l->tx;
392   if (l->next)
393     l->next->prev = l->prev;
394   else
395     tx->ll_tail = l->prev;
396   if (l->prev)
397     l->prev->next = l->next;
398   else
399     tx->ll = l->next;
400   xfree(l->s);
401   DESTROY(l);
402 }
403
404 /* --- @tx_destroy@ --- *
405  *
406  * Arguments:   @txport *tx@ = transport context
407  *
408  * Returns:     ---
409  *
410  * Use:         Destroys a transport.
411  */
412
413 void tx_destroy(txport *tx)
414 {
415   txline *l, *ll;
416
417   if (tx->s == TX_READY) {
418     pthread_mutex_lock(&tx->mx);
419     if (tx->s == TX_READY)
420       pthread_cancel(tx->tid);
421     pthread_mutex_unlock(&tx->mx);
422   }
423   pthread_mutex_destroy(&tx->mx);
424   pthread_cond_destroy(&tx->cv);
425   DA_DESTROY(&tx->buf);
426   lbuf_destroy(&tx->lb);
427   for (l = tx->ll; l; l = ll) {
428     ll = l->next;
429     xfree(l->s);
430     DESTROY(l);
431   }
432   tx->ops->destroy(tx);    
433 }
434
435 /*----- That's all, folks -------------------------------------------------*/