diff --git a/src/main/java/com/dispose/service/DeviceTaskManagerService.java b/src/main/java/com/dispose/service/DeviceTaskManagerService.java index c1bf578a..0490bc8f 100644 --- a/src/main/java/com/dispose/service/DeviceTaskManagerService.java +++ b/src/main/java/com/dispose/service/DeviceTaskManagerService.java @@ -9,7 +9,7 @@ public interface DeviceTaskManagerService { /** * Device task manager schedule. */ - void deviceTaskManagerSchedule(); + void disposeTaskManagerSchedule(); /** * Device task runner schedule. diff --git a/src/main/java/com/dispose/service/impl/DeviceTaskManagerServiceImpl.java b/src/main/java/com/dispose/service/impl/DeviceTaskManagerServiceImpl.java index 05dfe4bb..5b09c564 100644 --- a/src/main/java/com/dispose/service/impl/DeviceTaskManagerServiceImpl.java +++ b/src/main/java/com/dispose/service/impl/DeviceTaskManagerServiceImpl.java @@ -47,23 +47,230 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService { @Resource private DisposeAbilityRouterService disposeAbilityRouterService; + /** + * Virtual device task run. + * + * @param ai the ai + * @param deviceTask the device task + * @param disposeTask the dispose task + */ + private void virtualDeviceTaskRun(AbilityInfo ai, DeviceTask deviceTask, DisposeTask disposeTask) { + MulReturnType ret; + + // 设置任务状态为启动中 + deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), DisposeTaskStatus.TASK_STARTING); + // 设置启动任务攻击类型状态 + deviceTaskManager.setExecAttackType(deviceTask.getId(), NetflowDirection.DIRECTION_BI, + deviceTask.getTaskAttackType()); + + // 调用设备执行处置任务 + ret = ai.getDb().runDispose(disposeTask.getDisposeIp(), disposeTask.getDisposeCapacity(), null, null, null); + + if (ret.getFirstParam() == ErrorCode.ERR_OK) { + // 设置攻击类型任务启动结果 + deviceTaskManager.setAttackTypeStatus(deviceTask.getId(), + disposeTask.getFlowDirection(), deviceTask.getTaskAttackType()); + // 更改处置任务状态为处置中 + deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), + DisposeTaskStatus.TASK_STARTED); + log.info("VIRTUAL_DISPOSE setup task succeed: {}", deviceTask); + } else { + // 任务出错,不在重试,当做失败任务处理 + deviceTaskManager.setAttackTypeStatus(deviceTask.getId(), + disposeTask.getFlowDirection(), ~deviceTask.getTaskAttackType()); + log.error("VIRTUAL_DISPOSE setup task error {}: {}", ret.getFirstParam(), deviceTask); + } + } + + /** + * Hao han device task run. + * + * @param ai the ai + * @param deviceTask the device task + * @param disposeTask the dispose task + */ + private void haoHanDeviceTaskRun(AbilityInfo ai, DeviceTask deviceTask, DisposeTask disposeTask) { + MulReturnType ret; + + // 设置任务状态为启动中 + deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), DisposeTaskStatus.TASK_STARTING); + // 设置启动任务攻击类型状态 + deviceTaskManager.setExecAttackType(deviceTask.getId(), NetflowDirection.DIRECTION_BI, + deviceTask.getTaskAttackType()); + + ret = ai.getDb().runDispose(disposeTask.getDisposeIp(), disposeTask.getDisposeCapacity(), null, null, + (long) Helper.getTimestampDiffNow(disposeTask.getPlanEndTime())); + + if (ret.getFirstParam() == ErrorCode.ERR_OK) { + // 设置攻击类型任务启动结果 + deviceTaskManager.setAttackTypeStatus(deviceTask.getId(), + disposeTask.getFlowDirection(), deviceTask.getTaskAttackType()); + // 更改处置任务状态为处置中 + deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), + DisposeTaskStatus.TASK_STARTED); + + // 记录浩瀚设备返回的任务ID + deviceTaskManager.setTaskExternId(deviceTask.getId(), ret.getSecondParam()); + + log.info("HAOHAN_PLATFORM setup task succeed: {}, device taskId {}", deviceTask, + ret.getSecondParam()); + } else if (deviceTask.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES) { + // 设置该任务为新任务,待下次重试启动 + deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), + DisposeTaskStatus.TASK_NEW); + // 记录任务出错重试次数 + deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), deviceTask.getErrRetry() + 1); + log.error("HAOHAN_PLATFORM setup task times {} error {}: {}", deviceTask.getErrRetry(), + ret.getSecondParam(), deviceTask); + } else { + // 任务出错,不在重试,当做失败任务处理 + deviceTaskManager.setAttackTypeStatus(deviceTask.getId(), + disposeTask.getFlowDirection(), ~deviceTask.getTaskAttackType()); + log.error("HAOHAN_PLATFORM setup task error {}: {}", ret.getFirstParam(), deviceTask); + } + } + + /** + * Dp tech device task run. + * + * @param ai the ai + * @param deviceTask the device task + * @param disposeTask the dispose task + */ + private void dpTechDeviceTaskRun(AbilityInfo ai, DeviceTask deviceTask, DisposeTask disposeTask) { + MulReturnType ret; + + // 设置任务状态为启动中 + deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), DisposeTaskStatus.TASK_STARTING); + + // 遍历设备攻击类型 + for (DpTechAttackType t : DpTechAttackType.maskToDdosAttackType(deviceTask.getTaskAttackType())) { + // 入方向 + if (disposeTask.getFlowDirection() != NetflowDirection.DIRECTION_IN) { + // 已经启动过的任务不再重复启动 + if ((deviceTask.getExecAttackTypeOut() & t.getAttackTypeMask()) != 0) { + continue; + } + // 设置启动任务攻击类型状态 + deviceTaskManager.execAttackTypeSetBit(deviceTask.getId(), NetflowDirection.DIRECTION_OUT, + t.getValue()); + // 调用迪普设备启动处置任务 + ret = ai.getDb().runDispose(disposeTask.getDisposeIp(), disposeTask.getDisposeCapacity(), + NetflowDirection.DIRECTION_OUT, t.getValue(), null); + + if (ret.getFirstParam() == ErrorCode.ERR_OK) { + // 标志启动成功 + deviceTaskManager.attackTypeStatusSetBit(deviceTask.getId(), NetflowDirection.DIRECTION_OUT, + t.getValue()); + + log.info("DPTECH_UMC setup task {}, {} succeed: {}", t, + NetflowDirection.DIRECTION_OUT, deviceTask); + + deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), 0); + } else if (deviceTask.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES * 10) { + // 设置该任务为新任务,待下次重试启动 + // 记录任务出错重试次数 + deviceTask.setErrRetry(deviceTask.getErrRetry() + 1); + deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), deviceTask.getErrRetry()); + //清除任务攻击类型启动标志 + deviceTaskManager.execAttackTypeCleanBit(deviceTask.getId(), NetflowDirection.DIRECTION_OUT, + t.getValue()); + log.error("DPTECH_UMC setup task {}, {} times {} error: {}", t, + NetflowDirection.DIRECTION_OUT, deviceTask.getErrRetry(), deviceTask); + } else { + log.info("DPTECH_UMC setup task {}, {} error {}: {}", t, + NetflowDirection.DIRECTION_OUT, ret.getFirstParam(), deviceTask); + } + } + + // 出方向 + if (disposeTask.getFlowDirection() != NetflowDirection.DIRECTION_OUT) { + // 已经启动过的任务不再重复启动 + if ((deviceTask.getExecAttackTypeIn() & t.getAttackTypeMask()) != 0) { + continue; + } + // 设置启动任务攻击类型状态 + deviceTaskManager.execAttackTypeSetBit(deviceTask.getId(), NetflowDirection.DIRECTION_IN, + t.getValue()); + // 调用迪普设备启动处置任务 + ret = ai.getDb().runDispose(disposeTask.getDisposeIp(), disposeTask.getDisposeCapacity(), + NetflowDirection.DIRECTION_IN, t.getValue(), null); + + if (ret.getFirstParam() == ErrorCode.ERR_OK) { + // 标志启动成功 + deviceTaskManager.attackTypeStatusSetBit(deviceTask.getId(), NetflowDirection.DIRECTION_IN, + t.getValue()); + + log.info("DPTECH_UMC setup task {}, {} succeed: {}", t, + NetflowDirection.DIRECTION_IN, deviceTask); + + deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), 0); + } else if (deviceTask.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES * 10) { + // 设置该任务为新任务,待下次重试启动 + // 记录任务出错重试次数 + deviceTask.setErrRetry(deviceTask.getErrRetry() + 1); + deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), deviceTask.getErrRetry()); + //清除任务攻击类型启动标志 + deviceTaskManager.execAttackTypeCleanBit(deviceTask.getId(), NetflowDirection.DIRECTION_IN, + t.getValue()); + log.error("DPTECH_UMC setup task {}, {} times {} error: {}", t, + NetflowDirection.DIRECTION_IN, deviceTask.getErrRetry(), deviceTask); + } else { + log.info("DPTECH_UMC setup task {}, {} error {}: {}", t, + NetflowDirection.DIRECTION_IN, ret.getFirstParam(), deviceTask); + } + } + } + + // 检查需要处置的各种攻击类型任务启动状态与处置任务执行状态,判断该处置任务是否调用成功 + boolean taskSetupSucceed = true; + + // 获取处置任务当前在设备上的执行状态 + DeviceTask devTask = deviceTaskManager.getTaskById(deviceTask.getId()); + + // 入方向 + if (disposeTask.getFlowDirection() != NetflowDirection.DIRECTION_IN) { + if (!devTask.getExecAttackTypeOut().equals(devTask.getTaskAttackType()) + || !devTask.getExecAttackTypeOut().equals(devTask.getAttackTypeStatusOut())) { + taskSetupSucceed = false; + } + } + + // 出方向 + if (disposeTask.getFlowDirection() != NetflowDirection.DIRECTION_OUT) { + if (!devTask.getExecAttackTypeIn().equals(devTask.getTaskAttackType()) + || !devTask.getExecAttackTypeIn().equals(devTask.getAttackTypeStatusIn())) { + taskSetupSucceed = false; + } + } + + // 全部处置任务调用成功后,修改处置任务状态为启动中 + if (taskSetupSucceed) { + // 更改处置任务状态为处置中 + deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), + DisposeTaskStatus.TASK_STARTED); + log.info("DPTECH_UMC setup task succeed: {}", deviceTask); + } + } + /** * Device task manager schedule. */ @Override @Async("deviceTaskExecutor") @Scheduled(fixedDelay = 1000) - public void deviceTaskManagerSchedule() { + public void disposeTaskManagerSchedule() { + // 清理过期任务 - disposeTaskManager.getExpiredTasks().forEach(v -> { + for (DisposeTask v : disposeTaskManager.getExpiredTasks()) { log.info("Task {} {} {} is expired, expect finished at {}", v.getId(), v.getDisposeCapacity(), v.getDisposeIp(), v.getPlanEndTime()); // 设置任务状态,过期任务不再为设备创建处置任务 disposeTaskManager.changeDisposeTaskStatus(v.getId(), DisposeTaskStatus.TASK_EXPIRED); - }); + } // 对新建的任务,创建对应处置设备任务信息 - disposeTaskManager.getNewDisposeTasks().forEach(v -> { + for (DisposeTask v : disposeTaskManager.getNewDisposeTasks()) { // 设置任务状态, 开始创建任务 disposeTaskManager.changeDisposeTaskStatus(v.getId(), DisposeTaskStatus.TASK_STARTING); @@ -82,7 +289,7 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService { log.error("Add task {} to device {} error", v, d); } }); - }); + } } /** @@ -92,191 +299,33 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService { @Async("deviceTaskExecutor") @Scheduled(fixedDelay = 1000) public void deviceTaskRunnerSchedule() { - deviceTaskManager.getNewDisposeDeviceTaskInfo().forEach(v -> { + // 遍历所有新的设备处置任务 + for (DeviceTask v : deviceTaskManager.getNewDisposeDeviceTaskInfo()) { DisposeTask task = disposeTaskManager.getDisposeTaskById(v.getTaskId()); - assert task != null; - log.info("Run task {}, {}", task, v); - MulReturnType ret; + // 保护代码,理论上不存在该情况 + assert task != null; + + log.info("Run task {}, {}", task, v); AbilityInfo ai = disposeAbilityRouterService.getAbilityDevice(task.getDeviceId()); switch (ai.getDev().getDeviceType()) { case DPTECH_UMC: - // 设置任务状态为启动中 - deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(), DisposeTaskStatus.TASK_STARTING); - - // 遍历设备攻击类型 - for (DpTechAttackType t : DpTechAttackType.maskToDdosAttackType(v.getTaskAttackType())) { - // 入方向 - if (task.getFlowDirection() != NetflowDirection.DIRECTION_IN) { - // 已经启动过的任务不再重复启动 - if ((v.getExecAttackTypeOut() & t.getAttackTypeMask()) != 0) { - continue; - } - // 设置启动任务攻击类型状态 - deviceTaskManager.execAttackTypeSetBit(v.getId(), NetflowDirection.DIRECTION_OUT, - t.getValue()); - // 调用迪普设备启动处置任务 - ret = ai.getDb().runDispose(task.getDisposeIp(), task.getDisposeCapacity(), - NetflowDirection.DIRECTION_OUT, t.getValue(), null); - - if (ret.getFirstParam() == ErrorCode.ERR_OK) { - // 标志启动成功 - deviceTaskManager.attackTypeStatusSetBit(v.getId(), NetflowDirection.DIRECTION_OUT, - t.getValue()); - - log.info("DPTECH_UMC setup task {}, {} succeed: {}", t, - NetflowDirection.DIRECTION_OUT, v); - - deviceTaskManager.setTaskErrRetryTimes(v.getId(), 0); - } else if (v.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES * 10) { - // 设置该任务为新任务,待下次重试启动 - // 记录任务出错重试次数 - v.setErrRetry(v.getErrRetry() + 1); - deviceTaskManager.setTaskErrRetryTimes(v.getId(), v.getErrRetry()); - //清除任务攻击类型启动标志 - deviceTaskManager.execAttackTypeCleanBit(v.getId(), NetflowDirection.DIRECTION_OUT, - t.getValue()); - log.error("DPTECH_UMC setup task {}, {} times {} error: {}", t, - NetflowDirection.DIRECTION_OUT, v.getErrRetry(), v); - } else { - log.info("DPTECH_UMC setup task {}, {} error {}: {}", t, - NetflowDirection.DIRECTION_OUT, ret.getFirstParam(), v); - } - } - - // 出方向 - if (task.getFlowDirection() != NetflowDirection.DIRECTION_OUT) { - // 已经启动过的任务不再重复启动 - if ((v.getExecAttackTypeIn() & t.getAttackTypeMask()) != 0) { - continue; - } - // 设置启动任务攻击类型状态 - deviceTaskManager.execAttackTypeSetBit(v.getId(), NetflowDirection.DIRECTION_IN, - t.getValue()); - // 调用迪普设备启动处置任务 - ret = ai.getDb().runDispose(task.getDisposeIp(), task.getDisposeCapacity(), - NetflowDirection.DIRECTION_IN, t.getValue(), null); - - if (ret.getFirstParam() == ErrorCode.ERR_OK) { - // 标志启动成功 - deviceTaskManager.attackTypeStatusSetBit(v.getId(), NetflowDirection.DIRECTION_IN, - t.getValue()); - - log.info("DPTECH_UMC setup task {}, {} succeed: {}", t, - NetflowDirection.DIRECTION_IN, v); - - deviceTaskManager.setTaskErrRetryTimes(v.getId(), 0); - } else if (v.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES * 10) { - // 设置该任务为新任务,待下次重试启动 - // 记录任务出错重试次数 - v.setErrRetry(v.getErrRetry() + 1); - deviceTaskManager.setTaskErrRetryTimes(v.getId(), v.getErrRetry()); - //清除任务攻击类型启动标志 - deviceTaskManager.execAttackTypeCleanBit(v.getId(), NetflowDirection.DIRECTION_IN, - t.getValue()); - log.error("DPTECH_UMC setup task {}, {} times {} error: {}", t, - NetflowDirection.DIRECTION_IN, v.getErrRetry(), v); - } else { - log.info("DPTECH_UMC setup task {}, {} error {}: {}", t, - NetflowDirection.DIRECTION_IN, ret.getFirstParam(), v); - } - } - } - - boolean taskSetupSuccessed = true; - DeviceTask devTask = deviceTaskManager.getTaskById(v.getId()); - - if (task.getFlowDirection() != NetflowDirection.DIRECTION_IN) { - if (!devTask.getExecAttackTypeOut().equals(devTask.getTaskAttackType()) - || !devTask.getExecAttackTypeOut().equals(devTask.getAttackTypeStatusOut())) { - taskSetupSuccessed = false; - } - } - - if (task.getFlowDirection() != NetflowDirection.DIRECTION_OUT) { - if (!devTask.getExecAttackTypeIn().equals(devTask.getTaskAttackType()) - || !devTask.getExecAttackTypeIn().equals(devTask.getAttackTypeStatusIn())) { - taskSetupSuccessed = false; - } - } - - if (taskSetupSuccessed) { - // 更改处置任务状态为处置中 - deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(), - DisposeTaskStatus.TASK_STARTED); - log.info("DPTECH_UMC setup task succeed: {}", v); - } + dpTechDeviceTaskRun(ai, v, task); break; case HAOHAN_PLATFORM: - // 设置任务状态为启动中 - deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(), DisposeTaskStatus.TASK_STARTING); - // 设置启动任务攻击类型状态 - deviceTaskManager.setExecAttackType(v.getId(), NetflowDirection.DIRECTION_BI, - v.getTaskAttackType()); - - ret = ai.getDb().runDispose(task.getDisposeIp(), task.getDisposeCapacity(), null, null, - (long) Helper.getTimestampDiffNow(task.getPlanEndTime())); - - if (ret.getFirstParam() == ErrorCode.ERR_OK) { - // 设置攻击类型任务启动结果 - deviceTaskManager.setAttackTypeStatus(v.getId(), - task.getFlowDirection(), v.getTaskAttackType()); - // 更改处置任务状态为处置中 - deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(), - DisposeTaskStatus.TASK_STARTED); - - // 记录浩瀚设备返回的任务ID - deviceTaskManager.setTaskExternId(v.getId(), ret.getSecondParam()); - - log.info("HAOHAN_PLATFORM setup task succeed: {}, device taskId {}", v, ret.getSecondParam()); - } else if (v.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES) { - // 设置该任务为新任务,待下次重试启动 - deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(), - DisposeTaskStatus.TASK_NEW); - // 记录任务出错重试次数 - deviceTaskManager.setTaskErrRetryTimes(v.getId(), v.getErrRetry() + 1); - log.error("HAOHAN_PLATFORM setup task times {} error {}: {}", v.getErrRetry(), - ret.getSecondParam(), v); - } else { - // 任务出错,不在重试,当做失败任务处理 - deviceTaskManager.setAttackTypeStatus(v.getId(), - task.getFlowDirection(), ~v.getTaskAttackType()); - log.error("HAOHAN_PLATFORM setup task error {}: {}", ret.getFirstParam(), v); - } + haoHanDeviceTaskRun(ai, v, task); break; case VIRTUAL_DISPOSE: - // 设置任务状态为启动中 - deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(), DisposeTaskStatus.TASK_STARTING); - // 设置启动任务攻击类型状态 - deviceTaskManager.setExecAttackType(v.getId(), NetflowDirection.DIRECTION_BI, - v.getTaskAttackType()); - - // 调用设备执行处置任务 - ret = ai.getDb().runDispose(task.getDisposeIp(), task.getDisposeCapacity(), null, null, null); - - if (ret.getFirstParam() == ErrorCode.ERR_OK) { - // 设置攻击类型任务启动结果 - deviceTaskManager.setAttackTypeStatus(v.getId(), - task.getFlowDirection(), v.getTaskAttackType()); - // 更改处置任务状态为处置中 - deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(), - DisposeTaskStatus.TASK_STARTED); - log.info("VIRTUAL_DISPOSE setup task succeed: {}", v); - } else { - // 任务出错,不在重试,当做失败任务处理 - deviceTaskManager.setAttackTypeStatus(v.getId(), - task.getFlowDirection(), ~v.getTaskAttackType()); - log.error("VIRTUAL_DISPOSE setup task error {}: {}", ret.getFirstParam(), v); - } + virtualDeviceTaskRun(ai, v, task); break; default: log.error("Unknown dispose device type: {}", ai.getDev()); break; } - }); + } } }