【VisionFive 2 Lite 单板计算机】物联网环境监测终端

【VisionFive 2 Lite 单板计算机】物联网环境监测终端

本文介绍了昉·星光 VisionFive2 Lite 单板计算机结合盛思锐 SEN66 传感器套件实现环境气体监测,并通过 MQTT 协议上传至 Home Assistant 智能家居平台,实现物联网环境监测终端的项目设计。

项目介绍

  • 准备工作:硬件连接、软件包安装、Docker部署、EMQX和HomeAssistant平台部署;

  • 工程测试:加载官方 Demo 例程,实现 SEN66 传感器数据终端打印;

  • MQTT:结合 MQTT 协议,实现传感器数据上传至 EMQX 服务器平台,并远程读取;

  • Home Assistant:通过 MQTT 集成,实现 SEN66 传感器数据卡片自动获取,实现物联网环境气体监测。

准备工作

包括硬件连接、软件包安装、Home Assistant 平台搭建等。

硬件连接

默认情况下,SEN66 示例程序设定传感器已连接到端口 /dev/i2c-1

SEN66 VisionFive 2 Lite Note
SDA (green) SDA (3) Serial Data
SCL (yellow) SCL (5) Serial Clock
GND (black) GND (6) Ground
VCC (red) 3.3V (1) Power

实物图

VisionFive2 Lite 40-Pin 接口定义

SEN66 引脚

引脚定义及描述

Pin Color Name Description Comments
1 red VDD Supply Voltage 3.3V ±5%
2 black GND Ground
3 green SDA I2C: Serial data input / output TTL 5V compatible
4 yellow SCL I2C: Serial clock input TTL 5V compatible
5 NC Do not connect Ground (Pins 2 and 5 are connected internally)
6 NC Do not connect Supply voltage (Pins 1 and 6 are connected internally)

详见:https://github.com/Sensirion/raspberry-pi-i2c-sen66/

软件包安装

终端执行如下指令,安装 Sensirion Sen66 库

 sudo pip3 install sensirion_i2c_sen66 --break-system-packages
 sudo apt install python3-paho-mqtt

详见:https://sensirion.github.io/python-i2c-sen66/index.html

搭建 Home Assistant 平台

电脑主机安装 Docker 软件;

 sudo apt-get update && sudo apt-get upgrade
 curl -fsSL https://get.docker.com -o get-docker.sh
 sudo sh get-docker.sh
 docker version

拉取 EMQX 和 Home Assistant 最新镜像;

 sudo docker pull emqx/emqx:latest
 sudo docker pull homeassistant/home-assistant:latest

启动 EMQX 和 HA 容器;

 docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:latest
 ​
 docker run -d --restart always --name homeassistant -v /data/homeassistant/config:/config -e TZ=Asia/Shanghai -p 8123:8123 homeassistant/home-assistant:latest

MQTT 客户端

EMQX 创建 MQTT 客户端

  • 进入 EMQX 主页,如 http://192.168.31.116:18083

  • 依次打开 访问控制 - 客户端认证 - 创建 - Password-Based - 内置数据库 - (默认配置)- 创建

创建用户

  • 用户管理 - 新建用户 - 自定义用户名和密码 - 保存.

HA 配置

  • 浏览器输入网址 http://<IP>:8123,如 192.168.1.107:8123

  • 进入 HA 主界面(首次打开需进行注册),输入用户名、密码等信息;

  • 配置完成后进入 HA 概览 标签页;

工程测试

包括 AHT10和BMP280模块IIC通信测试、SEN66模块驱动测试等。

AHT10 & BMP280

终端执行指令 touch aht20_bmp280_print.py 新建程序文件并添加如下代码

 from aht10 import AHT10
 from bmp280 import BMP280
 import time
 ​
 while True:
     sensor = AHT10(bus=0, address=0x38)
     temp, hum = sensor.read()
     bmp280 = BMP280(bus=0, address=0x77)
     temperature, pressure = bmp280.get_temperature_and_pressure()
     print("Temperature: {:.2f} °C  Humidity: {:.2f} %RH  Pressure: {:.3f} kPa".format(temp, hum, pressure/1000))
     time.sleep(2)

AHT10 驱动

 """
 AHT10温湿度传感器驱动
 """
 ​
 import smbus2
 import time
 ​
 class AHT10:
     def __init__(self, bus=1, address=0x38):
         self.bus = smbus2.SMBus(bus)
         self.addr = address
         self._initialize()
         time.sleep(0.5)  # 延长初始化等待时间
 ​
     def _initialize(self):
         """初始化AHT10,发送0xE10800命令"""
         try:
             self.bus.write_i2c_block_data(self.addr, 0xE1, [0x08, 0x00])
             return True
         except:
             return False
 ​
     def read(self):
         """读取一次温湿度测量"""
         # 触发测量 0xAC3300
         self.bus.write_i2c_block_data(self.addr, 0xAC, [0x33, 0x00])
         time.sleep(0.08)  # 等待测量完成
 ​
         # 读取6字节数据
         data = self.bus.read_i2c_block_data(self.addr, 0x00, 6)
 ​
         # 解析数据
         hum_raw = ((data[1] << 12) | (data[2] << 4) | (data[3] >> 4))
         temp_raw = (((data[3] & 0x0F) << 16) | (data[4] << 8) | data[5])
 ​
         humidity = hum_raw * 100 / (1 << 20)  # 2^20 = 1048576
         temperature = temp_raw * 200 / (1 << 20) - 50
 ​
         return round(temperature, 1), round(humidity, 1)
 ​
 # 使用示例
 def demo(interval=1):
     """连续打印温湿度,interval:采样间隔(秒)"""
     sensor = AHT10(bus=0, address=0x38)
     while True:
         temp, hum = sensor.read()
         print(f"Temperature: {temp}°C, Humidity: {hum}%RH")
         time.sleep(interval)

BMP280 驱动

 import time
 import smbus
 ​
 # BMP280 iic address.
 BMP280_I2C_ADDRESS = 0x77        # SDO = 0
 ​
 # Registers value
 BMP280_ID_Value = 0x58           # BMP280 ID
 BMP280_RESET_VALUE = 0xB6
 ​
 # BMP280 Registers definition
 BMP280_TEMP_XLSB_REG = 0xFC      # Temperature XLSB Register
 BMP280_TEMP_LSB_REG = 0xFB       # Temperature LSB Register
 BMP280_TEMP_MSB_REG = 0xFA       # Temperature LSB Register
 BMP280_PRESS_XLSB_REG = 0xF9     # Pressure XLSB  Register
 BMP280_PRESS_LSB_REG = 0xF8      # Pressure LSB Register
 BMP280_PRESS_MSB_REG = 0xF7      # Pressure MSB Register
 BMP280_CONFIG_REG = 0xF5         # Configuration Register
 BMP280_CTRL_MEAS_REG = 0xF4      # Ctrl Measure Register
 BMP280_STATUS_REG = 0xF3         # Status Register
 BMP280_RESET_REG = 0xE0          # Softreset Register
 BMP280_ID_REG = 0xD0             # Chip ID Register
 ​
 # calibration parameters
 BMP280_DIG_T1_LSB_REG = 0x88
 BMP280_DIG_T1_MSB_REG = 0x89
 BMP280_DIG_T2_LSB_REG = 0x8A
 BMP280_DIG_T2_MSB_REG = 0x8B
 BMP280_DIG_T3_LSB_REG = 0x8C
 BMP280_DIG_T3_MSB_REG = 0x8D
 BMP280_DIG_P1_LSB_REG = 0x8E
 BMP280_DIG_P1_MSB_REG = 0x8F
 BMP280_DIG_P2_LSB_REG = 0x90
 BMP280_DIG_P2_MSB_REG = 0x91
 BMP280_DIG_P3_LSB_REG = 0x92
 BMP280_DIG_P3_MSB_REG = 0x93
 BMP280_DIG_P4_LSB_REG = 0x94
 BMP280_DIG_P4_MSB_REG = 0x95
 BMP280_DIG_P5_LSB_REG = 0x96
 BMP280_DIG_P5_MSB_REG = 0x97
 BMP280_DIG_P6_LSB_REG = 0x98
 BMP280_DIG_P6_MSB_REG = 0x99
 BMP280_DIG_P7_LSB_REG = 0x9A
 BMP280_DIG_P7_MSB_REG = 0x9B
 BMP280_DIG_P8_LSB_REG = 0x9C
 BMP280_DIG_P8_MSB_REG = 0x9D
 BMP280_DIG_P9_LSB_REG = 0x9E
 BMP280_DIG_P9_MSB_REG = 0x9F
 ​
 ​
 class BMP280(object):
     def __init__(self, bus=1, address=BMP280_I2C_ADDRESS):
         self._address = address
         self._bus = smbus.SMBus(bus)    # 1: iic编号为1(根据自己的硬件接口选择对应的编号)
         # Load calibration values.
         if self._read_byte(BMP280_ID_REG) == BMP280_ID_Value: # read bmp280 id
             self._load_calibration()                          # load calibration data
             # BMP280_T_MODE_1 << 5 | BMP280_P_MODE_1 << 2 | BMP280_SLEEP_MODE;
             # 修复:使用正常的测量模式而不是 0xFF(0xFF 会使传感器进入强制模式但可能不稳定)
             # 0x27 表示:温度过采样x1,压力过采样x1,正常模式
             ctrlmeas = 0x27
             # BMP280_T_SB1 << 5 | BMP280_FILTER_MODE_1 << 2;
             config = 0x14
             self._write_byte(BMP280_CTRL_MEAS_REG, ctrlmeas)  # write bmp280 config
             # sets the data acquisition options
             self._write_byte(BMP280_CONFIG_REG, config)
             # 等待传感器稳定
             time.sleep(0.01)
         else:
             print("Read BMP280 id error!\r\n")
 ​
     def _read_byte(self, cmd):
         return self._bus.read_byte_data(self._address, cmd)
 ​
     def _read_u16(self, cmd):
         LSB = self._bus.read_byte_data(self._address, cmd)
         MSB = self._bus.read_byte_data(self._address, cmd+1)
         return (MSB << 8) + LSB
 ​
     def _read_s16(self, cmd):
         result = self._read_u16(cmd)
         if result > 32767:
             result -= 65536
         return result
 ​
     def _write_byte(self, cmd, val):
         self._bus.write_byte_data(self._address, cmd, val)
 ​
     def _load_calibration(self):                           # load calibration data
         "load calibration"
 ​
         """ read the temperature calibration parameters """
         self.dig_T1 = self._read_u16(BMP280_DIG_T1_LSB_REG)
         self.dig_T2 = self._read_s16(BMP280_DIG_T2_LSB_REG)
         self.dig_T3 = self._read_s16(BMP280_DIG_T3_LSB_REG)
         """ read the pressure calibration parameters """
         self.dig_P1 = self._read_u16(BMP280_DIG_P1_LSB_REG)
         self.dig_P2 = self._read_s16(BMP280_DIG_P2_LSB_REG)
         self.dig_P3 = self._read_s16(BMP280_DIG_P3_LSB_REG)
         self.dig_P4 = self._read_s16(BMP280_DIG_P4_LSB_REG)
         self.dig_P5 = self._read_s16(BMP280_DIG_P5_LSB_REG)
         self.dig_P6 = self._read_s16(BMP280_DIG_P6_LSB_REG)
         self.dig_P7 = self._read_s16(BMP280_DIG_P7_LSB_REG)
         self.dig_P8 = self._read_s16(BMP280_DIG_P8_LSB_REG)
         self.dig_P9 = self._read_s16(BMP280_DIG_P9_LSB_REG)
 ​
         # print(self.dig_T1)
         # print(self.dig_T2)
         # print(self.dig_T3)
         # print(self.dig_P1)
         # print(self.dig_P2)
         # print(self.dig_P3)
         # print(self.dig_P4)
         # print(self.dig_P5)
         # print(self.dig_P6)
         # print(self.dig_P7)
         # print(self.dig_P8)
         # print(self.dig_P9)
 ​
     def compensate_temperature(self, adc_T):
         """Returns temperature in DegC, double precision. Output value of "1.23"equals 51.23 DegC."""
         var1 = ((adc_T) / 16384.0 - (self.dig_T1) / 1024.0) * (self.dig_T2)
         var2 = (((adc_T) / 131072.0 - (self.dig_T1) / 8192.0) *
                 ((adc_T) / 131072.0 - (self.dig_T1) / 8192.0)) * (self.dig_T3)
         self.t_fine = var1 + var2
         temperature = (var1 + var2) / 5120.0
         return temperature
 ​
     def compensate_pressure(self, adc_P):
         """Returns pressure in Pa as double. Output value of "6386.2"equals 96386.2 Pa = 963.862 hPa."""
         var1 = (self.t_fine / 2.0) - 64000.0
         var2 = var1 * var1 * (self.dig_P6) / 32768.0
         var2 = var2 + var1 * (self.dig_P5) * 2.0
         var2 = (var2 / 4.0) + ((self.dig_P4) * 65536.0)
         var1 = ((self.dig_P3) * var1 * var1 / 524288.0 +
                 (self.dig_P2) * var1) / 524288.0
         var1 = (1.0 + var1 / 32768.0) * (self.dig_P1)
 ​
         if var1 == 0.0:
             return 0  # avoid exception caused by division by zero
 ​
         pressure = 1048576.0 - adc_P
         pressure = (pressure - (var2 / 4096.0)) * 6250.0 / var1
         var1 = (self.dig_P9) * pressure * pressure / 2147483648.0
         var2 = pressure * (self.dig_P8) / 32768.0
         pressure = pressure + (var1 + var2 + (self.dig_P7)) / 16.0
 ​
         return pressure
 ​
     def get_temperature_and_pressure(self):
         """Returns pressure in Pa as double. Output value of "6386.2"equals 96386.2 Pa = 963.862 hPa."""
         xlsb = self._read_byte(BMP280_TEMP_XLSB_REG)
         lsb = self._read_byte(BMP280_TEMP_LSB_REG)
         msb = self._read_byte(BMP280_TEMP_MSB_REG)
 ​
         adc_T = (msb << 12) | (lsb << 4) | (
             xlsb >> 4)      # temperature registers data
         temperature = self.compensate_temperature(
             adc_T)    # temperature compensate
 ​
         xlsb = self._read_byte(BMP280_PRESS_XLSB_REG)
         lsb = self._read_byte(BMP280_PRESS_LSB_REG)
         msb = self._read_byte(BMP280_PRESS_MSB_REG)
 ​
         adc_P = (msb << 12) | (lsb << 4) | (
             xlsb >> 4)      # pressure registers data
         pressure = self.compensate_pressure(
             adc_P)          # pressure compensate
         return temperature, pressure
 ​
 def demo(interval=1):
     """连续打印温度压强,interval:采样间隔(秒)"""
     bmp280 = BMP280(bus=4, address=0x77)
     while True:
         time.sleep(1)
         temperature, pressure = bmp280.get_temperature_and_pressure()
         print('Temperature = %.2f C Pressure = %.3f kPa' %
               (temperature, pressure/1000))
  • 终端执行指令 python3 aht20_bmp280_print.py 运行程序;

  • 连续打印温湿度和气压数据;

MQTT 上传

 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 """
 温湿度 + 气压 → MQTT → Home Assistant
 传感器:AHT10 + BMP280(I²C 总线 0)
 """
 ​
 import json
 import time
 from aht10 import AHT10
 from bmp280 import BMP280
 import paho.mqtt.client as mqtt
 ​
 # =============== 参数 ==============
 MQTT_BROKER = "192.168.1.107"   # EMQX IP
 MQTT_PORT   = 1883
 MQTT_USER   = "xxx"
 MQTT_PASS   = "xxx"
 NODE_ID     = "bosch"          # 设备节点 ID
 IIC_BUS     = 0                # i2c bus 0
 # ===================================
 ​
 # HA 自动发现主题模板
 DISCOVERY_PREFIX = "homeassistant"
 ​
 # 传感器配置
 SENSORS = {
     "temp": {"name": "Temperature", "unit": "°C",   "dev_cla": "temperature", "ic": "mdi:thermometer"},
     "hum":  {"name": "Humidity", "unit": "%",    "dev_cla": "humidity",    "ic": "mdi:water-percent"},
     "pres": {"name": "Pressure", "unit": "hPa",  "dev_cla": "pressure",    "ic": "mdi:gauge"},
 }
 ​
 def publish_discovery(client):
     """向 HA 发送自动发现配置,只需一次即可"""
     for key, cfg in SENSORS.items():
         topic = f"{DISCOVERY_PREFIX}/sensor/{NODE_ID}/{key}/config"
         payload = {
             "name": cfg["name"],
             "unit_of_measurement": cfg["unit"],
             "device_class": cfg["dev_cla"],
             "icon": cfg["ic"],
             "state_topic": f"{DISCOVERY_PREFIX}/sensor/{NODE_ID}/{key}/state",
             "unique_id": f"{NODE_ID}_{key}",
             "device": {
                 "identifiers": [NODE_ID],
                 "name": "Sensors",
                 "model": "AHT20_BMP280",
                 "manufacturer": "Bosch",
             },
         }
         client.publish(topic, json.dumps(payload), retain=True)
 ​
 def main():
     client = mqtt.Client()
     client.username_pw_set(MQTT_USER, MQTT_PASS)
     client.connect(MQTT_BROKER, MQTT_PORT, 60)
     client.loop_start()
 ​
     # 上电后先发送一次发现配置
     publish_discovery(client)
 ​
     # 传感器初始化
     aht  = AHT10(bus=IIC_BUS, address=0x38)
     bmp  = BMP280(bus=IIC_BUS, address=0x77)
 ​
     while True:
         try:
             temp, hum   = aht.read()
             _, pressure = bmp.get_temperature_and_pressure()  # 温度 AHT10 采集
             pressure_hpa = pressure * 10                      # kPa → hPa
             print(f"Temp: {temp:.2f} °C  Hum: {hum:.2f} %  Pres: {pressure_hpa:.2f} hPa")
 ​
             client.publish(f"{DISCOVERY_PREFIX}/sensor/{NODE_ID}/temp/state", f"{temp:.2f}")
             client.publish(f"{DISCOVERY_PREFIX}/sensor/{NODE_ID}/hum/state",  f"{hum:.2f}")
             client.publish(f"{DISCOVERY_PREFIX}/sensor/{NODE_ID}/pres/state", f"{pressure_hpa:.2f}")
         except Exception as e:
             print("读取/发送失败:", e)
         time.sleep(2)  # 2 s 上报一次
 ​
 if __name__ == "__main__":
     main()

保存代码;

SEN66

包括 Sen66 传感器 python 驱动代码、终端打印 PM 、CO2、NOx 浓度数据等。

介绍

SEN66 是 Sensirion 盛思锐公司推出的 PM, RH/T, VOC, Nox 和 CO2 测量传感器平台,可实现空气质量传感数据采集。

  • 外形紧凑、集成多个传感器、可测量 9 种环境参数 (PM1, PM2.5, PM4, PM10, T, RH, VOC指数,NOx指数,CO2) ;

  • 核心组件是 SPS6x —— 小型化的、基于 mems 技术的颗粒物传感器;

  • 适配于需要符合不同室内空气质量标准的应用,如RESET®,WELL建筑标准™和California Title 24建筑能效标准。

Python 代码

终端执行 touch sen66.py 新建文件并添加如下代码

 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 #
 # (c) Copyright 2025 Sensirion AG, Switzerland
 #
 #     THIS FILE IS AUTOMATICALLY GENERATED!
 #
 # Generator:     sensirion-driver-generator 1.1.2
 # Product:       sen66
 # Model-Version: 1.6.0
 #
 ​
 import argparse
 import time
 from sensirion_i2c_driver import LinuxI2cTransceiver, I2cConnection, CrcCalculator
 from sensirion_driver_adapters.i2c_adapter.i2c_channel import I2cChannel
 from sensirion_i2c_sen66.device import Sen66Device
 ​
 parser = argparse.ArgumentParser()
 parser.add_argument('--i2c-port', '-p', default='/dev/i2c-1')
 args = parser.parse_args()
 ​
 with LinuxI2cTransceiver(args.i2c_port) as i2c_transceiver:
     channel = I2cChannel(I2cConnection(i2c_transceiver),
                          slave_address=0x6B,
                          crc=CrcCalculator(8, 0x31, 0xff, 0x0))
     sensor = Sen66Device(channel)
     sensor.device_reset()
     time.sleep(1.2)
     serial_number = sensor.get_serial_number()
     print(f"serial_number: {serial_number}; "
           )
     sensor.start_continuous_measurement()
     for i in range(100):
         try:
             time.sleep(1.0)
             (mass_concentration_pm1p0, mass_concentration_pm2p5, mass_concentration_pm4p0, mass_concentration_pm10p0, humidity,
              temperature, voc_index, nox_index, co2
              ) = sensor.read_measured_values()
             print(f"mass_concentration_pm1p0: {mass_concentration_pm1p0}; "
                   f"mass_concentration_pm2p5: {mass_concentration_pm2p5}; "
                   f"mass_concentration_pm4p0: {mass_concentration_pm4p0}; "
                   f"mass_concentration_pm10p0: {mass_concentration_pm10p0}; "
                   f"humidity: {humidity}; "
                   f"temperature: {temperature}; "
                   f"voc_index: {voc_index}; "
                   f"nox_index: {nox_index}; "
                   f"co2: {co2}; "
                   )
         except BaseException:
             continue
     sensor.stop_measurement()

保存代码。

详见:https://github.com/Sensirion/python-i2c-sen66

效果

终端执行 python sen66.py 运行程序;

打印设备序列号、环境参数(PM1.0、PM2.5、PM4.0、PM10.0、湿度、温度、VOC指数、NOx指数、二氧化碳浓度)

关键函数

read_measured_values ( )

返回测量值。

  • 返回 mass_concentration_pm1p0:

    PM1.0 [µg/m³]

  • 返回 mass_concentration_pm2p5:

    PM2.5 [µg/m³]

  • 返回 mass_concentration_pm4p0:

    PM4.0 [µg/m³]

  • 返回 mass_concentration_pm10p0:

    PM10.0 [µg/m³]

  • 返回环境湿度:

    RH [%]

  • 返回环境温度:

    T [°C]

  • 返回 voc_index:

    VOC 指数

  • 返回 nox_index:

    NOx 指数

  • 返回二氧化碳:

    二氧化碳浓度 [ppm]

详见:https://sensirion.github.io/python-i2c-sen66/index.html

MQTT

通过 MQTT 协议实现传感器数据上传,并使用 EMQX 的 WebSocket 工具接收 JSON 报文。

流程图

代码

终端执行 touch sen66_mqtt_ha.py 指令新建程序文件,并添加如下代码

 #!/usr/bin/env python3
 """
 SEN66 → MQTT → Home Assistant
 """
 ​
 import argparse
 import time
 import json
 import socket
 import paho.mqtt.client as mqtt
 from sensirion_i2c_driver import LinuxI2cTransceiver, I2cConnection, CrcCalculator
 from sensirion_driver_adapters.i2c_adapter.i2c_channel import I2cChannel
 from sensirion_i2c_sen66.device import Sen66Device
 ​
 # ---------- 参数 ----------
 I2C_PORT         = '/dev/i2c-1'
 MQTT_BROKER      = '192.168.31.117'   # MQTT Broker IP
 MQTT_PORT        = 1883
 MQTT_USER        = 'xxx'
 MQTT_PASS        = 'xxx'
 MQTT_CLIENT_ID   = f'sen66_{socket.gethostname()}'
 BASE_TOPIC       = 'homeassistant/sensor/sen66'
 DEVICE_NAME      = 'SEN66'
 DEVICE_ID        = 'sen66_01'
 # -----------------------------
 ​
 # HA 自动发现的通用设备段
 DEVICE_INFO = {
     "name": DEVICE_NAME,
     "ids": [DEVICE_ID],
     "mdl": "SEN66",
     "mf": "Sensirion"
 }
 ​
 # 上传 9 个参量:topic_suffix, unit, icon, class
 QUANTITIES = [
     ("pm1",   "μg/m³", "mdi:blur",  "pm1"),
     ("pm25",  "μg/m³", "mdi:blur",  "pm25"),
     ("pm4",   "μg/m³", "mdi:blur",  "pm25"),
     ("pm10",  "μg/m³", "mdi:blur",  "pm10"),
     ("hum",   "%",     "mdi:water", "humidity"),
     ("temp",  "°C",    "mdi:thermometer", "temperature"),
     ("voc",   " ",     "mdi:cloud", None),
     ("nox",   " ",     "mdi:cloud", None),
     ("co2",   "ppm",   "mdi:molecule-co2", "carbon_dioxide"),
 ]
 ​
 def publish_discovery(client):
     """向 HA 发送自动发现配置,只需一次即可持久化"""
     for qty, unit, icon, dev_cla in QUANTITIES:
         topic = f"{BASE_TOPIC}/{qty}/config"
         payload = {
             "name": f"{DEVICE_NAME} {qty.upper()}",
             "state_topic": f"{BASE_TOPIC}/{qty}/state",
             "unit_of_measurement": unit,
             "icon": icon,
             "device": DEVICE_INFO,
             "unique_id": f"{DEVICE_ID}_{qty}",
         }
         if dev_cla:
             payload["device_class"] = dev_cla
         client.publish(topic, json.dumps(payload), retain=True)
 ​
 def main():
     parser = argparse.ArgumentParser()
     parser.add_argument('--i2c-port', '-p', default=I2C_PORT)
     args = parser.parse_args()
 ​
     # 连接 MQTT
     client = mqtt.Client(client_id=MQTT_CLIENT_ID)
     client.username_pw_set(MQTT_USER, MQTT_PASS)
     client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
     client.loop_start()
 ​
     with LinuxI2cTransceiver(args.i2c_port) as i2c_transceiver:
         channel = I2cChannel(I2cConnection(i2c_transceiver),
                              slave_address=0x6B,
                              crc=CrcCalculator(8, 0x31, 0xff, 0x0))
         sensor = Sen66Device(channel)
         sensor.device_reset()
         time.sleep(1.2)
         print("SEN66 serial:", sensor.get_serial_number())
         sensor.start_continuous_measurement()
 ​
         # 发送一次自动发现配置
         publish_discovery(client)
 ​
         try:
             while True:
                 time.sleep(2)          # 2 s 上报一次
                 try:
                     vals = sensor.read_measured_values()
                     # vals 顺序与官方 demo 一致
                     pm1  = vals[0].value
                     pm25 = vals[1].value
                     pm4  = vals[2].value
                     pm10 = vals[3].value
                     hum  = vals[4].value
                     temp = vals[5].value
                     voc  = int(vals[6].value)
                     nox  = int(vals[7].value)
                     co2  = int(vals[8].value)
                     #----------------------------------
                     data = {
                         "pm1":  float(pm1),
                         "pm25": float(pm25),
                         "pm4":  float(pm4),
                         "pm10": float(pm10),
                         "hum":  float(hum),
                         "temp": float(temp),
                         "voc":  int(voc),
                         "nox":  int(nox),
                         "co2":  int(co2),
                     }
                     # 分开发,方便 HA 各自解析
                     for qty, val in data.items():
                         client.publish(f"{BASE_TOPIC}/{qty}/state", val)
                     print("published:", data)
                 except Exception as e:
                     print("read error:", e)
                     continue
         except KeyboardInterrupt:
             print("Ctrl-C 退出")
         finally:
             sensor.stop_measurement()
             client.loop_stop()
             client.disconnect()
 ​
 if __name__ == '__main__':
     main()

保存代码。

测试

  • 终端执行 python sen66_mqtt_ha.py 指令运行程序;

  • 连接 MQTT 服务器并上传数据;

  • 终端打印 JSON 格式报文;

EMQX 测试

  • 进入 EMQX 网页控制界面,打开侧边标签栏,选择 诊断工具 - WebSocket 客户端

  • 输入用户名和密码,连接 MQTT 服务器;

  • 在订阅版块输入目标传感器主题,如 homeassistant/sensor/sen66/temp/state ,点击 订阅 按钮;

  • 在已接收版块可看到传感器数据,两秒更新一次;

Home Assistant

通过 MQTT 协议上传 SEN66 传感器数据至 Home Assistant 平台,实现远程 IoT 数据实时监测。

参数配置

打开 Home Assistant 网页界面 http://192.168.31.118:8123 ,在 设置 - 设备与服务 标签页添加 MQTT 集成;

  • 输入 EMQX 客户端信息,刷新页面,可看到新增的传感器;

  • 点击 SEN66 传感器,进入详情页面,获取设备信息;

  • 点击编辑按钮,配置所在区域等信息,并添加至仪表盘;

效果

进入 概览 页面,刷新网页,可获得实时传感器数据

  • 点击目标传感器,可获得历史数据及演化曲线

总结

本文介绍了昉·星光 VisionFive2 Lite 单板计算机结合盛思锐SEN66传感器套件实现环境气体监测,并通过 MQTT 协议上传至 Home Assistant 智能家居平台,实现物联网环境监测终端的项目设计,为相关产品在物联网领域的快速开发和应用设计提供了参考。

1 Like