REM:
1. 新增MsgSerial数据库
2. 增加存储MsgSerial字段和获取最大MsgSerial值方法
3. 修改MsgSerial字段递增方式(不采用redis自增长方式)
4. 数据库操作单元测试
This commit is contained in:
chenlinghy 2021-09-13 17:35:51 +08:00
parent 5c492a6c0b
commit 39531233a2
14 changed files with 373 additions and 85 deletions

View File

@ -47,11 +47,6 @@ kafka.producer.linger=1
kafka.producer.buffer.memory=33554432
kafka.producer.servers=172.21.44.189:9092,172.21.44.9:9092,172.21.44.244:9092,172.21.44.236:9092,172.21.44.80:9092
kafka.dispose.topic=ddos-vip-customer-ck
#redis config
spring.redis.database=0
spring.redis.host=172.21.48.211
spring.redis.port=6379
spring.redis.password=!Q2w3e4r
#信任主机配置
# 白名单开关
trust.auth-white-list-check=true

View File

@ -4,17 +4,16 @@ server.tomcat.basedir=./basedir
# 多个项目放在nginx下同个端口通过该配置区分
server.servlet.context-path=/dispose
# 配置数据源
#spring.datasource.url=jdbc:mysql://10.88.77.65:33061/ci_dispose_v2?serverTimezone=Asia/Shanghai&zeroDateTimeBehavior\
# =convertToNull&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true
#spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
#spring.datasource.username=root
#spring.datasource.password=h0K0_8u
spring.datasource.url=jdbc:mysql://172.21.48.75:3306/ci_dispose_v1?serverTimezone=Asia/Shanghai&zeroDateTimeBehavior\
=convertToNull&useUnicode=true
spring.datasource.url=jdbc:mysql://10.88.77.65:33061/ci_dispose_v2?serverTimezone=Asia/Shanghai&zeroDateTimeBehavior\
=convertToNull&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=BCcf6Dd7&8
spring.datasource.password=h0K0_8u
#spring.datasource.url=jdbc:mysql://172.21.48.75:3306/ci_dispose_v1?serverTimezone=Asia/Shanghai&zeroDateTimeBehavior\
# =convertToNull&useUnicode=true
#spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
#spring.datasource.username=root
#spring.datasource.password=BCcf6Dd7&8
# 配置连接池
spring.datasource.schema=classpath:test_db/unit_test.sql
spring.datasource.initialization-mode=always

View File

@ -225,10 +225,6 @@
<artifactId>dom4j-core</artifactId>
<version>1.4-dev-8</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -9,14 +9,15 @@ import com.dispose.pojo.dto.protocol.base.ProtocolReqDTO;
import com.dispose.pojo.dto.protocol.base.ProtocolRespDTO;
import com.dispose.pojo.dto.protocol.kafka.EmosAlarmInfo;
import com.dispose.pojo.dto.protocol.kafka.AlarmInfoReq;
import com.dispose.pojo.entity.MsgSerial;
import com.dispose.security.annotation.Decryption;
import com.dispose.security.annotation.Encryption;
import com.dispose.service.MsgSerialService;
import com.dispose.validation.group.ValidGroups;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Controller;
@ -51,8 +52,11 @@ public class kafkaController {
@Resource
private KafkaConfiguration kafkaConfiguration;
/**
* The message serial service.
*/
@Resource
private StringRedisTemplate stringRedisTemplate;
private MsgSerialService msgSerialService;
/**
* Dispatch command sent to kafka.
@ -78,8 +82,11 @@ public class kafkaController {
rspInfo.setMessage(new String[]{ErrorCode.EMOS_CREATEMESSAGE_ERROR.getMsg()});
return ProtocolRespDTO.result(ErrorCode.EMOS_CREATEMESSAGE_ERROR, rspInfo);
}
//推动数据格式到kafka
//保存数据格式到数据库
log.info("send alarm :{}", content);
//推动数据格式到kafka
ListenableFuture<SendResult<String, String>> sendResult = kafkaConfiguration
.kafkaTemplate()
.sendDefault(0, System.currentTimeMillis(), "dispose", content);
@ -90,7 +97,6 @@ public class kafkaController {
.topic(), v.getRecordMetadata().partition()),
ex -> log.error("Kafka send error: {}", ex.getMessage()));
//保存数据格式到数据库
rspInfo.setStatus(ErrorCode.ERR_OK.getCode());
rspInfo.setMessage(new String[]{ErrorCode.ERR_OK.getMsg()});
return ProtocolRespDTO.result(ErrorCode.ERR_OK, rspInfo);
@ -102,13 +108,23 @@ public class kafkaController {
*/
private String createSendContent(EmosAlarmInfo alarmInfo) {
try {
//告警序号的最大值2^32-1
long indexEnd = 4294967296L;
//告警序号的最大值
long indexStart = 1L;
//编号从1开始以实时消息发布通道为单位进行编号如果编号超过最大正整数(2^32-1)重新从1开始编号
Long increment = stringRedisTemplate.opsForValue().increment("SEND_KAFKA_INCREMENT", 1);
long increment = msgSerialService.getMaxMessageSerial();
increment = increment + 1;
if (increment > (indexEnd - 1)) {
increment = indexStart;
}
MsgSerial msgSerial = MsgSerial.builder().msgSerial(increment).build();
msgSerialService.addMessageSerial(msgSerial);
String dstIp = alarmInfo.getDstIp();
String alarmId = alarmInfo.getAlarmId();
//LocateInfo: 192.168.11.6 ddos attack alarmMemcache , 2019-08-01 12:31, 2.1Gbps, 1.01Mpps
String locateInfo = getAlarmEvent(alarmInfo);
String eventTime = alarmInfo.getStartTime();
String cancelTime = alarmInfo.getEndTime();

View File

@ -0,0 +1,19 @@
package com.dispose.manager;
import com.dispose.pojo.entity.MsgSerial;
public interface MsgSerialManager {
/**
* Add user business error code.
*
* @param msgSerial the message serial
*/
void addMsgSerialNumber(MsgSerial msgSerial);
/**
* get new max message serial.
*
* @return the long
*/
Long getMaxMsgSerial();
}

View File

@ -0,0 +1,36 @@
package com.dispose.manager.impl;
import com.dispose.manager.MsgSerialManager;
import com.dispose.mapper.MsgSerialMapper;
import com.dispose.pojo.entity.MsgSerial;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
@Slf4j
public class MsgSerialManagerImpl implements MsgSerialManager {
/**
* The message serial mapper.
*/
@Resource
private MsgSerialMapper msgSerialMapper;
@Override
public void addMsgSerialNumber(MsgSerial msgSerial) {
msgSerialMapper.addMsgSerial(msgSerial);
}
/**
* get new max message serial.
*
* @return the long
*/
@Override
public Long getMaxMsgSerial() {
return msgSerialMapper.getMaxMsgSerial();
}
}

View File

@ -0,0 +1,21 @@
package com.dispose.mapper;
import com.dispose.pojo.entity.MsgSerial;
public interface MsgSerialMapper {
/**
* Add new task int.
*
* @param msgSerial the msgSerial number
* @return the int
*/
int addMsgSerial(MsgSerial msgSerial);
/**
* get new max message serial.
*
* @return the long
*/
long getMaxMsgSerial();
}

View File

@ -0,0 +1,48 @@
package com.dispose.pojo.entity;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import tk.mybatis.mapper.annotation.KeySql;
import tk.mybatis.mapper.annotation.NameStyle;
import tk.mybatis.mapper.code.Style;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
/**
* The type Dispose capacity.
*
* @author <huangxin@cmhi.chinamoblie.com>
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
@Table(name = "msg_serial")
@NameStyle(Style.normal)
public class MsgSerial implements Serializable {
/**
* The constant serialVersionUID.
*/
private static final long serialVersionUID = 1L;
/**
* The Id.
*/
@Id
@KeySql(useGeneratedKeys = true)
private Long id;
/**
* The Device id.
*/
private Long msgSerial;
}

View File

@ -0,0 +1,20 @@
package com.dispose.service;
import com.dispose.pojo.entity.MsgSerial;
public interface MsgSerialService {
/**
* add message serial.
*
* @param msgSerial the message serial
*/
void addMessageSerial(MsgSerial msgSerial);
/**
* get new max message serial.
*
* @return the long
*/
Long getMaxMessageSerial();
}

View File

@ -0,0 +1,34 @@
package com.dispose.service.impl;
import com.dispose.manager.MsgSerialManager;
import com.dispose.pojo.entity.MsgSerial;
import com.dispose.service.MsgSerialService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class MsgSerialServiceImpl implements MsgSerialService {
@Resource
private MsgSerialManager msgSerialManager;
/**
* add message serial.
*
* @param msgSerial the message serial
*/
@Override
public void addMessageSerial(MsgSerial msgSerial) {
msgSerialManager.addMsgSerialNumber(msgSerial);
}
/**
* get new max message serial.
*
* @return the long
*/
@Override
public Long getMaxMessageSerial() {
return msgSerialManager.getMaxMsgSerial();
}
}

View File

@ -9,3 +9,11 @@ INSERT INTO `user_account`
VALUES (1, 'admin', 'c3855e6b6bb120450f160ba91134522868f89d36062f2061ebeefd80817e1d58', '2020-11-13 09:25:19', '',
'2021-01-20 10:18:56', '1db9ddc47de514eb16b7ec07d7f7f96f7a714ae00e1209755bab30d543a0a2c3',
'2021-01-20 10:20:57', '1970-01-02 00:00:00', 0, 0);
-- ----------------------------
-- Records of msg_serial
-- ----------------------------
INSERT INTO `msg_serial` VALUES ('1', '1');

View File

@ -15,7 +15,8 @@
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
SET
FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for device_task
@ -171,4 +172,16 @@ CREATE TABLE `user_account`
COLLATE = utf8_general_ci
ROW_FORMAT = DYNAMIC;
SET FOREIGN_KEY_CHECKS = 1;
SET
FOREIGN_KEY_CHECKS = 1;
-- ----------------------------
-- Table structure for msg_serial
-- ----------------------------
DROP TABLE IF EXISTS `msg_serial`;
CREATE TABLE `msg_serial`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`msgSerial` bigint(20) NOT NULL DEFAULT '1',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

View File

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dispose.mapper.MsgSerialMapper">
<resultMap id="msg_serial" type="com.dispose.pojo.entity.MsgSerial">
<id column="id" property="id"/>
<result column="msgSerial" property="msgSerial"/>
</resultMap>
<insert id="addMsgSerial" useGeneratedKeys="true" keyProperty="id"
parameterType="com.dispose.pojo.entity.MsgSerial">
INSERT
IGNORE INTO msg_serial(msgSerial)
VALUES (
#{msgSerial}
)
</insert>
<select id="getMaxMsgSerial" resultType="java.lang.Long">
SELECT msgSerial
FROM msg_serial
ORDER BY ID DESC LIMIT 1
</select>
</mapper>

View File

@ -0,0 +1,60 @@
package com.dispose.test.dev.mapper;
import com.dispose.mapper.MsgSerialMapper;
import com.dispose.pojo.entity.MsgSerial;
import com.dispose.test.dev.Global.InitTestEnvironment;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
* The message serial test.
*/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@Transactional
public class MsgSerialMapperTest extends InitTestEnvironment {
/**
* The message serial mapper.
*/
@Resource
MsgSerialMapper msgSerialMapper;
/**
* A 1 add new message serial.
*/
@Test
public void a1_addMsgSerial() {
for (long i = 1L; i <= 10L; i++) {
MsgSerial msgSerial = MsgSerial.builder()
.msgSerial(i)
.build();
log.info("++++++++++++++++++MsgSerial {}", msgSerial.toString());
msgSerialMapper.addMsgSerial(msgSerial);
}
}
@Test
public void a2_getMaxMsgSerial() {
for (long i = 1L; i <= 16L; i++) {
MsgSerial msgSerial = MsgSerial.builder()
.msgSerial(i)
.build();
msgSerialMapper.addMsgSerial(msgSerial);
}
long maxMsgSerial = msgSerialMapper.getMaxMsgSerial();
log.info("+++++++++++++++++++ max MsgSerial {}", maxMsgSerial);
Assert.assertEquals(maxMsgSerial, 16);
}
}