首页 » 博客 » CnosDB向EMQX实时推送消息,实现端到端的数据实时流转

CnosDB向EMQX实时推送消息,实现端到端的数据实时流转


《构建高性能物联网数据平台:EMQX和CnosDB的完整教程》中,我们探讨了如何利用EMQX的数据桥接功能,将采集来的时序数据实时传输到CnosDB中。本次分享将进一步介绍如何通过 CnosDB 的订阅功能,实现数据推送至 EMQX。通过这种方式,我们不仅能够确保数据的即时传输,还能充分发挥两者的优势,提升整体数据处理能力。

前置条件

在开始之前,需要准备以下工具和环境:

  1. CnosDB 是一款高性能、高压缩率的开源分布式时序数据库。作为数据的发起端,需要将数据写入 CnosDB,并分发到 EMQX。

  2. Cnos-Telegraf 是一款基于 Telegraf 开源的数据采集代理,主要用于数据监控、处理与分析。它可以将数据分发到 CnosDB 或将 CnosDB 中的数据进行外部订阅。

  3. EMQX(Erlang/OTP MQTT Broker)是基于 Erlang/OTP 开发的开源 MQTT 消息中间件,负责在数据库场景中进行消息传递和数据流转。

  4. mosquitto_sub 是一个命令行工具,用于订阅 MQTT 主题。EMQX 中的数据以主题为单位进行发布和订阅。通过 mosquitto_sub 订阅特定主题,可以获取相关数据。

环境搭建

  1. 操作系统
  • Ubuntu 20.04.6 LTS x86_64
  1. EMQX

从官方仓库下载最新的 EMQX 镜像(版本 5.8.2)。启动容器服务时,请确保将 1883 端口映射出来:

其他安装方式请参考:https://docs.cnosdb.com/docs/deploy/

docker run -d --name emqx -p 18083:18083 -p 1883:1883 emqx:latest
  1. Cnos-Telegraf 服务

通过官方仓库下载 Cnos-Telegraf 的最新代码并编译。修改配置文件如下:

[global_tags]
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = "0s"

[[outputs.mqtt]]
  servers = ["tcp://127.0.0.1:1883"]
  topic = "telegraf/metrics"
  username = "admin"
  password = "public"
  data_format = "influx"

[[inputs.cnosdb_subscription]]
  service_address = ":7703"

启动 Cnos-Telegraf 服务:

./bin/telegraf --config ./telegraf.conf
  1. mosquitto_sub 服务

如果未安装 mosquitto_sub,可以通过以下命令安装:

sudo apt-get update
sudo apt-get install mosquitto-clients

启动 mosquitto_sub 监听 EMQX 的订阅:

mosquitto_sub -h 127.0.0.1 -u admin -P public -t telegraf/metrics -p 1883
  1. 下载并编译最新版企业版 CnosDB(版本 2.4.2),使用默认配置启动服务:
./target/release/cnosdb run --config ./config/config_8902.toml -M singleton

在 CnosDB 中创建订阅(目标端口为 Cnos-Telegraf 的监听端口):

CREATE SUBSCRIPTION test ON public DESTINATIONS ALL "127.0.0.1:7703";

然后创建表并插入数据:

CREATE TABLE air (visibility DOUBLE, temperature DOUBLE, pressure DOUBLE, TAGS(station));
INSERT INTO air (TIME, station, visibility, temperature, pressure) VALUES (1667456411000000000, 'XiaoMaiDao1', 56, 69, 411);

数据验证

写入成功后,CnosDB 表中和  mosquitto_sub 的订阅都会收到一条数据。如下:

  • CnosDB 接收到的数据:

  • 推送至 EMQX 中并使用 mosquitto_sub 订阅的数据(默认格式):

以上就是将 CnosDB 数据发送到 EMQX 的完整流程。希望这次的分享能够帮助你更好地理解数据流转的实现!


欣赏CnosDB社区名画: