一个最简单的 RPC 需要满足几个基本的要求。首先是通信,一般可选的有 HTTP 和 TCP,这里选择 TCP,直接使用 Java Socket 处理通信。然后就是寻址,也就是如何找到要调用的方法。这里根据服务消费者提供的基本调用信息,然后利用 Java 的反射机制进行调用。服务消费者在进行远程调用时就像调用本地方法一样的效果则依靠 Java 的动态代理机制来实现。最后是参数序列化和反序列化,这里使用最简单的 Java 原生的序列化机制。
首先我们需要一个注册中心,所有的服务提供者都需要先向注册中心进行服务注册,这样服务消费者才能“发现”并进行消费。
1 2 3 4 5 6 7 8
   | public interface Registry {
      void stop();
      void start();
      void register(Class<?> serviceInterface, Class<?> impl); }
  | 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
   | @Getter public class RegistryCenter implements Registry {
      private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
      private static Map<String, Class> serviceRegistry = new HashMap<>();
      private int port;
      private ServerSocket serverSocket;
      public RegistryCenter(int port) {         this.port = port;     }
      @Override     public void stop() {         this.executor.shutdown();         if (this.serverSocket != null) {             try {                 this.serverSocket.close();             } catch (IOException e) {                 e.printStackTrace();             }         }     }
      @Override     public void start() {         try {             this.serverSocket = new ServerSocket(this.port);             System.out.println("Registry Center Start...");                          while (true) {                                  this.executor.execute(new ServiceTask(this.serverSocket.accept()));             }         } catch (IOException e) {             e.printStackTrace();         } finally {             try {                 if (this.serverSocket != null) this.serverSocket.close();             } catch (IOException e) {                 e.printStackTrace();             }         }     }
      @Override     public void register(Class<?> serviceInterface, Class<?> impl) {         this.serviceRegistry.put(serviceInterface.getName(), impl);     }
      static class ServiceTask implements Runnable {
          private Socket socket;
          public ServiceTask(Socket socket) {             this.socket = socket;         }
          @Override         public void run() {             try (ObjectInputStream input = new ObjectInputStream(this.socket.getInputStream())) {                                  String serviceName = input.readUTF();                 String methodName = input.readUTF();                 Class<?>[] paramTypes = (Class<?>[]) input.readObject();                 Object[] arguments = (Object[]) input.readObject();                 Class<?> serviceClass = serviceRegistry.get(serviceName);                 if (serviceClass == null) {                     throw new ClassNotFoundException(serviceName + " not found.");                 }                                  Method method = serviceClass.getMethod(methodName, paramTypes);                 Object result = method.invoke(serviceClass.newInstance(), arguments);                                  try (ObjectOutputStream output = new ObjectOutputStream(this.socket.getOutputStream())) {                     output.writeObject(result);                     output.flush();                 } catch (Exception e) {                     e.printStackTrace();                 }             } catch (Exception e) {                 e.printStackTrace();             }         }     } }
   | 
 
注册中心应该是一个服务端,需要长期在某个端口进行监听,在有客户端连接过来时就分出一个线程去处理。将消费者提供的序列化后的服务名称、方法名称、方法参数类型和参数进行反序列化,然后利用反射机制调用对应的服务,并将调用结果序列化后传输给消费者(客户端)。
在客户端,当我们调用接口的一个方法时,接口的实现类需要将我们的调用信息通过网络传递给注册中心,然后等待调用结束后再接收调用结果返回,如果为每个接口都创建这么一个类,显然是不合适的,因此最好的方式就是使用动态代理机制。我们创建一个工具类,提供一个获取远程对象的方法,在该方法中实现 InvocationHandler 接口,将实现类的上述逻辑放在其中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
   | public class ProxyUtil {
      @SuppressWarnings("unchecked")     public static <T> T getRemoteProxyInstance(Class<T> serviceInterface, InetSocketAddress address) {
          return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},                 ((proxy, method, args) -> {                     try (Socket socket = new Socket()) {                         socket.connect(address);                         try (ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream())) {                                                          output.writeUTF(serviceInterface.getName());                             output.writeUTF(method.getName());                             output.writeObject(method.getParameterTypes());                             output.writeObject(args);                             output.flush();                             try (ObjectInputStream input = new ObjectInputStream(socket.getInputStream())) {                                                                  return input.readObject();                             } catch (Exception e) {                                 e.printStackTrace();                             }                         } catch (Exception e) {                             e.printStackTrace();                         }                     } catch (IOException e) {                         e.printStackTrace();                     }                     return null;                 }));     } }
  | 
 
接下来就可以开始使用了,使用时有一点要特别注意,由于我们使用的是 Java 原生的序列化和反序列化机制,因此我们方法的参数和返回值都需要实现 java.io.Serializable 接口。
1 2 3 4 5 6 7 8 9 10 11 12 13
   | @Getter @Setter @ToString public class User implements Serializable {
      private String id;     private String username;
      public User(String id, String username) {         this.id = id;         this.username = username;     } }
   | 
 
1 2 3
   | public interface UserService {     User getUserById(String id); }
  | 
 
1 2 3 4 5 6
   | public class UserServiceImpl implements UserService {     @Override     public User getUserById(String id) {         return new User(id, "saber");     } }
  | 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
   | public class Application {     public static void main(String[] args) throws InterruptedException {         Registry registry = new RegistryCenter(8888);                  registry.register(UserService.class, UserServiceImpl.class);                  new Thread(() -> registry.start(), "server").start();
          UserService userService = ProxyUtil.getRemoteProxyInstance(UserService.class, new InetSocketAddress("localhost", 8888));         User user = userService.getUserById("12");         System.out.println(user);
                   TimeUnit.SECONDS.sleep(2);         registry.stop();     } }
  |