跳到主要内容

OCS-Controller MQTT数据采集与转换实践指南

· 阅读需 18 分钟

基于 MQTT 的数采指导

一、引言

本指南旨在详细介绍如何利用 OCS-Controller,将来自第三方 MQTT 服务或设备的非“渊联”标准 IoT 数据,经过处理后转换为符合“渊联”标准的数据格式,并推送到“魔方” MQTT 服务。

通过本指南,您将学习如何实现以下功能:

  • 场景案例分析:深入理解数据采集的需求与挑战。
  • IoT 平台操作:涵盖产品创建、功能定义及设备创建的全过程。
  • OCS-Controller 操作
    1. 设备数据模拟控制流:模拟外部设备数据上报。
    2. 数据上报控制流:实现数据清洗、转换并上报至 IoT 平台。

二、场景案例

场景描述: 假设我们有一个环境检测设备,它会定期将当前的环境数据上报到 98 服务的 MQTT Server。然而,该硬件厂商上报的数据格式固定如下所示,与 IoT 平台要求的标准数据格式不符。因此,我们需要将原始数据进行清洗和转换,提取 locationsensor_datadevice_status 中的关键数据,并将其转换为标准的 IoT 上报格式。

原始设备上报数据示例:

{
"device_id": "env_monitor_001",
"timestamp": "2025-08-14T14:30:00.123Z",
"location": {
"latitude": 34.0522,
"longitude": -118.2437,
"description": "深圳市软件产业基地5B-304"
},
"sensor_data": {
"temperature": {
"value": 25.5,
"unit": "°C",
"status": "normal"
},
"humidity": {
"value": 60.2,
"unit": "%RH",
"status": "normal"
},
"pm2_5": {
"value": 35.8,
"unit": "μg/m³",
"status": "normal"
}
},
"device_status": {
"battery_level": 85,
"signal_strength": -65,
"network_type": "WiFi",
"firmware_version": "1.2.0",
"uptime_seconds": 36000,
"is_online": true,
"error_code": null,
"last_reboot_reason": "power_on"
},
"metadata": {
"data_schema_version": "1.0",
"manufacturer": "Yelink",
"model": "EM-2000"
}
}

目标 IoT 平台标准上报数据格式: 我们需要将上述原始数据中的 locationsensor_datadevice_status 的部分数据,转换为以下标准 IoT 数据格式,并通过 MQTT 写入节点上报到 IoT 平台。

{
"params": {
"latitude": 34.0522,
"longitude": -118.2437,
"description": "深圳市软件产业基地5B-304",
"temperature": 25.5,
"humidity": 60.2,
"pm2_5": 35.8,
"battery_level": 85,
"network_type": "WiFi"
},
"method": "thing.event.property.post"
}

数据采集流程概览:

数据采集流程概览图

三、IoT 平台操作

1、创建产品

创建产品步骤一

创建产品步骤二

2、定义产品模型

定义产品模型步骤一

定义产品模型步骤二

根据标准 IoT 上报数据格式中的 params 对象,自定义功能字段。这些字段将与我们通过 MQTT 上传的 params 字段一一对应。

{
"params": {
"latitude": 34.0522,
"longitude": -118.2437,
"description": "深圳市软件产业基地5B-304",
"temperature": 25.5,
"humidity": 60.2,
"pm2_5": 35.8,
"battery_level": 85,
"network_type": "WiFi"
},
"method": "thing.event.property.post"
}

自定义功能字段示例

按照 params 对象,完整的功能定义字段如下:

完整功能定义字段

3、创建关联设备

创建关联设备步骤一

创建关联设备步骤二

设备创建后,初始状态为“未激活”,因为此时我们尚未向该设备推送任何数据。

设备未激活状态

点击“查看”按钮,您可以获取设备详细信息,包括后续在 OCS-Controller 中推送数据所需的主题(topic)和推送数据格式。

设备详情页面

四、OCS-Controller 操作

以下操作可在任何 OCS-Controller 实例上进行,本指南以数字工厂环境的 OCS-Controller 实例为例。

OCS-Controller 实例界面

1、设备数据模拟控制流

设备数据模拟控制流由以下三个核心节点组成:

  • 注入(Inject)节点:用于周期性触发数据流。
  • 函数(Function)节点:用于生成模拟设备数据。
  • 写入 MQTT(MQTT Out)节点:用于将模拟数据发送到指定的 MQTT Broker

操作步骤:

  1. 新建流程新建流程

  2. 重命名流程:双击流程 tab 页,将名称修改为“数采示例”,或使用默认名称。 重命名流程

  3. 配置注入节点: 将“注入”节点拖入流程画布,双击配置。在配置面板下方,将“重复”选项设置为“周期性执行”,并将间隔周期设置为 3 秒。 配置注入节点一 配置注入节点二

  4. 配置函数节点(模拟设备数据): 拖入一个“函数”节点并将其与“注入”节点相连。双击“函数”节点,输入以下 JavaScript 代码。这段代码的作用是模拟设备数据,当“注入”节点每 3 秒触发一次时,“函数”节点会执行一次代码,生成新的模拟数据。 配置函数节点

    function setRandomSensorData(jsonData) {
    const newJsonData = JSON.parse(JSON.stringify(jsonData));

    // 设置随机温度 (例如: 10.0 到 40.0 °C),保留一位小数
    newJsonData.sensor_data.temperature.value = parseFloat((Math.random() * (30) + 10).toFixed(1));

    // 设置随机湿度 (例如: 30.0 到 90.0 %RH),保留一位小数
    newJsonData.sensor_data.humidity.value = parseFloat((Math.random() * (60) + 30).toFixed(1));

    // 设置随机PM2.5 (例如: 10.0 到 100.0 μg/m³),保留一位小数
    newJsonData.sensor_data.pm2_5.value = parseFloat((Math.random() * (90) + 10).toFixed(1));

    // 更新时间戳,模拟数据为当前生成时间
    newJsonData.timestamp = new Date().toISOString();

    return newJsonData;
    }

    const originalData = {
    "device_id": "env_monitor_001",
    "timestamp": "2025-08-14T14:30:00.123Z",
    "location": {
    "latitude": 34.0522,
    "longitude": -118.2437,
    "description": "深圳市软件产业基地5B-304"
    },
    "sensor_data": {
    "temperature": {
    "value": 25.5,
    "unit": "°C",
    "status": "normal"
    },
    "humidity": {
    "value": 60.2,
    "unit": "%RH",
    "status": "normal"
    },
    "pm2_5": {
    "value": 35.8,
    "unit": "μg/m³",
    "status": "normal"
    }
    },
    "device_status": {
    "battery_level": 85,
    "signal_strength": -65,
    "network_type": "WiFi",
    "firmware_version": "1.2.0",
    "uptime_seconds": 36000,
    "is_online": true,
    "error_code": null,
    "last_reboot_reason": "power_on"
    },
    "metadata": {
    "data_schema_version": "1.0",
    "manufacturer": "Yelink",
    "model": "EM-2000"
    }
    };

    // 调用函数并获取带有随机值的新数据
    const randomData = setRandomSensorData(originalData);
    // console.log(JSON.stringify(randomData, null, 2)); // 调试时可启用
    msg.payload = randomData;
    return msg;
  5. 配置写入 MQTT 节点(发布模拟数据): 拖入一个“写入 MQTT”节点,并将其与“函数”节点相连。 添加写入 MQTT 节点

    双击“写入 MQTT”节点,点击“服务端”配置右侧的铅笔图标,添加 MQTT Broker 配置。Broker 的服务端 IP 和端口请根据实际可访问的 MQTT 服务填写。 配置写入 MQTT 服务端

    Broker 配置示例: 本示例使用的是另一台内网服务上安装的 MQTT。实际操作时,请根据您的环境填写,例如使用“魔方”安装的 MQTT,默认端口为 1883IP 为当前“魔方”的 IP 地址。 MQTT Broker 配置

  6. MQTT 写入节点配置: 选择刚刚新建的 MQTT Broker。主题可以自定义,但需遵循 MQTT 规则。具体可参考 MQTT 主题高级特性。 本示例中,我们将上传主题设置为:/env/sensor/realtime/properties。 点击“完成”。 MQTT 写入节点配置

  7. 部署流程: 点击右上角的“部署”按钮。 部署流程

  8. 验证数据上报: 为了验证数据是否成功上报,拖入一个“订阅 MQTT”节点。双击打开配置界面,选择刚刚新建的 MQTT Broker,并填写主题:/env/sensor/realtime/properties添加订阅 MQTT 节点 配置订阅 MQTT 节点

    再拖入一个“日志输出”(Debug)节点,将配置好的“订阅 MQTT”节点与“日志输出”节点相连。点击“部署”后,如果一切正常,您将在调试窗口看到每隔 3 秒输出一条数据。 验证数据上报

  9. 完整的设备数据模拟控制流: 以下是完整的设备数据模拟控制流 JSON 配置,您可以直接复制并导入。请注意,导入后需要根据实际环境调整 BrokerIP 地址。

    [
    {
    "id": "960ae230906e1293",
    "type": "inject",
    "z": "4d55701a329e5413",
    "name": "3s 触发一次",
    "props": [
    {
    "p": "payload"
    },
    {
    "p": "topic",
    "vt": "str"
    }
    ],
    "repeat": "3",
    "crontab": "",
    "once": false,
    "onceDelay": 0.1,
    "topic": "",
    "payload": "",
    "payloadType": "date",
    "x": 190,
    "y": 160,
    "wires": [
    [
    "bde0743076feea85"
    ]
    ]
    },
    {
    "id": "faac6e0cc4a641a6",
    "type": "mqtt out",
    "z": "4d55701a329e5413",
    "name": "上传主题:/env/sensor/realtime/properties",
    "topic": "/env/sensor/realtime/properties",
    "qos": "",
    "retain": "",
    "respTopic": "",
    "contentType": "",
    "userProps": "",
    "correl": "",
    "expiry": "",
    "broker": "69c9138ba19bfbdf",
    "x": 780,
    "y": 160,
    "wires": []
    },
    {
    "id": "bde0743076feea85",
    "type": "function",
    "z": "4d55701a329e5413",
    "name": "数据mock,模拟设备数据",
    "func": "function setRandomSensorData(jsonData) {\n const newJsonData = JSON.parse(JSON.stringify(jsonData));\n\n // 设置随机温度 (例如: 10.0 到 40.0 °C)\n // 确保随机数有小数位,模拟真实传感器数据\n newJsonData.sensor_data.temperature.value = parseFloat((Math.random() * (30) + 10).toFixed(1));\n\n // 设置随机湿度 (例如: 30.0 到 90.0 %RH)\n newJsonData.sensor_data.humidity.value = parseFloat((Math.random() * (60) + 30).toFixed(1));\n\n // 设置随机PM2.5 (例如: 10.0 到 100.0 μg/m³)\n newJsonData.sensor_data.pm2_5.value = parseFloat((Math.random() * (90) + 10).toFixed(1));\n\n // 更新时间戳,模拟数据是当前生成的\n newJsonData.timestamp = new Date().toISOString();\n\n return newJsonData;\n}\n\nconst originalData = {\n \"device_id\": \"env_monitor_001\",\n \"timestamp\": \"2025-08-14T14:30:00.123Z\",\n \"location\": {\n \"latitude\": 34.0522,\n \"longitude\": -118.2437,\n \"description\": \"深圳市软件产业基地5B-304\"\n },\n \"sensor_data\": {\n \"temperature\": {\n \"value\": 25.5,\n \"unit\": \"°C\",\n \"status\": \"normal\"\n },\n \"humidity\": {\n \"value\": 60.2,\n \"unit\": \"%RH\",\n \"status\": \"normal\"\n },\n \"pm2_5\": {\n \"value\": 35.8,\n \"unit\": \"μg/m³\",\n \"status\": \"normal\"\n }\n },\n \"device_status\": {\n \"battery_level\": 85,\n \"signal_strength\": -65,\n \"network_type\": \"WiFi\",
    \"firmware_version\": \"1.2.0\",\n \"uptime_seconds\": 36000,\n \"is_online\": true,\n \"error_code\": null,\n \"last_reboot_reason\": \"power_on\"\n },\n \"metadata\": {\n \"data_schema_version\": \"1.0\",\n \"manufacturer\": \"Yelink\",\n \"model\": \"EM-2000\"\n }\n};\n\n// 调用函数并获取带有随机值的新数据\nconst randomData = setRandomSensorData(originalData);\n// console.log(JSON.stringify(randomData, null, 2));\nmsg.payload = randomData;\nreturn msg;",
    "outputs": 1,
    "noerr": 0,
    "initialize": "",
    "finalize": "",
    "libs": [],
    "x": 430,
    "y": 160,
    "wires": [
    [
    "faac6e0cc4a641a6"
    ]
    ]
    },
    {
    "id": "69c9138ba19bfbdf",
    "type": "mqtt-broker",
    "name": "98MQTT",
    "broker": "192.168.102.98",
    "port": "1883",
    "clientid": "mqttx_e9931bad",
    "autoConnect": true,
    "usetls": false,
    "protocolVersion": "5",
    "keepalive": "60",
    "cleansession": true,
    "birthTopic": "",
    "birthQos": "0",
    "birthRetain": "false",
    "birthPayload": "",
    "birthMsg": {},
    "closeTopic": "",
    "closeQos": "0",
    "closeRetain": "false",
    "closePayload": "",
    "closeMsg": {},
    "willTopic": "",
    "willQos": "0",
    "willRetain": "false",
    "willPayload": "",
    "willMsg": {},
    "userProps": "",
    "sessionExpiry": ""
    }
    ]

2、数据上报控制流(数据清洗与转换)

在“设备数据模拟”的步骤 8 中,我们已经通过“订阅 MQTT”节点采集到了原始设备数据。接下来,我们将在此节点之后继续添加其他节点,以完成原始数据的转换并上报至 IoT 平台。

  1. 添加函数节点(数据转换): 在之前拖入的“订阅 MQTT”节点(即“设备数据模拟”步骤 8 中的节点)后面,新增一个“函数”节点。 添加数据转换函数节点

    双击该节点并添加以下 JavaScript 代码。这段代码将从原始设备上报数据中提取所需字段,并组装成 IoT 平台的标准格式。该格式包含一个 params 对象和一个固定值 method: "thing.event.property.post" 字段。params 中的字段应与您在 IoT 平台定义产品时设置的功能参数字段保持一致。

    {
    "params": {
    "latitude": "xxx",
    // ... 其他字段
    },
    "method": "thing.event.property.post"
    }
    // 订阅节点接收到的消息,可能需要 JSON.parse(msg.payload) 进行转换(取决于平台配置)
    const sensorData = msg.payload;

    // 组装成标准的 IoT 数据格式
    const publishMsg = {
    params: {
    latitude: sensorData.location.latitude,
    longitude: sensorData.location.longitude,
    description: sensorData.location.description,
    temperature: sensorData.sensor_data.temperature.value,
    humidity: sensorData.sensor_data.humidity.value,
    pm2_5: sensorData.sensor_data.pm2_5.value,
    battery_level: sensorData.device_status.battery_level,
    network_type: sensorData.device_status.network_type
    },
    method: "thing.event.property.post" // 固定值:设备属性上报
    };
    // 将组装好的参数赋值给 msg.payload,写入 MQTT 节点会自动读取并发送
    msg.payload = publishMsg;
    return msg;
  2. 添加写入 MQTT 节点(上报至 IoT 平台): 在“函数”节点后面添加一个“写入 MQTT”节点,用于将上一步函数节点组装好的标准数据发送到 IoT 平台的设备。 添加写入 IoT 平台 MQTT 节点

    双击该节点配置一个新的 Broker,此 Broker 用于连接“魔方”的 MQTT 服务。 配置魔方 MQTT Broker

    Broker 具体配置: 其中,客户端 ID 是在 IoT 平台上创建设备时生成的“设备编码”和“ProductKey”的组合,格式为:${设备编码}&${ProductKey}。您可以在 IoT 平台的设备列表中点击“查看”按钮获取详细信息。 魔方 MQTT Broker 配置详情 客户端 ID 示例

  3. 配置写入 MQTT 节点(上报主题): 配置好 Broker 后,返回“写入 MQTT”节点,选择刚刚创建的“魔方 MQTT Broker”,并添加上报主题。主题信息同样可在设备详情界面获取。 配置写入 MQTT 上报主题 上报主题示例

  4. 部署流程: 配置完成后,点击“部署”。 部署数据上报流程

  5. 查看上报结果: 返回 IoT 平台,进入设备详情页。 点击“物模型数据”选项卡。 查看物模型数据

    在“实时数据”部分,您将能看到 OCS-Controller 成功上报的设备数据。 实时数据展示

  6. 完整的上报控制流: 以下是完整的上报控制流 JSON 配置,您可以直接复制并导入。请注意,导入后需要根据实际环境修改 Broker 配置。

    [
    {
    "id": "ea7e903820f46e83",
    "type": "mqtt in",
    "z": "4d55701a329e5413",
    "name": "订阅主题:/env/sensor/realtime/properties",
    "topic": "/env/sensor/realtime/properties",
    "qos": "2",
    "datatype": "auto-detect",
    "broker": "69c9138ba19bfbdf",
    "nl": false,
    "rap": true,
    "rh": 0,
    "inputs": 0,
    "x": 260,
    "y": 320,
    "wires": [
    [
    "f1557fc664280bf1"
    ]
    ]
    },
    {
    "id": "f1557fc664280bf1",
    "type": "function",
    "z": "4d55701a329e5413",
    "name": "数据转换",
    "func": "// 订阅节点接收到的消息,可能需要 JSON.parse(msg.payload) 进行转换(取决于平台配置)\nconst sensorData = msg.payload;\n\n// 组装成标准的 IoT 数据格式,其中 method 参数是一个固定值:\"thing.event.property.post\"\nconst publishMsg = {\n params: {\n latitude: sensorData.location.latitude,\n longitude: sensorData.location.longitude,\n description: sensorData.location.description,\n temperature: sensorData.sensor_data.temperature.value,\n humidity: sensorData.sensor_data.humidity.value,\n pm2_5: sensorData.sensor_data.pm2_5.value,\n battery_level: sensorData.device_status.battery_level,\n network_type: sensorData.device_status.network_type\n },\n method: \"thing.event.property.post\" // 固定值\n};\n// 将组装好的参数赋值给 msg.payload,写入 MQTT 节点会自动读取并发送\nmsg.payload = publishMsg;\nreturn msg;",
    "outputs": 1,
    "noerr": 0,
    "initialize": "",
    "finalize": "",
    "libs": [],
    "x": 540,
    "y": 320,
    "wires": [
    [
    "73a1058a187311ee"
    ]
    ]
    },
    {
    "id": "73a1058a187311ee",
    "type": "mqtt out",
    "z": "4d55701a329e5413",
    "name": "写入设备:env_monitor_industry_base_5b304",
    "topic": "/sys/CP_7k1bmb2igbr/env_monitor_industry_base_5b304/thing/event/property/post",
    "qos": "",
    "retain": "",
    "respTopic": "",
    "contentType": "",
    "userProps": "",
    "correl": "",
    "expiry": "",
    "broker": "1911e61bcd1b4f2a",
    "x": 840,
    "y": 320,
    "wires": []
    },
    {
    "id": "69c9138ba19bfbdf",
    "type": "mqtt-broker",
    "name": "98MQTT",
    "broker": "192.168.102.98",
    "port": "1883",
    "clientid": "mqttx_e9931bad",
    "autoConnect": true,
    "usetls": false,
    "protocolVersion": "5",
    "keepalive": "60",
    "cleansession": true,
    "birthTopic": "",
    "birthQos": "0",
    "birthRetain": "false",
    "birthPayload": "",
    "birthMsg": {},
    "closeTopic": "",
    "closeQos": "0",
    "closeRetain": "false",
    "closePayload": "",
    "closeMsg": {},
    "willTopic": "",
    "willQos": "0",
    "willRetain": "false",
    "willPayload": "",
    "willMsg": {},
    "userProps": "",
    "sessionExpiry": ""
    },
    {
    "id": "1911e61bcd1b4f2a",
    "type": "mqtt-broker",
    "name": "数字工厂-数采",
    "broker": "172.16.0.3",
    "port": "1883",
    "clientid": "env_monitor_industry_base_5b304&CP_7k1bmb2igbr",
    "autoConnect": true,
    "usetls": false,
    "protocolVersion": "5",
    "keepalive": "60",
    "cleansession": true,
    "birthTopic": "",
    "birthQos": "0",
    "birthRetain": "false",
    "birthPayload": "",
    "birthMsg": {},
    "closeTopic": "",
    "closeQos": "0",
    "closeRetain": "false",
    "closePayload": "",
    "closeMsg": {},
    "willTopic": "",
    "willQos": "0",
    "willRetain": "false",
    "willPayload": "",
    "willMsg": {},
    "userProps": "",
    "sessionExpiry": ""
    }
    ]