zmq接口zmq push pull到pull收到延时了2s才收到消息是怎么回事

developerWorks 社区
随着多核时代的来临,需要与之相应软件技术来充分发挥多核硬件的优势。本文着眼于多进程的软件架构,介绍了 ZeroMQ 作为一种的消息队列,在处理并发问题上具有的独特优势。同时将简要介绍这一技术在 OpenStack 基础云架构中的应用。
, 软件工程师,
陈舜子是IBM 中国软件开发中心工程师,目前从事宁波智慧物流平台的研发工作,有5年JAVA EE应用开发经验,对信息安全及云平台相关技术有浓厚兴趣,email地址chszchen@。
, 软件工程师,
石芮,IBM CDL 软件工程师, 具有丰富的自动化测试经验、RFT 应用经验、网格操作经验、以及 Web application 开发经验。
, 测试工程师,
王莹,IBM GTS 测试工程师,从事软件测试有十年以上经验,精通软件自动化测试各种方法和工具,善于高效完成软件测试任务。
多核时代的挑战传统并发策略的弊端由于在提高 CPU 频率上遇到的瓶颈,当前多核架构已经成为提高 CPU 计算能力的主要方式,并且在可以预见的时间内,一台计算机尤其是大型机可以拥有的核数乃至 CPU 数仍将不断提升。然而传统的软件技术在处理并发问题能力上的滞后性却使得多核硬件技术无法充分发挥其优势。传统的软件开发语言通常使用线程来处理并发问题,这种做法存在一些固有的弊端,比如:
同步缺失:线程之间共享内存数据,当多个线程访问同一数据而其中一个线程存在修改数据的行为时,就会出现 race-condition 而导致程序出错。开发人员必须对各线程中访问同一数据的代码段使用各种同步机制进行同步,比如设立 critical
section,使用 Semaphore 控制访问。
同步粒度设置不当:为了避免同步缺失问题,必须使得不同线程中的某些代码段顺序执行而非并发执行,这样的区域成为关键区域。显而易见,关键区域越大,整个程序的并发性就越差;但若将关键区域设置的太小,又容易导致同步缺失的问题。
可分解的读写操作:即使是最简单的 32 位或是 64 位的数据类型读写操作,也未必是原子操作,这需要程序员对系统的底层实现有深入了解。而一旦它们不是原子操作,就需要进行同步处理。
潜在的代码优化:当前的 CPU 和代码编译器通常具有足够的智能来对接收的代码进行优化,这通常会导致代码的实际执行顺序与开发人员所给出的顺序在局部有所差别。这增加了处理同步缺失问题的难度。
死锁:一旦我们开始使用锁来处理同步问题,就引入了潜在的死锁危险。当多个线程已经持有的资源和它们试图访问的资源形成一个环路,就会导致所有的线程都因为无法继续执行而处于停滞的状态,进而导致程序失去响应。
循环让步:为了避免死锁的状况,通常会采取让无法继续执行的线程让步给其他线程的情况,但若第二个线程也无法继续执行又让步给前一个线程,那么这两个线程就会不断的在挂起和唤醒两个状态之间进行切换。而线程切换是一种耗费 CPU 的工作。 优先级倒置:通过在运行过程中动态调整线程优先级来让低优先级的线程获得执行机会的策略,可能使得原本高优先级的任务被原本低优先级的任务所阻塞。即便我们的开发人员有足够的能力来处理好上述问题,我们也要为此付出额外的代价: 编写和维护多线程的应用是非常耗费时间和精力的工作。 因为处理多线程问题的难度,多线程应用的线程数往往有限。当计算机的 CPU 核数达到 16 个以上时 CPU 核数和应用线程数的差异往往导致 CPU 无法被充分利用。 线程同步机制存在的必要性使得在多线程应用中某些代码实际上仍然是顺序执行,这 CPU 的多核设计背道而驰。本文介绍了如何运用开源的 ZeroMQ 来解决上述问题,并且详述了 ZeroMQ 在主流云技术 Openstack 中的运用,给云计算开发人员解决此类问题提供思路和参考。从 Erlang 到 ZeroMQ理想化的并发应用必须能够适应任何数量的 CPU 核数,杜绝锁的使用,在多数情况下,其编程实践应该与单线程开发一致而无需额外的繁琐同步处理。Erlang 语言为我们提供一个范例,它具备如下优点:(注:这里的进程与操作系统进程并无直接对应关系) 快速的创建和销毁进程 能从容应对 10000 个以上的并发进程。 进程间基于消息传递进行快速的异步通信。 基于消息传递,进程间实现数据零共享。 对进程状态进行实时监控。 选择性的消息接收。这是一种与多线程机制截然不同的并发策略,其关键在于通过消息传递 (Messaging) 来实现进程间的通信而非共享内存数据。进程首先将消息发送给消息队列,后者再将消息传递给目标进程,这一过程无需使用锁来进行同步。然而 Erlang 的受众仍十分有限,需要有一种受众更广,更易于使用的技术,来最大限度的发挥多核硬件的能力。这正是创立 ZeroMQ 的初衷,它在继承了 Erlang 优秀的并发特性的基础上,更具备如下优势: 开放源码,拥有一个活跃的开源社区。 基于广为人知的 BSD socket 接口,易于使用。 实现了多种常见的消息通信模型,易于和实际问题相结合。 兼容多数编程语言、操作系统和硬件,具有良好的可移植性。 没有消息中间件,这避免了 Single Point of Failure,减少了维护成本。ZeroMQ 针对不同应用场合的消息传递模型ZeroMQ 简介ZeroMQ(也拼写作?MQ,0MQ 或 ZMQ) 是一个为可伸缩的或并发应用程序设计的高性能异步消息库。它提供一个, 但是与不同,ZeroMQ 的运行不需要专门的()。该库设计成常见的风格的 。ZeroMQ 是由 iMatix 公司和大量贡献者组成的社群共同开发的。ZeroQ 通过许多第三方软件支持大部分流行的编程语言,从
和 。上一节中我们提到,走出传统并发策略困境的关键在于,以消息传递取代内存数据共享来进行进程(线程)间的通信。而 ZeroMQ 正是通过为传统的 Socket 接口赋予消息队列的能力来实现这一目标。Request-reply 模式Request-reply 是 ZeroMQ 提供的最常用的消息传递模式之一。在这种模式下,客户端进程发起请求,服务器端进程接受请求并返回响应给客户端。客户端和服务器端进程都可以有多个。清单 1 和清单 2 实现了一个简单的“请求-应答”应用的服务器端和客户端。在 HelloWorldServer.py 中,我们首先创建了一个 socket 对象,将它绑定到一个特定的地址。一旦接受到客户端的请求,就发送内容为”World”的回复。清单 1.
HelloWorldServer.pyimport zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
# 等待下一个来自客户端的请求
message = socket.recv()
print "Received request: ", message
# 休眠时间
time.sleep (1) # Do some 'work'
# 发送回复给客户端
socket.send("World")清单 2.
HelloWorldClient.pyimport zmq
context = zmq.Context()
# 发送给服务器端的 Socket
print "Connecting to hello world server..."
socket = context.socket(zmq.REQ)
socket.connect ("tcp://localhost:5555")
# 发送 10 个请求,每次都等待相应
for request in range (10):
print "Sending request ", request,"..."
socket.send ("Hello")
# 收到回复
message = socket.recv()
print "Received reply ", request, "[", message, "]"从表面上看这种风格与传统的 socket 十分相似,但实际上它们有重大的差别。首先,ZeroMQ 的 socket 是面向消息的,我们从 socket 里直接获得消息字符串,而非字节流,发送亦然。其次,开发者无需关心负责底层通讯的连接的管理,这种连接可能是传统的 socket 连接,也可能基于其他协议。这些底层连接的创建,销毁,重连以及它如何确保消息被有效的发送,都由 ZeroMQ 负责管理。最后,ZeroMQ 的 socket 之间的连接不受任何限制,而传统的 socket 之间往往无法建立多对多的连接。因此,ZeroMQ 的 socket 可以被看作一个功能完善的消息队列。REQ 类型的 socket 通常被用来发送请求,并且只有在收到第一个请求的回复之后,才能发送第二个请求。在 HelloWorldClient.py 中该 REQ 类型的 socket 只连接到了一个地址,但它也可以连接多个地址。在这种情况下,ZeroMQ 将确保消息被均匀的发送给每个地址,但每次只有一个地址会受到请求。REP 类型的 socket 用于接受请求。它必须在发送第一个请求的回复之后才能接受第二个请求。尚未来得及处理的请求按顺序被置于队列中。REQ 和 REP 类型的 socket 在消息发送和接受的操作序列上存在严格限制,为了应对更复杂的情况,ZeroMQ 也提供了更为灵活的 socket 类型,这就是 DEALER 和 ROUTER。DEALER 和 REQ 的区别在于,它可以按照任意的次序执行发送消息和接受消息的操作,而不必等待上一个请求的回复。同样,ROUTER 也不必等待发送上一次请求的响应完成就能接受第二个请求。此外,ROUTER 会为请求加上标识以记录最初请求者的身份。这样一来它可以将该请求发送给其他进程处理,得到返回结果后,仍可以根据消息中的身份标识将该请求准确的返回给最初请求者。因此 ROUTER 和 DEALER 可以被用来实现类似于传统消息队列架构中的消息服务器的进程。Publish-subscribe 模式Publish-subscribe 是用于广播消息的模式,在这种模式下发布的消息将同时发送给多个节点。它包含 PUB 和 SUB 两种 socket 类型。与 Request-reply 不同,PUB 和 SUB 都只能进行单向的消息传递。PUB 只能发送消息,而 SUB 只能接受消息。清单 3,清单 4 是一个简单的 Publish-subscribe 模式的实现。从中我们可以看到,作为消息订阅者的 syncsub.py,将一个 SUB 类型 socket 绑定到‘tcp://localhost:5561’,这代表了一个单一的地址。而作为消息发布者的 syncpub.py,将一个 PUB 类型的 socket 绑定到‘tcp://*:5561’,这实际上匹配了多个地址。也就是说,凡是绑定到符合格式‘tcp://*:5561’地址的任何 SUB 类型的 socket 都可以接收到该 PUB 进程发布的消息。清单 3. syncsub.pydef main():
context = zmq.Context()
# 首先,连接我们的订阅 socket
subscriber = context.socket(zmq.SUB)
subscriber.connect('tcp://localhost:5561')
subscriber.setsockopt(zmq.SUBSCRIBE, "")
# 其次,跟 publisher 同步
syncclient = context.socket(zmq.REQ)
syncclient.connect('tcp://localhost:5562')
# 发送一个同步请求
syncclient.send('')
# 等待同步的回复
syncclient.recv()
# 第三,收到更新并且报告我们收到的数目
while True:
msg = subscriber.recv()
if msg == 'END':
print 'Received %d updates' % nbr
if __name__ == '__main__':
main()清单 4. syncpub.pyimport zmq
# 我们等待 10 个 subscribers
SUBSCRIBERS_EXPECTED = 10
def main():
context = zmq.Context()
# 发送给客户端的 socket
publisher = context.socket(zmq.PUB)
publisher.bind('tcp://*:5561')
# 收到信号的 socket
syncservice = context.socket(zmq.REP)
syncservice.bind('tcp://*:5562')
# 从订阅者收到同步
subscribers = 0
while subscribers & SUBSCRIBERS_EXPECTED:
# 等待同步请求
msg = syncservice.recv()
# send synchronization reply
syncservice.send('')
subscribers += 1
print "+1 subscriber"
# 广播 1M 的更新后结束
for i in range(1000000):
publisher.send('Rhubarb');
publisher.send('END')
if __name__ == '__main__':
main()在这一实现中,我们还使用 REQ 和 REP 类型的 socket 对 SUB 和 PUB 进程进行了同步,仅当出现 10 个 SUB 进程时,PUB 进程才会开始发送消息。Pipeline 模式Pipeline 模式通常用于实现工作流的概念,每个进程负责整个处理流程中的一个步骤。每个步骤接受上一步的处理结果,并将自己的处理结果传递给下一步。每一步可以有多个备选进程。它包含 PUSH 和 PULL 两种 socket 类型。与 PUB 和 SUB 类型的 socket 类似,这两种 socket 都只能做单向消息传递,PUSH 只能发送消息,PULL 只能接受消息。因此通常一个进程需要同时包含这两种类型的 socket。此外,与 PUB 不同的是,PUSH 只会将消息发送给单个 PULL 节点。Exclusive pair 模式这种模式仅适用于两个特定节点之间互相传递消息的场合。ZeroMQ 核心机制分析在上一节中我们简要介绍了 ZeroMQ 针对各种不同应用场合提供的各种消息传递模型。然而消息队列技术发展至今,有许多成功的产品,它们同样提供了这些模型。那么 ZeroMQ 与这些传统的消息队列技术相比有什么独特的优势呢?可以说,其中最大的区别就是 ZeroMQ 弱化了消息中间件的概念。在传统的消息队列系统中,消息中间件扮演着核心的角色。进程间并不是直接交互,而是连接到消息中间件,由消息中间件确保消息的有效传递。这种模式的优点是: 进程本身不需要维护其他进程的地址信息,而只需知道消息中间件的地址。无论是增加、删除或者修改消息接收进程的地址都不会对消息发送进程产生影响,反之亦然。
进程的生命周期不会存在依赖。消息发送进程在发送消息时不必担心消息接收进程尚未启动而导致消息丢失,因为消息中间件会负责保存消息并在消息接收进程启动时将消息传递给它。 即使消息接收进程和消息发送进程都产生故障,消息仍然可以保留在消息中间件上。尽管如此,消息中间件的缺点也同样明显: 极大的增加了网络开销。本可以由进程 A 发送给进程 B 的消息,现在必须先由进程 A 发送至消息中间件,再由消息中间件发送给进程 B。如图 1,图 2 所示。
消息中间件可能成为整个系统的瓶颈。由于所有的消息都必须经由消息中间件,容易导致消息中间件负载过高,而此时其他进程却因为无法接受到消息而处于空闲的状态。系统的整体效率因此而大幅降低。图 1.
存在消息中间件时的消息传递图 2.
没有消息中间件时的消息传递不难看出,不管是使用单一的消息中间件或者是完全不使用消息中间件,都不是完美的解决方案。在实际应用,往往需要采用一些折中的解决方式,比如:
用目录服务取代消息中间件。消息中间件的主要作用之一体现在它管理着所有参与消息传递的进程地址。当整个系统中存在上百个乃至更多这样的进程时,这种统一管理的服务是不可或缺的。但是在这种庞大的系统中,如果同时由消息中间件负责消息传递,容易导致其成为系统的瓶颈。因此将目录服务和消息传递服务进行分离,消息中间件仅提供目录服务而将消息传递功能交由各个进程自身去实现,可以在保证进程有效管理的同时,有效分散系统的负载到各个进程上,提高系统的整体效率。 当系统的总进程数较多时,可以建立多个消息中间件,每个中间件管理部分进程。 同理,可以建立多个目录服务来避免 SPOF(Single Point of Failure),提高系统的稳定性。由于 ZeroMQ 本身并不依赖于消息中间件,因此开发者可以根据实际情况来选择合适的消息传递模型。而传统的消息队列技术因为过度依赖现有的消息中间件产品,难以提供这种灵活性。ZeroMQ 在 Openstack 中的应用Openstack 作为一个开源的 IaaS 平台为人们所熟知,并在近年来伴随着云计算的兴起而成为热点。Nova 是 Openstack 中最为核心的组件,负责完成与虚机相关的各种操作。Nova 采用了多进程的架构,通过消息传递来完成各进程间的相互协作,并且提供了一个基于 ZeroMQ 的实现。Nova 中的消息传递在 Nova 的诸多进程中,nova-api 负责提供统一的对外接口,nova-scheduler 负责为虚机选择合适的物理机宿主,nova-compute 负责虚机的创建和启停等操作,nova-network 和 nova-volume 分别负责虚机的网卡和存储的相关操作。这些进程之间完全依赖消息传递来进行通信。图 3.
创建新虚机操作的消息传递流程以图 3 中的创建新虚机操作为例,首先用户将新虚机的规格参数提供给 nova-api。nova-api 将用户提供的参数组装成一个创建虚机的请求,以消息的形式发送给 nova-scheduler。nova-scheduler 根据相关策略为新的虚机选择一个合适的物理机宿主,然后发送消息给 nova-compute,要求在该物理机上按要求创建一个新的虚机。当所有操作都完成后,nova-scheduler 再将所创建虚机的信息封装成消息发送给 nova-api,最终返回给用户。ZeroMQ 在 Nova 中的应用为了实现上一节中所描述的架构,Nova 利用 ZeroMQ 建立了一套消息传递机制,提供以下操作:
CALL 和 MULTICALL。这类操作发送请求给其他进程,然后等待回应。尽管看起来 REQ 和 REP 类型的 socket 非常适合这种场景,但是它们对消息传递操作的顺序有严格限制,无法并发处理请求,这显然会降低系统的工作效率。Nova 的开发者结合 PUB,SUB,PUSH 和 PULL 四类型的 socket 来实现这类操作。完成一个 call 操作,通常涉及三个进程,如图 4 所示。与普通的 Request-reply 模式相比,这里引入了响应中转进程来转发响应给请求发起进程。这样做的好处是,请求响应进程无需知晓请求发起进程的存在,减轻了进程间的依赖程度。MUTICALL 是可以同时发送请求给多个响应进程的操作。图 4. call 操作的实现流程
CAST 和 NOTIFY。这类操作发送指令给其他进程,但并不需要获得回应,可以看做是 CALL 的简化版。在 Nova 里,这类操作也是由 PULL 和 PUSH 类型的 socket 实现的。 FANOUT-CAST。这类操作用于发送广播指令,由 PUB 和 SUB 类型的 socket 实现。图 5. 创建新虚机操作的实现结合这些操作,图 3 可以重新描述如下,如图 5 所示。值得一提的是,nova-scheduler 和 nova-compute 之间的 CAST 和 NOTIFY 是两个异步的操作,即 nova-scheduler 通过 cast 操作发出部署请求后,就已经发送响应给 nova-api,此时虚机处于部署中的状态。当部署完成时,nova-compute 另行通过 NOTIFY 发送通知给 nova-scheduler,将虚机状态设置为部署完毕。此外,在创建虚机的过程中,nova-scheduler 也会发送相应的事件给其他进程,以便进行协调。结束语通过以上各章节,我们了解了 ZeroMQ 的来历、基本功能以及在开源 IaaS 框架 Openstack 中的应用。总而言之,ZeroMQ 为开发者提供了简单易用、功能完善的消息通讯机制,能够很好的解决多进程并发应用中各进程间通信协作的问题。同时,ZeroMQ 不依赖于消息中间件,这为开发者提供了足够的灵活性,根据实际问题的需要来构建适应性更强的应用。
参考资料 ,本文从背景到 ZeroMQ 的三种基本模型及其通讯方式进行讲解,是初学者的好帮手。,介绍 ZeroMQ 的基本概念等,简单通俗易懂。。。。 developerWorks 中国:。
提供了有关云计算的更新资源,包括 云计算 。更新的 ,让您的开发变得轻松,
帮助您成为高效的云开发人员。连接转为云计算设计的 。关于
的活动聚合。加入 ,您可以在这里提问、解答并获取更多有关 Bluemix 的技术问题。加入 ,查看开发人员推动的博客、论坛、组和维基,并与其他 developerWorks 用户交流。
developerWorks: 登录
标有星(*)号的字段是必填字段。
保持登录。
单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件。
在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。
所有提交的信息确保安全。
选择您的昵称
当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。昵称长度在 3 至 31 个字符之间。
您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。
标有星(*)号的字段是必填字段。
(昵称长度在 3 至 31 个字符之间)
单击提交则表示您同意developerWorks 的条款和条件。 .
所有提交的信息确保安全。
文章、教程、演示,帮助您构建、部署和管理云应用。
立即加入来自 IBM 的专业 IT 社交网络。
免费下载、试用软件产品,构建应用并提升技能。
static.content.url=/developerworks/js/artrating/SITE_ID=10Zone=Cloud computingArticleID=1032645ArticleTitle=基于 ZeroMQ 优化处理云计算中的并发问题publish-date=zeroMQ初体验-3.分而治之模式(push/pull) - 风吹鸡蛋壳 - ITeye技术网站
博客分类:
push/pull模式:
模型描述:
1.上游(任务发布)
2.工人(中间,具体工作)
3.下游(信号采集或者工作结果收集)
上游代码:
import zmq
import random
import time
context = zmq.Context()
# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")
print "Press Enter when the workers are ready: "
_ = raw_input()
print "Sending tasks to workers..."
# The first message is "0" and signals start of batch
sender.send('0')
# Initialize random number generator
random.seed()
# Send 100 tasks
total_msec = 0
for task_nbr in range(100):
# Random workload from 1 to 100 msecs
workload = random.randint(1, 100)
total_msec += workload
sender.send(str(workload))
print "Total expected cost: %s msec" % total_msec
工作代码:
import sys
import time
import zmq
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")
# Process tasks forever
while True:
s = receiver.recv()
# Simple progress indicator for the viewer
sys.stdout.write('.')
sys.stdout.flush()
# Do the work
time.sleep(int(s)*0.001)
# Send results to sink
sender.send('')
下游代码:
import sys
import time
import zmq
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")
# Wait for start of batch
s = receiver.recv()
# Start our clock now
tstart = time.time()
# Process 100 confirmations
total_msec = 0
for task_nbr in range(100):
s = receiver.recv()
if task_nbr % 10 == 0:
sys.stdout.write(':')
sys.stdout.write('.')
# Calculate and report duration of batch
tend = time.time()
print "Total elapsed time: %d msec" % ((tend-tstart)*1000)
注意点:
这种模式与pub/sub模式一样都是单向的,区别有两点:
1,该模式下在没有消费者的情况下,发布者的信息是不会消耗的(由发布者进程维护)
2,多个消费者消费的是同一列信息,假设A得到了一条信息,则B将不再得到
这种模式主要针对在消费者能力不够的情况下,提供的多消费者并行消费解决方案(也算是之前的pub/sub模式的那个"堵塞问题"的一个解决策略吧)
由上面的模型图可以看出,这是一个N:N的模式,在1:N的情况下,各消费者并不是平均消费的,而在N:1的情况下,则有所不同,如下图:
这种模式主要关注点在于,可以扩展中间worker,来到达并发的目的。
(未完待续)
浏览 10210
浏览: 302488 次
来自: 上海
博主问个问题,这个篇幅下的python代码无法达到应有的作用, ...
Map&String, Object& args
LZ,这都13年了,抽空把这篇文章的下文给表完了吧,这一口气喘 ...
引用前文已经说过,XREP其实用以平衡负载,所以这里由它对请求 ...
ustclz 写道图片怎么显示不了了。。我这看是可以显示的。不 ...高级编程(6)
&&&名字中Zero的含义:
无需代理(zero broker)零延迟(zero latency as possible)
&&&Zero也表示设计目标:
零管理零开销零浪费
&&&概括地说,Zero表示最简化的设计理念:通过移除复杂性来增强能力,而不是通过增加新功能。
2 套接字API
&&&ZeroMQ中的套接字管理分成四个部分:
创建和销毁套接字:zmq_socket和zmq_close设置和获取套接字选项:zmq_setsockopt和zmq_getsockopt通过创建连接来将套接字插入到网络拓扑结构中:zmq_bind和zmq_connect使用套接字收发消息:zmq_recv和zmq_send
&&&一般来说,ZeroMQ应用中,套接字属于ZeroMQ,消息属于你的代码。
3 将套接字插入到拓扑结构中
&&&要在两个节点间创建一个连接,则需要对一个节点调用zmq_bind,对另一个调用zmq_connect。一般来说,调用zmq_bind的节点是服务器,使用众所周知的网络地址;而调用zmq_connect的节点是客户端,使用未知或者任意的网络地址。
&&&ZeroMQ连接与TCP连接的主要不同之处:
ZeroMQ可使用多种传输端点(inproc、ipc、tcp、pgm或者epgm)客户端调用zmq_connect的时候连接就存在了,不论此时服务器是否已经调用zmq_bindZeroMQ是异步的,在必要的时候会有队列根据两端使用的套接字的类型,连接可以表达某种“消息模式”一个套接字可以有多个输出和输入连接没有zmq_accept函数。绑定到端点的时候套接字会自动开始接受连接。ZeroMQ应用不能直接使用相关的底层连接(如TCP连接),它们被封装在ZeroMQ套接字之中了。
&&&&如果客户端先启动,开始发送消息,而此时服务器还没有启动,则消息会被存放到消息队列中(当前前提是队列还没有满),等服务器启动后才开始投递。
&&&&服务器节点可以绑定到多个端点;当然,客户端也可以连接到多个端点。客户端每次调用zmq_connect都会使得服务器端的套接字获得一个新的连接。
&&&&套接字有不同的类型。套接字类型定义了套接字的语义,以及路由消息和消息排队的策略等等。套接字类型与“消息模式”相关。
&&&&以不同方式连接套接字的能力给了ZeroMQ作为一个消息队列系统所需要的基本能力。使用ZeroMQ定义网络体系的时候,只需要把各个部分插接到一起,就像玩积木一样。
4 使用套接字传输数据
&&&&使用zmq_send和zmq_recv来发送和接收消息。在传输数据方面,ZeroMQ套接字与TCP套接字的主要不同是:
ZeroMQ套接字传输消息,而不是字节(如TCP)或者数据包(如UDP)。消息是带有长度指定的二进制数据块。ZeroMQ套接字在后台线程中进行IO。这意味着,在应用进行其他事情的同时,消息可以到达本地输入队列,也可以发送本地输出队列中的消息。当然,队列是可以配置的。根据类型的不同,套接字可以连接到多个其他套接字,或者被其他多个套接字连接。TCP就像一对一的电话呼叫,而ZeroMQ实现了一对多(像无线电广播)、多对多(像邮局)、多对一(像邮箱),以及一对一模式。ZeroMQ可以将消息发送到很多端点(扇出模型),或者从多个端点接收消息(扇入模型)。
&&&&zmq_recv可以从所有连接上收集消息。它使用公平排队算法,让每个发送者都有机会把数据发送给接收者。
&&&&zmq_send不会真正把消息发送到套接字连接中。它只是对消息进行排队,以便IO线程可以异步地发送消息。除少数特殊情况外zmq_send不会阻塞。
5 单播传输端点
单播传输端点包括inproc、ipc和tcp。大多数情况下请使用tcp。tcp是可断开的(disconnected)传输端点。“可断开”的意思是:执行连接的时候,不要求对端已经存在。客户端和服务器可以在任何时刻进行绑定和连接,可以下线和上线,而这对应用是透明的。ipc是进程间传输端点,与tcp类似,只是只能用在局域网中,不需要指定IP地址或者域名。ipc传输端点也是“可断开的”。当前ipc还不能在Windows中工作,以后的版本中也许可以。在Unix系统中工作的时候需要合适的权限,否则不同用户的进程可能没法共享传输端点。inproc是线程间(进程内?)传输端点。inproc是一种连接的信号传输端点(connected signaling transport)。inproc比tcp和ipc要快很多。与tcp和ipc的最大不同是:必须在connect之前调用bind。未来版本的ZeroMQ可能不会要求必须这样,但是当前是这样的。
6 ZeroMQ不是中性载体
&&&&ZeroMQ不是中性载体,它在传输层上增加了自己的帧结构。这种结构不能与现有的协议(如HTTP)兼容。也就是说,ZeroMQ不是像boost.asio那样直接使用传输层传输客户的数据,而是额外增加了少量管理信息,这就导致了不兼容。比如说,如果写一个ZeroMQ应用,给Web服务器发送HTTP请求,则Web服务器没法理解请求,因为ZeroMQ在HTTP请求上增加了管理信息,发出的数据已经不是正确的HTTP请求了。
7 核心消息模式
&&&&ZeroMQ迅速高效地将数据块(消息)传递到节点中。节点可以映射到线程、进程,或者机器。应用程序只需要使用单个简单的套接字API就可以了,而不用管实际的传输端点是什么(进程内、进程间、TCP,或者多播等)。ZeroMQ会自动处理重连。必要的时候ZeroMQ会在发送端和接收端进行消息排队。ZeroMQ仔细地管理消息队列以确保进程不会耗尽内存,必要的时候会把消息存放到磁盘中。ZeroMQ处理了套接字错误。ZeroMQ在后台进行所有IO。ZeroMQ使用无锁(lock-free)技术进行节点间的通信,所以不需要信号量、锁和等待,也不会死锁。
&&&&ZeroMQ根据消息模式进行消息的排队和路由。正是消息模式给了ZeroMQ智能。消息模式封装了难以学到的、用最好的方式分布数据和任务的经验。ZeroMQ的消息模式是硬编码到库中的,未来的版本也许会允许用户自定义模式。内建的消息模式有:
请求-应答模式:连接一系列客户端和一系列服务。这是一种远程过程调用和任务分布模式。发布-订阅模式:连接一系列发布者和一系列订阅者。这是一种数据分布模式。管线模式:以扇出/扇入模式连接节点,可以有多步和循环。这是一种并行任务分布和收集模式。专用套接字对模式:与TCP套接字类似,连接两个套接字,用于特定的高级情况。
&&&&上述四种核心消息模式是内建在ZeroMQ中的,是API的一部分,由C++库实现。在这四种模式的基础上还有高层消息模式。高层模式不是库的一部分,不包含在ZeroMQ包中,而是ZeroMQ社区的组成部分。
&&&&消息模式由匹配的套接字对实现。要理解消息模式必须理解套接字类型以及它们如何协同工作。有效的套接字组合有:
&&&&其他类型的套接字组合是非法的,将会导致不确定的结果,未来版本的ZeroMQ可能会在使用非法套接字对的时候返回错误。当然,可以用代码将各种类型的套接字桥接起来。
8 使用消息
&&&线路中的ZeroMQ消息是适合于内存的零字节或者多字节数据。你需要使用Google Protocol Buffers、XDR、JSON或者其他方式来进行序列化。
&&&内存中的ZeroMQ消息是zmq_msg_t结构体(或者类,取决于使用的编程语言)。在C中使用ZeroMQ消息的基本规则是:
创建和传递zmq_msg_t对象,而不是数据块。读取消息:调用zmq_msg_init创建空的消息,然后传递给zmq_recv。写入消息:使用zmq_msg_init_size创建指定大小的消息,使用memcpy填入数据,然后传递给zmq_send。释放(不是销毁)消息:调用zmq_msg_close。这会减小引用计数,最终ZeroMQ会销毁消息。使用zmq_msg_data获取消息内容指针;使用zmq_msg_size得知消息内容大小。除非精确地知道自己在做什么,不要使用zmq_msg_move、zmq_msg_copy或者zmq_msg_init_data。
&&&一旦把消息传递给zmq_send,ZeroMQ会清除(clear)消息,也就是设置其大小为零。发送之后就不能访问消息了,不能两次发送同一个消息。如果确实要两次发送相同的消息,正确的做法是:使用zmq_msg_init创建一个新的消息对象,然后调用zmq_msg_copy创建第一个消息的复本。zmq_msg_copy不会复制消息内容,而只是增加引用计数。这样就可以两次(或者多次,如果你创建多个复本)发送相同的消息了。ZeroMQ会在最后一个复本发送完成或者关闭后销毁消息。
&&&关于消息,还应该知道:
ZeroMQ原子地发送和接收消息,要么得到整个消息,要么得不到消息。ZeroMQ不会立即发送消息,而是有一些不确定的延迟。可以发送零字节的消息,这通常用于从一个线程向另一个线程发送信号。消息必须适合于内存。如果要发送任意大小的文件,应该把文件切分成适合于内存的片段,将每个片段作为单独的消息发送。使用没有自动垃圾回收功能的语言编程时,使用完消息之后必须调用zmq_msg_close。
&&&&再次强调,暂时不要使用zmq_msg_init_data。这个零拷贝方法现在只会给你带来麻烦。
9 处理多个套接字
低效的方式
ZeroMQ推荐的方式
&&&&&更好的方法是将zmq_poll封装到一个框架中,成为一个优良的事件驱动的反应器。
10 处理错误和ETERM
&&&&我们认为,进程应该对内部错误是脆弱的,但是对外部攻击和错误则尽可能地健壮。不清楚某个错误是内部的还是外部的,这是一个设计上的错误,应该被修正。
&&&&真正使用的代码应该对每个ZeroMQ调用做错误处理。其他语言绑定可能会帮你处理错误,但是使用C则要求你自己处理错误。关于错误处理,一般有下述原则:
如果失败,创建对象的方法会返回NULL错误码由errno或者zmq_errno()提供zmq_strerror可以提供用于日志的描述性错误文本如果线程带NOBLOCK选项调用zmq_recv,并且没有等待收取的数据,则ZeroMQ会返回-1并且设置errno为EAGAIN如果线程调用zmq_ctx_destroy,同时还有其他线程在进行阻塞的工作。则zmq_ctx_destroy调用会关闭上下文,而所有的阻塞调用会返回-1,并且errno被设置为ETERM
&&&我们来看看在前面的并行管线示例中如何正确地结束进程。设想我们在后台启动了很多工作者进程,现在批处理完成了,需要杀死这些工作者进程。我们通过发送kill消息来杀死工作者进程。最好是在汇聚点做这个工作,因为只有汇聚点真正知道什么时候批处理完成了。
&&&如何将汇聚点连接到工作者?PUSH/PULL套接字仅仅是单向的。ZeroMQ的标准答案是:为需要解决的每种问题创建一个新的套接字流程。我们使用发布-订阅模型来向工作者发送kill消息:
汇聚点在一个新的端点创建一个PUB套接字工作者绑定其输入套接字到这个端点汇聚点检测到批处理完成时,通过PUB套接字发送一个kill消息工作者检测到kill消息时退出
&&&汇聚点只需要增加少量代码:
工作者代码&&
#include&&zmq.h&
#include&&zmq_helpers.h&
#include&&stdio.h&
#pragma&comment(lib,&libzmq-v100-mt.lib&)
int&main(void){
&&&&void*&ctx&=&zmq_ctx_new();
&&&&void*&receiver&=&zmq_socket(ctx,ZMQ_PULL);
&&&&zmq_connect(receiver,&tcp://localhost:5557&);
&&&&void*&sender&=&zmq_socket(ctx,ZMQ_PUSH);
&&&&zmq_connect(sender,&tcp://localhost:5558&);
&&&&void*&notify_end&=&zmq_socket(ctx,ZMQ_SUB);
&&&&zmq_connect(notify_end,&tcp://localhost:5559&);
&&&&zmq_setsockopt(notify_end,ZMQ_SUBSCRIBE,&&,0);
&&&&zmq_pollitem_t poll_item[]&=&{
&&&&&&&&{receiver,0,ZMQ_POLLIN,0},
&&&&&&&&{notify_end,0,ZMQ_POLLIN,0},
&&&&while(zmq_poll(poll_item,2,-1)&&=&0){&&&&&&&&
&&&&&&&&if&(poll_item[0].revents&&&ZMQ_POLLIN){
&&&&&&&&&&&&// 接收任务
&&&&&&&&&&&&char*&p_work&=&s_recv(receiver);
&&&&&&&&&&&&fflush(stdout);
&&&&&&&&&&&&printf(&%s\n&,p_work);
&&&&&&&&&&&&// 执行任务
&&&&&&&&&&&&s_sleep(atoi(p_work));
&&&&&&&&&&&&free(p_work);
&&&&&&&&&&&&// 通知汇聚点已经完成一个任务
&&&&&&&&&&&&s_send(sender,&&);
&&&&&&&&if&(poll_item[1].revents&&&ZMQ_POLLIN){
&&&&&&&&&&&&break;
&&&&zmq_close(receiver);
&&&&zmq_close(sender);
&&&&zmq_close(notify_end);
&&&&zmq_ctx_destroy(ctx);
&&&&return&0;
汇聚点代码&&
#include&&zmq.h&
#include&&zmq_helpers.h&
#include&&stdio.h&
#pragma&comment(lib,&libzmq-v100-mt.lib&)
int&main(void){
&&&&void*&ctx&=&zmq_ctx_new();
&&&&void*&sink&=&zmq_socket(ctx,ZMQ_PULL);
&&&&zmq_bind(sink,&tcp://*:5558&);
&&&&void*&notify_end&=&zmq_socket(ctx,ZMQ_PUB);
&&&&zmq_bind(notify_end,&tcp://*:5559&);
&&&&// 等待任务发生器通知开始
&&&&char*&p_start&=&s_recv(sink);
&&&&free(p_start);
&&&&int64_t start_time&=&s_clock();
&&&&printf(&开始收集任务执行结果\n&);&&&&
&&&&// 收集个任务执行结果
&&&&int&task_
&&&&for(task_cnt&=&0;
task_cnt&&&100;&++task_cnt){
&&&&&&&&char*&p_done&=&s_recv(sink);
&&&&&&&&free(p_done);
&&&&&&&&printf(&.&);
&&&&&&&&if&((task_cnt&+&1)&%&10&==&0){
&&&&&&&&&&&&printf(&\n&);
&&&&&&&&fflush(stdout);
&&&&printf(&100个任务实际耗时: %d 毫秒\n&,int(s_clock()&-&start_time));
&&&&// 通知工作者批处理操作完成
&&&&s_send(notify_end,&DONE&);
&&&&s_sleep(1000);
&&&&zmq_close(sink);
&&&&zmq_close(notify_end);
&&&&zmq_ctx_destroy(ctx);
&&&&return&0;
11 处理中断信号
&&&&真正实用的应用程序需要在被Ctrl-C或者SIGTERM等信号中断的时候正确地退出。默认情况下Ctrl-C和SIGTERM会杀死进程,那么消息不会被清空(flushed)、文件不会被正确地关闭等等。
处理Ctrl-C和SIGTERM的代码
在main函数开始处调用s_catch_signals(),这会建立信号处理。如果代码阻塞在zmq_recv、zmq_poll或者zmq_send上,则信号到达时调用会返回EINTR。s_recv()之类的封装函数会在被中断的时候返回NULL。所以应用程序应该检查EINTR、NULL返回值,或者s_interrupted。
典型代码片段
12 检测内存泄露
&&&&长期运行的应用需要正确地管理内存,否则可能会耗尽可用内存并且崩溃。如果使用自动管理内存的语言,那么不用关心这个问题。但是如果使用C/C++,则你就需要负责内存管理了。以下是使用valgrind检测内存泄露的简要步骤:
安装valgrind。在Ubuntu或者Debina中执行:sudo apt-get install valgrind。默认情况下ZeroMQ会让valgrind产生很多警告。要移除这些警告,请创建包含以下内容的valgrind.supp文件:
修改应用代码,使之在Ctrl-C之后正确地执行清理动作。对于自己退出的应用程序,这是不需要的,但是对于长期运行的应用,这是必须的,否则valgrind会对所有当前分配的内存发出警告。带-DDEBUG标志创建应用。这让valgrind可以准确地告诉你内存泄露的位置。最后,运行valgrind:
修复valgrind报告的错误之后可以得到以下输出信息:
13 多段消息
发送多段消息
接收多段消息
发送多段消息的时候,只有在给出最后一段之后,所有的分段才开始发送如果使用zmq_poll,则收到第一段的时候,其他段就也已经到达了要么收到消息的所有分段,要么一个也收不到消息的每一段都是一个单独的zmq_msg_t对象不论是否检查RCVMORE选项,都会收到消息的所有分段除非关闭套接字,否则没有方法取消部分发送了的消息
14 中间层和设备
&&&系统成员增加时,让各个成员相互了解的开销会迅速增加,为解决这个问题,可以将系统划分成较小的部分,然后用中间层连接各个部分。中间层(intermediaries)通常称作批发商(wholesalers)、分配器(distributors)、管理者(managers)等。
&&&增长到一定尺度时ZeroMQ网络也需要中间件。ZeroMQ中的中间件称作“设备(device)”。
&&&开始设计的时候,通常将应用程序建造为网络中一系列相互交流的节点,没有中间件。
&&&以后将应用扩展到更大的网络中时,在特定的位置放置设备,增加节点数。
&&&虽然没有严格的设计规则,但ZeroMQ设备通常将一系列“前端(frontend)”套接字连接到一系列“后端(backend)”套接字。设备通常是无状态的,这样才能用尽可能多的中间件来扩展应用。可以在进程中的线程里运行设备,或者作为单独的进程来运行。
&&&设备可以作为寻址、服务、队列或者其他你可以在消息和套接字层之上定义的抽象的中间件。不同的消息模式有不同的复杂度问题,需要不同类型的中间件。比如说,请求-应答模式可以很好地与队列和服务抽象配合工作;而发布-订阅模式可以很好地与流(streams)和主题(topics)配合工作。
&&&ZeroMQ与传统的中心化的代理的不同在于,你可以精确地将设备放置到你需要的地方,而设备能够优化地进行中介(intermediation)工作。
14.1 一个发布-订阅代理
&&&要求将发布-订阅体系扩展到多个网段或者传输端点是很常见的一种需求。可能有一组订阅者位于远程位置。可能我们想用多播向本地订阅者发布消息,而用TCP向远程订阅者发布消息。
&&&我们来写一个简单的代理服务器,它位于一系列发布者和一系列订阅者之间,桥接两个网络。这个可能是最简单的设备了。设备有两个套接字:面向内部网络(也就是天气服务器所在网络)的前端;面向外部网络中订阅者的后端。设备从前端套接字订阅天气信息,重新在后端套接字上发布。
#include&&zmq.h&
#include&&zmq_helpers.h&
#include&&stdio.h&
#pragma&comment(lib,&libzmq-v100-mt.lib&)
int&main(void){
&&&&void*&ctx&=&zmq_ctx_new();
&&&&void*&frontend&=&zmq_socket(ctx,ZMQ_SUB);
&&&&zmq_setsockopt(frontend,ZMQ_SUBSCRIBE,&&,0);
&&&&zmq_connect(frontend,&tcp://localhost:5556&);
&&&&void*&backend&=&zmq_socket(ctx,ZMQ_PUB);
&&&&zmq_bind(backend,&tcp://*:8100&);
&&&&while(true){&&&&&&&&
&&&&&&&&zmq_msg_
&&&&&&&&zmq_msg_init(&report);
&&&&&&&&zmq_recvmsg(frontend,&report,0);&&&&
&&&&&&&&int&
&&&&&&&&size_t optlen&=&sizeof(hasmore);
&&&&&&&&zmq_getsockopt(frontend,ZMQ_RCVMORE,&hasmore,&optlen);
&&&&&&&&zmq_sendmsg(backend,&report,((hasmore&==&0)&?&0&:&ZMQ_SNDMORE));&&&&&&&&
&&&&&&&&zmq_msg_close(&report);
&&&&zmq_close(frontend);
&&&&zmq_close(backend);
&&&&zmq_ctx_destroy(ctx);
&&&&return&0;
&&&我们称这个设备为“代理服务器(proxy)”,因为对于发布者而言,它是订阅者;对于订阅者而言,它是发布者。这样你可以把它插入到现有的网络中而不会影响它(当然新的订阅者需要知道与代理服务器交流)。
&&&这个应用可以正确地处理多段消息。设计设备的时候,总是应该让它可以正确地处理多段消息。
14.2 一个请求-应答代理
&&&前面的Hello World应用中,一个客户端与一个服务交流。但是真实的世界中我们通常有多个服务和多个客户端。我们来试着扩展服务的能力。唯一的限制是服务必须是无状态的,所有的状态信息都在请求中或者在某种共享存储器中,如数据库。
通常的方法
&&&有两种方法将多个客户端连接到多个服务器。通常的、暴力的方法是,让每个客户端套接字连接到多个服务端点。应用需要自己处理负载均衡。
&&&如上图所示,应用自己处理负载均衡,将请求R1和R4发送给服务A,将请求R2发送给服务B,将请求R3发送给服务C。
&&&这个设计中可以很容易地增加更多客户端,也可以增加更多服务。每个客户端会将自己的请求均衡到多个服务中。但是客户端需要了解服务的拓扑结构。如果有100个客户端,然后要增加3个服务,则需要重新配置和启动这100个客户端,以便让客户端了解新的3个服务的信息。而这显然不是在我们的超级计算集群耗尽资源,我们需要增加数百个服务节点时想做的事情。太多固定的部分就像混凝土一样:拓扑信息是分布的,固定的部分越多,改变拓扑结构越麻烦。我们需要一种位于客户端和服务之间的东西,让它中心化拓扑信息。最好是让我们能够在任何时候添加和移除服务,而不用关心拓扑结构的其他部分。
消息队列代理
&&&我们来写一个小的消息队列代理以提供这种灵活性。这个代理绑定到两个端点:对客户端的前端,以及对服务的后端。代理使用zmq_poll监测两个套接字的活动,在两个套接字直接转发信息。代理实际上不会管理任何队列,ZeroMQ会自动地为每个套接字处理消息队列。
&&&使用REQ与REP交流的时候得到的是严格同步的请求-应答会话:客户端发送请求,服务读取请求,发送回应,然后客户端读取回应。如果客户端或者服务企图做其他事情(比如说,连续发送两个请求而不等待回应),则会发生错误。
&&&但是我们的代理必须是非阻塞的。显然必须使用zmq_poll来等待套接字上的活动,但除此之外,我们还不能使用REP和REQ。
&&&DEALER和ROUTER类型的套接字让我们可以进行非阻塞的请求-回应。它们以前的名字是XREQ和XREP,在旧代码中还可以看到这两个名字。第三章会介绍如何使用DEALER和ROUTER套接字创建各种类型的请求-应答流程。&
&&&使用请求-应答代理让客户端-服务器体系结构能够较容易地扩展,因为客户端看不到服务,服务也看不到客户端。唯一保持不变的节点是中间的设备。
#include&&zmq.h&
#include&&zmq_helpers.h&
#include&&stdio.h&
#pragma&comment(lib,&libzmq-v100-mt.lib&)
int&main(void){
&&&&void*&ctx&=&zmq_ctx_new();
&&&&void*&client&=&zmq_socket(ctx,ZMQ_REQ);
&&&&zmq_connect(client,&tcp://localhost:5559&);
&&&&s_sleep(500);
&&&&for(int&idx&=&0;
idx&&&10;&++idx){
&&&&&&&&zmq_msg_
&&&&&&&&zmq_msg_init_size(&request,6);
&&&&&&&&memcpy(zmq_msg_data(&request),&Hello&,6);
&&&&&&&&printf(&发送Hello\n&);
&&&&&&&&zmq_sendmsg(client,&request,0);
&&&&&&&&zmq_msg_close(&request);
&&&&&&&&zmq_msg_
&&&&&&&&zmq_msg_init(&reply);
&&&&&&&&zmq_recvmsg(client,&reply,0);
&&&&&&&&printf(&收到%s\n&,(char*)zmq_msg_data(&reply));
&&&&&&&&zmq_msg_close(&reply);
&&&&zmq_close(client);
&&&&zmq_ctx_destroy(ctx);
&&&&return&0;
#include&&zmq.h&
#include&&zmq_helpers.h&
#include&&stdio.h&
#pragma&comment(lib,&libzmq-v100-mt.lib&)
int&main(void){
&&&&void*&ctx&=&zmq_ctx_new();
&&&&void*&responder&=&zmq_socket(ctx,ZMQ_REP);
&&&&zmq_connect(responder,&tcp://localhost:5560&);
&&&&s_sleep(1000);
&&&&while(1){
&&&&&&&&zmq_msg_
&&&&&&&&zmq_msg_init(&request);
&&&&&&&&zmq_recvmsg(responder,&request,0);
&&&&&&&&printf(&收到%s\n&,(char*)zmq_msg_data(&request));
&&&&&&&&zmq_msg_close(&request);
&&&&&&&&s_sleep(1000);
&&&&&&&&zmq_msg_
&&&&&&&&zmq_msg_init_size(&reply,6);
&&&&&&&&memcpy(zmq_msg_data(&reply),&World&,6);
&&&&&&&&printf(&发送World\n&);
&&&&&&&&zmq_sendmsg(responder,&reply,0);
&&&&&&&&zmq_msg_close(&reply);
&&&&zmq_close(responder);
&&&&zmq_ctx_destroy(ctx);
&&&&return&0;
#include&&zmq.h&
#include&&zmq_helpers.h&
#include&&stdio.h&
#pragma&comment(lib,&libzmq-v100-mt.lib&)
int&main(void){
&&&&void*&ctx&=&zmq_ctx_new();
&&&&void*&frontend&=&zmq_socket(ctx,ZMQ_ROUTER);
&&&&zmq_bind(frontend,&tcp://*:5559&);
&&&&void*&backend&=&zmq_socket(ctx,ZMQ_DEALER);
&&&&zmq_bind(backend,&tcp://*:5560&);
&&&&zmq_pollitem_t items[]={
&&&&&&&&{frontend,0,ZMQ_POLLIN,0},
&&&&&&&&{backend,0,ZMQ_POLLIN,0},
&&&&while(zmq_poll(items,2,-1)&&=&0){
&&&&&&&&// router --& dealer
&&&&&&&&if&(items[0].revents&&&ZMQ_POLLIN){
&&&&&&&&&&&&while(true){
&&&&&&&&&&&&&&&&zmq_msg_t a_
&&&&&&&&&&&&&&&&zmq_msg_init(&a_msg);
&&&&&&&&&&&&&&&&zmq_recvmsg(frontend,&a_msg,0);
&&&&&&&&&&&&&&&&int&
&&&&&&&&&&&&&&&&size_t optlen&=&sizeof(hasmore);
&&&&&&&&&&&&&&&&zmq_getsockopt(frontend,ZMQ_RCVMORE,&hasmore,&optlen);
&&&&&&&&&&&&&&&&zmq_sendmsg(backend,&a_msg,((hasmore&==&0)&?&0&:&ZMQ_SNDMORE));
&&&&&&&&&&&&&&&&zmq_msg_close(&a_msg);
&&&&&&&&&&&&&&&&if&(!hasmore){
&&&&&&&&&&&&&&&&&&&&break;
&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&}
&&&&&&&&// dealer --& router
&&&&&&&&if&(items[1].revents&&&ZMQ_POLLIN){
&&&&&&&&&&&&while(true){
&&&&&&&&&&&&&&&&zmq_msg_t a_
&&&&&&&&&&&&&&&&zmq_msg_init(&a_msg);
&&&&&&&&&&&&&&&&zmq_recvmsg(backend,&a_msg,0);
&&&&&&&&&&&&&&&&int&
&&&&&&&&&&&&&&&&size_t optlen&=&sizeof(hasmore);
&&&&&&&&&&&&&&&&zmq_getsockopt(backend,ZMQ_RCVMORE,&hasmore,&optlen);
&&&&&&&&&&&&&&&&zmq_sendmsg(frontend,&a_msg,((hasmore&==&0)&?&0&:&ZMQ_SNDMORE));
&&&&&&&&&&&&&&&&zmq_msg_close(&a_msg);
&&&&&&&&&&&&&&&&if&(!hasmore){
&&&&&&&&&&&&&&&&&&&&break;
&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&}
&&&&zmq_close(frontend);
&&&&zmq_close(backend);
&&&&zmq_ctx_destroy(ctx);
&&&&return&0;
14.3 内建的设备
&&&&ZeroMQ提供了三种内建的设备:
QUEUE 用作请求-应答代理,要求ROUTER/DEALER套接字对FORWARDER 用作发布-订阅代理,要求PUB/SUB套接字对STREAMER 与FORWARDER相似,只是用于管线流程,要求PULL/PUSH套接字对
&&&&启动设备的代码如下:
&&&&完整的示例代码如下:&&&
#include&&zmq.h&
#include&&zmq_helpers.h&
#include&&stdio.h&
#pragma&comment(lib,&libzmq-v100-mt.lib&)
int&main(void){
&&&&void*&ctx&=&zmq_ctx_new();
&&&&void*&frontend&=&zmq_socket(ctx,ZMQ_ROUTER);
&&&&zmq_bind(frontend,&tcp://*:5559&);
&&&&void*&backend&=&zmq_socket(ctx,ZMQ_DEALER);
&&&&zmq_bind(backend,&tcp://*:5560&);
&&&&zmq_device(ZMQ_QUEUE,frontend,backend);
&&&&zmq_close(frontend);
&&&&zmq_close(backend);
&&&&zmq_ctx_destroy(ctx);
&&&&return&0;
&&&&不要邪恶地想:如果把任意类型的套接字对传入到zmq_device函数中会发生什么?不要那么做,那会导致不确定的行为。如果真的需要那么做,你需要编写自己的设备。
&&&&使用ZeroMQ编写多线程程序不需要互斥量和锁,除了在套接字之间传递的消息之外,也不需要任何其他形式的线程间通信。需要遵循的规则如下:
不得从多个线程访问相同的数据。在ZeroMQ应用中使用互斥量等典型的多线程编程技术是违背模式的(anti-pattern)。唯一的例外是ZeroMQ上下文对象,它是线程安全的。必须为进程创建ZeroMQ上下文,然后传递给想要通过inproc套接字互联的所有线程。可以将线程看做单独的任务,有自己的上下文,但是这些线程将不能使用inproc套接字相互通信。只是这样会让线程能够较容易地切分成单独的进程。不得在线程间共享ZeroMQ套接字,因为它不是线程安全的。虽然在技术上可以做到共享ZeroMQ套接字,但是这要求信号量、锁,或者互斥量。这些会让你的应用变慢和变得脆弱。唯一需要在线程间共享套接字的场合是需要对套接字进行特殊操作的语言绑定,比如说垃圾收集。
&&&比如说,如果要启动多个设备,则每个设备都需要在自己的线程中运行。常见的错误是在一个线程中创建设备套接字,然后将其传递给另一个线程中的设备。这好像能够工作,但是会随机出现故障。记住:不要在创建套接字的线程之外使用或者关闭套接字。
&&&如果遵循上述原则,则在必要的时候可以很容易地将线程切分到单独的进程中。
&&&我们来扩展前面的Hello World应用,使用多个工作线程来处理请求。当然可以用队列设备和外部工作者进程来实现,但是用一个进程耗尽16个CPU核心的处理能力通常比用16进程,每个耗尽1个核心的处理能力要容易。而且,将工作者作为线程运行也会减少网络跳数,延迟,以及网络流量。
#include&&zmq.h&
#include&&zmq_helpers.h&
#include&&boost/thread.hpp&
#include&&stdio.h&
#pragma&comment(lib,&libzmq-v100-mt.lib&)
static&void&worker_proc(void*&ctx){
&&&&void*&responder&=&zmq_socket(ctx,ZMQ_REP);
&&&&zmq_connect(responder,&inproc://workers&);
&&&&while(1){
&&&&&&&&zmq_msg_
&&&&&&&&zmq_msg_init(&request);
&&&&&&&&zmq_recvmsg(responder,&request,0);
&&&&&&&&printf(&收到%s\n&,(char*)zmq_msg_data(&request));
&&&&&&&&zmq_msg_close(&request);
&&&&&&&&s_sleep(1000);
&&&&&&&&zmq_msg_
&&&&&&&&zmq_msg_init_size(&reply,6);
&&&&&&&&memcpy(zmq_msg_data(&reply),&World&,6);
&&&&&&&&printf(&发送World\n&);
&&&&&&&&zmq_sendmsg(responder,&reply,0);
&&&&&&&&zmq_msg_close(&reply);
int&main(void){
&&&&void*&ctx&=&zmq_ctx_new();
&&&&void*&frontend&=&zmq_socket(ctx,ZMQ_ROUTER);
&&&&zmq_bind(frontend,&tcp://*:5559&);
&&&&void*&backend&=&zmq_socket(ctx,ZMQ_DEALER);
&&&&zmq_bind(backend,&inproc://workers&);
&&&&for(int&idx&=&0;
idx&&&5;&++idx){
&&&&&&&&boost::thread worker(worker_proc,ctx);
&&&&zmq_device(ZMQ_QUEUE,frontend,backend);
&&&&zmq_close(frontend);
&&&&zmq_close(backend);
&&&&zmq_ctx_destroy(ctx);
&&&&return&0;
&&&&上述多线程版本的Hello World服务把队列设备和工作者合并到单个进程中了。
16 线程间的信号
&&&&使用ZeroMQ编写多线程应用时会遇到协调线程的问题。虽然可以使用sleep或者其他多线程技术,如信号量或者互斥量,但是你应该使用的唯一机制是ZeroMQ消息。
&&&&假设三个线程需要依次宣告自己准备好了。这个例子中我们在inproc传输端点上使用PAIR套接字:
#include&&zmq.h&
#include&&zmq_helpers.h&
#include&&boost/thread.hpp&
#include&&stdio.h&
#pragma&comment(lib,&libzmq-v100-mt.lib&)
static&void&step1(void*&ctx){
&&&&void*&this_step&=&zmq_socket(ctx,ZMQ_PAIR);
&&&&zmq_connect(this_step,&inproc://step1&);
&&&&s_send(this_step,&step1&);
&&&&zmq_close(this_step);
static&void&step2(void*&ctx){
&&&&void*&prev_step&=&zmq_socket(ctx,ZMQ_PAIR);
&&&&zmq_bind(prev_step,&inproc://step1&);
&&&&boost::thread step1_(step1,ctx);
&&&&char*&p&=&s_recv(prev_step);
&&&&free(p);
&&&&zmq_close(prev_step);
&&&&//---------------------------------
&&&&void*&this_step&=&zmq_socket(ctx,ZMQ_PAIR);
&&&&zmq_connect(this_step,&inproc://step2&);
&&&&s_send(this_step,&step2&);
&&&&zmq_close(this_step);
static&void&step3(void*&ctx){
&&&&void*&prev_step&=&zmq_socket(ctx,ZMQ_PAIR);
&&&&zmq_bind(prev_step,&inproc://step2&);
&&&&boost::thread step2_(step2,ctx);
&&&&char*&p&=&s_recv(prev_step);
&&&&free(p);
&&&&zmq_close(prev_step);
&&&&//---------------------------------
&&&&void*&this_step&=&zmq_socket(ctx,ZMQ_PAIR);
&&&&zmq_connect(this_step,&inproc://step3&);
&&&&s_send(this_step,&step3&);
&&&&zmq_close(this_step);
int&main(void){
&&&&void*&ctx&=&zmq_ctx_new();
&&&&void*&last_step&=&zmq_socket(ctx,ZMQ_PAIR);
&&&&zmq_bind(last_step,&inproc://step3&);
&&&&boost::thread(step3,ctx);
&&&&char*&p&=&s_recv(last_step);
&&&&free(p);
&&&&zmq_close(last_step);
&&&&zmq_ctx_destroy(ctx);
&&&&return&0;
&&&这是ZeroMQ用于多线程时的一种典型模式:
两个线程使用共享的上下文中的inproc传输端点进行通信父线程创建一个套接字,绑定到inproc://端点,然后启动子线程,传入上下文子线程创建第二个套接字,连接到inproc://端点,然后通知父线程自己已经准备好了
&&&&注意:使用这种模式的多线程代码不能扩展到多个进程。使用inproc传输端点和套接字对会创建紧密耦合的应用。请在低延迟确实非常重要的时候才这么做。对于通常的应用,请对每个线程使用一个上下文,使用ipc或者tcp传输端点。这样在需要的时候就可以很容易地将线程划分到单独的进程中。
&&&&为什么使用PAIR套接字?其他套接字组合可能能够工作,但是都有可能会影响信号的副作用:
可以使用PUSH作为发送方,使用PULL作为接收方。但是PUSH会将消息负载均衡给所有可用的接收者。如果你意外地启动了两个接收者,则会丢失一半的信号。PAIR的优点在于拒绝多个连接,它是独占的。可以使用DEALER作为发送方,使用ROUTER作为接收方。但是ROUTER会使用“信封”封装消息,使得零字节的信号称为多段消息。如果你不关心数据,把任何数据当做是有效的信号,并且只在套接字上执行一次读取操作,那么没有问题。但是如果你决定发送实际的数据,则会发现ROUTER会给你“错误的”消息。DEALER也是负载平衡的,和PUSH有相同的风险。可以使用PUB作为发送方,使用SUB作为接收方。这样可以正确地投递消息,而且PUB不会像PUSH和DEALER那样进行负载平衡。但是你需要配置订阅者执行一个空的订阅,这是比较麻烦的。更糟的是,PUB-SUB链路的可靠性依赖于时间,如果PUB套接字发送消息的时候SUB套接字正在进行连接,则消息会丢失。
&&&因为上述原因,PAIR是用于协调两个线程的最佳选择。
17 节点协调
&&&要协调节点的时候,PAIR套接字就不管用了。这是线程和节点的策略不同的少数情况之一。理论上说节点可以上线和下线,而线程是不受影响的。但是远端节点离开然后重新上线时,PAIR套接字不会自动进行重新连接。
&&&线程和节点的第二点不同在于,通常线程的数量是固定的,但是节点数是可变的。我们来在先前的场景(天气服务器和客户端)中使用节点协调,以保证订阅者在启动过程中不会错过(天气)数据。
&&&应用程序的工作过程:
发布者预先知道期望有多少个订阅者。这个数目是从别处得到的。启动发布者,等待所有订阅者连接。这部分工作就是节点协调。每个订阅者进行订阅操作,并且通过另一个套接字通知发布者,自己已经准备好了。发布者等到所有订阅者连接后,开始发布数据。
&&&这个例子中我们使用REQ-REP套接字来同步发布者和订阅者。
&&&不能认为REQ/REP会话完成的时候SUB已经完成了了连接。除非使用inproc传输端点,否则没有办法确定外出的连接已经完成。所以示例代码只是简单地在发起订阅和进行REQ/REP同步之间调用sleep。
&&&&更健壮的做法可以是:
发布者打开PUB套接字,发送&Hello&消息(不是实际的数据)订阅者打开SUB套接字,进行连接,并且在接收到Hello消息后通过一个REQ/REP套接字对通知发布者发布者收到所有必要的确认后,开始发送真实的数据
&&&第一章中我们提醒过你,作为新手,最好不要使用零拷贝。如果你看到了这里,那么可能你已经准备好使用零拷贝了。然而,要记住的是,很多路是通向地狱的,过早的优化并不好。在应用的体系并不完美的时候就试图进行零拷贝只是浪费时间,只会让事情更糟糕,而不是更好。
&&&要进行零拷贝,请使用zmq_msg_init_data创建一个消息,代表一个用malloc在堆上分配的数据块,然后传递给zmq_send。在创建消息的时候还需要传入一个函数,ZeroMQ将在发送完消息后调用这个函数来释放数据块。下面是一个简单的示例:
&&&不能在接收的时候进行零拷贝:ZeroMQ会给你一个缓冲区,你想用多久就用多久,但是不能直接把收到的数据写入到应用程序的缓冲区中。
&&&零拷贝可以用于多段消息。传统的消息系统中,发送多段消息之前需要把多个不同的缓冲区列集(marshal)到一个缓冲区中。这就需要复制数据。ZeroMQ让你可以将多个不同来源的缓冲区作为单个消息帧来发送。ZeroMQ会将每个缓冲区作为一个限定长度的帧发送。从应用程序来看,这需要一系列的send和recv调用。但是在ZeroMQ内部,所有数据通过单个系统调用写入到网络中,通过单个系统调用读回数据,所以效率很高。
19 发布-订阅模式中的消息信封
&&&前面简单介绍过多段消息,现在来看看它的主要用途:消息信封。
&&&发布-订阅基于前缀来匹配消息。把关键字放入到单独的帧中让匹配非常直接。
带信封的发布者
处理信封的订阅者
&&&&注意:订阅过滤器拒绝或者接受整个多段消息,不能只接受多段消息的一部分。
&&&&当然,也可以使用多个信封。
20 高水位标记
&&&如果进程A发送消息给进程B,而B突然变得很忙(垃圾收集、CPU过载等等),那么进程A想发送的消息会怎样?有些消息已经在B的网络缓冲区中;有些还在以太网线路中;有些在A的网络缓冲区中;而其他的则累积在A的内存中。如果不采取某些措施,A很容易耗尽内存然后崩溃。这是消息代理中常见的典型问题。
&&&如何解决这个问题?方法之一是向上传递问题。A从其他什么地方取得消息,所以告诉那个地方暂停就可以了。这就是“流控制”。有些情况下可以使用流控制,但是在更多情况下,传输层不能告诉应用层“暂停”发送消息。
&&&正确的答案是对缓冲区大小设置一个限制,达到这个限制的时候采取一些措施。大多数情况下,采取的措施是丢弃消息;少数其他情况下则是等待。
ZeroMQ使用概念“高水位标记(high water mark)”或者说HWM来定义其内部管道的容量。套接字上的每个外出连接或者进入连接都有自己的管道以及HWM容量。ZeroMQ 2.x版本中默认的HWM值为无限大;ZeroMQ 3.x版本中默认的HWM值为1000。高水位标记同时影响套接字的发送和接收缓冲区。有些套接字(PUB、PUSH)只有发送缓冲区;有些(SUB、PULL、REQ?、REP)则只有接收缓冲区;有些(DEALER、ROUTER、PAIR)则同时有发送和接收缓冲区。根据套接字类型的不同,达到高水位标记的时候,套接字要么阻塞,要么丢弃数据:PUB套接字会在达到高水位标记的时候丢弃数据;而其他类型的套接字则会阻塞。inproc传输端点上的发送方和接收方共享同一个缓冲区,所以实际的HWM是两端设置的HWM的和。如果一方没有设置HWM,则缓冲区大小没有限制(仅对于ZeroMQ 2.x版本?)。
&&&ZeroMQ应用就像插接到一起的一些盒子,唯一的限制只是你的想象。
&&&可伸缩的体系结构并不是新的概念,基于流的程序设计(flow-based programming)和Erlang等语言就是这样工作的,但是ZeroMQ让它前所未有地容易使用。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:4168次
排名:千里之外
转载:10篇

我要回帖

更多关于 push and pull 的文章

 

随机推荐