阅读anjuke/zguide-cn后作的一些简单整理。
ZMQ连接协议
ZMQ提供多种传输协议用于不同场景下的通信,主要有三个单播传输协议和两个广播协议。
- 单播传输
- inproc,用于进程内不同线程之间的消息传输
- ipc,适用于进程间的通信
- tcp
- 广播协议
- pgm
- epgm
同时服务端也可以使用一个套接字,就可以绑定多个协议,也就是说可以使用不同协议建立连接:
zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "ipc://myserver.ipc");
套接字操作
- 创建销毁套接字
zmq_socket();
zmq_close();
- 配置套接字
zmq_setsockopt();
zmq_getsockopt();
- 建立连接
# 该方式创建的套接字,对应着服务端
zmq_bind();
# 该方式创建的套接字,对应着客户端
zmq_connect();
当客户端使用zmq_connect()进行连接时,并不要求服务端已经进行了端口绑定.
- 发送和接受消息
zmq_send();
zmq_recv();
TCP套接字和ZMQ套接字的区别
- ZMQ套接字传输的是指定长度的二进制数据块,而TCP是字节。
- ZMQ发送消息,先传输到本地的缓冲队列
- ZMQ套接字支持和多个套接字连接,而TCP只能点对点
套接字类型
套接字类型决定了套接字的行为,即在发送或接受消息时的规则。
- ZMQ_REP
- ZMQ_PUB
- ZMQ_SUB
- ZMQ_PUSH
- ZMQ_PULL
消息模式
- 请求应答模式,将一组服务端和一组客户端连接,
- 发布订阅,将一组发布者和一组订阅者相连接
- 管道模式,使用扇入或者扇出组装多个节点
- 排他对接模式,将两个套接字一对一连接起来
常用的套接字连接对:
- PUB - SUB
- REQ - REP
- REQ - ROUTER
- DEALER - REP
- DEALER - ROUTER
- DEALER - DEALER
- ROUTER - ROUTER
- PUSH - PULL
- PAIR - PAIR
Demo
PUB - SUB(发布订阅)
下面的天气信息发布订阅服务,Server绑定PUB套接字在5556端口,Client端创建SUB套接字并连接至服务端的5556端口。
服务端代码:
#include "zhelpers.h"
int main (void)
{
// 准备上下文和PUB套接字
void *context = zmq_init (1);
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5556");
zmq_bind (publisher, "ipc://weather.ipc");
// 初始化随机数生成器
srandom ((unsigned) time (NULL));
while (1) {
// 生成数据
int zipcode, temperature, relhumidity;
zipcode = randof (100000);
temperature = randof (215) - 80;
relhumidity = randof (50) + 10;
// 向所有订阅者发送消息
char update [20];
sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
s_send (publisher, update);
}
zmq_close (publisher);
zmq_term (context);
return 0;
}
客户端代码:
#include "zhelpers.h"
int main (int argc, char *argv [])
{
void *context = zmq_init (1);
// 创建连接至服务端的套接字
printf ("正在收集气象信息...\n");
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5556");
// 设置订阅信息,默认为纽约,邮编10001
char *filter = (argc > 1)? argv [1]: "10001 ";
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
// 处理100条更新信息
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++) {
char *string = s_recv (subscriber);
int zipcode, temperature, relhumidity;
sscanf (string, "%d %d %d",
&zipcode, &temperature, &relhumidity);
total_temp += temperature;
free (string);
}
printf ("地区邮编 '%s' 的平均温度为 %dF\n",
filter, (int) (total_temp / update_nbr));
zmq_close (subscriber);
zmq_term (context);
return 0;
}
PUSH - PULL
下面的服务架构中,任务分发器(ventilator)绑定PUSH套接字在5557端口,在通知任务收集器(sink)后,产生100个任务分发给Worker;Worker从任务分发器PULL任务,任务结束后将结果PUSH给
任务分发器PULL任务,任务结束后将结果PUSH给任务收集器。
任务分发器代码:
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 用于发送消息的套接字
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");
// 用于发送开始信号的套接字
void *sink = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sink, "tcp://localhost:5558");
printf ("准备好worker后按任意键开始: ");
getchar ();
printf ("正在向worker分配任务...\n");
// 发送开始信号
s_send (sink, "0");
// 初始化随机数生成器
srandom ((unsigned) time (NULL));
// 发送100个任务
int task_nbr;
int total_msec = 0; // 预计执行时间(毫秒)
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// 随机产生1-100毫秒的工作量
workload = randof (100) + 1;
total_msec += workload;
char string [10];
sprintf (string, "%d", workload);
s_send (sender, string);
}
printf ("预计执行时间: %d 毫秒\n", total_msec);
sleep (1); // 延迟一段时间,让任务分发完成
zmq_close (sink);
zmq_close (sender);
zmq_term (context);
return 0;
}
Worker代码:
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 获取任务的套接字
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// 发送结果的套接字
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
// 循环处理任务
while (1) {
char *string = s_recv (receiver);
// 输出处理进度
fflush (stdout);
printf ("%s.", string);
// 开始处理
s_sleep (atoi (string));
free (string);
// 发送结果
s_send (sender, "");
}
zmq_close (receiver);
zmq_close (sender);
zmq_term (context);
return 0;
}
收集器代码:
#include "zhelpers.h"
int main (void)
{
// 准备上下文和套接字
void *context = zmq_init (1);
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");
// 等待开始信号
char *string = s_recv (receiver);
free (string);
// 开始计时
int64_t start_time = s_clock ();
// 确定100个任务均已处理
int task_nbr;
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
char *string = s_recv (receiver);
free (string);
if ((task_nbr / 10) * 10 == task_nbr)
printf (":");
else
printf (".");
fflush (stdout);
}
// 计算并输出总执行时间
printf ("执行时间: %d 毫秒\n",
(int) (s_clock () - start_time));
zmq_close (receiver);
zmq_term (context);
return 0;
}
REQ-ROUTER、DEALER-REP
简答的请求应答模式中(如下图),REQ直接与REP绑定,即客户端直接连接服务端,但是如果客户端数量变化了,对客户端是不可知的。
所以,为了为了能够支持服务端的动态扩展,可以引入一个代理(Broker)装置,Broker通过ROUTER套接字将客户端的请求进行路由,通过Dealer套接字分发给服务端,那么我们的服务架构可以调整为:
客户端代码:
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 用于和服务端通信的套接字
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5559");
int request_nbr;
for (request_nbr = 0; request_nbr != 10; request_nbr++) {
s_send (requester, "Hello");
char *string = s_recv (requester);
printf ("收到应答 %d [%s]\n", request_nbr, string);
free (string);
}
zmq_close (requester);
zmq_term (context);
return 0;
}
代理代码:
#include "zhelpers.h"
int main (void)
{
// 准备上下文和套接字
void *context = zmq_init (1);
void *frontend = zmq_socket (context, ZMQ_ROUTER);
void *backend = zmq_socket (context, ZMQ_DEALER);
zmq_bind (frontend, "tcp://*:5559");
zmq_bind (backend, "tcp://*:5560");
// 初始化轮询集合
zmq_pollitem_t items [] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ backend, 0, ZMQ_POLLIN, 0 }
};
// 在套接字间转发消息
while (1) {
zmq_msg_t message;
int64_t more; // 检测多帧消息
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
while (1) {
// 处理所有消息帧
zmq_msg_init (&message);
zmq_recv (frontend, &message, 0);
size_t more_size = sizeof (more);
zmq_getsockopt (frontend, ZMQ_RCVMORE, &more, &more_size);
zmq_send (backend, &message, more? ZMQ_SNDMORE: 0);
zmq_msg_close (&message);
if (!more)
break; // 最后一帧
}
}
if (items [1].revents & ZMQ_POLLIN) {
while (1) {
// 处理所有消息帧
zmq_msg_init (&message);
zmq_recv (backend, &message, 0);
size_t more_size = sizeof (more);
zmq_getsockopt (backend, ZMQ_RCVMORE, &more, &more_size);
zmq_send (frontend, &message, more? ZMQ_SNDMORE: 0);
zmq_msg_close (&message);
if (!more)
break; // 最后一帧
}
}
}
// 程序不会运行到这里,不过还是做好清理工作
zmq_close (frontend);
zmq_close (backend);
zmq_term (context);
return 0;
}
服务端代码:
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 用于何客户端通信的套接字
void *responder = zmq_socket (context, ZMQ_REP);
zmq_connect (responder, "tcp://localhost:5560");
while (1) {
// 等待下一个请求
char *string = s_recv (responder);
printf ("Received request: [%s]\n", string);
free (string);
// 做一些“工作”
sleep (1);
// 返回应答信息
s_send (responder, "World");
}
// 程序不会运行到这里,不过还是做好清理工作
zmq_close (responder);
zmq_term (context);
return 0;
}
代理组件
ZMQ中为Broker提供了一些内置装置:
- QUEUE,可用作请求-应答代理;
- FORWARDER,可用作发布-订阅代理服务;
- STREAMER,可用作管道模式代理。
在之前的例子中,Broker的实现较为复杂,而使用QUEUE之后,代码会变得非常简洁:
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 客户端套接字
void *frontend = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (frontend, "tcp://*:5559");
// 服务端套接字
void *backend = zmq_socket (context, ZMQ_DEALER);
zmq_bind (backend, "tcp://*:5560");
// 启动内置装置
zmq_device (ZMQ_QUEUE, frontend, backend);
// 程序不会运行到这里
zmq_close (frontend);
zmq_close (backend);
zmq_term (context);
return 0;
}
ZMQ多线程编程
上图的请求应答服务中,都是通过启动多个服务来达到并行处理,也就是利用了TCP通信协议进行实现,接下来主要借助ZMQ提供的进程内通信协议实现服务内的多线程编程,下图虚线内是服务内结构:
Server端代码如下,需要注意的是,进程间通信需要共享同一个ZMQ上下文:
#include "zhelpers.h"
#include <pthread.h>
static void *
worker_routine (void *context) {
// 连接至代理的套接字
void *receiver = zmq_socket (context, ZMQ_REP);
zmq_connect (receiver, "inproc://workers");
while (1) {
char *string = s_recv (receiver);
printf ("Received request: [%s]\n", string);
free (string);
// 工作
sleep (1);
// 返回应答
s_send (receiver, "World");
}
zmq_close (receiver);
return NULL;
}
int main (void)
{
void *context = zmq_init (1);
// 用于和client进行通信的套接字
void *clients = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (clients, "tcp://*:5555");
// 用于和worker进行通信的套接字
void *workers = zmq_socket (context, ZMQ_DEALER);
zmq_bind (workers, "inproc://workers");
// 启动一个worker池
int thread_nbr;
for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
pthread_t worker;
pthread_create (&worker, NULL, worker_routine, context);
}
// 启动队列装置
zmq_device (ZMQ_QUEUE, clients, workers);
// 程序不会运行到这里,但仍进行清理工作
zmq_close (clients);
zmq_close (workers);
zmq_term (context);
return 0;
}