From 17655252ff769519187365387faef9439eba15f0 Mon Sep 17 00:00:00 2001 From: chenlinghy Date: Tue, 7 Sep 2021 15:38:22 +0800 Subject: [PATCH 1/5] =?UTF-8?q?OCT=20REM:=201.=20=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E5=86=97=E4=BD=99=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/dispose/controller/kafkaController.java | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/src/main/java/com/dispose/controller/kafkaController.java b/src/main/java/com/dispose/controller/kafkaController.java index 082154a5..231a188a 100644 --- a/src/main/java/com/dispose/controller/kafkaController.java +++ b/src/main/java/com/dispose/controller/kafkaController.java @@ -52,8 +52,6 @@ public class kafkaController { /** * The Kafka configuration. */ -// final private KafkaConfiguration kafkaConfiguration = -// SpringBootBeanUtil.getBean(com.dispose.config.KafkaConfiguration.class); @Resource private KafkaConfiguration kafkaConfiguration; @@ -92,17 +90,4 @@ public class kafkaController { rspInfo.setMessage(new String[]{ErrorCode.ERR_OK.getMsg()}); return ProtocolRespDTO.result(ErrorCode.ERR_OK, rspInfo); } - - -// @KafkaListener(topics = {"ddos-vip-customer-ck"}) -// public void kafkaListen(ConsumerRecord consumerRecord) { -// //判断消息是否为null -// Optional kafkaMessage = Optional.ofNullable(consumerRecord.value()); -// log.info(">>>>>>>>>>> record = " + kafkaMessage); -// if (kafkaMessage.isPresent()) { -// String consumerMsg = kafkaMessage.get(); -// log.info("消费消息:" + consumerMsg); -// } -// } - } From 5c492a6c0bc8f15857d52b7ed5bfe7afdcd1b4ea Mon Sep 17 00:00:00 2001 From: chenlinghy Date: Thu, 9 Sep 2021 17:43:42 +0800 Subject: [PATCH 2/5] =?UTF-8?q?OCT=20REM:=201.=20kafka=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=98=9F=E5=88=97=E6=95=B0=E6=8D=AE=E6=A0=BC=E5=BC=8F=E6=8B=BC?= =?UTF-8?q?=E6=8E=A5=202.=20kafka=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97?= =?UTF-8?q?=E5=8F=91=E9=80=81=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/application-dispose.properties | 17 +- pom.xml | 4 + .../java/com/dispose/common/Constants.java | 93 ++++++++++ .../java/com/dispose/common/ErrorCode.java | 4 + .../dispose/controller/kafkaController.java | 161 +++++++++++++++--- .../{AlarmInfo.java => EmosAlarmInfo.java} | 2 +- .../com/dispose/setup/SpringBootBeanUtil.java | 79 --------- .../dev/controller/KafkaControllerTest.java | 2 +- 8 files changed, 249 insertions(+), 113 deletions(-) create mode 100644 src/main/java/com/dispose/common/Constants.java rename src/main/java/com/dispose/pojo/dto/protocol/kafka/{AlarmInfo.java => EmosAlarmInfo.java} (98%) delete mode 100644 src/main/java/com/dispose/setup/SpringBootBeanUtil.java diff --git a/config/application-dispose.properties b/config/application-dispose.properties index 10f23ceb..26d9b602 100644 --- a/config/application-dispose.properties +++ b/config/application-dispose.properties @@ -6,32 +6,27 @@ dispose.split_char=, dispose.request-timeout-second=5 # 是否开启隐私保护 dispose.used-privacy-protect=false - dispose.call-error-retry-times=3 # 分页配置项 # 最大每页数据条数 dispose.max-split-page-size=100 # 最小每页数据条数 dispose.min-split-page-size=10 - # 迪普设备配置 # 发送超时时间(s) dptech.soap-conn-timeout-second=60 # 接收超时时间(s) dptech.soap-recv-timeout-second=60 - # 用户权限配置 # 是否对设备管理进行用户验证 permission.admin-check=true # 运行管理设备的操作员用户名 permission.admin-users=admin - # 认证配置 # 是否对接口访问进行认证 auth.verify-request-token=true # token访问超时时间 auth.token-timeout-minute=30 - # 安全配置 #加密类型: 0 不加密 # 1 Base64编码 @@ -41,7 +36,6 @@ crypto.security-protocol-type=0 crypto.aes-key=hkoUV5ZWh0q1jSxMnpjovVn19Qg99HY6DD40 # 3DES秘钥 crypto.des-key=P3mq9iSIvQcvfyfdWR8sAnfAadO - # Kafka 服务器配置 #重试次数 kafka.producer.retries=3 @@ -53,12 +47,11 @@ kafka.producer.linger=1 kafka.producer.buffer.memory=33554432 kafka.producer.servers=172.21.44.189:9092,172.21.44.9:9092,172.21.44.244:9092,172.21.44.236:9092,172.21.44.80:9092 kafka.dispose.topic=ddos-vip-customer-ck -kafka.consumer.group-id = testGroup -kafka.consumer.auto-offset-reset = earliest -kafka.consumer.enable-auto-commit = true -kafka.consumer.auto-commit-interval = 100 -kafka.consumer.max-poll-records =5 - +#redis config +spring.redis.database=0 +spring.redis.host=172.21.48.211 +spring.redis.port=6379 +spring.redis.password=!Q2w3e4r #信任主机配置 # 白名单开关 trust.auth-white-list-check=true diff --git a/pom.xml b/pom.xml index 626c9ace..d78177dd 100644 --- a/pom.xml +++ b/pom.xml @@ -225,6 +225,10 @@ dom4j-core 1.4-dev-8 + + org.springframework.boot + spring-boot-starter-data-redis + diff --git a/src/main/java/com/dispose/common/Constants.java b/src/main/java/com/dispose/common/Constants.java new file mode 100644 index 00000000..29c7272f --- /dev/null +++ b/src/main/java/com/dispose/common/Constants.java @@ -0,0 +1,93 @@ +package com.dispose.common; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * The emos constants. + */ +public class Constants { + /** + * DDoS具体攻击类型 + */ + public static Map ATTACK_TYPE; + + static { + Map attTypeMap = new HashMap<>(); + attTypeMap.put("hosttotaltraffic", "Host total traffic|1|2"); + + attTypeMap.put("RSTFlood", "RST Flood|3|4"); + attTypeMap.put("SYNFlood", "SYN Flood|5|6"); + attTypeMap.put("ACKFlood", "ACK Flood|7|8"); + attTypeMap.put("TCPnullFlood", "TCP null|9|10"); + attTypeMap.put("SYNACKAmplification", "SYN/ACK Amplification|11|12"); + attTypeMap.put("tcpmisuse", "TCP Misuse|13|14"); + attTypeMap.put("FINFlood", "FIN Flood|15|16"); + attTypeMap.put("TCPFragment", "TCP Fragment|17|18"); + attTypeMap.put("HTTPFlood", "HTTP Flood|19|20"); + attTypeMap.put("HTTPSFlood", "HTTPS Flood|21|22"); + + attTypeMap.put("SIPFlood", "SIP Flood|23|24"); + attTypeMap.put("DNS", "DNS|25|26"); + + attTypeMap.put("UDPFragment", "UDP Fragment|27|28"); + attTypeMap.put("chargenAmplification", "chargen Amplification|29|30"); + attTypeMap.put("L2TPAmplification", "L2TP Amplification|31|32"); + attTypeMap.put("mDNSAmplification", "mDNS Amplification|33|34"); + attTypeMap.put("MSSQLRSAmplification", "MS SQL RS Amplification|35|36"); + attTypeMap.put("NetBIOSAmplification", "NetBIOS Amplification|37|38"); + attTypeMap.put("NTPAmplification", "NTP Amplification|39|40"); + attTypeMap.put("RIPv1Amplification", "RIPv1 Amplification|41|42"); + attTypeMap.put("rpcbindAmplification", "rpcbind Amplification|43|44"); + attTypeMap.put("SNMPAmplification", "SNMP Amplification|45|46"); + attTypeMap.put("SSDPAmplification", "SSDP Amplification|47|48"); + attTypeMap.put("DNSAmplification", "DNS Amplification|49|50"); + attTypeMap.put("QOTDAmplification", "QOTD Amplification|51|52"); + attTypeMap.put("Quake3Amplification", "Quake3 Amplification|53|54"); + attTypeMap.put("SteamAmplification", "Steam Amplification|55|56"); + attTypeMap.put("CLADPAmplification", "CLADP Amplification|57|58"); + attTypeMap.put("MemcacheAmplification", "Memcache Amplification|59|60"); + attTypeMap.put("UDPFlood", "UDP Flood|61|62"); + + attTypeMap.put("smurf", "Smurf|63|64"); + attTypeMap.put("icmpfrgment", "ICMP Fragment|65|66"); + attTypeMap.put("ICMPFlood", "ICMP Flood|67|68"); + + attTypeMap.put("IPv4Protocol0", "IPv4 Protocol 0|69|70"); + attTypeMap.put("IPPrivate", "IP Private|71|72"); + attTypeMap.put("landflood", "Land flood|73|74"); + attTypeMap.put("IGMPFlood", "IGMP Flood|75|76"); + ATTACK_TYPE = Collections.unmodifiableMap(attTypeMap); + } + + /** + * 派单eoms模板 + */ + public static final String DISPATCH_TEMPLATE = "\nMsgSerial:{0}\nSDN:{1}\nNeName:{2}\nEquipmentClass:99236\n" + + "AlarmUniqueId:{3}\nAlarmUniqueClearId:{4}\nLocateNeName:{5}\nLocateNeType:99236\nLocateNeSDN:{6}\nLocateInfo:{7}\n" + + "EventTime:{8}\nCancelTime:{9}\nVendorAlarmType:{10}\nVendorSeverity:{11}\nVendorAlarmId:{12}\nAlarmTitle:{13}\n" + + "ProbableCauseTxt:{14}\nRLocateSDN: \nRLocateNeName: \nRLocateNeType: \nRate: \nAlarmLocation: \nAlarmCheck: \n" + + "HolderType: \nAlarmStatus:{15}\nCorrelateAlarmFlag: \nAlarmActCount: \nNeIp:\nEmsId: \nVendor:99083\nAlarmText:{16}\n" + + "NeAlias: \nVersion: \nRemoteNe: \nAlarmProvince:{17}\nAlarmRegion:{18}\nAlarmCounty: \nSite: \nSiteType: \nSiteProperty: \n" + + "MachineroomIDofZGTT: \nBusinessSystem:{19}\nCircuitNo: \nMac: \nSpecialty:9\nNetworkType:903\nNeSubType: \nEffectCircuitNum: \n" + + "CircuitLevel: \nAlarmSeverity:3\nNmsAlarmId:0903-083-056-10-900001\nStandardAlarmName:{21}\nAlarmLogicClass:{22}\n" + + "AlarmLogicSubClass:{23}\nEffectOnEquipment:5\nEffectOnBusiness:4\nNmsAlarmType:1\nSendGroupFlag: \nStandardFlag:2\n" + + "AlarmExplanation:{24}\nBusinessType: \nBusinessInfo:{25}\nIsRelatedRemote: \nLocateNeStatus:1300\nProjectNo: \n" + + "ProjectName: \nProjectStartTime: \nProjectEndTime: \nGroupCustomer: \nCustomerLevel: \nServiceType: \nServiceLevel: \n" + + "ServiceName: \nServiceCrossDomainType: \nInterruptCircuitState: \nCircuitLocateInfo: \nHomeClientNum: \nHomeCellNum: \n" + + "LinkOnuNum: \n"; + + /* * + 0:网元自动清除 --收到从采集源发送的清除告警 + */ + public static final int NE_AUTO_CLEARED_STATUS = 0; + /* * + 1:活动告警---告警当前为活动状态 + */ + public static final int ACTIVE_ALARM_STATUS = 1; + /* * + 2:同步清除---已采集活动告警,但采集平台从告警源同步时,发现已经没有对应的活动告警,由采集平台产生的清除告警 + */ + public static final int SYNCHRONIZATION_CLEAR_STATUS = 2; +} diff --git a/src/main/java/com/dispose/common/ErrorCode.java b/src/main/java/com/dispose/common/ErrorCode.java index 467c6dfb..7e2efb3a 100644 --- a/src/main/java/com/dispose/common/ErrorCode.java +++ b/src/main/java/com/dispose/common/ErrorCode.java @@ -293,6 +293,10 @@ public enum ErrorCode { * The Err huawei firewall error. */ ERR_HUAWEIFIREWALL_ERROR(305, "华为防火墙返回错误"), + /** + * The Err emos create message error. + */ + EMOS_CREATEMESSAGE_ERROR(306, "EMOS发送信息错误"), ; /** diff --git a/src/main/java/com/dispose/controller/kafkaController.java b/src/main/java/com/dispose/controller/kafkaController.java index 231a188a..9434b975 100644 --- a/src/main/java/com/dispose/controller/kafkaController.java +++ b/src/main/java/com/dispose/controller/kafkaController.java @@ -1,21 +1,22 @@ package com.dispose.controller; +import com.alibaba.fastjson.JSONObject; +import com.dispose.common.Constants; import com.dispose.common.ErrorCode; import com.dispose.config.KafkaConfiguration; import com.dispose.pojo.dto.protocol.base.BaseRespStatus; import com.dispose.pojo.dto.protocol.base.ProtocolReqDTO; import com.dispose.pojo.dto.protocol.base.ProtocolRespDTO; -import com.dispose.pojo.dto.protocol.kafka.AlarmInfo; +import com.dispose.pojo.dto.protocol.kafka.EmosAlarmInfo; import com.dispose.pojo.dto.protocol.kafka.AlarmInfoReq; import com.dispose.security.annotation.Decryption; import com.dispose.security.annotation.Encryption; import com.dispose.validation.group.ValidGroups; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.stereotype.Controller; @@ -27,6 +28,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import javax.annotation.Resource; +import java.text.MessageFormat; import java.util.Objects; /** @@ -43,18 +45,15 @@ import java.util.Objects; @Encryption @Decryption public class kafkaController { - /** - * The Object mapper. - */ - @Resource - private ObjectMapper objectMapper; - /** * The Kafka configuration. */ @Resource private KafkaConfiguration kafkaConfiguration; + @Resource + private StringRedisTemplate stringRedisTemplate; + /** * Dispatch command sent to kafka. * @@ -66,28 +65,150 @@ public class kafkaController { @ApiOperation("发送消息") public ProtocolRespDTO dispatchCommand( @Validated(ValidGroups.ProtocolCommonValid.class) - @RequestBody ProtocolReqDTO mr) throws JsonProcessingException { - //获取入参信息,进行所需数据格式拼接 - log.info("alarm message information :{}", mr.getMsgContent().getAlarmInfo()); - AlarmInfo alarmInfo = objectMapper.readValue(mr.getMsgContent().getAlarmInfo(), new TypeReference() { - }); + @RequestBody ProtocolReqDTO mr) { + //获取入参信息 + BaseRespStatus rspInfo = new BaseRespStatus(); + log.info("emos alarm is:{}", mr.getMsgContent().getAlarmInfo()); + EmosAlarmInfo alarmInfo = JSONObject.parseObject(mr.getMsgContent().getAlarmInfo(), EmosAlarmInfo.class); + //拼接emos据格式 + String content = createSendContent(alarmInfo); + if (content == null) { + rspInfo.setStatus(ErrorCode.EMOS_CREATEMESSAGE_ERROR.getCode()); + rspInfo.setMessage(new String[]{ErrorCode.EMOS_CREATEMESSAGE_ERROR.getMsg()}); + return ProtocolRespDTO.result(ErrorCode.EMOS_CREATEMESSAGE_ERROR, rspInfo); + } //推动数据格式到kafka - log.info("send alarm message :{}", alarmInfo.toString()); - String sendMessage = mr.getMsgContent().getAlarmInfo(); + log.info("send alarm :{}", content); ListenableFuture> sendResult = kafkaConfiguration .kafkaTemplate() - .sendDefault(0, System.currentTimeMillis(), "dispose", sendMessage); + .sendDefault(0, System.currentTimeMillis(), "dispose", content); - sendResult.addCallback(v -> log.info("Kafka send {} to {} at {}", sendMessage, + sendResult.addCallback(v -> log.info("Kafka send {} to {} at {}", content, Objects.requireNonNull(v) .getRecordMetadata() .topic(), v.getRecordMetadata().partition()), ex -> log.error("Kafka send error: {}", ex.getMessage())); - BaseRespStatus rspInfo = new BaseRespStatus(); + //保存数据格式到数据库 rspInfo.setStatus(ErrorCode.ERR_OK.getCode()); rspInfo.setMessage(new String[]{ErrorCode.ERR_OK.getMsg()}); return ProtocolRespDTO.result(ErrorCode.ERR_OK, rspInfo); } + + /** + * @param alarmInfo emos告警信息 + * @return 发送的消息 + */ + private String createSendContent(EmosAlarmInfo alarmInfo) { + try { + //编号从1开始,以实时消息发布通道为单位进行编号。如果编号超过最大正整数(2^32-1),重新从1开始编号。 + Long increment = stringRedisTemplate.opsForValue().increment("SEND_KAFKA_INCREMENT", 1); + String dstIp = alarmInfo.getDstIp(); + String alarmId = alarmInfo.getAlarmId(); + //LocateInfo: 192.168.11.6 ddos attack alarm,Memcache , 2019-08-01 12:31, 2.1Gbps, 1.01Mpps + String locateInfo = getAlarmEvent(alarmInfo); + + String eventTime = alarmInfo.getStartTime(); + String cancelTime = alarmInfo.getEndTime(); + + //告警类型vendorAlarmType、告警级别vendorSeverity、告警号vendorAlarmId、告警标题AlarmTitle、告警可能原因ProbableCauseTxt + String type = Constants.ATTACK_TYPE.get(alarmInfo.getAttackType()); + String vendorSeverity = changeCharacatorCode("一级"); + String vendorAlarmType; + String vendorAlarmId; + if (type != null) { + String[] arr = type.split("\\|"); + vendorAlarmType = arr[0]; + if (alarmInfo.getBpspps().compareTo("bps") == 0) { + vendorAlarmId = arr[1]; + } else { + vendorAlarmId = arr[2]; + } + } else { + vendorAlarmType = alarmInfo.getAttackType(); + vendorAlarmId = "0"; + log.info("get unKnow attack type:" + vendorAlarmType); + } + String alarmStatus = String.valueOf(Constants.ACTIVE_ALARM_STATUS); + String alarmText = changeCharacatorCode(getAlarmText(alarmInfo)); + String alarmExplanation = changeCharacatorCode(getAlarmExplanation(alarmInfo)); + + String content = MessageFormat.format(Constants.DISPATCH_TEMPLATE, increment, dstIp, + dstIp, alarmId, alarmId, dstIp, dstIp, locateInfo, eventTime, cancelTime, + vendorAlarmType, vendorSeverity, vendorAlarmId, changeCharacatorCode("重保攻击事件告警"), + changeCharacatorCode("DDos攻击事件"), alarmStatus, alarmText, + changeCharacatorCode(alarmInfo.getDstProvince()), changeCharacatorCode(alarmInfo.getDstCity()), + changeCharacatorCode("网络部集中抗D系统"), changeCharacatorCode("DDOS攻击事件告警"), + changeCharacatorCode("安全告警"), changeCharacatorCode("DDOS告警"), + alarmExplanation, changeCharacatorCode("集中抗D")); + return content; + + } catch (Exception e) { + log.error("createSendContent告警消息异常,详细信息:{}", ExceptionUtils.getStackTrace(e)); + return null; + } + } + + private String getAlarmEvent(EmosAlarmInfo a) { + return a.getDstIp() + " ddos attack alarm, " + a.getAttackType() + ", " + a.getStartTime() + + ", " + a.getMaxBps() + ", " + a.getMaxPps(); + } + + private String changeCharacatorCode(String con) { + try { + return new String(con.getBytes("GBK"), "GBK"); + } catch (Exception e) { + log.info("change failed:" + e.getMessage()); + return con; + } + } + + private String getAreaDes(String province, String city) { + if (province.compareTo("北京") == 0) { + return "北京市"; + } + if (province.compareTo("上海") == 0) { + return "上海市"; + } + if (province.compareTo("天津") == 0) { + return "天津市"; + } + if (province.compareTo("重庆") == 0) { + return "重庆市"; + } + return province + "省" + city + "市"; + } + + private String getAlarmText(EmosAlarmInfo a) { + String area = getAreaDes(a.getDstProvince(), a.getDstCity()); + String op = ""; + if (a.getDisposeType() == 1) { + op = "清洗"; + } else if (a.getDisposeType() == 2) { + op = "流控"; + } else if (a.getDisposeType() == 3) { + op = "黑洞"; + } + return "攻击目的IP:" + a.getDstIp() + "," + area + "," + "处置操作:" + op + "," + "处置时长:" + a.getDisposeTime() + "分钟"; + } + + private String getAlarmExplanation(EmosAlarmInfo a) { + String op = ""; + if (a.getDisposeType() == 1) { + op = "清洗"; + } else if (a.getDisposeType() == 2) { + op = "流控"; + } else if (a.getDisposeType() == 3) { + op = "黑洞"; + } + String srcIp = ""; + for (String ip : a.getSrcIpLs()) { + srcIp = srcIp + ip + ","; + } + if (!srcIp.isEmpty()) { + srcIp = srcIp.substring(0, srcIp.length() - 1); + } + return "攻击目的IP:" + a.getDstIp() + "," + "攻击源地址:(" + srcIp + ")," + "处置操作:" + op + "," + "处置时长:" + a.getDisposeTime() + "分钟"; + } } diff --git a/src/main/java/com/dispose/pojo/dto/protocol/kafka/AlarmInfo.java b/src/main/java/com/dispose/pojo/dto/protocol/kafka/EmosAlarmInfo.java similarity index 98% rename from src/main/java/com/dispose/pojo/dto/protocol/kafka/AlarmInfo.java rename to src/main/java/com/dispose/pojo/dto/protocol/kafka/EmosAlarmInfo.java index 8bb85b0e..bc2f88c9 100644 --- a/src/main/java/com/dispose/pojo/dto/protocol/kafka/AlarmInfo.java +++ b/src/main/java/com/dispose/pojo/dto/protocol/kafka/EmosAlarmInfo.java @@ -16,7 +16,7 @@ import java.util.List; @JsonPropertyOrder({"alarmId", "dstIp", "attackType", "bpspps", "dstProvince", "dstCity", "srcIpLs", "startTime", "endTime", "disposeType", "disposeTime", "maxBps", "maxPps"}) @JsonInclude(JsonInclude.Include.NON_NULL) -public class AlarmInfo { +public class EmosAlarmInfo { /** * 告警id. */ diff --git a/src/main/java/com/dispose/setup/SpringBootBeanUtil.java b/src/main/java/com/dispose/setup/SpringBootBeanUtil.java deleted file mode 100644 index f34424a4..00000000 --- a/src/main/java/com/dispose/setup/SpringBootBeanUtil.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.dispose.setup; - -import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; -import org.springframework.beans.BeansException; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.stereotype.Component; - -/** - * The type Spring boot bean util. - */ -@Component -@Slf4j -public class SpringBootBeanUtil implements ApplicationContextAware { - /** - * The constant applicationContext. - */ - private static ApplicationContext applicationContext; - - /** - * Sets application context. - * - * @param applicationContext the application context - * @throws BeansException the beans exception - */ - @Override - public void setApplicationContext(@NotNull ApplicationContext applicationContext) throws BeansException { - if (SpringBootBeanUtil.applicationContext == null) { - SpringBootBeanUtil.applicationContext = applicationContext; - } - - log.debug("========ApplicationContext配置成功========"); - log.debug("========在普通类可以通过调用SpringUtils.getAppContext()获取applicationContext对象========"); - log.debug("========applicationContext=" + SpringBootBeanUtil.applicationContext + "========"); - } - - /** - * Gets application context. - * - * @return the application context - */ - public static ApplicationContext getApplicationContext() { - return applicationContext; - } - - /** - * Gets bean. - * - * @param name the name - * @return the bean - */ - public static Object getBean(String name) { - return getApplicationContext().getBean(name); - } - - /** - * Gets bean. - * - * @param the type parameter - * @param clazz the clazz - * @return the bean - */ - public static T getBean(Class clazz) { - return getApplicationContext().getBean(clazz); - } - - /** - * Gets bean. - * - * @param the type parameter - * @param name the name - * @param clazz the clazz - * @return the bean - */ - public static T getBean(String name, Class clazz) { - return getApplicationContext().getBean(name, clazz); - } -} diff --git a/src/test/java/com/dispose/test/dev/controller/KafkaControllerTest.java b/src/test/java/com/dispose/test/dev/controller/KafkaControllerTest.java index 566c5600..83a52d64 100644 --- a/src/test/java/com/dispose/test/dev/controller/KafkaControllerTest.java +++ b/src/test/java/com/dispose/test/dev/controller/KafkaControllerTest.java @@ -83,7 +83,7 @@ public class KafkaControllerTest extends InitTestEnvironment { // 1清洗,2流控,3黑洞 disposeParam.set("disposeType", 1); disposeParam.set("disposeTime", 30); - disposeParam.set("endTime", new Date()); + disposeParam.set("endTime", sdf.format(new Date())); List srcIp = new ArrayList<>(); srcIp.add("192.168.10.1"); srcIp.add("192.168.10.2"); From 39531233a266ffbf142c64d6048219a652b9176c Mon Sep 17 00:00:00 2001 From: chenlinghy Date: Mon, 13 Sep 2021 17:35:51 +0800 Subject: [PATCH 3/5] =?UTF-8?q?OCT=20REM:=201.=20=E6=96=B0=E5=A2=9EMsgSeri?= =?UTF-8?q?al=E6=95=B0=E6=8D=AE=E5=BA=93=202.=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E5=AD=98=E5=82=A8MsgSerial=E5=AD=97=E6=AE=B5=E5=92=8C=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E6=9C=80=E5=A4=A7MsgSerial=E5=80=BC=E6=96=B9=E6=B3=95?= =?UTF-8?q?=203.=20=E4=BF=AE=E6=94=B9MsgSerial=E5=AD=97=E6=AE=B5=E9=80=92?= =?UTF-8?q?=E5=A2=9E=E6=96=B9=E5=BC=8F=EF=BC=88=E4=B8=8D=E9=87=87=E7=94=A8?= =?UTF-8?q?redis=E8=87=AA=E5=A2=9E=E9=95=BF=E6=96=B9=E5=BC=8F=EF=BC=89=204?= =?UTF-8?q?.=20=E6=95=B0=E6=8D=AE=E5=BA=93=E6=93=8D=E4=BD=9C=E5=8D=95?= =?UTF-8?q?=E5=85=83=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/application-dispose.properties | 5 - config/application-test.properties | 17 ++- pom.xml | 4 - .../dispose/controller/kafkaController.java | 30 +++- .../com/dispose/manager/MsgSerialManager.java | 19 +++ .../manager/impl/MsgSerialManagerImpl.java | 36 +++++ .../com/dispose/mapper/MsgSerialMapper.java | 21 +++ .../com/dispose/pojo/entity/MsgSerial.java | 48 +++++++ .../com/dispose/service/MsgSerialService.java | 20 +++ .../service/impl/MsgSerialServiceImpl.java | 34 +++++ src/main/resources/db/data.sql | 8 ++ src/main/resources/db/schema.sql | 133 ++++++++++-------- src/main/resources/mappers/MsgSerial.xml | 23 +++ .../test/dev/mapper/MsgSerialMapperTest.java | 60 ++++++++ 14 files changed, 373 insertions(+), 85 deletions(-) create mode 100644 src/main/java/com/dispose/manager/MsgSerialManager.java create mode 100644 src/main/java/com/dispose/manager/impl/MsgSerialManagerImpl.java create mode 100644 src/main/java/com/dispose/mapper/MsgSerialMapper.java create mode 100644 src/main/java/com/dispose/pojo/entity/MsgSerial.java create mode 100644 src/main/java/com/dispose/service/MsgSerialService.java create mode 100644 src/main/java/com/dispose/service/impl/MsgSerialServiceImpl.java create mode 100644 src/main/resources/mappers/MsgSerial.xml create mode 100644 src/test/java/com/dispose/test/dev/mapper/MsgSerialMapperTest.java diff --git a/config/application-dispose.properties b/config/application-dispose.properties index 26d9b602..4753ff1d 100644 --- a/config/application-dispose.properties +++ b/config/application-dispose.properties @@ -47,11 +47,6 @@ kafka.producer.linger=1 kafka.producer.buffer.memory=33554432 kafka.producer.servers=172.21.44.189:9092,172.21.44.9:9092,172.21.44.244:9092,172.21.44.236:9092,172.21.44.80:9092 kafka.dispose.topic=ddos-vip-customer-ck -#redis config -spring.redis.database=0 -spring.redis.host=172.21.48.211 -spring.redis.port=6379 -spring.redis.password=!Q2w3e4r #信任主机配置 # 白名单开关 trust.auth-white-list-check=true diff --git a/config/application-test.properties b/config/application-test.properties index 6b114a84..19a8dd44 100644 --- a/config/application-test.properties +++ b/config/application-test.properties @@ -4,17 +4,16 @@ server.tomcat.basedir=./basedir # 多个项目放在nginx下同个端口,通过该配置区分 server.servlet.context-path=/dispose # 配置数据源 -#spring.datasource.url=jdbc:mysql://10.88.77.65:33061/ci_dispose_v2?serverTimezone=Asia/Shanghai&zeroDateTimeBehavior\ -# =convertToNull&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true -#spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver -#spring.datasource.username=root -#spring.datasource.password=h0K0_8u -spring.datasource.url=jdbc:mysql://172.21.48.75:3306/ci_dispose_v1?serverTimezone=Asia/Shanghai&zeroDateTimeBehavior\ - =convertToNull&useUnicode=true +spring.datasource.url=jdbc:mysql://10.88.77.65:33061/ci_dispose_v2?serverTimezone=Asia/Shanghai&zeroDateTimeBehavior\ + =convertToNull&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.username=root -spring.datasource.password=BCcf6Dd7&8 - +spring.datasource.password=h0K0_8u +#spring.datasource.url=jdbc:mysql://172.21.48.75:3306/ci_dispose_v1?serverTimezone=Asia/Shanghai&zeroDateTimeBehavior\ +# =convertToNull&useUnicode=true +#spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver +#spring.datasource.username=root +#spring.datasource.password=BCcf6Dd7&8 # 配置连接池 spring.datasource.schema=classpath:test_db/unit_test.sql spring.datasource.initialization-mode=always diff --git a/pom.xml b/pom.xml index d78177dd..626c9ace 100644 --- a/pom.xml +++ b/pom.xml @@ -225,10 +225,6 @@ dom4j-core 1.4-dev-8 - - org.springframework.boot - spring-boot-starter-data-redis - diff --git a/src/main/java/com/dispose/controller/kafkaController.java b/src/main/java/com/dispose/controller/kafkaController.java index 9434b975..7923c019 100644 --- a/src/main/java/com/dispose/controller/kafkaController.java +++ b/src/main/java/com/dispose/controller/kafkaController.java @@ -9,14 +9,15 @@ import com.dispose.pojo.dto.protocol.base.ProtocolReqDTO; import com.dispose.pojo.dto.protocol.base.ProtocolRespDTO; import com.dispose.pojo.dto.protocol.kafka.EmosAlarmInfo; import com.dispose.pojo.dto.protocol.kafka.AlarmInfoReq; +import com.dispose.pojo.entity.MsgSerial; import com.dispose.security.annotation.Decryption; import com.dispose.security.annotation.Encryption; +import com.dispose.service.MsgSerialService; import com.dispose.validation.group.ValidGroups; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.springframework.data.redis.core.StringRedisTemplate; +import org.apache.commons.lang.exception.ExceptionUtils; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.stereotype.Controller; @@ -51,8 +52,11 @@ public class kafkaController { @Resource private KafkaConfiguration kafkaConfiguration; + /** + * The message serial service. + */ @Resource - private StringRedisTemplate stringRedisTemplate; + private MsgSerialService msgSerialService; /** * Dispatch command sent to kafka. @@ -78,8 +82,11 @@ public class kafkaController { rspInfo.setMessage(new String[]{ErrorCode.EMOS_CREATEMESSAGE_ERROR.getMsg()}); return ProtocolRespDTO.result(ErrorCode.EMOS_CREATEMESSAGE_ERROR, rspInfo); } - //推动数据格式到kafka + + //保存数据格式到数据库 log.info("send alarm :{}", content); + + //推动数据格式到kafka ListenableFuture> sendResult = kafkaConfiguration .kafkaTemplate() .sendDefault(0, System.currentTimeMillis(), "dispose", content); @@ -90,7 +97,6 @@ public class kafkaController { .topic(), v.getRecordMetadata().partition()), ex -> log.error("Kafka send error: {}", ex.getMessage())); - //保存数据格式到数据库 rspInfo.setStatus(ErrorCode.ERR_OK.getCode()); rspInfo.setMessage(new String[]{ErrorCode.ERR_OK.getMsg()}); return ProtocolRespDTO.result(ErrorCode.ERR_OK, rspInfo); @@ -102,13 +108,23 @@ public class kafkaController { */ private String createSendContent(EmosAlarmInfo alarmInfo) { try { + //告警序号的最大值2^32-1 + long indexEnd = 4294967296L; + //告警序号的最大值 + long indexStart = 1L; //编号从1开始,以实时消息发布通道为单位进行编号。如果编号超过最大正整数(2^32-1),重新从1开始编号。 - Long increment = stringRedisTemplate.opsForValue().increment("SEND_KAFKA_INCREMENT", 1); + long increment = msgSerialService.getMaxMessageSerial(); + increment = increment + 1; + if (increment > (indexEnd - 1)) { + increment = indexStart; + } + MsgSerial msgSerial = MsgSerial.builder().msgSerial(increment).build(); + msgSerialService.addMessageSerial(msgSerial); + String dstIp = alarmInfo.getDstIp(); String alarmId = alarmInfo.getAlarmId(); //LocateInfo: 192.168.11.6 ddos attack alarm,Memcache , 2019-08-01 12:31, 2.1Gbps, 1.01Mpps String locateInfo = getAlarmEvent(alarmInfo); - String eventTime = alarmInfo.getStartTime(); String cancelTime = alarmInfo.getEndTime(); diff --git a/src/main/java/com/dispose/manager/MsgSerialManager.java b/src/main/java/com/dispose/manager/MsgSerialManager.java new file mode 100644 index 00000000..b279c41b --- /dev/null +++ b/src/main/java/com/dispose/manager/MsgSerialManager.java @@ -0,0 +1,19 @@ +package com.dispose.manager; + +import com.dispose.pojo.entity.MsgSerial; + +public interface MsgSerialManager { + /** + * Add user business error code. + * + * @param msgSerial the message serial + */ + void addMsgSerialNumber(MsgSerial msgSerial); + + /** + * get new max message serial. + * + * @return the long + */ + Long getMaxMsgSerial(); +} diff --git a/src/main/java/com/dispose/manager/impl/MsgSerialManagerImpl.java b/src/main/java/com/dispose/manager/impl/MsgSerialManagerImpl.java new file mode 100644 index 00000000..1e4a787a --- /dev/null +++ b/src/main/java/com/dispose/manager/impl/MsgSerialManagerImpl.java @@ -0,0 +1,36 @@ +package com.dispose.manager.impl; + +import com.dispose.manager.MsgSerialManager; +import com.dispose.mapper.MsgSerialMapper; +import com.dispose.pojo.entity.MsgSerial; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +@Component +@Slf4j +public class MsgSerialManagerImpl implements MsgSerialManager { + /** + * The message serial mapper. + */ + @Resource + private MsgSerialMapper msgSerialMapper; + + + @Override + public void addMsgSerialNumber(MsgSerial msgSerial) { + msgSerialMapper.addMsgSerial(msgSerial); + } + + /** + * get new max message serial. + * + * @return the long + */ + @Override + public Long getMaxMsgSerial() { + return msgSerialMapper.getMaxMsgSerial(); + } +} + diff --git a/src/main/java/com/dispose/mapper/MsgSerialMapper.java b/src/main/java/com/dispose/mapper/MsgSerialMapper.java new file mode 100644 index 00000000..6dd62148 --- /dev/null +++ b/src/main/java/com/dispose/mapper/MsgSerialMapper.java @@ -0,0 +1,21 @@ +package com.dispose.mapper; + +import com.dispose.pojo.entity.MsgSerial; + +public interface MsgSerialMapper { + + /** + * Add new task int. + * + * @param msgSerial the msgSerial number + * @return the int + */ + int addMsgSerial(MsgSerial msgSerial); + + /** + * get new max message serial. + * + * @return the long + */ + long getMaxMsgSerial(); +} diff --git a/src/main/java/com/dispose/pojo/entity/MsgSerial.java b/src/main/java/com/dispose/pojo/entity/MsgSerial.java new file mode 100644 index 00000000..4f89bfa3 --- /dev/null +++ b/src/main/java/com/dispose/pojo/entity/MsgSerial.java @@ -0,0 +1,48 @@ +package com.dispose.pojo.entity; + +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import tk.mybatis.mapper.annotation.KeySql; +import tk.mybatis.mapper.annotation.NameStyle; +import tk.mybatis.mapper.code.Style; + +import javax.persistence.Id; +import javax.persistence.Table; +import java.io.Serializable; + +/** + * The type Dispose capacity. + * + * @author + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +@JsonInclude(JsonInclude.Include.NON_NULL) +@Table(name = "msg_serial") +@NameStyle(Style.normal) +public class MsgSerial implements Serializable { + + /** + * The constant serialVersionUID. + */ + private static final long serialVersionUID = 1L; + + /** + * The Id. + */ + @Id + @KeySql(useGeneratedKeys = true) + private Long id; + + /** + * The Device id. + */ + private Long msgSerial; + + +} diff --git a/src/main/java/com/dispose/service/MsgSerialService.java b/src/main/java/com/dispose/service/MsgSerialService.java new file mode 100644 index 00000000..f36dee2b --- /dev/null +++ b/src/main/java/com/dispose/service/MsgSerialService.java @@ -0,0 +1,20 @@ +package com.dispose.service; + +import com.dispose.pojo.entity.MsgSerial; + +public interface MsgSerialService { + /** + * add message serial. + * + * @param msgSerial the message serial + */ + void addMessageSerial(MsgSerial msgSerial); + + /** + * get new max message serial. + * + * @return the long + */ + Long getMaxMessageSerial(); + +} diff --git a/src/main/java/com/dispose/service/impl/MsgSerialServiceImpl.java b/src/main/java/com/dispose/service/impl/MsgSerialServiceImpl.java new file mode 100644 index 00000000..b3c19d2b --- /dev/null +++ b/src/main/java/com/dispose/service/impl/MsgSerialServiceImpl.java @@ -0,0 +1,34 @@ +package com.dispose.service.impl; + +import com.dispose.manager.MsgSerialManager; +import com.dispose.pojo.entity.MsgSerial; +import com.dispose.service.MsgSerialService; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service +public class MsgSerialServiceImpl implements MsgSerialService { + @Resource + private MsgSerialManager msgSerialManager; + + /** + * add message serial. + * + * @param msgSerial the message serial + */ + @Override + public void addMessageSerial(MsgSerial msgSerial) { + msgSerialManager.addMsgSerialNumber(msgSerial); + } + + /** + * get new max message serial. + * + * @return the long + */ + @Override + public Long getMaxMessageSerial() { + return msgSerialManager.getMaxMsgSerial(); + } +} diff --git a/src/main/resources/db/data.sql b/src/main/resources/db/data.sql index fb23a65a..2ec968d8 100644 --- a/src/main/resources/db/data.sql +++ b/src/main/resources/db/data.sql @@ -9,3 +9,11 @@ INSERT INTO `user_account` VALUES (1, 'admin', 'c3855e6b6bb120450f160ba91134522868f89d36062f2061ebeefd80817e1d58', '2020-11-13 09:25:19', '', '2021-01-20 10:18:56', '1db9ddc47de514eb16b7ec07d7f7f96f7a714ae00e1209755bab30d543a0a2c3', '2021-01-20 10:20:57', '1970-01-02 00:00:00', 0, 0); + +-- ---------------------------- +-- Records of msg_serial +-- ---------------------------- +INSERT INTO `msg_serial` VALUES ('1', '1'); + + + diff --git a/src/main/resources/db/schema.sql b/src/main/resources/db/schema.sql index 6f40efdb..587e8229 100644 --- a/src/main/resources/db/schema.sql +++ b/src/main/resources/db/schema.sql @@ -15,7 +15,8 @@ */ SET NAMES utf8mb4; -SET FOREIGN_KEY_CHECKS = 0; +SET +FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for device_task @@ -23,23 +24,23 @@ SET FOREIGN_KEY_CHECKS = 0; DROP TABLE IF EXISTS `device_task`; CREATE TABLE `device_task` ( - `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '任务信息唯一标识符', - `taskId` bigint(11) NOT NULL COMMENT '处置任务唯一标识符', - `deviceId` bigint(11) NOT NULL COMMENT '处置设备唯一标识符', - `beginTime` timestamp(0) NULL DEFAULT NULL COMMENT '任务开始时间', - `endTime` timestamp(0) NULL DEFAULT NULL COMMENT '任务结束时间', - `taskAttackType` bigint(255) UNSIGNED NOT NULL DEFAULT 0 COMMENT '需要处置的攻击类型', - `execAttackTypeIn` bigint(255) UNSIGNED NOT NULL DEFAULT 0 COMMENT '已经执行处置的攻击类型(Input)', - `attackTypeStatusIn` bigint(255) UNSIGNED NOT NULL DEFAULT 0 COMMENT '执行的攻击类型状态(Input)', - `execAttackTypeOut` bigint(255) UNSIGNED NOT NULL DEFAULT 0 COMMENT '已经执行处置的攻击类型(Output)', - `attackTypeStatusOut` bigint(255) UNSIGNED NOT NULL DEFAULT 0 COMMENT '执行的攻击类型状态(Input)', - `externId` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '扩展任务ID', - `errRetry` int(11) UNSIGNED NOT NULL DEFAULT 0 COMMENT '调用失败重试次数', - `status` int(11) NOT NULL DEFAULT 0 COMMENT '任务状态', - `devStatus` int(11) NOT NULL DEFAULT 0 COMMENT '设备任务状态', + `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '任务信息唯一标识符', + `taskId` bigint(11) NOT NULL COMMENT '处置任务唯一标识符', + `deviceId` bigint(11) NOT NULL COMMENT '处置设备唯一标识符', + `beginTime` timestamp(0) NULL DEFAULT NULL COMMENT '任务开始时间', + `endTime` timestamp(0) NULL DEFAULT NULL COMMENT '任务结束时间', + `taskAttackType` bigint(255) UNSIGNED NOT NULL DEFAULT 0 COMMENT '需要处置的攻击类型', + `execAttackTypeIn` bigint(255) UNSIGNED NOT NULL DEFAULT 0 COMMENT '已经执行处置的攻击类型(Input)', + `attackTypeStatusIn` bigint(255) UNSIGNED NOT NULL DEFAULT 0 COMMENT '执行的攻击类型状态(Input)', + `execAttackTypeOut` bigint(255) UNSIGNED NOT NULL DEFAULT 0 COMMENT '已经执行处置的攻击类型(Output)', + `attackTypeStatusOut` bigint(255) UNSIGNED NOT NULL DEFAULT 0 COMMENT '执行的攻击类型状态(Input)', + `externId` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '扩展任务ID', + `errRetry` int(11) UNSIGNED NOT NULL DEFAULT 0 COMMENT '调用失败重试次数', + `status` int(11) NOT NULL DEFAULT 0 COMMENT '任务状态', + `devStatus` int(11) NOT NULL DEFAULT 0 COMMENT '设备任务状态', PRIMARY KEY (`id`) USING BTREE, - INDEX `task_info_ibfk_1` (`taskId`) USING BTREE, - INDEX `task_info_ibfk_2` (`deviceId`) USING BTREE, + INDEX `task_info_ibfk_1` (`taskId`) USING BTREE, + INDEX `task_info_ibfk_2` (`deviceId`) USING BTREE, CONSTRAINT `device_task_ibfk_1` FOREIGN KEY (`taskId`) REFERENCES `dispose_task` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION, CONSTRAINT `device_task_ibfk_2` FOREIGN KEY (`deviceId`) REFERENCES `dispose_device` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION ) ENGINE = InnoDB @@ -54,15 +55,15 @@ CREATE TABLE `device_task` DROP TABLE IF EXISTS `dispose_capacity`; CREATE TABLE `dispose_capacity` ( - `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '设备处置能力唯一标识符', - `deviceId` bigint(11) NOT NULL COMMENT '设备ID', - `capacityType` int(11) NOT NULL COMMENT '处置能力:\r\n0:清洗\r\n1:高防 \r\n2:路由黑洞 \r\n3:检测\r\n4:WAF封堵\r\n', - `objectType` int(11) NOT NULL COMMENT '处置对象类型:\r\n0:IP\r\n1:域名 \r\n2:URL', - `ipType` int(11) NOT NULL DEFAULT 3 COMMENT 'IP 地址类型:\r\n1 << 0:支持IPV4\r\n1 << 1:支持IPV6', + `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '设备处置能力唯一标识符', + `deviceId` bigint(11) NOT NULL COMMENT '设备ID', + `capacityType` int(11) NOT NULL COMMENT '处置能力:\r\n0:清洗\r\n1:高防 \r\n2:路由黑洞 \r\n3:检测\r\n4:WAF封堵\r\n', + `objectType` int(11) NOT NULL COMMENT '处置对象类型:\r\n0:IP\r\n1:域名 \r\n2:URL', + `ipType` int(11) NOT NULL DEFAULT 3 COMMENT 'IP 地址类型:\r\n1 << 0:支持IPV4\r\n1 << 1:支持IPV6', `protectIp` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '该处置能力能够处置的IP', - `reserveNetflow` int(11) NULL DEFAULT NULL COMMENT '清洗能力储备流量值,单位(G)', + `reserveNetflow` int(11) NULL DEFAULT NULL COMMENT '清洗能力储备流量值,单位(G)', PRIMARY KEY (`id`) USING BTREE, - INDEX `id` (`deviceId`) USING BTREE, + INDEX `id` (`deviceId`) USING BTREE, CONSTRAINT `dispose_capacity_ibfk_1` FOREIGN KEY (`deviceId`) REFERENCES `dispose_device` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION ) ENGINE = InnoDB AUTO_INCREMENT = 1 @@ -76,23 +77,23 @@ CREATE TABLE `dispose_capacity` DROP TABLE IF EXISTS `dispose_device`; CREATE TABLE `dispose_device` ( - `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '设备唯一标识符', - `ipAddr` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '设备IP地址, IPv4/IPv6', - `ipPort` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '端口', - `deviceType` int(11) NOT NULL DEFAULT 0 COMMENT '能力节点类型,0:迪普UMC平台,1:浩瀚设备', - `areaCode` int(11) NOT NULL COMMENT '设备物理位置区域码', - `deviceName` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '设备名称', - `manufacturer` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '生产厂商', - `model` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '型号', - `version` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '软件版本', - `userName` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '系统管理用户名', - `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '系统管理密码', - `urlPath` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT 'URL路径', - `urlType` int(11) NOT NULL DEFAULT 0 COMMENT 'URL类型: 0: HTTP, 1: HTTPS', - `readme` varchar(1024) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '备注', - `status` int(11) NOT NULL DEFAULT 0 COMMENT '状态,0:正常,1:锁定,2:禁用, 3:删除', + `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '设备唯一标识符', + `ipAddr` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '设备IP地址, IPv4/IPv6', + `ipPort` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '端口', + `deviceType` int(11) NOT NULL DEFAULT 0 COMMENT '能力节点类型,0:迪普UMC平台,1:浩瀚设备', + `areaCode` int(11) NOT NULL COMMENT '设备物理位置区域码', + `deviceName` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '设备名称', + `manufacturer` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '生产厂商', + `model` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '型号', + `version` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '软件版本', + `userName` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '系统管理用户名', + `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '系统管理密码', + `urlPath` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT 'URL路径', + `urlType` int(11) NOT NULL DEFAULT 0 COMMENT 'URL类型: 0: HTTP, 1: HTTPS', + `readme` varchar(1024) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '备注', + `status` int(11) NOT NULL DEFAULT 0 COMMENT '状态,0:正常,1:锁定,2:禁用, 3:删除', PRIMARY KEY (`id`) USING BTREE, - INDEX `ipAddr` (`ipAddr`) USING BTREE + INDEX `ipAddr` (`ipAddr`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 @@ -105,23 +106,23 @@ CREATE TABLE `dispose_device` DROP TABLE IF EXISTS `dispose_task`; CREATE TABLE `dispose_task` ( - `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '处置任务唯一标识符', - `deviceId` bigint(11) NOT NULL COMMENT '处置设备唯一标识符', - `accountId` bigint(11) NOT NULL COMMENT '用户唯一标识符', - `areaCode` int(11) NOT NULL DEFAULT -1 COMMENT '设备物理位置区域码', - `disposeCapacity` int(11) NOT NULL COMMENT '处置能力类型:0:清洗 1:高防 2:黑洞 3:检测', + `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '处置任务唯一标识符', + `deviceId` bigint(11) NOT NULL COMMENT '处置设备唯一标识符', + `accountId` bigint(11) NOT NULL COMMENT '用户唯一标识符', + `areaCode` int(11) NOT NULL DEFAULT -1 COMMENT '设备物理位置区域码', + `disposeCapacity` int(11) NOT NULL COMMENT '处置能力类型:0:清洗 1:高防 2:黑洞 3:检测', `disposeObject` varchar(4096) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '处置对象', - `objectType` int(8) NOT NULL COMMENT '处置对象类型:\r\n0:IP\r\n1:域名 \r\n2:URL', + `objectType` int(8) NOT NULL COMMENT '处置对象类型:\r\n0:IP\r\n1:域名 \r\n2:URL', `createTime` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '开始时间', `planEndTime` timestamp(0) NOT NULL DEFAULT '1970-01-02 00:00:00' COMMENT '计划结束时间', - `endTime` timestamp(0) NULL DEFAULT '1970-01-02 00:00:00' COMMENT '实际结束时间', - `flowDirection` int(11) NOT NULL DEFAULT 2 COMMENT '流量方向, 0:流入;1:流出;2:双向', - `attackType` bigint(20) NOT NULL DEFAULT 0 COMMENT '攻击类型,默认0, 全部攻击', - `flowBandWidth` int(11) UNSIGNED NULL DEFAULT 1024 COMMENT '攻击流量占用带宽(MB)', - `currentStatus` int(11) NOT NULL DEFAULT 0 COMMENT '状态,0:停止。1:启动', + `endTime` timestamp(0) NULL DEFAULT '1970-01-02 00:00:00' COMMENT '实际结束时间', + `flowDirection` int(11) NOT NULL DEFAULT 2 COMMENT '流量方向, 0:流入;1:流出;2:双向', + `attackType` bigint(20) NOT NULL DEFAULT 0 COMMENT '攻击类型,默认0, 全部攻击', + `flowBandWidth` int(11) UNSIGNED NULL DEFAULT 1024 COMMENT '攻击流量占用带宽(MB)', + `currentStatus` int(11) NOT NULL DEFAULT 0 COMMENT '状态,0:停止。1:启动', PRIMARY KEY (`id`) USING BTREE, - INDEX `dispose_task_device_capacity_id_fk` (`disposeCapacity`) USING BTREE, - INDEX `dispose_task_user_account_id_fk` (`accountId`) USING BTREE + INDEX `dispose_task_device_capacity_id_fk` (`disposeCapacity`) USING BTREE, + INDEX `dispose_task_user_account_id_fk` (`accountId`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 @@ -134,10 +135,10 @@ CREATE TABLE `dispose_task` DROP TABLE IF EXISTS `service_group`; CREATE TABLE `service_group` ( - `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '业务ID唯一标识符', + `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '业务ID唯一标识符', `serviceId` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '业务ID', `serviceType` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '业务类型:SERVER,WEB,DNS,GAME', - `serviceBandwidth` int(11) NOT NULL DEFAULT 1 COMMENT '业务带宽,单位M', + `serviceBandwidth` int(11) NOT NULL DEFAULT 1 COMMENT '业务带宽,单位M', `serviceIp` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '业务IP地址,逗号分割', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB @@ -152,17 +153,17 @@ CREATE TABLE `service_group` DROP TABLE IF EXISTS `user_account`; CREATE TABLE `user_account` ( - `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '账户唯一编号', + `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '账户唯一编号', `username` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '用户名', `password` varchar(512) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '密码', `createTime` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间', `operators` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '操作员', `lastLoginTime` timestamp(0) NOT NULL DEFAULT '1970-01-02 00:00:00' COMMENT '最后一次成功登录时间', - `token` varchar(512) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '用户token', + `token` varchar(512) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '用户token', `lastAccess` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '最后一次访问时间戳', `lockTime` timestamp(0) NOT NULL DEFAULT '1970-01-02 00:00:00' COMMENT '账户锁定时间', - `pwdErrTimes` int(11) NOT NULL DEFAULT 0 COMMENT '密码错误次数', - `status` int(11) NULL DEFAULT 0 COMMENT '账户状态', + `pwdErrTimes` int(11) NOT NULL DEFAULT 0 COMMENT '密码错误次数', + `status` int(11) NULL DEFAULT 0 COMMENT '账户状态', PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `USERNAME` (`username`) USING BTREE ) ENGINE = InnoDB @@ -171,4 +172,16 @@ CREATE TABLE `user_account` COLLATE = utf8_general_ci ROW_FORMAT = DYNAMIC; -SET FOREIGN_KEY_CHECKS = 1; +SET +FOREIGN_KEY_CHECKS = 1; + +-- ---------------------------- +-- Table structure for msg_serial +-- ---------------------------- +DROP TABLE IF EXISTS `msg_serial`; +CREATE TABLE `msg_serial` +( + `id` bigint(20) NOT NULL AUTO_INCREMENT, + `msgSerial` bigint(20) NOT NULL DEFAULT '1', + PRIMARY KEY (`id`) +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; diff --git a/src/main/resources/mappers/MsgSerial.xml b/src/main/resources/mappers/MsgSerial.xml new file mode 100644 index 00000000..4e9ceaa8 --- /dev/null +++ b/src/main/resources/mappers/MsgSerial.xml @@ -0,0 +1,23 @@ + + + + + + + + + + INSERT + IGNORE INTO msg_serial(msgSerial) + VALUES ( + #{msgSerial} + ) + + + + \ No newline at end of file diff --git a/src/test/java/com/dispose/test/dev/mapper/MsgSerialMapperTest.java b/src/test/java/com/dispose/test/dev/mapper/MsgSerialMapperTest.java new file mode 100644 index 00000000..c64b40db --- /dev/null +++ b/src/test/java/com/dispose/test/dev/mapper/MsgSerialMapperTest.java @@ -0,0 +1,60 @@ +package com.dispose.test.dev.mapper; + +import com.dispose.mapper.MsgSerialMapper; +import com.dispose.pojo.entity.MsgSerial; +import com.dispose.test.dev.Global.InitTestEnvironment; +import lombok.extern.slf4j.Slf4j; +import org.junit.Assert; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.MethodSorters; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.transaction.annotation.Transactional; + +import javax.annotation.Resource; + +/** + * The message serial test. + */ +@RunWith(SpringRunner.class) +@SpringBootTest +@Slf4j +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +@Transactional +public class MsgSerialMapperTest extends InitTestEnvironment { + + /** + * The message serial mapper. + */ + @Resource + MsgSerialMapper msgSerialMapper; + + /** + * A 1 add new message serial. + */ + @Test + public void a1_addMsgSerial() { + for (long i = 1L; i <= 10L; i++) { + MsgSerial msgSerial = MsgSerial.builder() + .msgSerial(i) + .build(); + log.info("++++++++++++++++++MsgSerial {}", msgSerial.toString()); + msgSerialMapper.addMsgSerial(msgSerial); + } + } + + @Test + public void a2_getMaxMsgSerial() { + for (long i = 1L; i <= 16L; i++) { + MsgSerial msgSerial = MsgSerial.builder() + .msgSerial(i) + .build(); + msgSerialMapper.addMsgSerial(msgSerial); + } + long maxMsgSerial = msgSerialMapper.getMaxMsgSerial(); + log.info("+++++++++++++++++++ max MsgSerial {}", maxMsgSerial); + Assert.assertEquals(maxMsgSerial, 16); + } +} From 99d8f668be0b541781f71ccb47c45784981634d8 Mon Sep 17 00:00:00 2001 From: chenlinghy Date: Tue, 14 Sep 2021 15:08:12 +0800 Subject: [PATCH 4/5] =?UTF-8?q?OCT=20REM:=201.=20=E4=BC=98=E5=8C=96control?= =?UTF-8?q?ler=E5=B1=82=E4=BB=A3=E7=A0=81=202.=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E5=AD=98=E5=82=A8MsgSerial=E5=AD=97?= =?UTF-8?q?=E6=AE=B5=E5=92=8C=E8=8E=B7=E5=8F=96=E6=9C=80=E5=A4=A7MsgSerial?= =?UTF-8?q?=E5=80=BC=E6=96=B9=E6=B3=95=204.=20=E4=BF=AE=E6=94=B9=E5=8D=95?= =?UTF-8?q?=E5=85=83=E6=B5=8B=E8=AF=95=E5=85=A5=E5=8F=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/dispose/common/Constants.java | 24 +++- .../dispose/controller/kafkaController.java | 136 ++++++++++-------- .../com/dispose/manager/MsgSerialManager.java | 4 +- .../manager/impl/MsgSerialManagerImpl.java | 16 ++- .../com/dispose/service/MsgSerialService.java | 5 +- .../service/impl/MsgSerialServiceImpl.java | 20 ++- .../dev/controller/KafkaControllerTest.java | 6 +- 7 files changed, 137 insertions(+), 74 deletions(-) diff --git a/src/main/java/com/dispose/common/Constants.java b/src/main/java/com/dispose/common/Constants.java index 29c7272f..39f1a00b 100644 --- a/src/main/java/com/dispose/common/Constants.java +++ b/src/main/java/com/dispose/common/Constants.java @@ -6,6 +6,8 @@ import java.util.Map; /** * The emos constants. + * + * @author */ public class Constants { /** @@ -62,6 +64,22 @@ public class Constants { } /** + * 各省份城市 + */ + public static final String REGION_BEIJING = "北京"; + public static final String REGION_SHANGHAI = "上海"; + public static final String REGION_TIANJIN = "天津"; + public static final String REGION_CHONGQING = "重庆"; + + /** + * 处置类型(1:清洗,2:黑洞,3:高防) + */ + public static final int CLEANUP = 1; + public static final int BLACKHOOL = 2; + public static final int HIDEPEND = 3; + + /** + * /** * 派单eoms模板 */ public static final String DISPATCH_TEMPLATE = "\nMsgSerial:{0}\nSDN:{1}\nNeName:{2}\nEquipmentClass:99236\n" + @@ -71,9 +89,9 @@ public class Constants { "HolderType: \nAlarmStatus:{15}\nCorrelateAlarmFlag: \nAlarmActCount: \nNeIp:\nEmsId: \nVendor:99083\nAlarmText:{16}\n" + "NeAlias: \nVersion: \nRemoteNe: \nAlarmProvince:{17}\nAlarmRegion:{18}\nAlarmCounty: \nSite: \nSiteType: \nSiteProperty: \n" + "MachineroomIDofZGTT: \nBusinessSystem:{19}\nCircuitNo: \nMac: \nSpecialty:9\nNetworkType:903\nNeSubType: \nEffectCircuitNum: \n" + - "CircuitLevel: \nAlarmSeverity:3\nNmsAlarmId:0903-083-056-10-900001\nStandardAlarmName:{21}\nAlarmLogicClass:{22}\n" + - "AlarmLogicSubClass:{23}\nEffectOnEquipment:5\nEffectOnBusiness:4\nNmsAlarmType:1\nSendGroupFlag: \nStandardFlag:2\n" + - "AlarmExplanation:{24}\nBusinessType: \nBusinessInfo:{25}\nIsRelatedRemote: \nLocateNeStatus:1300\nProjectNo: \n" + + "CircuitLevel: \nAlarmSeverity:3\nNmsAlarmId:0903-083-056-10-900001\nStandardAlarmName:{20}\nAlarmLogicClass:{21}\n" + + "AlarmLogicSubClass:{22}\nEffectOnEquipment:5\nEffectOnBusiness:4\nNmsAlarmType:1\nSendGroupFlag: \nStandardFlag:2\n" + + "AlarmExplanation:{23}\nBusinessType: \nBusinessInfo:{24}\nIsRelatedRemote: \nLocateNeStatus:1300\nProjectNo: \n" + "ProjectName: \nProjectStartTime: \nProjectEndTime: \nGroupCustomer: \nCustomerLevel: \nServiceType: \nServiceLevel: \n" + "ServiceName: \nServiceCrossDomainType: \nInterruptCircuitState: \nCircuitLocateInfo: \nHomeClientNum: \nHomeCellNum: \n" + "LinkOnuNum: \n"; diff --git a/src/main/java/com/dispose/controller/kafkaController.java b/src/main/java/com/dispose/controller/kafkaController.java index 7923c019..a955473f 100644 --- a/src/main/java/com/dispose/controller/kafkaController.java +++ b/src/main/java/com/dispose/controller/kafkaController.java @@ -10,6 +10,7 @@ import com.dispose.pojo.dto.protocol.base.ProtocolRespDTO; import com.dispose.pojo.dto.protocol.kafka.EmosAlarmInfo; import com.dispose.pojo.dto.protocol.kafka.AlarmInfoReq; import com.dispose.pojo.entity.MsgSerial; +import com.dispose.pojo.po.MulReturnType; import com.dispose.security.annotation.Decryption; import com.dispose.security.annotation.Encryption; import com.dispose.service.MsgSerialService; @@ -35,7 +36,7 @@ import java.util.Objects; /** * The type Auth controller. * - * @author + * @author */ @Controller @RequestMapping(value = "/kafka") @@ -108,18 +109,15 @@ public class kafkaController { */ private String createSendContent(EmosAlarmInfo alarmInfo) { try { - //告警序号的最大值2^32-1 - long indexEnd = 4294967296L; - //告警序号的最大值 - long indexStart = 1L; - //编号从1开始,以实时消息发布通道为单位进行编号。如果编号超过最大正整数(2^32-1),重新从1开始编号。 - long increment = msgSerialService.getMaxMessageSerial(); - increment = increment + 1; - if (increment > (indexEnd - 1)) { - increment = indexStart; - } + long dbIncrement = msgSerialService.getMaxMessageSerial(); + long increment = dbIncrement + 1; MsgSerial msgSerial = MsgSerial.builder().msgSerial(increment).build(); - msgSerialService.addMessageSerial(msgSerial); + MulReturnType returnType = msgSerialService.addMessageSerial(msgSerial); + if (returnType.getFirstParam() == ErrorCode.ERR_OK) { + increment = returnType.getSecondParam(); + } else { + increment = dbIncrement; + } String dstIp = alarmInfo.getDstIp(); String alarmId = alarmInfo.getAlarmId(); @@ -130,7 +128,7 @@ public class kafkaController { //告警类型vendorAlarmType、告警级别vendorSeverity、告警号vendorAlarmId、告警标题AlarmTitle、告警可能原因ProbableCauseTxt String type = Constants.ATTACK_TYPE.get(alarmInfo.getAttackType()); - String vendorSeverity = changeCharacatorCode("一级"); + String vendorSeverity = characterEncode("一级"); String vendorAlarmType; String vendorAlarmId; if (type != null) { @@ -144,87 +142,103 @@ public class kafkaController { } else { vendorAlarmType = alarmInfo.getAttackType(); vendorAlarmId = "0"; - log.info("get unKnow attack type:" + vendorAlarmType); + log.info("unKnown attack type:" + vendorAlarmType); } String alarmStatus = String.valueOf(Constants.ACTIVE_ALARM_STATUS); - String alarmText = changeCharacatorCode(getAlarmText(alarmInfo)); - String alarmExplanation = changeCharacatorCode(getAlarmExplanation(alarmInfo)); + String alarmText = characterEncode(getAlarmText(alarmInfo)); + String alarmExplanation = characterEncode(getAlarmExplanation(alarmInfo)); String content = MessageFormat.format(Constants.DISPATCH_TEMPLATE, increment, dstIp, dstIp, alarmId, alarmId, dstIp, dstIp, locateInfo, eventTime, cancelTime, - vendorAlarmType, vendorSeverity, vendorAlarmId, changeCharacatorCode("重保攻击事件告警"), - changeCharacatorCode("DDos攻击事件"), alarmStatus, alarmText, - changeCharacatorCode(alarmInfo.getDstProvince()), changeCharacatorCode(alarmInfo.getDstCity()), - changeCharacatorCode("网络部集中抗D系统"), changeCharacatorCode("DDOS攻击事件告警"), - changeCharacatorCode("安全告警"), changeCharacatorCode("DDOS告警"), - alarmExplanation, changeCharacatorCode("集中抗D")); + vendorAlarmType, vendorSeverity, vendorAlarmId, characterEncode("重保攻击事件告警"), + characterEncode("DDos攻击事件"), alarmStatus, alarmText, + characterEncode(alarmInfo.getDstProvince()), characterEncode(alarmInfo.getDstCity()), + characterEncode("网络部集中抗D系统"), characterEncode("DDOS攻击事件告警"), + characterEncode("安全告警"), characterEncode("DDOS告警"), + alarmExplanation, characterEncode("集中抗D")); return content; - } catch (Exception e) { log.error("createSendContent告警消息异常,详细信息:{}", ExceptionUtils.getStackTrace(e)); return null; } } + /** + * 获取告警事件AlarmEvent + */ private String getAlarmEvent(EmosAlarmInfo a) { return a.getDstIp() + " ddos attack alarm, " + a.getAttackType() + ", " + a.getStartTime() + ", " + a.getMaxBps() + ", " + a.getMaxPps(); } - private String changeCharacatorCode(String con) { + /** + * 数据编码都采用GBK编码方式 + */ + private String characterEncode(String character) { try { - return new String(con.getBytes("GBK"), "GBK"); + return new String(character.getBytes("GBK"), "GBK"); } catch (Exception e) { - log.info("change failed:" + e.getMessage()); - return con; + log.info("character encoding failed:" + e.getMessage()); + return character; } } + /** + * 获取告警区域 + */ private String getAreaDes(String province, String city) { - if (province.compareTo("北京") == 0) { - return "北京市"; + String areaDes; + if (Constants.REGION_BEIJING.equals(province)) { + areaDes = "北京市"; + } else if (Constants.REGION_SHANGHAI.equals(province)) { + areaDes = "上海市"; + } else if (Constants.REGION_TIANJIN.equals(province)) { + areaDes = "天津市"; + } else if (Constants.REGION_CHONGQING.equals(province)) { + areaDes = "重庆市"; + } else { + areaDes = province + "省" + city + "市"; } - if (province.compareTo("上海") == 0) { - return "上海市"; - } - if (province.compareTo("天津") == 0) { - return "天津市"; - } - if (province.compareTo("重庆") == 0) { - return "重庆市"; - } - return province + "省" + city + "市"; + + return areaDes; } + /** + * 获取处置类型(1:清洗,2:黑洞,3:高防) + */ + private String getOperateType(Integer disposeType) { + String operateType = null; + if (Constants.CLEANUP == disposeType) { + operateType = "清洗"; + } else if (Constants.BLACKHOOL == disposeType) { + operateType = "流控"; + } else if (Constants.HIDEPEND == disposeType) { + operateType = "黑洞"; + } + return operateType; + } + + /** + * 获取告警正文AlarmText + */ private String getAlarmText(EmosAlarmInfo a) { String area = getAreaDes(a.getDstProvince(), a.getDstCity()); - String op = ""; - if (a.getDisposeType() == 1) { - op = "清洗"; - } else if (a.getDisposeType() == 2) { - op = "流控"; - } else if (a.getDisposeType() == 3) { - op = "黑洞"; - } - return "攻击目的IP:" + a.getDstIp() + "," + area + "," + "处置操作:" + op + "," + "处置时长:" + a.getDisposeTime() + "分钟"; + String operateType = getOperateType(a.getDisposeType()); + return "攻击目的IP:" + a.getDstIp() + "," + area + "," + "处置操作:" + operateType + "," + "处置时长:" + a.getDisposeTime() + "分钟"; } + /** + * 获取告警解释AlarmExplanation + */ private String getAlarmExplanation(EmosAlarmInfo a) { - String op = ""; - if (a.getDisposeType() == 1) { - op = "清洗"; - } else if (a.getDisposeType() == 2) { - op = "流控"; - } else if (a.getDisposeType() == 3) { - op = "黑洞"; - } - String srcIp = ""; + String operateType = getOperateType(a.getDisposeType()); + StringBuilder srcIp = new StringBuilder(); for (String ip : a.getSrcIpLs()) { - srcIp = srcIp + ip + ","; + srcIp.append(ip).append(","); } - if (!srcIp.isEmpty()) { - srcIp = srcIp.substring(0, srcIp.length() - 1); + if (srcIp.length() > 0) { + srcIp = new StringBuilder(srcIp.substring(0, srcIp.length() - 1)); } - return "攻击目的IP:" + a.getDstIp() + "," + "攻击源地址:(" + srcIp + ")," + "处置操作:" + op + "," + "处置时长:" + a.getDisposeTime() + "分钟"; + return "攻击目的IP:" + a.getDstIp() + "," + "攻击源地址:(" + srcIp + ")," + "处置操作:" + operateType + "," + "处置时长:" + a.getDisposeTime() + "分钟"; } } diff --git a/src/main/java/com/dispose/manager/MsgSerialManager.java b/src/main/java/com/dispose/manager/MsgSerialManager.java index b279c41b..bebda584 100644 --- a/src/main/java/com/dispose/manager/MsgSerialManager.java +++ b/src/main/java/com/dispose/manager/MsgSerialManager.java @@ -1,5 +1,6 @@ package com.dispose.manager; +import com.dispose.common.ErrorCode; import com.dispose.pojo.entity.MsgSerial; public interface MsgSerialManager { @@ -7,8 +8,9 @@ public interface MsgSerialManager { * Add user business error code. * * @param msgSerial the message serial + * @return the error code */ - void addMsgSerialNumber(MsgSerial msgSerial); + ErrorCode addMsgSerialNumber(MsgSerial msgSerial); /** * get new max message serial. diff --git a/src/main/java/com/dispose/manager/impl/MsgSerialManagerImpl.java b/src/main/java/com/dispose/manager/impl/MsgSerialManagerImpl.java index 1e4a787a..ab26b187 100644 --- a/src/main/java/com/dispose/manager/impl/MsgSerialManagerImpl.java +++ b/src/main/java/com/dispose/manager/impl/MsgSerialManagerImpl.java @@ -1,5 +1,6 @@ package com.dispose.manager.impl; +import com.dispose.common.ErrorCode; import com.dispose.manager.MsgSerialManager; import com.dispose.mapper.MsgSerialMapper; import com.dispose.pojo.entity.MsgSerial; @@ -17,10 +18,19 @@ public class MsgSerialManagerImpl implements MsgSerialManager { @Resource private MsgSerialMapper msgSerialMapper; - + /** + * Add user business error code. + * + * @param msgSerial the message serial + * @return the error code + */ @Override - public void addMsgSerialNumber(MsgSerial msgSerial) { - msgSerialMapper.addMsgSerial(msgSerial); + public ErrorCode addMsgSerialNumber(MsgSerial msgSerial) { + if (msgSerialMapper.addMsgSerial(msgSerial) == 1) { + return ErrorCode.ERR_OK; + } else { + return ErrorCode.ERR_DATABASE; + } } /** diff --git a/src/main/java/com/dispose/service/MsgSerialService.java b/src/main/java/com/dispose/service/MsgSerialService.java index f36dee2b..6d6af602 100644 --- a/src/main/java/com/dispose/service/MsgSerialService.java +++ b/src/main/java/com/dispose/service/MsgSerialService.java @@ -1,14 +1,17 @@ package com.dispose.service; +import com.dispose.common.ErrorCode; import com.dispose.pojo.entity.MsgSerial; +import com.dispose.pojo.po.MulReturnType; public interface MsgSerialService { /** * add message serial. * * @param msgSerial the message serial + * @return the mul return type */ - void addMessageSerial(MsgSerial msgSerial); + MulReturnType addMessageSerial(MsgSerial msgSerial); /** * get new max message serial. diff --git a/src/main/java/com/dispose/service/impl/MsgSerialServiceImpl.java b/src/main/java/com/dispose/service/impl/MsgSerialServiceImpl.java index b3c19d2b..63be8481 100644 --- a/src/main/java/com/dispose/service/impl/MsgSerialServiceImpl.java +++ b/src/main/java/com/dispose/service/impl/MsgSerialServiceImpl.java @@ -1,7 +1,9 @@ package com.dispose.service.impl; +import com.dispose.common.ErrorCode; import com.dispose.manager.MsgSerialManager; import com.dispose.pojo.entity.MsgSerial; +import com.dispose.pojo.po.MulReturnType; import com.dispose.service.MsgSerialService; import org.springframework.stereotype.Service; @@ -16,10 +18,24 @@ public class MsgSerialServiceImpl implements MsgSerialService { * add message serial. * * @param msgSerial the message serial + * @return the mul return type */ @Override - public void addMessageSerial(MsgSerial msgSerial) { - msgSerialManager.addMsgSerialNumber(msgSerial); + public MulReturnType addMessageSerial(MsgSerial msgSerial) { + //告警序号的最大值2^32-1 + long indexEnd = 4294967295L; + long indexStart = 1L; + long currentSerial = msgSerial.getMsgSerial(); + //编号从1开始,以实时消息发布通道为单位进行编号。如果编号超过最大正整数(2^32-1),重新从1开始编号。 + if (currentSerial > indexEnd) { + currentSerial = indexStart; + } + + if (msgSerialManager.addMsgSerialNumber(msgSerial) == ErrorCode.ERR_OK) { + return new MulReturnType<>(ErrorCode.ERR_OK, currentSerial); + } else { + return new MulReturnType<>(ErrorCode.ERR_DATABASE, null); + } } /** diff --git a/src/test/java/com/dispose/test/dev/controller/KafkaControllerTest.java b/src/test/java/com/dispose/test/dev/controller/KafkaControllerTest.java index 83a52d64..6521d0c3 100644 --- a/src/test/java/com/dispose/test/dev/controller/KafkaControllerTest.java +++ b/src/test/java/com/dispose/test/dev/controller/KafkaControllerTest.java @@ -75,10 +75,10 @@ public class KafkaControllerTest extends InitTestEnvironment { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); disposeParam.set("alarmId", "1"); disposeParam.set("dstIp", "192.168.1.1"); - disposeParam.set("attackType", "1"); + disposeParam.set("attackType", "RSTFlood"); disposeParam.set("bpspps", "bps"); - disposeParam.set("dstProvince", "ZHEJIANG"); - disposeParam.set("dstCity", "HANGZHOU"); + disposeParam.set("dstProvince", "浙江"); + disposeParam.set("dstCity", "杭州"); disposeParam.set("startTime", sdf.format(new Date())); // 1清洗,2流控,3黑洞 disposeParam.set("disposeType", 1); From 7c89c400bffbaea644a3a7bde0d62378b9778872 Mon Sep 17 00:00:00 2001 From: chenlinghy Date: Wed, 15 Sep 2021 15:52:49 +0800 Subject: [PATCH 5/5] =?UTF-8?q?OCT=20REM:=201.=20emos=E6=B4=BE=E5=8D=95?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=E5=AD=98=E5=82=A8=E6=95=B0=E6=8D=AE=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dispose/controller/kafkaController.java | 10 ++++ .../com/dispose/manager/AlarmInfoManager.java | 17 +++++++ .../manager/impl/AlarmInfoManagerImpl.java | 34 +++++++++++++ .../manager/impl/MsgSerialManagerImpl.java | 5 ++ .../mapper/AlarmInformationMapper.java | 18 +++++++ .../com/dispose/mapper/MsgSerialMapper.java | 5 ++ .../dispose/pojo/entity/AlarmInformation.java | 51 +++++++++++++++++++ .../com/dispose/pojo/entity/MsgSerial.java | 10 ++-- src/main/resources/db/schema.sql | 18 ++++++- .../resources/mappers/AlarmInformation.xml | 19 +++++++ 10 files changed, 179 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/dispose/manager/AlarmInfoManager.java create mode 100644 src/main/java/com/dispose/manager/impl/AlarmInfoManagerImpl.java create mode 100644 src/main/java/com/dispose/mapper/AlarmInformationMapper.java create mode 100644 src/main/java/com/dispose/pojo/entity/AlarmInformation.java create mode 100644 src/main/resources/mappers/AlarmInformation.xml diff --git a/src/main/java/com/dispose/controller/kafkaController.java b/src/main/java/com/dispose/controller/kafkaController.java index a955473f..113d7aed 100644 --- a/src/main/java/com/dispose/controller/kafkaController.java +++ b/src/main/java/com/dispose/controller/kafkaController.java @@ -4,11 +4,13 @@ import com.alibaba.fastjson.JSONObject; import com.dispose.common.Constants; import com.dispose.common.ErrorCode; import com.dispose.config.KafkaConfiguration; +import com.dispose.manager.AlarmInfoManager; import com.dispose.pojo.dto.protocol.base.BaseRespStatus; import com.dispose.pojo.dto.protocol.base.ProtocolReqDTO; import com.dispose.pojo.dto.protocol.base.ProtocolRespDTO; import com.dispose.pojo.dto.protocol.kafka.EmosAlarmInfo; import com.dispose.pojo.dto.protocol.kafka.AlarmInfoReq; +import com.dispose.pojo.entity.AlarmInformation; import com.dispose.pojo.entity.MsgSerial; import com.dispose.pojo.po.MulReturnType; import com.dispose.security.annotation.Decryption; @@ -59,6 +61,12 @@ public class kafkaController { @Resource private MsgSerialService msgSerialService; + /** + * The alarm information manager. + */ + @Resource + private AlarmInfoManager alarmInfoManager; + /** * Dispatch command sent to kafka. * @@ -86,6 +94,8 @@ public class kafkaController { //保存数据格式到数据库 log.info("send alarm :{}", content); + AlarmInformation alarmInformation = AlarmInformation.builder().alarmInfo(content).build(); + alarmInfoManager.addAlarmInfo(alarmInformation); //推动数据格式到kafka ListenableFuture> sendResult = kafkaConfiguration diff --git a/src/main/java/com/dispose/manager/AlarmInfoManager.java b/src/main/java/com/dispose/manager/AlarmInfoManager.java new file mode 100644 index 00000000..65bb579d --- /dev/null +++ b/src/main/java/com/dispose/manager/AlarmInfoManager.java @@ -0,0 +1,17 @@ +package com.dispose.manager; + +import com.dispose.pojo.entity.AlarmInformation; + +/** + * The interface alarm information manager. + * + * @author + */ +public interface AlarmInfoManager { + /** + * Add alarm information. + * + * @param alarmInformation the alarm information + */ + void addAlarmInfo(AlarmInformation alarmInformation); +} diff --git a/src/main/java/com/dispose/manager/impl/AlarmInfoManagerImpl.java b/src/main/java/com/dispose/manager/impl/AlarmInfoManagerImpl.java new file mode 100644 index 00000000..d953f3a7 --- /dev/null +++ b/src/main/java/com/dispose/manager/impl/AlarmInfoManagerImpl.java @@ -0,0 +1,34 @@ +package com.dispose.manager.impl; + +import com.dispose.manager.AlarmInfoManager; +import com.dispose.mapper.AlarmInformationMapper; +import com.dispose.pojo.entity.AlarmInformation; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * The interface alarm information manager. + * + * @author + */ +@Component +@Slf4j +public class AlarmInfoManagerImpl implements AlarmInfoManager { + /** + * The alarm information mapper. + */ + @Resource + private AlarmInformationMapper alarmInformationMapper; + + /** + * Add alarm information. + * + * @param alarmInformation the alarm information + */ + @Override + public void addAlarmInfo(AlarmInformation alarmInformation) { + alarmInformationMapper.addAlarmInfo(alarmInformation); + } +} diff --git a/src/main/java/com/dispose/manager/impl/MsgSerialManagerImpl.java b/src/main/java/com/dispose/manager/impl/MsgSerialManagerImpl.java index ab26b187..2c70f843 100644 --- a/src/main/java/com/dispose/manager/impl/MsgSerialManagerImpl.java +++ b/src/main/java/com/dispose/manager/impl/MsgSerialManagerImpl.java @@ -9,6 +9,11 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; +/** + * The interface message serial manager. + * + * @author + */ @Component @Slf4j public class MsgSerialManagerImpl implements MsgSerialManager { diff --git a/src/main/java/com/dispose/mapper/AlarmInformationMapper.java b/src/main/java/com/dispose/mapper/AlarmInformationMapper.java new file mode 100644 index 00000000..78335ee4 --- /dev/null +++ b/src/main/java/com/dispose/mapper/AlarmInformationMapper.java @@ -0,0 +1,18 @@ +package com.dispose.mapper; + +import com.dispose.pojo.entity.AlarmInformation; + +/** + * The interface alarm information mapper. + * + * @author + */ +public interface AlarmInformationMapper { + /** + * Add alarm information. + * + * @param alarmInformation the alarm information + * @return the int + */ + int addAlarmInfo(AlarmInformation alarmInformation); +} diff --git a/src/main/java/com/dispose/mapper/MsgSerialMapper.java b/src/main/java/com/dispose/mapper/MsgSerialMapper.java index 6dd62148..46402b16 100644 --- a/src/main/java/com/dispose/mapper/MsgSerialMapper.java +++ b/src/main/java/com/dispose/mapper/MsgSerialMapper.java @@ -2,6 +2,11 @@ package com.dispose.mapper; import com.dispose.pojo.entity.MsgSerial; +/** + * The interface message serial mapper. + * + * @author + */ public interface MsgSerialMapper { /** diff --git a/src/main/java/com/dispose/pojo/entity/AlarmInformation.java b/src/main/java/com/dispose/pojo/entity/AlarmInformation.java new file mode 100644 index 00000000..8606acbe --- /dev/null +++ b/src/main/java/com/dispose/pojo/entity/AlarmInformation.java @@ -0,0 +1,51 @@ +package com.dispose.pojo.entity; + +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import tk.mybatis.mapper.annotation.KeySql; +import tk.mybatis.mapper.annotation.NameStyle; +import tk.mybatis.mapper.code.Style; + +import javax.persistence.Id; +import javax.persistence.Table; +import java.io.Serializable; + +/** + * The emos alarm information. + * + * @author + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +@JsonInclude(JsonInclude.Include.NON_NULL) +@Table(name = "alarm_information") +@NameStyle(Style.normal) +public class AlarmInformation implements Serializable { + + /** + * The constant serialVersionUID. + */ + private static final long serialVersionUID = 1L; + + /** + * The id. + */ + @Id + @KeySql(useGeneratedKeys = true) + private Long id; + + /** + * The alarm information. + */ + private String alarmInfo; + + /** + * The creating time. + */ + private String createTime; +} diff --git a/src/main/java/com/dispose/pojo/entity/MsgSerial.java b/src/main/java/com/dispose/pojo/entity/MsgSerial.java index 4f89bfa3..f779b04c 100644 --- a/src/main/java/com/dispose/pojo/entity/MsgSerial.java +++ b/src/main/java/com/dispose/pojo/entity/MsgSerial.java @@ -14,9 +14,9 @@ import javax.persistence.Table; import java.io.Serializable; /** - * The type Dispose capacity. + * The message serial. * - * @author + * @author */ @Data @NoArgsConstructor @@ -33,16 +33,14 @@ public class MsgSerial implements Serializable { private static final long serialVersionUID = 1L; /** - * The Id. + * The id. */ @Id @KeySql(useGeneratedKeys = true) private Long id; /** - * The Device id. + * The message serial. */ private Long msgSerial; - - } diff --git a/src/main/resources/db/schema.sql b/src/main/resources/db/schema.sql index 587e8229..e2eda18c 100644 --- a/src/main/resources/db/schema.sql +++ b/src/main/resources/db/schema.sql @@ -182,6 +182,20 @@ DROP TABLE IF EXISTS `msg_serial`; CREATE TABLE `msg_serial` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, - `msgSerial` bigint(20) NOT NULL DEFAULT '1', + `msgSerial` bigint(20) NOT NULL DEFAULT '1' COMMENT '连续消息序号', PRIMARY KEY (`id`) -) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; +) ENGINE=InnoDB AUTO_INCREMENT=50 DEFAULT CHARSET=utf8; + + +-- ---------------------------- +-- Table structure for alarm_information +-- ---------------------------- +DROP TABLE IF EXISTS `alarm_information`; +CREATE TABLE `alarm_information` +( + `id` bigint(11) NOT NULL AUTO_INCREMENT, + `alarmInfo` varchar(255) NOT NULL COMMENT 'emos告警内容', + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间', + PRIMARY KEY (`id`) +) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8; + diff --git a/src/main/resources/mappers/AlarmInformation.xml b/src/main/resources/mappers/AlarmInformation.xml new file mode 100644 index 00000000..fadf3d36 --- /dev/null +++ b/src/main/resources/mappers/AlarmInformation.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + INSERT + IGNORE INTO alarm_information(alarmInfo) + VALUES ( + #{alarmInfo} + ) + + + \ No newline at end of file