阅读anjuke/zguide-cn后作的一些简单整理。

ZMQ连接协议
ZMQ提供多种传输协议用于不同场景下的通信,主要有三个单播传输协议和两个广播协议。

  • 单播传输
    • inproc,用于进程内不同线程之间的消息传输
    • ipc,适用于进程间的通信
    • tcp
  • 广播协议
    • pgm
    • epgm

同时服务端也可以使用一个套接字,就可以绑定多个协议,也就是说可以使用不同协议建立连接:

zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "ipc://myserver.ipc");

套接字操作

  • 创建销毁套接字
zmq_socket();
zmq_close();
  • 配置套接字
zmq_setsockopt();
zmq_getsockopt();
  • 建立连接
# 该方式创建的套接字,对应着服务端
zmq_bind(); 
# 该方式创建的套接字,对应着客户端
zmq_connect(); 

当客户端使用zmq_connect()进行连接时,并不要求服务端已经进行了端口绑定.

  • 发送和接受消息
	zmq_send();
	zmq_recv();

TCP套接字和ZMQ套接字的区别

  • ZMQ套接字传输的是指定长度的二进制数据块,而TCP是字节。
  • ZMQ发送消息,先传输到本地的缓冲队列
  • ZMQ套接字支持和多个套接字连接,而TCP只能点对点

套接字类型
套接字类型决定了套接字的行为,即在发送或接受消息时的规则。

  • ZMQ_REP
  • ZMQ_PUB
  • ZMQ_SUB
  • ZMQ_PUSH
  • ZMQ_PULL

消息模式

  • 请求应答模式,将一组服务端和一组客户端连接,
  • 发布订阅,将一组发布者和一组订阅者相连接
  • 管道模式,使用扇入或者扇出组装多个节点
  • 排他对接模式,将两个套接字一对一连接起来

常用的套接字连接对:

  • PUB - SUB
  • REQ - REP
  • REQ - ROUTER
  • DEALER - REP
  • DEALER - ROUTER
  • DEALER - DEALER
  • ROUTER - ROUTER
  • PUSH - PULL
  • PAIR - PAIR

Demo

PUB - SUB(发布订阅)

下面的天气信息发布订阅服务,Server绑定PUB套接字在5556端口,Client端创建SUB套接字并连接至服务端的5556端口。
PUB-SUB
服务端代码:

#include "zhelpers.h"
 
int main (void)
{
    //  准备上下文和PUB套接字
    void *context = zmq_init (1);
    void *publisher = zmq_socket (context, ZMQ_PUB);
    zmq_bind (publisher, "tcp://*:5556");
    zmq_bind (publisher, "ipc://weather.ipc");
 
    //  初始化随机数生成器
    srandom ((unsigned) time (NULL));
    while (1) {
        //  生成数据
        int zipcode, temperature, relhumidity;
        zipcode     = randof (100000);
        temperature = randof (215) - 80;
        relhumidity = randof (50) + 10;
 
        //  向所有订阅者发送消息
        char update [20];
        sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
        s_send (publisher, update);
    }
    zmq_close (publisher);
    zmq_term (context);
    return 0;
}

客户端代码:

#include "zhelpers.h"
 
int main (int argc, char *argv [])
{
    void *context = zmq_init (1);
 
    //  创建连接至服务端的套接字
    printf ("正在收集气象信息...\n");
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5556");
 
    //  设置订阅信息,默认为纽约,邮编10001
    char *filter = (argc > 1)? argv [1]: "10001 ";
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
 
    //  处理100条更新信息
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {
        char *string = s_recv (subscriber);
        int zipcode, temperature, relhumidity;
        sscanf (string, "%d %d %d",
            &zipcode, &temperature, &relhumidity);
        total_temp += temperature;
        free (string);
    }
    printf ("地区邮编 '%s' 的平均温度为 %dF\n",
        filter, (int) (total_temp / update_nbr));
 
    zmq_close (subscriber);
    zmq_term (context);
    return 0;
}

PUSH - PULL

下面的服务架构中,任务分发器(ventilator)绑定PUSH套接字在5557端口,在通知任务收集器(sink)后,产生100个任务分发给Worker;Worker从任务分发器PULL任务,任务结束后将结果PUSH给
任务分发器PULL任务,任务结束后将结果PUSH给任务收集器。
PUSH - PULL

任务分发器代码:

#include "zhelpers.h"
 
int main (void) 
{
    void *context = zmq_init (1);
 
    //  用于发送消息的套接字
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_bind (sender, "tcp://*:5557");
 
    //  用于发送开始信号的套接字
    void *sink = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sink, "tcp://localhost:5558");
 
    printf ("准备好worker后按任意键开始: ");
    getchar ();
    printf ("正在向worker分配任务...\n");
 
    //  发送开始信号
    s_send (sink, "0");
 
    //  初始化随机数生成器
    srandom ((unsigned) time (NULL));
 
    //  发送100个任务
    int task_nbr;
    int total_msec = 0;     //  预计执行时间(毫秒)
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        int workload;
        //  随机产生1-100毫秒的工作量
        workload = randof (100) + 1;
        total_msec += workload;
        char string [10];
        sprintf (string, "%d", workload);
        s_send (sender, string);
    }
    printf ("预计执行时间: %d 毫秒\n", total_msec);
    sleep (1);              //  延迟一段时间,让任务分发完成
 
    zmq_close (sink);
    zmq_close (sender);
    zmq_term (context);
    return 0;
}

Worker代码:

#include "zhelpers.h"
 
int main (void) 
{
    void *context = zmq_init (1);
 
    //  获取任务的套接字
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");
 
    //  发送结果的套接字
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sender, "tcp://localhost:5558");
 
    //  循环处理任务
    while (1) {
        char *string = s_recv (receiver);
        //  输出处理进度
        fflush (stdout);
        printf ("%s.", string);
 
        //  开始处理
        s_sleep (atoi (string));
        free (string);
 
        //  发送结果
        s_send (sender, "");
    }
    zmq_close (receiver);
    zmq_close (sender);
    zmq_term (context);
    return 0;
}

收集器代码:

#include "zhelpers.h"
 
int main (void) 
{
    //  准备上下文和套接字
    void *context = zmq_init (1);
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_bind (receiver, "tcp://*:5558");
 
    //  等待开始信号
    char *string = s_recv (receiver);
    free (string);
 
    //  开始计时
    int64_t start_time = s_clock ();

    //  确定100个任务均已处理
    int task_nbr;
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        char *string = s_recv (receiver);
        free (string);
        if ((task_nbr / 10) * 10 == task_nbr)
            printf (":");
        else
            printf (".");
        fflush (stdout);
    }
    //  计算并输出总执行时间
    printf ("执行时间: %d 毫秒\n", 
        (int) (s_clock () - start_time));
 
    zmq_close (receiver);
    zmq_term (context);
    return 0;
}

REQ-ROUTER、DEALER-REP

简答的请求应答模式中(如下图),REQ直接与REP绑定,即客户端直接连接服务端,但是如果客户端数量变化了,对客户端是不可知的。
rep-req

所以,为了为了能够支持服务端的动态扩展,可以引入一个代理(Broker)装置,Broker通过ROUTER套接字将客户端的请求进行路由,通过Dealer套接字分发给服务端,那么我们的服务架构可以调整为:
Broker

客户端代码:

#include "zhelpers.h"
 
int main (void) 
{
    void *context = zmq_init (1);
 
    //  用于和服务端通信的套接字
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5559");
 
    int request_nbr;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {
        s_send (requester, "Hello");
        char *string = s_recv (requester);
        printf ("收到应答 %d [%s]\n", request_nbr, string);
        free (string);
    }
    zmq_close (requester);
    zmq_term (context);
    return 0;
}

代理代码:

#include "zhelpers.h"
 
int main (void) 
{
    //  准备上下文和套接字
    void *context = zmq_init (1);
    void *frontend = zmq_socket (context, ZMQ_ROUTER);
    void *backend  = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (frontend, "tcp://*:5559");
    zmq_bind (backend,  "tcp://*:5560");
 
    //  初始化轮询集合
    zmq_pollitem_t items [] = {
        { frontend, 0, ZMQ_POLLIN, 0 },
        { backend,  0, ZMQ_POLLIN, 0 }
    };
    //  在套接字间转发消息
    while (1) {
        zmq_msg_t message;
        int64_t more;           //  检测多帧消息
 
        zmq_poll (items, 2, -1);
        if (items [0].revents & ZMQ_POLLIN) {
            while (1) {
                //  处理所有消息帧
                zmq_msg_init (&message);
                zmq_recv (frontend, &message, 0);
                size_t more_size = sizeof (more);
                zmq_getsockopt (frontend, ZMQ_RCVMORE, &more, &more_size);
                zmq_send (backend, &message, more? ZMQ_SNDMORE: 0);
                zmq_msg_close (&message);
                if (!more)
                    break;      //  最后一帧
            }
        }
        if (items [1].revents & ZMQ_POLLIN) {
            while (1) {
                //  处理所有消息帧
                zmq_msg_init (&message);
                zmq_recv (backend, &message, 0);
                size_t more_size = sizeof (more);
                zmq_getsockopt (backend, ZMQ_RCVMORE, &more, &more_size);
                zmq_send (frontend, &message, more? ZMQ_SNDMORE: 0);
                zmq_msg_close (&message);
                if (!more)
                    break;      //  最后一帧
            }
        }
    }
    //  程序不会运行到这里,不过还是做好清理工作
    zmq_close (frontend);
    zmq_close (backend);
    zmq_term (context);
    return 0;
}

服务端代码:

#include "zhelpers.h"
 
int main (void) 
{
    void *context = zmq_init (1);
 
    //  用于何客户端通信的套接字
    void *responder = zmq_socket (context, ZMQ_REP);
    zmq_connect (responder, "tcp://localhost:5560");
 
    while (1) {
        //  等待下一个请求
        char *string = s_recv (responder);
        printf ("Received request: [%s]\n", string);
        free (string);
 
        //  做一些“工作”
        sleep (1);
 
        //  返回应答信息
        s_send (responder, "World");
    }
    //  程序不会运行到这里,不过还是做好清理工作
    zmq_close (responder);
    zmq_term (context);
    return 0;
}

代理组件

ZMQ中为Broker提供了一些内置装置:

  • QUEUE,可用作请求-应答代理;
  • FORWARDER,可用作发布-订阅代理服务;
  • STREAMER,可用作管道模式代理。

在之前的例子中,Broker的实现较为复杂,而使用QUEUE之后,代码会变得非常简洁:

#include "zhelpers.h"
 
int main (void) 
{
    void *context = zmq_init (1);
 
    //  客户端套接字
    void *frontend = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (frontend, "tcp://*:5559");
 
    //  服务端套接字
    void *backend = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (backend, "tcp://*:5560");
 
    //  启动内置装置
    zmq_device (ZMQ_QUEUE, frontend, backend);
 
    //  程序不会运行到这里
    zmq_close (frontend);
    zmq_close (backend);
    zmq_term (context);
    return 0;
}

ZMQ多线程编程

上图的请求应答服务中,都是通过启动多个服务来达到并行处理,也就是利用了TCP通信协议进行实现,接下来主要借助ZMQ提供的进程内通信协议实现服务内的多线程编程,下图虚线内是服务内结构:
multithread-server

Server端代码如下,需要注意的是,进程间通信需要共享同一个ZMQ上下文:

#include "zhelpers.h"
#include <pthread.h>
 
static void *
worker_routine (void *context) {
    //  连接至代理的套接字
    void *receiver = zmq_socket (context, ZMQ_REP);
    zmq_connect (receiver, "inproc://workers");
 
    while (1) {
        char *string = s_recv (receiver);
        printf ("Received request: [%s]\n", string);
        free (string);
        //  工作
        sleep (1);
        //  返回应答
        s_send (receiver, "World");
    }
    zmq_close (receiver);
    return NULL;
}
 
int main (void)
{
    void *context = zmq_init (1);
 
    //  用于和client进行通信的套接字
    void *clients = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (clients, "tcp://*:5555");
 
    //  用于和worker进行通信的套接字
    void *workers = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (workers, "inproc://workers");
 
    //  启动一个worker池
    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, context);
    }
    //  启动队列装置
    zmq_device (ZMQ_QUEUE, clients, workers);
 
    //  程序不会运行到这里,但仍进行清理工作
    zmq_close (clients);
    zmq_close (workers);
    zmq_term (context);
    return 0;
}