1. 增加协议发送接口

2. 增加协议接收功能
This commit is contained in:
HuangXin 2023-08-12 16:12:32 +08:00
parent 38f8973963
commit 132294ae8a
13 changed files with 263 additions and 32 deletions

View File

@ -3,10 +3,14 @@ package com.zjyr.beidouservice.adapter.impl.netty;
import com.zjyr.beidouservice.adapter.impl.netty.decode.YuanRongProtocolDecode;
import com.zjyr.beidouservice.adapter.impl.netty.encode.ProtocolStartEncode;
import com.zjyr.beidouservice.adapter.impl.netty.encode.YuanRongProtocolEncode;
import com.zjyr.beidouservice.pojo.vo.binary.BaseBinaryProtocol;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import jakarta.annotation.Resource;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
@ -17,21 +21,34 @@ import java.util.concurrent.TimeUnit;
*
* @param <T> the type parameter
*/
@EqualsAndHashCode(callSuper = true)
@Component
@RequiredArgsConstructor
@Data
public class ChannelInit<T> extends ChannelInitializer<SocketChannel> {
private static final int MAX_FRAME_LENGTH = 1024 * BaseBinaryProtocol.MAX_LEN; //最大长度
private static final int LENGTH_FIELD_LENGTH = 2; //长度字段所占的字节数
private static final int LENGTH_FIELD_OFFSET = BaseBinaryProtocol.START.length(); //长度偏移
private static final int LENGTH_ADJUSTMENT = 0;
private static final int INITIAL_BYTES_TO_STRIP = BaseBinaryProtocol.START.length();
/**
* The Message handler.
*/
@Resource
private MessageHandler messageHandler;
private MessageHandler<T> messageHandler;
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast("idle", new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS))
.addLast("decode",
new YuanRongProtocolDecode())
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH,
LENGTH_FIELD_OFFSET,
LENGTH_FIELD_LENGTH,
LENGTH_ADJUSTMENT,
INITIAL_BYTES_TO_STRIP))
.addLast("decodeProtocol", new YuanRongProtocolDecode())
.addLast("startFlag", new ProtocolStartEncode(2))
.addLast("encode", new YuanRongProtocolEncode<T>())
.addLast("message", messageHandler);

View File

@ -1,5 +1,6 @@
package com.zjyr.beidouservice.adapter.impl.netty;
import com.zjyr.beidouservice.pojo.vo.binary.MessageContent;
import jakarta.annotation.PreDestroy;
/**
@ -13,6 +14,8 @@ public interface ISocketServer {
*/
void start() throws Exception;
<T> int sendData(T data, Long devId);
/**
* Destory.
*

View File

@ -1,12 +1,14 @@
package com.zjyr.beidouservice.adapter.impl.netty;
import com.zjyr.beidouservice.common.impl.ControlDeviceTypeName;
import com.zjyr.beidouservice.mapper.ControlDeviceMapper;
import com.zjyr.beidouservice.misc.HelperUtils;
import com.zjyr.beidouservice.pojo.entry.ControlDevice;
import com.zjyr.beidouservice.pojo.vo.ControlAdapterSocketCtx;
import com.zjyr.beidouservice.pojo.vo.binary.BaseBinaryProtocol;
import com.zjyr.beidouservice.pojo.vo.binary.ControllerStatus;
import com.zjyr.beidouservice.pojo.vo.binary.HeartProtocol;
import com.zjyr.beidouservice.pojo.vo.binary.MessageContent;
import com.zjyr.beidouservice.service.impl.ControlDeviceServiceImpl;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@ -34,11 +36,12 @@ public class MessageHandler<T> extends SimpleChannelInboundHandler<BaseBinaryPro
* The constant ctxMap.
*/
public static ConcurrentHashMap<Long, ControlAdapterSocketCtx> ctxMap = new ConcurrentHashMap<>();
/**
* The Control device mapper.
*/
@Resource
private ControlDeviceMapper controlDeviceMapper;
private ControlDeviceServiceImpl controlDeviceService;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
@ -61,13 +64,15 @@ public class MessageHandler<T> extends SimpleChannelInboundHandler<BaseBinaryPro
}
@Override
public void channelRead0(ChannelHandlerContext ctx, BaseBinaryProtocol message) throws Exception {
//获取到线程池eventLoop添加线程执行
ctx.channel().eventLoop().execute(() -> {
//长时间操作不至于长时间的业务操作导致Handler阻塞
//log.info("{}:: Receive Message: {}", ctx.channel().id(), HelperUtils.bytesToHexString(message
// .getStart()));
});
public void channelRead0(ChannelHandlerContext ctx, BaseBinaryProtocol<T> message) throws Exception {
if (message.getMsgContent().getMsgBody() instanceof ControllerStatus status) {
log.info("{}:: Receive Message: {}",
ctx.channel().id(),
HelperUtils.bytesToHexString(status.getBeidouSignaltrength()));
} else {
log.info("{}:: Receive Message: {}", ctx.channel().id(), "");
}
}
@Override
@ -75,16 +80,23 @@ public class MessageHandler<T> extends SimpleChannelInboundHandler<BaseBinaryPro
InetSocketAddress sa = (InetSocketAddress) ctx.channel().remoteAddress();
//List<ControlDevice> list = controlDeviceMapper.selectAll();
log.info("{}:: Connected <-- {}:{}", ctx.channel().id(), sa.getAddress().getHostAddress(), sa.getPort());
controlDeviceMapper.addControlDevice(ControlDevice.builder()
ControlDevice dev = ControlDevice.builder()
.deviceType(ControlDeviceTypeName.DEVICE_SOCKET_TCP)
.deviceAddr(sa.getAddress().getHostAddress())
.build());
.build();
controlDeviceService.addControlDevice(dev);
if (ctxMap.get(dev.getId()) != null) {
ctxMap.remove(dev.getId());
} else {
ctxMap.put(dev.getId(), ControlAdapterSocketCtx.builder().controlAdapterId(dev.getId()).ctx(ctx).build());
}
MessageContent<HeartProtocol> msgCont = MessageContent.<HeartProtocol>builder()
.msgBody(new HeartProtocol())
.build();
BaseBinaryProtocol<HeartProtocol> h = BaseBinaryProtocol.<HeartProtocol>builder().msgContent(msgCont).build();
ctx.writeAndFlush(h);
ctx.writeAndFlush(BaseBinaryProtocol.<HeartProtocol>builder().msgContent(msgCont).build());
super.channelActive(ctx);
}
@ -92,7 +104,22 @@ public class MessageHandler<T> extends SimpleChannelInboundHandler<BaseBinaryPro
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress sa = (InetSocketAddress) ctx.channel().remoteAddress();
log.info("{}:: Disonnected <-- {}", ctx.channel().id(), sa.getAddress().getHostAddress());
for (ConcurrentHashMap.Entry<Long, ControlAdapterSocketCtx> entry : ctxMap.entrySet()) {
if (entry.getValue().getCtx().channel().id() == ctx.channel().id()) {
ctxMap.remove(entry.getKey());
}
}
super.channelActive(ctx);
ctx.close();
}
public <E> void channelSendData(E proMsg, Long devId) {
ControlAdapterSocketCtx ctx = ctxMap.get(devId);
MessageContent<E> msg = MessageContent.<E>builder().msgBody(proMsg).build();
if (ctx != null) {
ctx.getCtx().channel().writeAndFlush(BaseBinaryProtocol.<E>builder().msgContent(msg).build());
}
}
}

View File

@ -1,5 +1,11 @@
package com.zjyr.beidouservice.adapter.impl.netty.decode;
import com.zjyr.beidouservice.common.CommonEnumHandler;
import com.zjyr.beidouservice.common.impl.ControlCommandName;
import com.zjyr.beidouservice.common.impl.SensorStatusName;
import com.zjyr.beidouservice.pojo.vo.binary.BaseBinaryProtocol;
import com.zjyr.beidouservice.pojo.vo.binary.ControllerStatus;
import com.zjyr.beidouservice.pojo.vo.binary.MessageContent;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
@ -18,9 +24,61 @@ import java.util.List;
@Slf4j
public class YuanRongProtocolDecode extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
// cache receive data
//receiveBuffer.writeBytes(buffer.array());
log.info(ByteBufUtil.prettyHexDump(buffer));
protected void decode(ChannelHandlerContext channelHandlerContext,
ByteBuf buf,
List<Object> list) throws Exception {
log.info("\n{}", ByteBufUtil.prettyHexDump(buf));
short msgLength = buf.readShort();
byte version = buf.readByte();
int recvMajorId = buf.readInt();
int recvMinorId = buf.readInt();
int sendMajorId = buf.readInt();
Integer sendMinorId = buf.readInt();
byte cryptCytp = buf.readByte();
Integer timeStamp = buf.readInt();
Integer statusCode = buf.readInt();
byte msgType = buf.readByte();
short msgSize = buf.readShort();
ControlCommandName cmd = CommonEnumHandler.codeOf(ControlCommandName.class, msgType);
if (cmd != null) {
switch (cmd) {
case COMMAND_REPORT_HEART -> {
byte[] beidouSignal = new byte[10];
buf.readBytes(beidouSignal, 0, 10);
SensorStatusName wireStatus = CommonEnumHandler.codeOf(SensorStatusName.class, buf.readByte());
SensorStatusName phoneStatus = CommonEnumHandler.codeOf(SensorStatusName.class, buf.readByte());
ControllerStatus status = ControllerStatus.builder()
.beidouSignaltrength(beidouSignal)
.wirelessStatus(wireStatus)
.telphoneStatus(phoneStatus)
.build();
MessageContent<ControllerStatus> msgCtx = MessageContent.<ControllerStatus>builder()
.msgType(msgType)
.msgSize(msgSize)
.msgBody(status)
.build();
list.add(BaseBinaryProtocol.<ControllerStatus>builder()
.msgLength(msgLength)
.version(version)
.recvMajorId(recvMajorId)
.recvMinorId(recvMinorId)
.sendMajorId(sendMajorId)
.sendMinorId(sendMinorId)
.cryptoType(cryptCytp)
.timeStamp(timeStamp)
.statusCode(statusCode)
.msgContent(msgCtx)
.build());
}
case COMMAND_REPORT_SENSOR -> {
}
case COMMAND_REPORT_QUERY_SENSOR -> {
}
default -> log.error("Unsupport Command: {}({})", cmd, msgType);
}
}
}
}

View File

@ -34,14 +34,14 @@ public class YuanRongProtocolEncode<T> extends MessageToByteEncoder<BaseBinaryPr
T msgBody = baseBinaryProtocol.getMsgContent().getMsgBody();
byteBuf.writeByte(BaseBinaryProtocol.VERSIN); // 协议版本号
byteBuf.writeInt(0); // 接收设备主ID
byteBuf.writeInt(0); // 接收设备次ID
byteBuf.writeInt(0); // 发送设备主ID
byteBuf.writeInt(0); // 发送设备次ID
byteBuf.writeByte(0); // 协议加密选项
byteBuf.writeByte(BaseBinaryProtocol.VERSIN & 0xFF); // 协议版本号
byteBuf.writeInt(baseBinaryProtocol.getRecvMajorId()); // 接收设备主ID
byteBuf.writeInt(baseBinaryProtocol.getRecvMinorId()); // 接收设备次ID
byteBuf.writeInt(baseBinaryProtocol.getSendMajorId()); // 发送设备主ID
byteBuf.writeInt(baseBinaryProtocol.getSendMinorId()); // 发送设备次ID
byteBuf.writeByte(baseBinaryProtocol.getCryptoType()); // 协议加密选项
byteBuf.writeInt((int) (System.currentTimeMillis() / 1000)); // UTC时间戳单位秒协议发送时的UTC时间
byteBuf.writeInt(0); // 消息状态码
byteBuf.writeInt(baseBinaryProtocol.getStatusCode()); // 消息状态码
if (msgBody instanceof HeartProtocol) {
byteBuf.writeByte(ControlCommandName.COMMAND_HEART.getValue().byteValue()); // 消息类型

View File

@ -3,6 +3,8 @@ package com.zjyr.beidouservice.adapter.impl.netty.impl;
import com.zjyr.beidouservice.adapter.impl.netty.ChannelInit;
import com.zjyr.beidouservice.adapter.impl.netty.ISocketServer;
import com.zjyr.beidouservice.config.NettySocketConfigure;
import com.zjyr.beidouservice.pojo.vo.binary.BaseBinaryProtocol;
import com.zjyr.beidouservice.pojo.vo.binary.MessageContent;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
@ -81,4 +83,10 @@ public class TcpServer implements ISocketServer {
public void destory() {
}
@Override
public <T> int sendData(T data, Long devId) {
channelInit.getMessageHandler().channelSendData(data, devId);
return 0;
}
}

View File

@ -0,0 +1,30 @@
package com.zjyr.beidouservice.common.impl;
import com.zjyr.beidouservice.common.EnumerationBase;
public enum SensorStatusName implements EnumerationBase {
WIRELESS_NOEXISTS(0, "WIRELESS_NOEXISTS"),
WIRELESS_NORMAL(1, "WIRELESS_NORMAL"),
WIRELESS_EXCEPTION(2, "WIRELESS_EXCEPTION"),
;
private final Integer code;
private final String desc;
SensorStatusName(int val, String desc) {
this.code = val;
this.desc = desc;
}
@Override
public Integer getValue() {
return this.code;
}
@Override
public String getDescription() {
return this.desc;
}
}

View File

@ -18,7 +18,7 @@ public class HelperUtils {
if (sTemp.length() < 2) {
sb.append(0);
}
sb.append(sTemp.toUpperCase());
sb.append(sTemp.toUpperCase() + " ");
}
return sb.toString();
}

View File

@ -1,6 +1,7 @@
package com.zjyr.beidouservice.pojo.vo;
import io.netty.channel.ChannelHandlerContext;
import lombok.Builder;
import lombok.Data;
import lombok.RequiredArgsConstructor;
@ -8,7 +9,7 @@ import lombok.RequiredArgsConstructor;
* The type Control adapter socket ctx.
*/
@Data
@RequiredArgsConstructor
@Builder
public class ControlAdapterSocketCtx {
/**
* The Control adapter id.

View File

@ -15,12 +15,13 @@ public class BaseBinaryProtocol<T> {
/**
* The constant VERSIN.
*/
public static int VERSIN = 0;
public static int VERSIN = 1;
/**
* The constant MIN_LEN.
*/
public static int MIN_LEN = 39;
public static int MAX_LEN = 1400;
/**
* The constant START.
*/
@ -73,4 +74,9 @@ public class BaseBinaryProtocol<T> {
* The Msg content.
*/
private MessageContent<T> msgContent;
BaseBinaryProtocol () {
recvMajorId = recvMinorId = sendMajorId = sendMinorId = 0;
cryptoType = 0;
}
}

View File

@ -0,0 +1,13 @@
package com.zjyr.beidouservice.pojo.vo.binary;
import com.zjyr.beidouservice.common.impl.SensorStatusName;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class ControllerStatus {
private byte[] beidouSignaltrength;
private SensorStatusName wirelessStatus;
private SensorStatusName telphoneStatus;
}

View File

@ -0,0 +1,25 @@
package com.zjyr.beidouservice.service;
import com.zjyr.beidouservice.pojo.entry.ControlDevice;
import org.apache.ibatis.annotations.Param;
import java.util.List;
public interface ControlDeviceService {
/**
* Select all list.
*
* @return the list
*/
List<ControlDevice> selectAll();
/**
* Add control device int.
*
* @param device the device
* @return the int
*/
int addControlDevice(@Param("device") ControlDevice device);
ControlDevice getControllDeviceByAddr(String addr);
}

View File

@ -0,0 +1,43 @@
package com.zjyr.beidouservice.service.impl;
import com.zjyr.beidouservice.mapper.ControlDeviceMapper;
import com.zjyr.beidouservice.pojo.entry.ControlDevice;
import com.zjyr.beidouservice.service.ControlDeviceService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@Slf4j
public class ControlDeviceServiceImpl implements ControlDeviceService {
@Resource
ControlDeviceMapper controlDeviceMapper;
@Override
public List<ControlDevice> selectAll() {
return controlDeviceMapper.selectAll();
}
@Override
public ControlDevice getControllDeviceByAddr(String addr) {
for (ControlDevice v : selectAll()) {
if (addr.equals(v.getDeviceAddr())) {
return v;
}
}
return null;
}
@Override
public int addControlDevice(ControlDevice device) {
ControlDevice dev = getControllDeviceByAddr(device.getDeviceAddr());
if (dev == null) {
return controlDeviceMapper.addControlDevice(device);
} else {
device.setId(dev.getId());
}
return 0;
}
}