diff --git a/config/application-dispose.properties b/config/application-dispose.properties
index 10f23ceb..26d9b602 100644
--- a/config/application-dispose.properties
+++ b/config/application-dispose.properties
@@ -6,32 +6,27 @@ dispose.split_char=,
dispose.request-timeout-second=5
# 是否开启隐私保护
dispose.used-privacy-protect=false
-
dispose.call-error-retry-times=3
# 分页配置项
# 最大每页数据条数
dispose.max-split-page-size=100
# 最小每页数据条数
dispose.min-split-page-size=10
-
# 迪普设备配置
# 发送超时时间(s)
dptech.soap-conn-timeout-second=60
# 接收超时时间(s)
dptech.soap-recv-timeout-second=60
-
# 用户权限配置
# 是否对设备管理进行用户验证
permission.admin-check=true
# 运行管理设备的操作员用户名
permission.admin-users=admin
-
# 认证配置
# 是否对接口访问进行认证
auth.verify-request-token=true
# token访问超时时间
auth.token-timeout-minute=30
-
# 安全配置
#加密类型: 0 不加密
# 1 Base64编码
@@ -41,7 +36,6 @@ crypto.security-protocol-type=0
crypto.aes-key=hkoUV5ZWh0q1jSxMnpjovVn19Qg99HY6DD40
# 3DES秘钥
crypto.des-key=P3mq9iSIvQcvfyfdWR8sAnfAadO
-
# Kafka 服务器配置
#重试次数
kafka.producer.retries=3
@@ -53,12 +47,11 @@ kafka.producer.linger=1
kafka.producer.buffer.memory=33554432
kafka.producer.servers=172.21.44.189:9092,172.21.44.9:9092,172.21.44.244:9092,172.21.44.236:9092,172.21.44.80:9092
kafka.dispose.topic=ddos-vip-customer-ck
-kafka.consumer.group-id = testGroup
-kafka.consumer.auto-offset-reset = earliest
-kafka.consumer.enable-auto-commit = true
-kafka.consumer.auto-commit-interval = 100
-kafka.consumer.max-poll-records =5
-
+#redis config
+spring.redis.database=0
+spring.redis.host=172.21.48.211
+spring.redis.port=6379
+spring.redis.password=!Q2w3e4r
#信任主机配置
# 白名单开关
trust.auth-white-list-check=true
diff --git a/pom.xml b/pom.xml
index 626c9ace..d78177dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -225,6 +225,10 @@
dom4j-core
1.4-dev-8
+
+ org.springframework.boot
+ spring-boot-starter-data-redis
+
diff --git a/src/main/java/com/dispose/common/Constants.java b/src/main/java/com/dispose/common/Constants.java
new file mode 100644
index 00000000..29c7272f
--- /dev/null
+++ b/src/main/java/com/dispose/common/Constants.java
@@ -0,0 +1,93 @@
+package com.dispose.common;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The emos constants.
+ */
+public class Constants {
+ /**
+ * DDoS具体攻击类型
+ */
+ public static Map ATTACK_TYPE;
+
+ static {
+ Map attTypeMap = new HashMap<>();
+ attTypeMap.put("hosttotaltraffic", "Host total traffic|1|2");
+
+ attTypeMap.put("RSTFlood", "RST Flood|3|4");
+ attTypeMap.put("SYNFlood", "SYN Flood|5|6");
+ attTypeMap.put("ACKFlood", "ACK Flood|7|8");
+ attTypeMap.put("TCPnullFlood", "TCP null|9|10");
+ attTypeMap.put("SYNACKAmplification", "SYN/ACK Amplification|11|12");
+ attTypeMap.put("tcpmisuse", "TCP Misuse|13|14");
+ attTypeMap.put("FINFlood", "FIN Flood|15|16");
+ attTypeMap.put("TCPFragment", "TCP Fragment|17|18");
+ attTypeMap.put("HTTPFlood", "HTTP Flood|19|20");
+ attTypeMap.put("HTTPSFlood", "HTTPS Flood|21|22");
+
+ attTypeMap.put("SIPFlood", "SIP Flood|23|24");
+ attTypeMap.put("DNS", "DNS|25|26");
+
+ attTypeMap.put("UDPFragment", "UDP Fragment|27|28");
+ attTypeMap.put("chargenAmplification", "chargen Amplification|29|30");
+ attTypeMap.put("L2TPAmplification", "L2TP Amplification|31|32");
+ attTypeMap.put("mDNSAmplification", "mDNS Amplification|33|34");
+ attTypeMap.put("MSSQLRSAmplification", "MS SQL RS Amplification|35|36");
+ attTypeMap.put("NetBIOSAmplification", "NetBIOS Amplification|37|38");
+ attTypeMap.put("NTPAmplification", "NTP Amplification|39|40");
+ attTypeMap.put("RIPv1Amplification", "RIPv1 Amplification|41|42");
+ attTypeMap.put("rpcbindAmplification", "rpcbind Amplification|43|44");
+ attTypeMap.put("SNMPAmplification", "SNMP Amplification|45|46");
+ attTypeMap.put("SSDPAmplification", "SSDP Amplification|47|48");
+ attTypeMap.put("DNSAmplification", "DNS Amplification|49|50");
+ attTypeMap.put("QOTDAmplification", "QOTD Amplification|51|52");
+ attTypeMap.put("Quake3Amplification", "Quake3 Amplification|53|54");
+ attTypeMap.put("SteamAmplification", "Steam Amplification|55|56");
+ attTypeMap.put("CLADPAmplification", "CLADP Amplification|57|58");
+ attTypeMap.put("MemcacheAmplification", "Memcache Amplification|59|60");
+ attTypeMap.put("UDPFlood", "UDP Flood|61|62");
+
+ attTypeMap.put("smurf", "Smurf|63|64");
+ attTypeMap.put("icmpfrgment", "ICMP Fragment|65|66");
+ attTypeMap.put("ICMPFlood", "ICMP Flood|67|68");
+
+ attTypeMap.put("IPv4Protocol0", "IPv4 Protocol 0|69|70");
+ attTypeMap.put("IPPrivate", "IP Private|71|72");
+ attTypeMap.put("landflood", "Land flood|73|74");
+ attTypeMap.put("IGMPFlood", "IGMP Flood|75|76");
+ ATTACK_TYPE = Collections.unmodifiableMap(attTypeMap);
+ }
+
+ /**
+ * 派单eoms模板
+ */
+ public static final String DISPATCH_TEMPLATE = "\nMsgSerial:{0}\nSDN:{1}\nNeName:{2}\nEquipmentClass:99236\n" +
+ "AlarmUniqueId:{3}\nAlarmUniqueClearId:{4}\nLocateNeName:{5}\nLocateNeType:99236\nLocateNeSDN:{6}\nLocateInfo:{7}\n" +
+ "EventTime:{8}\nCancelTime:{9}\nVendorAlarmType:{10}\nVendorSeverity:{11}\nVendorAlarmId:{12}\nAlarmTitle:{13}\n" +
+ "ProbableCauseTxt:{14}\nRLocateSDN: \nRLocateNeName: \nRLocateNeType: \nRate: \nAlarmLocation: \nAlarmCheck: \n" +
+ "HolderType: \nAlarmStatus:{15}\nCorrelateAlarmFlag: \nAlarmActCount: \nNeIp:\nEmsId: \nVendor:99083\nAlarmText:{16}\n" +
+ "NeAlias: \nVersion: \nRemoteNe: \nAlarmProvince:{17}\nAlarmRegion:{18}\nAlarmCounty: \nSite: \nSiteType: \nSiteProperty: \n" +
+ "MachineroomIDofZGTT: \nBusinessSystem:{19}\nCircuitNo: \nMac: \nSpecialty:9\nNetworkType:903\nNeSubType: \nEffectCircuitNum: \n" +
+ "CircuitLevel: \nAlarmSeverity:3\nNmsAlarmId:0903-083-056-10-900001\nStandardAlarmName:{21}\nAlarmLogicClass:{22}\n" +
+ "AlarmLogicSubClass:{23}\nEffectOnEquipment:5\nEffectOnBusiness:4\nNmsAlarmType:1\nSendGroupFlag: \nStandardFlag:2\n" +
+ "AlarmExplanation:{24}\nBusinessType: \nBusinessInfo:{25}\nIsRelatedRemote: \nLocateNeStatus:1300\nProjectNo: \n" +
+ "ProjectName: \nProjectStartTime: \nProjectEndTime: \nGroupCustomer: \nCustomerLevel: \nServiceType: \nServiceLevel: \n" +
+ "ServiceName: \nServiceCrossDomainType: \nInterruptCircuitState: \nCircuitLocateInfo: \nHomeClientNum: \nHomeCellNum: \n" +
+ "LinkOnuNum: \n";
+
+ /* *
+ 0:网元自动清除 --收到从采集源发送的清除告警
+ */
+ public static final int NE_AUTO_CLEARED_STATUS = 0;
+ /* *
+ 1:活动告警---告警当前为活动状态
+ */
+ public static final int ACTIVE_ALARM_STATUS = 1;
+ /* *
+ 2:同步清除---已采集活动告警,但采集平台从告警源同步时,发现已经没有对应的活动告警,由采集平台产生的清除告警
+ */
+ public static final int SYNCHRONIZATION_CLEAR_STATUS = 2;
+}
diff --git a/src/main/java/com/dispose/common/ErrorCode.java b/src/main/java/com/dispose/common/ErrorCode.java
index 467c6dfb..7e2efb3a 100644
--- a/src/main/java/com/dispose/common/ErrorCode.java
+++ b/src/main/java/com/dispose/common/ErrorCode.java
@@ -293,6 +293,10 @@ public enum ErrorCode {
* The Err huawei firewall error.
*/
ERR_HUAWEIFIREWALL_ERROR(305, "华为防火墙返回错误"),
+ /**
+ * The Err emos create message error.
+ */
+ EMOS_CREATEMESSAGE_ERROR(306, "EMOS发送信息错误"),
;
/**
diff --git a/src/main/java/com/dispose/controller/kafkaController.java b/src/main/java/com/dispose/controller/kafkaController.java
index 231a188a..9434b975 100644
--- a/src/main/java/com/dispose/controller/kafkaController.java
+++ b/src/main/java/com/dispose/controller/kafkaController.java
@@ -1,21 +1,22 @@
package com.dispose.controller;
+import com.alibaba.fastjson.JSONObject;
+import com.dispose.common.Constants;
import com.dispose.common.ErrorCode;
import com.dispose.config.KafkaConfiguration;
import com.dispose.pojo.dto.protocol.base.BaseRespStatus;
import com.dispose.pojo.dto.protocol.base.ProtocolReqDTO;
import com.dispose.pojo.dto.protocol.base.ProtocolRespDTO;
-import com.dispose.pojo.dto.protocol.kafka.AlarmInfo;
+import com.dispose.pojo.dto.protocol.kafka.EmosAlarmInfo;
import com.dispose.pojo.dto.protocol.kafka.AlarmInfoReq;
import com.dispose.security.annotation.Decryption;
import com.dispose.security.annotation.Encryption;
import com.dispose.validation.group.ValidGroups;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Controller;
@@ -27,6 +28,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
+import java.text.MessageFormat;
import java.util.Objects;
/**
@@ -43,18 +45,15 @@ import java.util.Objects;
@Encryption
@Decryption
public class kafkaController {
- /**
- * The Object mapper.
- */
- @Resource
- private ObjectMapper objectMapper;
-
/**
* The Kafka configuration.
*/
@Resource
private KafkaConfiguration kafkaConfiguration;
+ @Resource
+ private StringRedisTemplate stringRedisTemplate;
+
/**
* Dispatch command sent to kafka.
*
@@ -66,28 +65,150 @@ public class kafkaController {
@ApiOperation("发送消息")
public ProtocolRespDTO dispatchCommand(
@Validated(ValidGroups.ProtocolCommonValid.class)
- @RequestBody ProtocolReqDTO mr) throws JsonProcessingException {
- //获取入参信息,进行所需数据格式拼接
- log.info("alarm message information :{}", mr.getMsgContent().getAlarmInfo());
- AlarmInfo alarmInfo = objectMapper.readValue(mr.getMsgContent().getAlarmInfo(), new TypeReference() {
- });
+ @RequestBody ProtocolReqDTO mr) {
+ //获取入参信息
+ BaseRespStatus rspInfo = new BaseRespStatus();
+ log.info("emos alarm is:{}", mr.getMsgContent().getAlarmInfo());
+ EmosAlarmInfo alarmInfo = JSONObject.parseObject(mr.getMsgContent().getAlarmInfo(), EmosAlarmInfo.class);
+ //拼接emos据格式
+ String content = createSendContent(alarmInfo);
+ if (content == null) {
+ rspInfo.setStatus(ErrorCode.EMOS_CREATEMESSAGE_ERROR.getCode());
+ rspInfo.setMessage(new String[]{ErrorCode.EMOS_CREATEMESSAGE_ERROR.getMsg()});
+ return ProtocolRespDTO.result(ErrorCode.EMOS_CREATEMESSAGE_ERROR, rspInfo);
+ }
//推动数据格式到kafka
- log.info("send alarm message :{}", alarmInfo.toString());
- String sendMessage = mr.getMsgContent().getAlarmInfo();
+ log.info("send alarm :{}", content);
ListenableFuture> sendResult = kafkaConfiguration
.kafkaTemplate()
- .sendDefault(0, System.currentTimeMillis(), "dispose", sendMessage);
+ .sendDefault(0, System.currentTimeMillis(), "dispose", content);
- sendResult.addCallback(v -> log.info("Kafka send {} to {} at {}", sendMessage,
+ sendResult.addCallback(v -> log.info("Kafka send {} to {} at {}", content,
Objects.requireNonNull(v)
.getRecordMetadata()
.topic(), v.getRecordMetadata().partition()),
ex -> log.error("Kafka send error: {}", ex.getMessage()));
- BaseRespStatus rspInfo = new BaseRespStatus();
+ //保存数据格式到数据库
rspInfo.setStatus(ErrorCode.ERR_OK.getCode());
rspInfo.setMessage(new String[]{ErrorCode.ERR_OK.getMsg()});
return ProtocolRespDTO.result(ErrorCode.ERR_OK, rspInfo);
}
+
+ /**
+ * @param alarmInfo emos告警信息
+ * @return 发送的消息
+ */
+ private String createSendContent(EmosAlarmInfo alarmInfo) {
+ try {
+ //编号从1开始,以实时消息发布通道为单位进行编号。如果编号超过最大正整数(2^32-1),重新从1开始编号。
+ Long increment = stringRedisTemplate.opsForValue().increment("SEND_KAFKA_INCREMENT", 1);
+ String dstIp = alarmInfo.getDstIp();
+ String alarmId = alarmInfo.getAlarmId();
+ //LocateInfo: 192.168.11.6 ddos attack alarm,Memcache , 2019-08-01 12:31, 2.1Gbps, 1.01Mpps
+ String locateInfo = getAlarmEvent(alarmInfo);
+
+ String eventTime = alarmInfo.getStartTime();
+ String cancelTime = alarmInfo.getEndTime();
+
+ //告警类型vendorAlarmType、告警级别vendorSeverity、告警号vendorAlarmId、告警标题AlarmTitle、告警可能原因ProbableCauseTxt
+ String type = Constants.ATTACK_TYPE.get(alarmInfo.getAttackType());
+ String vendorSeverity = changeCharacatorCode("一级");
+ String vendorAlarmType;
+ String vendorAlarmId;
+ if (type != null) {
+ String[] arr = type.split("\\|");
+ vendorAlarmType = arr[0];
+ if (alarmInfo.getBpspps().compareTo("bps") == 0) {
+ vendorAlarmId = arr[1];
+ } else {
+ vendorAlarmId = arr[2];
+ }
+ } else {
+ vendorAlarmType = alarmInfo.getAttackType();
+ vendorAlarmId = "0";
+ log.info("get unKnow attack type:" + vendorAlarmType);
+ }
+ String alarmStatus = String.valueOf(Constants.ACTIVE_ALARM_STATUS);
+ String alarmText = changeCharacatorCode(getAlarmText(alarmInfo));
+ String alarmExplanation = changeCharacatorCode(getAlarmExplanation(alarmInfo));
+
+ String content = MessageFormat.format(Constants.DISPATCH_TEMPLATE, increment, dstIp,
+ dstIp, alarmId, alarmId, dstIp, dstIp, locateInfo, eventTime, cancelTime,
+ vendorAlarmType, vendorSeverity, vendorAlarmId, changeCharacatorCode("重保攻击事件告警"),
+ changeCharacatorCode("DDos攻击事件"), alarmStatus, alarmText,
+ changeCharacatorCode(alarmInfo.getDstProvince()), changeCharacatorCode(alarmInfo.getDstCity()),
+ changeCharacatorCode("网络部集中抗D系统"), changeCharacatorCode("DDOS攻击事件告警"),
+ changeCharacatorCode("安全告警"), changeCharacatorCode("DDOS告警"),
+ alarmExplanation, changeCharacatorCode("集中抗D"));
+ return content;
+
+ } catch (Exception e) {
+ log.error("createSendContent告警消息异常,详细信息:{}", ExceptionUtils.getStackTrace(e));
+ return null;
+ }
+ }
+
+ private String getAlarmEvent(EmosAlarmInfo a) {
+ return a.getDstIp() + " ddos attack alarm, " + a.getAttackType() + ", " + a.getStartTime() +
+ ", " + a.getMaxBps() + ", " + a.getMaxPps();
+ }
+
+ private String changeCharacatorCode(String con) {
+ try {
+ return new String(con.getBytes("GBK"), "GBK");
+ } catch (Exception e) {
+ log.info("change failed:" + e.getMessage());
+ return con;
+ }
+ }
+
+ private String getAreaDes(String province, String city) {
+ if (province.compareTo("北京") == 0) {
+ return "北京市";
+ }
+ if (province.compareTo("上海") == 0) {
+ return "上海市";
+ }
+ if (province.compareTo("天津") == 0) {
+ return "天津市";
+ }
+ if (province.compareTo("重庆") == 0) {
+ return "重庆市";
+ }
+ return province + "省" + city + "市";
+ }
+
+ private String getAlarmText(EmosAlarmInfo a) {
+ String area = getAreaDes(a.getDstProvince(), a.getDstCity());
+ String op = "";
+ if (a.getDisposeType() == 1) {
+ op = "清洗";
+ } else if (a.getDisposeType() == 2) {
+ op = "流控";
+ } else if (a.getDisposeType() == 3) {
+ op = "黑洞";
+ }
+ return "攻击目的IP:" + a.getDstIp() + "," + area + "," + "处置操作:" + op + "," + "处置时长:" + a.getDisposeTime() + "分钟";
+ }
+
+ private String getAlarmExplanation(EmosAlarmInfo a) {
+ String op = "";
+ if (a.getDisposeType() == 1) {
+ op = "清洗";
+ } else if (a.getDisposeType() == 2) {
+ op = "流控";
+ } else if (a.getDisposeType() == 3) {
+ op = "黑洞";
+ }
+ String srcIp = "";
+ for (String ip : a.getSrcIpLs()) {
+ srcIp = srcIp + ip + ",";
+ }
+ if (!srcIp.isEmpty()) {
+ srcIp = srcIp.substring(0, srcIp.length() - 1);
+ }
+ return "攻击目的IP:" + a.getDstIp() + "," + "攻击源地址:(" + srcIp + ")," + "处置操作:" + op + "," + "处置时长:" + a.getDisposeTime() + "分钟";
+ }
}
diff --git a/src/main/java/com/dispose/pojo/dto/protocol/kafka/AlarmInfo.java b/src/main/java/com/dispose/pojo/dto/protocol/kafka/EmosAlarmInfo.java
similarity index 98%
rename from src/main/java/com/dispose/pojo/dto/protocol/kafka/AlarmInfo.java
rename to src/main/java/com/dispose/pojo/dto/protocol/kafka/EmosAlarmInfo.java
index 8bb85b0e..bc2f88c9 100644
--- a/src/main/java/com/dispose/pojo/dto/protocol/kafka/AlarmInfo.java
+++ b/src/main/java/com/dispose/pojo/dto/protocol/kafka/EmosAlarmInfo.java
@@ -16,7 +16,7 @@ import java.util.List;
@JsonPropertyOrder({"alarmId", "dstIp", "attackType", "bpspps", "dstProvince", "dstCity", "srcIpLs", "startTime",
"endTime", "disposeType", "disposeTime", "maxBps", "maxPps"})
@JsonInclude(JsonInclude.Include.NON_NULL)
-public class AlarmInfo {
+public class EmosAlarmInfo {
/**
* 告警id.
*/
diff --git a/src/main/java/com/dispose/setup/SpringBootBeanUtil.java b/src/main/java/com/dispose/setup/SpringBootBeanUtil.java
deleted file mode 100644
index f34424a4..00000000
--- a/src/main/java/com/dispose/setup/SpringBootBeanUtil.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.dispose.setup;
-
-import lombok.extern.slf4j.Slf4j;
-import org.jetbrains.annotations.NotNull;
-import org.springframework.beans.BeansException;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-import org.springframework.stereotype.Component;
-
-/**
- * The type Spring boot bean util.
- */
-@Component
-@Slf4j
-public class SpringBootBeanUtil implements ApplicationContextAware {
- /**
- * The constant applicationContext.
- */
- private static ApplicationContext applicationContext;
-
- /**
- * Sets application context.
- *
- * @param applicationContext the application context
- * @throws BeansException the beans exception
- */
- @Override
- public void setApplicationContext(@NotNull ApplicationContext applicationContext) throws BeansException {
- if (SpringBootBeanUtil.applicationContext == null) {
- SpringBootBeanUtil.applicationContext = applicationContext;
- }
-
- log.debug("========ApplicationContext配置成功========");
- log.debug("========在普通类可以通过调用SpringUtils.getAppContext()获取applicationContext对象========");
- log.debug("========applicationContext=" + SpringBootBeanUtil.applicationContext + "========");
- }
-
- /**
- * Gets application context.
- *
- * @return the application context
- */
- public static ApplicationContext getApplicationContext() {
- return applicationContext;
- }
-
- /**
- * Gets bean.
- *
- * @param name the name
- * @return the bean
- */
- public static Object getBean(String name) {
- return getApplicationContext().getBean(name);
- }
-
- /**
- * Gets bean.
- *
- * @param the type parameter
- * @param clazz the clazz
- * @return the bean
- */
- public static T getBean(Class clazz) {
- return getApplicationContext().getBean(clazz);
- }
-
- /**
- * Gets bean.
- *
- * @param the type parameter
- * @param name the name
- * @param clazz the clazz
- * @return the bean
- */
- public static T getBean(String name, Class clazz) {
- return getApplicationContext().getBean(name, clazz);
- }
-}
diff --git a/src/test/java/com/dispose/test/dev/controller/KafkaControllerTest.java b/src/test/java/com/dispose/test/dev/controller/KafkaControllerTest.java
index 566c5600..83a52d64 100644
--- a/src/test/java/com/dispose/test/dev/controller/KafkaControllerTest.java
+++ b/src/test/java/com/dispose/test/dev/controller/KafkaControllerTest.java
@@ -83,7 +83,7 @@ public class KafkaControllerTest extends InitTestEnvironment {
// 1清洗,2流控,3黑洞
disposeParam.set("disposeType", 1);
disposeParam.set("disposeTime", 30);
- disposeParam.set("endTime", new Date());
+ disposeParam.set("endTime", sdf.format(new Date()));
List srcIp = new ArrayList<>();
srcIp.add("192.168.10.1");
srcIp.add("192.168.10.2");