1. 重写格式化代码

This commit is contained in:
HuangXin 2023-08-12 12:36:39 +08:00
parent e7df31d28b
commit b1e49391e8
7 changed files with 98 additions and 47 deletions

View File

@ -28,9 +28,11 @@ public class ChannelInit<T> extends ChannelInitializer<SocketChannel> {
@Override @Override
protected void initChannel(SocketChannel channel) { protected void initChannel(SocketChannel channel) {
channel.pipeline().addLast("idle", new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS)).addLast("decode", channel.pipeline()
new YuanRongProtocolDecode()) .addLast("idle", new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS))
.addLast("startFlag", new ProtocolStartEncode(2)).addLast("encode", new YuanRongProtocolEncode<T>()) .addLast("decode", new YuanRongProtocolDecode())
.addLast("startFlag", new ProtocolStartEncode(2))
.addLast("encode", new YuanRongProtocolEncode<T>())
.addLast("message", messageHandler); .addLast("message", messageHandler);
} }
} }

View File

@ -22,12 +22,14 @@ import java.util.concurrent.ConcurrentHashMap;
/** /**
* The type Message handler. * The type Message handler.
*
* @param <T> the type parameter
*/ */
@Slf4j @Slf4j
@Component @Component
@ChannelHandler.Sharable @ChannelHandler.Sharable
@RequiredArgsConstructor @RequiredArgsConstructor
public class MessageHandler extends SimpleChannelInboundHandler<BaseBinaryProtocol> { public class MessageHandler<T> extends SimpleChannelInboundHandler<BaseBinaryProtocol<T>> {
/** /**
* The constant ctxMap. * The constant ctxMap.
*/ */
@ -43,7 +45,8 @@ public class MessageHandler extends SimpleChannelInboundHandler<BaseBinaryProtoc
if (evt instanceof IdleStateEvent idleStateEvent) { if (evt instanceof IdleStateEvent idleStateEvent) {
if (idleStateEvent.state() == IdleState.ALL_IDLE) { if (idleStateEvent.state() == IdleState.ALL_IDLE) {
log.info("{}:: Trigger Heart Signe", ctx.channel().id()); log.info("{}:: Trigger Heart Signe", ctx.channel()
.id());
} }
} else { } else {
super.userEventTriggered(ctx, evt); super.userEventTriggered(ctx, evt);
@ -53,26 +56,34 @@ public class MessageHandler extends SimpleChannelInboundHandler<BaseBinaryProtoc
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, BaseBinaryProtocol message) throws Exception { public void channelRead0(ChannelHandlerContext ctx, BaseBinaryProtocol message) throws Exception {
//获取到线程池eventLoop添加线程执行 //获取到线程池eventLoop添加线程执行
ctx.channel().eventLoop().execute(new Runnable() { ctx.channel()
@Override .eventLoop()
public void run() { .execute(() -> {
//长时间操作不至于长时间的业务操作导致Handler阻塞 //长时间操作不至于长时间的业务操作导致Handler阻塞
//log.info("{}:: Receive Message: {}", ctx.channel().id(), HelperUtils.bytesToHexString(message //log.info("{}:: Receive Message: {}", ctx.channel().id(), HelperUtils.bytesToHexString(message
// .getStart())); // .getStart()));
} });
});
} }
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress sa = (InetSocketAddress) ctx.channel().remoteAddress(); InetSocketAddress sa = (InetSocketAddress) ctx.channel()
.remoteAddress();
//List<ControlDevice> list = controlDeviceMapper.selectAll(); //List<ControlDevice> list = controlDeviceMapper.selectAll();
log.info("{}:: Connected <-- {}:{}", ctx.channel().id(), sa.getAddress().getHostAddress(), sa.getPort()); log.info("{}:: Connected <-- {}:{}", ctx.channel()
controlDeviceMapper.addControlDevice(ControlDevice.builder().deviceType(ControlDeviceTypeName.DEVICE_SOCKET_TCP) .id(), sa.getAddress()
.deviceAddr(sa.getAddress().getHostAddress()).build()); .getHostAddress(), sa.getPort());
controlDeviceMapper.addControlDevice(ControlDevice.builder()
.deviceType(ControlDeviceTypeName.DEVICE_SOCKET_TCP)
.deviceAddr(sa.getAddress()
.getHostAddress())
.build());
BaseBinaryProtocol<HeartProtocol> h = BaseBinaryProtocol.<HeartProtocol>builder().msgContent( BaseBinaryProtocol<HeartProtocol> h = BaseBinaryProtocol.<HeartProtocol>builder()
MessageContent.<HeartProtocol>builder().msgBody(new HeartProtocol()).build()).build(); .msgContent(MessageContent.<HeartProtocol>builder()
.msgBody(new HeartProtocol())
.build())
.build();
ctx.writeAndFlush(h); ctx.writeAndFlush(h);
super.channelActive(ctx); super.channelActive(ctx);
@ -80,8 +91,11 @@ public class MessageHandler extends SimpleChannelInboundHandler<BaseBinaryProtoc
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress sa = (InetSocketAddress) ctx.channel().remoteAddress(); InetSocketAddress sa = (InetSocketAddress) ctx.channel()
log.info("{}:: Disonnected <-- {}", ctx.channel().id(), sa.getAddress().getHostAddress()); .remoteAddress();
log.info("{}:: Disonnected <-- {}", ctx.channel()
.id(), sa.getAddress()
.getHostAddress());
super.channelActive(ctx); super.channelActive(ctx);
ctx.close(); ctx.close();
} }

View File

@ -25,7 +25,9 @@ public class ProtocolStartEncode extends LengthFieldPrepender {
@Override @Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(ctx.alloc().buffer(BaseBinaryProtocol.START.length()).writeBytes(BaseBinaryProtocol.START.getBytes())); out.add(ctx.alloc()
.buffer(BaseBinaryProtocol.START.length())
.writeBytes(BaseBinaryProtocol.START.getBytes()));
super.encode(ctx, msg, out); super.encode(ctx, msg, out);
} }
} }

View File

@ -27,11 +27,13 @@ public class YuanRongProtocolEncode<T> extends MessageToByteEncoder<BaseBinaryPr
ByteBuf byteBuf) { ByteBuf byteBuf) {
if (baseBinaryProtocol == null || baseBinaryProtocol.getMsgContent() == null || if (baseBinaryProtocol == null || baseBinaryProtocol.getMsgContent() == null ||
baseBinaryProtocol.getMsgContent().getMsgBody() == null) { baseBinaryProtocol.getMsgContent()
.getMsgBody() == null) {
return; return;
} }
T msgBody = baseBinaryProtocol.getMsgContent().getMsgBody(); T msgBody = baseBinaryProtocol.getMsgContent()
.getMsgBody();
byteBuf.writeByte(BaseBinaryProtocol.VERSIN); // 协议版本号 byteBuf.writeByte(BaseBinaryProtocol.VERSIN); // 协议版本号
byteBuf.writeInt(0); // 接收设备主ID byteBuf.writeInt(0); // 接收设备主ID
@ -43,15 +45,21 @@ public class YuanRongProtocolEncode<T> extends MessageToByteEncoder<BaseBinaryPr
byteBuf.writeInt(0); // 消息状态码 byteBuf.writeInt(0); // 消息状态码
if (msgBody instanceof HeartProtocol) { if (msgBody instanceof HeartProtocol) {
byteBuf.writeByte(ControlCommandName.COMMAND_HEART.getValue().byteValue()); // 消息类型 byteBuf.writeByte(ControlCommandName.COMMAND_HEART.getValue()
.byteValue()); // 消息类型
byteBuf.writeShort(0); // 消息内容长度 byteBuf.writeShort(0); // 消息内容长度
} else if (msgBody instanceof SensorControlProtocol msg) { } else if (msgBody instanceof SensorControlProtocol msg) {
byteBuf.writeByte(ControlCommandName.COMMAND_SENSOR.getValue().byteValue()); // 消息类型 byteBuf.writeByte(ControlCommandName.COMMAND_SENSOR.getValue()
.byteValue()); // 消息类型
if (!msg.getControllContents().isEmpty()) { if (!msg.getControllContents()
byteBuf.writeShort(13 + 4 * msg.getControllContents().size()); // 消息内容长度 .isEmpty()) {
byteBuf.writeByte(msg.getControlTunnel().getValue().byteValue()); byteBuf.writeShort(13 + 4 * msg.getControllContents()
.size()); // 消息内容长度
byteBuf.writeByte(msg.getControlTunnel()
.getValue()
.byteValue());
byteBuf.writeBytes(msg.getTunnelAddr()); byteBuf.writeBytes(msg.getTunnelAddr());
byteBuf.writeByte(msg.getMonth()); byteBuf.writeByte(msg.getMonth());
byteBuf.writeByte(msg.getDays()); byteBuf.writeByte(msg.getDays());
@ -59,31 +67,42 @@ public class YuanRongProtocolEncode<T> extends MessageToByteEncoder<BaseBinaryPr
byteBuf.writeByte(msg.getMinute()); byteBuf.writeByte(msg.getMinute());
byteBuf.writeByte(msg.getProvince()); byteBuf.writeByte(msg.getProvince());
byteBuf.writeByte(msg.getCity()); byteBuf.writeByte(msg.getCity());
byteBuf.writeByte(msg.getControlMode().getValue().byteValue()); byteBuf.writeByte(msg.getControlMode()
.getValue()
.byteValue());
byteBuf.writeByte(msg.getControlCommand()); byteBuf.writeByte(msg.getControlCommand());
byteBuf.writeByte(msg.getNControlInfo()); byteBuf.writeByte(msg.getNControlInfo());
for (var c : msg.getControllContents()) { for (var c : msg.getControllContents()) {
byteBuf.writeByte(c.getDistrictsCode()); byteBuf.writeByte(c.getDistrictsCode());
byteBuf.writeByte(c.getControlAction().getValue().byteValue()); byteBuf.writeByte(c.getControlAction()
.getValue()
.byteValue());
byteBuf.writeShort(c.getSensorId()); byteBuf.writeShort(c.getSensorId());
} }
} }
} else if (msgBody instanceof QuerySensorProtocol msg) { } else if (msgBody instanceof QuerySensorProtocol msg) {
byteBuf.writeByte(ControlCommandName.COMMAND_QUERY_SENSOR.getValue().byteValue()); // 消息类型 byteBuf.writeByte(ControlCommandName.COMMAND_QUERY_SENSOR.getValue()
.byteValue()); // 消息类型
if (!msg.getControllContents().isEmpty()) { if (!msg.getControllContents()
byteBuf.writeShort(2 + 4 * msg.getControllContents().size()); // 消息内容长度 .isEmpty()) {
byteBuf.writeShort(2 + 4 * msg.getControllContents()
.size()); // 消息内容长度
byteBuf.writeByte(msg.getProvince()); byteBuf.writeByte(msg.getProvince());
byteBuf.writeByte(msg.getCity()); byteBuf.writeByte(msg.getCity());
byteBuf.writeByte(msg.getNControlInfo()); byteBuf.writeByte(msg.getNControlInfo());
for (var c : msg.getControllContents()) { for (var c : msg.getControllContents()) {
byteBuf.writeByte(c.getDistrictsCode()); byteBuf.writeByte(c.getDistrictsCode());
byteBuf.writeByte(c.getControlAction().getValue().byteValue()); byteBuf.writeByte(c.getControlAction()
.getValue()
.byteValue());
byteBuf.writeShort(c.getSensorId()); byteBuf.writeShort(c.getSensorId());
} }
} }
} else { } else {
log.info("XXXXX Encode: {}", baseBinaryProtocol.getMsgContent().getMsgBody().getClass()); log.info("XXXXX Encode: {}", baseBinaryProtocol.getMsgContent()
.getMsgBody()
.getClass());
} }
log.info("\n{}", ByteBufUtil.prettyHexDump(byteBuf)); log.info("\n{}", ByteBufUtil.prettyHexDump(byteBuf));

View File

@ -7,6 +7,7 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
@ -27,7 +28,7 @@ public class TcpServer implements ISocketServer {
* The Channel init. * The Channel init.
*/ */
@Resource @Resource
private final ChannelInit channelInit; private final ChannelInit<SocketChannel> channelInit;
/** /**
* The Netty socket configure. * The Netty socket configure.
@ -50,10 +51,15 @@ public class TcpServer implements ISocketServer {
*/ */
private void tcpServer(int port) { private void tcpServer(int port) {
try { try {
new ServerBootstrap().group(boosGroup, workerGroup).channel(NioServerSocketChannel.class).localAddress( new ServerBootstrap().group(boosGroup, workerGroup)
new InetSocketAddress(port)).childHandler(channelInit).option(ChannelOption.SO_BACKLOG, 128) .channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, false).childOption(ChannelOption.SO_KEEPALIVE, .localAddress(new InetSocketAddress(port))
true).bind().sync(); .childHandler(channelInit)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.TCP_NODELAY, false)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.bind()
.sync();
log.info("Netty TCP Server Beginning Listen: {}", port); log.info("Netty TCP Server Beginning Listen: {}", port);
} catch (Exception e) { } catch (Exception e) {
log.error(e.getMessage()); log.error(e.getMessage());

View File

@ -5,15 +5,24 @@ import lombok.Data;
import java.util.List; import java.util.List;
/**
* The type Query sensor protocol.
*/
@Data @Data
@Builder @Builder
public class QuerySensorProtocol { public class QuerySensorProtocol {
/**
* The Province.
*/
private byte province; private byte province;
/** /**
* The City. * The City.
*/ */
private byte city; private byte city;
/**
* The N control info.
*/
private byte nControlInfo; private byte nControlInfo;
/** /**
* The Controll contents. * The Controll contents.

View File

@ -2,15 +2,12 @@ package com.zjyr.beidouservice.mapper;
import com.zjyr.beidouservice.common.impl.ControlDeviceTypeName; import com.zjyr.beidouservice.common.impl.ControlDeviceTypeName;
import com.zjyr.beidouservice.pojo.entry.ControlDevice; import com.zjyr.beidouservice.pojo.entry.ControlDevice;
import com.zjyr.beidouservice.pojo.entry.ControlDeviceType;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.Rollback;
import org.springframework.transaction.annotation.Transactional;
import java.util.List; import java.util.List;
@ -22,16 +19,18 @@ import java.util.List;
public class ControlDeviceMapperTest { public class ControlDeviceMapperTest {
@Resource @Resource
private ControlDeviceMapper controlDeviceMapper; private ControlDeviceMapper controlDeviceMapper;
@Test @Test
public void a1_getAllControlDevice() { public void a1_getAllControlDevice() {
List<ControlDevice> typeList = controlDeviceMapper.selectAll(); List<ControlDevice> typeList = controlDeviceMapper.selectAll();
Assertions.assertNotEquals(typeList.size(), 0); Assertions.assertNotEquals(typeList.size(), 0);
} }
@Test @Test
public void a2_addControlDevice() { public void a2_addControlDevice() {
ControlDevice dev = ControlDevice.builder().deviceType(ControlDeviceTypeName.DEVICE_SOCKET_TCP).deviceAddr("127.0.0.2").build(); ControlDevice dev = ControlDevice.builder().deviceType(ControlDeviceTypeName.DEVICE_SOCKET_TCP).deviceAddr(
int i = controlDeviceMapper.addControlDevice(dev); "127.0.0.2").build();
int i = controlDeviceMapper.addControlDevice(dev);
System.out.println("Add " + i); System.out.println("Add " + i);
} }
} }