自己写RPC之实现服务注册与发现
本文开始继续我们的造轮子之旅。
这个系列笔者将带领读者朋友实现简易的基于Netty、curator以及springBoot等技术的一个简易RPC通信轮子。
实现最基本的服务发现、服务注册、RPC通信等功能。该项目命名为:misaka,她是《某科学的超电磁炮》的女主角御坂美琴的名字。
本文是该系列的第一篇,主要实现服务注册与发现功能。
我选择zookeeper作为服务注册发现的核心组件,使用curator作为与zookeeper通信的客户端。
curator提供了一个服务注册发现的实现,curator-x-discovery,只需要在项目中引入即可。
建立项目misaka-api,在pom中引入如下依赖:
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-x-discovery -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>4.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.44.Final</version>
</dependency>
引入netty作为网络通信组件。
服务发现核心类:DiscoveryService
声明一些变量以便后续使用
/**zk连接地址*/
private static final String connectString = "127.0.0.1:2181";
/**CuratorFramework*/
private CuratorFramework client = null;
/**服务发现实例*/
private ServiceDiscovery<String> discovery = null;
/**服务提供者实例*/
private ServiceProvider<String> provider = null;
/**zk根节点*/
private static final String BASE_PATH = "/misaka";
/**记录服务提供者列表,便于统一进行关闭*/
private List<Closeable> closeableList = Lists.newArrayList();
private Object lock = new Object();
/** 服务注册表*/
private Map<String, ServiceProvider<String>> providers = Maps.newConcurrentMap();
对外提供一个init方法,用于初始化Curator客户端并启动
public void init() {
// 初始化Curator客户端并启动
client = CuratorFrameworkFactory.newClient(connectString, new RetryOneTime(1));
client.start();
// 构造服务发现
discovery =
ServiceDiscoveryBuilder.builder(String.class)
.basePath(BASE_PATH)
.client(client)
.build();
try {
discovery.start();
} catch (Exception e) {
throw new RuntimeException("启动服务发现异常", e);
}
}
启动Curator客户端之后,通过ServiceDiscoveryBuilder这个构造器实例化一个ServiceDiscovery对象,并启动服务发现实例。
对外提供一个服务注册方法,用于在应用启动阶段对服务进行注册,将服务元信息写入zookeeper中
public void registeService(ServiceInstanceNode node) throws Exception {
if (node.getPort() == null) {
return;
}
ServiceInstance<String> serviceInstance =
ServiceInstance.<String>builder()
.payload(node.getPayload())
.name(node.getServiceName())
.port(Integer.valueOf(node.getPort())).build();
discovery.registerService(serviceInstance);
}
我们定义了一个ServiceInstanceNode实例,通过ServiceInstanceNode实例初始化一个ServiceInstance实例,ServiceInstance作为服务实例通过discovery.registerService方法注册到zookeeper中。
对外提供一个 getInstanceByName 方法,允许调用方根据服务名查询具体的ServiceInstance实例,从而获取服务的具体信息。
我们看一下ServiceInstance具体有哪些属性:
public class ServiceInstance<T> {
private final String name; // 服务名称
private final String id; // 服务id
private final String address; // 可寻址的服务域名或ip
private final Integer port; // 服务暴露端口
private final Integer sslPort; // ssl端口
private final T payload; // 自定义信息
......
可以看到,ServiceInstance包含了服务名称、服务id、服务地址以及端口等信息,我们提供的getInstanceByName方法给客户端,方便客户端获取服务元信息,从而实现服务发现。
getInstanceByName方法逻辑如下:
public ServiceInstance<String> getInstanceByName(String serviceName) throws Exception {
ServiceProvider<String> provider = providers.get(serviceName);
if (provider == null) {
synchronized (lock) {
provider = providers.get(serviceName);
if (provider == null) {
// 随机策略
provider = discovery.serviceProviderBuilder().serviceName(serviceName)
.providerStrategy(new RandomStrategy<String>()).build();
provider.start();
closeableList.add(provider);
providers.put(serviceName, provider);
}
}
}
return provider.getInstance();
}
providers是基于ConcurrentHashMap实现的一个服务注册表。每次来的时候先从provider中尝试获取ServiceProvider实例,如果获取不到则使用双重检查锁机制从discovery中获取服务提供者provider,这里使用了随机策略RandomStrategy(其余实现还有轮询策略、sticky粘滞策略)。
在serviceProvider中获取到provider提供者 之后,添加到providers中,同时添加到closeableList以便后续调用shutdown对所有的provider进行关闭操作。
提供一个shutdown方法用于关闭资源
public synchronized void shutdown() {
for (Closeable closeable : closeableList) {
CloseableUtils.closeQuietly(closeable);
}
CloseableUtils.closeQuietly(discovery);
CloseableUtils.closeQuietly(client);
}
这里我们对closeableList进行迭代,逐个关闭ServiceProvider实例。
ServiceProvider实现了Closeable接口,因此它也是一个Closeable的实例。
到这里,我们就实现了服务注册与发现的核心功能,接着通过一个demo案例去测试一下。
服务注册实现
新建服务misaka-provider,作为服务提供者,它在启动之后会对服务进行注册。
编写RegistyHandler添加@Configuration注解,标记为一个配置类。
注册DiscoveryService实例
@Bean(initMethod = "init", destroyMethod = "shutdown")
public DiscoveryService discoveryService() {
DiscoveryService discoveryService = new DiscoveryService();
return discoveryService;
}
声明并向Spring容器中注册DiscoveryService实例,标记初始化方法为init,销毁方法为shutdown
注册服务实现类HelloServiceImpl
定义RPC接口HelloService
public interface HelloService {
public String sayHello(String name, String content);
}
声明一个sayHello方法,用过dubbo等RPC框架的同学应当很熟悉了。没错,这里的HelloService在Dubbo中就是服务定义接口。
编写实现类HelloServiceImpl实现HelloService
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String name, String content) {
return new StringBuilder("hello:").append(name).append(", content:").append(content).toString();
}
}
注册服务实现类
在RegistyHandler注册类中,注册HelloServiceImpl实例,并将其元信息注册到discoveryService中。
@Bean
@ConditionalOnBean(value = DiscoveryService.class)
public HelloServiceImpl helloServiceImpl(DiscoveryService discoveryService) {
HelloServiceImpl helloService = new HelloServiceImpl();
ServiceInstanceNode helloServiceNode = new ServiceInstanceNode();
// 服务注册
helloServiceNode.setPort(servicePort).setAddress(ipAddress).setServiceName(HelloServiceImpl.class.getName());
try {
discoveryService.registeService(helloServiceNode);
} catch (Exception e) {
throw new RuntimeException("注册HelloServiceImpl异常");
}
return helloService;
}
通过 @ConditionalOnBean(value = DiscoveryService.class) 条件注册告诉Spring容器只有存在DiscoveryService实例才注册HelloServiceImpl。
构造一个ServiceInstanceNode,设置属性后通过discoveryService.registeService方法将元信息注册到zookeeper中。
服务注册部分的开发就告一段落,我们接着看下服务发现部分的代码实现。
服务发现实现
新建服务misaka-consumer,作为服务提供者,它会根据服务名称取zookeeper进行查询,获取具体的服务元信息。
和服务提供者服务类似,定义一个RegistyHandler类,添加注解@Configuration。
注册DiscoveryService实例
@Bean(initMethod = "init", destroyMethod = "shutdown")
public DiscoveryService discoveryService() {
DiscoveryService discoveryNode = new DiscoveryService();
return discoveryNode;
}
向Spring容器中注册DiscoveryService实例,用于服务发现。
@Bean(destroyMethod = "shutdown")
@ConditionalOnBean(DiscoveryService.class)
public RemoteClient remoteClient(DiscoveryService discoveryService) {
RemoteClient remoteClient;
try {
// 服务发现
ServiceInstance<String> serviceInstance = discoveryService.getInstanceByName(helloServiceName);
LOGGER.info(JSON.toJSONString(serviceInstance));
...省略其他逻辑...
} catch (Exception e) {
throw new RuntimeException("init RuntimeException error!", e);
}
return remoteClient;
}
注册客户端通信RemoteClient实例,这个类的作用为封装Netty用于RPC网络通信,具体逻辑在后续的通信实现部分进行讲解。
我们注意看try-catch中的代码,从DiscoveryService中获取了服务名为 key = helloServiceName 的服务实现,value通过@Value注解获取,具体值配置在application.properties中。
misaka.service.HelloService.name=com.snowalker.misaka.misaka.service.HelloServiceImpl
在RegistyHandler声明如下:
@Value("${misaka.service.HelloService.name}")
private String helloServiceName;
通过discoveryService.getInstanceByName方法获取到helloServiceName对应的具体服务元信息后,我们通过日志进行打印。
接下来先后启动提供者服务,消费者服务,对服务注册发现逻辑进行测试。
测试服务注册及发现
首先启动提供者服务,控制台输出如下:
......
2020-02-03 20:11:34.304 INFO 31092 --- [ main] org.apache.zookeeper.ZooKeeper :
Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@31c269fd
2020-02-03 20:11:34.313 INFO 31092 --- [ main] org.apache.zookeeper.ClientCnxnSocket :
jute.maxbuffer value is 4194304 Bytes
2020-02-03 20:11:34.317 INFO 31092 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn :
Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2020-02-03 20:11:34.319 INFO 31092 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn :
Socket connection established, initiating session, client: /127.0.0.1:8071, server: 127.0.0.1/127.0.0.1:2181
2020-02-03 20:11:34.320 INFO 31092 --- [ main] o.a.c.f.imps.CuratorFrameworkImpl :
Default schema
2020-02-03 20:11:34.323 INFO 31092 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn :
Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x1000fa2eb800032, negotiated timeout = 40000
2020-02-03 20:11:34.327 INFO 31092 --- [ain-EventThread] o.a.c.f.state.ConnectionStateManager :
State change: CONNECTED
2020-02-03 20:14:27.180 INFO 31048 --- [ main] c.s.misaka.misaka.config.RegistyHandler :
service : com.snowalker.misaka.misaka.service.HelloServiceImpl registered success
......
可以看到已经与zookeeper建立了链接,并注册helloServiceImpl服务到zookeeper中。
接着启动服务消费者服务,控制台输出如下:
2020-02-03 17:52:49.673 INFO 6824 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn :
Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2020-02-03 17:52:49.674 INFO 6824 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn :
Socket connection established, initiating session, client: /127.0.0.1:4290, server: 127.0.0.1/127.0.0.1:2181
2020-02-03 17:52:49.676 INFO 6824 --- [ main] o.a.c.f.imps.CuratorFrameworkImpl :
Default schema
2020-02-03 17:52:49.678 INFO 6824 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn :
Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x1000fa2eb800031, negotiated timeout = 40000
2020-02-03 17:52:49.682 INFO 6824 --- [ain-EventThread] o.a.c.f.state.ConnectionStateManager :
State change: CONNECTED
2020-02-03 17:52:49.790 INFO 6824 --- [ main] c.s.m.m.config.RegistyHandler :
{"address":"192.168.0.100","enabled":true,"id":"5e44a372-f1e0-44da-8a22-daa81a346f37",
"name":"com.snowalker.misaka.misaka.service.HelloServiceImpl","port":18083,
"registrationTimeUTC":1580723559543,"serviceType":"DYNAMIC"}
注意观察最后一行日志,这里打印出了HelloServiceImpl服务的注册元信息,该元信息即是提供者服务启动时注册到zookeeper上的服务元信息。
我们使用zk-cli观察一下zookeeper中的Node节点:
笔者已经在本地部署了一套zk-ui,关于zk-ui的使用可以自行查看附录中的参考链接。
从图中可以看出,服务HelloServiceImpl已经成功注册到zookeeper上,并且能够被服务消费者发现。
小结
作为 “自己写RPC” 系列的第一篇,本文详细的讲解了如何利用curator整合spring Boot框架实现跨服务的服务注册与发现功能,并且给出了详细的代码实现与讲解。
在开发过程中,笔者强烈地体会到掌握zookeeper组件对于后端开发者的必要性,zookeeper真的很强大。
在后续的文章中,我将继续带领读者,实现远程服务调用逻辑。
附录
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。