From c73590f2fb2e1c3858dde70750266d5afb797c88 Mon Sep 17 00:00:00 2001 From: HuangXin Date: Wed, 29 Apr 2020 13:12:14 +0800 Subject: [PATCH] =?UTF-8?q?OCT=20REM:=201.=20=E5=A2=9E=E5=8A=A0=E5=A4=84?= =?UTF-8?q?=E7=BD=AE=E4=BB=BB=E5=8A=A1=E7=AE=A1=E7=90=86=E4=B8=9A=E5=8A=A1?= =?UTF-8?q?=202.=20=E4=BF=AE=E6=AD=A3=E9=83=A8=E5=88=86=E5=8D=95=E5=85=83?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/DisposeNodeInfoController.java | 7 +- .../com/dispose/dispose/impl/DPTechImpl.java | 4 +- .../dispose/impl/VirtualDeviceImpl.java | 3 +- .../com/dispose/mapper/DisposeTaskMapper.java | 8 -- .../com/dispose/service/AsyncService.java | 27 ---- .../java/com/dispose/service/TaskService.java | 17 ++- .../service/impl/AsyncServiceImpl.java | 25 ---- .../service/impl/DisposeNodeManagerImpl.java | 4 + .../dispose/service/impl/TaskServiceImpl.java | 121 ++++++++++++++---- .../com/dispose/task/TaskManagerTask.java | 43 ++++--- src/main/resources/mappers/DisposeTask.xml | 7 - .../DeviceNodeInfoControllerTest.java | 16 +-- .../dispose/mapper/DisposeTaskMapperTest.java | 2 +- 13 files changed, 151 insertions(+), 133 deletions(-) delete mode 100644 src/main/java/com/dispose/service/AsyncService.java delete mode 100644 src/main/java/com/dispose/service/impl/AsyncServiceImpl.java diff --git a/src/main/java/com/dispose/controller/DisposeNodeInfoController.java b/src/main/java/com/dispose/controller/DisposeNodeInfoController.java index 2f707338..bc701ced 100644 --- a/src/main/java/com/dispose/controller/DisposeNodeInfoController.java +++ b/src/main/java/com/dispose/controller/DisposeNodeInfoController.java @@ -169,12 +169,9 @@ public class DisposeNodeInfoController { if (devList != null && devList.size() > 0) { devList.forEach(v -> { - VersionRsp ver = VersionRsp.builder() - .version(v.getVersion()) - .build(); + VersionRsp ver = VersionRsp.builder().version(v.getVersion()).build(); - ver.setId(v.getId() - .toString()); + ver.setId(v.getId().toString()); ver.setStatus(ErrorCode.ERR_OK.getCode()); ver.setMessage(ErrorCode.ERR_OK.getMsg()); diff --git a/src/main/java/com/dispose/dispose/impl/DPTechImpl.java b/src/main/java/com/dispose/dispose/impl/DPTechImpl.java index 19203110..4e896c8e 100644 --- a/src/main/java/com/dispose/dispose/impl/DPTechImpl.java +++ b/src/main/java/com/dispose/dispose/impl/DPTechImpl.java @@ -291,8 +291,8 @@ public class DPTechImpl implements DisposeEntryManager { try { return (T) cleanTypePort.getAllProtectionObjectFromUMC().getProtectionObjectDataForService(); } catch (Exception ex) { - log.error(ex.getMessage()); - ex.printStackTrace(); + //log.error(ex.getMessage()); + //ex.printStackTrace(); return null; } } diff --git a/src/main/java/com/dispose/dispose/impl/VirtualDeviceImpl.java b/src/main/java/com/dispose/dispose/impl/VirtualDeviceImpl.java index c13cf93a..4f6e8d39 100644 --- a/src/main/java/com/dispose/dispose/impl/VirtualDeviceImpl.java +++ b/src/main/java/com/dispose/dispose/impl/VirtualDeviceImpl.java @@ -2,6 +2,7 @@ package com.dispose.dispose.impl; import com.dispose.common.DeviceCapacity; import com.dispose.common.ErrorCode; +import com.dispose.common.GlobalVar; import com.dispose.common.IPAddrType; import com.dispose.dispose.DisposeEntryManager; import com.dispose.dispose.po.DeviceInfo; @@ -119,7 +120,7 @@ public class VirtualDeviceImpl implements DisposeEntryManager { */ @Override public boolean getDeviceLinkStatus() { - return true; + return GlobalVar.USED_VIRTUAL_DISPOSE_MODE; } /** diff --git a/src/main/java/com/dispose/mapper/DisposeTaskMapper.java b/src/main/java/com/dispose/mapper/DisposeTaskMapper.java index 22a42c91..b8048e47 100644 --- a/src/main/java/com/dispose/mapper/DisposeTaskMapper.java +++ b/src/main/java/com/dispose/mapper/DisposeTaskMapper.java @@ -88,14 +88,6 @@ public interface DisposeTaskMapper extends Mapper, @Param("ipAddr") String ipAddr, @Param("status") int status); - /** - * Gets all task by ip. - * - * @param ipAddr the ip addr - * @return the all task by ip - */ - List getAllTaskByIp(@Param("ipAddr") String ipAddr); - /** * Gets all task by status. * diff --git a/src/main/java/com/dispose/service/AsyncService.java b/src/main/java/com/dispose/service/AsyncService.java deleted file mode 100644 index d25a9027..00000000 --- a/src/main/java/com/dispose/service/AsyncService.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.dispose.service; - -import com.dispose.common.DeviceCapacity; -import com.dispose.dispose.DisposeEntryManager; - -/** - * The interface Async service. - */ -public interface AsyncService { - /** - * Async start dispose device task. - * - * @param dp the dp - * @param ipAddr the ip addr - * @param type the type - */ - void asyncStartDisposeDeviceTask(DisposeEntryManager dp, String ipAddr, DeviceCapacity type); - - /** - * Async stop dispose device task. - * - * @param dp the dp - * @param ipAddr the ip addr - * @param type the type - */ - void asyncStopDisposeDeviceTask(DisposeEntryManager dp, String ipAddr, DeviceCapacity type); -} diff --git a/src/main/java/com/dispose/service/TaskService.java b/src/main/java/com/dispose/service/TaskService.java index 584fcd21..a0e5c4b0 100644 --- a/src/main/java/com/dispose/service/TaskService.java +++ b/src/main/java/com/dispose/service/TaskService.java @@ -15,10 +15,10 @@ public interface TaskService { void loadTaskFromDatabase(); /** - * Create task error code. + * Create task m return type. * * @param task the task - * @return the error code + * @return the m return type */ MReturnType createTask(TaskInfoDetail task); @@ -47,7 +47,7 @@ public interface TaskService { ErrorCode finishTask(Long taskId); /** - * Task is running boolean. + * Task is running xx boolean. * * @param task the task * @return the boolean @@ -62,6 +62,17 @@ public interface TaskService { */ boolean taskIsExpired(TaskInfoDetail task); + /** + * Task is exists boolean. + * + * @param devId the dev id + * @param userId the user id + * @param disposeIp the dispose ip + * @param disposeType the dispose type + * @return the boolean + */ + boolean taskIsExists(Long devId, Long userId, String disposeIp, int disposeType); + /** * Gets node all running task. * diff --git a/src/main/java/com/dispose/service/impl/AsyncServiceImpl.java b/src/main/java/com/dispose/service/impl/AsyncServiceImpl.java deleted file mode 100644 index 039988aa..00000000 --- a/src/main/java/com/dispose/service/impl/AsyncServiceImpl.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.dispose.service.impl; - -import com.dispose.common.DeviceCapacity; -import com.dispose.dispose.DisposeEntryManager; -import com.dispose.service.AsyncService; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Service; - -/** - * The type Async service. - */ -@Service -public class AsyncServiceImpl implements AsyncService { - @Override - @Async("bizExecutor") - public void asyncStartDisposeDeviceTask(DisposeEntryManager dp, String ipAddr, DeviceCapacity type) { - dp.runDispose(ipAddr, type); - } - - @Override - @Async("bizExecutor") - public void asyncStopDisposeDeviceTask(DisposeEntryManager dp, String ipAddr, DeviceCapacity type) { - dp.stopDispose(ipAddr, type); - } -} diff --git a/src/main/java/com/dispose/service/impl/DisposeNodeManagerImpl.java b/src/main/java/com/dispose/service/impl/DisposeNodeManagerImpl.java index 470fa619..7d987067 100644 --- a/src/main/java/com/dispose/service/impl/DisposeNodeManagerImpl.java +++ b/src/main/java/com/dispose/service/impl/DisposeNodeManagerImpl.java @@ -123,6 +123,10 @@ public class DisposeNodeManagerImpl implements DisposeNodeManager { try { dp = DeviceRouter.deviceRouterFactory(dev.getType(), dev.getIpAddr(), IPAddrType.getIpAddrType(dev.getIpAddr())); + + if (!dp.getDeviceLinkStatus()) { + return new MReturnType<>(ErrorCode.ERR_NOSUCHDEVICE, String.valueOf(-1)); + } } catch (Exception ex) { return new MReturnType<>(ErrorCode.ERR_NOSUCHDEVICE, String.valueOf(-1)); } diff --git a/src/main/java/com/dispose/service/impl/TaskServiceImpl.java b/src/main/java/com/dispose/service/impl/TaskServiceImpl.java index 2684ed87..9a6f2927 100644 --- a/src/main/java/com/dispose/service/impl/TaskServiceImpl.java +++ b/src/main/java/com/dispose/service/impl/TaskServiceImpl.java @@ -10,7 +10,6 @@ import com.dispose.mapper.DisposeTaskMapper; import com.dispose.pojo.entity.DisposeDevice; import com.dispose.pojo.po.MReturnType; import com.dispose.pojo.vo.common.TaskInfoDetail; -import com.dispose.service.AsyncService; import com.dispose.service.DisposeNodeManager; import com.dispose.service.TaskService; import com.fasterxml.jackson.core.JsonProcessingException; @@ -24,6 +23,7 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; /** @@ -32,8 +32,6 @@ import java.util.stream.Collectors; @Service @Slf4j public class TaskServiceImpl implements TaskService { - @Resource - private AsyncService asyncService; /** * The Object mapper. @@ -53,9 +51,19 @@ public class TaskServiceImpl implements TaskService { @Resource private DisposeTaskMapper disposeTaskMapper; + /** + * The Dispose node manager. + */ @Resource private DisposeNodeManager disposeNodeManager; + /** + * Gets dispose device handle. + * + * @param disposeIp the dispose ip + * @param devCapType the dev cap type + * @return the dispose device handle + */ private DisposeEntryManager getDisposeDeviceHandle(String disposeIp, int devCapType) { DisposeDevice dev = getDisposeNode(disposeIp, devCapType); @@ -67,6 +75,13 @@ public class TaskServiceImpl implements TaskService { return DeviceRouter.deviceRouterFactory(dev.getType(), dev.getIpAddr()); } + /** + * Gets dispose node. + * + * @param disposeIp the dispose ip + * @param devCapType the dev cap type + * @return the dispose node + */ private DisposeDevice getDisposeNode(String disposeIp, int devCapType) { DeviceCapacity cap; @@ -108,10 +123,10 @@ public class TaskServiceImpl implements TaskService { } /** - * Create task error code. + * Create task m return type. * * @param task the task - * @return the error code + * @return the m return type */ @Override public MReturnType createTask(TaskInfoDetail task) { @@ -152,7 +167,12 @@ public class TaskServiceImpl implements TaskService { // 将该任务写入数据库和缓存,等到定时任务真正启动该任务 disposeTaskMapper.addNewTask(task); - ErrorCode err = taskCacheManager.addTask(task); + + task.setBeginTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); + + TaskInfoDetail cacheTask = disposeTaskMapper.getTaskInfoById(task.getId()); + + ErrorCode err = taskCacheManager.addTask(cacheTask); return MReturnType.builder() .firstParam(err) @@ -184,17 +204,30 @@ public class TaskServiceImpl implements TaskService { return ErrorCode.ERR_NOSUCHDEVICE; } - if (!taskIsRunning(task)) { - // 启动处置任务 - err = dp.runDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()]); + // 对新建的任务执行启动操作 + if (task.getCurrentStatus() == DisposeTaskStatus.TASK_NEW.getCode()) { - if (err == ErrorCode.ERR_OK) { - // 更新处置任务状态 - taskCacheManager.upgradeTaskStatus(taskId, DisposeTaskStatus.TASK_RUNNING.getCode()); - disposeTaskMapper.changeTaskCurrentStatus(taskId, DisposeTaskStatus.TASK_RUNNING.getCode()); - } else { - log.error("Start task {}:{} error {}", task.getId(), task.getDisposeIp(), err.getMsg()); - } + taskCacheManager.upgradeTaskStatus(taskId, DisposeTaskStatus.TASK_RUNNING.getCode()); + + // 异步启动处置任务 + CompletableFuture future = CompletableFuture + .supplyAsync(() -> dp.runDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()])) + .whenComplete((v, ex) -> { + if (ex != null) { + log.error(ex.getMessage()); + // 执行任务失败恢复缓存中的任务状态 + taskCacheManager.upgradeTaskStatus(taskId, DisposeTaskStatus.TASK_NEW.getCode()); + } else { + if (v != ErrorCode.ERR_OK) { + // 执行任务失败恢复缓存中的任务状态 + taskCacheManager.upgradeTaskStatus(taskId, DisposeTaskStatus.TASK_NEW.getCode()); + log.error("Start task {}:{} error {}", task.getId(), task.getDisposeIp(), err.getMsg()); + } else { + // 任务执行完成后更新数据库处置任务状态 + disposeTaskMapper.changeTaskCurrentStatus(taskId, DisposeTaskStatus.TASK_RUNNING.getCode()); + } + } + }); } return err; @@ -209,14 +242,10 @@ public class TaskServiceImpl implements TaskService { @Override public ErrorCode stopTask(Long taskId) { - if (taskId == -1) { - return ErrorCode.ERR_NOSUCHTASK; - } - ErrorCode err = taskCacheManager.upgradeTaskStatus(taskId, DisposeTaskStatus.TASK_STOP.getCode()); - if (err == ErrorCode.ERR_OK) { - disposeTaskMapper.changeTaskCurrentStatus(taskId, DisposeTaskStatus.TASK_STOP.getCode()); + if (err != ErrorCode.ERR_OK) { + return err; } TaskInfoDetail task = taskCacheManager.getTaskById(taskId); @@ -224,10 +253,32 @@ public class TaskServiceImpl implements TaskService { if (task != null) { DisposeEntryManager dp = getDisposeDeviceHandle(task.getDisposeIp(), task.getType()); - if (dp != null) { - asyncService.asyncStopDisposeDeviceTask(dp, task.getDisposeIp(), - DeviceCapacity.values()[task.getType()]); + if (dp == null) { + return ErrorCode.ERR_NOSUCHDEVICE; } + + int prdStatus = task.getCurrentStatus(); + + // 异步启动处置任务 + CompletableFuture future = CompletableFuture + .supplyAsync(() -> dp.stopDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()])) + .whenComplete((v, ex) -> { + if (ex != null) { + // 恢复缓存中任务状态到先前状态 + taskCacheManager.upgradeTaskStatus(taskId, prdStatus); + log.error(ex.getMessage()); + } else { + if (v != ErrorCode.ERR_OK) { + // 恢复缓存中任务状态到先前状态 + taskCacheManager.upgradeTaskStatus(taskId, prdStatus); + log.error("Start task {}:{} error {}", task.getId(), task.getDisposeIp(), err.getMsg()); + } else { + // 任务执行完成后更新数据库处置任务状态 + disposeTaskMapper.changeTaskCurrentStatus(taskId, DisposeTaskStatus.TASK_STOP.getCode()); + } + } + }); + } return ErrorCode.ERR_OK; @@ -251,7 +302,7 @@ public class TaskServiceImpl implements TaskService { } /** - * Task is running boolean. + * Task is running xx boolean. * * @param task the task * @return the boolean @@ -332,4 +383,22 @@ public class TaskServiceImpl implements TaskService { public List getAllTask() { return taskCacheManager.getAllTask(); } + + /** + * Task is exists boolean. + * + * @param devId the dev id + * @param userId the user id + * @param disposeIp the dispose ip + * @param disposeType the dispose type + * @return the boolean + */ + @Override + public boolean taskIsExists(Long devId, Long userId, String disposeIp, int disposeType) { + return taskCacheManager.getAllTask().parallelStream() + .anyMatch(v -> Objects.equals(v.getAccountId(), userId) + && Objects.equals(v.getDeviceId(), devId) + && Objects.equals(v.getDisposeIp(), disposeIp) + && Objects.equals(v.getType(), disposeType)); + } } diff --git a/src/main/java/com/dispose/task/TaskManagerTask.java b/src/main/java/com/dispose/task/TaskManagerTask.java index 51d4f4ed..66ca98b0 100644 --- a/src/main/java/com/dispose/task/TaskManagerTask.java +++ b/src/main/java/com/dispose/task/TaskManagerTask.java @@ -1,6 +1,9 @@ package com.dispose.task; +import com.dispose.common.DisposeTaskStatus; +import com.dispose.common.ErrorCode; import com.dispose.manager.TaskCacheManager; +import com.dispose.pojo.vo.common.TaskInfoDetail; import com.dispose.service.TaskService; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; @@ -32,28 +35,28 @@ public class TaskManagerTask { * Task runtime manager. */ @Async("bizExecutor") - @Scheduled(fixedRate = 5000) + @Scheduled(fixedDelay = 5000) public void taskRuntimeManager() { Iterator it = taskCacheManager.getAllTask().iterator(); -// while (it.hasNext()) { -// TaskInfoDetail taskData = (TaskInfoDetail) it.next(); -// -// if(taskService.taskIsExpired(taskData)) { -// log.info("Finish expired task {}:{} begin at {}", -// taskData.getId(), taskData.getDisposeIp(), taskData.getBeginTime()); -// taskService.stopTask(taskData.getId()); -// taskService.finishTask(taskData.getId()); -// continue; -// } -// -// if (taskData.getCurrentStatus() == DisposeTaskStatus.TASK_NEW.getCode()) { -// log.info("Start task {}:{} of {}", -// taskData.getId(), taskData.getDisposeIp(), taskData.getBeginTime()); -// if (taskService.startTask(taskData.getId()) != ErrorCode.ERR_OK) { -// log.error("startTask Task {}:{} error\n", taskData.getId(), taskData.getDisposeIp()); -// } -// } -// } + while (it.hasNext()) { + TaskInfoDetail taskData = (TaskInfoDetail) it.next(); + + if(taskService.taskIsExpired(taskData) && taskService.taskIsRunning(taskData)) { + log.info("Finish expired task {}:{} begin at {}", + taskData.getId(), taskData.getDisposeIp(), taskData.getBeginTime()); + taskService.stopTask(taskData.getId()); + taskService.finishTask(taskData.getId()); + continue; + } + + if (taskData.getCurrentStatus() == DisposeTaskStatus.TASK_NEW.getCode()) { + log.info("Start task {}:{} of {}", + taskData.getId(), taskData.getDisposeIp(), taskData.getBeginTime()); + if (taskService.startTask(taskData.getId()) != ErrorCode.ERR_OK) { + log.error("startTask Task {}:{} error\n", taskData.getId(), taskData.getDisposeIp()); + } + } + } } } diff --git a/src/main/resources/mappers/DisposeTask.xml b/src/main/resources/mappers/DisposeTask.xml index c927259c..4c4d2cfb 100644 --- a/src/main/resources/mappers/DisposeTask.xml +++ b/src/main/resources/mappers/DisposeTask.xml @@ -72,13 +72,6 @@ AND currentStatus = #{status, jdbcType=INTEGER} - -