基于消息队列定制化业务流程的最佳实践方式
通过实际业务场景,介绍如何基于消息队列定制化业务流程的最佳实践的方式。
场景描述
-
由于客户业务不同,在 DFS 业务中使用了自定义参数字段,在 DFS 系统中新建工单时默认情况下无法将自定义的参数值设置到报功系数中。
-
若为此功能进行定制化开发,使用 OcsController 服务端低代码平台较为合适,且开发成本较低。
-
故通过 OcsController 订阅 DFS 新建生产工单消息,根据自定义参数值设置报功系数,并更新到 DFS 工单中。
解决方案
- 使用 OcsController 「订阅消息」节点接受 DFS 新建生产工单消息。
- 将消息体进行转换,获取自定义参数值。
- 组装更新工单请求体,使用 OcsController 「API调用」节点调用 DFS 更新工单接口。
本案例中流程开发者需要了解的基础技术
- 什么是消息队列
- 什么时字符串、JSON以及两者之间的关系
- 什么是 API,参考表现层状态转换
- 如何在 Apifox 查看接口文档,参考Apifox 帮助文档
- 如何使用 NocoConsole 及 OcsController 低代码平台,参考NocoConsole 介绍、OcsController 介绍
实现细节
1. 整体对接流程如下图所示

2. 订阅消息节点配置如下

其中「客户端」使用默认即可,消息类型选择系统消息,系统应用选择 dfs,主题选择 event_value_chain, 其余配置使用默认即可。
注意:截止本文档编写时,dfs 系统消息结构定义暂不满足订阅配置需求,故不能使用消息过滤功能
3. 分支处理如下

作用:由于 event_value_chain 消息类型过多,且本案例中只需要工单创建的消息,故只处理 workOrder.add 消息类型,其余将会被丢弃,不会继续后续的流程
4. JSON 处理

作用:将消息中的数据(字符串)转换为 JSON 对象,方便后续处理
5. 调整消息属性

作用:获取工单信息对象,将较长的属性访问路径转换为短的路径,方便后续处理
6. API调用节点配置如下

作用:调用 DFS 更新工单接口,更新工单报功系数,其中 coefficient 系数使用 DFS 中的自定义属性字段 customFieldOne 的值
其中请求根路径无需配置,默认会自动使用边缘节点魔方网关地址,本案例中接口类型使用自定义接口,匹配模式使用自定义模版,编辑器中的 JSON 即为实际请求 DFS 工单更新接口的参数,支持模版语法,其中 {{workOrderData.workOrderNumber}} 表示使用流程中的 msg.workOrderData.workOrderNumber,节点在执行时将会自动替换为实际值
相关接口:新增-更新工单
7. 判断调用 DFS 工单更新接口响应结果

作用:判断调用 DFS 工单更新接口响应结果,若成功则结束流程,否则继续输出调用日志
8. 调试
可参考 调试章节
9. 每个节点的配置明细可通过选择节点,在右侧-侧边栏查看节点说明文档

选择节点,点击右侧-侧边栏查看节点说明文档
总结
- 在本案例中流程开发者需要清晰的认识当前需要定制化的业务需求/逻辑
- 在拖拉拽流程前可以简单规划业务流程流转,整体上对流程有一个大致的了解
- 在拖拉拽流程过程中,可能存在对节点功能使用的问题,可通过右侧-侧边栏查看节点说明文档
- 在调试阶段,可以通过 调试章节 提供的工具观察数据流动情况,方便流程开发者更好的理解流程执行逻辑
相关技术说明
订阅消息-Kafka 消费者
订阅消息配置
- 建议一个 OcsController 实例中使用一个 client 配置即可,client 配置为通用配置,可在订阅消息、发布消息节点中使用
- 如果要并行消费,且不共用偏移量时,可以使用同一个 client,使用不同的消费者组 group,这样每个消费者的 offset 都不影响,Kafka 每个 group 消费者组维护各自的偏移量,各自独立
- 同一个 client 或不同的 client 使用相同的 group、相同 topic,Kafka 的单个 Partition 分区仅由组内的一个消费者处理,当消费者组中的消费者大于 Kafka 分区时,多余的分区将处于闲置状态
对于多业务场景使用同一个 topic 的情况:
- 可使用同一个 client、同一个 group、在后续在流程中进行业务分流,如使用分支处理节点进行分流
- 可使用同一个 client、不同 group,实现多个消费者并行消费,且各自消费者偏移量独立
发布消息-Kafka 生产者

如果不太重要的业务可以将 acks 设置为「无需确认」,可避免高并发海量消息场景下发送消息阻塞问题(具体情况受网络、计算资源影响)
API调用
- 所有系统的 API 详细描述可在 工业魔方 API 文档 查看
- 建议在拖拉拽流程前先调试接口的可用性,可使用一个注入节点、一个 API 调用节点,一个日志输出节点进行调试,通过手动触发注入节点运行流程,通过 API 调用节点调用接口,通过日志输出节点查看接口调用结果
- 接口调用失败排查:
- 若有响应,且
msg.apiGatewayRes有值,存在code、messsge、data字段,则说明接口调用已经到达网关或业务系统服务侧,OcsController 正常 - 若返回的
msg.apiGatewayRes不符合预期,可通过msg.apiGatewayRes.message查看错误信息,msg.apiGatewayRes.data查看响应数据 - 可根据响应信息调整请求结构,重新调用
- 其余情况可登录服务器查看业务系统容器日志
- 若以上都无法解决,则需联系具体的业务开发人员
- 若有响应,且
示例流程JSON
在 OcsController 导入即可使用

流程JSON 数据:
[
{
"id": "fbbfc63c789ff907",
"type": "kafka-consumer",
"z": "90884b6014d070ef",
"name": "订阅消息",
"client": "709553632aefcca4",
"groupid": "ocs-controller-1741073302323",
"advancedoptions": false,
"msgFilter": false,
"autocommitinterval": 5000,
"autocommitthreshold": 100,
"sessiontimeout": 30000,
"rebalancetimeout": 60000,
"heartbeatinterval": 3000,
"metadatamaxage": 300000,
"maxbytesperpartition": 1048576,
"minbytes": 1,
"maxbytes": 10485760,
"maxwaittimeinms": 5000,
"frombeginning": false,
"clearoffsets": false,
"allowautotopiccreation": false,
"customTopicInput": "",
"productKey": "CP_k5bk81gebr9",
"devName": "19979_30",
"topicType": "system-msg",
"application": "/api/open/message/docs",
"systemKey": "",
"systemTopic": "event_value_chain",
"x": 160,
"y": 200,
"wires": [
[
"49d6e827257cde06"
]
]
},
{
"id": "49d6e827257cde06",
"type": "switch",
"z": "90884b6014d070ef",
"name": "分支处理",
"property": "payload.key",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "workOrder.add",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 1,
"x": 327,
"y": 200,
"wires": [
[
"5e0c99597db9fbe7"
]
]
},
{
"id": "5e0c99597db9fbe7",
"type": "json",
"z": "90884b6014d070ef",
"name": "JSON 处理",
"property": "payload.value",
"action": "",
"pretty": false,
"x": 491,
"y": 200,
"wires": [
[
"822a1e07a0e2c81d"
]
]
},
{
"id": "822a1e07a0e2c81d",
"type": "change",
"z": "90884b6014d070ef",
"name": "调整消息属性",
"rules": [
{
"t": "move",
"p": "payload.value.params.latestContent",
"pt": "msg",
"to": "workOrderData",
"tot": "msg"
}
],
"action": "",
"property": "",
"from": "",
"to": "",
"reg": false,
"x": 670,
"y": 200,
"wires": [
[
"f113c9b98d8b4fd2"
]
]
},
{
"id": "f113c9b98d8b4fd2",
"type": "open-api",
"z": "90884b6014d070ef",
"name": "",
"baseApiGatewayUrl": "",
"basePath": "/openApi/api",
"apiType": "customAPI",
"applicationOption": "/api/v3/api-docs",
"apiTag": "任务中心/生产工单接口",
"api": "/v1/open/work_orders/upsert",
"description": "唯一标识:工单编号,不存在新增,存在则更新",
"props": [
{
"in": "body",
"name": "workOrderUpdateDTO",
"description": "workOrderUpdateDTO",
"required": true,
"schema": {
"originalRef": "com.yelink.dfs.open.workOrder.dto.WorkOrderUpdateInsertDTO",
"$ref": "#/definitions/com.yelink.dfs.open.workOrder.dto.WorkOrderUpdateInsertDTO"
},
"properties": [
{
"name": "actualApprover",
"value": "",
"type": "str",
"description": "实际审批人"
},
{
"name": "actualBatches",
"value": "",
"type": "num",
"format": "double",
"description": "实际批数"
},
{
"name": "actualEndDate",
"value": "",
"type": "str",
"format": "date-time",
"description": "实际结束时间"
},
{
"name": "actualStartDate",
"value": "",
"type": "str",
"format": "date-time",
"description": "实际开始时间"
},
{
"name": "approvalStatus",
"value": "",
"type": "num",
"format": "int32",
"description": "审批状态"
},
{
"name": "approvalSuggestion",
"value": "",
"type": "str",
"description": "审批建议"
},
{
"name": "approvalTime",
"value": "",
"type": "str",
"format": "date-time",
"description": "审批时间"
},
{
"name": "approver",
"value": "",
"type": "str",
"description": "审核人"
},
{
"name": "assigned",
"value": "",
"type": "bool",
"description": "是否发料"
},
{
"name": "assignmentState",
"value": "",
"type": "str",
"description": "派工状态 toBeAssigned-待派工、assigned-已派工"
},
{
"name": "businessUnitCode",
"value": "",
"type": "str",
"description": "业务主体编码"
},
{
"name": "businessUnitName",
"value": "",
"type": "str",
"description": "业务主体名称"
},
{
"name": "coefficient",
"value": "",
"type": "num",
"format": "double",
"description": "报工系数"
},
{
"name": "craftCode",
"value": "",
"type": "str",
"description": "工艺编号"
},
{
"name": "craftProcedureEntities",
"value": "",
"type": "json",
"description": "绑定的工艺工序列表",
"items": {
"originalRef": "com.yelink.dfs.open.workOrder.dto.CraftProcedureEntityUpsertDTO",
"$ref": "#/definitions/com.yelink.dfs.open.workOrder.dto.CraftProcedureEntityUpsertDTO"
}
},
{
"name": "createBy",
"value": "",
"type": "str",
"description": "创建人"
},
{
"name": "createDate",
"value": "",
"type": "str",
"format": "date-time",
"description": "创建时间"
},
{
"name": "customerCode",
"value": "",
"type": "str",
"description": "客户编号"
},
{
"name": "customerMaterialCode",
"value": "",
"type": "str",
"description": "客户物料编码"
},
{
"name": "deviceCode",
"value": "",
"type": "str",
"description": "设备编码"
},
{
"name": "effectiveHours",
"value": "",
"type": "num",
"format": "double",
"description": "有效工时"
},
{
"name": "endDate",
"value": "",
"type": "str",
"format": "date-time",
"description": "计划截止时间,新增必填"
},
{
"name": "erpDocumentCode",
"value": "",
"type": "str",
"description": "erp关联单据编号"
},
{
"name": "finishCount",
"value": "",
"type": "num",
"format": "double",
"description": "已完成数量"
},
{
"name": "handOverDate",
"value": "",
"type": "str",
"format": "date-time",
"description": "计划交付时间/计划开始生产时间"
},
{
"name": "importTime",
"value": "",
"type": "str",
"format": "date-time",
"description": "数据导入时间"
},
{
"name": "inputTotal",
"value": "",
"type": "num",
"format": "double",
"description": "投入数量"
},
{
"name": "inventoryQuantity",
"value": "",
"type": "num",
"format": "double",
"description": "已入库数"
},
{
"name": "investCheckResult",
"value": "",
"type": "bool",
"description": "投产检查结果(true--通过 false--不通过 null--空)"
},
{
"name": "isUpdateRelateOrder",
"value": "",
"type": "bool",
"description": "存在工单时,是否更新关联订单数据,默认为更新"
},
{
"name": "isUpdateRelevanceResource",
"value": "",
"type": "bool",
"description": "存在工单时,是否更新工单关联资源,默认为更新"
},
{
"name": "issue",
"value": "",
"type": "bool",
"description": "是否下发"
},
{
"name": "lineCode",
"value": "",
"type": "str",
"description": "产线编号"
},
{
"name": "magName",
"value": "",
"type": "str",
"description": "计划员账号"
},
{
"name": "materialCode",
"value": "",
"type": "str",
"description": "物料编号,新增必填"
},
{
"name": "measurementQuantity",
"value": "",
"type": "num",
"format": "double",
"description": "计量重量(工单物料计划数量记录计算用)"
},
{
"name": "measurementUnit",
"value": "",
"type": "str",
"description": "计量单位(工单物料计划数量记录计算用)"
},
{
"name": "noticeUsername",
"value": "",
"type": "str",
"description": "工单接收人(工单完成时通知人员账号,逗号隔开)"
},
{
"name": "operateByApi",
"value": "",
"type": "bool"
},
{
"name": "orderType",
"value": "",
"type": "str",
"description": "单据类型"
},
{
"name": "packageQuantity",
"value": "",
"type": "num",
"format": "double",
"description": "包装数"
},
{
"name": "packageSchemeCode",
"value": "",
"type": "str",
"description": "包装方案编码"
},
{
"name": "pickingQuantity",
"value": "",
"type": "num",
"format": "double",
"description": "已领料数"
},
{
"name": "planQuantity",
"value": "",
"type": "num",
"format": "double",
"description": "计划数量,新增必填"
},
{
"name": "plannedBatches",
"value": "",
"type": "num",
"format": "double",
"description": "计划批数"
},
{
"name": "plansPerBatch",
"value": "",
"type": "num",
"format": "double",
"description": "每批计划数"
},
{
"name": "prenatalStatus",
"value": "",
"type": "bool",
"description": "产前状态"
},
{
"name": "prepared",
"value": "",
"type": "bool",
"description": "是否备料"
},
{
"name": "priority",
"value": "",
"type": "str",
"description": "优先级 正常、优先、加急、特急"
},
{
"name": "productOrderNumber",
"value": "",
"type": "str",
"description": "订单号"
},
{
"name": "relatedProductOrderMaterialLineNumber",
"value": "",
"type": "num",
"format": "int32",
"description": "关联的生产订单物料行号"
},
{
"name": "relatedSaleOrderMaterialLineNumber",
"value": "",
"type": "num",
"format": "int32",
"description": "关联的销售订单物料行号"
},
{
"name": "relevanceDeviceCodes",
"value": "",
"type": "json",
"items": {
"type": "string"
}
},
{
"name": "relevanceLineCodes",
"value": "",
"type": "json",
"items": {
"type": "string"
}
},
{
"name": "relevanceTeamCodes",
"value": "",
"type": "json",
"items": {
"type": "string"
}
},
{
"name": "remark",
"value": "",
"type": "str",
"description": "备注"
},
{
"name": "saleOrderNumber",
"value": "",
"type": "str",
"description": "销售订单号"
},
{
"name": "skuId",
"value": "",
"type": "num",
"format": "int32",
"description": "特征参数skuId"
},
{
"name": "startDate",
"value": "",
"type": "str",
"format": "date-time",
"description": "计划开始时间,新增必填"
},
{
"name": "state",
"value": "",
"type": "num",
"format": "int32",
"description": "状态 1-创建、2-发放、3-投入、4-挂起、5-完成、6-关闭、7-取消"
},
{
"name": "supplierCode",
"value": "",
"type": "str",
"description": "供应商code"
},
{
"name": "teamCode",
"value": "",
"type": "str",
"description": "班组编号"
},
{
"name": "theoreticalWorkingHours",
"value": "",
"type": "num",
"format": "double",
"description": "理论工作时长"
},
{
"name": "unqualified",
"value": "",
"type": "num",
"format": "double",
"description": "不合格量"
},
{
"name": "updateBy",
"value": "",
"type": "str",
"description": "更新人"
},
{
"name": "updateDate",
"value": "",
"type": "str",
"format": "date-time",
"description": "更新时间"
},
{
"name": "workCenterCode",
"value": "",
"type": "str",
"description": "工作中心编码,新增必填"
},
{
"name": "workOrderExtendFieldEight",
"value": "",
"type": "str",
"description": "工单拓展字段8"
},
{
"name": "workOrderExtendFieldFive",
"value": "",
"type": "str",
"description": "工单拓展字段5"
},
{
"name": "workOrderExtendFieldFour",
"value": "",
"type": "str",
"description": "工单拓展字段4"
},
{
"name": "workOrderExtendFieldNine",
"value": "",
"type": "str",
"description": "工单拓展字段9"
},
{
"name": "workOrderExtendFieldOne",
"value": "",
"type": "str",
"description": "工单拓展字段1"
},
{
"name": "workOrderExtendFieldSeven",
"value": "",
"type": "str",
"description": "工单拓展字段7"
},
{
"name": "workOrderExtendFieldSix",
"value": "",
"type": "str",
"description": "工单拓展字段6"
},
{
"name": "workOrderExtendFieldTen",
"value": "",
"type": "str",
"description": "工单拓展字段10"
},
{
"name": "workOrderExtendFieldThree",
"value": "",
"type": "str",
"description": "工单拓展字段3"
},
{
"name": "workOrderExtendFieldTwo",
"value": "",
"type": "str",
"description": "工单拓展字段2"
},
{
"name": "workOrderMaterialExtendFieldEight",
"value": "",
"type": "str",
"description": "工单物料拓展字段8"
},
{
"name": "workOrderMaterialExtendFieldFive",
"value": "",
"type": "str",
"description": "工单物料拓展字段5"
},
{
"name": "workOrderMaterialExtendFieldFour",
"value": "",
"type": "str",
"description": "工单物料拓展字段4"
},
{
"name": "workOrderMaterialExtendFieldNine",
"value": "",
"type": "str",
"description": "工单物料拓展字段9"
},
{
"name": "workOrderMaterialExtendFieldOne",
"value": "",
"type": "str",
"description": "工单物料拓展字段1"
},
{
"name": "workOrderMaterialExtendFieldSeven",
"value": "",
"type": "str",
"description": "工单物料拓展字段7"
},
{
"name": "workOrderMaterialExtendFieldSix",
"value": "",
"type": "str",
"description": "工单物料拓展字段6"
},
{
"name": "workOrderMaterialExtendFieldTen",
"value": "",
"type": "str",
"description": "工单物料拓展字段10"
},
{
"name": "workOrderMaterialExtendFieldThree",
"value": "",
"type": "str",
"description": "工单物料拓展字段3"
},
{
"name": "workOrderMaterialExtendFieldTwo",
"value": "",
"type": "str",
"description": "工单物料拓展字段2"
},
{
"name": "workOrderName",
"value": "",
"type": "str",
"description": "工单名称"
},
{
"name": "workOrderNumber",
"value": "",
"type": "str",
"description": "工单号"
}
],
"type": "json"
}
],
"requestType": "post",
"configType": "keyValue",
"manualSyntaxType": "json",
"manualValue": "",
"messageValue": "",
"customConfigType": "template",
"customTemplate": "{\n \"path\": \"/openApi/api/v1/open/work_orders/upsert\",\n \"method\": \"POST\",\n \"body\": {\n \"workOrderNumber\": \"{{workOrderData.workOrderNumber}}\",\n \"coefficient\": \"{{workOrderData.materialFields.customFieldOne}}\"\n }\n}",
"customTemplateSyntax": "json",
"customUrl": "",
"customRequestMethod": "get",
"customParam": [],
"customPath": [],
"customBody": {
"bodyType": "json",
"bodyJsonValue": [],
"bodyRawValue": "",
"formatType": "json"
},
"customHeader": [],
"x": 851,
"y": 199,
"wires": [
[
"429223ce1f381eda"
]
]
},
{
"id": "76bf85f05fbb9b4c",
"type": "debug",
"z": "90884b6014d070ef",
"logName": "异常信息",
"active": true,
"tosidebar": true,
"console": true,
"tostatus": false,
"complete": "true",
"targetType": "full",
"statusVal": "",
"statusType": "auto",
"logLevel": "info",
"dataType": "field",
"format": "handlebars",
"syntax": "mustache",
"template": "输出日志: payload 值为 {{payload}}",
"output": "str",
"x": 1178,
"y": 199,
"wires": []
},
{
"id": "429223ce1f381eda",
"type": "switch",
"z": "90884b6014d070ef",
"name": "分支处理",
"property": "apiGatewayRes.code",
"propertyType": "msg",
"rules": [
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 1,
"x": 1010,
"y": 199,
"wires": [
[
"76bf85f05fbb9b4c"
]
]
},
{
"id": "709553632aefcca4",
"type": "kafka-client",
"name": "ocs-controller-kafka-client_gd",
"brokers": "172.16.0.3:9092",
"clientid": "gd",
"connectiontimeout": "3000",
"requesttimeout": "25000",
"advancedretry": false,
"maxretrytime": "30000",
"initialretrytime": "300",
"factor": "0.2",
"multiplier": "2",
"retries": "5",
"auth": "none",
"tlsselfsign": false,
"tlscacert": "",
"tlsclientcert": "",
"tlsprivatekey": "",
"tlspassphrase": "",
"saslssl": true,
"saslmechanism": "plain",
"loglevel": "error"
}
]