FidsServer.java

package com.cares.fids.server.comm;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @author wangcj
 * @desc
 * @date 2020/12/19 19:38
 **/
@Slf4j
@Component
public class FidsServer {

    private static final Integer port = 9526;

    @Resource
    private FidsServerChannelInit childChannelHandler;

    public void startServer() {
        new Thread(() -> {
            run(port);
        }).start();
    }

    public void run(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(childChannelHandler);
            //绑定端口,同步等待成功
            ChannelFuture f = bootstrap.bind(port).sync();
            //等待服务监听端口关闭
            if (f.isSuccess()) {
                log.info("FIDS SOCKET 服务端启动成功!port={}", port);
            }
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //退出,释放线程资源
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

FidsServerChannelInit.java

package com.cares.fids.server.comm;

import com.cares.fids.server.comm.handler.FidsClientPreprocessHandler;
import com.cares.fids.server.comm.handler.FidsHeartbeatHandler;
import com.cares.fids.server.comm.handler.FidsServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;


/**
 * @author wangcj
 * @desc
 * @date 2020/12/19 19:25
 **/
@Component
public class FidsServerChannelInit extends ChannelInitializer<SocketChannel> {

    @Resource
    FidsServerHandler fidsServerHandler;

    @Resource
    FidsHeartbeatHandler fidsHeartbeatHandler;

    @Resource
    FidsClientPreprocessHandler fidsClientPreprocessHandler;

    public void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new IdleStateHandler(0, 0, 20));
        pipeline.addLast(new ObjectEncoder());
        pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(getClass().getClassLoader())));
        pipeline.addLast(fidsClientPreprocessHandler);
        pipeline.addLast(fidsHeartbeatHandler);
        pipeline.addLast(fidsServerHandler);
    }
}

FidsClientPreprocessHandler.java

package com.cares.fids.server.comm.handler;

import com.cares.acdm.fids.protocol.msg.BaseMsg;
import com.cares.fids.server.comm.ClientConnectionMap;
import com.cares.fids.server.service.FidsBusinessService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @author wangcj
 * @desc
 * @date 2020/12/23 9:35
 **/
@Slf4j
@Component
@ChannelHandler.Sharable
public class FidsClientPreprocessHandler extends ChannelInboundHandlerAdapter {

    @Resource
    FidsBusinessService fidsBusinessService;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        BaseMsg baseMsg = (BaseMsg) msg;
        String clientId = baseMsg.getClientId();
        ClientConnectionMap.addClientConnection(clientId, channel);
        ctx.fireChannelRead(baseMsg);
        String mac = channel.attr(ClientConnectionMap.token).get();
        fidsBusinessService.online(mac);
    }


    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        ClientConnectionMap.removeClientConnection(channel);
        String mac = channel.attr(ClientConnectionMap.token).get();
        fidsBusinessService.offline(mac);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel channel = ctx.channel();
        ClientConnectionMap.removeClientConnection(channel);
        ctx.close();
    }
}

FidsHeartbeatHandler.java

package com.cares.fids.server.comm.handler;

import com.cares.acdm.fids.protocol.FidsMsgType;
import com.cares.acdm.fids.protocol.msg.BaseMsg;
import com.cares.acdm.fids.protocol.msg.FidsHeartbeatReq;
import com.cares.fids.server.comm.ClientConnectionMap;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author wangcj
 * @desc
 * @date 2020/12/23 11:25
 **/
@Slf4j
@Component
@ChannelHandler.Sharable
public class FidsHeartbeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        BaseMsg baseMsg = (BaseMsg) msg;
        if (baseMsg.getType().equals(FidsMsgType.FIDS_HEARTBEAT_RSP)) {
            log.info("收到客户端心跳响应");
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            log.info("=====连接空闲,尝试发送心跳=====");
            FidsHeartbeatReq req = new FidsHeartbeatReq();
            req.setType(FidsMsgType.FIDS_HEARTBEAT_REQ);
            ctx.channel().writeAndFlush(req).addListener((ChannelFutureListener) future -> {
                log.info("hearbeat result:" + future.isSuccess() + ",reason:" + future.cause());
                if (!future.isSuccess()) {
                    future.cause().printStackTrace();
                }
            }).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel channel = ctx.channel();
        ClientConnectionMap.removeClientConnection(channel);
        ctx.close();
    }
}

FidsServerHandler.java

package com.cares.fids.server.comm.handler;

import com.cares.acdm.fids.protocol.msg.BaseMsg;
import com.cares.fids.server.comm.ClientConnectionMap;
import com.cares.fids.server.service.FidsBusinessService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @author wangcj
 * @desc
 * @date 2020/12/19 19:31
 **/
@Slf4j
@Component
@ChannelHandler.Sharable
public class FidsServerHandler extends SimpleChannelInboundHandler<BaseMsg> {

    @Resource
    private FidsBusinessService fidsBusinessService;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, BaseMsg baseMsg) throws Exception {
        log.info("业务处理");
        fidsBusinessService.processBusiness(baseMsg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel channel = ctx.channel();
        ClientConnectionMap.removeClientConnection(channel);
        ctx.close();
    }
}

package com.cares.fids.server.comm;

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;


/**
 * Static registry that maps a client token to the channel currently serving that client.
 * The token is also stored on the channel itself under the {@link #token} attribute so that
 * the registry entry can be found again from the channel alone.
 */
public class ClientConnectionMap {

    private static final Logger logger = LoggerFactory.getLogger(ClientConnectionMap.class);
    public static ConcurrentHashMap<String, Channel> allClientMap = new ConcurrentHashMap<>();
    public static AttributeKey<String> token = AttributeKey.valueOf("token");

    /**
     * Stores {@code token} on the channel and registers the channel under it. A different
     * channel already registered under the same token is replaced.
     *
     * @param token   registry key for the client; a null token makes the underlying map
     *                throw {@link NullPointerException}
     * @param channel channel to register
     */
    public static void addClientConnection(String token, Channel channel) {
        channel.attr(ClientConnectionMap.token).set(token);
        // The previous guard used ConcurrentHashMap.contains(Object), which searches VALUES,
        // so it never matched a token and the put always ran. Keep "latest channel wins",
        // but log only when the mapping actually changes.
        Channel previous = allClientMap.put(token, channel);
        if (previous != channel) {
            logger.info("{} add, token {} ", channel, token);
        }
    }

    /**
     * Registers the channel under the token already stored in its {@link #token} attribute.
     *
     * @param channel channel to register; its token attribute must have been set, otherwise
     *                the underlying map throws {@link NullPointerException}
     */
    public static void addClientConnection(Channel channel) {
        String token = channel.attr(ClientConnectionMap.token).get();
        allClientMap.put(token, channel);
        logger.info("{} add, token {} ", channel, token);
    }

    /**
     * Removes the channel from the registry when the client drops and the channel is closed.
     *
     * @param channel channel to unregister; ignored when it carries no token or is no longer
     *                the channel registered under that token
     */
    public static void removeClientConnection(Channel channel) {

        String key = channel.attr(token).get();
        // remove(key, value) only evicts the entry if it still points at THIS channel, so a
        // stale channel closing late cannot unregister a newer connection with the same token.
        if (StringUtils.isNotEmpty(key) && allClientMap.remove(key, channel)) {
            logger.info("{} remove, token {} ", channel, key);
        }

    }

    /**
     * Finds every registered channel whose key matches {@code token} once the key's suffix is
     * ignored. The suffix is everything from the last {@code '_'} onward; keys without an
     * underscore are compared whole.
     *
     * @param token key prefix to look up
     * @return matching channels; empty when {@code token} is empty or nothing matches
     */
    public static List<Channel> getClientChannel(String token) {
        List<Channel> channels = new ArrayList<>();
        if (StringUtils.isNotEmpty(token)) {
            // Iterating entries (not keySet + get) avoids adding null when an entry is
            // removed concurrently between the two calls.
            allClientMap.forEach((key, channel) -> {
                if (StringUtils.isNotEmpty(key) && baseKey(key).equals(token)) {
                    channels.add(channel);
                }
            });
        }
        return channels;
    }

    /**
     * Returns the channel registered under exactly {@code token}, or null if there is none.
     *
     * @param token full registry key
     * @return the registered channel, or null
     */
    public static Channel getClientChannelBytoken(String token) {
        return allClientMap.get(token);
    }

    /**
     * Returns the live key set view of the registry.
     *
     * @return all registered keys
     */
    public static Set<String> keySet() {
        return allClientMap.keySet();
    }

    /** Strips everything from the last underscore onward; returns the key as-is if none. */
    private static String baseKey(String key) {
        int index = key.lastIndexOf("_");
        return index > -1 ? key.substring(0, index) : key;
    }
}



JAVA      netty 连接管理

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0 协议。转载请注明出处!