Attack on ZeroMQ

ZMQ message patterns
  • PUB[server] - SUB[client]
  • REP[server] - REQ[client]
  • REQ - ROUTER
  • DEALER - REP
  • DEALER - ROUTER
  • DEALER - DEALER
  • ROUTER - ROUTER
  • PUSH[server] - PULL[client]
  • PAIR - PAIR
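ZMQ API
  • Create and destroy sockets: zmq_socket(), zmq_close()
  • Set and read socket options: zmq_setsockopt(), zmq_getsockopt()
  • Attach a socket to an endpoint: zmq_bind() (usually the server side), zmq_connect() (usually the client side)
  • Send and receive messages: zmq_send(), zmq_recv()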
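
The four groups of calls above map directly onto PyZMQ methods. The following is a minimal sketch, not a reference implementation: it assumes PyZMQ is installed, uses two PAIR sockets inside one process, and the endpoint name "inproc://api-demo" is made up for this illustration.

```python
import zmq

context = zmq.Context()

# zmq_socket(): create two PAIR sockets in the same process.
server = context.socket(zmq.PAIR)
client = context.socket(zmq.PAIR)

# zmq_setsockopt() / zmq_getsockopt(): set an option and read it back.
server.setsockopt(zmq.LINGER, 0)
client.setsockopt(zmq.LINGER, 0)
linger = server.getsockopt(zmq.LINGER)

# zmq_bind() on one side, zmq_connect() on the other.
# "inproc://api-demo" is a hypothetical endpoint name for this sketch.
server.bind("inproc://api-demo")
client.connect("inproc://api-demo")

# zmq_send() / zmq_recv(): one round trip.
client.send(b"ping")
request = server.recv()
server.send(b"pong")
reply = client.recv()
print(request, reply, linger)

# zmq_close(), then terminate the context.
client.close()
server.close()
context.term()
```

The inproc transport keeps the sketch self-contained; replacing the endpoint with a tcp:// address should give the same flow across processes.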
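Libraries for different languages

libzmq: the core ZMQ library
cppzmq: a header-only C++ binding; the core API lives in a single header, zmq.hpp.
PyZMQ: the Python binding for ZMQ

  • Building libzmq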
git clone https://github.com/zeromq/libzmq.git
cd libzmq && mkdir build && cd build
cmake -D WITH_PERF_TOOL=OFF -D ZMQ_BUILD_TESTS=OFF \
      -D ENABLE_CPACK=OFF -D CMAKE_BUILD_TYPE=Release ..
make -j3

For more build options, see the libzmq repository.

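Memory layout
socket.send ("Hello")

The corresponding layout:

The frame stores the length followed by the content (length + content), with no terminating null byte.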
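
To make the "length + content" idea concrete, here is a small sketch in plain Python. It is a simplified illustration only: the helpers `encode_frame` and `decode_frame` are hypothetical names, not part of any ZMQ library, and the real ZMTP wire protocol also carries a flags byte and uses a longer size field for large frames.

```python
def encode_frame(payload: bytes) -> bytes:
    """Prefix the payload with a one-byte length (simplified, max 255 bytes)."""
    if len(payload) > 255:
        raise ValueError("this simplified sketch only handles short frames")
    return bytes([len(payload)]) + payload


def decode_frame(frame: bytes) -> bytes:
    """Read the length byte, then return exactly that many content bytes."""
    size = frame[0]
    body = frame[1:1 + size]
    if len(body) != size:
        raise ValueError("truncated frame")
    return body


frame = encode_frame(b"Hello")
print(frame)                # b'\x05Hello'
print(decode_frame(frame))  # b'Hello'
```

Because the length is explicit, the receiver never scans for a terminator, so a payload may contain any bytes, including zero bytes.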

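REP & REQ example

  • C++ example
    client:
#include <zmq.hpp>
#include <string>
#include <iostream>
#include "zhelpers.hpp"

int main () {
    zmq::context_t context (1);
    // Create the socket
    zmq::socket_t socket (context, ZMQ_REQ);

    std::cout << "Connecting to hello world server..." << std::endl;
    // Connect to the server
    socket.connect ("tcp://localhost:5555");

    for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
        // Send a request, then block until the reply arrives
        s_send (socket, "hello");
        std::cout << "Client1 Received :" << s_recv (socket) << std::endl;
    }
    return 0;
}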

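client 2 (a second REQ client; both clients talk to a REP server bound to tcp://*:5555, such as the Python server further down):

#include <zmq.hpp>
#include <string>
#include <iostream>
#include "zhelpers.hpp"

int main () {
    zmq::context_t context (1);
    // Create the socket
    zmq::socket_t socket (context, ZMQ_REQ);

    std::cout << "Connecting to hello world server..." << std::endl;
    // Connect to the server
    socket.connect ("tcp://localhost:5555");

    for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
        // Send a request, then block until the reply arrives
        s_send (socket, "SB");
        std::cout << "Client2 Received :" << s_recv (socket) << std::endl;
    }
    return 0;
}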
  • Python example
    client:
#
#   Hello World client in Python
#   Connects REQ socket to tcp://localhost:5555
#   Sends "Hello" to server, expects "World" back
#

import zmq

context = zmq.Context()

#  Socket to talk to server
print("Connecting to hello world server…")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

#  Do 10 requests, waiting each time for a response
for request in range(10):
    print("Sending request %s …" % request)
    socket.send(b"Hello")

    #  Get the reply.
    message = socket.recv()
    print("Received reply %s [ %s ]" % (request, message))

server:

#
#   Hello World server in Python
#   Binds REP socket to tcp://*:5555
#   Expects b"Hello" from client, replies with b"World"
#

import time
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:
    #  Wait for next request from client
    message = socket.recv()
    print("Received request: %s" % message)

    #  Do some 'work'
    time.sleep(1)

    #  Send reply back to client
    socket.send(b"World")
PUB & SUB pattern


C++ example:
pub:

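#include <zmq.hpp>
#include <chrono>
#include <thread>
#include "zhelpers.hpp"

int main () {
    zmq::context_t context(1);
    zmq::socket_t publisher(context, ZMQ_PUB);
    publisher.bind("tcp://*:5563");

    while (1) {
        //  Write two messages, each with an envelope and content
        s_sendmore (publisher, "A");
        s_send (publisher, "We don't want to see this");

        //  Portable replacement for the Windows-only Sleep (100)
        std::this_thread::sleep_for (std::chrono::milliseconds (100));
        s_sendmore (publisher, "B");
        s_send (publisher, "We would like to see this");
        std::this_thread::sleep_for (std::chrono::milliseconds (100));
    }
    return 0;
}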

sub:

#include <zmq.hpp>
#include <string>
#include <iostream>
#include "zhelpers.hpp"

int main () {
    zmq::context_t context(1);
    zmq::socket_t subscriber (context, ZMQ_SUB);
    // Connect to the publisher
    subscriber.connect("tcp://localhost:5563");
    // Subscribe to topic "B" (a 1-byte prefix filter)
    subscriber.setsockopt( ZMQ_SUBSCRIBE, "B", 1);
    while (1) {

        //  Read envelope with address
        std::string address = s_recv (subscriber);
        //  Read message contents
        std::string contents = s_recv (subscriber);

        std::cout << "Subscriber 2: [" << address << "] " << contents << std::endl;
    }
    return 0;
}

Python example:

import zmq
import sys

def start_sub():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5555")
    socket.setsockopt(zmq.SUBSCRIBE, b'2333')

    for _ in range(10):
        msg = socket.recv()
        topic, data = msg.split()
        print(f'topic:{topic}, data:{data}')

def start_pub():
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5555")
    topic = 2333
    idx = 0
    while True:
        msg = b'%d %d'% (topic, idx)
        socket.send(msg)
        idx += 1

if __name__ == '__main__':
    if len(sys.argv) > 1:
        print('starting sub')
        start_sub()
    else:
        print('starting pub')
        start_pub()
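
Both subscribers above pass a byte string to ZMQ_SUBSCRIBE. A SUB socket treats that value as a prefix of the message, not as an exact topic name. The sketch below imitates that rule in plain Python so its effect is easy to see; `matches` is a hypothetical helper written for this illustration and is not part of PyZMQ.

```python
def matches(subscriptions, message):
    """Return True if the message starts with any subscribed prefix."""
    return any(message.startswith(prefix) for prefix in subscriptions)


subscriptions = [b"2333"]
for message in (b"2333 0", b"23334 1", b"42 2"):
    print(message, matches(subscriptions, message))
```

Note that b"23334 1" also passes the b"2333" filter. If topics can share a prefix, one common approach is to add a delimiter to the subscription (for example b"2333 ") or to check the topic again after receiving. An empty subscription b"" matches every message.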

References:
https://www.cnblogs.com/ssss429170331/p/5559210.html
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/patterns/client_server.html
https://zeromq.org/socket-api/
https://github.com/anjuke/zguide-cn

Feel free to share; when reposting, please credit the source: 内存溢出 (outofmemory.cn)

Original article: https://outofmemory.cn/zaji/5692801.html
