随笔分类
Rpc框架
Remote Process Call,远程过程调用
说起 Rpc,我第一想到的便是 "微服务",即实现的便是像调用本地方法一样进行远程调用
大多大型应用,随着自身规模的不断扩大,都会逐渐向 "微服务化"进行演变,即,演进后的大型应用系统,都是由多个微服务构成的,而微服务间不是独立的,即会进行通信,因此,我个人也是比较赞同 Rpc是微服务的基础这一说辞的
实际上,但凡涉及到网络通信的,我们都有可能会使用到 Rpc
即,Rpc是解决分布式系统间通信的一大利器;Rpc指的并不是某一个具体的技术,而是整个网络远程调用的过程的过程
实现分布式系统间的通信,通常会去采用四层的 Tcp协议或者 七层的 Http协议,而出于稳定与高效的考虑,多数情况下,我们会去采用基于 Tcp的一个解决方案 (众所周知,Tcp是传输层协议,Http是应用层协议,越靠底层的协议,数据传输会更快);实际上,实现网络通信并不容易,这涉及到:对端节点的查找、双端连接的建立以及对于连接的管理、传输数据的编解码问题等,这些都是我们需要进行考虑到;其实从这也可以看出来,若是每搭建一个分布式系统的话,我们都要从网络底层开始对涉及到通信的逻辑都要去进行一系列复杂的编码的话,这实际上是一件恐怖的事情;so,对于分布式系统而言,网络通信始终是其搭建的一大难点
而 Rpc对网络通信的整个过程进行了封装,这使得我们在搭建分布式系统时,对于网络通信逻辑的编写开发变得更得简单明了,同时也使得我们的网络通信变得更加可靠!
而受到最近读的一些文章的影响,自己也对架构的设计有了个自己的认识:这里不扯什么抽象的,就以人来比喻;首先,人的组成必须有骨架,这就好比大型应用系统中的整体架构;而骨架下支撑着一些器官,这些器官各司其职,对应的便是各个模块;而器官之间需要交流,这是通过血管来进行承载的,对应的便是中间件,此时血液便是器官间进行信息传输的载体,即承载着所需以及所产出的物质;
这下,对于整个架构应该是有个整体的认识了吧
整体架构:
简易 Rpc框架实现:
Spring:强大的依赖注入框架
Netty:高性能、异步 NIO框架,封装了 NIO的底层细节
FastJson:高效 JSON序列化
目前实现的 Rpc框架属于简易版本,日后将结合 ZK等来服务的发现以及监控等
已开源:tiandankanfeng/Netty-RPC: 基于 Java实现的轻量级 Rpc框架 (github.com)
第一步:编写服务接口
/**
* 公共接口
*/
public interface UserService {
/**
* 根据 id查询用户
* @param id
* @return
*/
User getById(Integer id);
}
第二步:编写接口实现类
@RpcService(clazz = UserService.class) // 指定远程接口
public class UserServiceImpl implements UserService {
private volatile Map<Object, User> map;
@Override
public User getById(Integer id) {
if (null == map) {
synchronized (UserServiceImpl.class) {
if (null == map) {
map = new HashMap<>();
}
}
}
if (map.size() == 0) {
map.put(1, new User().setId(1).setName("liangye"));
map.put(2, new User().setId(2).setName("lake"));
}
return map.get(id);
}
}
@RpcService注解放置在接口的实现类上,起着标记的作用,即对外暴露的实现
/**
* 用于暴露接口
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
Class<?> clazz();
}
第三步:封装请求与响应
封装,其实是便于数据的统一读取与相关操作
/**
* 封装请求对象
*/
@Data
public class RpcRequest {
/**
* 请求对象id
*/
private String requestId;
private String className;
private String methodName;
private Class<?>[] parameterType;
/**
* 入参
*/
private Object[] parameters;
}
/**
* 封装响应对象
*/
@Data
public class RpcResponse {
/**
* 响应 id
*/
private String responseId;
/**
* 错误信息
*/
private String error;
/**
* 返回的结果
*/
private Object result;
}
第四步:编写 Netty服务端
使用 Netty来编写服务端,接受 Netty客户端发来的请求,自定义 handler去解析传输过来的数据,底层通过代理的方式去调用目标方法,然后将目标方法执行结果封装在 RpcResponse(包含了可能发生的错误等信息)中,返回给 Netty客户端
/**
* Netty服务端
*/
@Component // 能够被 Spring管理
public class NettyRpcServer implements DisposableBean {
@Autowired
private NettyServerHandler nettyServerHandler;
NioEventLoopGroup bossGroup = null;
NioEventLoopGroup workerGroup = null;
public void start(String inetHost, Integer inetPort) {
if (StringUtils.isEmpty(inetHost) || StringUtils.isEmpty(inetPort)) {
startUp("127.0.0.1", 9898);
} else {
startUp(inetHost, inetPort);
}
}
public void startUp(String inetHost, Integer inetPort) {
try {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
ChannelFuture future = new ServerBootstrap()
.group(workerGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new StringEncoder());
// add self handler
channel.pipeline().addLast(nettyServerHandler);
}
})
.bind(inetHost, inetPort).sync();
System.out.println("Netty server start successfully!");
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
shutdownGracefully();
}
}
public void shutdownGracefully() {
if (null != bossGroup) bossGroup.shutdownGracefully();
if (null != workerGroup) workerGroup.shutdownGracefully();
}
/**
* 容器关闭时, Netty服务端也需要被关闭
* @throws Exception
*/
@Override
public void destroy() throws Exception {
shutdownGracefully();
}
}
自定义相关 handler:
@Slf4j
@Component
@ChannelHandler.Sharable // 单例默认不可共享, 这里来设置下可以共享
public class NettyServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {
static final Map<String, Object> SERVICE_INSTANCE_MAP = new ConcurrentHashMap<>();
/**
* 事件就绪, 异步回调
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
// 解析客户端请求
RpcRequest rpcRequest = JSON.parseObject(s, RpcRequest.class);
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setResponseId(rpcRequest.getRequestId());
// business process
try {
rpcResponse.setResult(handler(rpcRequest));
}catch (Exception e) {
rpcResponse.setError(e.getMessage());
e.printStackTrace();
}
// 响应客户端
ctx.writeAndFlush(JSON.toJSONString(rpcResponse));
}
/**
* Reflective method invoke
* @param rpcRequest
* @return
*/
private Object handler(RpcRequest rpcRequest) throws ClassNotFoundException, InvocationTargetException, NoSuchMethodException, IllegalAccessException {
var className = rpcRequest.getClassName();
var methodName = rpcRequest.getMethodName();
Assert.notNull(className);
Assert.notNull(methodName);
Object bean = SERVICE_INSTANCE_MAP.get(className);
if (null == bean) {
log.warn("服务端未找到对应服务, rpcRequest:{}", rpcRequest);
throw new RuntimeException("服务端未找到对应服务");
}
// Reflective invoke
// FastClass | JdkDynamicProxy
Object proxy = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), bean.getClass().getInterfaces(),
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
return method.invoke(bean, args);
}
}
);
Class<?> intr = bean.getClass().getInterfaces()[0];
// proxy = intr.cast(proxy);
Method method = proxy.getClass().getDeclaredMethod(rpcRequest.getMethodName(), rpcRequest.getParameterType());
return method.invoke(proxy, rpcRequest.getParameters());
// FastClass proxy = FastClass.create(bean.getClass());
// FastMethod method = proxy.getMethod(methodName, rpcRequest.getParameterType());
// return method.invoke(bean, rpcRequest.getParameters());
}
// cache @RpcService bean
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RpcService.class);
Set<Map.Entry<String, Object>> entries = beans.entrySet();
for (Map.Entry<String, Object> entry : entries) {
Object bean = entry.getValue();
if (bean.getClass().getInterfaces().length == 0) {
throw new RuntimeException("对外暴露的服务必须要去实现接口");
}
// 默认去缓存第一个实现的接口
String serviceName = bean.getClass().getInterfaces()[0].getName();
SERVICE_INSTANCE_MAP.put(serviceName, bean);
}
}
}
第五步:编写 Netty客户端
当服务被调用时,通过 Netty客户端向服务端发送数据,从而实现服务的远程调用
@Component
public class NettyRpcClient implements InitializingBean, DisposableBean {
@Autowired
private NettyClientHandler clientHandler;
private NioEventLoopGroup group = new NioEventLoopGroup();
private Channel ch;
private ExecutorService executor = Executors.newCachedThreadPool(); // 仅个人开发时这么写
@Override
public void afterPropertiesSet() throws Exception {
try {
ChannelFuture future = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new StringEncoder());
// business process
channel.pipeline().addLast(clientHandler);
}
}).connect("127.0.0.1", 9898).sync();
System.out.println("client connect sucessfully!");
ch = future.channel();
} catch (Exception e) {
e.printStackTrace();
shutdownGracefully();
}
}
public Object sndMsg(String msg) throws ExecutionException, InterruptedException {
clientHandler.setReqMsg(msg);
Future future = executor.submit(clientHandler);
return future.get();
}
@Override
public void destroy() throws Exception {
shutdownGracefully();
}
public void shutdownGracefully() {
if (null != group) {
group.shutdownGracefully();
}
if (null != ch) {
ch.close();
}
}
}
即,当我们通过接口去调用对应方法时,需要经过代理的方式,将对应接口调用相关数据封装成 RpcRequest,然后通过 Netty来取发送到服务器
/**
* proxy for Rpc-Client
*/
@Component
public class RpcClientProxy {
@Autowired
private NettyRpcClient rpcClient;
static final Map<Class, Object> SERVICE_PROXY_MAP = new ConcurrentHashMap<>(16);
public Object getProxy(Class clazz) {
var proxy = SERVICE_PROXY_MAP.get(clazz);
if (null != proxy) {
return proxy;
}
// new a proxy
proxy = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clazz},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 封装请求对象
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setRequestId(UUID.randomUUID().toString());
rpcRequest.setMethodName(method.getName());
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setParameters(args);
rpcRequest.setParameterType(method.getParameterTypes());
try {
// send msg and decorate response
Object msg = rpcClient.sndMsg(JSON.toJSONString(rpcRequest));
RpcResponse response = JSON.parseObject(msg.toString(), RpcResponse.class);
if (null != response.getError()) {
throw new RuntimeException(response.getError());
}
var res = response.getResult();
if (null != res) {
return JSON.parseObject(res.toString(), method.getReturnType());
}
return null;
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
});
SERVICE_PROXY_MAP.put(clazz, proxy);
return proxy;
}
}
光这样还不够,我们需要去实现的便是类似于 @Autowired的效果,实现依赖注入,此时可以通过注解的方式(@RpcReference)去标识哪一个 Field需要被代理以及借助于 BeanPostProcessor来去实现注入:
/**
* 引用代理类
* 即,实现将代理注入到接口中去
*/
@Target(ElementType.FIELD) // 作用于字段
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcReference {
}
代理的生成,这里采用了基于 JdkDynamicProxy的方式,Cglib也可以去实现:
@Component
public class ServiceBeanPostProcessor implements BeanPostProcessor {
@Autowired
private RpcClientProxy clientProxy;
/**
* 实现自定义注解的注入
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// get all fields of bean
Field[] fields = bean.getClass().getDeclaredFields();
for (Field field : fields) {
// 查询是否有需要去代理的接口
if (field.isAnnotationPresent(RpcReference.class)) {
Object proxy = clientProxy.getProxy(field.getType());
try {
field.setAccessible(true);
field.set(bean, proxy);
} catch (Exception e) {
e.printStackTrace();
}
}
// RpcReference annotation = field.getAnnotation(RpcReference.class);
// if (null != annotation) {
// Object proxy = clientProxy.getProxy(field.getType());
// try {
// field.setAccessible(true);
// field.set(bean, proxy);
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
}
return bean;
}
}
剩下的便是,便是对于请求与响应的封装处理:
由于 Netty是基于异步 NIO实现的,即客户端发送请求后,是不会等待响应回来再去执行后续操作的,然,我们的业务需要即时获取响应数据,这块基于 wait / notify机制实现了伪同步:
@Component
@Data
public class NettyClientHandler extends SimpleChannelInboundHandler<String> implements Callable {
private ChannelHandlerContext ctx;
// 请求消息
private String reqMsg;
// 响应消息
private String respMsg;
@Override
protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
respMsg = s;
notify();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
super.channelActive(ctx);
}
@Override
public synchronized Object call() throws Exception {
ctx.writeAndFlush(reqMsg);
/**
* 借助 thread wait/notify attain 伪同步
*/
wait();
return respMsg;
}
}
综上,便实现了简易版的 rpc
测试:
@RestController
@RequestMapping("/user")
public class UserController {
@RpcReference
private UserService userService;
@GetMapping("/{id}")
public User getById(@PathVariable("id") Integer id) {
return userService.getById(id);
}
}
Web简单测试下,即可实现 Rpc效果.