实现自己的 RPC 框架(二)

代码星冰乐

专注成就未来

首页 归档 关于

实现自己的 RPC 框架(二)

Aug 5, 2019 | haifeiWu | Java | 阅读
文章目录
  1. 1. RPC 模型
  2. 2. 怎么用
  3. 3. 服务的发布与订阅
    1. 3.1. 服务的发布
    2. 3.2. 服务的订阅
  4. 4. 小结

作 者:haifeiWu
原文链接:https://www.hchstudio.cn/article/2019/ba29/
版权声明:非特殊声明均为本站原创作品,转载时请注明作者和原文链接。


由于版权原因,请阅读原文 --> 实现自己的 RPC 框架(二)

关注我们

作 者:haifeiWu
原文链接:https://www.hchstudio.cn/article/2019/ba29/
版权声明:非特殊声明均为本站原创作品,转载时请注明作者和原文链接。

作 者:haifeiWu
原文链接:https://www.hchstudio.cn/article/2019/ba29/
版权声明:非特殊声明均为本站原创作品,转载时请注明作者和原文链接。

前段时间自己搞了个 RPC 的轮子,不过相对来说比较简单,最近在原来的基础上加以改造,使用 Zookeeper 实现了 provider 自动寻址以及消费者的简单负载均衡,对之前的感兴趣的请转 造个轮子—RPC动手实现。

RPC 模型

在原来使用 TCP 直连的基础上实现基于 Zookeeper 的服务的注册与发现,改造后的依赖关系是这样的。

child-rpc

怎么用

话不多说,我们来看下如何发布和引用服务。
服务端我们将服务的 IP 和端口号基础信息注册到 Zookeeper 上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @author wuhaifei 2019-08-02
*/
public class ZookeeperServerMainTest {

public static void main(String[] args) {
ServerConfig serverConfig = new ServerConfig();
serverConfig.setSerializer(AbstractSerializer.SerializeEnum.HESSIAN.serializer)
.setHost("172.16.30.114")
.setPort(5201)
.setRef(HelloServiceImpl.class.getName())
.setRegister(true)
.setInterfaceId(HelloService.class.getName());

RegistryConfig registryConfig = new RegistryConfig().setAddress("127.0.0.1:2181")
.setSubscribe(true)
.setRegister(true)
.setProtocol(RpcConstants.ZOOKEEPER);
ServerProxy serverProxy = new ServerProxy(new NettyServerAbstract())
.setServerConfig(serverConfig)
.setRegistryConfig(registryConfig);
try {
serverProxy.export();
while (true){

}
} catch (Exception e) {
e.printStackTrace();
}
}
}

通过 Zookeeper 引用注册在其上的服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* @author wuhaifei 2019-08-02
*/
public class ZookeeperClientMainTest {
public static void main(String[] args) {
ClientConfig clientConfig = new ClientConfig();
clientConfig.setProtocol(RpcConstants.ZOOKEEPER)
.setTimeoutMillis(100000)
.setSerializer(AbstractSerializer.SerializeEnum.HESSIAN.serializer);

RegistryConfig registryConfig = new RegistryConfig()
.setAddress("127.0.0.1:2181")
.setProtocol(RpcConstants.ZOOKEEPER)
.setRegister(true)
.setSubscribe(true);
ClientProxy<HelloService> clientProxy = new ClientProxy(clientConfig, new NettyClientAbstract(), HelloService.class)
.setRegistryConfig(registryConfig);
for (int i = 0; i < 10; i++) {
HelloService helloService = clientProxy.refer();
System.out.println(helloService.sayHi());
}
}
}

运行结果就不一一贴出了,感兴趣的小伙伴可以查看楼主传到 github 上的源码这是一个rpc的轮子。

服务的发布与订阅

楼主在原来代码的基础上添加了 Zookeeper 的注册的逻辑,原来的代码相关介绍请转 造个轮子—RPC动手实现。

服务的发布

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 发布服务
*/
public void export() {
try {
Object serviceBean = Class.forName((String) serverConfig.getRef()).newInstance();
RpcInvokerHandler.serviceMap.put(serverConfig.getInterfaceId(), serviceBean);
this.childServer.start(this.getServerConfig());

if (serverConfig.isRegister()) {
// 将服务注册到zookeeper
register();
}
} catch (Exception e) {
// 取消服务注册
unregister();
if (e instanceof ChildRpcRuntimeException) {
throw (ChildRpcRuntimeException) e;
} else {
throw new ChildRpcRuntimeException("Build provider proxy error!", e);
}
}
exported = true;
}

/**
* 注册服务
*/
protected void register() {
if (serverConfig.isRegister()) {
Registry registry = RegistryFactory.getRegistry(this.getRegistryConfig());
registry.init();
registry.start();
try {
registry.register(this.serverConfig);
} catch (ChildRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
String appName = serverConfig.getInterfaceId();
LOGGER.info(appName, "Catch exception when register to registry: "
+ registryConfig.getId(), e);
}
}
}

服务的订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 服务的引用.
*/
public T refer() {
try {
if (config.isSubscribe()) {
subscribe();
}
childClient.init(this.clientConfig);
return invoke();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

/**
* 订阅zk的服务列表.
*/
private void subscribe() {
Registry registry = RegistryFactory.getRegistry(this.getRegistryConfig());
registry.init();
registry.start();

this.clientConfig = (ClientConfig) config;
List<String> providerList = registry.subscribe(this.clientConfig);

if (null == providerList) {
throw new ChildRpcRuntimeException("无可用服务供订阅!");
}

// 使用随机算法,随机选择一个provider
int index = ThreadLocalRandom.current().nextInt(providerList.size());
String providerInfo = providerList.get(index);
String[] providerArr = providerInfo.split(":");
clientConfig = (ClientConfig) this.config;
clientConfig.setHost(providerArr[0]);
clientConfig.setPort(Integer.parseInt(providerArr[1]));
}

上面代码比较简单,就是在原来直连的基础上添加 zk 的操作,在发布服务的时候将 provider 的 IP 和端口号基础信息注册到 zk 上,在引用服务的时候使用随机算法从 zk 上选取可用的 provider 信息,然后进行 invoke 调用。

小结

RPC(Remote procedure call)底层逻辑相对来说比较简单,楼主在实现的过程中参考了其他 RPC 框架的部分代码,受益匪浅~

关注我们

作 者:haifeiWu
原文链接:https://www.hchstudio.cn/article/2019/ba29/
版权声明:非特殊声明均为本站原创作品,转载时请注明作者和原文链接。

分享
Java
二分查找算法细节详解lang3 的 split 方法误用
微信关注我们
分类
  • Android8
  • Go4
  • Java59
  • Kafka,Java1
  • Kotlin2
  • Linux1
  • MapReduce1
  • Python2
  • Raft1
  • Redis1
  • ThreadPoolExecutor1
  • go1
  • 工具1
  • 总结8
  • 旅游日记1
标签
Nginx ChanghuiN haifeiWu Android Java 设计模式 hexo Kotlin 算法 MySQL 源码解析 Python Redis golang web Kafka 配置中心 总结 性能优化 旅游日记 Shell Go 问题排查 译文 Docker Spring Boot 工具 学习笔记 WebFlux 性能测试 go 散列表 源码 netty Raft
最近文章
  • Kafka的日志复制机制
  • 从20到21
  • go 并发编程
  • 【译】了解Linux CPU负载-您何时应该担心?
  • Zookeeper 与分布式锁
  • 基于Redis的分布式锁到底安全吗?
  • 【译】Raft 学生指南
  • ThreadPoolExecutor 的简单梳理
  • MapReduce 的简单实现
  • 使用 Map 实现策略模式
福利专区
    免费SSL证书
      阿里云红包
        腾讯云专属福利
        Copyright © 2021 代码星冰乐. Powered by ChanghuiN. 版权所有 晋ICP备15001365号
        特别感谢: 云服务器服务商 、 CDN 服务商