OCS-Controller MQTT数据采集与转换实践指南
基于 MQTT 的数采指导
一、引言
本指南旨在详细介绍如何利用 OCS-Controller,将来自第三方 MQTT 服务或设备的非“渊联”标准 IoT 数据,经过处理后转换为符合“渊联”标准的数据格式,并推送到“魔方” MQTT 服务。
通过本指南,您将学习如何实现以下功能:
- 场景案例分析:深入理解数据采集的需求与挑战。
- IoT 平台操作:涵盖产品创建、功能定义及设备创建的全过程。
- OCS-Controller 操作:
- 设备数据模拟控制流:模拟外部设备数据上报。
- 数据上报控制流:实现数据清洗、转换并上报至
IoT平台。
二、场景案例
场景描述:
假设我们有一个环境检测设备,它会定期将当前的环境数据上报到 98 服务的 MQTT Server。然而,该硬件厂商上报的数据格式固定如下所示,与 IoT 平台要求的标准数据格式不符。因此,我们需要将原始数据进行清洗和转换,提取 location、sensor_data 和 device_status 中的关键数据,并将其转换为标准的 IoT 上报格式。
原始设备上报数据示例:
{
"device_id": "env_monitor_001",
"timestamp": "2025-08-14T14:30:00.123Z",
"location": {
"latitude": 34.0522,
"longitude": -118.2437,
"description": "深圳市软件产业基地5B-304"
},
"sensor_data": {
"temperature": {
"value": 25.5,
"unit": "°C",
"status": "normal"
},
"humidity": {
"value": 60.2,
"unit": "%RH",
"status": "normal"
},
"pm2_5": {
"value": 35.8,
"unit": "μg/m³",
"status": "normal"
}
},
"device_status": {
"battery_level": 85,
"signal_strength": -65,
"network_type": "WiFi",
"firmware_version": "1.2.0",
"uptime_seconds": 36000,
"is_online": true,
"error_code": null,
"last_reboot_reason": "power_on"
},
"metadata": {
"data_schema_version": "1.0",
"manufacturer": "Yelink",
"model": "EM-2000"
}
}
目标 IoT 平台标准上报数据格式:
我们需要将上述原始数据中的 location、sensor_data 和 device_status 的部分数据,转换为以下标准 IoT 数据格式,并通过 MQTT 写入节点上报到 IoT 平台。
{
"params": {
"latitude": 34.0522,
"longitude": -118.2437,
"description": "深圳市软件产业基地5B-304",
"temperature": 25.5,
"humidity": 60.2,
"pm2_5": 35.8,
"battery_level": 85,
"network_type": "WiFi"
},
"method": "thing.event.property.post"
}
数据采集流程概览:

三、IoT 平台操作
1、创建产品


2、定义产品模型


根据标准 IoT 上报数据格式中的 params 对象,自定义功能字段。这些字段将与我们通过 MQTT 上传的 params 字段一一对应。
{
"params": {
"latitude": 34.0522,
"longitude": -118.2437,
"description": "深圳市软件产业基地5B-304",
"temperature": 25.5,
"humidity": 60.2,
"pm2_5": 35.8,
"battery_level": 85,
"network_type": "WiFi"
},
"method": "thing.event.property.post"
}

按照 params 对象,完整的功能定义字段如下:

3、创建关联设备


设备创建后,初始状态为“未激活”,因为此时我们尚未向该设备推送任何数据。

点击“查看”按钮,您可以获取设备详细信息,包括后续在 OCS-Controller 中推送数据所需的主题(topic)和推送数据格式。

四、OCS-Controller 操作
以下操作可在任何 OCS-Controller 实例上进行,本指南以数字工厂环境的 OCS-Controller 实例为例。

1、设备数据模拟控制流
设备数据模拟控制流由以下三个核心节点组成:
- 注入(Inject)节点:用于周期性触发数据流。
- 函数(Function)节点:用于生成模拟设备数据。
- 写入 MQTT(MQTT Out)节点:用于将模拟数据发送到指定的
MQTT Broker。
操作步骤:
-
新建流程:

-
重命名流程:双击流程
tab页,将名称修改为“数采示例”,或使用默认名称。
-
配置注入节点: 将“注入”节点拖入流程画布,双击配置。在配置面板下方,将“重复”选项设置为“周期性执行”,并将间隔周期设置为
3秒。

-
配置函数节点(模拟设备数据): 拖入一个“函数”节点并将其与“注入”节点相连。双击“函数”节点,输入以下
JavaScript代码。这段代码的作用是模拟设备数据,当“注入”节点每3秒触发一次时,“函数”节点会执行一次代码,生成新的模拟数据。
function setRandomSensorData(jsonData) {
const newJsonData = JSON.parse(JSON.stringify(jsonData));
// 设置随机温度 (例如: 10.0 到 40.0 °C),保留一位小数
newJsonData.sensor_data.temperature.value = parseFloat((Math.random() * (30) + 10).toFixed(1));
// 设置随机湿度 (例如: 30.0 到 90.0 %RH),保留一位小数
newJsonData.sensor_data.humidity.value = parseFloat((Math.random() * (60) + 30).toFixed(1));
// 设置随机PM2.5 (例如: 10.0 到 100.0 μg/m³),保留一位小数
newJsonData.sensor_data.pm2_5.value = parseFloat((Math.random() * (90) + 10).toFixed(1));
// 更新时间戳,模拟数据为当前生成时间
newJsonData.timestamp = new Date().toISOString();
return newJsonData;
}
const originalData = {
"device_id": "env_monitor_001",
"timestamp": "2025-08-14T14:30:00.123Z",
"location": {
"latitude": 34.0522,
"longitude": -118.2437,
"description": "深圳市软件产业基地5B-304"
},
"sensor_data": {
"temperature": {
"value": 25.5,
"unit": "°C",
"status": "normal"
},
"humidity": {
"value": 60.2,
"unit": "%RH",
"status": "normal"
},
"pm2_5": {
"value": 35.8,
"unit": "μg/m³",
"status": "normal"
}
},
"device_status": {
"battery_level": 85,
"signal_strength": -65,
"network_type": "WiFi",
"firmware_version": "1.2.0",
"uptime_seconds": 36000,
"is_online": true,
"error_code": null,
"last_reboot_reason": "power_on"
},
"metadata": {
"data_schema_version": "1.0",
"manufacturer": "Yelink",
"model": "EM-2000"
}
};
// 调用函数并获取带有随机值的新数据
const randomData = setRandomSensorData(originalData);
// console.log(JSON.stringify(randomData, null, 2)); // 调试时可启用
msg.payload = randomData;
return msg; -
配置写入 MQTT 节点(发布模拟数据): 拖入一个“写入 MQTT”节点,并将其与“函数”节点相连。

双击“写入 MQTT”节点,点击“服务端”配置右侧的铅笔图标,添加
MQTT Broker配置。Broker的服务端IP和端口请根据实际可访问的MQTT服务填写。
Broker配置示例: 本示例使用的是另一台内网服务上安装的MQTT。实际操作时,请根据您的环境填写,例如使用“魔方”安装的MQTT,默认端口为1883,IP为当前“魔方”的IP地址。
-
MQTT 写入节点配置: 选择刚刚新建的
MQTT Broker。主题可以自定义,但需遵循MQTT规则。具体可参考 MQTT 主题高级特性。 本示例中,我们将上传主题设置为:/env/sensor/realtime/properties。 点击“完成”。
-
部署流程: 点击右上角的“部署”按钮。

-
验证数据上报: 为了验证数据是否成功上报,拖入一个“订阅 MQTT”节点。双击打开配置界面,选择刚刚新建的
MQTT Broker,并填写主题:/env/sensor/realtime/properties。

再拖入一个“日志输出”(
Debug)节点,将配置好的“订阅 MQTT”节点与“日志输出”节点相连。点击“部署”后,如果一切正常,您将在调试窗口看到每隔3秒输出一条数据。
-
完整的设备数据模拟控制流: 以下是完整的设备数据模拟控制流
JSON配置,您可以直接复制并导入。请注意,导入后需要根据实际环境调整Broker的IP地址。[
{
"id": "960ae230906e1293",
"type": "inject",
"z": "4d55701a329e5413",
"name": "3s 触发一次",
"props": [
{
"p": "payload"
},
{
"p": "topic",
"vt": "str"
}
],
"repeat": "3",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "",
"payloadType": "date",
"x": 190,
"y": 160,
"wires": [
[
"bde0743076feea85"
]
]
},
{
"id": "faac6e0cc4a641a6",
"type": "mqtt out",
"z": "4d55701a329e5413",
"name": "上传主题:/env/sensor/realtime/properties",
"topic": "/env/sensor/realtime/properties",
"qos": "",
"retain": "",
"respTopic": "",
"contentType": "",
"userProps": "",
"correl": "",
"expiry": "",
"broker": "69c9138ba19bfbdf",
"x": 780,
"y": 160,
"wires": []
},
{
"id": "bde0743076feea85",
"type": "function",
"z": "4d55701a329e5413",
"name": "数据mock,模拟设备数据",
"func": "function setRandomSensorData(jsonData) {\n const newJsonData = JSON.parse(JSON.stringify(jsonData));\n\n // 设置随机温度 (例如: 10.0 到 40.0 °C)\n // 确保随机数有小数位,模拟真实传感器数据\n newJsonData.sensor_data.temperature.value = parseFloat((Math.random() * (30) + 10).toFixed(1));\n\n // 设置随机湿度 (例如: 30.0 到 90.0 %RH)\n newJsonData.sensor_data.humidity.value = parseFloat((Math.random() * (60) + 30).toFixed(1));\n\n // 设置随机PM2.5 (例如: 10.0 到 100.0 μg/m³)\n newJsonData.sensor_data.pm2_5.value = parseFloat((Math.random() * (90) + 10).toFixed(1));\n\n // 更新时间戳,模拟数据是当前生成的\n newJsonData.timestamp = new Date().toISOString();\n\n return newJsonData;\n}\n\nconst originalData = {\n \"device_id\": \"env_monitor_001\",\n \"timestamp\": \"2025-08-14T14:30:00.123Z\",\n \"location\": {\n \"latitude\": 34.0522,\n \"longitude\": -118.2437,\n \"description\": \"深圳市软件产业基地5B-304\"\n },\n \"sensor_data\": {\n \"temperature\": {\n \"value\": 25.5,\n \"unit\": \"°C\",\n \"status\": \"normal\"\n },\n \"humidity\": {\n \"value\": 60.2,\n \"unit\": \"%RH\",\n \"status\": \"normal\"\n },\n \"pm2_5\": {\n \"value\": 35.8,\n \"unit\": \"μg/m³\",\n \"status\": \"normal\"\n }\n },\n \"device_status\": {\n \"battery_level\": 85,\n \"signal_strength\": -65,\n \"network_type\": \"WiFi\",
\"firmware_version\": \"1.2.0\",\n \"uptime_seconds\": 36000,\n \"is_online\": true,\n \"error_code\": null,\n \"last_reboot_reason\": \"power_on\"\n },\n \"metadata\": {\n \"data_schema_version\": \"1.0\",\n \"manufacturer\": \"Yelink\",\n \"model\": \"EM-2000\"\n }\n};\n\n// 调用函数并获取带有随机值的新数据\nconst randomData = setRandomSensorData(originalData);\n// console.log(JSON.stringify(randomData, null, 2));\nmsg.payload = randomData;\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 430,
"y": 160,
"wires": [
[
"faac6e0cc4a641a6"
]
]
},
{
"id": "69c9138ba19bfbdf",
"type": "mqtt-broker",
"name": "98MQTT",
"broker": "192.168.102.98",
"port": "1883",
"clientid": "mqttx_e9931bad",
"autoConnect": true,
"usetls": false,
"protocolVersion": "5",
"keepalive": "60",
"cleansession": true,
"birthTopic": "",
"birthQos": "0",
"birthRetain": "false",
"birthPayload": "",
"birthMsg": {},
"closeTopic": "",
"closeQos": "0",
"closeRetain": "false",
"closePayload": "",
"closeMsg": {},
"willTopic": "",
"willQos": "0",
"willRetain": "false",
"willPayload": "",
"willMsg": {},
"userProps": "",
"sessionExpiry": ""
}
]
2、数据上报控制流(数据清洗与转换)
在“设备数据模拟”的步骤 8 中,我们已经通过“订阅 MQTT”节点采集到了原始设备数据。接下来,我们将在此节点之后继续添加其他节点,以完成原始数据的转换并上报至 IoT 平台。
-
添加函数节点(数据转换): 在之前拖入的“订阅 MQTT”节点(即“设备数据模拟”步骤 8 中的节点)后面,新增一个“函数”节点。

双击该节点并添加以下
JavaScript代码。这段代码将从原始设备上报数据中提取所需字段,并组装成IoT平台的标准格式。该格式包含一个params对象和一个固定值method: "thing.event.property.post"字段。params中的字段应与您在IoT平台定义产品时设置的功能参数字段保持一致。{
"params": {
"latitude": "xxx",
// ... 其他字段
},
"method": "thing.event.property.post"
}// 订阅节点接收到的消息,可能需要 JSON.parse(msg.payload) 进行转换(取决于平台配置)
const sensorData = msg.payload;
// 组装成标准的 IoT 数据格式
const publishMsg = {
params: {
latitude: sensorData.location.latitude,
longitude: sensorData.location.longitude,
description: sensorData.location.description,
temperature: sensorData.sensor_data.temperature.value,
humidity: sensorData.sensor_data.humidity.value,
pm2_5: sensorData.sensor_data.pm2_5.value,
battery_level: sensorData.device_status.battery_level,
network_type: sensorData.device_status.network_type
},
method: "thing.event.property.post" // 固定值:设备属性上报
};
// 将组装好的参数赋值给 msg.payload,写入 MQTT 节点会自动读取并发送
msg.payload = publishMsg;
return msg; -
添加写入 MQTT 节点(上报至 IoT 平台): 在“函数”节点后面添加一个“写入 MQTT”节点,用于将上一步函数节点组装好的标准数据发送到
IoT平台的设备。
双击该节点配置一个新的
Broker,此Broker用于连接“魔方”的MQTT服务。
Broker具体配置: 其中,客户端ID是在IoT平台上创建设备时生成的“设备编码”和“ProductKey”的组合,格式为:${设备编码}&${ProductKey}。您可以在IoT平台的设备列表中点击“查看”按钮获取详细信息。

-
配置写入 MQTT 节点(上报主题): 配置好
Broker后,返回“写入MQTT”节点,选择刚刚创建的“魔方MQTT Broker”,并添加上报主题。主题信息同样可在设备详情界面获取。

-
部署流程: 配置完成后,点击“部署”。

-
查看上报结果: 返回
IoT平台,进入设备详情页。 点击“物模型数据”选项卡。
在“实时数据”部分,您将能看到
OCS-Controller成功上报的设备数据。
-
完整的上报控制流: 以下是完整的上报控制流
JSON配置,您可以直接复制并导入。请注意,导入后需要根据实际环境修改Broker配置。[
{
"id": "ea7e903820f46e83",
"type": "mqtt in",
"z": "4d55701a329e5413",
"name": "订阅主题:/env/sensor/realtime/properties",
"topic": "/env/sensor/realtime/properties",
"qos": "2",
"datatype": "auto-detect",
"broker": "69c9138ba19bfbdf",
"nl": false,
"rap": true,
"rh": 0,
"inputs": 0,
"x": 260,
"y": 320,
"wires": [
[
"f1557fc664280bf1"
]
]
},
{
"id": "f1557fc664280bf1",
"type": "function",
"z": "4d55701a329e5413",
"name": "数据转换",
"func": "// 订阅节点接收到的消息,可能需要 JSON.parse(msg.payload) 进行转换(取决于平台配置)\nconst sensorData = msg.payload;\n\n// 组装成标准的 IoT 数据格式,其中 method 参数是一个固定值:\"thing.event.property.post\"\nconst publishMsg = {\n params: {\n latitude: sensorData.location.latitude,\n longitude: sensorData.location.longitude,\n description: sensorData.location.description,\n temperature: sensorData.sensor_data.temperature.value,\n humidity: sensorData.sensor_data.humidity.value,\n pm2_5: sensorData.sensor_data.pm2_5.value,\n battery_level: sensorData.device_status.battery_level,\n network_type: sensorData.device_status.network_type\n },\n method: \"thing.event.property.post\" // 固定值\n};\n// 将组装好的参数赋值给 msg.payload,写入 MQTT 节点会自动读取并发送\nmsg.payload = publishMsg;\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 540,
"y": 320,
"wires": [
[
"73a1058a187311ee"
]
]
},
{
"id": "73a1058a187311ee",
"type": "mqtt out",
"z": "4d55701a329e5413",
"name": "写入设备:env_monitor_industry_base_5b304",
"topic": "/sys/CP_7k1bmb2igbr/env_monitor_industry_base_5b304/thing/event/property/post",
"qos": "",
"retain": "",
"respTopic": "",
"contentType": "",
"userProps": "",
"correl": "",
"expiry": "",
"broker": "1911e61bcd1b4f2a",
"x": 840,
"y": 320,
"wires": []
},
{
"id": "69c9138ba19bfbdf",
"type": "mqtt-broker",
"name": "98MQTT",
"broker": "192.168.102.98",
"port": "1883",
"clientid": "mqttx_e9931bad",
"autoConnect": true,
"usetls": false,
"protocolVersion": "5",
"keepalive": "60",
"cleansession": true,
"birthTopic": "",
"birthQos": "0",
"birthRetain": "false",
"birthPayload": "",
"birthMsg": {},
"closeTopic": "",
"closeQos": "0",
"closeRetain": "false",
"closePayload": "",
"closeMsg": {},
"willTopic": "",
"willQos": "0",
"willRetain": "false",
"willPayload": "",
"willMsg": {},
"userProps": "",
"sessionExpiry": ""
},
{
"id": "1911e61bcd1b4f2a",
"type": "mqtt-broker",
"name": "数字工厂-数采",
"broker": "172.16.0.3",
"port": "1883",
"clientid": "env_monitor_industry_base_5b304&CP_7k1bmb2igbr",
"autoConnect": true,
"usetls": false,
"protocolVersion": "5",
"keepalive": "60",
"cleansession": true,
"birthTopic": "",
"birthQos": "0",
"birthRetain": "false",
"birthPayload": "",
"birthMsg": {},
"closeTopic": "",
"closeQos": "0",
"closeRetain": "false",
"closePayload": "",
"closeMsg": {},
"willTopic": "",
"willQos": "0",
"willRetain": "false",
"willPayload": "",
"willMsg": {},
"userProps": "",
"sessionExpiry": ""
}
]