REM:
1. 修改停止清洗任务接口
This commit is contained in:
chenlinghy 2020-07-01 17:27:34 +08:00
parent 5b7f390e73
commit cd865e79e2
1 changed files with 92 additions and 83 deletions

View File

@ -11,6 +11,7 @@ import com.dispose.mapper.DisposeTaskMapper;
import com.dispose.mapper.TaskInfoMapper; import com.dispose.mapper.TaskInfoMapper;
import com.dispose.pojo.entity.DisposeDevice; import com.dispose.pojo.entity.DisposeDevice;
import com.dispose.pojo.po.MulReturnType; import com.dispose.pojo.po.MulReturnType;
import com.dispose.pojo.vo.common.TaskInfo;
import com.dispose.pojo.vo.common.TaskInfoDetail; import com.dispose.pojo.vo.common.TaskInfoDetail;
import com.dispose.service.DisposeNodeManager; import com.dispose.service.DisposeNodeManager;
import com.dispose.service.TaskService; import com.dispose.service.TaskService;
@ -91,8 +92,8 @@ public class TaskServiceImpl implements TaskService {
public void loadTaskFromDatabase() { public void loadTaskFromDatabase() {
// 从数据库中取出所有任务 // 从数据库中取出所有任务
List<TaskInfoDetail> taskList = disposeTaskMapper.selectAll().stream() List<TaskInfoDetail> taskList = disposeTaskMapper.selectAll().stream()
.filter(this::taskIsRunning) .filter(this::taskIsRunning)
.collect(Collectors.toList()); .collect(Collectors.toList());
taskList.forEach(v -> { taskList.forEach(v -> {
v.setRetryTimes(0); v.setRetryTimes(0);
@ -132,30 +133,30 @@ public class TaskServiceImpl implements TaskService {
if (disposeDevice.size() == 0) { if (disposeDevice.size() == 0) {
log.error("No such device to dispose this task: devId:{}, disposeIp:{}, type:{}", log.error("No such device to dispose this task: devId:{}, disposeIp:{}, type:{}",
task.getDeviceId(), task.getDisposeIp(), task.getType()); task.getDeviceId(), task.getDisposeIp(), task.getType());
return MulReturnType.<ErrorCode, Long>builder() return MulReturnType.<ErrorCode, Long>builder()
.firstParam(ErrorCode.ERR_NOSUCHDEVICE) .firstParam(ErrorCode.ERR_NOSUCHDEVICE)
.secondParam(-1L) .secondParam(-1L)
.build(); .build();
} }
// 查询当前是否有相同能力节点相同用户相同处置IP的且正在执行的处置任务如果存在则忽略该次任务依照产品需求 // 查询当前是否有相同能力节点相同用户相同处置IP的且正在执行的处置任务如果存在则忽略该次任务依照产品需求
List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask() List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask()
.stream() .stream()
.filter(v -> ((taskId == null || taskId == -1L) || Objects.equals(v.getDeviceId(), task.getDeviceId())) .filter(v -> ((taskId == null || taskId == -1L) || Objects.equals(v.getDeviceId(), task.getDeviceId()))
&& Objects.equals(v.getAccountId(), task.getAccountId()) && Objects.equals(v.getAccountId(), task.getAccountId())
&& taskIsRunning(v) && taskIsRunning(v)
&& Objects.equals(v.getType(), task.getType()) && Objects.equals(v.getType(), task.getType())
&& Objects.equals(v.getDisposeIp(), task.getDisposeIp())) && Objects.equals(v.getDisposeIp(), task.getDisposeIp()))
.collect(Collectors.toList()); .collect(Collectors.toList());
if (taskList.size() > 0) { if (taskList.size() > 0) {
log.error("Same dispose task is running: devId:{}, disposeIp:{}, type:{}", log.error("Same dispose task is running: devId:{}, disposeIp:{}, type:{}",
task.getDeviceId(), task.getDisposeIp(), task.getType()); task.getDeviceId(), task.getDisposeIp(), task.getType());
return MulReturnType.<ErrorCode, Long>builder() return MulReturnType.<ErrorCode, Long>builder()
.firstParam(ErrorCode.ERR_TASKRUNNING) .firstParam(ErrorCode.ERR_TASKRUNNING)
.secondParam(taskList.get(0).getId()) .secondParam(taskList.get(0).getId())
.build(); .build();
} }
// 将该任务写入数据库和缓存等到定时任务真正启动该任务 // 将该任务写入数据库和缓存等到定时任务真正启动该任务
@ -168,7 +169,7 @@ public class TaskServiceImpl implements TaskService {
ErrorCode err = taskCacheManager.addTask(cacheTask); ErrorCode err = taskCacheManager.addTask(cacheTask);
log.info("Create new dispose task: devId:{}, disposeIp:{}, type:{}, result:{}", log.info("Create new dispose task: devId:{}, disposeIp:{}, type:{}, result:{}",
task.getDeviceId(), task.getDisposeIp(), task.getType(), err.getMsg()); task.getDeviceId(), task.getDisposeIp(), task.getType(), err.getMsg());
return MulReturnType.<ErrorCode, Long>builder().firstParam(err).secondParam(task.getId()).build(); return MulReturnType.<ErrorCode, Long>builder().firstParam(err).secondParam(task.getId()).build();
} }
@ -211,26 +212,26 @@ public class TaskServiceImpl implements TaskService {
// 异步启动处置任务 // 异步启动处置任务
CompletableFuture.supplyAsync(() -> dp.runDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()], planDuration)) CompletableFuture.supplyAsync(() -> dp.runDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()], planDuration))
.whenComplete((v, ex) -> { .whenComplete((v, ex) -> {
if (ex != null) { if (ex != null) {
log.error("Start task: taskId:{}, error:{}", taskId, ex.getMessage()); log.error("Start task: taskId:{}, error:{}", taskId, ex.getMessage());
// 增加设备执行清洗任务信息 // 增加设备执行清洗任务信息
taskInfoMapper.addNewTaskInfo(taskId, k.getId(), null, taskInfoMapper.addNewTaskInfo(taskId, k.getId(), null,
(long) ErrorCode.ERR_SYSTEMEXCEPTION.getCode()); (long) ErrorCode.ERR_SYSTEMEXCEPTION.getCode());
} else {
if (v.getFirstParam() != ErrorCode.ERR_OK) {
// 执行任务失败恢复缓存中的任务状态
taskInfoMapper.addNewTaskInfo(taskId, k.getId(), null, (long) v.getFirstParam().getCode());
log.error("Start task: taskId:{}, disposeIp:{}, error:{}",
taskId, task.getDisposeIp(), v.getFirstParam().getMsg());
} else { } else {
// 执行任务失败恢复缓存中的任务状态 if (v.getFirstParam() != ErrorCode.ERR_OK) {
taskInfoMapper.addNewTaskInfo(taskId, k.getId(), v.getSecondParam(), (long) v.getFirstParam().getCode()); // 执行任务失败恢复缓存中的任务状态
log.info("Start task finished: taskId:{}, disposeId:{}, type:{}", taskInfoMapper.addNewTaskInfo(taskId, k.getId(), null, (long) v.getFirstParam().getCode());
taskId, task.getDisposeIp(), task.getType()); log.error("Start task: taskId:{}, disposeIp:{}, error:{}",
taskId, task.getDisposeIp(), v.getFirstParam().getMsg());
} else {
// 执行任务成功
taskInfoMapper.addNewTaskInfo(taskId, k.getId(), v.getSecondParam(), (long) v.getFirstParam().getCode());
log.info("Start task finished: taskId:{}, disposeId:{}, type:{}",
taskId, task.getDisposeIp(), task.getType());
}
} }
} });
});
}); });
} }
@ -269,26 +270,34 @@ public class TaskServiceImpl implements TaskService {
dev.forEach(k -> { dev.forEach(k -> {
DisposeEntryManager dp = DeviceRouter.deviceRouterFactory(k.getType(), k.getIpAddr()); DisposeEntryManager dp = DeviceRouter.deviceRouterFactory(k.getType(), k.getIpAddr());
// 异步启动处置任务 List<TaskInfo> TaskInfoList = taskInfoMapper.getTaskInfo(taskId, k.getId());
CompletableFuture.supplyAsync(() -> dp.stopDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()], -1L))
.whenComplete((v, ex) -> { TaskInfoList.forEach(t -> {
if (ex != null) { if (t.getExternId() == null) {
// 恢复缓存中任务状态到先前状态 t.setExternId(-1L);
taskCacheManager.upgradeTaskStatus(taskId, prdStatus); }
log.error("Stop task: taskId:{}, error:{}", taskId, ex.getMessage());
} else { // 异步启动处置任务
if (v.getFirstParam() != ErrorCode.ERR_OK) { CompletableFuture.supplyAsync(() -> dp.stopDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()], t.getExternId()))
// 恢复缓存中任务状态到先前状态 .whenComplete((v, ex) -> {
taskCacheManager.upgradeTaskStatus(taskId, prdStatus); if (ex != null) {
log.error("Stop task: taskId:{}, error:{}", taskId, v.getFirstParam().getMsg()); // 恢复缓存中任务状态到先前状态
} else { taskCacheManager.upgradeTaskStatus(taskId, prdStatus);
// 任务执行完成后更新数据库处置任务状态 log.error("Stop task: taskId:{}, error:{}", taskId, ex.getMessage());
finishTask(taskId); } else {
log.info("Stop task finished: taskId:{}, disposeId:{}, type:{}", if (v.getFirstParam() != ErrorCode.ERR_OK) {
taskId, task.getDisposeIp(), task.getType()); // 恢复缓存中任务状态到先前状态
} taskCacheManager.upgradeTaskStatus(taskId, prdStatus);
} log.error("Stop task: taskId:{}, error:{}", taskId, v.getFirstParam().getMsg());
}); } else {
// 任务执行完成后更新数据库处置任务状态
finishTask(taskId);
log.info("Stop task finished: taskId:{}, disposeId:{}, type:{}",
taskId, task.getDisposeIp(), task.getType());
}
}
});
});
}); });
} else { } else {
log.error("No such task: taskId:{}", taskId); log.error("No such task: taskId:{}", taskId);
@ -330,7 +339,7 @@ public class TaskServiceImpl implements TaskService {
@Override @Override
public boolean taskIsRunning(TaskInfoDetail task) { public boolean taskIsRunning(TaskInfoDetail task) {
return task.getCurrentStatus() == DisposeTaskStatus.TASK_RUNNING.getCode() return task.getCurrentStatus() == DisposeTaskStatus.TASK_RUNNING.getCode()
|| task.getCurrentStatus() == DisposeTaskStatus.TASK_NEW.getCode(); || task.getCurrentStatus() == DisposeTaskStatus.TASK_NEW.getCode();
} }
/** /**
@ -342,8 +351,8 @@ public class TaskServiceImpl implements TaskService {
@Override @Override
public boolean taskIsExpired(TaskInfoDetail task) { public boolean taskIsExpired(TaskInfoDetail task) {
return LocalDateTime.parse(task.getPlanEndTime(), return LocalDateTime.parse(task.getPlanEndTime(),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
.isBefore(LocalDateTime.now()); .isBefore(LocalDateTime.now());
} }
/** /**
@ -355,11 +364,11 @@ public class TaskServiceImpl implements TaskService {
@Override @Override
public List<TaskInfoDetail> getNodeAllRunningTask(Long devId) { public List<TaskInfoDetail> getNodeAllRunningTask(Long devId) {
List<TaskInfoDetail> taskList = taskCacheManager List<TaskInfoDetail> taskList = taskCacheManager
.getAllRunningTask() .getAllRunningTask()
.stream() .stream()
.filter(v -> v.getCurrentStatus() == DisposeTaskStatus.TASK_RUNNING.getCode() .filter(v -> v.getCurrentStatus() == DisposeTaskStatus.TASK_RUNNING.getCode()
&& v.getDeviceId().equals(devId)) && v.getDeviceId().equals(devId))
.collect(Collectors.toList()); .collect(Collectors.toList());
if (taskList.size() > 0) { if (taskList.size() > 0) {
log.info("The device has tasks running: devId:{}, tasksRunningNumber:{}", devId, taskList.size()); log.info("The device has tasks running: devId:{}, tasksRunningNumber:{}", devId, taskList.size());
@ -430,9 +439,9 @@ public class TaskServiceImpl implements TaskService {
// 根据处置IP拿出所有正在处置的任务 // 根据处置IP拿出所有正在处置的任务
List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask().stream() List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask().stream()
.filter(v -> (Objects.equals(v.getId(), taskId)) .filter(v -> (Objects.equals(v.getId(), taskId))
&& taskIsRunning(v)) && taskIsRunning(v))
.collect(Collectors.toList()); .collect(Collectors.toList());
// 对符合条件的任务调用停止命令 // 对符合条件的任务调用停止命令
taskList.forEach(v -> { taskList.forEach(v -> {
@ -455,10 +464,10 @@ public class TaskServiceImpl implements TaskService {
@Override @Override
public boolean taskIsExists(Long devId, Long userId, String disposeIp, int disposeType) { public boolean taskIsExists(Long devId, Long userId, String disposeIp, int disposeType) {
return taskCacheManager.getAllRunningTask().stream() return taskCacheManager.getAllRunningTask().stream()
.anyMatch(v -> Objects.equals(v.getAccountId(), userId) .anyMatch(v -> Objects.equals(v.getAccountId(), userId)
&& Objects.equals(v.getDeviceId(), devId) && Objects.equals(v.getDeviceId(), devId)
&& Objects.equals(v.getDisposeIp(), disposeIp) && Objects.equals(v.getDisposeIp(), disposeIp)
&& Objects.equals(v.getType(), disposeType)); && Objects.equals(v.getType(), disposeType));
} }
/** /**
@ -475,11 +484,11 @@ public class TaskServiceImpl implements TaskService {
// 根据处置IP拿出所有正在处置的任务 // 根据处置IP拿出所有正在处置的任务
List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask().stream() List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask().stream()
.filter(v -> (Objects.equals(v.getDeviceId(), devId)) .filter(v -> (Objects.equals(v.getDeviceId(), devId))
&& (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode()) && (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode())
&& Objects.equals(v.getDisposeIp(), ipAddr) && Objects.equals(v.getDisposeIp(), ipAddr)
&& taskIsRunning(v)) && taskIsRunning(v))
.collect(Collectors.toList()); .collect(Collectors.toList());
// 对符合条件的任务调用停止命令 // 对符合条件的任务调用停止命令
taskList.forEach(v -> { taskList.forEach(v -> {
@ -503,10 +512,10 @@ public class TaskServiceImpl implements TaskService {
// 根据处置IP拿出所有正在处置的任务 // 根据处置IP拿出所有正在处置的任务
List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask().stream() List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask().stream()
.filter(v -> ((devId == -1L) || Objects.equals(v.getDeviceId(), devId)) .filter(v -> ((devId == -1L) || Objects.equals(v.getDeviceId(), devId))
&& (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode()) && (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode())
&& taskIsRunning(v)) && taskIsRunning(v))
.collect(Collectors.toList()); .collect(Collectors.toList());
// 对符合条件的任务调用停止命令 // 对符合条件的任务调用停止命令
taskList.forEach(v -> { taskList.forEach(v -> {
@ -529,9 +538,9 @@ public class TaskServiceImpl implements TaskService {
// 根据处置IP拿出所有正在处置的任务 // 根据处置IP拿出所有正在处置的任务
List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask().stream() List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask().stream()
.filter(v -> (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode()) .filter(v -> (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode())
&& taskIsRunning(v)) && taskIsRunning(v))
.collect(Collectors.toList()); .collect(Collectors.toList());
// 对符合条件的任务调用停止命令 // 对符合条件的任务调用停止命令
taskList.forEach(v -> { taskList.forEach(v -> {