chiark / gitweb /
socket: add POSIX mqueue support
authorLennart Poettering <lennart@poettering.net>
Tue, 17 May 2011 17:37:03 +0000 (19:37 +0200)
committerLennart Poettering <lennart@poettering.net>
Tue, 17 May 2011 17:37:03 +0000 (19:37 +0200)
13 files changed:
TODO
man/sd_is_fifo.xml
man/systemd.socket.xml
src/conf-parser.c
src/conf-parser.h
src/dbus-common.c
src/dbus-common.h
src/dbus-socket.c
src/load-fragment.c
src/sd-daemon.c
src/sd-daemon.h
src/socket.c
src/socket.h

diff --git a/TODO b/TODO
index a26bcbcb6c0a4b5191b3f22ca06af219c1cfbde8..1d6ed5a490aa41b3bb5500b7cdd1953e796f36a5 100644 (file)
--- a/TODO
+++ b/TODO
@@ -24,7 +24,16 @@ Features:
 
 * add dbus call to convert snapshot ino target
 
-* make use of TIOCVHANGUP
+* move nss-myhostname into systemd
+
+* figure out a standard place to configure timezone name, inform myllynen@redhat.com
+
+* add dbus call to convert snapshot into target
+
+* make use of TIOCVHANGUP to revoke access to tty before we spawn a getty on it
+
+* release VT before we spawn a getty on it to entirely clear scrollback buffer
+  https://bugzilla.redhat.com/show_bug.cgi?id=701704
 
 * move /selinux to /sys/fs/selinux
 
@@ -32,15 +41,10 @@ Features:
 
 * add prefix match to sysctl, tmpfiles, ...
 
-* Add ConditionPathExists= checks to binfmt automount units, to avoid
-  installing the automount point if the directory does not exist.
-
 * drop /.readahead on bigger upgrades with yum
 
 * add inode stat() check to readahead to suppress preloading changed files
 
-* POSIX mqueue support in .socket units
-
 * allow list of pathes in config_parse_condition_path()
 
 * introduce dbus calls for enabling/disabling a service
index 0e3f3d038f09ec2e7c75ac6a6967093c3db52c4d..251f45ce0f6b4273ee1ace668fbe0211a97e32c7 100644 (file)
@@ -47,6 +47,7 @@
                 <refname>sd_is_socket</refname>
                 <refname>sd_is_socket_inet</refname>
                 <refname>sd_is_socket_unix</refname>
+                <refname>sd_is_mq</refname>
                 <refpurpose>Check the type of a file descriptor</refpurpose>
         </refnamediv>
 
                                 <paramdef>size_t <parameter>length</parameter></paramdef>
                         </funcprototype>
 
+                        <funcprototype>
+                                <funcdef>int <function>sd_is_mq</function></funcdef>
+                                <paramdef>int <parameter>fd</parameter></paramdef>
+                                <paramdef>const char *<parameter>path</parameter></paramdef>
+                        </funcprototype>
+
                 </funcsynopsis>
         </refsynopsisdiv>
 
                 address, including the initial 0 byte and set
                 <parameter>path</parameter> to the initial 0 byte of
                 the socket address.</para>
+
+                <para><function>sd_is_mq()</function> may be called to
+                check whether the specified file descriptor refers to
+                a POSIX message queue. If the
+                <parameter>path</parameter> parameter is not NULL, it
+                is checked whether the message queue is bound to the
+                specified name.</para>
         </refsect1>
 
         <refsect1>
index 3ea3154db2152876213186c700c70c82e3e27c8e..22567d483e8804b8a0e67f76ef65a53eb48f930c 100644 (file)
                                 directive above.</para></listitem>
                         </varlistentry>
 
+                        <varlistentry>
+                                <term><varname>ListenMessageQueue=</varname></term>
+                                <listitem><para>Specifies a POSIX
+                                message queue name to listen on. This
+                                expects a valid message queue name
+                                (i.e. beginning with /). Behaviour
+                                otherwise is very similar to the
+                                <varname>ListenFIFO=</varname>
+                                directive above. On Linux message
+                                queue descriptors are actually file
+                                descriptors and can be inherited
+                                between processes.</para></listitem>
+                        </varlistentry>
+
                         <varlistentry>
                                 <term><varname>BindIPv6Only=</varname></term>
                                 <listitem><para>Takes a one of
                                 for details.</para></listitem>
                         </varlistentry>
 
+                        <varlistentry>
+                                <term><varname>MessageQueueMaxMessages=</varname>,
+                                <varname>MessageQueueMessageSize=</varname></term>
+                                <listitem><para>These two settings
+                                take integer values and control the
+                                mq_maxmsg resp. mq_msgsize field when
+                                creating the message queue. Note that
+                                either none or both of these variables
+                                need to be set. See
+                                <citerefentry><refentrytitle>mq_setattr</refentrytitle><manvolnum>3</manvolnum></citerefentry>
+                                for details.</para></listitem>
+                        </varlistentry>
+
                         <varlistentry>
                                 <term><varname>FreeBind=</varname></term>
                                 <listitem><para>Takes a boolean
index a086cf7a02170ab87ebbf03f320955a92c034dfd..02f740a04fb10b650c9514e02028854a4739c6a3 100644 (file)
@@ -247,6 +247,32 @@ int config_parse_int(
         return 0;
 }
 
+int config_parse_long(
+                const char *filename,
+                unsigned line,
+                const char *section,
+                const char *lvalue,
+                int ltype,
+                const char *rvalue,
+                void *data,
+                void *userdata) {
+
+        long *i = data;
+        int r;
+
+        assert(filename);
+        assert(lvalue);
+        assert(rvalue);
+        assert(data);
+
+        if ((r = safe_atoli(rvalue, i)) < 0) {
+                log_error("[%s:%u] Failed to parse numeric value: %s", filename, line, rvalue);
+                return r;
+        }
+
+        return 0;
+}
+
 int config_parse_uint64(
                 const char *filename,
                 unsigned line,
index 3432695db7406bf85963f264dd9fd19ad5111d5b..51efe0078660d2272b20db6264d00b7bbd35e8a1 100644 (file)
@@ -47,6 +47,7 @@ int config_parse(const char *filename, FILE *f, const char* const *sections, con
 /* Generic parsers */
 int config_parse_int(const char *filename, unsigned line, const char *section, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
 int config_parse_unsigned(const char *filename, unsigned line, const char *section, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
+int config_parse_long(const char *filename, unsigned line, const char *section, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
 int config_parse_uint64(const char *filename, unsigned line, const char *section, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
 int config_parse_size(const char *filename, unsigned line, const char *section, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
 int config_parse_bool(const char *filename, unsigned line, const char *section, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
index b23373c5d1d28946a876c61b87ec995053a14011..fe7f84b119352772173f44d10586c5a2ff891e47 100644 (file)
@@ -568,6 +568,21 @@ int bus_property_append_ul(DBusMessageIter *i, const char *property, void *data)
         return 0;
 }
 
+int bus_property_append_long(DBusMessageIter *i, const char *property, void *data) {
+        uint64_t u;
+
+        assert(i);
+        assert(property);
+        assert(data);
+
+        u = (int64_t) *(long*) data;
+
+        if (!dbus_message_iter_append_basic(i, DBUS_TYPE_INT64, &u))
+                return -ENOMEM;
+
+        return 0;
+}
+
 const char *bus_errno_to_dbus(int error) {
 
         switch(error) {
index 729519c526803eb98ba9b4bc70367db2c7c4fd95..a88cb13b1f2869a71ec3b0b4dc94c29bd402e152 100644 (file)
@@ -125,6 +125,7 @@ int bus_property_append_uint32(DBusMessageIter *i, const char *property, void *d
 int bus_property_append_uint64(DBusMessageIter *i, const char *property, void *data);
 int bus_property_append_size(DBusMessageIter *i, const char *property, void *data);
 int bus_property_append_ul(DBusMessageIter *i, const char *property, void *data);
+int bus_property_append_long(DBusMessageIter *i, const char *property, void *data);
 
 #define bus_property_append_int bus_property_append_int32
 #define bus_property_append_pid bus_property_append_uint32
index 88727bbbc1770753f0cc27e94d05da9e97131f88..3ec78a0f4c2292759807cc615cc385c3a83cddbe 100644 (file)
@@ -111,6 +111,8 @@ DBusHandlerResult bus_socket_message_handler(Unit *u, DBusConnection *c, DBusMes
                 { "org.freedesktop.systemd1.Socket", "MaxConnections", bus_property_append_unsigned,     "u", &u->socket.max_connections },
                 { "org.freedesktop.systemd1.Socket", "NConnections",   bus_property_append_unsigned,     "u", &u->socket.n_connections   },
                 { "org.freedesktop.systemd1.Socket", "NAccepted",      bus_property_append_unsigned,     "u", &u->socket.n_accepted      },
+                { "org.freedesktop.systemd1.Socket", "MessageQueueMaxMessages", bus_property_append_long,"t", &u->socket.mq_maxmsg       },
+                { "org.freedesktop.systemd1.Socket", "MessageQueueMessageSize", bus_property_append_long,"t", &u->socket.mq_msgsize      },
                 { NULL, NULL, NULL, NULL, NULL }
         };
 
index f8be4dbaa4882ef527b6eeb31cbc0687c9b2598d..7c39d238f63f46ccc30c28a662eef1150cdbd3fb 100644 (file)
@@ -231,6 +231,17 @@ static int config_parse_listen(
 
                 path_kill_slashes(p->path);
 
+        } else if (streq(lvalue, "ListenMessageQueue")) {
+
+                p->type = SOCKET_MQUEUE;
+
+                if (!(p->path = strdup(rvalue))) {
+                        free(p);
+                        return -ENOMEM;
+                }
+
+                path_kill_slashes(p->path);
+
         } else if (streq(lvalue, "ListenNetlink")) {
                 p->type = SOCKET_SOCKET;
 
@@ -1921,6 +1932,7 @@ static int load_from_path(Unit *u, const char *path) {
                 { "ListenFIFO",             config_parse_listen,          0, &u->socket,                                      "Socket"  },
                 { "ListenNetlink",          config_parse_listen,          0, &u->socket,                                      "Socket"  },
                 { "ListenSpecial",          config_parse_listen,          0, &u->socket,                                      "Socket"  },
+                { "ListenMessageQueue",     config_parse_listen,          0, &u->socket,                                      "Socket"  },
                 { "BindIPv6Only",           config_parse_socket_bind,     0, &u->socket,                                      "Socket"  },
                 { "Backlog",                config_parse_unsigned,        0, &u->socket.backlog,                              "Socket"  },
                 { "BindToDevice",           config_parse_bindtodevice,    0, &u->socket,                                      "Socket"  },
@@ -1943,6 +1955,8 @@ static int load_from_path(Unit *u, const char *path) {
                 { "PipeSize",               config_parse_size,            0, &u->socket.pipe_size,                            "Socket"  },
                 { "FreeBind",               config_parse_bool,            0, &u->socket.free_bind,                            "Socket"  },
                 { "TCPCongestion",          config_parse_string,          0, &u->socket.tcp_congestion,                       "Socket"  },
+                { "MessageQueueMaxMessages", config_parse_long,           0, &u->socket.mq_maxmsg,                            "Socket"  },
+                { "MessageQueueMessageSize", config_parse_long,           0, &u->socket.mq_msgsize,                           "Socket"  },
                 { "Service",                config_parse_socket_service,  0, &u->socket,                                      "Socket"  },
                 EXEC_CONTEXT_CONFIG_ITEMS(u->socket.exec_context, "Socket"),
 
index 6d1eebff07c68a977abc325a3c9c3592839a24ee..b30db5d5b33ab259a4649108b049798b78424930 100644 (file)
 #include <stdarg.h>
 #include <stdio.h>
 #include <stddef.h>
+#include <limits.h>
+
+#if defined(__linux__)
+#include <mqueue.h>
+#endif
 
 #include "sd-daemon.h"
 
@@ -325,6 +330,43 @@ int sd_is_socket_unix(int fd, int type, int listening, const char *path, size_t
         return 1;
 }
 
+int sd_is_mq(int fd, const char *path) {
+#if !defined(__linux__)
+        return 0;
+#else
+        struct mq_attr attr;
+
+        if (fd < 0)
+                return -EINVAL;
+
+        if (mq_getattr(fd, &attr) < 0)
+                return -errno;
+
+        if (path) {
+                char fpath[PATH_MAX];
+                struct stat a, b;
+
+                if (path[0] != '/')
+                        return -EINVAL;
+
+                if (fstat(fd, &a) < 0)
+                        return -errno;
+
+                strncpy(stpcpy(fpath, "/dev/mqueue"), path, sizeof(fpath) - 12);
+                fpath[sizeof(fpath)-1] = 0;
+
+                if (stat(fpath, &b) < 0)
+                        return -errno;
+
+                if (a.st_dev != b.st_dev ||
+                    a.st_ino != b.st_ino)
+                        return 0;
+        }
+
+        return 1;
+#endif
+}
+
 int sd_notify(int unset_environment, const char *state) {
 #if defined(DISABLE_SYSTEMD) || !defined(__linux__) || !defined(SOCK_CLOEXEC)
         return 0;
index 4b853a15bec92fa98785fdb6638cfd2d22f67dd9..c3d9b6fb0da266db50db4555b3e47639a66722ac 100644 (file)
@@ -177,6 +177,14 @@ int sd_is_socket_inet(int fd, int family, int type, int listening, uint16_t port
 */
 int sd_is_socket_unix(int fd, int type, int listening, const char *path, size_t length) _sd_hidden_;
 
+/*
+  Helper call for identifying a passed file descriptor. Returns 1 if
+  the file descriptor is a POSIX Message Queue of the specified name,
+  0 otherwise. If path is NULL a message queue name check is not
+  done. Returns a negative errno style error code on failure.
+*/
+int sd_is_mq(int fd, const char *path) _sd_hidden_;
+
 /*
   Informs systemd about changed daemon state. This takes a number of
   newline separated environment-style variable assignments in a
index 0a18716cd752ab145f02832e4de7b17f90bdda25..364d3169217d3b18a59de46c870255ec2372235c 100644 (file)
@@ -27,6 +27,7 @@
 #include <sys/epoll.h>
 #include <signal.h>
 #include <arpa/inet.h>
+#include <mqueue.h>
 
 #include "unit.h"
 #include "socket.h"
@@ -248,8 +249,7 @@ static bool socket_needs_mount(Socket *s, const char *prefix) {
                 if (p->type == SOCKET_SOCKET) {
                         if (socket_address_needs_mount(&p->address, prefix))
                                 return true;
-                } else {
-                        assert(p->type == SOCKET_FIFO || p->type == SOCKET_SPECIAL);
+                } else if (p->type == SOCKET_FIFO || p->type == SOCKET_SPECIAL) {
                         if (path_startswith(p->path, prefix))
                                 return true;
                 }
@@ -468,6 +468,16 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
                         "%sMark: %i\n",
                         prefix, s->mark);
 
+        if (s->mq_maxmsg > 0)
+                fprintf(f,
+                        "%sMessageQueueMaxMessages: %li\n",
+                        prefix, s->mq_maxmsg);
+
+        if (s->mq_msgsize > 0)
+                fprintf(f,
+                        "%sMessageQueueMessageSize: %li\n",
+                        prefix, s->mq_msgsize);
+
         LIST_FOREACH(port, p, s->ports) {
 
                 if (p->type == SOCKET_SOCKET) {
@@ -484,6 +494,8 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
                         free(k);
                 } else if (p->type == SOCKET_SPECIAL)
                         fprintf(f, "%sListenSpecial: %s\n", prefix, p->path);
+                else if (p->type == SOCKET_MQUEUE)
+                        fprintf(f, "%sListenMessageQueue: %s\n", prefix, p->path);
                 else
                         fprintf(f, "%sListenFIFO: %s\n", prefix, p->path);
         }
@@ -790,6 +802,66 @@ fail:
         return r;
 }
 
+static int mq_address_create(
+                const char *path,
+                mode_t mq_mode,
+                long maxmsg,
+                long msgsize,
+                int *_fd) {
+
+        int fd = -1, r = 0;
+        struct stat st;
+        mode_t old_mask;
+        struct mq_attr _attr, *attr = NULL;
+
+        assert(path);
+        assert(_fd);
+
+        if (maxmsg > 0 && msgsize > 0) {
+                zero(_attr);
+                _attr.mq_flags = O_NONBLOCK;
+                _attr.mq_maxmsg = maxmsg;
+                _attr.mq_msgsize = msgsize;
+                attr = &_attr;
+        }
+
+        /* Enforce the right access mode for the mq */
+        old_mask = umask(~ mq_mode);
+
+        /* Include the original umask in our mask */
+        umask(~mq_mode | old_mask);
+
+        fd = mq_open(path, O_RDONLY|O_CLOEXEC|O_NONBLOCK|O_CREAT, mq_mode, attr);
+        umask(old_mask);
+
+        if (fd < 0 && errno != EEXIST) {
+                r = -errno;
+                goto fail;
+        }
+
+        if (fstat(fd, &st) < 0) {
+                r = -errno;
+                goto fail;
+        }
+
+        if ((st.st_mode & 0777) != (mq_mode & ~old_mask) ||
+            st.st_uid != getuid() ||
+            st.st_gid != getgid()) {
+
+                r = -EEXIST;
+                goto fail;
+        }
+
+        *_fd = fd;
+        return 0;
+
+fail:
+        if (fd >= 0)
+                close_nointr_nofail(fd);
+
+        return r;
+}
+
 static int socket_open_fds(Socket *s) {
         SocketPort *p;
         int r;
@@ -850,7 +922,15 @@ static int socket_open_fds(Socket *s) {
                                 goto rollback;
 
                         socket_apply_fifo_options(s, p->fd);
+                } else if (p->type == SOCKET_MQUEUE) {
 
+                        if ((r = mq_address_create(
+                                             p->path,
+                                             s->socket_mode,
+                                             s->mq_maxmsg,
+                                             s->mq_msgsize,
+                                             &p->fd)) < 0)
+                                goto rollback;
                 } else
                         assert_not_reached("Unknown port type");
         }
index b83c34cf6185e9f9fce5fff0bff2518ab063105d..01ea48d62f7b82460f0d944b9fdf735bd1e03c97 100644 (file)
@@ -59,6 +59,7 @@ typedef enum SocketType {
         SOCKET_SOCKET,
         SOCKET_FIFO,
         SOCKET_SPECIAL,
+        SOCKET_MQUEUE,
         _SOCKET_FIFO_MAX,
         _SOCKET_FIFO_INVALID = -1
 } SocketType;
@@ -124,6 +125,8 @@ struct Socket {
         size_t pipe_size;
         char *bind_to_device;
         char *tcp_congestion;
+        long mq_maxmsg;
+        long mq_msgsize;
 };
 
 /* Called from the service code when collecting fds */