首页 > Computer > ZeroMQ指南:第3章:高级请求-应答模式
2013
06-09

ZeroMQ指南:第3章:高级请求-应答模式

1 请求-应答信封

请求-应答模式中,信封用于存储应答的返回地址。无状态的ZeroMQ网络通过使用信封才能创建来回的请求-应答会话。

一般的使用中不用理解请求-应答信封是如何工作的。使用REQ和REP的时候,套接字会自动处理信封。像上一章中那样编写设备的时候,也只需要读写消息的所有部分就可以了。ZeroMQ使用多段消息来实现信封,所以只要安全地复制多段消息,就隐含地复制信封了。

然而对于高级应用,你应该理解请求-应答信封是如何工作的。从信封的使用方面来看,ROUTER是这样工作的:

  • 对于从REQ端收到的消息,ROUTER套接字会在消息前面加上信封,信封中有签名:这条消息来自Lucy。然后ROUTER转发消息。也就是说,ROUTER对于流出的消息会添加包含应答地址的信封。
  • 对于从DEALER端发送给ROUTER的消息,ROUTER套接字会剥离信封,读取信封中的数据,从而知道消息是给Lucy的;而且ROUTER知道谁是Lucy,可以把消息发回给Lucy。这个过程与从REQ端接收消息的过程相反。
  • 如果不对信封进行任何处理,直接把消息传递给另一个ROUTER,则第二个ROUTER会在消息前面添加第二个信封,包含新的应答地址。

要点在于ROUTER知道如何把应答发送回正确的地方。REP套接字收到消息的时候会一层层地剥离信封并且保存起来,然后把包含实际应用数据的原始消息传递给应用层。通过REP套接字发送回应的时候,REP套接字会一层层地加上原来的信封,使得消息可以正确地发回请求端。

可以在消息链路中插入ROUTER-DEALER设备,形成下面的请求-应答模式:

 

如果将REQ套接字连接到ROUTER套接字,然后发送请求消息,则消息会包含三个帧:一个回应地址、一个空的消息帧、然后才是“真实”的消息。

 

  • 帧3中的数据是应用程序发送给REQ套接字的数据。
  • 空帧2是REQ套接字在把消息发送给ROUTER的时候添加的。
  • 帧1中的应答地址是ROUTER在把消息传递给接收方的时候添加的。

如果进行设备链扩展,则会在信封上添加信封,新的信封总是被添加到原来消息的前面。

 

下面是对我们在请求-应答模式中使用的四种套接字的更详细解释:

  • DEALER只是将你发送的消息负载平衡(售出)给所有已经连接的对端;而对收到的消息进行公平排队(买入)。它就像是PUSH和PULL的结合。
  • REQ会在你发送的每条消息前面增加空消息帧;而对收到的每条消息移除空消息帧。除此之外,REQ的工作与DEALER类似(实际上REQ是建立在DEALER之上的),只是REQ遵循严格的发送/接收循环。
  • 在将收到的消息传递给应用层之前,ROUTER会在消息前面增加包含应答地址的信封。ROUTER还会对自己发送的消息剥离信封(第一个消息帧),使用信封中的应答地址来确定将消息发送到哪里。
  • REP会存储所有的消息帧,直到遇到第一个空消息帧,然后将剩余部分(应用数据)传递给应用层。发送回应的时候,REP会加上已经保存的信封到消息上,然后使用和ROUTER一样的语义(实际上REP是建立在ROUTER之上的)发送回应,但是为了与REQ匹配,REP需要遵循严格的接收/发送循环。

 

注意理解router的工作模式:设想有一个router和多个peer通过某传输端点连接到了一起。

  • zmq_sendmsg(peer,…)将消息发送给router;zmq_recvmsg(router,…)读取消息,但是在消息前面加了一帧:回应地址。
  • zmq_sendmsg(router,..)将消息分发给peer,剥离第一帧当做消息信封,根据其中的地址进行路由(如果找不到信封指示地址,则丢弃消息);zmq_recvmsg(peer,…)收到的消息已经不含回应地址。
  • REP要求信封以空消息帧结束。如果在消息链的另一端不是使用REQ套接字,则你必须自己添加空消息帧。

有一个问题是:ROUTER从哪里得到应答地址?答案是使用套接字标识。如果没有为套接字设置标识,则ROUTER会自动生成一个标识,将其关联到与套接字的连接上。

 

如果为套接字设置了标识,则这个标识会传递给ROUTER套接字,并且会用作信封中的应答地址。

 

我们用程序来观察一下。下面的程序会输出ROUTER从两个REQ套接字收到的消息帧,其中一个套接字没有设置标识,另一个设置了标识Hello。

#include <zmq.h>
#include <zmq_helpers.h>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
int main(void){
void* ctx = zmq_ctx_new();
void* sink = zmq_socket(ctx,ZMQ_ROUTER);
zmq_bind(sink,"inproc://example");
void* anonymous = zmq_socket(ctx,ZMQ_REQ);
zmq_connect(anonymous,"inproc://example");
s_send(anonymous,"router自动生成一个UUID");
s_dump(sink);
void* identified = zmq_socket(ctx,ZMQ_REQ);
zmq_setsockopt(identified,ZMQ_IDENTITY,"Hello",5);
zmq_connect(identified,"inproc://example");
s_send(identified,"router使用已经定义的套接字标识");
s_dump(sink);
zmq_close(sink);
zmq_close(anonymous);
zmq_close(identified);
zmq_ctx_destroy(ctx);
return 0;
}

dump函数的输出如下:

 

2 定制请求-应答路由

前面我们学习过,ROUTER使用消息信封来确定将回应发往哪里。现在我告诉你:只要你通过合理构造的信封提供正确的路由地址,ROUTER可以将消息异步地路由到任何连接到它的对端。也就是说:ROUTER是一个完全可控的路由器。

我们先来看看REQ和REP套接字的特征:

  • REQ是mama套接字,它不会倾听,只会要求答案。mama是严格同步的,使用的时候必须是消息链的“请求”端。
  • REP是papa套接字,只提供答案,不会主动开始对话。papa是严格同步的,使用的时候必须是消息链的“应答”端。

对于mama(妈妈)套接字,我们就像是孩子:不要说话,除非告诉你开始说话。mama不像papa那样眼界开阔,也不会像DEALER那样耸耸肩说“sure,whatever”。要跟mama说话,你必须让mama先跟你说话。幸运的是,mama不在意你是立刻回答,还是很久之后才回答,只要回答就好。

papa套接字是强壮、沉默,而且博学的。它只做一件事情,就是对于你提出的任何问题给出一个答案。papa套接字不会多嘴,不会主动给谁传递信息。

mama和papa只能位于消息链的末端,不能在中间,并且总是异步的。

我们只需要知道对方的地址,然后就可以通过路由器异步地向它发送消息。ROUTER是ZeroMQ中唯一的一种套接字类型,你可以告诉它“把这个消息发送给X”,这里X是某连接的对端。ROUTER可以用多种方法了解消息要发往的地址:

  • 默认情况下对端的标识为空,路由器会生成一个UUID;从另一端收到消息的时候,路由器会使用这个UUID来代表连接。
  • 如果已经为对端设置标识,则转发消息的时候路由器会在信封里面填入这个标识。
  • 具有显式标识的对端可以通过其他机制向路由器发送消息,比如说,通过其他类型(非REQ)的套接字。
  • 对端可能预先知道其他人的标识,比如说,通过配置文件或者其他方式。

至少有三种路由模式,每种对应一种可以很容易地连接到路由器的套接字类型:

  • Router-to-dealer
  • Router-to-mama(REQ)
  • Router-to-papa(REP)

每种情况中我们都可以完全控制如何路由消息,但是不同的路由模式涵盖不同的使用情况和消息流。下面的几节会结合不同的路由算法具体介绍它们。但是对于定制路由,请注意这些警告:

  • 定制路由会打破ZeroMQ的固有原则:代理端寻址套接字(delegate peer addressing to the socket)。使用定制路由的唯一原因是ZeroMQ缺少大范围的路由算法(默认的路由不能跨越两个或者更多个router)。
  • 未来版本的ZeroMQ可能会提供今天我们创建的路由算法,那么我们今天的设计可能会不再适用,或者变得多余。
  • 内置的路由有一定的可扩展性保证,比如说可以容易地与设备配合使用,而定制的路由则没有这种保证:你需要创建自己的设备。

总之,使用定制路由比让ZeroMQ完成路由开销更大,更脆弱,请仅仅在必须的时候才使用定制路由。

2.1 Router-to-Dealer路由

Router-to-dealer模式是最简单的。将一个router连接到多个dealer,然后使用你喜欢的任何算法将消息分发给dealer。dealer可以是汇聚点(处理消息而不回应)、代理(转发消息到其他节点),或者服务(发回回应)。

如果dealer需要回应,则只能有一个router连接到它:dealer不知道如何回应特定的对端,所以如果有多个对端,则会进行负载平衡,这是很奇怪的(请求来自哪里,回应就应该发回哪里,而不是在多个请求者间进行平衡)。如果dealer是汇聚点,则可以连接到任意数量的router。

router-to-dealer模式中可以进行什么类型的路由?如果dealer回应router,则可以告诉router自己已经完成工作了,这样可以根据dealer有多快的信息来进行路由。因为router和dealer都是异步的,这就需要一点技巧了,至少你需要zmq_poll。

我们会做一个dealer不回应,纯粹是汇聚点的例子。我们的路由算法是带权重的随机分发:我们有两个dealer,发送给一个的消息量是发送给另一个的两倍。

 

#include <zmq.h>

#include <zmq_helpers.h>

#include <boost/thread.hpp>

#include <stdio.h>

#pragma comment(lib,”libzmq-v100-mt.lib”)

//#define ONE_CONTEXT_OF_EVERY_THREAD

#if defined(ONE_CONTEXT_OF_EVERY_THREAD)

static void worker_proc(const char* p_id){

#else

static void worker_proc(void* ctx,const char* p_id){

#endif

#if defined(ONE_CONTEXT_OF_EVERY_THREAD)

void* ctx = zmq_ctx_new();

#endif

void* worker = zmq_socket(ctx,ZMQ_DEALER);

zmq_setsockopt(worker,ZMQ_IDENTITY,p_id,strlen(p_id));

zmq_connect(worker,”tcp://localhost:6000″);

int task_cnt = 0;

while(true){

char* p_task = s_recv(worker);

bool finished = (strcmp(p_task,”END”) == 0);

free(p_task);

if (finished){

printf(“工作者%s执行了%d个任务\n”,p_id,task_cnt);

break;

}

else{

++task_cnt;

}

}

zmq_close(worker);

#if defined(ONE_CONTEXT_OF_EVERY_THREAD)

zmq_ctx_destroy(ctx);

#endif

}

int main(void){

void* ctx = zmq_ctx_new();

void* client = zmq_socket(ctx,ZMQ_ROUTER);

zmq_bind(client,”tcp://*:6000″);

#if defined(ONE_CONTEXT_OF_EVERY_THREAD)

boost::thread worker_a(worker_proc,”A”);

boost::thread worker_b(worker_proc,”B”);

#else

boost::thread worker_a(worker_proc,ctx,”A”);

boost::thread worker_b(worker_proc,ctx,”B”);

#endif

s_sleep(1000);

srand((unsigned int)time(NULL));

for(int idx = 0; idx < 100; ++idx){

s_sendmore(client,((randof(3) > 0) ? “A” : “B”));

s_send(client,”工作负载”);

}

s_sendmore(client,”A”);

s_send(client,”END”);

s_sendmore(client,”B”);

s_send(client,”END”);

s_sleep(1000);

zmq_close(client);

zmq_ctx_destroy(ctx);

return 0;

}

  • router不知道dealer何时就绪。如果加入dealer通知router自己就绪的代码,则会分散我们的注意力。所以这里router只是执行s_sleep(1000),然后再启动dealer线程。如果没有这个sleep,则router将需要发送无法路由的消息,ZeroMQ会丢弃这种消息。
  • 这种行为是特定于ROUTER套接字的。PUB套接字也会在没有订阅者的时候丢弃消息,但是其他类型的套接字会对消息进行排队,直到有对端可以接收消息。

将消息通过router发送给dealer的时候,我们创建了一个只含有套接字标识的帧作为信封。给router发送消息的时候,router会认为消息的第一帧是应答地址,会根据这个地址进行消息路由。

注意:如果给定了无效地址,则router会丢弃消息。任何情况下都不能认为消息被正确地路由到了目的地,除非从目标节点收到了回应。

Dealer的工作于PUSH和PULL的结合非常相似。但是,不能将PULL或者PUSH套接字连接到REQ/REP套接字。

2.2 最近最少使用路由(LRU模式)(Router-to-mama)

前面讲过,mama(REQ套接字)不会听你说,如果你试图跟它说话,它会忽略你的话。你必须等待mama对你说些什么,然后你才可以给它答案。这个特征对于路由很有用,因为这意味着我们可以让一批mama等待答案,也就是mama会告诉我们什么时候它准备好了。

可以将一个router连接到多个mama,然后像dealer那样分发消息。mama需要应答,但是会让你说最后一句话。与dealer类似,只能将mama连接到一个router,因为mama总是首先主动开始与router交流。不能将一个mama连接到多个router,除非是在做多路冗余路由。这个太复杂,后面才会解释,希望你知道这个很复杂,之后在必须的时候才尝试做。

可以用router-to-mama模式做什么类型的路由?可能最明显的是“最近最少使用(least-recently-used、LRU)”路由,也就是路由到等待时间最长的mama。

 

#include <zmq.h>
#include <zmq_helpers.h>
#include <boost/thread.hpp>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
//#define ONE_CONTEXT_OF_EVERY_THREAD
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
static void worker_proc(const char* p_id){
#else
static void worker_proc(void* ctx,char* p_id){
#endif
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
void* ctx = zmq_ctx_new();
#endif
void* worker = zmq_socket(ctx,ZMQ_REQ);
zmq_setsockopt(worker,ZMQ_IDENTITY,p_id,strlen(p_id));
zmq_connect(worker,"tcp://localhost:6001");
int task_cnt = 0;
while(true){
s_send(worker,"READY");
char* p_task = s_recv(worker);
bool finished = ((p_task == NULL) || (strcmp(p_task,"END") == 0));
free(p_task);
if (finished){
printf("工作者%s执行了%d个任务\n",p_id,task_cnt);
break;
}
else{
++task_cnt;
}
}
free(p_id);
zmq_close(worker);
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
zmq_ctx_destroy(ctx);
#endif
}
int main(void){
void* ctx = zmq_ctx_new();
void* client = zmq_socket(ctx,ZMQ_ROUTER);
zmq_bind(client,"tcp://*:6001");
const char* p_name = "ABCDEF";
int len = strlen(p_name);
// 启动工作线程
for(int idx = 0; idx < len; ++idx){
char szName[2] = {0};
szName[0] = p_name[idx];
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
boost::thread worker_a(worker_proc,_strdup(szName));
#else
boost::thread worker_a(worker_proc,ctx,_strdup(szName));
#endif
}
srand((unsigned int)time(NULL));
// 发送工作负载
int task_cnt = 100;
while(task_cnt-- > 0){
char* p_id = s_recv(client);
char* p_empty = s_recv(client);
char* p_ready = s_recv(client);
s_sendmore(client,p_id);
s_sendmore(client,"");
s_send(client,"工作负载");
free(p_id);
free(p_empty);
free(p_ready);
}
// 通知各个工作线程结束
for(int idx = 0; idx < len; ++idx){
char szName[2] = {0};
szName[0] = p_name[idx];
s_sendmore(client,szName);
s_send(client,"END");
}
s_sleep(500);
zmq_close(client);
zmq_ctx_destroy(ctx);
return 0;
}

这个例子实现LRU时不需要任何特别的数据结构,因为我们不需要同步各个工作者。更实用的LRU算法需要收集已经准备就绪的工作者到队列中,然后使用这个队列来路由客户端请求,后面的例子会实现这个的。

  • 不需要等待时间,因为mama显式告诉路由器自己已经准备好了。
  • 这里我们使用zhelpers.h中的s_set_id函数生成可打印字符串形式的标识。
  • 要将消息路由到mama,则需要创建它可以识别的信封:地址加上一个空消息帧。

 

2.3 基于地址的路由(Router-to-papa)

典型的请求-应答模式中router不会直接与papa套接字通信,而是由dealer与之通信。dealer的作用就是:把提问传递给随机的papa,然后返回它们的应答。router通常更适合于与mama交流。强调一下:ZeroMQ中的典型模式是工作起来最棒的,如果我们试图打破常规,则可能出现问题。

papa套接字有两个特征:

  • 严格遵循请求-回应的步骤
  • 接受任意大小的信封栈,在发送回应的时候会原封不动地返回原来的信封

通常的请求-回应模式中,papa是匿名并且可以替换的,但是我们现在讨论的是定制路由,所以我们有把请求发送给papaA而不是papaB的原因。

ZeroMQ的核心设计理念是边缘节点很多而且具有智能(smart and many),中间节点很多却是哑的(vast and dumb)。但是这并不意味着它们能够相互寻址,我们需要探寻如何到达指定的papa。随后我们才讨论多跳路由,现在我们只关心最后一步:router与papa的交流。

 

#include <zmq.h>
#include <zmq_helpers.h>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
int main(void){
void* ctx = zmq_ctx_new();
void* client = zmq_socket(ctx,ZMQ_ROUTER);
zmq_bind(client,"tcp://*:6002");
void* worker = zmq_socket(ctx,ZMQ_REP);
zmq_setsockopt(worker,ZMQ_IDENTITY,"A",1);
zmq_connect(worker,"tcp://localhost:6002");
s_sleep(500);
s_sendmore(client,"A");
s_sendmore(client,"地址");
s_sendmore(client,"地址");
s_sendmore(client,"地址");
s_sendmore(client,"");
s_send(client,"工作负载");
s_dump(worker);
s_send(worker,"工作完成");
s_dump(client);
zmq_close(client);
zmq_close(worker);
zmq_ctx_destroy(ctx);
return 0;
}

程序的输出如下:

 

关于代码的注解:

  • 实际应用的时候通常papa和router是在单独的节点中的。这个例子中它们在同一个线程中,这让事件序列比较清晰。
  • zmq_connect不会立即完成连接。papa套接字需要在后台花一些时间来连接到router。真实的应用中router不可能知道papa的存在,直到进行了一些会话。这个例子只是简单地调用sleep(1),以此确认连接已经完成。如果移除sleep语句,则papa套接字收不到消息(你可以试试)。
  • 我们使用papa的标识进行路由。试试使用错误的地址,比如说“B”,那么papa就收不到消息了。
  • s_dump和其他工具函数来自zhelpers.h头文件。使用这个头文件会让套接字上一次次的重复工作变得清晰,它也是我们可以在ZeroMQ API之上建立的一个有用的层次。后面创建真实应用的时候会再详细介绍这个头文件。
  • 要路由到papa,必须创建它可以识别的信封。

3 一个请求-应答消息代理

我们用前面介绍的ZeroMQ消息信封的知识来创建一个使用定制路由的队列设备,我们可以称之为“消息代理”。我们要创建的队列设备可以连接很多客户端和很多工作者,而且允许你使用任何路由算法。示例代码具体展示的是最近最少使用算法,因为这是大多数情况下最明显的适合用于负载平衡的算法。

开始之前先来复习下典型的请求-应答模式,看看如何才能将其扩展到非常大的面向服务的网络。典型的模式下只有一个客户端与多个工作者交流。

 

这个模式使用了多个papa,但是如果想使用多个mama,则需要一个中间设备。这个设备通常由连接在一起的一个router和一个dealer构成。内置的ZMQ_QUEUE设备就是一个实例,它只是尽可能快地在两个套接字之间复制消息帧。

 

要点在于router在请求消息的信封中存储了发起请求的mama的地址,而dealer和papa不会管这个信封,并且router知道应该将回应发送给哪个mama。这个模式中的papa是匿名的,不需要被寻址,并且所有的papa提供相同的服务。

上面的设计中我们使用的是delaer套接字内置的负载平衡算法。但是现在我们想使用最近最少使用算法,所以我们使用前面学习过的router-to-mama模式。

我们的代理——router-to-router模式的LRU队列——不能简单地复制消息帧。

 

程序最难的部分是:

  • 每个套接字读取和写入的信封
  • LRU算法

#include <zmq.h>
#include <zmq_helpers.h>
#include <boost/thread.hpp>
#include <list>
#include <stdio.h>
using namespace std;
#pragma comment(lib,"libzmq-v100-mt.lib")
//#define ONE_CONTEXT_OF_EVERY_THREAD
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
static void client_proc(char* p_id){
#else
static void client_proc(void* ctx,char* p_id){
#endif
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
void* ctx = zmq_ctx_new();
#endif
void* client = zmq_socket(ctx,ZMQ_REQ);
zmq_setsockopt(client,ZMQ_IDENTITY,p_id,strlen(p_id));
zmq_connect(client,"tcp://localhost:6005");
int task_cnt = 100;
while(task_cnt-->0){
s_send(client,"HELLO");
char* p_reply = s_recv(client);
assert((p_reply != NULL) && (strcmp(p_reply,"OK") == 0));
free(p_reply);
}
printf("** 客户端%s 退出**\n",p_id);
free(p_id);
s_send(client,"END");
zmq_close(client);
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
zmq_ctx_destroy(ctx);
#endif
}
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
static void worker_proc(char* p_id){
#else
static void worker_proc(void* ctx,char* p_id){
#endif
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
void* ctx = zmq_ctx_new();
#endif
void* worker = zmq_socket(ctx,ZMQ_REQ);
zmq_setsockopt(worker,ZMQ_IDENTITY,p_id,strlen(p_id));
zmq_connect(worker,"tcp://localhost:6006");
s_send(worker,"READY");
int task_cnt = 0;
bool bEnd = false;
while(!bEnd){
char* p_client = s_recv(worker);
bEnd = (strcmp(p_client,"END") == 0);
if (!bEnd){
char* p_empty = s_recv(worker);
char* p_req = s_recv(worker);
s_sendmore(worker,p_client);
s_sendmore(worker,p_empty);
s_send(worker,"OK");
++task_cnt;
free(p_empty);
free(p_req);
}
free(p_client);
}
printf("工作者%s处理了%d个任务\n",p_id,task_cnt);
free(p_id);
s_send(worker,"END");
zmq_close(worker);
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
zmq_ctx_destroy(ctx);
#endif
}
int main(void){
void* ctx = zmq_ctx_new();
void* front = zmq_socket(ctx,ZMQ_ROUTER);
zmq_bind(front,"tcp://*:6005");
void* back = zmq_socket(ctx,ZMQ_ROUTER);
zmq_bind(back,"tcp://*:6006");
const char* p_worker = "ABC";
int len = strlen(p_worker);
int worker_cnt = len;
// 启动工作线程
for(int idx = 0; idx < len; ++idx){
char szName[2] = {0};
szName[0] = p_worker[idx];
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
boost::thread worker(worker_proc,_strdup(szName));
#else
boost::thread worker(worker_proc,ctx,_strdup(szName));
#endif
}
const char* p_client = "0123456789";
len = strlen(p_client);
int client_cnt = len;
// 启动客户端线程
for(int idx = 0; idx < len; ++idx){
char szName[2] = {0};
szName[0] = p_client[idx];
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
boost::thread client(client_proc,_strdup(szName));
#else
boost::thread client(client_proc,ctx,_strdup(szName));
#endif
}
list<char*> listWorkerReady;
zmq_pollitem_t items[] ={
{back,0,ZMQ_POLLIN,0},
{front,0,ZMQ_POLLIN,0},
};
while((worker_cnt > 0) && (zmq_poll(items,(listWorkerReady.empty() ? 1 : 2),-1) >= 0)){
// 某个工作者就绪通知或者应答
if (items[0].events & ZMQ_POLLIN){
while(true){
int events = 0;
size_t optlen = sizeof(events);
zmq_getsockopt(back,ZMQ_EVENTS,&events,&optlen);
if (!(events & ZMQ_POLLIN)){
break;
}
char* p_server = s_recv(back);
char* p_empty = s_recv(back);
char* p_ready = s_recv(back);
bool bEnd = (strcmp(p_ready,"END") == 0);
// 退出通知
if (bEnd){
assert(worker_cnt > 0);
--worker_cnt;
}
// 应答
else if (strcmp(p_ready,"READY") != 0){
char* p_client = p_ready;
char* p_empty2 = s_recv(back);
char* p_ok = s_recv(back);
assert((p_ok != NULL) && (strcmp(p_ok,"OK") == 0));
s_sendmore(front,p_client);
s_sendmore(front,p_empty2);
s_send(front,p_ok);
free(p_empty2);
free(p_ok);
}
// 就绪或者应答
if (!bEnd){
list<char*>::iterator itor = listWorkerReady.begin();
for(; itor != listWorkerReady.end(); ++itor){
if (strcmp(p_server,*itor) == 0){
break;
}
}
assert(itor == listWorkerReady.end());
listWorkerReady.push_back(_strdup(p_server));
}
free(p_ready);
free(p_empty);
free(p_server);
}
}
// 收到客户端的请求
if (items[1].events & ZMQ_POLLIN){
while(listWorkerReady.size() > 0){
int events = 0;
size_t optlen = sizeof(events);
zmq_getsockopt(front,ZMQ_EVENTS,&events,&optlen);
if (!(events & ZMQ_POLLIN)){
break;
}
char* p_client = s_recv(front);
char* p_empty = s_recv(front);
char* p_hello = s_recv(front);
assert(p_hello != NULL);
if (strcmp(p_hello,"END") == 0){
assert(client_cnt > 0);
if (--client_cnt == 0){
// 通知工作者结束
int len = strlen(p_worker);
for(int idx = 0; idx < len; ++idx){
char szName[2] = {0};
szName[0] = p_worker[idx];
s_sendmore(back,szName);
s_sendmore(back,"");
s_send(back,"END");
}
}
}
else{
assert(strcmp(p_hello,"HELLO") == 0);
assert(listWorkerReady.size() > 0);
char* p_server = listWorkerReady.front();
listWorkerReady.pop_front();
s_sendmore(back,p_server);
s_sendmore(back,p_empty);
s_sendmore(back,p_client);
s_sendmore(back,p_empty);
s_send(back,p_hello);
free(p_server);
}
free(p_client);
free(p_empty);
free(p_hello);
}
}
}
assert(listWorkerReady.size() == strlen(p_worker));
while(!listWorkerReady.empty()){
free(listWorkerReady.front());
listWorkerReady.pop_front();
}
zmq_close(front);
zmq_close(back);
zmq_ctx_destroy(ctx);
return 0;
}

有了上述代码,我们可以根据工作者初始的”READY”消息对LRU算法进行一些扩展。比如说,工作者启动的时候可以执行一个性能自测,告诉代理自己有多快。代理可以选择可用而且最快的工作者,而不是使用LRU或者轮转算法。

4 一个高层API(CZMQ)

直接使用原始的ZeroMQ API来读写多段消息就像用牙签吃带有炸鸡和蔬菜的热面汤:代码复杂而不可重用。我们需要一种简单的API,可以使用单个调用来接收或者发送整个消息,包括消息信封。ZeroMQ API本身不提供这种功能,但是我们可以进行一些封装。

ZeroMQ中的术语“messgage”有一些歧义:它既指整个多段消息,也指组成多段消息的每一段消息。为避免歧义,可以使用三个概念:string、frame(一个消息帧)和message(由一个或者多个frame组成的消息)。

我们需要的API应该具有下述能力:

  • 自动处理套接字。可以自动设定linger超时,可以在关闭上下文的时候自动关闭套接字。
  • 可移植的线程管理。
  • 可移植的时钟操作。
  • 代替zmq_poll的反应器(reactor)。
  • 恰当地处理Ctrl+C。

CZMQ实现了上述能力。

  • CZMQ中的反应器由zloop实现,当然其内部是使用zmq_poll的。
  • zctx类自动建立SIGINT信号处理。但是你的代码应该在zmq_poll返回-1,或者接收数据的函数(zstr_recv、zframe_recv、zmsg_recv)返回NULL的时候进行恰当的处理。
  • zctx_interrupted可以判断程序是否被中断(Ctrl+C)。

用CZMQ重写的第3节的程序:

#include <stdint.h>
#if defined(WIN32)
typedef int64_t ssize_t;
#endif
#include <zmq.h>
#include <czmq.h>
#include <boost/thread.hpp>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
#pragma comment(lib,"czmq.lib")
#define ONE_CONTEXT_OF_EVERY_THREAD
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
static void client_proc(char* p_id){
#else
static void client_proc(zctx_t* ctx,char* p_id){
#endif
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
zctx_t* ctx = zctx_new();
#endif
void* client = zsocket_new(ctx,ZMQ_REQ);
zsocket_set_identity(client,p_id);
zsocket_connect(client,"tcp://localhost:6005");
int task_cnt = 100;
while(task_cnt-->0){
zstr_send(client,"HELLO");
char* p_reply = zstr_recv(client);
if (p_reply == NULL){
break;
}
free(p_reply);
}
printf("** 客户端%s 退出**\n",p_id);
free(p_id);
zstr_send(client,"END");
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
zmq_ctx_destroy(ctx);
#else
zsocket_destroy(ctx,client);
#endif
}
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
static void worker_proc(char* p_id){
#else
static void worker_proc(zctx_t* ctx,char* p_id){
#endif
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
zctx_t* ctx = zctx_new();
#endif
void* worker = zsocket_new(ctx,ZMQ_REQ);
zsocket_set_identity(worker,p_id);
zsocket_connect(worker,"tcp://localhost:6006");
zstr_send(worker,"READY");
int task_cnt = 0;
bool bEnd = false;
while(!bEnd){
zmsg_t* p_msg = zmsg_recv(worker);
bEnd = (memcmp(zframe_data(zmsg_first(p_msg)),"END",3) == 0);
if (bEnd){
zmsg_destroy(&p_msg);
}
else{
zframe_reset(zmsg_last(p_msg),"OK",2);
zmsg_send(&p_msg,worker);
++task_cnt;
}
}
printf("工作者%s处理了%d个任务\n",p_id,task_cnt);
free(p_id);
zstr_send(worker,"END");
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
zmq_ctx_destroy(ctx);
#else
zsocket_destroy(ctx,worker);
#endif
}
struct IruQueueContext{
void* front;
void* back;
zlist_t* p_workers;
char* p_worker_name;
int client_cnt;
int worker_cnt;
};
// 处理前端套接字上的数据
static int handle_front(zloop_t* loop,zmq_pollitem_t* poller,void* arg){
IruQueueContext* p_ctx = (IruQueueContext*)arg;
zmsg_t* p_msg = zmsg_recv(p_ctx->front);
if (p_msg != NULL){
zframe_t* p_hello = zmsg_last(p_msg);
if (memcmp(zframe_data(p_hello),"END",3) == 0){
assert(p_ctx->client_cnt > 0);
if (--p_ctx->client_cnt == 0){
// 通知工作者结束
int len = strlen(p_ctx->p_worker_name);
for(int idx = 0; idx < len; ++idx){
char szName[2] = {0};
szName[0] = p_ctx->p_worker_name[idx];
zstr_sendm(p_ctx->back,szName);
zstr_sendm(p_ctx->back,"");
zstr_send(p_ctx->back,"END");
}
}
}
else{
assert(memcmp(zframe_data(p_hello),"HELLO",5) == 0);
assert(zlist_size(p_ctx->p_workers) > 0);
zmsg_wrap(p_msg,(zframe_t*)zlist_pop(p_ctx->p_workers));
zmsg_send(&p_msg,p_ctx->back);
// 从有工作者的状态变成没有工作者的状态,停止轮询前端套接字
if (zlist_size(p_ctx->p_workers) == 0){
zmq_pollitem_t item = {p_ctx->front,0,ZMQ_POLLIN};
zloop_poller_end(loop,&item);
}
}
}
return 0;
}
// 处理后端套接字上的数据
static int handle_back(zloop_t* loop,zmq_pollitem_t* poller,void* arg){
IruQueueContext* p_ctx = (IruQueueContext*)arg;
zmsg_t* p_msg = zmsg_recv(p_ctx->back);
if (p_msg != NULL){
zframe_t* p_addr = zmsg_unwrap(p_msg);
zframe_t* p_ready = zmsg_first(p_msg);
bool bEnd = (memcmp(zframe_data(p_ready),"END",3) == 0);
// 退出通知
if (bEnd){
zmsg_destroy(&p_msg);
assert(p_ctx->worker_cnt > 0);
if (--p_ctx->worker_cnt == 0){
return -1;// 返回表示退出事件循环
}
}
else{
zlist_append(p_ctx->p_workers,p_addr);
// 从没有工作者状态变到有工作者状态,开始轮询前端套接字
if (zlist_size(p_ctx->p_workers) == 1){
zmq_pollitem_t item = {p_ctx->front,0,ZMQ_POLLIN};
zloop_poller(loop,&item,handle_front,p_ctx);
}
// 应答
if (memcmp(zframe_data(p_ready),"READY",5) != 0){
zmsg_send(&p_msg,p_ctx->front);
}
// 就绪通知
else{
zmsg_destroy(&p_msg);
}
}
}
return 0;
}
int main(void){
zctx_t* ctx = zctx_new();
void* front = zsocket_new(ctx,ZMQ_ROUTER);
zsocket_bind(front,"tcp://*:6005");
void* back = zsocket_new(ctx,ZMQ_ROUTER);
zsocket_bind(back,"tcp://*:6006");
IruQueueContext* p_ctx = (IruQueueContext*)zmalloc(sizeof(IruQueueContext));
p_ctx->front = front;
p_ctx->back = back;
const char* p_worker = "ABC";
int len = strlen(p_worker);
p_ctx->p_worker_name = "ABC";
p_ctx->worker_cnt = len;
// 启动工作线程
for(int idx = 0; idx < len; ++idx){
char szName[2] = {0};
szName[0] = p_worker[idx];
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
boost::thread worker(worker_proc,_strdup(szName));
#else
boost::thread worker(worker_proc,ctx,_strdup(szName));
#endif
}
const char* p_client = "0123456789";
len = strlen(p_client);
p_ctx->client_cnt = len;
// 启动客户端线程
for(int idx = 0; idx < len; ++idx){
char szName[2] = {0};
szName[0] = p_client[idx];
#if defined(ONE_CONTEXT_OF_EVERY_THREAD)
boost::thread client(client_proc,_strdup(szName));
#else
boost::thread client(client_proc,ctx,_strdup(szName));
#endif
}
p_ctx->p_workers = zlist_new();
// 事件循环
zloop_t* reactor = zloop_new();
zmq_pollitem_t item = {p_ctx->back,0,ZMQ_POLLIN};
zloop_poller(reactor,&item,handle_back,p_ctx);
zloop_start(reactor);
zloop_destroy(&reactor);
// 销毁就绪工作者队列
assert(zlist_size(p_ctx->p_workers) == strlen(p_worker));
while(zlist_size(p_ctx->p_workers) > 0){
zframe_t* frame = (zframe_t*)zlist_pop(p_ctx->p_workers);
zframe_destroy(&frame);
}
zlist_destroy(&p_ctx->p_workers);
free(p_ctx);
zmq_ctx_destroy(ctx);
return 0;
}

5 异步的客户端-服务器

前面的router-to-dealer示例展示了1到N的使用情形,一个客户端异步地与多个工作者通信。我们可以把这个模型颠倒过来,创建一个N到1的体系,让多个客户端异步地与单个服务器通信。

这个体系的工作模式如下:

  • 客户端连接到服务器,发送请求。
  • 对每个请求,服务器发送0到N个回应。
  • 客户端可以发送多个请求,不需要等待回应。
  • 服务器可以发送多个回应,不需要等待新的请求。

使用CZMQ实现的代码:

#include <stdint.h>
#if defined(WIN32)
typedef int64_t ssize_t;
#endif
#include <zmq.h>
#include <zmq_helpers.h>
#include <czmq.h>
#include <boost/thread.hpp>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
#pragma comment(lib,"czmq.lib")
static boost::mutex g_mutex_print;
// 客户端
static void client_proc(const char* p_id){
zctx_t* ctx = zctx_new();
void* client = zsocket_new(ctx,ZMQ_DEALER);
zsocket_set_identity(client,(char*)p_id);
zsocket_connect(client,"tcp://localhost:5570");
int request_idx = 0;
while(true){
for(int idx = 0; idx < 100; ++idx){
zmq_pollitem_t item = {client,0,ZMQ_POLLIN,0};
zmq_poll(&item,1,10 * ZMQ_POLL_MSEC);
if (item.revents & ZMQ_POLLIN){
zmsg_t* p_msg = zmsg_recv(client);
g_mutex_print.lock();
zframe_print(zmsg_last(p_msg),p_id);
g_mutex_print.unlock();
zmsg_destroy(&p_msg);
}
}
zstr_sendf(client,"第%d号请求",++request_idx);
}
zmq_ctx_destroy(ctx);
}
// 工作者
static void server_worker(zctx_t* ctx){
void* worker = zsocket_new(ctx,ZMQ_DEALER);
zsocket_connect(worker,"inproc://worker");
while(true){
zmsg_t* p_msg = zmsg_recv(worker);
zframe_t* p_addr = zmsg_pop(p_msg);
zframe_t* p_cont = zmsg_pop(p_msg);
zmsg_destroy(&p_msg);
char* p_cont2 = zframe_strdup(p_cont);
srand((unsigned int)zclock_time());
int reply_cnt = randof(5);
for(int idx = 0; idx < reply_cnt; ++idx){
zclock_sleep(randof(1000) + 1);
zframe_send(&p_addr,worker,ZFRAME_REUSE + ZFRAME_MORE);
char buf[256];
snprintf(buf,256,"%s的第%d次应答",p_cont2,idx + 1);
zframe_reset(p_cont,buf,strlen(buf));
zframe_send(&p_cont,worker,ZFRAME_REUSE);
}
free(p_cont2);
zframe_destroy(&p_addr);
zframe_destroy(&p_cont);
}
zsocket_destroy(ctx,worker);
}
static void server_proc(const char* p_id){
zctx_t* ctx = zctx_new();
void* front = zsocket_new(ctx,ZMQ_ROUTER);
zsocket_bind(front,"tcp://*:5570");
void* back = zsocket_new(ctx,ZMQ_DEALER);
zsocket_bind(back,"inproc://worker");
for(int idx = 0; idx < 5; ++idx){
boost::thread worker(server_worker,ctx);
}
#if 1
while(true){
zmq_pollitem_t items[]={
{front,0,ZMQ_POLLIN,0},
{back,0,ZMQ_POLLIN,0},
};
zmq_poll(items,2,-1);
if (items[0].revents & ZMQ_POLLIN){
zmsg_t* p_msg = zmsg_recv(front);
zmsg_send(&p_msg,back);
}
if (items[1].revents & ZMQ_POLLIN){
zmsg_t* p_msg = zmsg_recv(back);
zmsg_send(&p_msg,front);
}
}
#else
zmq_device(ZMQ_QUEUE,front,back);
#endif
zmq_ctx_destroy(ctx);
}
int main(void){
boost::thread_group group;
group.create_thread(boost::bind(client_proc,"A"));
group.create_thread(boost::bind(client_proc,"B"));
group.create_thread(boost::bind(client_proc,"C"));
group.create_thread(boost::bind(server_proc,"1"));
group.join_all();
return 0;
}

跟其他多任务示例一样,这个程序运行在单进程模式,但是每个任务有自己的上下文,概念上跟单独的进程一样。关于代码的注解:

  • 客户端每秒发送一个请求,获取零个或者多个回应。为使用zmq_poll完成这个工作,不能简单地使用1秒的超时,否则我们只能在收到最后一个回应之后1秒发送一个新的请求。所以我们以较高的频率进行轮询(每秒100次)。这意味着服务器可以使用请求作为一种心跳,检测客户端是仍然存在,还是已经断线。
  • 服务器使用一个工作线程池,池中每个线程同步地处理一个请求。服务器通过一个使用内部队列的前端套接字连接到这些线程。为帮助调试,代码实现了自己的队列设备逻辑。
  • 我们在客户端和服务器之间进行dealer-to-router会话,但是在服务器内部的主线程和工作者之间进行dealer-to-dealer会话。如果工作者是严格同步的,则必须使用REP套接字。但是我们需要发送多个回应,所以需要使用异步套接字。我们不想路由回应,回应总是返回给发送请求的服务器线程。

看看信封路由。客户端发送一个简单消息的时候,服务器线程收到一个两段消息(带有客户端标识作为前缀的真实消息)。而对于server-to-worker接口,有两种可能的设计:

  • 工作者得到不带地址的消息,我们使用一个router套接字作为后端来管理从服务器线程到工作者线程的连接。这要求工作者启动的时候通知服务器自己的存在,可以将请求路由到工作者,跟踪哪个客户端“连接”到了哪个工作者。这就是前面我们讲过的LRU模式。
  • 工作者得到带地址的消息,并且返回带地址的回应。这要求工作者可以正确地解码和重建信封,但是不需要其他机制。

第二种设计更简单:

 

创建维护大量状态信息的服务器时,你可能会遇到一个典型的问题。如果服务器为每个客户端维护一些状态信息,并且客户端不时地上线和下线,则服务器最终会耗尽资源。即使同一个客户端保持连接,如果你使用默认的标识,则每个连接看起来都像新的一样(具有不同的标识)。

示例代码只使用了少量状态信息,并且只维持很短的时间。对于真实的服务器,你应该:

  • 在客户端和服务器间维持心跳。示例代码中每秒发送的请求可以当做是心跳。
  • 使用客户端标识(不论是自动生成的还是显式指定的)作为状态信息的关键字。
  • 检测心跳停止。比如说,如果两秒没有收到心跳,则服务器可以销毁相应客户端的状态信息。

6 真实的示例:代理间路由

我们的客户要求我们设计一个大的云计算设施。他要求这个云能够扩展到多个数据中心,每个数据中心都是一个客户端和工作者的集群,而多个数据中心作为一个整体进行工作。

6.1 明确需求细节

  • 工作者运行在各种硬件之上,可以处理任何任务。每个集群中有成百上千个工作者,总共有十多个这样的集群。
  • 客户端为工作者创建任务。每个任务是独立的工作单元,并且希望尽快地找到一个可用的工作者,把任务发送给它。有很多客户端,它们可以任意地上线和下线。
  • 真正的困难在于要求在任何时候都可以添加和删除集群:集群可以即时地退出或者加入,带有它的工作者和客户端。
  • 如果所处的集群中没有工作者,客户端的任务可以传递到云中的其他工作者中。
  • 客户端每次发送一个任务,等待回应。如果在X秒内没有收到回答,则重新发送。这不在我们考虑的范围之内,这是客户端API处理的事情。
  • 工作者每次处理一个任务,逻辑非常简单。如果崩溃,则启动工作者的脚本会重启工作者。

再次确认:

  • 集群间存在超级快的网络连接(super-duper network)
  • 每个集群至多有1000个客户端,每个客户端每秒最多发送10个请求。请求和回应都很小,不大于1KB。

简单计算一下,看看TCP能否承受住流量压力。2500个客户端×10个请求每秒×1000字节每个请求/回应×2 = 50MB/S,或者说400Mbps,对于千兆网络不是问题。

很显然,我们不需要额外的硬件或者协议,而只需要精巧的路由算法和仔细的设计。我们从设计单个集群(一个数据中心)开始,然后考虑如何把多个集群连接到一起。

6.2 单个集群的体系

工作者和客户端是同步的。我们使用LRU模式来将任务路由到工作者。工作者都是相同的,我们不考虑不同的服务。工作者是匿名的,客户端不能直接寻址工作者。我们不提供到达保证、重试等机制。

前面已经讲过,不能让客户端直接与工作者通信,这会让动态添加或者移除节点变得不可能。所以我们的基本模型由前面讲过的请求-应答消息代理构成。

6.3 扩展到多个集群

现在我们来扩展到多个集群。每个集群有一系列的客户端和工作者,代理把它们连接到一起。

 

问题是:如何让每个集群中的客户端与其他集群中的工作者通信?有几种解决方案,它们各有其优缺点:

  • 客户端直接连接到各个代理。优点是不需要修改代理或者工作者。但是客户端变得复杂了,并且客户端需要了解总体拓扑结构。比如说,如果我们想添加第三个或者第四个集群,则所有的客户端都会受影响。实际上,我们必须把路由和故障转移逻辑放到客户端中,这是不好的。
  • 工作者直接连接到各个代理。但是mama工作者没法做到这一点,它只能回答一个代理。可以使用papa套接字,但是papa不能应用LRU这样的定制路由算法,而只能使用内置的负载平衡。这就不能实现将工作分配到空闲的工作者的功能:我们的确需要LRU。一个解决方案是对工作者节点使用router套接字。我们把这个方案称作“Idea #1”。
  • 各个代理相互连接。这看起来是最整洁的,因为增加的额外连接最少。我们不能在线(on the fly)添加集群,但是这个可能超出我们的考虑范围了。这个方案中客户端和工作者不需要了解真实的网络拓扑结构,代理在有空闲能力的时候相互通知。我们把这个方案称作“Idea #2”。

先看看Idea #1。这个方案中工作者连接到各个代理,可以从各个代理接受工作。

这个方案看起来是可行的。但是它不能提供我们想要的行为:客户端在可能的情况下得到本地的工作者,只有在必须等待的时候在得到远程工作者。此外,工作者需要通知各个代理自己已经“就绪”,这样可能同时得到多个任务,而其他工作者却是空闲的。看来这个设计是不可取的,因为我们把路由逻辑放到边界中了。

再看看Idea #2。

这个方案很吸引人,因为问题被就地解决了,不影响其他地方。代理相互通信:“我有一些空闲处理能力,如果你有太多客户端请告诉我,我可以帮忙”。

实际上这只是一个更加精巧的路由算法:代理相互成为转包商(subcontractors)。其优点是:

  • 对于常见情况(客户端和工作者在相同集群中)它进行默认处理,而对例外情况(在集群间移动任务)进行额外处理。
  • 可以对不同类型的工作使用不同的消息流。也就是可以进行不同的处理,比如说,使用不同类型的网络连接。
  • 可以平滑地扩展。相互连接三个或者更多个代理不能变得过于复杂(over-complex)。如果这成为问题了,我们可以通过添加一个超级代理(super-broker)来简单地解决。

现在可以动手创建一个可用的示例。我们会把整个机器放到一个进程中。显然真实的情况不是这样的,但是使用单个进程比较容易模拟,而模拟能够精确地扩展到真实进程中。这就是ZeroMQ之美:你可以在微观层次进行设计,然后扩展到宏观层次。线程可以成为进程、成为机器,而模型和逻辑保持不变。我们的“集群”进程会包含客户端线程、工作者线程,以及代理线程。

我们已知的基本模型如下:

  • mama客户端(REQ)线程创建工作负载并且传递到代理(ROUTER)
  • mama工作者(REQ)线程处理工作负载,将结果返回给代理(ROUTER)
  • 代理使用LRU路由模型对负载进行排队和分发

6.4 联合与对等(Federation vs. Peering)

可以用多种方法来连接各个代理。我们要求连接方式能够让代理告诉其他代理“我有空闲的处理能力”,然后接收多个任务。我们也要求连接方式能够让代理告诉其他代理“停止,我已经满载了”。没必要非常完美:有时候我们可以接收不能立即处理的工作,随后尽可能快地处理它们。

最简单的互联方法是联合(federation):把代理的前端连接到其他代理的后端,让它们相互成为对方的客户端和工作者。注意:将一个套接字绑定到某个端点,同时连接到其他端点是合法的。

这种连接方式的逻辑简单:没有客户端的时候,通知其他代理自己“就绪”,从其他代理那里接受一个工作。问题是这种方式对于这个问题来说太简单了。联合的代理某个时刻只能处理一个任务。如果代理模拟同步的客户端和工作者(lock-step client and worker),则这种连接方式也是同步的,而且即使其他代理有多个可用的工作者,也不会被使用(因为一次只从其他代理那里接受一个工作)。

联合模型对于其他类型的路由是完美的,特别是对于面向服务架构(service-oriented architecture,SOA)(通过服务名和邻近关系进行路由,而不是使用LRU,或者负载平衡,或者随机分发)。所以不要认为联合模型是没有用的,它只是不适合于LRU和集群负载平衡。

现在来看看对等(peering)模型。对等模型中代理显式地相互感知,并且通过特权通道进行通信。假设我们需要互联N个代理,则每个代理有N-1个对端(peers),并且所有的代理使用完全相同的代码和逻辑。各个代理间有两种信息流:

  • 每个代理需要在任何时刻告知对端自己有多少个可用的工作者。这个信息可以非常简单,只是一个定期更新的数。对于这种信息,合适的套接字模型显然是发布-订阅。每个代理打开一个PUB套接字,在其上发布状态信息;每个代理也打开一个SUB套接字,连接到其他代理的PUB套接字,获取对端的状态信息。
  • 每个代理需要有一种异步地将任务委托(delegate)给对端并且取得回应的方法。这里我们将使用router/router套接字。每个代理有两个这种套接字:一个用于收到的任务,一个用于委托的任务。如果不使用两个套接字,则需要做更多的工作,以便知道我们是在读取请求还是回应,也就是要在消息信封中添加更多信息。

当然,代理与其本地客户端和工作者直接也存在信息流。

6.5 关于命名

3个信息流×每个流2个套接字=代理需要管理6个套接字。这种情况下选择好的命名方式很重要。套接字做什么事情应该是其名字的基础。

3个信息流是:

  • 代理和其客户端和工作者之间的本地请求-应答流
  • 代理和其对端之间的云请求-应答流
  • 代理和其对端之间的状态流

每个流都涉及到两个套接字,可以将其命名为“frontend”和“backend”。最终我们选择的名称是:

  • 本地流:localfe和localbe
  • 云流:cloudfe和cloudbe
  • 状态流:statefe和statebe

我们对每个传输端点都使用ipc协议。ipc在连接方面与tcp相同(都是可断开的),但是ipc不要求IP地址或者DNS名称。本例中处理IP地址和DNS会比较复杂。我们使用命名为sth-local、sth-cloud、sth-state这样的ipc端点名,其中sth是模拟集群中的名字。

 

注意:我们把每个代理中的cloudbe连接到其他每个代理中的cloudfe,也将每个代理中的statebe连接到其他每个代理中的statefe。

6.6 原型化状态流

对于粗心的人,每种套接字信息流都有其小小的陷阱,所以我们在真实的代码中一个个地测试它们,而不是一次抛出所有代码。调试验证通过每种信息流之后,再把它们放到一起形成完整的程序。

 

#include <stdint.h>
#if defined(WIN32)
typedef int64_t ssize_t;
#endif
#include <zmq.h>
#include <czmq.h>
#include <boost/thread.hpp>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
#pragma comment(lib,"czmq.lib")
int main(int argc,char* argv[]){
if (argc < 2){
printf("语法: InterBroker.exe 自身名称 其他代理名称列表\n");
return -1;
}
srand((unsigned int)zclock_time());
zctx_t* ctx = zctx_new();
void* statebe = zsocket_new(ctx,ZMQ_PUB);
zsocket_bind(statebe,"tcp://%s:5800",argv[1]);
void* statefe = zsocket_new(ctx,ZMQ_SUB);
zsocket_set_subscribe(statefe,"");
// 连接到各个对端
for(int idx = 2; idx < argc; ++idx){
zsocket_connect(statefe,"tcp://%s:5800",argv[idx]);
}
while(true){
zmq_pollitem_t items[]={{statefe,0,ZMQ_POLLIN,0}};
if (zmq_poll(items,1,1000 * ZMQ_POLL_MSEC) < 0){
break;
}
// 收到状态报告
if (items[0].revents & ZMQ_POLLIN){
char* p_name = zstr_recv(statefe);
char* p_cnt = zstr_recv(statefe);
printf("%s: %s 个工作者空闲\n",p_name,p_cnt);
free(p_name);
free(p_cnt);
}
// 发布自身状态
else{
zstr_sendm(statebe,argv[1]);
zstr_sendf(statebe,"%d",randof(10));
}
}
zctx_destroy(&ctx);
return 0;
}

关于代码的注解:

  • 每个代理都有一个标识,我们使用这个标识来构造ipc端点名。真实的代理需要工作在TCP之上,需要更复杂的配置模式。我们会在后面的章节中讨论配置模式,但是现在只是使用生成的ipc名称,而忽略如何取得TCP/IP地址或者名字的问题。
  • 程序的核心是一个zmq_poll循环。这个循环处理进入的消息,发出状态消息。我们只在等待了1秒之后没有收到任何进入的消息时才发送状态消息。如果每次都发送状态消息,则会导致消息风暴。
  • 我们使用由发送者地址和数据构成的两段消息。因为需要知道状态发布者的地址才能向其发送任务,而得到这个地址的唯一方法是显式地作为消息的一部分进行发送。
  • 我们没有为订阅者设置标识,因为如果设置标识,则状态信息可能过期。
  • 我们没有为发布者设置HWM,但是如果使用ZeroMQ 2.x,则最好为发布者设置一下HWM。

可以三次运行这个程序来模拟三个集群。我们把它们称作DC1、DC2和DC3:

真实的应用不会定期发送状态信息,而是会在状态改变的时候发送,比如说工作者变得可用或者不可用的时候。这样说来状态消息可能会带来大的网络流量,但是状态信息很小,并且前面说过,集群之间具有超级快的连接。

6.7 原型化本地和云信息流

 

我们需要两个队列,一个用于本地客户端,一个用于云客户端。可以从本地和云前端中取出消息,分别放入到各自的队列中。但是使用ZeroMQ的时候,这是没有必要的,因为ZeroMQ套接字内部已经有队列了。

我们只在可以将请求发往某处的时候才从两个前端读取请求。前面的LRU队列代理就是这么做的,并且工作得很好。总是可以从后端读取消息,因为后端给出的回应可以路由回去。如果没有后端与我们通信,则没必要查看前端是否有数据到达。

这样,主循环的任务就是:

  • 轮询后端的活动。从后端取得的消息可能是工作者的“就绪”通知,也可能是一个回应。如果是回应,则通过本地或者云前端将其路由回原始的请求者。
  • 如果收到工作者的应答,则它成为可用的了,将其放入队列并且进行计数。
  • 如果有可用的工作者,则从前端取得一个请求,随机地将其路由给一个本地工作者,或者路由给云对端。
  • 随机地将任务发送给对端代理而不是本地工作者,以模拟集群间的工作分发。

我们使用代理标识来在代理之间路由消息。我们在命令行中为代理提供了一个名字。只要这个名字不与ZeroMQ生成的、用于客户端节点的UUID相同,我们就可以判断应该将回应路由给一个客户端,还是给一个代理。

启动两个代理的示例:

 

关于代码的注解:

  • 使用zmsg类让代码更加简洁。
  • 因为我们不获取对端的状态信息,所以总是认为对端正在运行中。代码会提示你确认已经启动所有代理。真实的情况中最好不要给没有告知我们自己存在的代理发送任何数据。

如果有错误路由的消息,则客户端会阻塞,而代理会停止打印跟踪信息。你可以杀掉一个代理来证明这一点。另一个代理会试图发送请求到云中,而其客户端会一个个地阻塞,等待回应。

6.8 集成

关于代码:

  • 客户端线程会检测和报告失败的请求。客户端轮询回应,如果在一定时间内(10秒)回应没有到达,则打印错误信息。
  • 客户端线程不直接打印,而是将消息发送给一个“监视”套接字(PUSH),主循环会收集(PULL)并且打印消息。这是我们第一次将ZeroMQ套接字用于监视和日志,后面的章节中有更多的这种用法。
  • 客户端模拟各种负载,让集群在随机的时刻满载,以致任务被移动到云中。客户端和工作者个数,以及客户端和工作者线程中的延时控制着这一点。你可以自由地调整这些参数,看看能不能达到更真实的模拟。
  • 主循环使用了两个zmq_poll。实际上可以使用三个:信息、后端、前端。前面已经讲过,如果后端没有空闲处理能力,则没有必要从前端获取消息。

这个程序有一些问题:

  • 客户端可能会因为请求或者回应丢失在某处而冻结。前面讲过,ROUTER套接字会丢弃无法路由的消息。第一种解决方案是修改客户端线程,检测和报告这种错误。第二种方案是在主循环中的每个recv()之后、send()之前放置zmsg_dump()调用,直到找出问题所在。
  • 主循环错误地从多个就绪的套接字中读取数据。这会造成第一个消息丢失。请修复这个问题,让代码只从第一个就绪的套接字读取数据。
  • zmsg类没有合理地将UUID编码成C字符串。这会让带有0字节的UUID导致程序崩溃。请修改zmsg,让其将UUID编码成可打印的十六进制字符串。

模拟没有检测云对端的消失。如果启动多个对端,然后停止其中一个,则因为它已经将自身的处理能力广播给其他对端了,其他对端还是会继续给它发送工作。你可以试试看,这会让客户端抱怨丢失请求。有两种解决方案:第一,只短期保留处理能力信息,如果对端消失,迅速地将其处理能力清零。第二,为请求-回应链添加可靠性。下一章将讨论可靠性。

最后编辑:
作者:wy182000
这个作者貌似有点懒,什么都没有留下。

留下一个回复