diff --git a/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/ChannelInit.java b/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/ChannelInit.java index 4ae9b13..06a3293 100644 --- a/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/ChannelInit.java +++ b/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/ChannelInit.java @@ -3,10 +3,14 @@ package com.zjyr.beidouservice.adapter.impl.netty; import com.zjyr.beidouservice.adapter.impl.netty.decode.YuanRongProtocolDecode; import com.zjyr.beidouservice.adapter.impl.netty.encode.ProtocolStartEncode; import com.zjyr.beidouservice.adapter.impl.netty.encode.YuanRongProtocolEncode; +import com.zjyr.beidouservice.pojo.vo.binary.BaseBinaryProtocol; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.timeout.IdleStateHandler; import jakarta.annotation.Resource; +import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; @@ -17,21 +21,34 @@ import java.util.concurrent.TimeUnit; * * @param the type parameter */ +@EqualsAndHashCode(callSuper = true) @Component @RequiredArgsConstructor +@Data public class ChannelInit extends ChannelInitializer { + private static final int MAX_FRAME_LENGTH = 1024 * BaseBinaryProtocol.MAX_LEN; //最大长度 + private static final int LENGTH_FIELD_LENGTH = 2; //长度字段所占的字节数 + private static final int LENGTH_FIELD_OFFSET = BaseBinaryProtocol.START.length(); //长度偏移 + private static final int LENGTH_ADJUSTMENT = 0; + private static final int INITIAL_BYTES_TO_STRIP = BaseBinaryProtocol.START.length(); + /** * The Message handler. */ @Resource - private MessageHandler messageHandler; + private MessageHandler messageHandler; @Override protected void initChannel(SocketChannel channel) { channel.pipeline() .addLast("idle", new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS)) .addLast("decode", - new YuanRongProtocolDecode()) + new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, + LENGTH_FIELD_OFFSET, + LENGTH_FIELD_LENGTH, + LENGTH_ADJUSTMENT, + INITIAL_BYTES_TO_STRIP)) + .addLast("decodeProtocol", new YuanRongProtocolDecode()) .addLast("startFlag", new ProtocolStartEncode(2)) .addLast("encode", new YuanRongProtocolEncode()) .addLast("message", messageHandler); diff --git a/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/ISocketServer.java b/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/ISocketServer.java index 08ed06d..8d6c62c 100644 --- a/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/ISocketServer.java +++ b/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/ISocketServer.java @@ -1,5 +1,6 @@ package com.zjyr.beidouservice.adapter.impl.netty; +import com.zjyr.beidouservice.pojo.vo.binary.MessageContent; import jakarta.annotation.PreDestroy; /** @@ -13,6 +14,8 @@ public interface ISocketServer { */ void start() throws Exception; + int sendData(T data, Long devId); + /** * Destory. * diff --git a/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/MessageHandler.java b/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/MessageHandler.java index 2f30e8d..8d8f12e 100644 --- a/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/MessageHandler.java +++ b/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/MessageHandler.java @@ -1,12 +1,14 @@ package com.zjyr.beidouservice.adapter.impl.netty; import com.zjyr.beidouservice.common.impl.ControlDeviceTypeName; -import com.zjyr.beidouservice.mapper.ControlDeviceMapper; +import com.zjyr.beidouservice.misc.HelperUtils; import com.zjyr.beidouservice.pojo.entry.ControlDevice; import com.zjyr.beidouservice.pojo.vo.ControlAdapterSocketCtx; import com.zjyr.beidouservice.pojo.vo.binary.BaseBinaryProtocol; +import com.zjyr.beidouservice.pojo.vo.binary.ControllerStatus; import com.zjyr.beidouservice.pojo.vo.binary.HeartProtocol; import com.zjyr.beidouservice.pojo.vo.binary.MessageContent; +import com.zjyr.beidouservice.service.impl.ControlDeviceServiceImpl; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -34,11 +36,12 @@ public class MessageHandler extends SimpleChannelInboundHandler ctxMap = new ConcurrentHashMap<>(); + /** * The Control device mapper. */ @Resource - private ControlDeviceMapper controlDeviceMapper; + private ControlDeviceServiceImpl controlDeviceService; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { @@ -61,13 +64,15 @@ public class MessageHandler extends SimpleChannelInboundHandler { - //长时间操作,不至于长时间的业务操作导致Handler阻塞 - //log.info("{}:: Receive Message: {}", ctx.channel().id(), HelperUtils.bytesToHexString(message - // .getStart())); - }); + public void channelRead0(ChannelHandlerContext ctx, BaseBinaryProtocol message) throws Exception { + if (message.getMsgContent().getMsgBody() instanceof ControllerStatus status) { + log.info("{}:: Receive Message: {}", + ctx.channel().id(), + HelperUtils.bytesToHexString(status.getBeidouSignaltrength())); + } else { + log.info("{}:: Receive Message: {}", ctx.channel().id(), ""); + } + } @Override @@ -75,16 +80,23 @@ public class MessageHandler extends SimpleChannelInboundHandler list = controlDeviceMapper.selectAll(); log.info("{}:: Connected <-- {}:{}", ctx.channel().id(), sa.getAddress().getHostAddress(), sa.getPort()); - controlDeviceMapper.addControlDevice(ControlDevice.builder() - .deviceType(ControlDeviceTypeName.DEVICE_SOCKET_TCP) - .deviceAddr(sa.getAddress().getHostAddress()) - .build()); + + ControlDevice dev = ControlDevice.builder() + .deviceType(ControlDeviceTypeName.DEVICE_SOCKET_TCP) + .deviceAddr(sa.getAddress().getHostAddress()) + .build(); + + controlDeviceService.addControlDevice(dev); + if (ctxMap.get(dev.getId()) != null) { + ctxMap.remove(dev.getId()); + } else { + ctxMap.put(dev.getId(), ControlAdapterSocketCtx.builder().controlAdapterId(dev.getId()).ctx(ctx).build()); + } + MessageContent msgCont = MessageContent.builder() .msgBody(new HeartProtocol()) .build(); - BaseBinaryProtocol h = BaseBinaryProtocol.builder().msgContent(msgCont).build(); - - ctx.writeAndFlush(h); + ctx.writeAndFlush(BaseBinaryProtocol.builder().msgContent(msgCont).build()); super.channelActive(ctx); } @@ -92,7 +104,22 @@ public class MessageHandler extends SimpleChannelInboundHandler entry : ctxMap.entrySet()) { + if (entry.getValue().getCtx().channel().id() == ctx.channel().id()) { + ctxMap.remove(entry.getKey()); + } + } + super.channelActive(ctx); ctx.close(); } + + public void channelSendData(E proMsg, Long devId) { + ControlAdapterSocketCtx ctx = ctxMap.get(devId); + MessageContent msg = MessageContent.builder().msgBody(proMsg).build(); + if (ctx != null) { + ctx.getCtx().channel().writeAndFlush(BaseBinaryProtocol.builder().msgContent(msg).build()); + } + } } diff --git a/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/decode/YuanRongProtocolDecode.java b/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/decode/YuanRongProtocolDecode.java index 0a584fd..959c2fb 100644 --- a/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/decode/YuanRongProtocolDecode.java +++ b/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/decode/YuanRongProtocolDecode.java @@ -1,5 +1,11 @@ package com.zjyr.beidouservice.adapter.impl.netty.decode; +import com.zjyr.beidouservice.common.CommonEnumHandler; +import com.zjyr.beidouservice.common.impl.ControlCommandName; +import com.zjyr.beidouservice.common.impl.SensorStatusName; +import com.zjyr.beidouservice.pojo.vo.binary.BaseBinaryProtocol; +import com.zjyr.beidouservice.pojo.vo.binary.ControllerStatus; +import com.zjyr.beidouservice.pojo.vo.binary.MessageContent; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; @@ -18,9 +24,61 @@ import java.util.List; @Slf4j public class YuanRongProtocolDecode extends ByteToMessageDecoder { @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { - // cache receive data - //receiveBuffer.writeBytes(buffer.array()); - log.info(ByteBufUtil.prettyHexDump(buffer)); + protected void decode(ChannelHandlerContext channelHandlerContext, + ByteBuf buf, + List list) throws Exception { + log.info("\n{}", ByteBufUtil.prettyHexDump(buf)); + short msgLength = buf.readShort(); + byte version = buf.readByte(); + int recvMajorId = buf.readInt(); + int recvMinorId = buf.readInt(); + int sendMajorId = buf.readInt(); + Integer sendMinorId = buf.readInt(); + byte cryptCytp = buf.readByte(); + Integer timeStamp = buf.readInt(); + Integer statusCode = buf.readInt(); + byte msgType = buf.readByte(); + short msgSize = buf.readShort(); + + ControlCommandName cmd = CommonEnumHandler.codeOf(ControlCommandName.class, msgType); + if (cmd != null) { + switch (cmd) { + case COMMAND_REPORT_HEART -> { + byte[] beidouSignal = new byte[10]; + buf.readBytes(beidouSignal, 0, 10); + SensorStatusName wireStatus = CommonEnumHandler.codeOf(SensorStatusName.class, buf.readByte()); + SensorStatusName phoneStatus = CommonEnumHandler.codeOf(SensorStatusName.class, buf.readByte()); + + ControllerStatus status = ControllerStatus.builder() + .beidouSignaltrength(beidouSignal) + .wirelessStatus(wireStatus) + .telphoneStatus(phoneStatus) + .build(); + MessageContent msgCtx = MessageContent.builder() + .msgType(msgType) + .msgSize(msgSize) + .msgBody(status) + .build(); + + list.add(BaseBinaryProtocol.builder() + .msgLength(msgLength) + .version(version) + .recvMajorId(recvMajorId) + .recvMinorId(recvMinorId) + .sendMajorId(sendMajorId) + .sendMinorId(sendMinorId) + .cryptoType(cryptCytp) + .timeStamp(timeStamp) + .statusCode(statusCode) + .msgContent(msgCtx) + .build()); + } + case COMMAND_REPORT_SENSOR -> { + } + case COMMAND_REPORT_QUERY_SENSOR -> { + } + default -> log.error("Unsupport Command: {}({})", cmd, msgType); + } + } } } diff --git a/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/encode/YuanRongProtocolEncode.java b/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/encode/YuanRongProtocolEncode.java index 2e3589b..e7f3bcf 100644 --- a/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/encode/YuanRongProtocolEncode.java +++ b/src/main/java/com/zjyr/beidouservice/adapter/impl/netty/encode/YuanRongProtocolEncode.java @@ -34,14 +34,14 @@ public class YuanRongProtocolEncode extends MessageToByteEncoder int sendData(T data, Long devId) { + channelInit.getMessageHandler().channelSendData(data, devId); + return 0; + } } diff --git a/src/main/java/com/zjyr/beidouservice/common/impl/SensorStatusName.java b/src/main/java/com/zjyr/beidouservice/common/impl/SensorStatusName.java new file mode 100644 index 0000000..6bd36ed --- /dev/null +++ b/src/main/java/com/zjyr/beidouservice/common/impl/SensorStatusName.java @@ -0,0 +1,30 @@ +package com.zjyr.beidouservice.common.impl; + +import com.zjyr.beidouservice.common.EnumerationBase; + +public enum SensorStatusName implements EnumerationBase { + WIRELESS_NOEXISTS(0, "WIRELESS_NOEXISTS"), + + WIRELESS_NORMAL(1, "WIRELESS_NORMAL"), + + WIRELESS_EXCEPTION(2, "WIRELESS_EXCEPTION"), + ; + + private final Integer code; + private final String desc; + + SensorStatusName(int val, String desc) { + this.code = val; + this.desc = desc; + } + + @Override + public Integer getValue() { + return this.code; + } + + @Override + public String getDescription() { + return this.desc; + } +} diff --git a/src/main/java/com/zjyr/beidouservice/misc/HelperUtils.java b/src/main/java/com/zjyr/beidouservice/misc/HelperUtils.java index 6477bea..d5ab3f3 100644 --- a/src/main/java/com/zjyr/beidouservice/misc/HelperUtils.java +++ b/src/main/java/com/zjyr/beidouservice/misc/HelperUtils.java @@ -18,7 +18,7 @@ public class HelperUtils { if (sTemp.length() < 2) { sb.append(0); } - sb.append(sTemp.toUpperCase()); + sb.append(sTemp.toUpperCase() + " "); } return sb.toString(); } diff --git a/src/main/java/com/zjyr/beidouservice/pojo/vo/ControlAdapterSocketCtx.java b/src/main/java/com/zjyr/beidouservice/pojo/vo/ControlAdapterSocketCtx.java index 700338a..ff00b91 100644 --- a/src/main/java/com/zjyr/beidouservice/pojo/vo/ControlAdapterSocketCtx.java +++ b/src/main/java/com/zjyr/beidouservice/pojo/vo/ControlAdapterSocketCtx.java @@ -1,6 +1,7 @@ package com.zjyr.beidouservice.pojo.vo; import io.netty.channel.ChannelHandlerContext; +import lombok.Builder; import lombok.Data; import lombok.RequiredArgsConstructor; @@ -8,7 +9,7 @@ import lombok.RequiredArgsConstructor; * The type Control adapter socket ctx. */ @Data -@RequiredArgsConstructor +@Builder public class ControlAdapterSocketCtx { /** * The Control adapter id. diff --git a/src/main/java/com/zjyr/beidouservice/pojo/vo/binary/BaseBinaryProtocol.java b/src/main/java/com/zjyr/beidouservice/pojo/vo/binary/BaseBinaryProtocol.java index c2736a9..67eec26 100644 --- a/src/main/java/com/zjyr/beidouservice/pojo/vo/binary/BaseBinaryProtocol.java +++ b/src/main/java/com/zjyr/beidouservice/pojo/vo/binary/BaseBinaryProtocol.java @@ -15,12 +15,13 @@ public class BaseBinaryProtocol { /** * The constant VERSIN. */ - public static int VERSIN = 0; + public static int VERSIN = 1; /** * The constant MIN_LEN. */ public static int MIN_LEN = 39; + public static int MAX_LEN = 1400; /** * The constant START. */ @@ -73,4 +74,9 @@ public class BaseBinaryProtocol { * The Msg content. */ private MessageContent msgContent; + + BaseBinaryProtocol () { + recvMajorId = recvMinorId = sendMajorId = sendMinorId = 0; + cryptoType = 0; + } } diff --git a/src/main/java/com/zjyr/beidouservice/pojo/vo/binary/ControllerStatus.java b/src/main/java/com/zjyr/beidouservice/pojo/vo/binary/ControllerStatus.java new file mode 100644 index 0000000..94756a2 --- /dev/null +++ b/src/main/java/com/zjyr/beidouservice/pojo/vo/binary/ControllerStatus.java @@ -0,0 +1,13 @@ +package com.zjyr.beidouservice.pojo.vo.binary; + +import com.zjyr.beidouservice.common.impl.SensorStatusName; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class ControllerStatus { + private byte[] beidouSignaltrength; + private SensorStatusName wirelessStatus; + private SensorStatusName telphoneStatus; +} diff --git a/src/main/java/com/zjyr/beidouservice/service/ControlDeviceService.java b/src/main/java/com/zjyr/beidouservice/service/ControlDeviceService.java new file mode 100644 index 0000000..fb6ec66 --- /dev/null +++ b/src/main/java/com/zjyr/beidouservice/service/ControlDeviceService.java @@ -0,0 +1,25 @@ +package com.zjyr.beidouservice.service; + +import com.zjyr.beidouservice.pojo.entry.ControlDevice; +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +public interface ControlDeviceService { + /** + * Select all list. + * + * @return the list + */ + List selectAll(); + + /** + * Add control device int. + * + * @param device the device + * @return the int + */ + int addControlDevice(@Param("device") ControlDevice device); + + ControlDevice getControllDeviceByAddr(String addr); +} diff --git a/src/main/java/com/zjyr/beidouservice/service/impl/ControlDeviceServiceImpl.java b/src/main/java/com/zjyr/beidouservice/service/impl/ControlDeviceServiceImpl.java new file mode 100644 index 0000000..ad04043 --- /dev/null +++ b/src/main/java/com/zjyr/beidouservice/service/impl/ControlDeviceServiceImpl.java @@ -0,0 +1,43 @@ +package com.zjyr.beidouservice.service.impl; + +import com.zjyr.beidouservice.mapper.ControlDeviceMapper; +import com.zjyr.beidouservice.pojo.entry.ControlDevice; +import com.zjyr.beidouservice.service.ControlDeviceService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.List; + +@Service +@Slf4j +public class ControlDeviceServiceImpl implements ControlDeviceService { + @Resource + ControlDeviceMapper controlDeviceMapper; + + @Override + public List selectAll() { + return controlDeviceMapper.selectAll(); + } + + @Override + public ControlDevice getControllDeviceByAddr(String addr) { + for (ControlDevice v : selectAll()) { + if (addr.equals(v.getDeviceAddr())) { + return v; + } + } + return null; + } + + @Override + public int addControlDevice(ControlDevice device) { + ControlDevice dev = getControllDeviceByAddr(device.getDeviceAddr()); + if (dev == null) { + return controlDeviceMapper.addControlDevice(device); + } else { + device.setId(dev.getId()); + } + return 0; + } +}