FidsServer.java
package com.cares.fids.server.comm;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author wangcj
* @desc
* @date 2020/12/19 19:38
**/
@Slf4j
@Component
public class FidsServer {
private static final Integer port = 9526;
@Resource
private FidsServerChannelInit childChannelHandler;
public void startServer() {
new Thread(() -> {
run(port);
}).start();
}
public void run(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(childChannelHandler);
//绑定端口,同步等待成功
ChannelFuture f = bootstrap.bind(port).sync();
//等待服务监听端口关闭
if (f.isSuccess()) {
log.info("FIDS SOCKET 服务端启动成功!port={}", port);
}
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
//退出,释放线程资源
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
FidsServerChannelInit.java
package com.cares.fids.server.comm;
import com.cares.fids.server.comm.handler.FidsClientPreprocessHandler;
import com.cares.fids.server.comm.handler.FidsHeartbeatHandler;
import com.cares.fids.server.comm.handler.FidsServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author wangcj
* @desc
* @date 2020/12/19 19:25
**/
@Component
public class FidsServerChannelInit extends ChannelInitializer<SocketChannel> {
@Resource
FidsServerHandler fidsServerHandler;
@Resource
FidsHeartbeatHandler fidsHeartbeatHandler;
@Resource
FidsClientPreprocessHandler fidsClientPreprocessHandler;
public void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 20));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(getClass().getClassLoader())));
pipeline.addLast(fidsClientPreprocessHandler);
pipeline.addLast(fidsHeartbeatHandler);
pipeline.addLast(fidsServerHandler);
}
}
FidsClientPreprocessHandler.java
package com.cares.fids.server.comm.handler;
import com.cares.acdm.fids.protocol.msg.BaseMsg;
import com.cares.fids.server.comm.ClientConnectionMap;
import com.cares.fids.server.service.FidsBusinessService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author wangcj
* @desc
* @date 2020/12/23 9:35
**/
@Slf4j
@Component
@ChannelHandler.Sharable
public class FidsClientPreprocessHandler extends ChannelInboundHandlerAdapter {
@Resource
FidsBusinessService fidsBusinessService;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel();
BaseMsg baseMsg = (BaseMsg) msg;
String clientId = baseMsg.getClientId();
ClientConnectionMap.addClientConnection(clientId, channel);
ctx.fireChannelRead(baseMsg);
String mac = channel.attr(ClientConnectionMap.token).get();
fidsBusinessService.online(mac);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
ClientConnectionMap.removeClientConnection(channel);
String mac = channel.attr(ClientConnectionMap.token).get();
fidsBusinessService.offline(mac);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
ClientConnectionMap.removeClientConnection(channel);
ctx.close();
}
}
FidsHeartbeatHandler.java
package com.cares.fids.server.comm.handler;
import com.cares.acdm.fids.protocol.FidsMsgType;
import com.cares.acdm.fids.protocol.msg.BaseMsg;
import com.cares.acdm.fids.protocol.msg.FidsHeartbeatReq;
import com.cares.fids.server.comm.ClientConnectionMap;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author wangcj
* @desc
* @date 2020/12/23 11:25
**/
@Slf4j
@Component
@ChannelHandler.Sharable
public class FidsHeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
BaseMsg baseMsg = (BaseMsg) msg;
if (baseMsg.getType().equals(FidsMsgType.FIDS_HEARTBEAT_RSP)) {
log.info("收到客户端心跳响应");
} else {
ctx.fireChannelRead(msg);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
log.info("=====连接空闲,尝试发送心跳=====");
FidsHeartbeatReq req = new FidsHeartbeatReq();
req.setType(FidsMsgType.FIDS_HEARTBEAT_REQ);
ctx.channel().writeAndFlush(req).addListener((ChannelFutureListener) future -> {
log.info("hearbeat result:" + future.isSuccess() + ",reason:" + future.cause());
if (!future.isSuccess()) {
future.cause().printStackTrace();
}
}).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
ClientConnectionMap.removeClientConnection(channel);
ctx.close();
}
}
FidsServerHandler.java
package com.cares.fids.server.comm.handler;
import com.cares.acdm.fids.protocol.msg.BaseMsg;
import com.cares.fids.server.comm.ClientConnectionMap;
import com.cares.fids.server.service.FidsBusinessService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author wangcj
* @desc
* @date 2020/12/19 19:31
**/
@Slf4j
@Component
@ChannelHandler.Sharable
public class FidsServerHandler extends SimpleChannelInboundHandler<BaseMsg> {
@Resource
private FidsBusinessService fidsBusinessService;
@Override
protected void channelRead0(ChannelHandlerContext ctx, BaseMsg baseMsg) throws Exception {
log.info("业务处理");
fidsBusinessService.processBusiness(baseMsg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
ClientConnectionMap.removeClientConnection(channel);
ctx.close();
}
}
package com.cares.fids.server.comm;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Static registry mapping a client token to its {@link Channel}.
 *
 * <p>The token is also stored on the channel itself under the {@link #token}
 * attribute, so a channel can be unregistered without knowing its token.
 */
public class ClientConnectionMap {

    private static final Logger logger = LoggerFactory.getLogger(ClientConnectionMap.class);

    /** Token to channel registry. */
    public static ConcurrentHashMap<String, Channel> allClientMap = new ConcurrentHashMap<>();

    /** Channel attribute holding the token the channel is registered under. */
    public static AttributeKey<String> token = AttributeKey.valueOf("token");

    /**
     * Stores {@code token} on the channel and maps the token to that channel,
     * replacing any channel previously registered under the same token.
     *
     * @param token   client token; a {@code null} token makes the map throw
     *                {@link NullPointerException}
     * @param channel channel to register
     */
    public static void addClientConnection(String token, Channel channel) {
        channel.attr(ClientConnectionMap.token).set(token);
        register(token, channel);
    }

    /**
     * Registers the channel under the token already stored in its
     * {@link #token} attribute.
     *
     * @param channel channel to register; its token attribute must be set
     */
    public static void addClientConnection(Channel channel) {
        String token = channel.attr(ClientConnectionMap.token).get();
        register(token, channel);
    }

    /** Maps the token to the channel and logs only when the mapping changes. */
    private static void register(String token, Channel channel) {
        // ConcurrentHashMap.contains(Object) is a legacy value lookup, not a
        // key lookup, so it cannot serve as an "already registered" check.
        // put() returns the previous mapping, which answers that directly.
        Channel previous = allClientMap.put(token, channel);
        if (previous != channel) {
            logger.info("{} add, token {} ", channel, token);
        }
    }

    /**
     * Removes the channel's registration when the client drops and the
     * channel is closed.
     *
     * <p>The entry is removed only if the token is still mapped to this very
     * channel, so a stale channel closing late cannot evict a newer channel
     * registered under the same token.
     *
     * @param channel channel being closed
     */
    public static void removeClientConnection(Channel channel) {
        String key = channel.attr(token).get();
        if (StringUtils.isNotEmpty(key) && allClientMap.remove(key, channel)) {
            logger.info("{} remove, token {} ", channel, key);
        }
    }

    /**
     * Returns every channel whose key, with its last {@code _suffix} stripped,
     * equals {@code token}. Keys without an underscore are compared whole.
     *
     * @param token key prefix to match
     * @return matching channels; empty when {@code token} is empty or nothing matches
     */
    public static List<Channel> getClientChannel(String token) {
        List<Channel> channels = new ArrayList<>();
        if (StringUtils.isEmpty(token)) {
            return channels;
        }
        // Iterating entries avoids a second get(), which could return null
        // if the entry is removed between the key scan and the lookup.
        allClientMap.forEach((key, channel) -> {
            if (StringUtils.isEmpty(key)) {
                return;
            }
            int index = key.lastIndexOf("_");
            String prefix = index > -1 ? key.substring(0, index) : key;
            if (prefix.equals(token)) {
                channels.add(channel);
            }
        });
        return channels;
    }

    /**
     * Looks up the channel registered under exactly this token.
     *
     * @param token full registry key
     * @return the channel, or {@code null} if none is registered
     */
    public static Channel getClientChannelBytoken(String token) {
        return allClientMap.get(token);
    }

    /**
     * Returns the key set of the registry map.
     *
     * @return the registered tokens
     */
    public static Set<String> keySet() {
        return allClientMap.keySet();
    }
}
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0 协议。转载请注明出处!