From 2636997d2dc355c6ee57cbb36816dccc22035171 Mon Sep 17 00:00:00 2001 From: zhanghan11 Date: Sun, 7 Apr 2024 18:14:30 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E4=B8=8A=E4=BC=A0=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../modbus/IPCDataUploadSyncThread.java | 67 ++++++++++--------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/tzipc-server/tzipc-datasyn/src/main/java/com/inspur/datasyn/modbus/IPCDataUploadSyncThread.java b/tzipc-server/tzipc-datasyn/src/main/java/com/inspur/datasyn/modbus/IPCDataUploadSyncThread.java index f5ec2ae..a1ac760 100644 --- a/tzipc-server/tzipc-datasyn/src/main/java/com/inspur/datasyn/modbus/IPCDataUploadSyncThread.java +++ b/tzipc-server/tzipc-datasyn/src/main/java/com/inspur/datasyn/modbus/IPCDataUploadSyncThread.java @@ -24,7 +24,7 @@ public class IPCDataUploadSyncThread implements Runnable { private String clientId; private String topicu; - public IPCDataUploadSyncThread(String host,String clientId,String topicu) { + public IPCDataUploadSyncThread(String host, String clientId, String topicu) { this.host = host; this.clientId = clientId; this.topicu = topicu; @@ -36,50 +36,55 @@ public class IPCDataUploadSyncThread implements Runnable { // 获取service InfluxDBService influxDBService = SpringUtils.getBean(InfluxDBService.class); IIpcMonitorFieldService iIpcMonitorFieldService = SpringUtils.getBean(IIpcMonitorFieldService.class); + List partList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.MONITOR_PART_KEY); List plcList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.PLC_MONITOR_PARAMS_KEY); List sensorList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.SENSOR_MONITOR_PARAMS_KEY); - // 查询最新plc数据 - QueryResult plcQuery = influxDBService.query("select * from " + IpcConstant.PLC_MEASUREMENT + " order by time desc limit 1"); - List> plcMaps = influxDBService.queryResultProcess(plcQuery); - // 查询最新传感器数据 - QueryResult sensorQuery = influxDBService.query("select * from " + IpcConstant.SENSOR_MEASUREMENT + " order by time desc limit 1"); - List> sensorMaps = influxDBService.queryResultProcess(sensorQuery); // 数据处理 Map msgMap = new HashMap<>(); // 主动上报 flag 字段固定为“rp” ,云端接收后无需回复 msgMap.put("f", IpcConstant.MQTT_FLAG); List> dataList = new ArrayList<>(); - if (plcMaps != null && !plcMaps.isEmpty()) { - Map plcMap = plcMaps.get(0); - if (plcMap.get("insertTime") != null) { - String insertTime = String.valueOf(plcMap.get("insertTime")); - for (IpcMonitorField ipcMonitorField : plcList) { - Map map = new HashMap<>(); - map.put("sid", ipcMonitorField.getFieldLabel()); - map.put("v", plcMap.get(ipcMonitorField.getFieldValue()) == null ? "0" : String.valueOf(plcMap.get(ipcMonitorField.getFieldValue()))); - map.put("s", getSeconds(insertTime)); - map.put("ms", getMillis(insertTime)); - dataList.add(map); + for (IpcMonitorField part : partList) { + String partLabel = part.getFieldLabel(); + String partValue = part.getFieldValue(); + // 查询最新plc数据 + QueryResult plcQuery = influxDBService.query("select * from " + IpcConstant.PLC_MEASUREMENT + " where part = '" + partValue + "' order by time desc limit 1"); + List> plcMaps = influxDBService.queryResultProcess(plcQuery); + if (plcMaps != null && !plcMaps.isEmpty()) { + Map plcMap = plcMaps.get(0); + if (plcMap.get("insertTime") != null) { + String insertTime = String.valueOf(plcMap.get("insertTime")); + for (IpcMonitorField ipcMonitorField : plcList) { + Map map = new HashMap<>(); + map.put("sid", partLabel + "-" + ipcMonitorField.getFieldLabel()); + map.put("v", plcMap.get(ipcMonitorField.getFieldValue()) == null ? "0" : String.valueOf(plcMap.get(ipcMonitorField.getFieldValue()))); + map.put("s", getSeconds(insertTime)); + map.put("ms", getMillis(insertTime)); + dataList.add(map); + } } } - } - if (sensorMaps != null && !sensorMaps.isEmpty()) { - Map sensorMap = sensorMaps.get(0); - if (sensorMap.get("insertTime") != null) { - String insertTime = String.valueOf(sensorMap.get("insertTime")); - for (IpcMonitorField ipcMonitorField : sensorList) { - Map map = new HashMap<>(); - map.put("sid", ipcMonitorField.getFieldLabel()); - map.put("v", sensorMap.get(ipcMonitorField.getFieldValue()) == null ? "0" : String.valueOf(sensorMap.get(ipcMonitorField.getFieldValue()))); - map.put("s", getSeconds(insertTime)); - map.put("ms", getMillis(insertTime)); - dataList.add(map); + // 查询最新传感器数据 + QueryResult sensorQuery = influxDBService.query("select * from " + IpcConstant.SENSOR_MEASUREMENT + " where part = '" + partValue + "' order by time desc limit 1"); + List> sensorMaps = influxDBService.queryResultProcess(sensorQuery); + if (sensorMaps != null && !sensorMaps.isEmpty()) { + Map sensorMap = sensorMaps.get(0); + if (sensorMap.get("insertTime") != null) { + String insertTime = String.valueOf(sensorMap.get("insertTime")); + for (IpcMonitorField ipcMonitorField : sensorList) { + Map map = new HashMap<>(); + map.put("sid", partLabel + "-" + ipcMonitorField.getFieldLabel()); + map.put("v", sensorMap.get(ipcMonitorField.getFieldValue()) == null ? "0" : String.valueOf(sensorMap.get(ipcMonitorField.getFieldValue()))); + map.put("s", getSeconds(insertTime)); + map.put("ms", getMillis(insertTime)); + dataList.add(map); + } } } } msgMap.put("d", dataList); // 进行数据推送 -// System.out.println(msgMap); + System.out.println(msgMap); // System.out.println(changeMapToByte(msgMap)); // System.out.println(host + "----------" + clientId+"----------"+topicu); // MqttClient sampleClient = new MqttClient(host, clientId, new MemoryPersistence());