一个最简单的 RPC 需要满足几个基本的要求。首先是通信,一般可选的有 HTTP 和 TCP,这里选择 TCP,直接使用 Java Socket 处理通信。然后就是寻址,也就是如何找到要调用的方法。这里根据服务消费者提供的基本调用信息,然后利用 Java 的反射机制进行调用。服务消费者在进行远程调用时就像调用本地方法一样的效果则依靠 Java 的动态代理机制来实现。最后是参数序列化和反序列化,这里使用最简单的 Java 原生的序列化机制。
首先我们需要一个注册中心,所有的服务提供者都需要先向注册中心进行服务注册,这样服务消费者才能“发现”并进行消费。
1 2 3 4 5 6 7 8
| public interface Registry {
void stop();
void start();
void register(Class<?> serviceInterface, Class<?> impl); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| @Getter public class RegistryCenter implements Registry {
private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static Map<String, Class> serviceRegistry = new HashMap<>();
private int port;
private ServerSocket serverSocket;
public RegistryCenter(int port) { this.port = port; }
@Override public void stop() { this.executor.shutdown(); if (this.serverSocket != null) { try { this.serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } }
@Override public void start() { try { this.serverSocket = new ServerSocket(this.port); System.out.println("Registry Center Start..."); while (true) { this.executor.execute(new ServiceTask(this.serverSocket.accept())); } } catch (IOException e) { e.printStackTrace(); } finally { try { if (this.serverSocket != null) this.serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } }
@Override public void register(Class<?> serviceInterface, Class<?> impl) { this.serviceRegistry.put(serviceInterface.getName(), impl); }
static class ServiceTask implements Runnable {
private Socket socket;
public ServiceTask(Socket socket) { this.socket = socket; }
@Override public void run() { try (ObjectInputStream input = new ObjectInputStream(this.socket.getInputStream())) { String serviceName = input.readUTF(); String methodName = input.readUTF(); Class<?>[] paramTypes = (Class<?>[]) input.readObject(); Object[] arguments = (Object[]) input.readObject(); Class<?> serviceClass = serviceRegistry.get(serviceName); if (serviceClass == null) { throw new ClassNotFoundException(serviceName + " not found."); } Method method = serviceClass.getMethod(methodName, paramTypes); Object result = method.invoke(serviceClass.newInstance(), arguments); try (ObjectOutputStream output = new ObjectOutputStream(this.socket.getOutputStream())) { output.writeObject(result); output.flush(); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } } }
|
注册中心应该是一个服务端,需要长期在某个端口进行监听,在有客户端连接过来时就分出一个线程去处理。将消费者提供的序列化后的服务名称、方法名称、方法参数类型和参数进行反序列化,然后利用反射机制调用对应的服务,并将调用结果序列化后传输给消费者(客户端)。
在客户端,当我们调用接口的一个方法时,接口的实现类需要将我们的调用信息通过网络传递给注册中心,然后等待调用结束后再接收调用结果返回,如果为每个接口都创建这么一个类,显然是不合适的,因此最好的方式就是使用动态代理机制。我们创建一个工具类,提供一个获取远程对象的方法,在该方法中实现 InvocationHandler 接口,将实现类的上述逻辑放在其中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class ProxyUtil {
@SuppressWarnings("unchecked") public static <T> T getRemoteProxyInstance(Class<T> serviceInterface, InetSocketAddress address) {
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface}, ((proxy, method, args) -> { try (Socket socket = new Socket()) { socket.connect(address); try (ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream())) { output.writeUTF(serviceInterface.getName()); output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(args); output.flush(); try (ObjectInputStream input = new ObjectInputStream(socket.getInputStream())) { return input.readObject(); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } catch (IOException e) { e.printStackTrace(); } return null; })); } }
|
接下来就可以开始使用了,使用时有一点要特别注意,由于我们使用的是 Java 原生的序列化和反序列化机制,因此我们方法的参数和返回值都需要实现 java.io.Serializable
接口。
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Getter @Setter @ToString public class User implements Serializable {
private String id; private String username;
public User(String id, String username) { this.id = id; this.username = username; } }
|
1 2 3
| public interface UserService { User getUserById(String id); }
|
1 2 3 4 5 6
| public class UserServiceImpl implements UserService { @Override public User getUserById(String id) { return new User(id, "saber"); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class Application { public static void main(String[] args) throws InterruptedException { Registry registry = new RegistryCenter(8888); registry.register(UserService.class, UserServiceImpl.class); new Thread(() -> registry.start(), "server").start();
UserService userService = ProxyUtil.getRemoteProxyInstance(UserService.class, new InetSocketAddress("localhost", 8888)); User user = userService.getUserById("12"); System.out.println(user);
TimeUnit.SECONDS.sleep(2); registry.stop(); } }
|