From cd865e79e2c2db1e93f40874a7a622459a15c825 Mon Sep 17 00:00:00 2001 From: chenlinghy Date: Wed, 1 Jul 2020 17:27:34 +0800 Subject: [PATCH] =?UTF-8?q?OCT=20REM:=201.=20=E4=BF=AE=E6=94=B9=E5=81=9C?= =?UTF-8?q?=E6=AD=A2=E6=B8=85=E6=B4=97=E4=BB=BB=E5=8A=A1=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dispose/service/impl/TaskServiceImpl.java | 175 +++++++++--------- 1 file changed, 92 insertions(+), 83 deletions(-) diff --git a/src/main/java/com/dispose/service/impl/TaskServiceImpl.java b/src/main/java/com/dispose/service/impl/TaskServiceImpl.java index 97ae2da0..8b77fe97 100644 --- a/src/main/java/com/dispose/service/impl/TaskServiceImpl.java +++ b/src/main/java/com/dispose/service/impl/TaskServiceImpl.java @@ -11,6 +11,7 @@ import com.dispose.mapper.DisposeTaskMapper; import com.dispose.mapper.TaskInfoMapper; import com.dispose.pojo.entity.DisposeDevice; import com.dispose.pojo.po.MulReturnType; +import com.dispose.pojo.vo.common.TaskInfo; import com.dispose.pojo.vo.common.TaskInfoDetail; import com.dispose.service.DisposeNodeManager; import com.dispose.service.TaskService; @@ -91,8 +92,8 @@ public class TaskServiceImpl implements TaskService { public void loadTaskFromDatabase() { // 从数据库中取出所有任务 List taskList = disposeTaskMapper.selectAll().stream() - .filter(this::taskIsRunning) - .collect(Collectors.toList()); + .filter(this::taskIsRunning) + .collect(Collectors.toList()); taskList.forEach(v -> { v.setRetryTimes(0); @@ -132,30 +133,30 @@ public class TaskServiceImpl implements TaskService { if (disposeDevice.size() == 0) { log.error("No such device to dispose this task: devId:{}, disposeIp:{}, type:{}", - task.getDeviceId(), task.getDisposeIp(), task.getType()); + task.getDeviceId(), task.getDisposeIp(), task.getType()); return MulReturnType.builder() - .firstParam(ErrorCode.ERR_NOSUCHDEVICE) - .secondParam(-1L) - .build(); + .firstParam(ErrorCode.ERR_NOSUCHDEVICE) + .secondParam(-1L) + .build(); } // 查询当前是否有相同能力节点,相同用户,相同处置IP的且正在执行的处置任务,如果存在则忽略该次任务(依照产品需求) List taskList = taskCacheManager.getAllRunningTask() - .stream() - .filter(v -> ((taskId == null || taskId == -1L) || Objects.equals(v.getDeviceId(), task.getDeviceId())) - && Objects.equals(v.getAccountId(), task.getAccountId()) - && taskIsRunning(v) - && Objects.equals(v.getType(), task.getType()) - && Objects.equals(v.getDisposeIp(), task.getDisposeIp())) - .collect(Collectors.toList()); + .stream() + .filter(v -> ((taskId == null || taskId == -1L) || Objects.equals(v.getDeviceId(), task.getDeviceId())) + && Objects.equals(v.getAccountId(), task.getAccountId()) + && taskIsRunning(v) + && Objects.equals(v.getType(), task.getType()) + && Objects.equals(v.getDisposeIp(), task.getDisposeIp())) + .collect(Collectors.toList()); if (taskList.size() > 0) { log.error("Same dispose task is running: devId:{}, disposeIp:{}, type:{}", - task.getDeviceId(), task.getDisposeIp(), task.getType()); + task.getDeviceId(), task.getDisposeIp(), task.getType()); return MulReturnType.builder() - .firstParam(ErrorCode.ERR_TASKRUNNING) - .secondParam(taskList.get(0).getId()) - .build(); + .firstParam(ErrorCode.ERR_TASKRUNNING) + .secondParam(taskList.get(0).getId()) + .build(); } // 将该任务写入数据库和缓存,等到定时任务真正启动该任务 @@ -168,7 +169,7 @@ public class TaskServiceImpl implements TaskService { ErrorCode err = taskCacheManager.addTask(cacheTask); log.info("Create new dispose task: devId:{}, disposeIp:{}, type:{}, result:{}", - task.getDeviceId(), task.getDisposeIp(), task.getType(), err.getMsg()); + task.getDeviceId(), task.getDisposeIp(), task.getType(), err.getMsg()); return MulReturnType.builder().firstParam(err).secondParam(task.getId()).build(); } @@ -211,26 +212,26 @@ public class TaskServiceImpl implements TaskService { // 异步启动处置任务 CompletableFuture.supplyAsync(() -> dp.runDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()], planDuration)) - .whenComplete((v, ex) -> { - if (ex != null) { - log.error("Start task: taskId:{}, error:{}", taskId, ex.getMessage()); - // 增加设备执行清洗任务信息 - taskInfoMapper.addNewTaskInfo(taskId, k.getId(), null, - (long) ErrorCode.ERR_SYSTEMEXCEPTION.getCode()); - } else { - if (v.getFirstParam() != ErrorCode.ERR_OK) { - // 执行任务失败恢复缓存中的任务状态 - taskInfoMapper.addNewTaskInfo(taskId, k.getId(), null, (long) v.getFirstParam().getCode()); - log.error("Start task: taskId:{}, disposeIp:{}, error:{}", - taskId, task.getDisposeIp(), v.getFirstParam().getMsg()); + .whenComplete((v, ex) -> { + if (ex != null) { + log.error("Start task: taskId:{}, error:{}", taskId, ex.getMessage()); + // 增加设备执行清洗任务信息 + taskInfoMapper.addNewTaskInfo(taskId, k.getId(), null, + (long) ErrorCode.ERR_SYSTEMEXCEPTION.getCode()); } else { - // 执行任务失败恢复缓存中的任务状态 - taskInfoMapper.addNewTaskInfo(taskId, k.getId(), v.getSecondParam(), (long) v.getFirstParam().getCode()); - log.info("Start task finished: taskId:{}, disposeId:{}, type:{}", - taskId, task.getDisposeIp(), task.getType()); + if (v.getFirstParam() != ErrorCode.ERR_OK) { + // 执行任务失败恢复缓存中的任务状态 + taskInfoMapper.addNewTaskInfo(taskId, k.getId(), null, (long) v.getFirstParam().getCode()); + log.error("Start task: taskId:{}, disposeIp:{}, error:{}", + taskId, task.getDisposeIp(), v.getFirstParam().getMsg()); + } else { + // 执行任务成功 + taskInfoMapper.addNewTaskInfo(taskId, k.getId(), v.getSecondParam(), (long) v.getFirstParam().getCode()); + log.info("Start task finished: taskId:{}, disposeId:{}, type:{}", + taskId, task.getDisposeIp(), task.getType()); + } } - } - }); + }); }); } @@ -269,26 +270,34 @@ public class TaskServiceImpl implements TaskService { dev.forEach(k -> { DisposeEntryManager dp = DeviceRouter.deviceRouterFactory(k.getType(), k.getIpAddr()); - // 异步启动处置任务 - CompletableFuture.supplyAsync(() -> dp.stopDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()], -1L)) - .whenComplete((v, ex) -> { - if (ex != null) { - // 恢复缓存中任务状态到先前状态 - taskCacheManager.upgradeTaskStatus(taskId, prdStatus); - log.error("Stop task: taskId:{}, error:{}", taskId, ex.getMessage()); - } else { - if (v.getFirstParam() != ErrorCode.ERR_OK) { - // 恢复缓存中任务状态到先前状态 - taskCacheManager.upgradeTaskStatus(taskId, prdStatus); - log.error("Stop task: taskId:{}, error:{}", taskId, v.getFirstParam().getMsg()); - } else { - // 任务执行完成后更新数据库处置任务状态 - finishTask(taskId); - log.info("Stop task finished: taskId:{}, disposeId:{}, type:{}", - taskId, task.getDisposeIp(), task.getType()); - } - } - }); + List TaskInfoList = taskInfoMapper.getTaskInfo(taskId, k.getId()); + + TaskInfoList.forEach(t -> { + if (t.getExternId() == null) { + t.setExternId(-1L); + } + + // 异步启动处置任务 + CompletableFuture.supplyAsync(() -> dp.stopDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()], t.getExternId())) + .whenComplete((v, ex) -> { + if (ex != null) { + // 恢复缓存中任务状态到先前状态 + taskCacheManager.upgradeTaskStatus(taskId, prdStatus); + log.error("Stop task: taskId:{}, error:{}", taskId, ex.getMessage()); + } else { + if (v.getFirstParam() != ErrorCode.ERR_OK) { + // 恢复缓存中任务状态到先前状态 + taskCacheManager.upgradeTaskStatus(taskId, prdStatus); + log.error("Stop task: taskId:{}, error:{}", taskId, v.getFirstParam().getMsg()); + } else { + // 任务执行完成后更新数据库处置任务状态 + finishTask(taskId); + log.info("Stop task finished: taskId:{}, disposeId:{}, type:{}", + taskId, task.getDisposeIp(), task.getType()); + } + } + }); + }); }); } else { log.error("No such task: taskId:{}", taskId); @@ -330,7 +339,7 @@ public class TaskServiceImpl implements TaskService { @Override public boolean taskIsRunning(TaskInfoDetail task) { return task.getCurrentStatus() == DisposeTaskStatus.TASK_RUNNING.getCode() - || task.getCurrentStatus() == DisposeTaskStatus.TASK_NEW.getCode(); + || task.getCurrentStatus() == DisposeTaskStatus.TASK_NEW.getCode(); } /** @@ -342,8 +351,8 @@ public class TaskServiceImpl implements TaskService { @Override public boolean taskIsExpired(TaskInfoDetail task) { return LocalDateTime.parse(task.getPlanEndTime(), - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) - .isBefore(LocalDateTime.now()); + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + .isBefore(LocalDateTime.now()); } /** @@ -355,11 +364,11 @@ public class TaskServiceImpl implements TaskService { @Override public List getNodeAllRunningTask(Long devId) { List taskList = taskCacheManager - .getAllRunningTask() - .stream() - .filter(v -> v.getCurrentStatus() == DisposeTaskStatus.TASK_RUNNING.getCode() - && v.getDeviceId().equals(devId)) - .collect(Collectors.toList()); + .getAllRunningTask() + .stream() + .filter(v -> v.getCurrentStatus() == DisposeTaskStatus.TASK_RUNNING.getCode() + && v.getDeviceId().equals(devId)) + .collect(Collectors.toList()); if (taskList.size() > 0) { log.info("The device has tasks running: devId:{}, tasksRunningNumber:{}", devId, taskList.size()); @@ -430,9 +439,9 @@ public class TaskServiceImpl implements TaskService { // 根据处置IP,拿出所有正在处置的任务 List taskList = taskCacheManager.getAllRunningTask().stream() - .filter(v -> (Objects.equals(v.getId(), taskId)) - && taskIsRunning(v)) - .collect(Collectors.toList()); + .filter(v -> (Objects.equals(v.getId(), taskId)) + && taskIsRunning(v)) + .collect(Collectors.toList()); // 对符合条件的任务调用停止命令 taskList.forEach(v -> { @@ -455,10 +464,10 @@ public class TaskServiceImpl implements TaskService { @Override public boolean taskIsExists(Long devId, Long userId, String disposeIp, int disposeType) { return taskCacheManager.getAllRunningTask().stream() - .anyMatch(v -> Objects.equals(v.getAccountId(), userId) - && Objects.equals(v.getDeviceId(), devId) - && Objects.equals(v.getDisposeIp(), disposeIp) - && Objects.equals(v.getType(), disposeType)); + .anyMatch(v -> Objects.equals(v.getAccountId(), userId) + && Objects.equals(v.getDeviceId(), devId) + && Objects.equals(v.getDisposeIp(), disposeIp) + && Objects.equals(v.getType(), disposeType)); } /** @@ -475,11 +484,11 @@ public class TaskServiceImpl implements TaskService { // 根据处置IP,拿出所有正在处置的任务 List taskList = taskCacheManager.getAllRunningTask().stream() - .filter(v -> (Objects.equals(v.getDeviceId(), devId)) - && (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode()) - && Objects.equals(v.getDisposeIp(), ipAddr) - && taskIsRunning(v)) - .collect(Collectors.toList()); + .filter(v -> (Objects.equals(v.getDeviceId(), devId)) + && (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode()) + && Objects.equals(v.getDisposeIp(), ipAddr) + && taskIsRunning(v)) + .collect(Collectors.toList()); // 对符合条件的任务调用停止命令 taskList.forEach(v -> { @@ -503,10 +512,10 @@ public class TaskServiceImpl implements TaskService { // 根据处置IP,拿出所有正在处置的任务 List taskList = taskCacheManager.getAllRunningTask().stream() - .filter(v -> ((devId == -1L) || Objects.equals(v.getDeviceId(), devId)) - && (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode()) - && taskIsRunning(v)) - .collect(Collectors.toList()); + .filter(v -> ((devId == -1L) || Objects.equals(v.getDeviceId(), devId)) + && (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode()) + && taskIsRunning(v)) + .collect(Collectors.toList()); // 对符合条件的任务调用停止命令 taskList.forEach(v -> { @@ -529,9 +538,9 @@ public class TaskServiceImpl implements TaskService { // 根据处置IP,拿出所有正在处置的任务 List taskList = taskCacheManager.getAllRunningTask().stream() - .filter(v -> (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode()) - && taskIsRunning(v)) - .collect(Collectors.toList()); + .filter(v -> (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode()) + && taskIsRunning(v)) + .collect(Collectors.toList()); // 对符合条件的任务调用停止命令 taskList.forEach(v -> {