本文将为你讲解,如何借助基于 Snowflake 打造的开源工具集 SNACK-AI,将一台普通树莓派改造成功能强大的热成像数据流传输设备。
SNACK-AI:https://medium.com/@tspann/snack-ai-d8c5dd692929
在物联网与边缘计算领域,实时处理环境数据的能力具有颠覆性意义。无论是监控服务器机房温度、检测暖通空调系统泄漏,还是在安防系统中检测人体存在,热成像技术能提供的丰富数据维度,都是普通温度传感器无法比拟的。
源码地址: https://github.com/tspannhw/RPIThermalStreaming
硬件配置:高性价比的热成像方案
本项目的核心硬件是树莓派 4—— 这款应用广泛的单板计算机,为无数边缘 AI 项目提供算力支撑。而真正的核心传感设备,则是 Pimoroni MLX90640 热成像摄像头转接板。
与仅能检测运动的普通被动红外传感器不同,MLX90640 是一款远红外热成像传感器阵列,分辨率为 32×24 像素。这个分辨率相较于 4K 摄像头看似偏低,但对于热成像数据采集而言,每帧可提供 768 个独立的温度读数,足以生成实际的热成像图和热成像视频流。
核心硬件参数
主控设备:树莓派 4
传感设备:迈来芯 MLX90640 热成像传感器(通过 Pimoroni Breakout Garden 转接板连接)
经典架构:从边缘端到云端的数据流传输
RPIThermalStreaming 项目并非简单的传感器数据读取工具,更是现代数据工程的实操典范。项目依托 FLiP 技术栈(Flink、NiFi、Pulsar/Kafka),实现了边缘端数据向云端的可靠传输,整体流程分为三步
1.边缘端数据采集(Python 开发)
树莓派本地运行基于 Python 开发的应用程序,通过脚本调用 MLX90640 传感器驱动完成数据帧采集。该项目代码仓库在 2025 年 12 月前后完成了重要的性能优化,让数据帧的采集与传输流程变得前所未有的流畅。
项目提供了可直接用于生产环境的启动脚本 start_production.sh,该脚本的功能包括
清理 Python 缓存,确保程序干净运行;
验证配置文件与项目依赖是否齐全;
启动 SnowpipeStreamingClient 客户端,处理数据传输工作。
2.数据接入、路由与流存储(Apache Kafka/Pulsar)
为实现边缘设备与分析引擎的解耦,项目会将采集到的数据缓存至 Apache Kafka 或 Apache Pulsar 这类流处理平台。这一设计能确保即便分析层处于高负载状态,也不会出现热成像数据帧丢失的情况。
而在本项目的实际应用中,我们选择通过 SSL 加密的 Snowpipe Streaming 高速 V2 版 REST API,将数据直接传输至 Snowflake 云数据平台,省去了中间的缓存和解耦层。原因在于本项目仅使用一台设备,移除该环节能提升传输速度并降低成本;同时,由于热成像数据具有高度重复性,少量数据丢失在可接受范围内。当然,我们也可轻松将数据通过 Pulsar/Kafka/MQTT 协议发送至 Pulsar 或 Kafka 平台,若为多设备的生产环境部署,建议搭配 StreamNative 或 Confluent Cloud 云服务。
3.数据分析与存储(Snowflake)
数据的最终目的地是 Snowflake 云数据平台,项目借助 Snowpipe Streaming V2 高速 API,实现了数据的低延迟接入。Snowflake 可对历史热成像阵列数据进行存储,支持后续的回放分析与异常检测工作。
应用场景
热成像数据的实时流传输拥有丰富的应用场景,核心包括以下几类:
工业监控:在设备发生故障前,检测出机械过热的问题,实现预防性维护;
智能交通:分析列车车厢内的乘客密度,采用热成像轮廓识别,避免人脸识别带来的隐私泄露问题;
智能家居:实现真正的人体存在检测,即便人体保持静止也能精准识别,弥补运动传感器的不足;
安全防护:打造可穿透烟雾识别热源的火灾探测系统,提升火灾预警的可靠性。
快速上手指南
如果你手边已有树莓派 4 和 MLX90640 传感器,即可立即开始部署,步骤如下,
1.克隆项目代码仓库。
git clone [https://github.com/tspannhw/RPIThermalStreaming](https://github.com/tspannhw/RPIThermalStreaming)
2.安装项目依赖:按照项目 README.md 文档的说明,配置所需的 Python 库与 I2C 驱动。
3.运行启动脚本:
./start_production.sh
技术深度解析
项目概述
本应用程序可持续读取树莓派连接的各类传感器数据,并以近实时的方式将数据流式传输至 Snowflake 平台,核心特性包括:
✅ 支持 Snowpipe Streaming V2 REST API,采用高性能架构;
✅ 基于 JWT 的身份认证,实现安全的密钥对认证机制;
✅ 支持真实传感器数据采集,兼容 BME680、SGP30 等环境传感器;
✅ 提供仿真模式,无需物理硬件即可完成功能测试;
✅ 支持自动批处理,批处理大小与时间间隔均可配置;
✅ 实现优雅停机,程序退出前确保数据完成提交;
✅ 提供全面的日志记录功能,同时向文件和控制台输出详细日志。
系统架构
┌─────────────────────────────────────────────┐│ Raspberry Pi 4 ││ ││ ┌─────────────────────────────────────┐ ││ │ Environmental Sensors │ ││ │ - BME680: Temp, Humidity, Pressure │ ││ │ - SGP30: CO2, VOC │ ││ │ - System: CPU, Memory, Disk │ ││ └────────────┬────────────────────────┘ ││ │ ││ ┌────────────▼────────────────────────┐ ││ │ Python Application │ ││ │ - thermal_sensor.py │ ││ │ - thermal_streaming_client.py │ ││ │ - snowflake_jwt_auth.py │ ││ └────────────┬────────────────────────┘ │└───────────────┼─────────────────────────────┘│ REST API (HTTPS)│ JWT Authentication▼┌───────────────────────────────────────────┐│ Snowflake Cloud ││ ││ Snowpipe Streaming v2 (REST API) ││ ↓ ││ THERMAL_SENSOR_PIPE ││ ↓ ││ THERMAL_SENSOR_DATA (Table) ││ ││ Near real-time ingestion (5-10s latency) │└───────────────────────────────────────────┘
特性
采集的传感器数据
应用程序可采集全面的环境数据、系统指标与元数据,具体分类如下:
环境传感器数据
SCD4X:温度(℃)、湿度(%)、二氧化碳浓度(ppm)
ICP10125:大气压力(Pa)、温度(℃)
SGP30:等效二氧化碳浓度(ppm)、总挥发性有机物浓度(ppb)
系统运行指标:
CPU 温度(华氏度 / 摄氏度)
CPU 使用率(%)
内存使用率(%)
磁盘使用量(MB)
元数据:
设备唯一标识 UUID 与行 ID
主机名与 IP 地址
物理地址 MAC
多格式时间戳
部署前提
硬件要求
树莓派 4(或树莓派 3 Model B+)
集成电路总线(I2C)连接的各类传感器:
SCD4X:二氧化碳、温度、湿度传感器
ICP10125:气压、温度传感器
SGP30:等效二氧化碳、总挥发性有机物传感器
网络连接
软件要求
Python 3.9 及以上版本
拥有相应操作权限的 Snowflake 账号
可访问 Snowflake 平台的网络环境
简要步骤
第一步:克隆项目并完成基础配置
# Clone or copy the projectcd RPIThermalStreaming
# Create Python virtual environmentpython3 -m venv venvsource venv/bin/activate # On Windows: venvScriptsactivate# Install dependenciespip install -r requirements.txt
第二步:选择身份认证方式
本应用支持两种 Snowflake 身份认证方式,可按需选择:
方式 A:编程访问令牌(PAT)—— 推荐使用,操作更简便
在 Snowflake 中生成编程访问令牌:
-- Generate PAT for user (valid 15 days by default)ALTER USER THERMAL_STREAMING_USERADD PROGRAMMATIC ACCESS TOKENNAME = 'thermal_pat'EXPIRES_IN = 90;
立即复制输出结果中的密钥,该密钥仅能查看一次!
将获取的编程访问令牌填写至 snowflake_config.json 配置文件中(详见第四步)。
更多细节可参考 Snowflake 官方的编程访问令牌文档。
https://docs.snowflake.com/en/user-guide/programmatic-access-tokens
方式 B:JWT 密钥对认证
生成 RSA 密钥对:
chmod +x generate_keys.sh./generate_keys.sh
该脚本会生成私钥文件 rsa_key.p8,并在控制台输出用于注册公钥的 SQL 命令。
详细配置步骤可参考项目中的 AUTHENTICATION.md 文档。
https://github.com/tspannhw/RPIThermalStreaming/blob/main/AUTHENTICATION.md
第三步:配置 Snowflake
1.在 Snowflake 中运行对应的 SQL 脚本:
# Copy the ALTER USER command from generate_keys.sh output# Then run setup_snowflake.sql in Snowflake
该脚本会自动创建以下资源:
用户:THERMAL_STREAMING_USER
角色:THERMAL_STREAMING_ROLE
数据库:DEMO
模式:DEMO
数据表:THERMAL_SENSOR_DATA
数据管道:THERMAL_SENSOR_PIPE
2.验证数据管道是否创建成功:
SHOW PIPES LIKE 'THERMAL_SENSOR_PIPE' IN SCHEMA DEMO.DEMO;DESC PIPE DEMO.DEMO.THERMAL_SENSOR_PIPE;
第四步:配置应用程序
基于模板创建配置文件:
cp snowflake_config.json.template snowflake_config.json
编辑snowflake_config.json:
{"user": "THERMAL_STREAMING_USER","account": "xy12345","url": "https://xy12345.snowflakecomputing.com:443","private_key_file": "rsa_key.p8","role": "THERMAL_STREAMING_ROLE","database": "DEMO","schema": "DEMO","pipe": "THERMAL_SENSOR_PIPE","channel_name": "thermal_channel_001"}
将配置中的 xy12345 替换为你的 Snowflake 账号标识。
第五步:检测传感器状态(可选)
若已连接物理传感器,可运行以下命令验证传感器是否正常工作:
python check_sensors.py
该脚本会自动检测并测试所有已连接的传感器。
第六步:运行应用程序
连接物理传感器运行
python main.py
仿真模式运行(无需物理硬件)
python main.py --simulate
自定义配置运行
python main.py--config snowflake_config.json--batch-size 20--interval 10.0--simulate
第七步:在 Snowflake 中验证数据
通过 SQL 查询验证数据是否成功接入:
-- Check row countSELECT COUNT(*) FROM DEMO.DEMO.THERMAL_SENSOR_DATA;
-- View recent readingsSELECT * FROM DEMO.DEMO.THERMAL_SENSOR_DATAORDER BY ingestion_timestamp DESCLIMIT 100;-- Temperature trendsSELECThostname,datetimestamp,temperature,humidity,co2,cputempfFROM DEMO.DEMO.THERMAL_SENSOR_DATAORDER BY datetimestamp DESCLIMIT 50;
命令行参数说明
python main.py [OPTIONS]
Options:--config FILE Path to Snowflake config (default: snowflake_config.json)--batch-size N Readings per batch (default: 10)--interval SECONDS Seconds between batches (default: 5.0)--simulate Use simulated sensor data--verbose Enable debug logging-h, --help Show help message
项目目录结构
RPIThermalStreaming/├── main.py # Main application entry point├── thermal_streaming_client.py # Snowpipe Streaming REST API client├── snowflake_jwt_auth.py # JWT authentication for Snowflake├── thermal_sensor.py # Sensor data reader (real & simulated)├── requirements.txt # Python dependencies├── setup_snowflake.sql # Snowflake setup script├── snowflake_config.json.template # Configuration template├── generate_keys.sh # Key pair generation script├── .gitignore # Git ignore file└── README.md # This file
配置
批处理大小与时间间隔
可根据实际需求调整批处理参数:
高频低延迟场景:--batch-size 5 --interval 2.0
均衡配置(默认):--batch-size 10 --interval 5.0
低频采集场景:--batch-size 20 --interval 15.0
传感器配置
应用程序会自动检测已连接的传感器,若未检测到物理传感器,将自动切换至仿真模式。
支持的传感器
SCD4X:二氧化碳、温度、湿度(I2C 地址:0x62)
ICP10125:气压、温度(I2C 地址:0x63)
SGP30:等效二氧化碳、总挥发性有机物(I2C 地址:0x58)
启用传感器:取消 requirements.txt 文件中对应传感器库的注释,并执行以下命令安装
pip install python-scd4x icp10125 sgp30 smbus2
监控
应用程序日志
日志会同时输出至以下位置:
控制台标准输出(stdout)
日志文件:thermal_streaming.log
应用程序每传输 10 批数据,会在控制台打印一次数据接入统计信息:
===== INGESTION STATISTICS =====Total rows sent: 150Total batches: 15Total bytes sent: 45,234Errors: 0Elapsed time: 75.23 secondsAverage throughput: 1.99 rows/secCurrent offset token: 15================================
On Raspberry PI Example Run2025-12-02 10:29:44,901 [INFO] __main__ - Sample reading: Temp=27.8?C, Humidity=25.9%, CO2=1162ppm, CPU=28.1%2025-12-02 10:29:44,901 [INFO] thermal_streaming_client - Appending 10 rows...2025-12-02 10:29:44,989 [INFO] thermal_streaming_client - Successfully appended 10 rows2025-12-02 10:29:44,992 [INFO] __main__ - [OK] Successfully sent 10 readings to Snowpipe Streaming2025-12-02 10:29:44,993 [INFO] __main__ -======================================================================2025-12-02 10:29:44,993 [INFO] __main__ - Shutting down...2025-12-02 10:29:44,993 [INFO] __main__ - ======================================================================2025-12-02 10:29:44,993 [INFO] thermal_streaming_client - ============================================================2025-12-02 10:29:44,994 [INFO] thermal_streaming_client - INGESTION STATISTICS2025-12-02 10:29:44,994 [INFO] thermal_streaming_client - ============================================================2025-12-02 10:29:44,994 [INFO] thermal_streaming_client - Total rows sent: 302025-12-02 10:29:44,994 [INFO] thermal_streaming_client - Total batches: 32025-12-02 10:29:44,994 [INFO] thermal_streaming_client - Total bytes sent: 19,5022025-12-02 10:29:44,995 [INFO] thermal_streaming_client - Errors: 02025-12-02 10:29:44,995 [INFO] thermal_streaming_client - Elapsed time: 144.83 seconds2025-12-02 10:29:44,995 [INFO] thermal_streaming_client - Average throughput: 0.21 rows/sec2025-12-02 10:29:44,995 [INFO] thermal_streaming_client - Current offset token: 32025-12-02 10:29:44,996 [INFO] thermal_streaming_client - ============================================================2025-12-02 10:29:44,996 [INFO] __main__ - Closing streaming channel...2025-12-02 10:29:44,996 [INFO] thermal_streaming_client - Closing channel: TH_CHNL_20251202_1027202025-12-02 10:29:44,996 [INFO] thermal_streaming_client - Channel will auto-close after inactivity period2025-12-02 10:29:44,996 [INFO] __main__ - [OK] Channel closed2025-12-02 10:29:44,997 [INFO] __main__ - Shutdown complete
Snowflake 端监控
查看流传输通道状态:
-- View channel historySELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.SNOWPIPE_STREAMING_CHANNEL_HISTORYWHERE PIPE_NAME = 'THERMAL_SENSOR_PIPE'ORDER BY START_TIME DESC;
-- Check ingestion latencySELECThostname,datetimestamp as sensor_time,ingestion_timestamp,DATEDIFF('second', datetimestamp, ingestion_timestamp) as latency_secondsFROM DEMO.DEMO.THERMAL_SENSOR_DATAORDER BY ingestion_timestamp DESCLIMIT 100;
常用查询示例
各设备最新采集数据
SELECThostname,temperature,humidity,co2,pressure,datetimestampFROM DEMO.DEMO.THERMAL_SENSOR_DATAQUALIFY ROW_NUMBER() OVER (PARTITION BY hostname ORDER BY datetimestamp DESC) = 1;
每小时数据平均值
SELECTDATE_TRUNC('hour', datetimestamp) as hour,hostname,AVG(temperature) as avg_temp_c,AVG(humidity) as avg_humidity,AVG(co2) as avg_co2,COUNT(*) as reading_countFROM DEMO.DEMO.THERMAL_SENSOR_DATAGROUP BY 1, 2ORDER BY 1 DESC;
高温预警查询
SELECThostname,datetimestamp,temperature,cputempfFROM DEMO.DEMO.THERMAL_SENSOR_DATAWHERE temperature > 30ORDER BY datetimestamp DESC;
常见问题排查
问题 1:提示 “Private key file not found”(未找到私钥文件)
解决方案:运行./generate_keys.sh 脚本生成 RSA 密钥对。
问题 2:提示 “Failed to get scoped token”(获取范围令牌失败)
解决方案:
1.验证公钥是否已在 Snowflake 中完成注册;
2.检查配置文件中的用户名与账号标识是否正确;
3.确认私钥与公钥是否匹配。
问题 3:提示 “No ingest_host returned”(未返回接入主机)
解决方案:
1.验证 Snowflake 账号标识是否正确;
2.检查树莓派与 Snowflake 平台的网络连通性;
3.确认数据管道已创建且配置正确。
问题 4:提示 “Channel open failed”(通道打开失败)
解决方案:
1.验证配置文件中的数据库、模式与数据管道名称是否正确;
2.检查当前用户是否拥有该数据管道的操作权限;
3.查看日志中的 Snowflake 官方错误信息,定位具体问题。
问题 5:传感器库无法导入
解决方案:
1.安装对应的传感器库:pip install python-scd4x icp10125 sgp30 smbus2;
2.在树莓派中启用 I2C 接口:sudo raspi-config → 接口选项 → I2C;
3.验证传感器连接是否正常:i2cdetect -y 1(正常情况下会显示 0x58、0x62、0x63);
4.或直接启用仿真模式运行:python main.py --simulate。
性能调优
提升数据吞吐量
1.增大批处理大小:--batch-size 50;
2.减小批处理时间间隔:--interval 2.0;
3.监控 Snowflake 平台成本(按吞吐量计费)。
降低使用成本
1.降低批处理频率:--interval 15.0;
2.减小批处理大小:--batch-size 5。
预期性能指标
数据接入延迟:端到端 5-10 秒;
最大吞吐量:单表每秒 10GB(Snowflake 平台限制);
API 调用开销:每次 REST 调用约 100-200 毫秒。
安全最佳实践
切勿提交凭证至代码仓库:
rsa_key.p8 和 rsa_key.pub 已加入.gitignore 忽略列表;
snowflake_config.json 配置文件也已设置为忽略。
保护私钥文件:
chmod 600 rsa_key.p8
使用专用的 Snowflake 服务账号:
创建仅拥有最小必要权限的专用用户;
仅为该用户授予指定数据管道的操作和监控权限。
定期轮换密钥:
Snowflake 平台支持密钥轮换功能;
生成新的密钥对,并在 Snowflake 中更新公钥配置。
监控账号访问记录:
SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.LOGIN_HISTORY WHERE USER_NAME = ‘THERMAL_STREAMING_USER’ ORDER BY EVENT_TIMESTAMP DESC
配置 Systemd 服务(开机自启)
创建 systemd 服务文件,实现应用程序开机自动运行:
sudo nano /etc/systemd/system/thermal-streaming.service
[Unit]Description=Thermal Sensor Streaming to SnowflakeAfter=network.target
[Service]Type=simpleUser=piWorkingDirectory=/home/pi/RPIThermalStreamingEnvironment="PATH=/home/pi/RPIThermalStreaming/venv/bin"ExecStart=/home/pi/RPIThermalStreaming/venv/bin/python main.pyRestart=alwaysRestartSec=10[Install]WantedBy=multi-user.target
启用并启动服务:
sudo systemctl enable thermal-streamingsudo systemctl start thermal-streamingsudo systemctl status thermal-streaming
查看服务运行日志:
sudo journalctl -u thermal-streaming -f
许可证:本项目仅作演示使用,按现状提供,无任何形式的担保。
技术支持
Snowflake: Snowflake Documentation
https://docs.snowflake.com/en/
树莓派传感器: Adafruit Learning System
https://learn.adafruit.com/
官方网站:https://edatec.cn/zh/cm0
淘宝店铺:https://edatec.taobao.com/
222