首页 > Computer > ZeroMq with libevent
2013
06-04

ZeroMq with libevent

I've been working on integrating zeromq with libevent in order to implement some kind of high performance proxy server. The server is built as an m×n system, where we maintain m streamers that all respond to requests on the same port (in this example, port 12000) and then forward requests to n downloaders that do the real content download. The downloader then responds back to the streamer that the download has ended and that the content is ready.

The main architecture can be seen in the following diagram:

The core features of this example are:

  • First, all the streamers and downloaders are different processes. This provides some interesting advantages, like respawning processes when they die (even when a hard crash occurs), as well as improved response times thanks to all streamers using the same port. All the processes are started from a fork() of the main program, and then we start independent libevent dispatchers on each one. We could also use regular threads, but then we would lose this crash resilience…
  • Second, the message queue for download requests is a zeromq queue device (see this). This device provides a fair-queueing mechanism on the frontend, and passes the requests to the backend in a load-balancing mode that distributes requests to the downloaders. The downloaders then copy the original request identity parts in the response in order to build the return path. This gives the queue device a hint on how to route the response back to the right streamer that started the request.

The following code implements this model:

#include <assert.h>

#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <sys/time.h>

#include <event.h>

#include <errno.h>
#include <err.h>

#include <signal.h>

#include <map>
#include <list>
#include <vector>
#include <string>

#include <zmq.hpp>

////////////////////////////////////////////////////////////////


// Alternative TCP endpoints for the queue device (disabled).
//#define STREAMERS_EP        "tcp://127.0.0.1:54325"
//#define DOWNLOADERS_EP      "tcp://127.0.0.1:54326"

// Endpoints of the queue device: the forwarder process binds its frontend to
// STREAMERS_EP and its backend to DOWNLOADERS_EP; streamers and downloaders
// connect to them.
#define STREAMERS_EP        "ipc://streamers"
#define DOWNLOADERS_EP      "ipc://downloaders"


// TCP port of the shared listening socket, and the backlog passed to listen().
static const unsigned int kStreamerPort     = 12000;
static const unsigned int kMaxStreamerConns = 1024;

// The master forks a new streamer / downloader whenever its list holds fewer
// than these counts; kTimeout is the number of seconds it sleeps between checks.
static const unsigned int kMaxStreamers     = 2;
static const unsigned int kMaxDownloaders   = 2;
static const unsigned int kTimeout          = 1;

// Process entry points: each one sets up its own libevent loop and dispatches.
void create_streamer (const int lsd);
void create_downloader (const int lsd);
// libevent callbacks (signature: fd, event mask, user pointer).
// The first parameter is named `lsd` in every prototype, but only
// accept_client receives the listening socket; the definitions of the other
// three name it `fd` (a client socket or the zeromq notification fd).
void accept_client (const int lsd, short event, void *ev);
void handle_client (const int lsd, short event, void *ev);
void handle_responses (const int lsd, short event, void *ev);
void accept_job (const int lsd, short event, void *ev_v);


/**
 * One connected client of a streamer.
 *
 * Instances are allocated in accept_client() and stored in the owning
 * streamer's `clients` map, keyed by `fd`.
 */
struct client_t
{
    unsigned int   fd;   // socket returned by accept()
    struct event   ev;   // libevent read event registered on `fd`
};

/**
 * State of one streamer process.
 *
 * The master keeps one entry per forked streamer (only `pid` and `listen_fd`
 * are filled in there); the streamer process itself fills in every field in
 * create_streamer().
 */
struct streamer
{
    pid_t               pid;          // process id of the streamer

    int                 listen_fd;    // shared listening socket
    struct event        listen_ev;    // read event on listen_fd (accept_client)

    // zeromq
    zmq::context_t   *  context;
    zmq::socket_t    *  sock;         // socket connected to STREAMERS_EP
    struct event        sock_ev;      // read event on the zeromq fd (handle_responses)
    unsigned int        msg_count;    // bumped in handle_client per forwarded request

    // the clients, keyed by their socket fd
    std::map< unsigned int, client_t * >   clients;
};
typedef struct streamer streamer_t;

// Master-side list of the streamers that are currently alive.
typedef std::list< streamer_t * >   streamers_lst_t;
static streamers_lst_t              streamers;

/**
 * State of one downloader process.
 *
 * The master keeps one entry per forked downloader (only `pid` is filled in
 * there); the downloader process fills in the rest in create_downloader().
 */
struct downloader
{
    pid_t                pid;       // process id of the downloader

    // zeromq
    zmq::context_t     * context;
    zmq::socket_t      * sock;      // socket connected to DOWNLOADERS_EP
    struct event         sock_ev;   // read event on the zeromq fd (accept_job)
};
typedef struct downloader downloader_t;

// Master-side list of the downloaders that are currently alive.
typedef std::list< downloader_t * >   downloaders_lst_t;
static            downloaders_lst_t   downloaders;




// children dead handler: it will remove the children from the lists and remove all the events...
static void child_exit(int sig, siginfo_t * info, void * context) {
    (void) fprintf(stderr, "Child %d exited due to signal %d\n", info->si_pid, info->si_signo);

     for (streamers_lst_t::iterator it = streamers.begin(); it != streamers.end(); it++)
     {
         streamer_t * str = *it;
         if ((*it)->pid == info->si_pid)
         {
             fprintf(stderr, "... it was a streamer\n");

             event_del(&str->sock_ev);
             event_del(&str->listen_ev);

             delete (*it);
             streamers.erase(it);
             break;
         }
     }

     for (downloaders_lst_t::iterator it = downloaders.begin(); it != downloaders.end(); it++)
     {
         downloader_t * down = *it;
         if (down->pid == info->si_pid)
         {
             fprintf(stderr, "... it was a downloader\n");

             event_del(&down->sock_ev);

             delete (*it);
             downloaders.erase(it);
             break;
         }
     }
}



/**
 * Master process.
 *
 * 1. Creates, binds and listens on the TCP socket shared by all streamers.
 * 2. Installs the SIGCHLD handler (child_exit) that removes dead workers from
 *    the `streamers` / `downloaders` lists.
 * 3. Forks the forwarder, which runs the blocking zeromq queue device between
 *    STREAMERS_EP and DOWNLOADERS_EP.
 * 4. Loops forever, forking streamers and downloaders whenever a list holds
 *    fewer than kMaxStreamers / kMaxDownloaders entries.
 *
 * A worker child never re-enters the spawn loop: if its create_*() function
 * returns (its event loop ended), the child terminates with exit status 1.
 */
int main(void) {
    int                   lsd, reuse = 1;
    struct sockaddr_in    sa;
    pid_t                 pid;

    bzero(&sa, sizeof(sa));

    if ((lsd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
        errx(1, "socket: %s (%d)\n", strerror(errno), errno);

    if (setsockopt(lsd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
        errx(2, "setsockopt: %s (%d)\n", strerror(errno), errno);

    sa.sin_family = AF_INET;
    sa.sin_port = htons(kStreamerPort);
    sa.sin_addr.s_addr = htonl(INADDR_ANY);

    // create the signal handlers, for children deads...
    struct sigaction saction;
    memset(&saction, 0, sizeof(struct sigaction));
    saction.sa_sigaction = child_exit;
    saction.sa_flags = SA_SIGINFO | SA_NOCLDSTOP | SA_NOCLDWAIT;
    sigaction(SIGCHLD, &saction, NULL);

    if (bind(lsd, (struct sockaddr *) &sa, sizeof(sa)) < 0)
        errx(3, "bind: %s (%d)\n", strerror(errno), errno);

    if (listen(lsd, kMaxStreamerConns) < 0)
        errx(4, "listen: %s (%d)\n", strerror(errno), errno);

    fprintf(stderr, "[0] Master: listening on port %d\n",
            (int) kStreamerPort);

    fprintf(stderr, "[0] Master: creating zeromq environment\n");

    if ((pid = fork()) == 0)
    {
        uint64_t     s;
        size_t           slen = sizeof(s);

        fprintf(stderr, "[%d] Forwarder created\n", getpid());
        zmq::context_t  context(3);

        zmq::socket_t frontend(context, ZMQ_XREP);
        zmq::socket_t backend(context,  ZMQ_XREQ);

        s = 10000;
        frontend.setsockopt(ZMQ_HWM, &s, slen);
        backend.setsockopt(ZMQ_HWM, &s, slen);

        frontend.bind(STREAMERS_EP);
        backend.bind(DOWNLOADERS_EP);

        zmq::device (ZMQ_QUEUE, frontend, backend);

        //  We never get here...
        return 0;
    }
    else if (pid < 0)
    {
        // without the forwarder nothing can be routed: give up right away
        errx(6, "fork: %s(%d)\n", strerror(errno), errno);
    }
    else
    {
        // give the forwarder a moment to bind its endpoints
        sleep(1);
    }

    fprintf(stderr, "[0] Master: launching processes...\n");

    for (;;) {
        if (streamers.size() < kMaxStreamers) {
            if ((pid = fork()) == 0) {
                fprintf(stderr, "[%d] Streamer created\n", getpid());
                create_streamer(lsd);
                // the event loop ended: do not fall back into the spawn loop
                return 1;
            } else if (pid > 0) {
                struct streamer *str = new struct streamer;
                str->pid       = pid;
                str->listen_fd = lsd;
                streamers.push_back(str);
                fprintf(stderr, "[0] Master: %d streamers\n",
                        (int) streamers.size());
            } else
                errx(6, "fork: %s(%d)\n", strerror(errno), errno);
        }

        if (downloaders.size() < kMaxDownloaders) {
            if ((pid = fork()) == 0) {
                fprintf(stderr, "[%d] Downloader created\n", getpid());
                create_downloader(lsd);
                // the event loop ended: do not fall back into the spawn loop
                return 1;
            } else if (pid > 0) {
                struct downloader *down = new struct downloader;
                down->pid = pid;
                downloaders.push_back(down);
                fprintf(stderr, "[0] Master: %d downloaders\n",
                        (int) downloaders.size());
            } else
                errx(6, "fork: %s(%d)\n", strerror(errno), errno);
        }

        sleep(kTimeout);
    }

    return 0;
}


/********************************************************
 * Streamers
 *******************************************************/

void create_streamer (const int lsd)
{
    streamer_t     streamer;
    int            rc;

    streamer.listen_fd = lsd;
    streamer.pid       = getpid();
    streamer.msg_count = 0;

    assert(streamer.listen_fd > 0);

    event_init();

    // create the listener for listening for client's connections
    bzero(&streamer.listen_ev, sizeof(streamer.listen_ev));
    event_set(&streamer.listen_ev, streamer.listen_fd, EV_READ | EV_PERSIST, accept_client, &streamer);
    if (event_add(&streamer.listen_ev, NULL) < 0)
        errx(5, "event_add: %s (%d)\n", strerror(errno), errno);

    // create the zeromq stuff
    streamer.context = new zmq::context_t(1);
    streamer.sock    = new zmq::socket_t(*streamer.context, ZMQ_XREQ);
    assert(streamer.sock != NULL);

//    rc = zmq_setsockopt (streamer.sock, ZMQ_IDENTITY, "STREAMER", sizeof("STREAMER") - 1);
//    assert(rc == 0);

    // set some socket parameters
    uint64_t     s;
    size_t       slen = sizeof(s);

    s = 10000;
    streamer.sock->setsockopt(ZMQ_HWM, &s, slen);

    fprintf(stderr, "[%d] Streamer: connecting to %s\n", streamer.pid, STREAMERS_EP);
    streamer.sock->connect(STREAMERS_EP);

    // get the fd used by the zsocket
    int       fd        = (-1);
    size_t    fd_size   = sizeof(fd);
    streamer.sock->getsockopt(ZMQ_FD, &fd, &fd_size);
    assert(fd > 0);

    // ... and register the fd with libevent
    bzero(&streamer.sock_ev, sizeof(streamer.sock_ev));
    event_set(&streamer.sock_ev, fd, EV_READ | EV_PERSIST, handle_responses, &streamer);
    if (event_add(&streamer.sock_ev, NULL) < 0)
        errx(5, "event_add: %s (%d)\n", strerror(errno), errno);

    fprintf(stderr, "[%d] Streamer: registering zsocket 0x%p (fd:%d) on context 0x%p\n",
                    streamer.pid, streamer.sock, fd, streamer.context);

    event_dispatch();

    (void) fprintf(stderr, "aborted\n");
}


/**
 * libevent callback: the shared listening socket is readable.
 *
 * Accepts one connection, wraps it in a client_t, registers a persistent read
 * event for it (handle_client) and stores it in streamer->clients.
 *
 *   lsd   - listening socket (must equal streamer->listen_fd)
 *   event - libevent event mask (unused)
 *   data  - the owning streamer_t
 *
 * Transient accept() failures (another process took the connection first, the
 * call was interrupted, or the peer aborted the connection) are ignored; any
 * other failure terminates the streamer via errx().
 */
void accept_client (const int lsd, short event, void *data) {
    streamer_t         * streamer = (streamer_t*) data;
    const pid_t          pid      = getpid();

    (void) event;

    assert(streamer->listen_fd == lsd);

    struct sockaddr_in   remote;
    socklen_t            socklen  = sizeof(remote);
    bzero(&remote, sizeof(remote));

    const int asd = accept(streamer->listen_fd, (struct sockaddr *) &remote, &socklen);
    if (asd < 0)
    {
        // not fatal: simply wait for the next readiness notification
        if (errno == EAGAIN || errno == EWOULDBLOCK ||
            errno == EINTR  || errno == ECONNABORTED)
            return;

        errx(6, "accept: %s (%d)\n", strerror(errno), errno);
    }

    fprintf(stderr, "[%d] Streamer: received connection from: %s:%d, socket: %d\n",
                    pid, inet_ntoa(remote.sin_addr), ntohs(remote.sin_port), asd);

    // we have a new client: create a client struct and register the fd handlers
    fprintf(stderr, "[%d] Streamer: ... creating new client.\n", pid);

    client_t * client = new client_t;
    client->fd = asd;

    bzero(&client->ev, sizeof(client->ev));
    event_set(&client->ev, client->fd, EV_READ | EV_PERSIST, handle_client, streamer);
    if (event_add(&client->ev, NULL) < 0)
        errx(5, "event_add: %s (%d)\n", strerror(errno), errno);

    fprintf(stderr, "[%d] Streamer: ... adding client to the list.\n", pid);
    streamer->clients[asd] = client;
}


void handle_client (const int fd, short event, void *data)
{
    streamer_t * streamer = (streamer_t*) data;

    if (streamer->clients.find(fd) != streamer->clients.end())
    {
        client_t * client  = streamer->clients[fd];
        assert((int) client->fd == fd);

        fprintf(stderr, "[%d] Streamer: activity on regular socket fd:%d, events:%d\n", streamer->pid, fd, event);

        if (event & EV_READ)
        {
            const unsigned int   buf_size              = 1024;
            unsigned char        buf[buf_size];
            int                  read_len;

            read_len = recv(fd, buf, buf_size, 0);
            if (read_len == 0)
            {
                fprintf(stderr, "[%d] Streamer: ... the client has disconnected\n", streamer->pid);

                // the client has disconnected
                streamer->clients.erase((unsigned int)fd);
                close(fd);
            }
            else
            {
                fprintf(stderr, "[%d] Streamer: ... sending zeromq message to %p\n", streamer->pid, streamer->sock);

                {
                    // the client is asking for something: send a zeromq message
                    char  msg_str[1024];

                    snprintf(msg_str, 1024, "%d %d", streamer->pid, fd);

                    zmq::message_t reply(strlen(msg_str));
                    memcpy ((void *) reply.data(), msg_str, reply.size());
                    bool send_ok = streamer->sock->send(reply, ZMQ_NOBLOCK);
                    if (!send_ok)
                    {
                        fprintf(stderr, "[%d] Streamer: ERROR: when sending message.\n", streamer->pid);
                    }
                }

                streamer->msg_count++;

                fprintf(stderr, "[%d] Streamer: ...... sent!\n", streamer->pid);
            }
        }
    }
    else
    {
        fprintf(stderr, "[%d] Streamer: WARNING: fd:%d not found on clients list (size:%d)\n",
                        streamer->pid, fd, (int)streamer->clients.size());
    }
}

/**
 * libevent callback: the notification fd of the streamer's zeromq socket is
 * readable.
 *
 *   fd    - the zeromq notification fd (only logged)
 *   event - libevent event mask (only logged)
 *   data  - the owning streamer_t
 *
 * The notification fd is edge-triggered, so every message that is currently
 * pending is drained: ZMQ_EVENTS is re-queried after each complete message
 * until ZMQ_POLLIN is no longer set. For each message all parts are collected,
 * and the first part (expected to be the "<pid> <fd>" text produced by
 * handle_client) is checked against this streamer's pid.
 */
void handle_responses (const int fd, short event, void *data)
{
    streamer_t         * streamer = (streamer_t*) data;

    fprintf(stderr, "[%d] Streamer: zsocket activity on fd:%d, events:%d\n", streamer->pid, fd, event);

    for (;;)
    {
        // detect the pending events on the zeromq world
        unsigned int     zmq_events       = 0;
        size_t           zmq_events_size  = sizeof(zmq_events);

        streamer->sock->getsockopt(ZMQ_EVENTS, &zmq_events, &zmq_events_size);
        if (!(zmq_events & ZMQ_POLLIN))
            break;

        fprintf(stderr, "[%d] Streamer: ... uhu! we were waiting for a message and there seems we have some mail!\n", streamer->pid);

        std::vector< std::string >    responses;
        bool                          complete = false;

        for (;;)
        {
            zmq::message_t reply;

            // recv() returns false when nothing could be read without blocking
            if (!streamer->sock->recv(&reply, ZMQ_NOBLOCK))
                break;

            int64_t more      = 0;
            size_t  more_size = sizeof (more);
            streamer->sock->getsockopt (ZMQ_RCVMORE, &more, &more_size);

            // Frames are not NUL-terminated and may be of any size: copy
            // exactly reply.size() bytes instead of formatting with "%s".
            const std::string part(static_cast< const char * >(reply.data()), reply.size());
            responses.push_back(part);

            fprintf(stderr, "[%d] Streamer: ... received message: '%s' (%d bytes)\n", streamer->pid, part.c_str(), (int) part.size());

            if (!more)
            {
                complete = true;    //  Last message part
                break;
            }
        }

        if (!complete)
            break;

        // verify that we are receiving the response for the message that we sent
        int resp_pid = 0, resp_fd = 0;
        if (sscanf(responses[0].c_str(), "%d %d", &resp_pid, &resp_fd) != 2)
        {
            fprintf(stderr, "[%d] Streamer: ...... WARNING: malformed response '%s'\n", streamer->pid, responses[0].c_str());
        }
        else if (resp_pid != (int)streamer->pid)
        {
            fprintf(stderr, "[%d] Streamer: ...... WARNING: resp_pid (%d) != streamer->pid (%d)\n", streamer->pid, resp_pid, streamer->pid);
        }
        else
        {
            fprintf(stderr, "[%d] Streamer: ...... response looks GOOD!\n", streamer->pid);
        }
    }
}

/************************************************
 * Downloaders
 ***********************************************/

/**
 * Body of a downloader process.
 *
 * Initialises libevent, creates a ZMQ_XREP socket connected to
 * DOWNLOADERS_EP, registers the socket's notification fd (accept_job) and
 * runs the dispatch loop. Only returns when event_dispatch() returns.
 *
 *   lsd - the master's listening socket; not used by downloaders.
 *
 * The high-water mark is set before connect(), as create_streamer does,
 * because zeromq socket options only apply to connections made afterwards.
 */
void create_downloader (const int lsd)
{
    downloader_t     downloader;

    (void) lsd;

    downloader.pid = getpid();

    event_init();

    // create the zeromq stuff
    downloader.context = new zmq::context_t (1);
    downloader.sock    = new zmq::socket_t (*downloader.context, ZMQ_XREP);
    assert(downloader.sock != NULL);

//    rc = zmq_setsockopt (downloader.sock, ZMQ_IDENTITY, "DOWNLOADER", sizeof("DOWNLOADER") - 1);
//    assert(rc == 0);

    // set some socket parameters (must happen before connecting)
    uint64_t     s    = 10000;
    size_t       slen = sizeof(s);
    downloader.sock->setsockopt(ZMQ_HWM, &s, slen);

    fprintf(stderr, "[%d] Downloader: connecting to %s\n", downloader.pid, DOWNLOADERS_EP);
    downloader.sock->connect(DOWNLOADERS_EP);

    // get the fd used by the zsocket
    int       fd        = (-1);
    size_t    fd_size   = sizeof(fd);
    downloader.sock->getsockopt(ZMQ_FD, &fd, &fd_size);
    assert(fd > 0);

    // register the zeromq socket
    bzero(&downloader.sock_ev, sizeof(downloader.sock_ev));

    event_set(&downloader.sock_ev, fd, EV_READ | EV_PERSIST,
                           accept_job, &downloader);
    if(event_add(&downloader.sock_ev, NULL) < 0)
        errx(5, "event_add: %s (%d)\n", strerror(errno), errno);

    fprintf(stderr, "[%d] Downloader: registering zsocket 0x%p (fd:%d) on context 0x%p\n",
                    downloader.pid, downloader.sock, fd, downloader.context);

    event_dispatch();

    (void) fprintf(stderr, "aborted\n");
}



void accept_job (const int fd, short event, void *data)
{
    downloader_t       * downloader =  (downloader_t*) data;

    // detect the pending events on the zeromq world
    unsigned int     zmq_events;
    size_t           zmq_events_size  = sizeof(zmq_events);

    fprintf(stderr, "[%d] Downloader: zsocket activity on fd:%d, events:%d\n", downloader->pid, fd, event);

    assert(downloader->sock != NULL);

    downloader->sock->getsockopt(ZMQ_EVENTS, &zmq_events, &zmq_events_size);
    if (zmq_events & ZMQ_POLLIN)
    {
        fprintf(stderr, "[%d] Downloader: ... uhu! we were waiting for a message and there seems we have some mail!\n", downloader->pid);

        while(1)
        {
            int64_t more;
            size_t more_size = sizeof (more);

            {
                zmq::message_t request;

                downloader->sock->recv (&request, ZMQ_NOBLOCK);
                downloader->sock->getsockopt (ZMQ_RCVMORE, &more, &more_size);

                unsigned char * msg_data   = (unsigned char *) request.data();
                unsigned int    msg_len    = (unsigned int) request.size();

                char msg_printf[1024];
                snprintf (msg_printf, msg_len + 1, "%s", msg_data);

                fprintf(stderr, "[%d] Downloader: ... received message: '%s' (%d bytes)\n", downloader->pid,
                                 msg_printf, msg_len);

                {
                    // we send back this message part

                    zmq::message_t  reply;
                    reply.copy(&request);

                    fprintf(stderr, "[%d] Downloader: ...... sending back response (%d bytes)\n", downloader->pid, (int) reply.size());

                    bool send_ok = downloader->sock->send(reply, 
                              ZMQ_SNDMORE | ZMQ_NOBLOCK);
                    if (!send_ok)
                    {
                        fprintf(stderr, "[%d] Downloader: ERROR: when sending message.\n",
                                                          downloader->pid);
                    }
                }
            }

            if (!more)
                break;      //  Last message part
        }

        //sleep(10);   // we have been doing some hard work!

        {
            char            reply_str[1024];

            // send the reponse!
            snprintf(reply_str, 1024, "%d %d", downloader->pid, fd);

            zmq::message_t reply(strlen(reply_str));
            memcpy ((void *) reply.data(), reply_str, reply.size());
            bool send_ok = downloader->sock->send(reply, ZMQ_NOBLOCK);
            if (!send_ok)
            {
                fprintf(stderr, "[%d] Downloader: ERROR: when sending message.\n", downloader->pid);
            }
        }

        fprintf(stderr, "[%d] Downloader: ...... response sent!\n", 
                              downloader->pid);
    }

    fprintf(stderr, "[%d] Downloader: ... done!\n",
                           downloader->pid);
}

Compile this code with

g++ workers.cpp -o workers -levent -lzmq -luuid

There are some things we must take into account in this example code:

  • First, the main point in this test was to see how it scales and if messages route back and forth the right way, so the downloaders do not really download anything. I have skipped this part of the process and our downloaders just return a proper response to the streamers.
  • Second, the zeromq queue device is not integrated in the libevent dispatch loop, so I had to do an initial fork for launching the queue and let it run its own dispatcher. A production version of this code should build a queue device from scratch and integrate it in the main loop.
最后编辑:
作者:wy182000
这个作者貌似有点懒,什么都没有留下。

留下一个回复