mqtt上传接口修改

This commit is contained in:
zhanghan11 2024-04-28 17:37:34 +08:00
parent 1af77effbc
commit 1e19aa556e
3 changed files with 20 additions and 12 deletions

View File

@ -134,12 +134,12 @@ xss:
# 数据接入模块配置 # 数据接入模块配置
datasyn: datasyn:
mqtt: mqtt:
host: tcp://47.105.46.242:1883 host: tcp://111.11.4.77:15002
clientId: phm_tzipc clientId: tzipc
topicu: /usr/plcnet/(devname)/edge/u #数据上行主题 设备向服务器推送数据的主题 topicu: /usr/plcnet/roller/edge/u #数据上行主题 设备向服务器推送数据的主题
topicd: /usr/plcnet/(devname)/edge/d #数据下行主题 服务器向设备推送数据主题 topicd: /usr/plcnet/roller/edge/d #数据下行主题 服务器向设备推送数据主题
userName: inspur userName: taizhong
password: inspur password: 6IYC@A5By0l
timeOut: 10 timeOut: 10
keepAlive: 20 keepAlive: 20
Qos: 1 Qos: 1

View File

@ -78,7 +78,7 @@ public class IPCDataSyncTask implements ApplicationRunner {
timer3.schedule(new TimerTask() { timer3.schedule(new TimerTask() {
@Override @Override
public void run() { public void run() {
threadPoolTaskExecutor.execute(new IPCDataUploadSyncThread(host,clientId,topicu)); threadPoolTaskExecutor.execute(new IPCDataUploadSyncThread(mqttHost,clientId,topicu));
} }
}, 1000, 1000 * 60); }, 1000, 1000 * 60);
} }

View File

@ -8,6 +8,11 @@ import com.inspur.ipc.service.IIpcMonitorFieldService;
import com.inspur.ipc.utils.IpcConstant; import com.inspur.ipc.utils.IpcConstant;
import com.inspur.ipc.utils.JsonSerilizable; import com.inspur.ipc.utils.JsonSerilizable;
import com.inspur.system.service.influx.InfluxDBService; import com.inspur.system.service.influx.InfluxDBService;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.influxdb.dto.QueryResult; import org.influxdb.dto.QueryResult;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -17,6 +22,7 @@ import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
public class IPCDataUploadSyncThread implements Runnable { public class IPCDataUploadSyncThread implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(IPCPlcDataSyncThread.class); private static final Logger logger = LoggerFactory.getLogger(IPCPlcDataSyncThread.class);
@ -44,7 +50,7 @@ public class IPCDataUploadSyncThread implements Runnable {
InfluxDBService influxDBService = SpringUtils.getBean(InfluxDBService.class); InfluxDBService influxDBService = SpringUtils.getBean(InfluxDBService.class);
IIpcMonitorFieldService iIpcMonitorFieldService = SpringUtils.getBean(IIpcMonitorFieldService.class); IIpcMonitorFieldService iIpcMonitorFieldService = SpringUtils.getBean(IIpcMonitorFieldService.class);
List<IpcMonitorField> partList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.MONITOR_PART_KEY); List<IpcMonitorField> partList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.MONITOR_PART_KEY);
List<IpcMonitorField> plcList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.PLC_MONITOR_PARAMS_KEY); List<IpcMonitorField> plcListRead = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.PLC_MONITOR_PARAMS_READ_KEY);
List<IpcMonitorField> sensorList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.SENSOR_MONITOR_PARAMS_KEY); List<IpcMonitorField> sensorList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.SENSOR_MONITOR_PARAMS_KEY);
// 数据处理 // 数据处理
Map<String, Object> msgMap = new HashMap<>(); Map<String, Object> msgMap = new HashMap<>();
@ -52,6 +58,8 @@ public class IPCDataUploadSyncThread implements Runnable {
msgMap.put("f", IpcConstant.MQTT_FLAG); msgMap.put("f", IpcConstant.MQTT_FLAG);
List<Map<String, String>> dataList = new ArrayList<>(); List<Map<String, String>> dataList = new ArrayList<>();
for (IpcMonitorField part : partList) { for (IpcMonitorField part : partList) {
// 筛选当前部位参数
List<IpcMonitorField> plcList = plcListRead.stream().filter(field -> part.getFieldCode().equals(field.getParentCode())).collect(Collectors.toList());
// // TODO 遍历生成振动数据 开始 // // TODO 遍历生成振动数据 开始
// Map<String,String> tags = new HashMap<>(); // Map<String,String> tags = new HashMap<>();
// tags.put("part",part.getFieldValue()); // tags.put("part",part.getFieldValue());
@ -78,7 +86,7 @@ public class IPCDataUploadSyncThread implements Runnable {
String insertTime = String.valueOf(plcMap.get("insertTime")); String insertTime = String.valueOf(plcMap.get("insertTime"));
for (IpcMonitorField ipcMonitorField : plcList) { for (IpcMonitorField ipcMonitorField : plcList) {
Map<String, String> map = new HashMap<>(); Map<String, String> map = new HashMap<>();
map.put("sid", partLabel + "-" + ipcMonitorField.getFieldLabel()); map.put("pid", ipcMonitorField.getSourceField());
map.put("v", plcMap.get(ipcMonitorField.getFieldValue()) == null ? "0" : String.valueOf(plcMap.get(ipcMonitorField.getFieldValue()))); map.put("v", plcMap.get(ipcMonitorField.getFieldValue()) == null ? "0" : String.valueOf(plcMap.get(ipcMonitorField.getFieldValue())));
map.put("s", getSeconds(insertTime)); map.put("s", getSeconds(insertTime));
map.put("ms", getMillis(insertTime)); map.put("ms", getMillis(insertTime));
@ -106,9 +114,9 @@ public class IPCDataUploadSyncThread implements Runnable {
} }
msgMap.put("d", dataList); msgMap.put("d", dataList);
// 进行数据推送 // 进行数据推送
// System.out.println(msgMap.toString()); System.out.println(msgMap.toString());
// System.out.println(changeMapToByte(msgMap)); System.out.println(changeMapToByte(msgMap));
// System.out.println(host + "----------" + clientId+"----------"+topicu); System.out.println(host + "----------" + clientId+"----------"+topicu);
// MqttClient sampleClient = new MqttClient(host, clientId, new MemoryPersistence()); // MqttClient sampleClient = new MqttClient(host, clientId, new MemoryPersistence());
// MqttConnectOptions connOpts = new MqttConnectOptions(); // MqttConnectOptions connOpts = new MqttConnectOptions();
// sampleClient.connect(connOpts); // sampleClient.connect(connOpts);