REM:
1. 修改迪普启动和停止清洗任务为异步方式
This commit is contained in:
chenlinghy 2020-07-06 17:02:00 +08:00
parent 3b4550364f
commit f8a66100da
2 changed files with 90 additions and 12 deletions

View File

@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
* The type Dp tech.
@ -214,12 +215,17 @@ public class DpTechImpl implements DisposeEntryManager {
for (int d : new int[]{0, 1}) {
// 遍历所有清洗类型
for (DpTechAttackType t : DpTechAttackType.values()) {
NtcRequestResultInfo ret = cleanTypePort.startAbnormalTaskForUMC(ip, t.getCode(), d);
log.debug("Cleanup: {} --> {}:{}", d, t.getReadme(), t.getCode());
if (ret.getResultRetVal() != ErrorCode.ERR_OK.getCode()) {
log.error("Start {} cleanup task error: {}", ip, ret.getResultInfo());
return new MulReturnType<>(ErrorCode.ERR_CALLDEVICE, null);
}
CompletableFuture.supplyAsync(() -> cleanTypePort.startAbnormalTaskForUMC(ip, t.getCode(), d))
.whenComplete((v, ex) -> {
if (ex != null) {
log.error("DPTech run dispose: {}, {}, error:{}", ip, t.getCode(), ex.getMessage());
} else {
log.debug("Cleanup: {} --> {}:{}", d, t.getReadme(), t.getCode());
if(v.getResultRetVal() != ErrorCode.ERR_OK.getCode()) {
log.error("DPTech run dispose {} error: {}", ip, v.getResultInfo());
}
}
});
}
}
@ -243,6 +249,8 @@ public class DpTechImpl implements DisposeEntryManager {
*/
@Override
public MulReturnType<ErrorCode, Long> stopDispose(String ipAddr, DeviceCapacity type, Long devTaskId) {
ErrorCode err = ErrorCode.ERR_OK;
if (type != DeviceCapacity.CLEANUP) {
return new MulReturnType<>(ErrorCode.ERR_UNSUPPORT, null);
}
@ -251,19 +259,29 @@ public class DpTechImpl implements DisposeEntryManager {
log.info("++++Begging DPTech Stop Cleanup Task: {}", ipAddr);
// 遍历入口出口两个方向
for (int d : new int[]{0, 1}) {
// 遍历所有攻击类型的清洗任务
for (DpTechAttackType t : DpTechAttackType.values()) {
cleanTypePort.stopAbnormalTaskForUMC(ipAddr, t.getCode(), d);
CompletableFuture.supplyAsync(() -> cleanTypePort.stopAbnormalTaskForUMC(ipAddr, t.getCode(), d))
.whenComplete((v, ex) -> {
if (ex != null) {
log.error("DPTech stop dispose: {}, {}, error:{}", ipAddr, t.getCode(), ex.getMessage());
} else {
log.debug("Stop Cleanup: {} --> {}:{}", d, t.getReadme(), t.getCode());
if(v.getResultRetVal() != ErrorCode.ERR_OK.getCode()) {
log.error("DPTech stop dispose {} error: {}", ipAddr, v.getResultInfo());
}
}
});
}
}
log.info("----Finish DPTech Stop Cleanup Task: {}", ipAddr);
return new MulReturnType<>(ErrorCode.ERR_OK, null);
} catch (Exception ex) {
log.error(ex.getMessage());
log.error("----Error DPTech Stop Cleanup Task: {}", ipAddr);
return new MulReturnType<>(ErrorCode.ERR_SYSTEMEXCEPTION, null);
err = ErrorCode.ERR_SYSTEMEXCEPTION;
}
return new MulReturnType<>(err, null);
}
/**

View File

@ -186,6 +186,37 @@ public class TaskServiceImpl implements TaskService {
return ErrorCode.ERR_NOSUCHDEVICE;
}
// // 对新建的任务执行启动操作
// if (task.getCurrentStatus() == DisposeTaskStatus.TASK_NEW.getCode()) {
// // 更新任务状态为进行中
// taskCacheManager.upgradeTaskStatus(taskId, DisposeTaskStatus.TASK_RUNNING.getCode());
//
// dev.forEach(k -> {
// DisposeEntryManager dp = DeviceRouter.getDeviceRouterFactory(k.getType(), k.getIpAddr());
//
// MulReturnType<ErrorCode, Long> ret = dp.runDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()], planDuration);
//
// if (ret.getFirstParam() == ErrorCode.ERR_SYSTEMEXCEPTION) {
// log.error("Start task: taskId:{}, error:{}", taskId, ret.getFirstParam());
// // 增加设备执行清洗任务信息
// taskInfoMapper.addNewTaskInfo(taskId, k.getId(), null,
// (long) ErrorCode.ERR_SYSTEMEXCEPTION.getCode());
// } else {
// if (ret.getFirstParam() != ErrorCode.ERR_OK) {
// // 执行任务失败恢复缓存中的任务状态
// taskInfoMapper.addNewTaskInfo(taskId, k.getId(), null, (long) ret.getFirstParam().getCode());
// log.error("Start task: taskId:{}, disposeIp:{}, error:{}",
// taskId, task.getDisposeIp(), ret.getFirstParam());
// } else {
// // 执行任务成功
// taskInfoMapper.addNewTaskInfo(taskId, k.getId(), ret.getSecondParam(), (long) ret.getFirstParam().getCode());
// log.info("Start task finished: taskId:{}, disposeId:{}, type:{}",
// taskId, task.getDisposeIp(), task.getType());
// }
// }
// });
// }
// 对新建的任务执行启动操作
if (task.getCurrentStatus() == DisposeTaskStatus.TASK_NEW.getCode()) {
// 更新任务状态为进行中
@ -230,7 +261,6 @@ public class TaskServiceImpl implements TaskService {
*/
@Override
public ErrorCode stopTask(Long taskId) {
TaskInfoDetail task = taskCacheManager.getTaskById(taskId);
if (task != null) {
@ -251,6 +281,36 @@ public class TaskServiceImpl implements TaskService {
return err;
}
// dev.forEach(k -> {
// DisposeEntryManager dp = DeviceRouter.getDeviceRouterFactory(k.getType(), k.getIpAddr());
//
// TaskInfo taskInfo = taskInfoMapper.getTaskInfo(taskId, k.getId());
//
// MulReturnType<ErrorCode, Long> ret = dp.stopDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()],
// (taskInfo == null || taskInfo.getExternId() == null) ? -1 : taskInfo.getExternId());
//
// if (ret.getFirstParam() == ErrorCode.ERR_SYSTEMEXCEPTION) {
// // 恢复缓存中任务状态到先前状态
// taskCacheManager.upgradeTaskStatus(taskId, prdStatus);
// log.error("Stop task: taskId:{}, error:{}", taskId, ret.getFirstParam());
// } else {
// if (ret.getFirstParam() != ErrorCode.ERR_OK) {
// // 恢复缓存中任务状态到先前状态
// taskCacheManager.upgradeTaskStatus(taskId, prdStatus);
// log.error("Stop task: taskId:{}, error:{}", taskId, ret.getFirstParam());
// } else {
// // 任务执行完成后更新数据库处置任务状态
// finishTask(taskId);
// log.info("Stop task finished: taskId:{}, disposeId:{}, type:{}",
// taskId, task.getDisposeIp(), task.getType());
// }
// }
// });
// } else {
// log.error("No such task: taskId:{}", taskId);
// return ErrorCode.ERR_NOSUCHTASK;
// }
dev.forEach(k -> {
DisposeEntryManager dp = DeviceRouter.getDeviceRouterFactory(k.getType(), k.getIpAddr());
@ -259,7 +319,7 @@ public class TaskServiceImpl implements TaskService {
// 异步启动处置任务
CompletableFuture.supplyAsync(() -> dp.stopDispose(task.getDisposeIp(),
DeviceCapacity.values()[task.getType()],
(taskInfo == null || taskInfo.getExternId() == null) ? -1 :taskInfo.getExternId()))
(taskInfo == null || taskInfo.getExternId() == null) ? -1 : taskInfo.getExternId()))
.whenComplete((v, ex) -> {
if (ex != null) {
// 恢复缓存中任务状态到先前状态