diff --git a/tzipc-server/tzipc-admin/src/main/resources/application.yml b/tzipc-server/tzipc-admin/src/main/resources/application.yml index 74dca54..ba27afb 100644 --- a/tzipc-server/tzipc-admin/src/main/resources/application.yml +++ b/tzipc-server/tzipc-admin/src/main/resources/application.yml @@ -134,12 +134,12 @@ xss: # 数据接入模块配置 datasyn: mqtt: - host: tcp://47.105.46.242:1883 - clientId: phm_tzipc - topicu: /usr/plcnet/(devname)/edge/u #数据上行主题 设备向服务器推送数据的主题 - topicd: /usr/plcnet/(devname)/edge/d #数据下行主题 服务器向设备推送数据主题 - userName: inspur - password: inspur + host: tcp://111.11.4.77:15002 + clientId: tzipc + topicu: /usr/plcnet/roller/edge/u #数据上行主题 设备向服务器推送数据的主题 + topicd: /usr/plcnet/roller/edge/d #数据下行主题 服务器向设备推送数据主题 + userName: taizhong + password: 6IYC@A5By0l timeOut: 10 keepAlive: 20 Qos: 1 diff --git a/tzipc-server/tzipc-datasyn/src/main/java/com/inspur/datasyn/modbus/IPCDataSyncTask.java b/tzipc-server/tzipc-datasyn/src/main/java/com/inspur/datasyn/modbus/IPCDataSyncTask.java index 11d3219..dd67b66 100644 --- a/tzipc-server/tzipc-datasyn/src/main/java/com/inspur/datasyn/modbus/IPCDataSyncTask.java +++ b/tzipc-server/tzipc-datasyn/src/main/java/com/inspur/datasyn/modbus/IPCDataSyncTask.java @@ -78,7 +78,7 @@ public class IPCDataSyncTask implements ApplicationRunner { timer3.schedule(new TimerTask() { @Override public void run() { - threadPoolTaskExecutor.execute(new IPCDataUploadSyncThread(host,clientId,topicu)); + threadPoolTaskExecutor.execute(new IPCDataUploadSyncThread(mqttHost,clientId,topicu)); } }, 1000, 1000 * 60); } diff --git a/tzipc-server/tzipc-datasyn/src/main/java/com/inspur/datasyn/modbus/IPCDataUploadSyncThread.java b/tzipc-server/tzipc-datasyn/src/main/java/com/inspur/datasyn/modbus/IPCDataUploadSyncThread.java index d0e4c81..6d213fe 100644 --- a/tzipc-server/tzipc-datasyn/src/main/java/com/inspur/datasyn/modbus/IPCDataUploadSyncThread.java +++ b/tzipc-server/tzipc-datasyn/src/main/java/com/inspur/datasyn/modbus/IPCDataUploadSyncThread.java @@ -8,6 +8,11 @@ import com.inspur.ipc.service.IIpcMonitorFieldService; import com.inspur.ipc.utils.IpcConstant; import com.inspur.ipc.utils.JsonSerilizable; import com.inspur.system.service.influx.InfluxDBService; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.influxdb.dto.QueryResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,6 +22,7 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.stream.Collectors; public class IPCDataUploadSyncThread implements Runnable { private static final Logger logger = LoggerFactory.getLogger(IPCPlcDataSyncThread.class); @@ -44,7 +50,7 @@ public class IPCDataUploadSyncThread implements Runnable { InfluxDBService influxDBService = SpringUtils.getBean(InfluxDBService.class); IIpcMonitorFieldService iIpcMonitorFieldService = SpringUtils.getBean(IIpcMonitorFieldService.class); List partList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.MONITOR_PART_KEY); - List plcList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.PLC_MONITOR_PARAMS_KEY); + List plcListRead = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.PLC_MONITOR_PARAMS_READ_KEY); List sensorList = iIpcMonitorFieldService.selectFieldDataByType(CacheConstants.SENSOR_MONITOR_PARAMS_KEY); // 数据处理 Map msgMap = new HashMap<>(); @@ -52,6 +58,8 @@ public class IPCDataUploadSyncThread implements Runnable { msgMap.put("f", IpcConstant.MQTT_FLAG); List> dataList = new ArrayList<>(); for (IpcMonitorField part : partList) { + // 筛选当前部位参数 + List plcList = plcListRead.stream().filter(field -> part.getFieldCode().equals(field.getParentCode())).collect(Collectors.toList()); // // TODO 遍历生成振动数据 开始 // Map tags = new HashMap<>(); // tags.put("part",part.getFieldValue()); @@ -78,7 +86,7 @@ public class IPCDataUploadSyncThread implements Runnable { String insertTime = String.valueOf(plcMap.get("insertTime")); for (IpcMonitorField ipcMonitorField : plcList) { Map map = new HashMap<>(); - map.put("sid", partLabel + "-" + ipcMonitorField.getFieldLabel()); + map.put("pid", ipcMonitorField.getSourceField()); map.put("v", plcMap.get(ipcMonitorField.getFieldValue()) == null ? "0" : String.valueOf(plcMap.get(ipcMonitorField.getFieldValue()))); map.put("s", getSeconds(insertTime)); map.put("ms", getMillis(insertTime)); @@ -106,9 +114,9 @@ public class IPCDataUploadSyncThread implements Runnable { } msgMap.put("d", dataList); // 进行数据推送 -// System.out.println(msgMap.toString()); -// System.out.println(changeMapToByte(msgMap)); -// System.out.println(host + "----------" + clientId+"----------"+topicu); + System.out.println(msgMap.toString()); + System.out.println(changeMapToByte(msgMap)); + System.out.println(host + "----------" + clientId+"----------"+topicu); // MqttClient sampleClient = new MqttClient(host, clientId, new MemoryPersistence()); // MqttConnectOptions connOpts = new MqttConnectOptions(); // sampleClient.connect(connOpts);