SL 651是國家水文局制定的用于水文監(jiān)測(cè)數(shù)據(jù)通信的協(xié)議,它規(guī)定了水文監(jiān)測(cè)數(shù)據(jù)采集與傳輸?shù)母袷?、?nèi)容、傳輸方式等技術(shù)參數(shù),支持通過TCP(安全性低,不推薦)和TLS兩種方式接入設(shè)備。本文重點(diǎn)介紹通過TCP方式接入使用HEX格式傳輸報(bào)文方式的協(xié)議解析。
鏈路維持報(bào)2F
上行報(bào)文:
7E7E01001234567812342F0008020003591011155111036BCA
7E7E00987654321012342F0008027E0822101110074703C3AE
7E7E 起始符SOH(2個(gè)字節(jié))
01 中心站(1個(gè)字節(jié))
0012345678 遙測(cè)站(5個(gè)字節(jié))
1234 密碼(2個(gè)字節(jié))
2F 功能碼(1個(gè)字節(jié))
0008 數(shù)據(jù)長(zhǎng)度(2個(gè)字節(jié),第一個(gè)0表示上行,8表示下行,剩下3位表示16進(jìn)制的數(shù)據(jù)長(zhǎng)度)
02 數(shù)據(jù)起始符(1個(gè)字節(jié))
0003 流水號(hào)(2個(gè)字節(jié))
591011155111 時(shí)間(6個(gè)字節(jié),yyMMddHHmmss)
03 結(jié)束符(1個(gè)字節(jié))
6BCA crc(2個(gè)字節(jié))
7E7E 起始符SOH(2個(gè)字節(jié))
01 中心站(1個(gè)字節(jié))
0012345678 遙測(cè)站地址(5個(gè)字節(jié))
1234 密碼(2個(gè)字節(jié))
30 功能碼(1個(gè)字節(jié))
002B 數(shù)據(jù)長(zhǎng)度(2個(gè)字節(jié),第一個(gè)0表示上行,8表示下行,剩下3位表示16進(jìn)制的數(shù)據(jù)長(zhǎng)度)
02 數(shù)據(jù)起始符(1個(gè)字節(jié))
0003 流水號(hào)(2個(gè)字節(jié))
591011154947 時(shí)間(6個(gè)字節(jié),yyMMddHHmmss)
F1F10012345678 站號(hào)標(biāo)識(shí)符(F1F1)+測(cè)站地址(7個(gè)字節(jié))
48 遙測(cè)站類型(1個(gè)字節(jié))
F0F05910111549 觀測(cè)時(shí)間標(biāo)識(shí)符(F0F0)+時(shí)間(yyMMddHHmm)(7個(gè)字節(jié))
2019000005 當(dāng)前降水量:0.5mm(20要素標(biāo)識(shí)符;19【其中高5位表示數(shù)據(jù)長(zhǎng)度,低3位表示小數(shù)位】表示數(shù)據(jù)報(bào)文幀的長(zhǎng)度字節(jié)為3個(gè)字節(jié)和1位小數(shù)位;000005表示具體數(shù)據(jù))
2619000005 降水量累計(jì)值:0.5mm
392300000127 瞬時(shí)河道水位:0.127m
38121115 電源電壓:11.15V
03 結(jié)束符(1個(gè)字節(jié))
20FA crc(2個(gè)字節(jié))
由示例可看出每條報(bào)文頭部固定為起始符、中心站、遙測(cè)站、密碼;尾部固定為結(jié)束符、crc校驗(yàn)。每條報(bào)文包含的數(shù)據(jù)由功能碼決定。我們?cè)诮馕鰰r(shí)根據(jù)功能碼進(jìn)行對(duì)不同報(bào)文的不同解析。
public class SL651Codec implements ProtocolCodec {
private static final Logger log = LoggerFactory.getLogger(SL651Codec.class);
// 指定協(xié)議支撐信息
@Override
public ProtocolSupport support() {
return new ProtocolSupport(TransportProtocol.TCP)
.name("SL651")
.id("SL651")
.description("水文SL651")
.feature(new ProtocolFeature().keepOnline(true).keepOnlineTimeoutSeconds(180));
}
// 協(xié)議數(shù)據(jù)解析方法,將設(shè)備上報(bào)的消息解析為平臺(tái)設(shè)備消息。
@Override
public void decode(OperatorSupplier supplier, DeviceSession deviceSession, ProtocolMessage<?> message, MessageExporter<DeviceMessage<?>> messageExporter) {
TcpProtocolMessage message1 = (TcpProtocolMessage) message;
ByteBuf payload = message1.getPayload();
byte[] bytes = new byte[payload.readableBytes()];
payload.readBytes(bytes);
String payloadStr = HexStringUtil.bytes2HexStr(bytes);
boolean b = FrameUtil.verifyCRC16Code(HexStringUtil.hexStr2CharArray(payloadStr));
if (!b) {
throw new ServiceException("數(shù)據(jù)校驗(yàn)失敗");
}
FrameReqWrapper frameReqWrapper = SL651Resolver.decodeHex(payloadStr);
FrameReq reqDto = frameReqWrapper.getReqDto();
sendMessage(supplier, reqDto, messageExporter);
// 回復(fù)消息
ByteBuf byteBuf = encodeM24Rsp(frameReqWrapper);
TcpProtocolMessage protocolMessage = new TcpProtocolMessage();
protocolMessage.payload(byteBuf);
deviceSession.send(protocolMessage);
}
private void sendMessage(OperatorSupplier supplier, FrameReq reqDto, MessageExporter<DeviceMessage<?>> messageExporter) {
FrameReqBody body = reqDto.getBody();
String remoteStationAddr = body.getRemoteStationAddr();
HashMap<String, List<FrameReqBodyElement>> elementMap = body.getElementMap();
Set<Map.Entry<String, List<FrameReqBodyElement>>> entries = elementMap.entrySet();
for (Map.Entry<String, List<FrameReqBodyElement>> entry : entries) {
ReportPropertyMessage message = new ReportPropertyMessage();
message.deviceId(remoteStationAddr);
String funCode = entry.getKey();
if ("f4".equals(funCode) || "f5".equals(funCode)) {
// todo 具體場(chǎng)景
}
FrameReqBodyElement first = entry.getValue().getFirst();
message.addProperty(funCode, first.calDoubleVal());
message.timestamp(LocalDateTime.parse(first.getObserveTime()).toInstant(ZoneOffset.UTC).getEpochSecond());
messageExporter.export(message);
}
}
@Override
public void encode(OperatorSupplier supplier, DeviceMessage<?> message, MessageExporter<ProtocolMessage<?>> messageExporter) {
}
// 消息響應(yīng)
private ByteBuf encodeM24Rsp(FrameReqWrapper frameReqWrapper) {
try {
FrameReqHeader header = frameReqWrapper.getReqDto().getHeader();
StringBuilder sb = new StringBuilder();
sb.append(FrameConstant.HEADER_START_HEX).append(header.getRemoteStationAddr())
.append(header.getCenterStationAddr()).append(header.getPwd()).append(header.getFuncCode())
.append(FrameConstant.HEADER_RSP_DOWN_SYMBOL_AND_ZERO_LEN_HEX)
.append(FrameConstant.HEADER_RSP_BODY_END_STX_HEX).append(FrameConstant.HEADER_RSP_BODY_END_EOT_HEX);
sb.append(Crc16Util.crc16(Hex.decodeHex(sb.toString()), false).toUpperCase());
byte[] bytes = DatatypeConverter.parseHexBinary(sb.toString());
return Unpooled.copiedBuffer(bytes);// 將十六進(jìn)制轉(zhuǎn)換為二進(jìn)制發(fā)送
// return Unpooled.copiedBuffer(sb.toString().getBytes());
} catch (Exception e) {
log.info("出現(xiàn)異常了{(lán)}", e.getMessage());
}
return null;
}
}
@Slf4j
public class SL651Resolver {
private static boolean crcVerifyFlag = false;
public static Map<String, Object> resolve(String hexStr) {
// 取得幀頭
return null;
}
public static FrameReqWrapper decodeHex(String frameHexStr) {
FrameReqWrapper wrapper = new FrameReqWrapper();
FrameReq frameReq = new FrameReq();
wrapper.setReqDto(frameReq);
wrapper.setOriginalFrame(frameHexStr);
char[] frame = HexStringUtil.hexStr2CharArray(frameHexStr);
if (crcVerifyFlag && !FrameUtil.verifyCRC16Code(frame)) {
log.warn("sl651-2014 frame crc verify fail>>>" + frameHexStr);
wrapper.setWrapCodeEnum(ServiceCodeEnum.SERVICE_CRC_VERIFY_FAIL);
return wrapper;
}
FrameReqHeader header = decodeHeader(frame);
frameReq.setHeader(header);
int bodyLen = header.getBodyLen().intValue();
if (header.isM3Mode()) {
frameReq.setBody(decodeM3Body(FrameUtil.getM3Body(frame, bodyLen)));
} else {
frameReq.setBody(decodeM124Body(FrameUtil.getM124Body(frame, bodyLen),
SL651FunCode.getByValue(header.getFuncCode())));
}
frameReq.setBodyEndSymbol(FrameUtil.getBodyEndSymbol(frame));
frameReq.setCrcCode(FrameUtil.getCRC16Code(frame));
wrapper.setWrapCodeEnum(ServiceCodeEnum.SERVICE_SUCCESS);
return wrapper;
}
public static FrameReqHeader decodeHeader(char[] frame) {
FrameReqHeader header = new FrameReqHeader();
// 跳過幀起始的固定兩字節(jié)
// 中心站
header.setCenterStationAddr(FrameUtil.getHeaderCenterStationAddr(frame));
// 遙測(cè)站
header.setRemoteStationAddr(FrameUtil.getHeaderRemoteStationAddr(frame));
// 密碼
header.setPwd(FrameUtil.getHeaderPwd(frame));
// 功能碼
header.setFuncCode(FrameUtil.getHeaderFuncCode(frame));
// 報(bào)文正文長(zhǎng)度
header.setBodyLen(FrameUtil.getBodyLen(frame));
// 傳輸是否為M3多包模式
header.setM3Mode(FrameUtil.isM3Mode(frame));
if (header.isM3Mode()) {
// TODO 解析m3模式報(bào)文頭特殊參數(shù)
header.setFrameCnt(0);
header.setFrameSerialNo(null);
}
return header;
}
public static FrameReqBody decodeM3Body(char[] bodyFrame) {
return null;
}
public static FrameReqBody decodeM124Body(char[] bodyFrame, SL651FunCode funcEnum) {
if (funcEnum == null) {
return null;
}
switch (funcEnum) {
// 心跳消息
case F_2F -> {
return decodeM124BodyByHeartBeat(bodyFrame);
}
// 測(cè)試報(bào)文
case F_30 -> {
return decodeM124BodyByRegularReport(bodyFrame);
}
// 均勻時(shí)段水文信息 暫不支持
case F_31 -> {
return decodeM124BodyByRegularReport(bodyFrame);
}
// 遙測(cè)站定時(shí)報(bào)
case F_32 -> {
return decodeM124BodyByRegularReport(bodyFrame);
}
// 遙測(cè)站加報(bào)報(bào)
case F_33 -> {
return decodeM124BodyByRegularReport(bodyFrame);
}
// 遙測(cè)站小時(shí)報(bào) todo 重新寫邏輯
case F_34 -> {
return decodeF_34BodyByRegularReport(bodyFrame);
}
// 遙測(cè)站人工置數(shù)報(bào)
case F_35 -> {
return decodeM124BodyByRegularReport(bodyFrame);
}
// 遙測(cè)站圖片報(bào)或中心站查詢遙測(cè)站圖片采集信息
case F_36 -> {
return decodeM124BodyByRegularReport(bodyFrame);
}
default -> {
return null;
}
}
}
private static FrameReqBody decodeM124BodyByRegularReport(char[] bodyFrame) {
FrameReqBody body = new FrameReqBody();
body.setSerialNo(FrameM124Util.getBodySerialNo(bodyFrame));
body.setSendTime(FrameM124Util.getBodySendTime(bodyFrame));
body.setRemoteStationAddr(FrameM124Util.getBodyRemoteStationAddr(bodyFrame));
body.setRemoteStationTypeCode(FrameM124Util.getRemoteStationTypeCode(bodyFrame));
body.setObserveTime(FrameM124Util.getBodyObserveTime(bodyFrame));
// TODO 解析自定義擴(kuò)展參數(shù)
body.setElementMap(FrameUtil.getBodyElement(FrameM124Util.getBodyElementByRegularReport(bodyFrame)));
return body;
}
private static FrameReqBody decodeF_34BodyByRegularReport(char[] bodyFrame) {
FrameReqBody body = new FrameReqBody();
body.setSerialNo(FrameM124Util.getBodySerialNo(bodyFrame));
body.setSendTime(FrameM124Util.getBodySendTime(bodyFrame));
body.setRemoteStationAddr(FrameM124Util.getBodyRemoteStationAddr(bodyFrame));
body.setRemoteStationTypeCode(FrameM124Util.getRemoteStationTypeCode(bodyFrame));
body.setObserveTime(FrameM124Util.getBodyObserveTime(bodyFrame));
// TODO 解析自定義擴(kuò)展參數(shù)
body.setElementMap(FrameUtil.getBodyElementQiZhu(FrameM124Util.getF_34BodyElementByRegularReport(bodyFrame)));
return body;
}
private static FrameReqBody decodeM124BodyByHeartBeat(char[] bodyFrame) {
FrameReqBody body = new FrameReqBody();
body.setSerialNo(FrameM124Util.getBodySerialNo(bodyFrame));
body.setSendTime(FrameM124Util.getBodySendTime(bodyFrame));
return body;
}
public static void main(String[] args) {
String test = "7E7E05001122334403E8340068020033170718110014F1F1001122334448F0F01707181005F4600500000014FFFFFFFFFF0000F0F017071811002619000040F0F01707181005F5C0000C000C001C00310031FFFFFFFFFFFFFFFFFFFF00310031F0F0170718110039230001049020190000403812109903F502";
FrameReqWrapper frameReqWrapper = decodeHex(test);
FrameReq reqDto = frameReqWrapper.getReqDto();
System.out.println("reqDto = " + reqDto);
HashMap<String, List<FrameReqBodyElement>> elementMap = reqDto.getBody().getElementMap();
System.out.println("elementMap = " + elementMap);
}
}
以上便是我們對(duì)水文SL651協(xié)議的解析。如有出入之處歡迎大家指導(dǎo)交流。