mqtt上传数据调整

This commit is contained in:
zhanghan11 2024-04-07 18:14:30 +08:00
parent 3b8fbcfd16
commit 2636997d2d

View File

@ -24,7 +24,7 @@ public class IPCDataUploadSyncThread implements Runnable {
private String clientId; private String clientId;
private String topicu; private String topicu;
public IPCDataUploadSyncThread(String host,String clientId,String topicu) { public IPCDataUploadSyncThread(String host, String clientId, String topicu) {
this.host = host; this.host = host;
this.clientId = clientId; this.clientId = clientId;
this.topicu = topicu; this.topicu = topicu;
@ -36,50 +36,55 @@ public class IPCDataUploadSyncThread implements Runnable {
// 获取service // 获取service
InfluxDBService influxDBService = SpringUtils.getBean(InfluxDBService.class); InfluxDBService influxDBService = SpringUtils.getBean(InfluxDBService.class);
IIpcMonitorFieldService iIpcMonitorFieldService = SpringUtils.getBean(IIpcMonitorFieldService.class); IIpcMonitorFieldService iIpcMonitorFieldService = SpringUtils.getBean(IIpcMonitorFieldService.class);
List<IpcMonitorField> partList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.MONITOR_PART_KEY);
List<IpcMonitorField> plcList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.PLC_MONITOR_PARAMS_KEY); List<IpcMonitorField> plcList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.PLC_MONITOR_PARAMS_KEY);
List<IpcMonitorField> sensorList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.SENSOR_MONITOR_PARAMS_KEY); List<IpcMonitorField> sensorList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.SENSOR_MONITOR_PARAMS_KEY);
// 查询最新plc数据
QueryResult plcQuery = influxDBService.query("select * from " + IpcConstant.PLC_MEASUREMENT + " order by time desc limit 1");
List<Map<String, Object>> plcMaps = influxDBService.queryResultProcess(plcQuery);
// 查询最新传感器数据
QueryResult sensorQuery = influxDBService.query("select * from " + IpcConstant.SENSOR_MEASUREMENT + " order by time desc limit 1");
List<Map<String, Object>> sensorMaps = influxDBService.queryResultProcess(sensorQuery);
// 数据处理 // 数据处理
Map<String, Object> msgMap = new HashMap<>(); Map<String, Object> msgMap = new HashMap<>();
// 主动上报 flag 字段固定为rp 云端接收后无需回复 // 主动上报 flag 字段固定为rp 云端接收后无需回复
msgMap.put("f", IpcConstant.MQTT_FLAG); msgMap.put("f", IpcConstant.MQTT_FLAG);
List<Map<String, String>> dataList = new ArrayList<>(); List<Map<String, String>> dataList = new ArrayList<>();
if (plcMaps != null && !plcMaps.isEmpty()) { for (IpcMonitorField part : partList) {
Map<String, Object> plcMap = plcMaps.get(0); String partLabel = part.getFieldLabel();
if (plcMap.get("insertTime") != null) { String partValue = part.getFieldValue();
String insertTime = String.valueOf(plcMap.get("insertTime")); // 查询最新plc数据
for (IpcMonitorField ipcMonitorField : plcList) { QueryResult plcQuery = influxDBService.query("select * from " + IpcConstant.PLC_MEASUREMENT + " where part = '" + partValue + "' order by time desc limit 1");
Map<String, String> map = new HashMap<>(); List<Map<String, Object>> plcMaps = influxDBService.queryResultProcess(plcQuery);
map.put("sid", ipcMonitorField.getFieldLabel()); if (plcMaps != null && !plcMaps.isEmpty()) {
map.put("v", plcMap.get(ipcMonitorField.getFieldValue()) == null ? "0" : String.valueOf(plcMap.get(ipcMonitorField.getFieldValue()))); Map<String, Object> plcMap = plcMaps.get(0);
map.put("s", getSeconds(insertTime)); if (plcMap.get("insertTime") != null) {
map.put("ms", getMillis(insertTime)); String insertTime = String.valueOf(plcMap.get("insertTime"));
dataList.add(map); for (IpcMonitorField ipcMonitorField : plcList) {
Map<String, String> map = new HashMap<>();
map.put("sid", partLabel + "-" + ipcMonitorField.getFieldLabel());
map.put("v", plcMap.get(ipcMonitorField.getFieldValue()) == null ? "0" : String.valueOf(plcMap.get(ipcMonitorField.getFieldValue())));
map.put("s", getSeconds(insertTime));
map.put("ms", getMillis(insertTime));
dataList.add(map);
}
} }
} }
} // 查询最新传感器数据
if (sensorMaps != null && !sensorMaps.isEmpty()) { QueryResult sensorQuery = influxDBService.query("select * from " + IpcConstant.SENSOR_MEASUREMENT + " where part = '" + partValue + "' order by time desc limit 1");
Map<String, Object> sensorMap = sensorMaps.get(0); List<Map<String, Object>> sensorMaps = influxDBService.queryResultProcess(sensorQuery);
if (sensorMap.get("insertTime") != null) { if (sensorMaps != null && !sensorMaps.isEmpty()) {
String insertTime = String.valueOf(sensorMap.get("insertTime")); Map<String, Object> sensorMap = sensorMaps.get(0);
for (IpcMonitorField ipcMonitorField : sensorList) { if (sensorMap.get("insertTime") != null) {
Map<String, String> map = new HashMap<>(); String insertTime = String.valueOf(sensorMap.get("insertTime"));
map.put("sid", ipcMonitorField.getFieldLabel()); for (IpcMonitorField ipcMonitorField : sensorList) {
map.put("v", sensorMap.get(ipcMonitorField.getFieldValue()) == null ? "0" : String.valueOf(sensorMap.get(ipcMonitorField.getFieldValue()))); Map<String, String> map = new HashMap<>();
map.put("s", getSeconds(insertTime)); map.put("sid", partLabel + "-" + ipcMonitorField.getFieldLabel());
map.put("ms", getMillis(insertTime)); map.put("v", sensorMap.get(ipcMonitorField.getFieldValue()) == null ? "0" : String.valueOf(sensorMap.get(ipcMonitorField.getFieldValue())));
dataList.add(map); map.put("s", getSeconds(insertTime));
map.put("ms", getMillis(insertTime));
dataList.add(map);
}
} }
} }
} }
msgMap.put("d", dataList); msgMap.put("d", dataList);
// 进行数据推送 // 进行数据推送
// System.out.println(msgMap); System.out.println(msgMap);
// System.out.println(changeMapToByte(msgMap)); // System.out.println(changeMapToByte(msgMap));
// System.out.println(host + "----------" + clientId+"----------"+topicu); // System.out.println(host + "----------" + clientId+"----------"+topicu);
// MqttClient sampleClient = new MqttClient(host, clientId, new MemoryPersistence()); // MqttClient sampleClient = new MqttClient(host, clientId, new MemoryPersistence());