REM:
1. 优化代码
This commit is contained in:
HuangXin 2020-08-24 10:16:42 +08:00
parent 4868b1abf8
commit 74fef0cecf
2 changed files with 223 additions and 174 deletions

View File

@ -9,7 +9,7 @@ public interface DeviceTaskManagerService {
/**
* Device task manager schedule.
*/
void deviceTaskManagerSchedule();
void disposeTaskManagerSchedule();
/**
* Device task runner schedule.

View File

@ -47,23 +47,230 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
@Resource
private DisposeAbilityRouterService disposeAbilityRouterService;
/**
* Virtual device task run.
*
* @param ai the ai
* @param deviceTask the device task
* @param disposeTask the dispose task
*/
private void virtualDeviceTaskRun(AbilityInfo ai, DeviceTask deviceTask, DisposeTask disposeTask) {
MulReturnType<ErrorCode, Long> ret;
// 设置任务状态为启动中
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), DisposeTaskStatus.TASK_STARTING);
// 设置启动任务攻击类型状态
deviceTaskManager.setExecAttackType(deviceTask.getId(), NetflowDirection.DIRECTION_BI,
deviceTask.getTaskAttackType());
// 调用设备执行处置任务
ret = ai.getDb().runDispose(disposeTask.getDisposeIp(), disposeTask.getDisposeCapacity(), null, null, null);
if (ret.getFirstParam() == ErrorCode.ERR_OK) {
// 设置攻击类型任务启动结果
deviceTaskManager.setAttackTypeStatus(deviceTask.getId(),
disposeTask.getFlowDirection(), deviceTask.getTaskAttackType());
// 更改处置任务状态为处置中
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(),
DisposeTaskStatus.TASK_STARTED);
log.info("VIRTUAL_DISPOSE setup task succeed: {}", deviceTask);
} else {
// 任务出错不在重试当做失败任务处理
deviceTaskManager.setAttackTypeStatus(deviceTask.getId(),
disposeTask.getFlowDirection(), ~deviceTask.getTaskAttackType());
log.error("VIRTUAL_DISPOSE setup task error {}: {}", ret.getFirstParam(), deviceTask);
}
}
/**
* Hao han device task run.
*
* @param ai the ai
* @param deviceTask the device task
* @param disposeTask the dispose task
*/
private void haoHanDeviceTaskRun(AbilityInfo ai, DeviceTask deviceTask, DisposeTask disposeTask) {
MulReturnType<ErrorCode, Long> ret;
// 设置任务状态为启动中
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), DisposeTaskStatus.TASK_STARTING);
// 设置启动任务攻击类型状态
deviceTaskManager.setExecAttackType(deviceTask.getId(), NetflowDirection.DIRECTION_BI,
deviceTask.getTaskAttackType());
ret = ai.getDb().runDispose(disposeTask.getDisposeIp(), disposeTask.getDisposeCapacity(), null, null,
(long) Helper.getTimestampDiffNow(disposeTask.getPlanEndTime()));
if (ret.getFirstParam() == ErrorCode.ERR_OK) {
// 设置攻击类型任务启动结果
deviceTaskManager.setAttackTypeStatus(deviceTask.getId(),
disposeTask.getFlowDirection(), deviceTask.getTaskAttackType());
// 更改处置任务状态为处置中
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(),
DisposeTaskStatus.TASK_STARTED);
// 记录浩瀚设备返回的任务ID
deviceTaskManager.setTaskExternId(deviceTask.getId(), ret.getSecondParam());
log.info("HAOHAN_PLATFORM setup task succeed: {}, device taskId {}", deviceTask,
ret.getSecondParam());
} else if (deviceTask.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES) {
// 设置该任务为新任务待下次重试启动
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(),
DisposeTaskStatus.TASK_NEW);
// 记录任务出错重试次数
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), deviceTask.getErrRetry() + 1);
log.error("HAOHAN_PLATFORM setup task times {} error {}: {}", deviceTask.getErrRetry(),
ret.getSecondParam(), deviceTask);
} else {
// 任务出错不在重试当做失败任务处理
deviceTaskManager.setAttackTypeStatus(deviceTask.getId(),
disposeTask.getFlowDirection(), ~deviceTask.getTaskAttackType());
log.error("HAOHAN_PLATFORM setup task error {}: {}", ret.getFirstParam(), deviceTask);
}
}
/**
* Dp tech device task run.
*
* @param ai the ai
* @param deviceTask the device task
* @param disposeTask the dispose task
*/
private void dpTechDeviceTaskRun(AbilityInfo ai, DeviceTask deviceTask, DisposeTask disposeTask) {
MulReturnType<ErrorCode, Long> ret;
// 设置任务状态为启动中
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), DisposeTaskStatus.TASK_STARTING);
// 遍历设备攻击类型
for (DpTechAttackType t : DpTechAttackType.maskToDdosAttackType(deviceTask.getTaskAttackType())) {
// 入方向
if (disposeTask.getFlowDirection() != NetflowDirection.DIRECTION_IN) {
// 已经启动过的任务不再重复启动
if ((deviceTask.getExecAttackTypeOut() & t.getAttackTypeMask()) != 0) {
continue;
}
// 设置启动任务攻击类型状态
deviceTaskManager.execAttackTypeSetBit(deviceTask.getId(), NetflowDirection.DIRECTION_OUT,
t.getValue());
// 调用迪普设备启动处置任务
ret = ai.getDb().runDispose(disposeTask.getDisposeIp(), disposeTask.getDisposeCapacity(),
NetflowDirection.DIRECTION_OUT, t.getValue(), null);
if (ret.getFirstParam() == ErrorCode.ERR_OK) {
// 标志启动成功
deviceTaskManager.attackTypeStatusSetBit(deviceTask.getId(), NetflowDirection.DIRECTION_OUT,
t.getValue());
log.info("DPTECH_UMC setup task {}, {} succeed: {}", t,
NetflowDirection.DIRECTION_OUT, deviceTask);
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), 0);
} else if (deviceTask.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES * 10) {
// 设置该任务为新任务待下次重试启动
// 记录任务出错重试次数
deviceTask.setErrRetry(deviceTask.getErrRetry() + 1);
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), deviceTask.getErrRetry());
//清除任务攻击类型启动标志
deviceTaskManager.execAttackTypeCleanBit(deviceTask.getId(), NetflowDirection.DIRECTION_OUT,
t.getValue());
log.error("DPTECH_UMC setup task {}, {} times {} error: {}", t,
NetflowDirection.DIRECTION_OUT, deviceTask.getErrRetry(), deviceTask);
} else {
log.info("DPTECH_UMC setup task {}, {} error {}: {}", t,
NetflowDirection.DIRECTION_OUT, ret.getFirstParam(), deviceTask);
}
}
// 出方向
if (disposeTask.getFlowDirection() != NetflowDirection.DIRECTION_OUT) {
// 已经启动过的任务不再重复启动
if ((deviceTask.getExecAttackTypeIn() & t.getAttackTypeMask()) != 0) {
continue;
}
// 设置启动任务攻击类型状态
deviceTaskManager.execAttackTypeSetBit(deviceTask.getId(), NetflowDirection.DIRECTION_IN,
t.getValue());
// 调用迪普设备启动处置任务
ret = ai.getDb().runDispose(disposeTask.getDisposeIp(), disposeTask.getDisposeCapacity(),
NetflowDirection.DIRECTION_IN, t.getValue(), null);
if (ret.getFirstParam() == ErrorCode.ERR_OK) {
// 标志启动成功
deviceTaskManager.attackTypeStatusSetBit(deviceTask.getId(), NetflowDirection.DIRECTION_IN,
t.getValue());
log.info("DPTECH_UMC setup task {}, {} succeed: {}", t,
NetflowDirection.DIRECTION_IN, deviceTask);
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), 0);
} else if (deviceTask.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES * 10) {
// 设置该任务为新任务待下次重试启动
// 记录任务出错重试次数
deviceTask.setErrRetry(deviceTask.getErrRetry() + 1);
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), deviceTask.getErrRetry());
//清除任务攻击类型启动标志
deviceTaskManager.execAttackTypeCleanBit(deviceTask.getId(), NetflowDirection.DIRECTION_IN,
t.getValue());
log.error("DPTECH_UMC setup task {}, {} times {} error: {}", t,
NetflowDirection.DIRECTION_IN, deviceTask.getErrRetry(), deviceTask);
} else {
log.info("DPTECH_UMC setup task {}, {} error {}: {}", t,
NetflowDirection.DIRECTION_IN, ret.getFirstParam(), deviceTask);
}
}
}
// 检查需要处置的各种攻击类型任务启动状态与处置任务执行状态判断该处置任务是否调用成功
boolean taskSetupSucceed = true;
// 获取处置任务当前在设备上的执行状态
DeviceTask devTask = deviceTaskManager.getTaskById(deviceTask.getId());
// 入方向
if (disposeTask.getFlowDirection() != NetflowDirection.DIRECTION_IN) {
if (!devTask.getExecAttackTypeOut().equals(devTask.getTaskAttackType())
|| !devTask.getExecAttackTypeOut().equals(devTask.getAttackTypeStatusOut())) {
taskSetupSucceed = false;
}
}
// 出方向
if (disposeTask.getFlowDirection() != NetflowDirection.DIRECTION_OUT) {
if (!devTask.getExecAttackTypeIn().equals(devTask.getTaskAttackType())
|| !devTask.getExecAttackTypeIn().equals(devTask.getAttackTypeStatusIn())) {
taskSetupSucceed = false;
}
}
// 全部处置任务调用成功后修改处置任务状态为启动中
if (taskSetupSucceed) {
// 更改处置任务状态为处置中
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(),
DisposeTaskStatus.TASK_STARTED);
log.info("DPTECH_UMC setup task succeed: {}", deviceTask);
}
}
/**
* Device task manager schedule.
*/
@Override
@Async("deviceTaskExecutor")
@Scheduled(fixedDelay = 1000)
public void deviceTaskManagerSchedule() {
public void disposeTaskManagerSchedule() {
// 清理过期任务
disposeTaskManager.getExpiredTasks().forEach(v -> {
for (DisposeTask v : disposeTaskManager.getExpiredTasks()) {
log.info("Task {} {} {} is expired, expect finished at {}",
v.getId(), v.getDisposeCapacity(), v.getDisposeIp(), v.getPlanEndTime());
// 设置任务状态过期任务不再为设备创建处置任务
disposeTaskManager.changeDisposeTaskStatus(v.getId(), DisposeTaskStatus.TASK_EXPIRED);
});
}
// 对新建的任务创建对应处置设备任务信息
disposeTaskManager.getNewDisposeTasks().forEach(v -> {
for (DisposeTask v : disposeTaskManager.getNewDisposeTasks()) {
// 设置任务状态, 开始创建任务
disposeTaskManager.changeDisposeTaskStatus(v.getId(), DisposeTaskStatus.TASK_STARTING);
@ -82,7 +289,7 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
log.error("Add task {} to device {} error", v, d);
}
});
});
}
}
/**
@ -92,191 +299,33 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
@Async("deviceTaskExecutor")
@Scheduled(fixedDelay = 1000)
public void deviceTaskRunnerSchedule() {
deviceTaskManager.getNewDisposeDeviceTaskInfo().forEach(v -> {
// 遍历所有新的设备处置任务
for (DeviceTask v : deviceTaskManager.getNewDisposeDeviceTaskInfo()) {
DisposeTask task = disposeTaskManager.getDisposeTaskById(v.getTaskId());
assert task != null;
log.info("Run task {}, {}", task, v);
MulReturnType<ErrorCode, Long> ret;
// 保护代码理论上不存在该情况
assert task != null;
log.info("Run task {}, {}", task, v);
AbilityInfo ai = disposeAbilityRouterService.getAbilityDevice(task.getDeviceId());
switch (ai.getDev().getDeviceType()) {
case DPTECH_UMC:
// 设置任务状态为启动中
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(), DisposeTaskStatus.TASK_STARTING);
// 遍历设备攻击类型
for (DpTechAttackType t : DpTechAttackType.maskToDdosAttackType(v.getTaskAttackType())) {
// 入方向
if (task.getFlowDirection() != NetflowDirection.DIRECTION_IN) {
// 已经启动过的任务不再重复启动
if ((v.getExecAttackTypeOut() & t.getAttackTypeMask()) != 0) {
continue;
}
// 设置启动任务攻击类型状态
deviceTaskManager.execAttackTypeSetBit(v.getId(), NetflowDirection.DIRECTION_OUT,
t.getValue());
// 调用迪普设备启动处置任务
ret = ai.getDb().runDispose(task.getDisposeIp(), task.getDisposeCapacity(),
NetflowDirection.DIRECTION_OUT, t.getValue(), null);
if (ret.getFirstParam() == ErrorCode.ERR_OK) {
// 标志启动成功
deviceTaskManager.attackTypeStatusSetBit(v.getId(), NetflowDirection.DIRECTION_OUT,
t.getValue());
log.info("DPTECH_UMC setup task {}, {} succeed: {}", t,
NetflowDirection.DIRECTION_OUT, v);
deviceTaskManager.setTaskErrRetryTimes(v.getId(), 0);
} else if (v.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES * 10) {
// 设置该任务为新任务待下次重试启动
// 记录任务出错重试次数
v.setErrRetry(v.getErrRetry() + 1);
deviceTaskManager.setTaskErrRetryTimes(v.getId(), v.getErrRetry());
//清除任务攻击类型启动标志
deviceTaskManager.execAttackTypeCleanBit(v.getId(), NetflowDirection.DIRECTION_OUT,
t.getValue());
log.error("DPTECH_UMC setup task {}, {} times {} error: {}", t,
NetflowDirection.DIRECTION_OUT, v.getErrRetry(), v);
} else {
log.info("DPTECH_UMC setup task {}, {} error {}: {}", t,
NetflowDirection.DIRECTION_OUT, ret.getFirstParam(), v);
}
}
// 出方向
if (task.getFlowDirection() != NetflowDirection.DIRECTION_OUT) {
// 已经启动过的任务不再重复启动
if ((v.getExecAttackTypeIn() & t.getAttackTypeMask()) != 0) {
continue;
}
// 设置启动任务攻击类型状态
deviceTaskManager.execAttackTypeSetBit(v.getId(), NetflowDirection.DIRECTION_IN,
t.getValue());
// 调用迪普设备启动处置任务
ret = ai.getDb().runDispose(task.getDisposeIp(), task.getDisposeCapacity(),
NetflowDirection.DIRECTION_IN, t.getValue(), null);
if (ret.getFirstParam() == ErrorCode.ERR_OK) {
// 标志启动成功
deviceTaskManager.attackTypeStatusSetBit(v.getId(), NetflowDirection.DIRECTION_IN,
t.getValue());
log.info("DPTECH_UMC setup task {}, {} succeed: {}", t,
NetflowDirection.DIRECTION_IN, v);
deviceTaskManager.setTaskErrRetryTimes(v.getId(), 0);
} else if (v.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES * 10) {
// 设置该任务为新任务待下次重试启动
// 记录任务出错重试次数
v.setErrRetry(v.getErrRetry() + 1);
deviceTaskManager.setTaskErrRetryTimes(v.getId(), v.getErrRetry());
//清除任务攻击类型启动标志
deviceTaskManager.execAttackTypeCleanBit(v.getId(), NetflowDirection.DIRECTION_IN,
t.getValue());
log.error("DPTECH_UMC setup task {}, {} times {} error: {}", t,
NetflowDirection.DIRECTION_IN, v.getErrRetry(), v);
} else {
log.info("DPTECH_UMC setup task {}, {} error {}: {}", t,
NetflowDirection.DIRECTION_IN, ret.getFirstParam(), v);
}
}
}
boolean taskSetupSuccessed = true;
DeviceTask devTask = deviceTaskManager.getTaskById(v.getId());
if (task.getFlowDirection() != NetflowDirection.DIRECTION_IN) {
if (!devTask.getExecAttackTypeOut().equals(devTask.getTaskAttackType())
|| !devTask.getExecAttackTypeOut().equals(devTask.getAttackTypeStatusOut())) {
taskSetupSuccessed = false;
}
}
if (task.getFlowDirection() != NetflowDirection.DIRECTION_OUT) {
if (!devTask.getExecAttackTypeIn().equals(devTask.getTaskAttackType())
|| !devTask.getExecAttackTypeIn().equals(devTask.getAttackTypeStatusIn())) {
taskSetupSuccessed = false;
}
}
if (taskSetupSuccessed) {
// 更改处置任务状态为处置中
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(),
DisposeTaskStatus.TASK_STARTED);
log.info("DPTECH_UMC setup task succeed: {}", v);
}
dpTechDeviceTaskRun(ai, v, task);
break;
case HAOHAN_PLATFORM:
// 设置任务状态为启动中
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(), DisposeTaskStatus.TASK_STARTING);
// 设置启动任务攻击类型状态
deviceTaskManager.setExecAttackType(v.getId(), NetflowDirection.DIRECTION_BI,
v.getTaskAttackType());
ret = ai.getDb().runDispose(task.getDisposeIp(), task.getDisposeCapacity(), null, null,
(long) Helper.getTimestampDiffNow(task.getPlanEndTime()));
if (ret.getFirstParam() == ErrorCode.ERR_OK) {
// 设置攻击类型任务启动结果
deviceTaskManager.setAttackTypeStatus(v.getId(),
task.getFlowDirection(), v.getTaskAttackType());
// 更改处置任务状态为处置中
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(),
DisposeTaskStatus.TASK_STARTED);
// 记录浩瀚设备返回的任务ID
deviceTaskManager.setTaskExternId(v.getId(), ret.getSecondParam());
log.info("HAOHAN_PLATFORM setup task succeed: {}, device taskId {}", v, ret.getSecondParam());
} else if (v.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES) {
// 设置该任务为新任务待下次重试启动
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(),
DisposeTaskStatus.TASK_NEW);
// 记录任务出错重试次数
deviceTaskManager.setTaskErrRetryTimes(v.getId(), v.getErrRetry() + 1);
log.error("HAOHAN_PLATFORM setup task times {} error {}: {}", v.getErrRetry(),
ret.getSecondParam(), v);
} else {
// 任务出错不在重试当做失败任务处理
deviceTaskManager.setAttackTypeStatus(v.getId(),
task.getFlowDirection(), ~v.getTaskAttackType());
log.error("HAOHAN_PLATFORM setup task error {}: {}", ret.getFirstParam(), v);
}
haoHanDeviceTaskRun(ai, v, task);
break;
case VIRTUAL_DISPOSE:
// 设置任务状态为启动中
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(), DisposeTaskStatus.TASK_STARTING);
// 设置启动任务攻击类型状态
deviceTaskManager.setExecAttackType(v.getId(), NetflowDirection.DIRECTION_BI,
v.getTaskAttackType());
// 调用设备执行处置任务
ret = ai.getDb().runDispose(task.getDisposeIp(), task.getDisposeCapacity(), null, null, null);
if (ret.getFirstParam() == ErrorCode.ERR_OK) {
// 设置攻击类型任务启动结果
deviceTaskManager.setAttackTypeStatus(v.getId(),
task.getFlowDirection(), v.getTaskAttackType());
// 更改处置任务状态为处置中
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(),
DisposeTaskStatus.TASK_STARTED);
log.info("VIRTUAL_DISPOSE setup task succeed: {}", v);
} else {
// 任务出错不在重试当做失败任务处理
deviceTaskManager.setAttackTypeStatus(v.getId(),
task.getFlowDirection(), ~v.getTaskAttackType());
log.error("VIRTUAL_DISPOSE setup task error {}: {}", ret.getFirstParam(), v);
}
virtualDeviceTaskRun(ai, v, task);
break;
default:
log.error("Unknown dispose device type: {}", ai.getDev());
break;
}
});
}
}
}