REM:
1. 更正部分协议字段名称
2. 增加停止处置任务接口
3. 增加根据处置IP停止处置任务接口
4. 修正部分协议缺少当前任务执行状态字段
This commit is contained in:
HuangXin 2020-04-29 18:47:16 +08:00
parent d60bf3da8e
commit 87fbaa635d
12 changed files with 121 additions and 30 deletions

View File

@ -129,6 +129,8 @@ public enum ErrorCode {
* Err nosuchtask error code. * Err nosuchtask error code.
*/ */
ERR_NOSUCHTASK(30, "没有该任务"), ERR_NOSUCHTASK(30, "没有该任务"),
ERR_TASKNOTRUNNING(31, "该任务没有运行"),
; ;
/** /**

View File

@ -1,5 +1,6 @@
package com.dispose.common; package com.dispose.common;
import java.sql.Timestamp;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -54,4 +55,22 @@ public class Helper {
return intArray.toArray(new Integer[0]); return intArray.toArray(new Integer[0]);
} }
public static int getTimestampSecond(String dateTime) {
return (int)(Timestamp.valueOf(dateTime).toInstant().toEpochMilli() / 1000);
}
public static int getTimestampDiff(String begin, String end) {
int starTime = getTimestampSecond(begin);
int endTm = getTimestampSecond(end);
return endTm - starTime;
}
public static int getTimestampDiffNow(String begin) {
int starTime = getTimestampSecond(begin);
int endTm = (int)(System.currentTimeMillis() / 1000);
return endTm - starTime;
}
} }

View File

@ -104,7 +104,7 @@ public class DisposeNodeManagerController {
retStatus.setDevId(ret.getSecondParam()); retStatus.setDevId(ret.getSecondParam());
retStatus.setStatus(ret.getFirstParam().getCode()); retStatus.setStatus(ret.getFirstParam().getCode());
retStatus.setMessage(ret.getFirstParam().getMsg()); retStatus.setMessage(ret.getFirstParam().getMsg());
rspInfo.getResult().add(retStatus); rspInfo.getItems().add(retStatus);
}); });
return ProtocolRespDTO.result(ErrorCode.ERR_OK, rspInfo); return ProtocolRespDTO.result(ErrorCode.ERR_OK, rspInfo);

View File

@ -1,7 +1,9 @@
package com.dispose.controller; package com.dispose.controller;
import com.dispose.common.ErrorCode; import com.dispose.common.ErrorCode;
import com.dispose.common.FlowDirection;
import com.dispose.common.Helper; import com.dispose.common.Helper;
import com.dispose.manager.TaskCacheManager;
import com.dispose.pojo.dto.ProtocolReqDTO; import com.dispose.pojo.dto.ProtocolReqDTO;
import com.dispose.pojo.dto.ProtocolRespDTO; import com.dispose.pojo.dto.ProtocolRespDTO;
import com.dispose.pojo.entity.DisposeDevice; import com.dispose.pojo.entity.DisposeDevice;
@ -48,12 +50,27 @@ import java.util.stream.Collectors;
@Api(value = "抗DDoS处置平台处置任务接口", tags = "抗DDoS处置平台处置任务接口") @Api(value = "抗DDoS处置平台处置任务接口", tags = "抗DDoS处置平台处置任务接口")
@Component @Component
public class DisposeTaskController { public class DisposeTaskController {
/**
* The Dispose node manager.
*/
@Resource @Resource
private DisposeNodeManager disposeNodeManager; private DisposeNodeManager disposeNodeManager;
/**
* The Task service.
*/
@Resource @Resource
private TaskService taskService; private TaskService taskService;
/**
* The Task cache manager.
*/
@Resource
private TaskCacheManager taskCacheManager;
/**
* The User account service.
*/
@Resource @Resource
private UserAccountService userAccountService; private UserAccountService userAccountService;
@ -86,8 +103,8 @@ public class DisposeTaskController {
.type(reqInfo.getType()) .type(reqInfo.getType())
.disposeIp(reqInfo.getDisposeIp()) .disposeIp(reqInfo.getDisposeIp())
.planEndTime(String.valueOf(reqInfo.getDisposeTime())) .planEndTime(String.valueOf(reqInfo.getDisposeTime()))
.flowDirection(reqInfo.getFlowDirection()) .flowDirection(reqInfo.getFlowDirection() != null ? reqInfo.getFlowDirection() : FlowDirection.DIRECTION_TWOWAY.getCode())
.flowBandWidth(reqInfo.getFlowBandWidth()) .flowBandWidth(reqInfo.getFlowBandWidth() != null ? reqInfo.getFlowBandWidth() : 1024)
.attackType(Helper.attackArrayToString(reqInfo.getAttackType())) .attackType(Helper.attackArrayToString(reqInfo.getAttackType()))
.build(); .build();
@ -129,9 +146,22 @@ public class DisposeTaskController {
} }
for (String v : reqInfo.getTaskId()) { for (String v : reqInfo.getTaskId()) {
TaskInfoData taskData = TaskInfoData.builder() TaskInfoData taskData = TaskInfoData.builder().taskId(v).build();
.taskId(v) TaskInfoDetail task = taskCacheManager.getTaskById(Long.parseLong(v));
.build();
if (task == null) {
err = ErrorCode.ERR_TASKNOTRUNNING;
} else {
err = taskService.stopTask(Long.parseLong(v));
if (err == ErrorCode.ERR_OK) {
taskData.setTaskId(v);
taskData.setDisposeIp(task.getDisposeIp());
taskData.setType(task.getType());
taskData.setLeftTime(Math.abs(Helper.getTimestampDiffNow(task.getBeginTime())
- Helper.getTimestampDiffNow(task.getPlanEndTime())));
}
}
taskData.setStatus(err.getCode()); taskData.setStatus(err.getCode());
taskData.setMessage(err.getMsg()); taskData.setMessage(err.getMsg());
@ -139,7 +169,7 @@ public class DisposeTaskController {
rspInfo.getItems().add(taskData); rspInfo.getItems().add(taskData);
} }
return ProtocolRespDTO.result(err, rspInfo); return ProtocolRespDTO.result(ErrorCode.ERR_OK, rspInfo);
} }
/** /**
@ -166,13 +196,27 @@ public class DisposeTaskController {
TaskInfoRsp rspInfo = new TaskInfoRsp(); TaskInfoRsp rspInfo = new TaskInfoRsp();
reqInfo.getItems().forEach(v -> { reqInfo.getItems().forEach(v -> {
TaskInfoData taskData = TaskInfoData.builder() ErrorCode retError;
.type(v.getType())
.build(); TaskInfoData taskData = TaskInfoData.builder().type(v.getType()).build();
List<MReturnType<ErrorCode, TaskInfoDetail>> ret = taskService.stopTaskByDisposeIp(Long.parseLong(v.getId()), v.getType(), v.getDisposeIp());
if (ret.size() == 0) {
retError = ErrorCode.ERR_NOSUCHTASK;
} else {
retError = ret.get(0).getFirstParam();
TaskInfoDetail taskInfo = ret.get(0).getSecondParam();
taskData.setTaskId(String.valueOf(taskInfo.getId()));
taskData.setType(taskInfo.getType());
taskData.setDisposeIp(v.getDisposeIp());
taskData.setLeftTime(Math.abs(Helper.getTimestampDiffNow(taskInfo.getBeginTime())
- Helper.getTimestampDiffNow(taskInfo.getPlanEndTime())));
}
taskData.setId(v.getId()); taskData.setId(v.getId());
taskData.setStatus(err.getCode()); taskData.setStatus(retError.getCode());
taskData.setMessage(err.getMsg()); taskData.setMessage(retError.getMsg());
rspInfo.getItems().add(taskData); rspInfo.getItems().add(taskData);
}); });
@ -207,7 +251,7 @@ public class DisposeTaskController {
*/ */
@PostMapping("/stop_all") @PostMapping("/stop_all")
@ResponseBody @ResponseBody
@ApiOperation("启动处置任务") @ApiOperation("停止所有处置任务")
public ProtocolRespDTO taskStopAll(@RequestBody ProtocolReqDTO mr, public ProtocolRespDTO taskStopAll(@RequestBody ProtocolReqDTO mr,
@RequestHeader HttpHeaders headers) @RequestHeader HttpHeaders headers)
throws JsonProcessingException { throws JsonProcessingException {

View File

@ -16,7 +16,7 @@ import lombok.NoArgsConstructor;
@Builder @Builder
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
@JsonPropertyOrder({"id", "taskId", "type", "disposeIp", "startTime", "disposeTime", @JsonPropertyOrder({"id", "taskId", "currentStatus", "type", "disposeIp", "startTime", "disposeTime",
"flowDirection", "attackType", "flowBandWidth", "flowAttack", "flowClean", "leftTime", "status", "message"}) "flowDirection", "attackType", "flowBandWidth", "flowAttack", "flowClean", "leftTime", "status", "message"})
@JsonInclude(JsonInclude.Include.NON_NULL) @JsonInclude(JsonInclude.Include.NON_NULL)
public class TaskInfoData extends IDReturnStatus { public class TaskInfoData extends IDReturnStatus {

View File

@ -19,12 +19,12 @@ public class AddNodeRsp {
/** /**
* The Result. * The Result.
*/ */
List<AddNodeRetData> result; List<AddNodeRetData> items;
/** /**
* Instantiates a new Add node rsp. * Instantiates a new Add node rsp.
*/ */
public AddNodeRsp() { public AddNodeRsp() {
this.result = new ArrayList<>(); this.items = new ArrayList<>();
} }
} }

View File

@ -38,6 +38,16 @@ public interface TaskService {
*/ */
ErrorCode stopTask(Long taskId); ErrorCode stopTask(Long taskId);
/**
* Stop task by dispose ip error code.
*
* @param devId the dev id
* @param type the type
* @param ipAddr the ip addr
* @return the error code
*/
List<MReturnType<ErrorCode, TaskInfoDetail>> stopTaskByDisposeIp(Long devId, int type, String ipAddr);
/** /**
* Finish task error code. * Finish task error code.
* *

View File

@ -73,7 +73,6 @@ public class DisposeNodeManagerImpl implements DisposeNodeManager {
*/ */
@Override @Override
public ErrorCode delDisposeDeviceById(Long id) { public ErrorCode delDisposeDeviceById(Long id) {
DisposeEntryManager dp;
DisposeDevice dev = disposeDeviceMapper.getDeviceById(id); DisposeDevice dev = disposeDeviceMapper.getDeviceById(id);
if (dev == null) { if (dev == null) {
@ -91,8 +90,6 @@ public class DisposeNodeManagerImpl implements DisposeNodeManager {
*/ */
@Override @Override
public ErrorCode delDisposeDeviceByIp(String ipAddr) { public ErrorCode delDisposeDeviceByIp(String ipAddr) {
DisposeEntryManager dp;
if (disposeDeviceMapper.isDeviceExistsByIp(ipAddr) == 0 if (disposeDeviceMapper.isDeviceExistsByIp(ipAddr) == 0
|| !disposeDevMap.containsKey(ipAddr)) { || !disposeDevMap.containsKey(ipAddr)) {
return ErrorCode.ERR_NOSUCHDEVICE; return ErrorCode.ERR_NOSUCHDEVICE;

View File

@ -210,8 +210,7 @@ public class TaskServiceImpl implements TaskService {
taskCacheManager.upgradeTaskStatus(taskId, DisposeTaskStatus.TASK_RUNNING.getCode()); taskCacheManager.upgradeTaskStatus(taskId, DisposeTaskStatus.TASK_RUNNING.getCode());
// 异步启动处置任务 // 异步启动处置任务
CompletableFuture<ErrorCode> future = CompletableFuture CompletableFuture.supplyAsync(() -> dp.runDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()]))
.supplyAsync(() -> dp.runDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()]))
.whenComplete((v, ex) -> { .whenComplete((v, ex) -> {
if (ex != null) { if (ex != null) {
log.error(ex.getMessage()); log.error(ex.getMessage());
@ -260,8 +259,7 @@ public class TaskServiceImpl implements TaskService {
int prdStatus = task.getCurrentStatus(); int prdStatus = task.getCurrentStatus();
// 异步启动处置任务 // 异步启动处置任务
CompletableFuture<ErrorCode> future = CompletableFuture CompletableFuture.supplyAsync(() -> dp.stopDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()]))
.supplyAsync(() -> dp.stopDispose(task.getDisposeIp(), DeviceCapacity.values()[task.getType()]))
.whenComplete((v, ex) -> { .whenComplete((v, ex) -> {
if (ex != null) { if (ex != null) {
// 恢复缓存中任务状态到先前状态 // 恢复缓存中任务状态到先前状态
@ -274,11 +272,10 @@ public class TaskServiceImpl implements TaskService {
log.error("Start task {}:{} error {}", task.getId(), task.getDisposeIp(), err.getMsg()); log.error("Start task {}:{} error {}", task.getId(), task.getDisposeIp(), err.getMsg());
} else { } else {
// 任务执行完成后更新数据库处置任务状态 // 任务执行完成后更新数据库处置任务状态
disposeTaskMapper.changeTaskCurrentStatus(taskId, DisposeTaskStatus.TASK_STOP.getCode()); finishTask(taskId);
} }
} }
}); });
} }
return ErrorCode.ERR_OK; return ErrorCode.ERR_OK;
@ -401,4 +398,25 @@ public class TaskServiceImpl implements TaskService {
&& Objects.equals(v.getDisposeIp(), disposeIp) && Objects.equals(v.getDisposeIp(), disposeIp)
&& Objects.equals(v.getType(), disposeType)); && Objects.equals(v.getType(), disposeType));
} }
@Override
public List<MReturnType<ErrorCode, TaskInfoDetail>> stopTaskByDisposeIp(Long devId, int type, String ipAddr) {
List<MReturnType<ErrorCode, TaskInfoDetail>> retList = new ArrayList<>();
// 根据处置IP拿出所有正在处置的任务
List<TaskInfoDetail> taskList = taskCacheManager.getAllTask().parallelStream()
.filter(v -> ((devId == null || devId == -1L) || Objects.equals(v.getDeviceId(), devId)
&& Objects.equals(v.getType(), type))
&& Objects.equals(v.getDisposeIp(), ipAddr)
&& taskIsRunning(v))
.collect(Collectors.toList());
// 对符合条件的任务调用停止命令
taskList.parallelStream().forEach(v -> {
ErrorCode err = stopTask(v.getId());
retList.add(new MReturnType<>(err, v));
});
return retList;
}
} }

View File

@ -46,7 +46,6 @@ public class TaskManagerTask {
log.info("Finish expired task {}:{} begin at {}", log.info("Finish expired task {}:{} begin at {}",
taskData.getId(), taskData.getDisposeIp(), taskData.getBeginTime()); taskData.getId(), taskData.getDisposeIp(), taskData.getBeginTime());
taskService.stopTask(taskData.getId()); taskService.stopTask(taskData.getId());
taskService.finishTask(taskData.getId());
continue; continue;
} }

View File

@ -142,7 +142,7 @@ public class DeviceNodeManagerControllerTest extends InitTestEnvironment {
//将json字符串转为AddNodeRsp类 //将json字符串转为AddNodeRsp类
AddNodeRsp addNodeRsp = objectMapper.readValue(msgContent, AddNodeRsp.class); AddNodeRsp addNodeRsp = objectMapper.readValue(msgContent, AddNodeRsp.class);
List<AddNodeRetData> addNodeList = addNodeRsp.getResult(); List<AddNodeRetData> addNodeList = addNodeRsp.getItems();
for (AddNodeRetData d : addNodeList for (AddNodeRetData d : addNodeList
) { ) {
//before: no device information after: device information exists //before: no device information after: device information exists

View File

@ -133,7 +133,7 @@ public class TaskControllerTest extends InitTestEnvironment {
@Test @Test
public void t2_stopTask() throws Exception { public void t2_stopTask() throws Exception {
IDArrayReq reqData = IDArrayReq.builder() IDArrayReq reqData = IDArrayReq.builder()
.taskId(new String[]{"1", "2"}) .taskId(new String[]{"40", "2"})
.build(); .build();
ProtocolReqDTO reqInfo = new ProtocolReqDTO(); ProtocolReqDTO reqInfo = new ProtocolReqDTO();
@ -233,8 +233,10 @@ public class TaskControllerTest extends InitTestEnvironment {
@Test @Test
public void t5_stopAllTask() throws Exception { public void t5_stopAllTask() throws Exception {
StopTaskData itemData = StopTaskData.builder() StopTaskData itemData = StopTaskData.builder()
.type(DeviceCapacity.CLEANUP.getCode()) .disposeIp("192.168.1.1")
.build(); .type(DeviceCapacity.CLEANUP.getCode())
.id("210")
.build();
StopTaskReq reqData = new StopTaskReq(); StopTaskReq reqData = new StopTaskReq();