parent
6bcebb0fd0
commit
1c471ef677
|
@ -32,7 +32,7 @@ public interface TaskCacheManager {
|
|||
*
|
||||
* @return the all task
|
||||
*/
|
||||
List<TaskInfoDetail> getAllTask();
|
||||
List<TaskInfoDetail> getAllRunningTask();
|
||||
|
||||
/**
|
||||
* Remove task.
|
||||
|
@ -49,8 +49,4 @@ public interface TaskCacheManager {
|
|||
* @return the error code
|
||||
*/
|
||||
ErrorCode upgradeTaskStatus(Long id, int status);
|
||||
|
||||
// void increaseRetryTimes();
|
||||
//
|
||||
// void cleanRetryTimes();
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ public class TaskCacheManagerImpl implements TaskCacheManager {
|
|||
* @return the all task
|
||||
*/
|
||||
@Override
|
||||
public List<TaskInfoDetail> getAllTask() {
|
||||
public List<TaskInfoDetail> getAllRunningTask() {
|
||||
return new ArrayList<>(taskCacheMap.values());
|
||||
}
|
||||
|
||||
|
|
|
@ -133,4 +133,11 @@ public interface TaskService {
|
|||
* @return the all task
|
||||
*/
|
||||
List<TaskInfoDetail> getAllTask();
|
||||
|
||||
/**
|
||||
* Gets active task.
|
||||
*
|
||||
* @return the active task
|
||||
*/
|
||||
List<TaskInfoDetail> getActiveTask();
|
||||
}
|
||||
|
|
|
@ -81,7 +81,9 @@ public class TaskServiceImpl implements TaskService {
|
|||
@Override
|
||||
public void loadTaskFromDatabase() {
|
||||
// 从数据库中取出所有任务
|
||||
List<TaskInfoDetail> taskList = disposeTaskMapper.selectAll();
|
||||
List<TaskInfoDetail> taskList = disposeTaskMapper.selectAll().stream()
|
||||
.filter(this::taskIsRunning)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
taskList.forEach(v -> {
|
||||
v.setRetryTimes(0);
|
||||
|
@ -126,7 +128,7 @@ public class TaskServiceImpl implements TaskService {
|
|||
}
|
||||
|
||||
// 查询当前是否有相同能力节点,相同用户,相同处置IP的且正在执行的处置任务,如果存在则忽略该次任务(依照产品需求)
|
||||
List<TaskInfoDetail> taskList = taskCacheManager.getAllTask()
|
||||
List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask()
|
||||
.stream()
|
||||
.filter(v -> ((taskId == null || taskId == -1L) || Objects.equals(v.getDeviceId(), task.getDeviceId()))
|
||||
&& Objects.equals(v.getAccountId(), task.getAccountId())
|
||||
|
@ -326,7 +328,7 @@ public class TaskServiceImpl implements TaskService {
|
|||
@Override
|
||||
public List<TaskInfoDetail> getNodeAllRunningTask(Long devId) {
|
||||
List<TaskInfoDetail> taskList = taskCacheManager
|
||||
.getAllTask()
|
||||
.getAllRunningTask()
|
||||
.stream()
|
||||
.filter(v -> v.getCurrentStatus() == DisposeTaskStatus.TASK_RUNNING.getCode()
|
||||
&& v.getDeviceId().equals(devId))
|
||||
|
@ -355,6 +357,7 @@ public class TaskServiceImpl implements TaskService {
|
|||
return taskList;
|
||||
}
|
||||
|
||||
log.info("The device has nothing tasks: devId:{}", devId);
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
|
@ -365,7 +368,27 @@ public class TaskServiceImpl implements TaskService {
|
|||
*/
|
||||
@Override
|
||||
public List<TaskInfoDetail> getAllTask() {
|
||||
return taskCacheManager.getAllTask();
|
||||
List<TaskInfoDetail> taskList = disposeTaskMapper.selectAll();
|
||||
|
||||
if (taskList.size() > 0) {
|
||||
return taskList;
|
||||
}
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets active task.
|
||||
*
|
||||
* @return the active task
|
||||
*/
|
||||
@Override
|
||||
public List<TaskInfoDetail> getActiveTask() {
|
||||
List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask();
|
||||
|
||||
if (taskList.size() > 0) {
|
||||
return taskList;
|
||||
}
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -379,7 +402,7 @@ public class TaskServiceImpl implements TaskService {
|
|||
List<MulReturnType<ErrorCode, TaskInfoDetail>> retList = new ArrayList<>();
|
||||
|
||||
// 根据处置IP,拿出所有正在处置的任务
|
||||
List<TaskInfoDetail> taskList = taskCacheManager.getAllTask().stream()
|
||||
List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask().stream()
|
||||
.filter(v -> (Objects.equals(v.getId(), taskId))
|
||||
&& taskIsRunning(v))
|
||||
.collect(Collectors.toList());
|
||||
|
@ -404,7 +427,7 @@ public class TaskServiceImpl implements TaskService {
|
|||
*/
|
||||
@Override
|
||||
public boolean taskIsExists(Long devId, Long userId, String disposeIp, int disposeType) {
|
||||
return taskCacheManager.getAllTask().stream()
|
||||
return taskCacheManager.getAllRunningTask().stream()
|
||||
.anyMatch(v -> Objects.equals(v.getAccountId(), userId)
|
||||
&& Objects.equals(v.getDeviceId(), devId)
|
||||
&& Objects.equals(v.getDisposeIp(), disposeIp)
|
||||
|
@ -424,7 +447,7 @@ public class TaskServiceImpl implements TaskService {
|
|||
List<MulReturnType<ErrorCode, TaskInfoDetail>> retList = new ArrayList<>();
|
||||
|
||||
// 根据处置IP,拿出所有正在处置的任务
|
||||
List<TaskInfoDetail> taskList = taskCacheManager.getAllTask().stream()
|
||||
List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask().stream()
|
||||
.filter(v -> (Objects.equals(v.getDeviceId(), devId))
|
||||
&& (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode())
|
||||
&& Objects.equals(v.getDisposeIp(), ipAddr)
|
||||
|
@ -452,7 +475,7 @@ public class TaskServiceImpl implements TaskService {
|
|||
List<MulReturnType<ErrorCode, TaskInfoDetail>> retList = new ArrayList<>();
|
||||
|
||||
// 根据处置IP,拿出所有正在处置的任务
|
||||
List<TaskInfoDetail> taskList = taskCacheManager.getAllTask().stream()
|
||||
List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask().stream()
|
||||
.filter(v -> ((devId == -1L) || Objects.equals(v.getDeviceId(), devId))
|
||||
&& (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode())
|
||||
&& taskIsRunning(v))
|
||||
|
@ -478,7 +501,7 @@ public class TaskServiceImpl implements TaskService {
|
|||
List<MulReturnType<ErrorCode, TaskInfoDetail>> retList = new ArrayList<>();
|
||||
|
||||
// 根据处置IP,拿出所有正在处置的任务
|
||||
List<TaskInfoDetail> taskList = taskCacheManager.getAllTask().stream()
|
||||
List<TaskInfoDetail> taskList = taskCacheManager.getAllRunningTask().stream()
|
||||
.filter(v -> (Objects.equals(v.getType(), type) || type == DeviceCapacity.ALLCAPACITY.getCode())
|
||||
&& taskIsRunning(v))
|
||||
.collect(Collectors.toList());
|
||||
|
|
|
@ -3,7 +3,6 @@ package com.dispose.task;
|
|||
import com.dispose.common.DisposeTaskStatus;
|
||||
import com.dispose.common.ErrorCode;
|
||||
import com.dispose.common.GlobalVar;
|
||||
import com.dispose.manager.TaskCacheManager;
|
||||
import com.dispose.pojo.vo.common.TaskInfoDetail;
|
||||
import com.dispose.service.TaskService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
@ -22,9 +21,6 @@ import java.util.Iterator;
|
|||
@Component
|
||||
@Slf4j
|
||||
public class TaskManagerTask {
|
||||
@Resource
|
||||
private TaskCacheManager taskCacheManager;
|
||||
|
||||
@Resource
|
||||
private TaskService taskService;
|
||||
|
||||
|
@ -34,7 +30,7 @@ public class TaskManagerTask {
|
|||
@Async("bizExecutor")
|
||||
@Scheduled(fixedDelay = 1000)
|
||||
public void taskRuntimeManager() {
|
||||
Iterator<TaskInfoDetail> it = taskCacheManager.getAllTask().iterator();
|
||||
Iterator<TaskInfoDetail> it = taskService.getActiveTask().iterator();
|
||||
|
||||
// 由于可能删除列表中的项目,所以使用迭代器
|
||||
while (it.hasNext()) {
|
||||
|
|
|
@ -87,7 +87,7 @@ public class TaskControllerTest extends InitTestEnvironment {
|
|||
}
|
||||
|
||||
private Long getExistsTaskId() {
|
||||
List<TaskInfoDetail> lt = taskCacheManager.getAllTask();
|
||||
List<TaskInfoDetail> lt = taskCacheManager.getAllRunningTask();
|
||||
|
||||
if (lt != null && lt.size() > 0) {
|
||||
return lt.get(0).getId();
|
||||
|
|
Loading…
Reference in New Issue