常用的接口之间的通信协议有Tcp和Http的以及webService的,这里使用介绍一下如何使用Mina框架开发一个Tcp协议的通信接口:sendRenChuangMessage:
@Component("sendRenChuangMessage")
@Scope("prototype")
public class SendRenChuangMessage {
protected Logger logger = LoggerFactory.getLogger(getClass());
/**
* 超时时间
*/
@Value("${renchuang.socket-keepAliveRequestTimeout}")
private String connectTimeOut;
/**
* 请求处理类
*/
@Autowired
@Qualifier("renChuangMessageHandler")
private IoHandler serverProtocolHandler;
/**
* 编解码工厂类
*/
@Autowired
@Qualifier("renChuangCodec")
private ProtocolCodecFactory factory;
/**
* 服务端主机IP
*/
@Value("${renChuang.IP}")
private String host;
/**
* 服务端端口号
*/
@Value("${renChuang.port}")
private String port;
private String ResultMsg;
/**
* 连接,调用“缴费销账”接口
*
* @param reqXml “缴费销账”接口的请求报文
*/
public void sendMsg(String reqXml, Exchange exchange) {
Object obj = new Object();
logger.info("调用缴费销账接口,开始!");
logger.info("调用缴费销账接口,请求地址:{},请求端口:{}", host, port);
IoConnector conn = new NioSocketConnector();
// 设置链接超时时间
conn.setConnectTimeoutMillis(30000L);
// 添加过滤器
// conn.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
conn.getFilterChain().addLast("codec", new ProtocolCodecFilter(factory));
// 添加业务处理handler
conn.setHandler(serverProtocolHandler);
IoSession session = null;
try {
ConnectFuture future = conn.connect(new InetSocketAddress(host, Integer.parseInt(port)));// 创建连接
future.awaitUninterruptibly();// 等待连接创建完成
session = future.getSession();// 获得session
//保存“锁”对象,用于在RenChuangDecoder中唤醒这个线程
Long sessionId = session.getId();
LockUtil.hashMap.put(sessionId,obj);
//发送请求报文
WriteFuture writeFuture = session.write(reqXml);// 发送消息
// 等待结果返回,如果超过指定时间就超时了
synchronized (obj) {
try {
obj.wait(Integer.parseInt(connectTimeOut) * 1000L);
} catch (InterruptedException e) {
// 记录日志
logger.error("wait timeout " + e.getMessage());
}
}
} catch (Exception e) {
logger.info("调用充值接口发送异常:{}", e);
return;
}
// 输出结果信息
Message mes = exchange.getOut();
exchange.getOut().copyFrom(exchange.getIn());
mes.setBody(RechargeResultUtil.rechargeResultMap.get(session.getId()), String.class);
session.close(true);
}
RenChuangEncoder:
- @Component
- public class RenChuangEncoder extends ProtocolEncoderAdapter {
- private Logger logger = LoggerFactory.getLogger(getClass());
- @Override
- public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
- logger.debug("RenChuangEncoder In. Msg:{}", message);
- String value = (String) message;
- IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
- buffer.put(value.getBytes("GBK"));
- buffer.flip();
- out.write(buffer);
- logger.debug("RenChuangEncoder Out.");
- }
- }
复制代码
RenChuangDecoder:
- @Component
- public class RenChuangDecoder extends CumulativeProtocolDecoder {
- private Logger logger = LoggerFactory.getLogger(getClass());
- @Override
- protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
- int size = 0;
- byte[] sizes = null;
- try {
- int fullSize = in.remaining();
- logger.info("仁创返回报文的所有字节数为:{}",fullSize);
- // <EP>00000244
- if (in.remaining() >= 12) {// 有数据时,超过“报文头长度时”,读取82字节判断消息长度
- sizes = new byte[4];
- in.mark();// 标记当前位置,以便reset
- //读取报文的前4个字节存入这个字节数组中
- in.get(sizes);
- //报文长度
- size = Integer.parseInt(new String(sizes));
- byte[] rechargeResultCodeByte = new byte[8];
- in.get(rechargeResultCodeByte);
- //得到“交易结果码”
- String rechargeResultCode = new String(rechargeResultCodeByte);
- // //这种情况说明返回“报文头”中的“交易结果说明”不是空的,并且是50位长
- // if (!StringUtil.equals("10000000", rechargeResultCode)) {
- // byte[] rechargeResultDescByte = new byte[50];
- // in.get(rechargeResultDescByte);
- // //交易结果说明
- // String rechargeResultDesc = new String(rechargeResultCodeByte);
- // logger.info("交易结果说明:{},长度:{}", size, rechargeResultCodeByte.length);
- // }
- logger.info("接受到报文体长度:{},交易结果码:{}", size,rechargeResultCode);
- if (size > in.remaining()) {// 如果消息内容不够,则重置,相当于不读取size
- logger.info("消息内容不全. size:{}, receveSize:{}", size, in.remaining());
- in.reset();
- return false;// 接收新数据,以拼凑成完整数据
- } else {
- // in.flip();
- in.clear();
- byte[] bytes = new byte[fullSize];
- in.get(bytes, 0, fullSize);
- out.write(new String(bytes));
- logger.info("RenChuangDecoder 返回的字符:{}",new String(bytes));
- if (in.remaining() > 0) {
- return true;
- }
- }
- }
- }catch (NumberFormatException e){
- logger.info("报文长度转换异常,报文长度字符串sizes:{}",new String(sizes));
- }catch (Exception e) {
- logger.error("仁创接受消息解码失败, ByteBuffer:{}", in, e);
- }
- return false;// 处理成功,让父类进行接收下个包
- }
- }
复制代码
|