
RPC-BDY (2) - Register multiple services

2022-07-23 05:43:00 Hash..

RPC-BDY(2)

An early bird should not be a caged beast

-2022.7.14



Preface

The simplest implementation in the previous section could register only one service. This section separates the service registry from the server so that the server can provide multiple services.


One. Service registry

1. Service registry interface
The interface exposes two operations: registering a service and retrieving a service.


/** Service registry interface. */
public interface ServiceRegistry {

    // Registers a service instance
    <T> void registry(T service);

    // Retrieves a registered service by name
    Object getService(String serviceName);
}

2. Service registry implementation class


import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Default service registry implementation. */
public class DefaultServiceRegistry implements ServiceRegistry {

    private static final Logger logger = LoggerFactory.getLogger(DefaultServiceRegistry.class);
    // ConcurrentHashMap is thread-safe; maps interface name -> service instance
    private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
    // Class names of the services that are already registered
    private final Set<String> serviceSet = ConcurrentHashMap.newKeySet();

    @Override
    public <T> void registry(T service) {
        // Note: getCanonicalName() returns null for anonymous and local classes
        String serviceName = service.getClass().getCanonicalName();
        // Get the interfaces implemented by the class
        Class<?>[] interfaces = service.getClass().getInterfaces();
        if (interfaces.length == 0) {
            throw new RpcException(RpcError.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE);
        }
        // add() returns false if the name is already present, so check and insert are one atomic step
        if (!serviceSet.add(serviceName)) return;
        for (Class<?> i : interfaces) {
            // Interface name -> service instance
            serviceMap.put(i.getCanonicalName(), service);
        }
        logger.info("Interfaces: {} registered service: {}", interfaces, serviceName);
    }

    @Override
    // Here serviceName is the interface name
    public Object getService(String serviceName) {
        // Look up the service registered under this name in the map
        Object service = serviceMap.get(serviceName);
        if (service == null) {
            throw new RpcException(RpcError.SERVICE_NOT_FOUND);
        }
        return service;
    }
}
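To see why keying the map by interface name allows several services to coexist, here is a minimal, self-contained sketch. It is not the project's code: GreetService, AddService, their implementations and SimpleRegistry are hypothetical stand-ins, and standard exceptions replace RpcException.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical service interfaces and implementations, for illustration only.
interface GreetService { String greet(String name); }
interface AddService { int add(int a, int b); }

class GreetServiceImpl implements GreetService {
    @Override public String greet(String name) { return "Hello, " + name; }
}

class AddServiceImpl implements AddService {
    @Override public int add(int a, int b) { return a + b; }
}

// Simplified stand-in for DefaultServiceRegistry: no logging, standard exceptions.
class SimpleRegistry {
    // Interface name -> service instance
    private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();

    void register(Object service) {
        Class<?>[] interfaces = service.getClass().getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalArgumentException("service implements no interface");
        }
        for (Class<?> i : interfaces) {
            serviceMap.put(i.getCanonicalName(), service);
        }
    }

    Object get(String interfaceName) {
        Object service = serviceMap.get(interfaceName);
        if (service == null) {
            throw new IllegalStateException("service not found: " + interfaceName);
        }
        return service;
    }
}
```

Registering a GreetServiceImpl and an AddServiceImpl on the same SimpleRegistry makes both reachable, each under the canonical name of its own interface.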

Two. Request processing

1. Request processing class

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Request handler: invokes the requested method on the service via reflection. */
public class RequestHandler {

    private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);

    // rpcRequest: the request data; service: the registered service instance
    public Object handle(RpcRequest rpcRequest, Object service) {
        try {
            // Find the method by its name and parameter types
            Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
            // Invoke it on the service instance
            Object result = method.invoke(service, rpcRequest.getParameters());
            logger.info("Service: {} successfully called method: {}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
            return result;
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            logger.error("An error occurred while calling the method:", e);
        }
        // On failure an RpcResponse is returned; the caller must not wrap it in a success response
        return RpcResponse.fail(ResponseCode.METHOD_NOT_FOUND);
    }
}
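The reflective lookup can be tried in isolation. The sketch below is only an illustration under assumptions: Calculator, CalculatorImpl and ReflectiveCall are hypothetical names, and failures are wrapped in IllegalArgumentException rather than the project's RpcResponse. It shows that getMethod matches parameter types exactly, so a request that carries Integer.class for an int parameter does not find the method.

```java
import java.lang.reflect.Method;

// Hypothetical service used only for this illustration.
interface Calculator { int add(int a, int b); }

class CalculatorImpl implements Calculator {
    @Override public int add(int a, int b) { return a + b; }
}

class ReflectiveCall {
    // Finds a public method by name and exact parameter types, then invokes it.
    static Object call(Object service, String methodName, Class<?>[] paramTypes, Object[] args) {
        try {
            Method method = service.getClass().getMethod(methodName, paramTypes);
            return method.invoke(service, args);
        } catch (ReflectiveOperationException e) {
            // Covers NoSuchMethodException, IllegalAccessException, InvocationTargetException
            throw new IllegalArgumentException("cannot call " + methodName, e);
        }
    }
}
```

Calling ReflectiveCall.call(new CalculatorImpl(), "add", new Class<?>[]{int.class, int.class}, new Object[]{2, 3}) returns the boxed sum, while the same call with Integer.class parameter types fails the lookup.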

2. Request processing thread

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Request handling task: serves one client connection. */
public class RequestHandlerThread implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(RequestHandlerThread.class);

    private final Socket socket;
    private final RequestHandler requestHandler;
    private final ServiceRegistry serviceRegistry;

    public RequestHandlerThread(Socket socket, RequestHandler requestHandler, ServiceRegistry serviceRegistry) {
        this.socket = socket;
        this.requestHandler = requestHandler;
        this.serviceRegistry = serviceRegistry;
    }

    @Override
    public void run() {
        // try-with-resources closes the streams and the socket once the request is served
        try (Socket s = socket;
             ObjectInputStream objectInputStream = new ObjectInputStream(s.getInputStream());
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(s.getOutputStream())) {
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
            // Look up the service by the interface name carried in the request
            Object service = serviceRegistry.getService(rpcRequest.getInterfaceName());
            Object result = requestHandler.handle(rpcRequest, service);
            // The handler returns an RpcResponse on failure; wrap only real results in success
            Object response = (result instanceof RpcResponse) ? result : RpcResponse.success(result);
            objectOutputStream.writeObject(response);
            objectOutputStream.flush();
        } catch (IOException | ClassNotFoundException e) {
            logger.error("An error occurred while calling or sending:", e);
        }
    }
}
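The thread relies on Java object serialization, so everything sent over the socket has to be Serializable. The following sketch round-trips a request-like object through an in-memory buffer instead of a socket. DemoRequest is a hypothetical stand-in whose fields are only assumed to resemble RpcRequest, and SerializationDemo is an illustrative helper.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical request type; the real RpcRequest is assumed to carry similar fields.
class DemoRequest implements Serializable {
    private static final long serialVersionUID = 1L;
    final String interfaceName;
    final String methodName;
    final Class<?>[] paramTypes;
    final Object[] parameters;

    DemoRequest(String interfaceName, String methodName, Class<?>[] paramTypes, Object[] parameters) {
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.paramTypes = paramTypes;
        this.parameters = parameters;
    }
}

class SerializationDemo {
    // Serializes an object the same way writeObject does on a socket stream.
    static byte[] write(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException("serialization failed", e);
        }
        return bytes.toByteArray();
    }

    // Reads the object back, as readObject does on the receiving side.
    static Object read(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }
}
```

If any parameter object in the request is not Serializable, write fails with a NotSerializableException, which this sketch wraps in an IllegalStateException.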

Three. Server class and test class

1. Server


import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcServer {

    private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAXIMUM_POOL_SIZE = 50;
    private static final int KEEP_ALIVE_TIME = 60;
    private static final int BLOCKING_QUEUE_CAPACITY = 100;
    private final ExecutorService threadPool;
    private final RequestHandler requestHandler = new RequestHandler();
    private final ServiceRegistry serviceRegistry;

    // Store the registry on the server and create the thread pool
    public RpcServer(ServiceRegistry serviceRegistry) {
        this.serviceRegistry = serviceRegistry;
        BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, workingQueue, threadFactory);
    }

    // Start the server
    public void start(int port) {
        // try-with-resources closes the server socket when the loop ends
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            logger.info("Server started ...");
            Socket socket;
            // accept() blocks until a client connects
            while ((socket = serverSocket.accept()) != null) {
                logger.info("Client connected: {}:{}", socket.getInetAddress(), socket.getPort());
                threadPool.execute(new RequestHandlerThread(socket, requestHandler, serviceRegistry));
            }
        } catch (IOException e) {
            logger.error("An error occurred while the server was starting:", e);
        } finally {
            threadPool.shutdown();
        }
    }

}
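The pool parameters interact in a way that is easy to misread: ThreadPoolExecutor starts threads beyond the core size only after the queue is full, and rejects new tasks once both the queue and the maximum pool size are exhausted. The sketch below demonstrates this with deliberately tiny numbers. PoolDemo and its method are hypothetical names, and it relies on the default rejection policy, which throws RejectedExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolDemo {
    // Submits blocking tasks to a small pool and returns {accepted, rejected, threadsCreated}.
    static int[] submitBlockingTasks(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity), Executors.defaultThreadFactory());
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                // Every task blocks until the latch opens, so no thread becomes free early
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        int threads = pool.getPoolSize();
        release.countDown();   // let all tasks finish
        pool.shutdown();
        return new int[] {accepted, rejected, threads};
    }
}
```

With core 1, max 2 and a queue of 1, the first task starts the core thread, the second waits in the queue, the third starts a second thread, and the fourth is rejected. By the same rule, the server above would see execute throw once 50 threads are busy and 100 connections are queued, and the accept loop does not catch that exception.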

2. Test class


public class TestSerer2 {
    
    public static void main(String[] args) {
    
        HelloService helloService = new HelloServiceImpl();
        ServiceRegistry registry = new DefaultServiceRegistry();
        registry.registry(helloService);
        RpcServer rpcServer = new RpcServer(registry);
        rpcServer.start(9001);
    }
}

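Four. Knowledge points

1. ConcurrentHashMap

In JDK 8 and later, ConcurrentHashMap ensures thread safety with optimistic locking (CAS) plus synchronized: inserting into an empty bin uses a CAS, and updating a non-empty bin locks only that bin's head node.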
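As a small illustration of that thread safety, the sketch below increments one key from several threads. CounterDemo is a hypothetical helper. It uses merge, which ConcurrentHashMap performs atomically, so no increment is lost. A separate get followed by put would not be atomic even on a ConcurrentHashMap.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CounterDemo {
    // Increments the same key from several threads and returns the final count.
    static int countConcurrently(int threads, int incrementsPerThread) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    // merge is an atomic read-modify-write on ConcurrentHashMap
                    counts.merge("hits", 1, Integer::sum);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counts.getOrDefault("hits", 0);
    }
}
```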

2. Socket and ServerSocket

ServerSocket: a server application needs a different approach from a client. The server does not know when a client will send a request, so it has to stay on standby, and ServerSocket is the class for that.
Unlike Socket, ServerSocket waits for client requests. Once a connection request arrives, it creates a Socket instance to communicate with that client.
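A minimal sketch of that division of labor follows, assuming a loopback connection and a line-based text exchange instead of the object streams used by the RPC server. EchoDemo is a hypothetical name, and port 0 asks the operating system for any free port.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class EchoDemo {
    private static BufferedReader reader(Socket s) throws IOException {
        return new BufferedReader(new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
    }

    private static PrintWriter writer(Socket s) throws IOException {
        // autoFlush = true: println sends the line immediately
        return new PrintWriter(new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
    }

    // Sends one line to a one-shot echo server and returns its reply.
    static String roundTrip(String message) {
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            Thread server = new Thread(() -> {
                // accept() blocks until the client connects, then yields a Socket for that client
                try (Socket socket = serverSocket.accept();
                     BufferedReader in = reader(socket);
                     PrintWriter out = writer(socket)) {
                    out.println("echo: " + in.readLine());
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            server.start();
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), serverSocket.getLocalPort());
                 PrintWriter out = writer(client);
                 BufferedReader in = reader(client)) {
                out.println(message);
                String reply = in.readLine();
                server.join();
                return reply;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The server side never creates a Socket itself; it only receives one from accept(), which is the same pattern RpcServer.start follows before handing the socket to the thread pool.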

3. What does the service parameter of registry mean?

In registry.registry(helloService);, helloService is the service.
So service is an instance of the class that implements the interface.


Summary

By now you should have a rough idea of how dynamic proxies and reflection are implemented.
