NodeJS

NodeJS

Github https://github.com/zeromq/zeromq.js/
npm https://npmjs.net.cn/package/zeromq

安装

要安装适用于 Node 的 ZeroMQ,请从 npm 下载并安装软件包。

$ npm install zeromq

ZeroMQ.js API 参考

使用 zeromq 的示例

Push/Pull

此示例演示生产者如何将信息推送到 socket,以及工作者如何从 socket 拉取信息。

producer.js

创建一个生产者将信息推送到 socket。

const zmq = require("zeromq")

async function run() {
  const sock = new zmq.Push()

  await sock.bind("tcp://127.0.0.1:3000")
  console.log("Producer bound to port 3000")

  while (true) {
    await sock.send("some work")
    await new Promise(resolve => {
      setTimeout(resolve, 500)
    })
  }
}

run()

worker.js

创建一个工作者从 socket 拉取信息。

const zmq = require("zeromq")

async function run() {
  const sock = new zmq.Pull()

  sock.connect("tcp://127.0.0.1:3000")
  console.log("Worker connected to port 3000")

  for await (const [msg] of sock) {
    console.log("work: %s", msg.toString())
  }
}

run()

Pub/Sub

此示例演示了在经典的 Pub/Sub(发布/订阅)应用程序中使用 zeromq。

发布者: publisher.js

创建发送消息的发布者。

const zmq = require("zeromq")

async function run() {
  const sock = new zmq.Publisher()

  await sock.bind("tcp://127.0.0.1:3000")
  console.log("Publisher bound to port 3000")

  while (true) {
    console.log("sending a multipart message envelope")
    await sock.send(["kitty cats", "meow!"])
    await new Promise(resolve => {
      setTimeout(resolve, 500)
    })
  }
}

run()

订阅者: subscriber.js

创建一个订阅者,连接到发布者的端口以接收消息。

const zmq = require("zeromq")

async function run() {
  const sock = new zmq.Subscriber()

  sock.connect("tcp://127.0.0.1:3000")
  sock.subscribe("kitty cats")
  console.log("Subscriber connected to port 3000")

  for await (const [topic, msg] of sock) {
    console.log(
      "received a message related to:",
      topic,
      "containing message:",
      msg,
    )
  }
}

run()

Req/Rep

此示例说明了客户端的请求和服务器的应答。

客户端: client.js

const zmq = require("zeromq")

async function run() {
  const sock = new zmq.Request()

  sock.connect("tcp://127.0.0.1:3000")
  console.log("Producer bound to port 3000")

  await sock.send("4")
  const [result] = await sock.receive()

  console.log(result)
}

run()

服务器: server.js

const zmq = require("zeromq")

async function run() {
  const sock = new zmq.Reply()

  await sock.bind("tcp://127.0.0.1:3000")

  for await (const [msg] of sock) {
    await sock.send(2 * parseInt(msg, 10))
  }
}

run()

TypeScript

此库为 TypeScript 3.0.x 及更高版本提供类型定义。

import {Request} from "zeromq"
// or as namespace
import * as zmq from "zeromq"

const reqSock = new Request()
//...
const repSock = new zmq.Reply()

更多示例 更高级的示例可在本仓库的 examples 目录下找到。

或者您可以浏览 API 参考文档,查看所有 socket 类型、方法和选项,以及如何应用它们的详细信息。