Spring Boot integrates Netty to implement an RPC server: a detailed walkthrough

  • 2021-12-04 18:53:00
  • OfStack

Contents
1. What is RPC?
2. What problems do you need to solve to implement RPC?
   1. Agreed communication protocol format (RPC request, RPC response)
   2. Serialization
   3. TCP sticking and unpacking
   4. Choice of network communication framework
3. RPC server
4. RPC client
Summary

1. What is RPC?

RPC (Remote Procedure Call) is an inter-process communication method that lets a program invoke a service on a remote computer as if it were calling a local method. It works as follows:

A local process calls a method on an interface. The RPC client sends the interface name, method name, method parameters, and related information to the RPC server over the network. The RPC server parses the request, uses that information to find the corresponding method implementation, invokes it locally, and returns the result to the RPC client.
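
As a rough, in-process illustration of the server-side half of this flow, the sketch below looks up an implementation by interface name and invokes the requested method through reflection. The names GreetingService, GreetingServiceImpl and LocalDispatchSketch are hypothetical and exist only for this example; no network is involved.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical service interface and implementation, used only for illustration.
interface GreetingService {
    String greet(String name);
}

class GreetingServiceImpl implements GreetingService {
    @Override
    public String greet(String name) {
        return "Hello " + name;
    }
}

class LocalDispatchSketch {
    // Maps an interface name to the object that implements it.
    private final Map<String, Object> registry = new HashMap<>();

    void register(Class<?> serviceInterface, Object implementation) {
        registry.put(serviceInterface.getName(), implementation);
    }

    // What a server does after parsing a request: find the implementation
    // by interface name, then invoke the named method reflectively.
    Object dispatch(String interfaceName, String methodName,
                    Class<?>[] parameterTypes, Object[] arguments) {
        Object target = registry.get(interfaceName);
        if (target == null) {
            throw new IllegalArgumentException("No service registered for " + interfaceName);
        }
        try {
            Method method = target.getClass().getMethod(methodName, parameterTypes);
            // The implementation class may not be public, so open the method for reflection.
            method.setAccessible(true);
            return method.invoke(target, arguments);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Call to " + methodName + " failed", e);
        }
    }

    public static void main(String[] args) {
        LocalDispatchSketch server = new LocalDispatchSketch();
        server.register(GreetingService.class, new GreetingServiceImpl());
        Object result = server.dispatch(GreetingService.class.getName(), "greet",
                new Class<?>[]{String.class}, new Object[]{"Xiaoming"});
        System.out.println(result);
    }
}
```

In a real RPC call, the four arguments of dispatch would arrive over the network instead of being passed directly.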

2. What problems do you need to solve to implement RPC?

1. Agreed communication protocol format

RPC has a client side and a server side. Just as with HTTP, we need to define the protocol format they use to interact. It covers three things:

  • Request format
  • Response format
  • Serialization mode of data in network communication

RPC Request

import lombok.Data;

@Data
public class RpcRequest {
    /**
     * Request ID, used to match the RPC response from the server to this request
     */
    private String requestId;
    /**
     * Fully qualified name of the called class (interface)
     */
    private String className;
    /**
     * Name of the called method
     */
    private String methodName;
    /**
     * Method parameter types
     */
    private Class<?>[] parameterTypes;
    /**
     * Method arguments
     */
    private Object[] parameters;
}

RPC response

import lombok.Data;

@Data
public class RpcResponse {
    /**
     * ID of the request that this response answers
     */
    private String requestId;
    /**
     * Whether the call succeeded
     */
    private boolean success = true;
    /**
     * Error message if the call failed
     */
    private String errorMessage;
    /**
     * Result of the call
     */
    private Object result;
}

2. Serialization

We can use the JDK's built-in serialization or a third-party serialization library. JDK serialization is not recommended because of its poor performance. We choose JSON as the serialization protocol: request and response objects are serialized into JSON strings and sent to the peer, which deserializes them back into the corresponding objects. Here, Alibaba's fastjson is used as the JSON serialization framework.

3. TCP Sticking and Unpacking

TCP is a stream protocol: the data is a sequence of bytes with no boundaries, like water flowing in a river with no dividing lines. TCP knows nothing about the meaning of the business data above it, and it splits the stream into segments according to the state of its buffers. As a result, one complete business message may be split across several TCP segments, and several small messages may be combined into one larger segment. This is known as the TCP sticky-packet and unpacking problem, and it has to be solved by the application layer.

We solve it by storing the length of the message body in a header at the start of every request and response. The format is as follows:

+--------+----------------+
| Length |    Content     |
| 4 bytes|  Length bytes  |
+--------+----------------+
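
The framing idea can be tried out without Netty. The following is a minimal sketch using only java.nio, assuming a 4-byte big-endian length and UTF-8 content; the class LengthPrefixFraming and its methods are hypothetical names for this example and are not the project's codec.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that frames and un-frames messages as Length + Content.
class LengthPrefixFraming {
    // Bytes that have arrived but do not yet form a complete frame.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Prepends a 4-byte big-endian length to the message body.
    static byte[] encode(String body) {
        byte[] content = body.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + content.length)
                .putInt(content.length)
                .put(content)
                .array();
    }

    // Accepts one chunk as it came off the socket and returns every
    // complete message that is available so far.
    List<String> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();

        List<String> messages = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();
            int length = merged.getInt();
            if (length < 0) {
                throw new IllegalStateException("Negative frame length: " + length);
            }
            if (merged.remaining() < length) {
                merged.reset(); // body incomplete: rewind to the header and wait
                break;
            }
            byte[] content = new byte[length];
            merged.get(content);
            messages.add(new String(content, StandardCharsets.UTF_8));
        }
        pending = merged.slice(); // keep the unread tail for the next chunk
        return messages;
    }
}
```

Feeding the decoder half a frame yields nothing, and feeding it the rest (even with a second frame glued on) yields both messages, which is the behavior a sticky or split TCP read requires.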

4. Choice of network communication framework

For performance, RPC frameworks generally use asynchronous, non-blocking network communication, but programming directly against the NIO API that ships with the JDK is complicated. Netty is a network communication framework built on NIO. It wraps the JDK's NIO behind a friendly API and ships with many out-of-the-box components, such as various codecs. We therefore use Netty as the network communication framework of the RPC service.

3. RPC server

The RPC client and server share a common service interface API. First, we define an interface, HelloService:


public interface HelloService {
    String sayHello(String name);
}

The server then provides an implementation class for the interface. The class is marked with a custom @RpcService annotation, which is itself annotated with @Component, so Spring's container manages every class that carries it.


@RpcService
public class HelloServiceImp implements HelloService {
    @Override
    public String sayHello(String name) {
        return "Hello " + name;
    }
}

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
    
}

RPC Server Class

RpcServer implements the ApplicationContextAware interface so that it can retrieve the @RpcService implementation classes from the bean container and store them in its own map.


@Component
@Slf4j
public class RpcServer implements ApplicationContextAware, InitializingBean {
    // Maps each service interface name to the bean that implements it
    private Map<String, Object> rpcServices = new HashMap<>();
    @Value("${rpc.server.port}")
    private int port;
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, Object> services = applicationContext.getBeansWithAnnotation(RpcService.class);
        for (Map.Entry<String, Object> entry : services.entrySet()) {
            Object bean = entry.getValue();
            Class<?>[] interfaces = bean.getClass().getInterfaces();
            for (Class<?> inter : interfaces) {
                rpcServices.put(inter.getName(),  bean);
            }
        }
        log.info("Loaded {} RPC services", rpcServices.size());
    }
 
    @Override
    public void afterPropertiesSet() {
        start();
    }
 
    private void start(){
        new Thread(() -> {
            EventLoopGroup boss = new NioEventLoopGroup(1);
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss, worker)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new IdleStateHandler(0, 0, 60));
                                pipeline.addLast(new JsonDecoder());
                                pipeline.addLast(new JsonEncoder());
                                pipeline.addLast(new RpcServerInboundHandler(rpcServices));
                            }
                        })
                        .channel(NioServerSocketChannel.class);
                ChannelFuture future = bootstrap.bind(port).sync();
                log.info("RPC server started, listening on port {}", port);
                future.channel().closeFuture().sync();
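            } catch (Exception e) {
                log.error("RPC server failed to start or was interrupted", e);
            } finally {
                // Release the event loop threads whether the server stopped normally or failed
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }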
        }).start();
 
    }
}

RpcServerInboundHandler handles RPC requests


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Method;
import java.util.Map;

@Slf4j
public class RpcServerInboundHandler extends ChannelInboundHandlerAdapter {
    private Map<String, Object> rpcServices;

    public RpcServerInboundHandler(Map<String, Object> rpcServices){
        this.rpcServices = rpcServices;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Client connected: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("Client disconnected: {}", ctx.channel().remoteAddress());
        ctx.channel().close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg){
        RpcRequest rpcRequest = (RpcRequest) msg;
        // The client's idle handler sends a request that only sets methodName = "heartBeat".
        // It targets no service, so note it in the log and do not dispatch it.
        if (rpcRequest.getClassName() == null && "heartBeat".equals(rpcRequest.getMethodName())) {
            log.info("Heartbeat received from {}", ctx.channel().remoteAddress());
            return;
        }
        log.info("Request received, interface: {}, method: {}", rpcRequest.getClassName(), rpcRequest.getMethodName());
        RpcResponse response = new RpcResponse();
        response.setRequestId(rpcRequest.getRequestId());
        try {
            Object result = this.handleRequest(rpcRequest);
            response.setResult(result);
        } catch (Exception e) {
            log.error("RPC call failed", e);
            response.setSuccess(false);
            response.setErrorMessage(e.getMessage());
        }
        log.info("Server response: {}", response);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("Connection error", cause);
        ctx.channel().close();
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.ALL_IDLE){
                log.info("Client has been idle (no reads or writes) for more than 60 seconds, closing the connection: {}", ctx.channel().remoteAddress());
                ctx.channel().close();
            }
        }else{
            super.userEventTriggered(ctx,evt);
        }
    }

    // Finds the bean registered under the requested interface name and invokes the method reflectively
    private Object handleRequest(RpcRequest rpcRequest) throws Exception{
        Object bean = rpcServices.get(rpcRequest.getClassName());
        if(bean == null){
            throw new RuntimeException("No corresponding service found: " + rpcRequest.getClassName());
        }
        Method method = bean.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
        method.setAccessible(true);
        return method.invoke(bean, rpcRequest.getParameters());
    }
}

4. RPC client


/**
 * Client that makes remote RPC calls
 */
@Slf4j
@Component
public class RpcClient {
    @Value("${rpc.remote.ip}")
    private String remoteIp;
 
    @Value("${rpc.remote.port}")
    private int port;
 
    private Bootstrap bootstrap;
 
    //  Store the result of the call 
    private final Map<String, SynchronousQueue<RpcResponse>> results = new ConcurrentHashMap<>();
 
    public RpcClient(){
 
    }
 
    @PostConstruct
    public void init(){
        bootstrap = new Bootstrap().remoteAddress(remoteIp, port);
        NioEventLoopGroup worker = new NioEventLoopGroup(1);
        bootstrap.group(worker)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new IdleStateHandler(0, 0, 10));
                        pipeline.addLast(new JsonEncoder());
                        pipeline.addLast(new JsonDecoder());
                        pipeline.addLast(new RpcClientInboundHandler(results));
                    }
                });
    }
 
    public RpcResponse send(RpcRequest rpcRequest) {
        RpcResponse rpcResponse = null;
        rpcRequest.setRequestId(UUID.randomUUID().toString());
        Channel channel = null;
        try {
            // Register the result queue before writing, so a fast response cannot arrive before it exists
            SynchronousQueue<RpcResponse> queue = new SynchronousQueue<>();
            results.put(rpcRequest.getRequestId(), queue);
            channel = bootstrap.connect().sync().channel();
            log.info("Connection established, sending request: {}", rpcRequest);
            channel.writeAndFlush(rpcRequest);
            // Block until the inbound handler hands over the response
            rpcResponse = queue.take();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can react to it
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for the RPC response", e);
        } finally {
            results.remove(rpcRequest.getRequestId());
            if(channel != null && channel.isActive()){
                channel.close();
            }
        }
        return rpcResponse;
    }
}

RpcClientInboundHandler handles the server-side response


@Slf4j
public class RpcClientInboundHandler extends ChannelInboundHandlerAdapter {
    private Map<String, SynchronousQueue<RpcResponse>> results;
 
    public RpcClientInboundHandler(Map<String, SynchronousQueue<RpcResponse>> results){
        this.results = results;
    }
 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse rpcResponse = (RpcResponse) msg;
        log.info("Received server response: {}", rpcResponse);
        if(!rpcResponse.isSuccess()){
            // Log instead of throwing, so the failed response still reaches the waiting caller
            log.warn("Remote call failed: {}", rpcResponse.getErrorMessage());
        }
        // ConcurrentHashMap rejects null keys, and the caller may already have stopped waiting
        String requestId = rpcResponse.getRequestId();
        SynchronousQueue<RpcResponse> queue = requestId == null ? null : results.get(requestId);
        if (queue != null) {
            queue.put(rpcResponse);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state() == IdleState.ALL_IDLE){
                log.info("Sending heartbeat packet");
                RpcRequest request = new RpcRequest();
                request.setMethodName("heartBeat");
                ctx.channel().writeAndFlush(request);
            }
        }else{
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
        log.error("Connection error: {}", cause.getMessage());
        ctx.channel().close();
    }
}
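
Matching a response to its request by ID can also be done with futures, which makes it easy to add a timeout so that a lost response does not block the caller forever. The sketch below is one possible alternative to the SynchronousQueue approach above, not the project's implementation. PendingCalls and its methods are hypothetical names, and responses are plain strings to keep the example dependency-free (the article's type would be RpcResponse).

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper that correlates responses with requests by ID.
class PendingCalls {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Call this BEFORE writing the request, so even an immediate response finds its future.
    CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Call this from the I/O thread when a response arrives.
    // Returns false if the ID is missing or nobody is waiting for it any more.
    boolean complete(String requestId, String response) {
        if (requestId == null) {
            return false; // ConcurrentHashMap does not accept null keys
        }
        CompletableFuture<String> future = pending.remove(requestId);
        return future != null && future.complete(response);
    }

    // Waits for the response and returns null on timeout or interruption.
    // The map entry is removed in every case, so abandoned calls do not leak.
    String await(String requestId, CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(requestId);
        }
    }

    int size() {
        return pending.size();
    }
}
```

A caller would register the ID, write the request, and then call await; the inbound handler would call complete with the ID carried in the response.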

Interface proxy

For the client to invoke the remote service as if it were calling a local method, we need to create a dynamic proxy for the interface.

Proxy class implementation


@Component
public class RpcProxy implements InvocationHandler {
 
    @Autowired
    private RpcClient rpcClient;
 
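    @Override
    public Object invoke(Object proxy, Method method, Object[] args){
        // Describe the call: which interface, which method, with which arguments
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParameters(args);
        rpcRequest.setParameterTypes(method.getParameterTypes());

        RpcResponse rpcResponse = rpcClient.send(rpcRequest);
        // send() returns null when the wait for the response was interrupted
        if (rpcResponse == null) {
            throw new IllegalStateException("No response received for " + method.getName());
        }
        if (!rpcResponse.isSuccess()) {
            throw new RuntimeException("Remote call failed: " + rpcResponse.getErrorMessage());
        }
        return rpcResponse.getResult();
    }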
}
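
To make the dynamic proxy step concrete, here is a dependency-free sketch of how an object implementing a service interface could be produced with the JDK's java.lang.reflect.Proxy. EchoService, RecordingHandler and ProxyFactorySketch are hypothetical names for this example. The handler only describes the call instead of sending it, where a handler like RpcProxy would build an RpcRequest and pass it to RpcClient.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical service interface; there is deliberately no implementation class.
interface EchoService {
    String echo(String text);
}

// Stand-in for an RPC handler: it reports which call was intercepted.
class RecordingHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        return method.getDeclaringClass().getSimpleName()
                + "#" + method.getName() + Arrays.toString(args);
    }
}

class ProxyFactorySketch {
    // Creates an object that implements serviceInterface and routes
    // every method call on it to the given handler.
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> serviceInterface, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                handler);
    }

    public static void main(String[] args) {
        EchoService service = create(EchoService.class, new RecordingHandler());
        // Looks like a local call, but the handler decides what happens.
        System.out.println(service.echo("hi")); // prints EchoService#echo[hi]
    }
}
```

Because the proxy is an ordinary object of the interface type, it can be handed to any code that expects the interface, which is what allows a container to inject it like a local bean.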

Next, implement Spring's FactoryBean interface so that the dynamically generated proxy objects are managed by the Spring container.



A custom classpath scanner scans the package for RPC interfaces, dynamically generates their proxy classes, and registers them with the Spring container.



JSON codec


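import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * Encodes an outgoing message into a sequence of bytes and sends it.
 * Message format: Length + Content.
 * Length is stored as an int and gives the size of the message body.
 *
 * +--------+----------------+
 * | Length |    Content     |
 * | 4 bytes|  Length bytes  |
 * +--------+----------------+
 */
// Typed on Object rather than RpcRequest: the server pipeline uses this same encoder
// to write RpcResponse objects, which an RpcRequest-only encoder would not accept.
public class JsonEncoder extends MessageToByteEncoder<Object> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object message, ByteBuf out){
        byte[] bytes = JSON.toJSONBytes(message);
        // Write the length of the message body into the message header
        out.writeInt(bytes.length);
        // Write the message body
        out.writeBytes(bytes);
    }
}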


Test

We wrote a controller for testing.



Call the controller interface through Postman:
http://localhost:9998/hello/sayHello?name=Xiaoming

Response: Hello Xiaoming

Summary

This article implements a simple RPC framework that covers the basic concepts. The main knowledge points involved are:

  • Encoding and decoding for network communication and the communication protocol
  • Serialization and deserialization of Java objects
  • Heartbeat detection on the communication link
  • Java reflection
  • JDK dynamic proxies

For the complete code of the project, please refer to:
https://github.com/yinguodong/netty-rpc

