在《构建高性能物联网数据平台:EMQX和CnosDB的完整教程》中,我们探讨了如何利用EMQX的数据桥接功能,将采集来的时序数据实时传输到CnosDB中。本次分享将进一步介绍如何通过 CnosDB 的订阅功能,实现数据推送至 EMQX。通过这种方式,我们不仅能够确保数据的即时传输,还能充分发挥两者的优势,提升整体数据处理能力。
前置条件
在开始之前,需要准备以下工具和环境:
-
CnosDB 是一款高性能、高压缩率的开源分布式时序数据库。作为数据的发起端,需要将数据写入 CnosDB,并分发到 EMQX。
-
Cnos-Telegraf 是一款基于 Telegraf 开源的数据采集代理,主要用于数据监控、处理与分析。它可以将数据分发到 CnosDB 或将 CnosDB 中的数据进行外部订阅。
-
EMQX(Erlang/OTP MQTT Broker)是基于 Erlang/OTP 开发的开源 MQTT 消息中间件,负责在数据库场景中进行消息传递和数据流转。
-
mosquitto_sub 是一个命令行工具,用于订阅 MQTT 主题。EMQX 中的数据以主题为单位进行发布和订阅。通过 mosquitto_sub 订阅特定主题,可以获取相关数据。
环境搭建
- 操作系统
- Ubuntu 20.04.6 LTS x86_64
- EMQX
从官方仓库下载最新的 EMQX 镜像(版本 5.8.2)。启动容器服务时,请确保将 1883 端口映射出来:
其他安装方式请参考:https://docs.cnosdb.com/docs/deploy/
docker run -d --name emqx -p 18083:18083 -p 1883:1883 emqx:latest
- Cnos-Telegraf 服务
通过官方仓库下载 Cnos-Telegraf 的最新代码并编译。修改配置文件如下:
[global_tags]
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
collection_jitter = "0s"
flush_interval = "10s"
flush_jitter = "0s"
precision = "0s"
[[outputs.mqtt]]
servers = ["tcp://127.0.0.1:1883"]
topic = "telegraf/metrics"
username = "admin"
password = "public"
data_format = "influx"
[[inputs.cnosdb_subscription]]
service_address = ":7703"
启动 Cnos-Telegraf 服务:
./bin/telegraf --config ./telegraf.conf
- mosquitto_sub 服务
如果未安装 mosquitto_sub,可以通过以下命令安装:
sudo apt-get update
sudo apt-get install mosquitto-clients
启动 mosquitto_sub 监听 EMQX 的订阅:
mosquitto_sub -h 127.0.0.1 -u admin -P public -t telegraf/metrics -p 1883
- 下载并编译最新版企业版 CnosDB(版本 2.4.2),使用默认配置启动服务:
./target/release/cnosdb run --config ./config/config_8902.toml -M singleton
在 CnosDB 中创建订阅(目标端口为 Cnos-Telegraf 的监听端口):
CREATE SUBSCRIPTION test ON public DESTINATIONS ALL "127.0.0.1:7703";
然后创建表并插入数据:
CREATE TABLE air (visibility DOUBLE, temperature DOUBLE, pressure DOUBLE, TAGS(station));
INSERT INTO air (TIME, station, visibility, temperature, pressure) VALUES (1667456411000000000, 'XiaoMaiDao1', 56, 69, 411);
数据验证
写入成功后,CnosDB 表中和 mosquitto_sub 的订阅都会收到一条数据。如下:
- CnosDB 接收到的数据:
- 推送至 EMQX 中并使用 mosquitto_sub 订阅的数据(默认格式):
以上就是将 CnosDB 数据发送到 EMQX 的完整流程。希望这次的分享能够帮助你更好地理解数据流转的实现!
欣赏CnosDB社区名画: