OCT
REM: 1. 浩瀚设备启动已经处置的IP时,支持自动停止多余的处置任务,保留最新一次处置任务 2. 更改处置能力枚举值 3. 增加IPv6地址格式化为标志字符串功能 4. 增加IPv4地址格式化为标准字符串功能 5. 处置接口支持IPv6地址 6. 增加协议请求Http头部拦截器,校验token 7. 增加停止处置任务接口 8. 增加并发锁,避免对同一个处置IP调用重复的处置命令 9. 清理、优化代码
This commit is contained in:
parent
6ca21ee928
commit
4fc3a58e6e
|
@ -149,7 +149,7 @@ public class DpTechAbilityImpl implements DisposeAbility {
|
|||
ErrorCode err = ErrorCode.ERR_OK;
|
||||
|
||||
try {
|
||||
log.info("++++Begging DPTech Start Cleanup Task: {}, {}, {} ", ip, attackType, nfDirection);
|
||||
log.debug("++++Begging DPTech Start Cleanup Task: {}, {}, {} ", ip, attackType, nfDirection);
|
||||
|
||||
NtcRequestResultInfo ret = cleanTypePort.startAbnormalTaskForUMC(ip, attackType,
|
||||
nfDirection.getValue());
|
||||
|
@ -160,7 +160,7 @@ public class DpTechAbilityImpl implements DisposeAbility {
|
|||
CommonEnumHandler.codeOf(DpTechAttackType.class, attackType),
|
||||
ret.getResultInfo().getValue());
|
||||
} else {
|
||||
log.info("----Finish DPTech Start Cleanup Task: {}, {}, {}, {}", ip, nfDirection,
|
||||
log.debug("----Finish DPTech Start Cleanup Task: {}, {}, {}, {}", ip, nfDirection,
|
||||
CommonEnumHandler.codeOf(DpTechAttackType.class, attackType),
|
||||
ret.getResultInfo().getValue());
|
||||
}
|
||||
|
|
|
@ -83,7 +83,7 @@ public class HaoHanAbilityImpl implements DisposeAbility {
|
|||
return new MulReturnType<>(ErrorCode.ERR_HAOHAN_ERROR, null);
|
||||
}
|
||||
|
||||
log.info("----Finish Haohan Start Cleanup Task: {}", ip);
|
||||
log.debug("----Finish Haohan Start Cleanup Task: {}", ip);
|
||||
return new MulReturnType<>(ErrorCode.ERR_OK, (long) resp.getCleanTaskId());
|
||||
}
|
||||
|
||||
|
@ -102,7 +102,7 @@ public class HaoHanAbilityImpl implements DisposeAbility {
|
|||
@Nullable NetflowDirection nfDirection,
|
||||
@Nullable Integer attackType,
|
||||
@Nullable Long taskId) {
|
||||
log.info("++++Begging Haohan Stop Cleanup Task: {}", taskId);
|
||||
log.debug("++++Begging Haohan Stop Cleanup Task: {}", taskId);
|
||||
|
||||
if (taskId == null) {
|
||||
return new MulReturnType<>(ErrorCode.ERR_PARAMS, null);
|
||||
|
@ -116,7 +116,7 @@ public class HaoHanAbilityImpl implements DisposeAbility {
|
|||
return new MulReturnType<>(ErrorCode.ERR_HAOHAN_ERROR, null);
|
||||
}
|
||||
|
||||
log.info("----Finish Haohan Stop Cleanup Task: {}", taskId);
|
||||
log.debug("----Finish Haohan Stop Cleanup Task: {}", taskId);
|
||||
return new MulReturnType<>(ErrorCode.ERR_OK, null);
|
||||
}
|
||||
|
||||
|
|
|
@ -10,19 +10,19 @@ public enum DisposeCapacityType implements BaseEnum {
|
|||
/**
|
||||
* The Cleanup.
|
||||
*/
|
||||
CLEANUP(0, "清洗能力"),
|
||||
CLEANUP(1, "清洗能力"),
|
||||
/**
|
||||
* Blackhool dispose capacity type.
|
||||
*/
|
||||
BLACKHOOL(1, "压制能力"),
|
||||
BLACKHOOL(2, "压制能力"),
|
||||
/**
|
||||
* The Hidepend.
|
||||
*/
|
||||
HIDEPEND(2, "高防能力"),
|
||||
HIDEPEND(3, "高防能力"),
|
||||
/**
|
||||
* The Detecive.
|
||||
*/
|
||||
DETECIVE(3, "检测能力"),
|
||||
DETECIVE(4, "检测能力"),
|
||||
;
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
package com.dispose.common;
|
||||
|
||||
import inet.ipaddr.IPAddress;
|
||||
import inet.ipaddr.IPAddressString;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
@ -11,18 +14,23 @@ import java.time.format.DateTimeFormatter;
|
|||
*/
|
||||
public class Helper {
|
||||
|
||||
/**
|
||||
* Gets current datetime.
|
||||
*
|
||||
* @return the current datetime
|
||||
*/
|
||||
public static String getCurrentDatetime() {
|
||||
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets timestamp milli second.
|
||||
* Gets timestamp second.
|
||||
*
|
||||
* @param dateTime the date time
|
||||
* @return the timestamp milli second
|
||||
* @return the timestamp second
|
||||
*/
|
||||
public static int getTimestampSecond(String dateTime) {
|
||||
return (int)(Timestamp.valueOf(dateTime).toInstant().toEpochMilli() / 1000);
|
||||
return (int) (Timestamp.valueOf(dateTime).toInstant().toEpochMilli() / 1000);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -57,8 +65,25 @@ public class Helper {
|
|||
*/
|
||||
public static int getTimestampDiffNow(String begin) {
|
||||
int starTime = getTimestampSecond(begin);
|
||||
int endTm = (int)(System.currentTimeMillis() / 1000);
|
||||
int endTm = (int) (System.currentTimeMillis() / 1000);
|
||||
|
||||
return endTm - starTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Ip address normalize string.
|
||||
*
|
||||
* @param ipAddr the ip addr
|
||||
* @return the string
|
||||
*/
|
||||
public static String ipAddressNormalize(String ipAddr) {
|
||||
IPAddressString addrString = new IPAddressString(ipAddr);
|
||||
IPAddress addr = addrString.getAddress();
|
||||
|
||||
if (addr.isIPv6()) {
|
||||
return addr.toFullString().toUpperCase();
|
||||
} else {
|
||||
return addr.toNormalizedString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
package com.dispose.config;
|
||||
|
||||
import com.dispose.interceptor.TokenInterceptor;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
|
||||
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
|
||||
|
||||
/**
|
||||
* The type Auth token config.
|
||||
*
|
||||
* @author <huangxin@cmhi.chinamoblie.com>
|
||||
*/
|
||||
@Configuration
|
||||
public class AuthTokenConfig implements WebMvcConfigurer {
|
||||
/**
|
||||
* Init auth interceptor token interceptor.
|
||||
*
|
||||
* @return the token interceptor
|
||||
*/
|
||||
@Bean
|
||||
public TokenInterceptor initAuthInterceptor(){
|
||||
return new TokenInterceptor();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add interceptors.
|
||||
*
|
||||
* @param registry the registry
|
||||
*/
|
||||
@Override
|
||||
public void addInterceptors(InterceptorRegistry registry) {
|
||||
// 注册需要检查token的控制器接口
|
||||
registry.addInterceptor(initAuthInterceptor()).addPathPatterns("/information/**");
|
||||
registry.addInterceptor(initAuthInterceptor()).addPathPatterns("/task/**");
|
||||
}
|
||||
}
|
|
@ -5,13 +5,16 @@ import com.dispose.common.DDoSAttackType;
|
|||
import com.dispose.common.DisposeCapacityType;
|
||||
import com.dispose.common.DisposeConfigValue;
|
||||
import com.dispose.common.ErrorCode;
|
||||
import com.dispose.common.Helper;
|
||||
import com.dispose.common.NetflowDirection;
|
||||
import com.dispose.pojo.dto.protocol.base.BaseIdResp;
|
||||
import com.dispose.pojo.dto.protocol.base.BaseRespStatus;
|
||||
import com.dispose.pojo.dto.protocol.base.IdArraysReq;
|
||||
import com.dispose.pojo.dto.protocol.base.ProtocolReqDTO;
|
||||
import com.dispose.pojo.dto.protocol.base.ProtocolRespDTO;
|
||||
import com.dispose.pojo.dto.protocol.task.TaskStartReq;
|
||||
import com.dispose.pojo.dto.protocol.task.TaskStartResp;
|
||||
import com.dispose.pojo.dto.protocol.task.TaskStopResp;
|
||||
import com.dispose.pojo.entity.DeviceTask;
|
||||
import com.dispose.pojo.entity.DisposeTask;
|
||||
import com.dispose.pojo.po.MulReturnType;
|
||||
import com.dispose.service.DisposeTaskService;
|
||||
|
@ -34,6 +37,8 @@ import org.springframework.web.bind.annotation.ResponseBody;
|
|||
|
||||
import javax.annotation.Resource;
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
|
@ -52,24 +57,31 @@ import java.util.Optional;
|
|||
@Decryption
|
||||
public class DisposeTaskController {
|
||||
|
||||
/**
|
||||
* The Dispose task service.
|
||||
*/
|
||||
@Resource
|
||||
private DisposeTaskService disposeTaskService;
|
||||
|
||||
/**
|
||||
* The User account service.
|
||||
*/
|
||||
@Resource
|
||||
private UserAccountService userAccountService;
|
||||
|
||||
/**
|
||||
* Start task protocol resp dto.
|
||||
*
|
||||
* @param mr the mr
|
||||
* @param mr the mr
|
||||
* @param headers the headers
|
||||
* @return the protocol resp dto
|
||||
*/
|
||||
@PostMapping("/start")
|
||||
@ResponseBody
|
||||
@ApiOperation("启动处置任务")
|
||||
public ProtocolRespDTO<? extends BaseRespStatus> startTask(@Validated(ValidGroups.TaskStartReqValid.class)
|
||||
@RequestBody ProtocolReqDTO<TaskStartReq> mr,
|
||||
@NotNull @RequestHeader HttpHeaders headers) {
|
||||
@RequestBody ProtocolReqDTO<TaskStartReq> mr,
|
||||
@NotNull @RequestHeader HttpHeaders headers) {
|
||||
TaskStartReq req = mr.getMsgContent();
|
||||
|
||||
// 构成处置任务参数
|
||||
|
@ -77,7 +89,7 @@ public class DisposeTaskController {
|
|||
.deviceId(Long.parseLong(Optional.ofNullable(req.getId()).orElse("-1")))
|
||||
.accountId(userAccountService.getUserIdByAuthHead(Objects.requireNonNull(headers.get("Authorization")).get(0)))
|
||||
.disposeCapacity(CommonEnumHandler.codeOf(DisposeCapacityType.class, req.getType()))
|
||||
.disposeIp(req.getDisposeIp())
|
||||
.disposeIp(Helper.ipAddressNormalize(req.getDisposeIp()))
|
||||
.planEndTime(String.valueOf(req.getDisposeTime()))
|
||||
.flowDirection(CommonEnumHandler.codeOf(NetflowDirection.class,
|
||||
Optional.ofNullable(req.getFlowDirection()).orElse(2)))
|
||||
|
@ -98,21 +110,55 @@ public class DisposeTaskController {
|
|||
}
|
||||
|
||||
// 设置返回消息
|
||||
TaskStartResp rspInfo = TaskStartResp.builder()
|
||||
.taskId(ret.getSecondParam().toString())
|
||||
.build();
|
||||
BaseIdResp rspInfo = new BaseIdResp();
|
||||
|
||||
rspInfo.setTaskId(ret.getSecondParam().toString());
|
||||
rspInfo.setStatus(ret.getFirstParam().getCode());
|
||||
rspInfo.setMessage(new String[]{ret.getFirstParam().getMsg()});
|
||||
|
||||
return ProtocolRespDTO.result(ErrorCode.ERR_OK, rspInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop task protocol resp dto.
|
||||
*
|
||||
* @param mr the mr
|
||||
* @return the protocol resp dto
|
||||
*/
|
||||
@PostMapping("/stop")
|
||||
@ResponseBody
|
||||
@ApiOperation("停止处置任务")
|
||||
public ProtocolRespDTO<?> stopTask(@Validated(ValidGroups.TaskStopReqValid.class)
|
||||
@RequestBody ProtocolReqDTO<IdArraysReq> mr) {
|
||||
return ProtocolRespDTO.result(ErrorCode.ERR_OK);
|
||||
@RequestBody ProtocolReqDTO<IdArraysReq> mr) {
|
||||
|
||||
// 记录多个任务停止信息
|
||||
List<TaskStopResp> rspList = new ArrayList<>();
|
||||
|
||||
for (String tId : mr.getMsgContent().getTaskId()) {
|
||||
// 停止处置任务
|
||||
MulReturnType<ErrorCode, DisposeTask> ret = disposeTaskService.stopTask(Long.parseLong(tId));
|
||||
|
||||
TaskStopResp rspInfo = TaskStopResp.builder().build();
|
||||
|
||||
// 停止成功
|
||||
if (ret.getFirstParam() == ErrorCode.ERR_OK) {
|
||||
rspInfo.setDisposeDevice(ret.getSecondParam().getDeviceTask().stream()
|
||||
.map(DeviceTask::getDeviceId)
|
||||
.map(String::valueOf)
|
||||
.toArray(String[]::new));
|
||||
rspInfo.setType(ret.getSecondParam().getDisposeCapacity().getValue());
|
||||
rspInfo.setDisposeIp(ret.getSecondParam().getDisposeIp());
|
||||
rspInfo.setLeftTime(String.valueOf(Math.abs(
|
||||
Helper.getTimestampDiffNow(ret.getSecondParam().getPlanEndTime())) / 60));
|
||||
}
|
||||
|
||||
rspInfo.setTaskId(tId);
|
||||
rspInfo.setStatus(ret.getFirstParam().getCode());
|
||||
rspInfo.setMessage(new String[]{ret.getFirstParam().getMsg()});
|
||||
|
||||
rspList.add(rspInfo);
|
||||
}
|
||||
|
||||
return ProtocolRespDTO.result(ErrorCode.ERR_OK, rspList);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,4 +61,5 @@ public interface DisposeTaskManager {
|
|||
* @return the dispose task by id
|
||||
*/
|
||||
DisposeTask getDisposeTaskById(Long taskId);
|
||||
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ import lombok.NoArgsConstructor;
|
|||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@JsonPropertyOrder({"ipAddr", "devId", "status", "message"})
|
||||
@JsonPropertyOrder({"id", "devId", "taskId", "status", "message"})
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
public class BaseIdResp extends BaseRespStatus {
|
||||
/**
|
||||
|
@ -27,4 +27,6 @@ public class BaseIdResp extends BaseRespStatus {
|
|||
* The Dev id.
|
||||
*/
|
||||
private String devId;
|
||||
|
||||
private String taskId;
|
||||
}
|
||||
|
|
|
@ -33,8 +33,8 @@ public class TaskStartReq {
|
|||
* The Type.
|
||||
*/
|
||||
@NotNull(message = "type 处置类型不能为空", groups = ValidGroups.TaskStartReqValid.class)
|
||||
@Range(min = 0, max = 3,
|
||||
message = "type 字段取值为 [0, 3]",
|
||||
@Range(min = 1, max = 4,
|
||||
message = "type 字段取值为 [1, 4]",
|
||||
groups = ValidGroups.TaskStartReqValid.class)
|
||||
private Integer type;
|
||||
/**
|
||||
|
|
|
@ -10,7 +10,7 @@ import lombok.EqualsAndHashCode;
|
|||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* The type Task start resp.
|
||||
* The type Task stop resp.
|
||||
*
|
||||
* @author <huangxin@cmhi.chinamoblie.com>
|
||||
*/
|
||||
|
@ -19,11 +19,24 @@ import lombok.NoArgsConstructor;
|
|||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@JsonPropertyOrder({"id", "taskId", "status", "message"})
|
||||
@JsonPropertyOrder({"taskId", "disposeDevice", "type", "disposeIp", "leftTime", "status", "message"})
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
public class TaskStartResp extends BaseIdResp {
|
||||
public class TaskStopResp extends BaseIdResp {
|
||||
|
||||
/**
|
||||
* The Task id.
|
||||
* The Dispose device.
|
||||
*/
|
||||
private String taskId;
|
||||
private String[] disposeDevice;
|
||||
/**
|
||||
* The Type.
|
||||
*/
|
||||
private Integer type;
|
||||
/**
|
||||
* The Dispose ip.
|
||||
*/
|
||||
private String disposeIp;
|
||||
/**
|
||||
* The Left time.
|
||||
*/
|
||||
private String leftTime;
|
||||
}
|
|
@ -20,18 +20,10 @@ public interface DisposeTaskService {
|
|||
MulReturnType<ErrorCode, Long> createTask(DisposeTask task);
|
||||
|
||||
/**
|
||||
* Start task error code.
|
||||
* Stop task mul return type.
|
||||
*
|
||||
* @param taskId the task id
|
||||
* @return the error code
|
||||
* @return the mul return type
|
||||
*/
|
||||
ErrorCode startTask(Long taskId);
|
||||
|
||||
/**
|
||||
* Stop task error code.
|
||||
*
|
||||
* @param taskId the task id
|
||||
* @return the error code
|
||||
*/
|
||||
ErrorCode stopTask(Long taskId);
|
||||
MulReturnType<ErrorCode, DisposeTask> stopTask(Long taskId);
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.springframework.scheduling.annotation.Scheduled;
|
|||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* The type Device task manager service.
|
||||
|
@ -29,6 +30,9 @@ import javax.annotation.Resource;
|
|||
@Service
|
||||
@Slf4j
|
||||
public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
|
||||
|
||||
private final ConcurrentHashMap<String, Boolean> taskCache = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* The Dispose task manager.
|
||||
*/
|
||||
|
@ -135,21 +139,18 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
|
|||
deviceTaskManager.setAttackTypeStatus(deviceTask.getId(),
|
||||
disposeTask.getFlowDirection(), deviceTask.getTaskAttackType());
|
||||
// 更改处置任务状态为处置中
|
||||
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(),
|
||||
DisposeTaskStatus.TASK_STARTED);
|
||||
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), DisposeTaskStatus.TASK_STARTED);
|
||||
|
||||
// 记录浩瀚设备返回的任务ID
|
||||
deviceTaskManager.setTaskExternId(deviceTask.getId(), ret.getSecondParam());
|
||||
|
||||
log.info("HAOHAN_PLATFORM setup task succeed: {}, device taskId {}", deviceTask,
|
||||
ret.getSecondParam());
|
||||
log.info("HAOHAN_PLATFORM setup task succeed: {}, device taskId {}", deviceTask, ret.getSecondParam());
|
||||
|
||||
// 重置错误尝试次数
|
||||
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), 0);
|
||||
} else if (deviceTask.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES) {
|
||||
// 设置该任务为新任务,待下次重试启动
|
||||
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(),
|
||||
DisposeTaskStatus.TASK_NEW);
|
||||
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), DisposeTaskStatus.TASK_NEW);
|
||||
// 记录任务出错重试次数
|
||||
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), deviceTask.getErrRetry() + 1);
|
||||
log.error("HAOHAN_PLATFORM setup task times {} error {}: {}", deviceTask.getErrRetry(),
|
||||
|
@ -177,8 +178,7 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
|
|||
deviceTask.getExternId());
|
||||
|
||||
if (ret.getFirstParam() == ErrorCode.ERR_OK) {
|
||||
log.info("HAOHAN_PLATFORM stop task succeed: {}, device taskId {}", deviceTask,
|
||||
ret.getSecondParam());
|
||||
log.info("HAOHAN_PLATFORM stop task succeed: {}, device taskId {}", deviceTask, ret.getSecondParam());
|
||||
// 设置任务状态为结束
|
||||
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(deviceTask.getId(), DisposeTaskStatus.TASK_FINISHED);
|
||||
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), 0);
|
||||
|
@ -225,8 +225,7 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
|
|||
deviceTaskManager.attackTypeStatusSetBit(deviceTask.getId(), NetflowDirection.DIRECTION_OUT,
|
||||
t.getValue());
|
||||
|
||||
log.info("DPTECH_UMC setup task {}, {} succeed: {}", t,
|
||||
NetflowDirection.DIRECTION_OUT, deviceTask);
|
||||
log.info("DPTECH_UMC setup task {}, {} succeed: {}", t, NetflowDirection.DIRECTION_OUT, deviceTask);
|
||||
|
||||
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), 0);
|
||||
} else if (deviceTask.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES * 10) {
|
||||
|
@ -240,7 +239,7 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
|
|||
log.error("DPTECH_UMC setup task {}, {} times {} error: {}", t,
|
||||
NetflowDirection.DIRECTION_OUT, deviceTask.getErrRetry(), deviceTask);
|
||||
} else {
|
||||
log.info("DPTECH_UMC setup task {}, {} error {}: {}", t,
|
||||
log.error("DPTECH_UMC setup task {}, {} error {}: {}", t,
|
||||
NetflowDirection.DIRECTION_OUT, ret.getFirstParam(), deviceTask);
|
||||
}
|
||||
}
|
||||
|
@ -263,8 +262,7 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
|
|||
deviceTaskManager.attackTypeStatusSetBit(deviceTask.getId(), NetflowDirection.DIRECTION_IN,
|
||||
t.getValue());
|
||||
|
||||
log.info("DPTECH_UMC setup task {}, {} succeed: {}", t,
|
||||
NetflowDirection.DIRECTION_IN, deviceTask);
|
||||
log.info("DPTECH_UMC setup task {}, {} succeed: {}", t, NetflowDirection.DIRECTION_IN, deviceTask);
|
||||
|
||||
deviceTaskManager.setTaskErrRetryTimes(deviceTask.getId(), 0);
|
||||
} else if (deviceTask.getErrRetry() < DisposeConfigValue.CALL_ERROR_RETRY_TIMES * 10) {
|
||||
|
@ -278,7 +276,7 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
|
|||
log.error("DPTECH_UMC setup task {}, {} times {} error: {}", t,
|
||||
NetflowDirection.DIRECTION_IN, deviceTask.getErrRetry(), deviceTask);
|
||||
} else {
|
||||
log.info("DPTECH_UMC setup task {}, {} error {}: {}", t,
|
||||
log.error("DPTECH_UMC setup task {}, {} error {}: {}", t,
|
||||
NetflowDirection.DIRECTION_IN, ret.getFirstParam(), deviceTask);
|
||||
}
|
||||
}
|
||||
|
@ -457,6 +455,7 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
|
|||
public void deviceTaskRunnerSchedule() {
|
||||
// 遍历所有新的设备处置任务
|
||||
for (DeviceTask v : deviceTaskManager.getNewDisposeDeviceTaskInfo()) {
|
||||
|
||||
DisposeTask task = disposeTaskManager.getDisposeTaskById(v.getTaskId());
|
||||
|
||||
// 保护代码,理论上不存在该情况
|
||||
|
@ -465,9 +464,17 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
|
|||
// 获取设备
|
||||
AbilityInfo ai = disposeAbilityRouterService.getAbilityDevice(v.getDeviceId());
|
||||
|
||||
// 启动新任务
|
||||
log.info("Run task {}, {}", task, v);
|
||||
// 对处置IP生成一个唯一的操作标识符
|
||||
String taskStartKey = task.getDisposeIp() + "Start" + ai.getDev().getDeviceType().getDescription();
|
||||
|
||||
if (taskCache.containsKey(taskStartKey)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 设置标志,避免对同一个IP重复调用启动处置命令
|
||||
taskCache.put(taskStartKey, true);
|
||||
|
||||
// 启动新任务
|
||||
switch (ai.getDev().getDeviceType()) {
|
||||
case DPTECH_UMC:
|
||||
dpTechDeviceTaskRun(ai, v, task);
|
||||
|
@ -485,6 +492,9 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
|
|||
log.error("Unknown dispose device type: {}", ai.getDev());
|
||||
break;
|
||||
}
|
||||
|
||||
// 处置命令调用结束,允许下一次启动命令调用
|
||||
taskCache.remove(taskStartKey);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -513,8 +523,17 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
|
|||
// 获取设备
|
||||
AbilityInfo ai = disposeAbilityRouterService.getAbilityDevice(v.getDeviceId());
|
||||
|
||||
// 对处置IP生成一个唯一的操作标识符
|
||||
String taskStopKey = task.getDisposeIp() + "Stop" + ai.getDev().getDeviceType().getDescription();
|
||||
|
||||
if (taskCache.containsKey(taskStopKey)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 设置标志,避免对同一个IP重复调用停止处置命令
|
||||
taskCache.put(taskStopKey, true);
|
||||
|
||||
deviceTaskManager.changeDisposeDeviceTaskInfoStatus(v.getId(), DisposeTaskStatus.TASK_EXPIRED);
|
||||
log.info("Task expired, Stop: {}", v);
|
||||
|
||||
switch (ai.getDev().getDeviceType()) {
|
||||
case DPTECH_UMC:
|
||||
|
@ -533,6 +552,9 @@ public class DeviceTaskManagerServiceImpl implements DeviceTaskManagerService {
|
|||
log.error("Unknown dispose device type: {}", ai.getDev());
|
||||
break;
|
||||
}
|
||||
|
||||
// 处置命令调用结束,允许下一次停止命令调用
|
||||
taskCache.remove(taskStopKey);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.dispose.service.impl;
|
||||
|
||||
import com.dispose.common.DisposeTaskStatus;
|
||||
import com.dispose.common.ErrorCode;
|
||||
import com.dispose.manager.DisposeTaskManager;
|
||||
import com.dispose.pojo.entity.DisposeTask;
|
||||
|
@ -16,6 +17,9 @@ import javax.annotation.Resource;
|
|||
*/
|
||||
@Service
|
||||
public class DisposeTaskServiceImpl implements DisposeTaskService {
|
||||
/**
|
||||
* The Dispose task manager.
|
||||
*/
|
||||
@Resource
|
||||
DisposeTaskManager disposeTaskManager;
|
||||
|
||||
|
@ -38,24 +42,23 @@ public class DisposeTaskServiceImpl implements DisposeTaskService {
|
|||
}
|
||||
|
||||
/**
|
||||
* Start task error code.
|
||||
* Stop task mul return type.
|
||||
*
|
||||
* @param taskId the task id
|
||||
* @return the error code
|
||||
* @return the mul return type
|
||||
*/
|
||||
@Override
|
||||
public ErrorCode startTask(Long taskId) {
|
||||
return null;
|
||||
}
|
||||
public MulReturnType<ErrorCode, DisposeTask> stopTask(Long taskId) {
|
||||
DisposeTask task = disposeTaskManager.getDisposeTaskById(taskId);
|
||||
|
||||
/**
|
||||
* Stop task error code.
|
||||
*
|
||||
* @param taskId the task id
|
||||
* @return the error code
|
||||
*/
|
||||
@Override
|
||||
public ErrorCode stopTask(Long taskId) {
|
||||
return null;
|
||||
if (task == null) {
|
||||
return new MulReturnType<>(ErrorCode.ERR_NOSUCHTASK, null);
|
||||
}
|
||||
|
||||
if (!disposeTaskManager.changeDisposeTaskStatus(taskId, DisposeTaskStatus.TASK_CANCELED)) {
|
||||
return new MulReturnType<>(ErrorCode.ERR_DATABASE, null);
|
||||
}
|
||||
|
||||
return new MulReturnType<>(ErrorCode.ERR_OK, task);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package com.haohan.dispose.protocol;
|
|||
|
||||
import cn.hutool.http.Header;
|
||||
import cn.hutool.http.HttpRequest;
|
||||
import com.dispose.common.ErrorCode;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.haohan.dispose.common.HaoHanGetCleanTaskNetflowInfoReq;
|
||||
|
@ -16,8 +17,12 @@ import com.haohan.dispose.common.HaoHanStopCleanReq;
|
|||
import com.haohan.dispose.common.HaoHanStopCleanResp;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* The type Restful interface.
|
||||
|
@ -41,7 +46,7 @@ public class RestfulInterface {
|
|||
*/
|
||||
private static String postJson(String url, Map<String, String> header, String body) {
|
||||
return HttpRequest.post(url).header(Header.CONTENT_TYPE, "application/json").addHeaders(header).body(body)
|
||||
.execute().body();
|
||||
.execute().body();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -83,8 +88,8 @@ public class RestfulInterface {
|
|||
*/
|
||||
public HaoHanGetCleanTaskStatusResp getCleanTaskStatus(String baseUrlPath, Integer taskId) {
|
||||
return protocolRun(baseUrlPath + "/getCleanTaskState",
|
||||
HaoHanGetCleanTaskStatusReq.builder().cleanTaskId(taskId).build(),
|
||||
HaoHanGetCleanTaskStatusResp.class);
|
||||
HaoHanGetCleanTaskStatusReq.builder().cleanTaskId(taskId).build(),
|
||||
HaoHanGetCleanTaskStatusResp.class);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -97,9 +102,51 @@ public class RestfulInterface {
|
|||
* @return the hao han start clean resp
|
||||
*/
|
||||
public HaoHanStartCleanResp startClean(String baseUrlPath, String ipAddr, int times, String readme) {
|
||||
return protocolRun(baseUrlPath + "/sendTow",
|
||||
new HaoHanStartCleanReq(ipAddr, times, readme),
|
||||
HaoHanStartCleanResp.class);
|
||||
HaoHanStartCleanResp svrResp = protocolRun(baseUrlPath + "/sendTow",
|
||||
new HaoHanStartCleanReq(ipAddr, times, readme),
|
||||
HaoHanStartCleanResp.class);
|
||||
|
||||
final String regexPattern = "\\{(.*?)}";
|
||||
final String usedTag = "ip已存在清洗中";
|
||||
|
||||
if (svrResp == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 判断是否曾经启动过对IP的处置
|
||||
if (!svrResp.getMsg().contains(usedTag)) {
|
||||
return svrResp;
|
||||
}
|
||||
|
||||
try {
|
||||
// 处理返回消息信息,其中包含了已经启动的对同一个IP进行的处置任务
|
||||
Matcher matcher = Pattern.compile(regexPattern).matcher(svrResp.getMsg());
|
||||
List<String> ids = new ArrayList<>();
|
||||
while (matcher.find()) {
|
||||
ids.add(matcher.group(1).substring(matcher.group(1).lastIndexOf("=") + 1)
|
||||
.replaceAll("\\D", ""));
|
||||
}
|
||||
|
||||
// 排序,保留最后一次任务
|
||||
ids.sort(String.CASE_INSENSITIVE_ORDER);
|
||||
|
||||
String maxId = ids.get(ids.size() - 1);
|
||||
ids.remove(ids.size() - 1);
|
||||
|
||||
// 自动停止多余的旧任务
|
||||
ids.forEach(v -> {
|
||||
log.debug("HAOHAN_PLATFORM automatic stop more task: {}", v);
|
||||
stopClean(baseUrlPath, Integer.parseInt(maxId), readme);
|
||||
});
|
||||
|
||||
// 更改返回值
|
||||
svrResp.setCleanTaskId(Integer.parseInt(maxId));
|
||||
svrResp.setState(ErrorCode.ERR_OK.getCode());
|
||||
} catch (Exception ignored) {
|
||||
|
||||
}
|
||||
|
||||
return svrResp;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -112,8 +159,8 @@ public class RestfulInterface {
|
|||
*/
|
||||
public HaoHanStopCleanResp stopClean(String baseUrlPath, Integer taskId, String readme) {
|
||||
return protocolRun(baseUrlPath + "/delTow",
|
||||
new HaoHanStopCleanReq(taskId, readme),
|
||||
HaoHanStopCleanResp.class);
|
||||
new HaoHanStopCleanReq(taskId, readme),
|
||||
HaoHanStopCleanResp.class);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -125,8 +172,8 @@ public class RestfulInterface {
|
|||
*/
|
||||
public HaoHanGetCleaningNetflowInfoResp getCleaningNetflow(String baseUrlPath, String readme) {
|
||||
return protocolRun(baseUrlPath + "/allIpFlow",
|
||||
new HaoHanGetCleaningNetflowInfoReq(readme),
|
||||
HaoHanGetCleaningNetflowInfoResp.class);
|
||||
new HaoHanGetCleaningNetflowInfoReq(readme),
|
||||
HaoHanGetCleaningNetflowInfoResp.class);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -138,8 +185,8 @@ public class RestfulInterface {
|
|||
*/
|
||||
public HaoHanGetCleanTaskNetflowInfoResp getCleanTaskNetflow(String baseUrlPath, Integer taskId) {
|
||||
return protocolRun(baseUrlPath + "/cleanTaskFlow",
|
||||
new HaoHanGetCleanTaskNetflowInfoReq(taskId),
|
||||
HaoHanGetCleanTaskNetflowInfoResp.class
|
||||
new HaoHanGetCleanTaskNetflowInfoReq(taskId),
|
||||
HaoHanGetCleanTaskNetflowInfoResp.class
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue