REM:
1. 清理暂时未使用的接口声明
2. 增加处置设备停止处置任务功能
This commit is contained in:
HuangXin 2020-08-24 11:17:55 +08:00
parent 74fef0cecf
commit 4762558d1b
8 changed files with 261 additions and 211 deletions

View File

@ -29,6 +29,13 @@ public interface DeviceTaskManager {
*/
List<DeviceTask> getNewDisposeDeviceTaskInfo();
/**
* Gets started dispose device task info.
*
* @return the started dispose device task info
*/
List<DeviceTask> getStartedDisposeDeviceTaskInfo();
/**
* Change dispose device task info status boolean.
*

View File

@ -2,10 +2,7 @@ package com.dispose.manager;
import com.dispose.common.DisposeCapacityType;
import com.dispose.common.DisposeTaskStatus;
import com.dispose.common.ErrorCode;
import com.dispose.pojo.entity.DeviceTask;
import com.dispose.pojo.entity.DisposeTask;
import com.dispose.pojo.po.MulReturnType;
import java.util.List;
@ -15,87 +12,6 @@ import java.util.List;
* @author <huangxin@cmhi.chinamoblie.com>
*/
public interface DisposeTaskManager {
/**
* Gets unfinished task.
*
* @return the unfinished task
*/
List<DeviceTask> getUnfinishedTask();
/**
* Create new task mul return type.
*
* @param task the task
* @return the mul return type
*/
MulReturnType<ErrorCode, Long> createNewTask(DisposeTask task);
/**
* Add task info mul return type.
*
* @param taskId the task id
* @param taskInfo the task info
* @return the mul return type
*/
MulReturnType<ErrorCode, Long> addTaskInfo(Long taskId, DeviceTask taskInfo);
/**
* Sets dispose task status.
*
* @param taskId the task id
* @param status the status
* @return the dispose task status
*/
ErrorCode setDisposeTaskStatus(Long taskId, DisposeTaskStatus status);
/**
* Gets task attack type mask.
*
* @param taskId the task id
* @return the task attack type mask
*/
Long getTaskAttackTypeMask(Long taskId);
/**
* Gets cur attack type mask.
*
* @param taskId the task id
* @return the cur attack type mask
*/
Long getCurAttackTypeMask(Long taskId);
/**
* Gets attack type mask status.
*
* @param taskId the task id
* @return the attack type mask status
*/
Long getAttackTypeMaskStatus(Long taskId);
/**
* Sets task attack type mask.
*
* @param taskId the task id
* @param mask the mask
*/
void setTaskAttackTypeMask(Long taskId, Long mask);
/**
* Sets cur attack type mask.
*
* @param taskId the task id
* @param mask the mask
*/
void setCurAttackTypeMask(Long taskId, Long mask);
/**
* Sets attack type mask status.
*
* @param taskId the task id
* @param mask the mask
*/
void setAttackTypeMaskStatus(Long taskId, Long mask);
/**
* Dispose ip running boolean.
*

View File

@ -57,6 +57,16 @@ public class DeviceTaskManagerImpl implements DeviceTaskManager {
return deviceTaskMapper.getNewTaskInfos();
}
/**
* Gets started dispose device task info.
*
* @return the started dispose device task info
*/
@Override
public List<DeviceTask> getStartedDisposeDeviceTaskInfo() {
return deviceTaskMapper.getRunningTaskInfos();
}
/**
* Change dispose device task info status boolean.
*

View File

@ -2,13 +2,10 @@ package com.dispose.manager.impl;
import com.dispose.common.DisposeCapacityType;
import com.dispose.common.DisposeTaskStatus;
import com.dispose.common.ErrorCode;
import com.dispose.manager.DisposeTaskManager;
import com.dispose.mapper.DeviceTaskMapper;
import com.dispose.mapper.DisposeTaskMapper;
import com.dispose.pojo.entity.DeviceTask;
import com.dispose.pojo.entity.DisposeTask;
import com.dispose.pojo.po.MulReturnType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -30,122 +27,10 @@ public class DisposeTaskManagerImpl implements DisposeTaskManager {
private DisposeTaskMapper disposeTaskMapper;
/**
* The Task info mapper.
* The Device task mapper.
*/
@Resource
private DeviceTaskMapper deviceTaskMapper;
/**
* Create new task mul return type.
*
* @param task the task
* @return the mul return type
*/
@Override
public MulReturnType<ErrorCode, Long> createNewTask(DisposeTask task) {
return null;
}
/**
* Add task info mul return type.
*
* @param taskId the task id
* @param taskInfo the task info
* @return the mul return type
*/
@Override
public MulReturnType<ErrorCode, Long> addTaskInfo(Long taskId, DeviceTask taskInfo) {
return null;
}
/**
* Gets unfinished task.
*
* @return the unfinished task
*/
@Override
public List<DeviceTask> getUnfinishedTask() {
return null;
}
/**
* Gets task attack type mask.
*
* @param taskId the task id
* @return the task attack type mask
*/
@Override
public Long getTaskAttackTypeMask(Long taskId) {
return null;
}
/**
* Gets attack type mask status.
*
* @param taskId the task id
* @return the attack type mask status
*/
@Override
public Long getAttackTypeMaskStatus(Long taskId) {
return null;
}
/**
* Gets cur attack type mask.
*
* @param taskId the task id
* @return the cur attack type mask
*/
@Override
public Long getCurAttackTypeMask(Long taskId) {
return null;
}
/**
* Sets task attack type mask.
*
* @param taskId the task id
* @param mask the mask
*/
@Override
public void setTaskAttackTypeMask(Long taskId, Long mask) {
}
/**
* Sets cur attack type mask.
*
* @param taskId the task id
* @param mask the mask
*/
@Override
public void setCurAttackTypeMask(Long taskId, Long mask) {
}
/**
* Sets attack type mask status.
*
* @param taskId the task id
* @param mask the mask
*/
@Override
public void setAttackTypeMaskStatus(Long taskId, Long mask) {
}
/**
* Sets dispose task status.
*
* @param taskId the task id
* @param status the status
* @return the dispose task status
*/
@Override
public ErrorCode setDisposeTaskStatus(Long taskId, DisposeTaskStatus status) {
return null;
}
/**
* Dispose ip running boolean.
*

View File

@ -58,6 +58,13 @@ public interface DeviceTaskMapper {
*/
List<DeviceTask> getNewTaskInfos();
/**
* Gets running task infos.
*
* @return the running task infos
*/
List<DeviceTask> getRunningTaskInfos();
/**
* Gets task by details.
*

View File

@ -7,7 +7,7 @@ package com.dispose.service;
*/
public interface DeviceTaskManagerService {
/**
* Device task manager schedule.
* Dispose task manager schedule.
*/
void disposeTaskManagerSchedule();
@ -15,4 +15,14 @@ public interface DeviceTaskManagerService {
* Device task runner schedule.
*/
void deviceTaskRunnerSchedule();
/**
* Device task stop schedule.
*/
void deviceTaskStopSchedule();
/**
* Schedule runner thread.
*/
void scheduleRunnerThread();
}

View File

@ -82,6 +82,30 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
}
}
/**
* Virtual device task stop.
*
* @param ai the ai
* @param deviceTask the device task
* @param disposeTask the dispose task
*/
private void virtualDeviceTaskStop(AbilityInfo ai, DeviceTask deviceTask, DisposeTask disposeTask) {
MulReturnType<ErrorCode, Long> ret;
// 调用设备执行处置任务
ret = ai.getDb().stopDispose(disposeTask.getDisposeIp(), disposeTask.getDisposeCapacity(), null, null, null);
if (ret.getFirstParam() == ErrorCode.ERR_OK) {
log.info("VIRTUAL_DISPOSE stop task succeed: {}", deviceTask);
} else {
log.error("VIRTUAL_DISPOSE stop task error {}: {}", ret.getFirstParam(), deviceTask);
}
// 设置任务状态为结束
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), DisposeTaskStatus.TASK_FINISHED);
}
/**
* Hao han device task run.
*
@ -114,6 +138,9 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
log.info("HAOHAN_PLATFORM setup task succeed: {}, device taskId {}", deviceTask,
ret.getSecondParam());
// 重置错误尝试次数
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), 0);
} else if (deviceTask.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES) {
// 设置该任务为新任务待下次重试启动
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(),
@ -130,6 +157,35 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
}
}
/**
* Hao han device task stop.
*
* @param ai the ai
* @param deviceTask the device task
* @param disposeTask the dispose task
*/
private void haoHanDeviceTaskStop(AbilityInfo ai, DeviceTask deviceTask, DisposeTask disposeTask) {
MulReturnType<ErrorCode, Long> ret;
// 停止处置任务
ret = ai.getDb().stopDispose(disposeTask.getDisposeIp(), disposeTask.getDisposeCapacity(), null, null,
deviceTask.getExternId());
if (ret.getFirstParam() == ErrorCode.ERR_OK) {
log.info("HAOHAN_PLATFORM stop task succeed: {}, device taskId {}", deviceTask,
ret.getSecondParam());
// 设置任务状态为结束
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), DisposeTaskStatus.TASK_FINISHED);
} else if (deviceTask.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES) {
// 记录任务出错重试次数
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), deviceTask.getErrRetry() + 1);
log.error("HAOHAN_PLATFORM stop task times {} error {}: {}", deviceTask.getErrRetry(),
ret.getSecondParam(), deviceTask);
} else {
log.error("HAOHAN_PLATFORM stop task error {}: {}", ret.getFirstParam(), deviceTask);
}
}
/**
* Dp tech device task run.
*
@ -247,18 +303,113 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
// 全部处置任务调用成功后修改处置任务状态为启动中
if (taskSetupSucceed) {
// 更改处置任务状态为处置中
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(),
DisposeTaskStatus.TASK_STARTED);
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), DisposeTaskStatus.TASK_STARTED);
// 重置错误尝试次数
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), 0);
log.info("DPTECH_UMC setup task succeed: {}", deviceTask);
}
}
/**
* Device task manager schedule.
* Dp tech device task stop.
*
* @param ai the ai
* @param deviceTask the device task
* @param disposeTask the dispose task
*/
private void dpTechDeviceTaskStop(AbilityInfo ai, DeviceTask deviceTask, DisposeTask disposeTask) {
MulReturnType<ErrorCode, Long> ret;
// 遍历处置设备执行成功的攻击类型
for (DpTechAttackType t : DpTechAttackType.maskToDdosAttackType(deviceTask.getAttackTypeStatusIn())) {
// 入方向
if (disposeTask.getFlowDirection() != NetflowDirection.DIRECTION_IN) {
// 调用迪普设备停止处置任务
ret = ai.getDb().stopDispose(disposeTask.getDisposeIp(), disposeTask.getDisposeCapacity(),
NetflowDirection.DIRECTION_OUT, t.getValue(), null);
if (ret.getFirstParam() == ErrorCode.ERR_OK) {
// 标志停止成功
deviceTaskManager.attackTypeStatusCleanBit(deviceTask.getId(), NetflowDirection.DIRECTION_OUT,
t.getValue());
log.info("DPTECH_UMC stop task {}, {} succeed: {}", t, NetflowDirection.DIRECTION_OUT, deviceTask);
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), 0);
} else if (deviceTask.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES * 10) {
// 记录任务出错重试次数
deviceTask.setErrRetry(deviceTask.getErrRetry() + 1);
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), deviceTask.getErrRetry());
log.error("DPTECH_UMC stop task {}, {} times {} error: {}", t,
NetflowDirection.DIRECTION_OUT, deviceTask.getErrRetry(), deviceTask);
} else {
log.info("DPTECH_UMC stop task {}, {} error {}: {}", t,
NetflowDirection.DIRECTION_OUT, ret.getFirstParam(), deviceTask);
}
}
// 出方向
if (disposeTask.getFlowDirection() != NetflowDirection.DIRECTION_OUT) {
// 调用迪普设备启动处置任务
ret = ai.getDb().stopDispose(disposeTask.getDisposeIp(), disposeTask.getDisposeCapacity(),
NetflowDirection.DIRECTION_IN, t.getValue(), null);
if (ret.getFirstParam() == ErrorCode.ERR_OK) {
// 标志启动成功
deviceTaskManager.attackTypeStatusCleanBit(deviceTask.getId(), NetflowDirection.DIRECTION_IN,
t.getValue());
log.info("DPTECH_UMC setup task {}, {} succeed: {}", t,
NetflowDirection.DIRECTION_IN, deviceTask);
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), 0);
} else if (deviceTask.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES * 10) {
// 记录任务出错重试次数
deviceTask.setErrRetry(deviceTask.getErrRetry() + 1);
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), deviceTask.getErrRetry());
log.error("DPTECH_UMC stop task {}, {} times {} error: {}", t,
NetflowDirection.DIRECTION_IN, deviceTask.getErrRetry(), deviceTask);
} else {
log.info("DPTECH_UMC stop task {}, {} error {}: {}", t,
NetflowDirection.DIRECTION_IN, ret.getFirstParam(), deviceTask);
}
}
}
// 检查需要处置的各种攻击类型任务停止状态与处置任务执行状态判断该处置任务是否调用成功
boolean taskStopSucceed = true;
// 获取处置任务当前在设备上的执行状态
DeviceTask devTask = deviceTaskManager.getTaskById(deviceTask.getId());
// 入方向
if (disposeTask.getFlowDirection() != NetflowDirection.DIRECTION_IN) {
if (devTask.getAttackTypeStatusOut() != 0) {
taskStopSucceed = false;
}
}
// 出方向
if (disposeTask.getFlowDirection() != NetflowDirection.DIRECTION_OUT) {
if (devTask.getAttackTypeStatusIn() != 0) {
taskStopSucceed = false;
}
}
// 全部处置任务调用成功后修改处置任务状态为任务结束
if (taskStopSucceed) {
// 更改处置任务状态为处置中
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), DisposeTaskStatus.TASK_FINISHED);
// 重置错误尝试次数
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), 0);
log.info("DPTECH_UMC stop task succeed: {}", deviceTask);
}
}
/**
* Dispose task manager schedule.
*/
@Override
@Async("deviceTaskExecutor")
@Scheduled(fixedDelay = 1000)
public void disposeTaskManagerSchedule() {
// 清理过期任务
@ -296,8 +447,6 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
* Device task runner schedule.
*/
@Override
@Async("deviceTaskExecutor")
@Scheduled(fixedDelay = 1000)
public void deviceTaskRunnerSchedule() {
// 遍历所有新的设备处置任务
for (DeviceTask v : deviceTaskManager.getNewDisposeDeviceTaskInfo()) {
@ -306,9 +455,12 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
// 保护代码理论上不存在该情况
assert task != null;
log.info("Run task {}, {}", task, v);
// 获取设备
AbilityInfo ai = disposeAbilityRouterService.getAbilityDevice(task.getDeviceId());
// 启动新任务
log.info("Run task {}, {}", task, v);
switch (ai.getDev().getDeviceType()) {
case DPTECH_UMC:
dpTechDeviceTaskRun(ai, v, task);
@ -328,4 +480,59 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
}
}
}
/**
* Device task stop schedule.
*/
@Override
public void deviceTaskStopSchedule() {
// 遍历所有新的设备处置任务
for (DeviceTask v : deviceTaskManager.getStartedDisposeDeviceTaskInfo()) {
DisposeTask task = disposeTaskManager.getDisposeTaskById(v.getTaskId());
// 保护代码理论上不存在该情况
assert task != null;
// 获取设备
AbilityInfo ai = disposeAbilityRouterService.getAbilityDevice(task.getDeviceId());
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(), DisposeTaskStatus.TASK_EXPIRED);
log.info("Task expired, Stop: {}", v);
switch (ai.getDev().getDeviceType()) {
case DPTECH_UMC:
dpTechDeviceTaskStop(ai, v, task);
break;
case HAOHAN_PLATFORM:
haoHanDeviceTaskStop(ai, v, task);
break;
case VIRTUAL_DISPOSE:
virtualDeviceTaskStop(ai, v, task);
break;
default:
log.error("Unknown dispose device type: {}", ai.getDev());
break;
}
}
}
/**
* Schedule runner thread.
*/
@Override
@Async("deviceTaskExecutor")
@Scheduled(fixedDelay = 1000)
public void scheduleRunnerThread() {
// 处理处置任务数据
disposeTaskManagerSchedule();
// 处置设备启动任务
deviceTaskRunnerSchedule();
// 处置设备停止任务
deviceTaskStopSchedule();
}
}

View File

@ -54,8 +54,16 @@
<select id="getNewTaskInfos" resultMap="device_task">
SELECT *
FROM device_task
WHERE status = ${@com.dispose.common.DisposeTaskStatus@TASK_NEW.getValue()} OR
status = ${@com.dispose.common.DisposeTaskStatus@TASK_STARTING.getValue()}
WHERE status = ${@com.dispose.common.DisposeTaskStatus@TASK_NEW.getValue()}
OR status = ${@com.dispose.common.DisposeTaskStatus@TASK_STARTING.getValue()}
</select>
<select id="getRunningTaskInfos" resultMap="device_task">
SELECT *
FROM device_task
WHERE status = ${@com.dispose.common.DisposeTaskStatus@TASK_NEW.getValue()}
OR status = ${@com.dispose.common.DisposeTaskStatus@TASK_STARTING.getValue()}
OR status = ${@com.dispose.common.DisposeTaskStatus@TASK_STARTED.getValue()}
</select>
<select id="getTaskByDetails" resultMap="device_task">