今天来实战一下gRPC的特性,双向流。

首先认识一下,什么是双向流?

所谓双向流式 RPC ,是由客户端调用方法来初始化,而服务端接收到客户端的元数据,方法名和截止时间。

并且服务端可以选择发送回它的初始元数据或等待客户端发送请求的一种通信方式。

双向流,特点在于双向,也就是请求响应如何起作用是完全取决于应用怎么处理,因为客户端和服务端能在任意顺序上读写 ,也就是说这些流的操作是完全独立的。

例如服务端可以一直等待,直到它接收到所有客户端的消息才写应答,或者服务端和客户端可以像”乒乓球”一样:服务端后得到一个请求就回送一个应答,接着客户端根据应答来发送另一个请求,以此类推。

通俗地说,客户端发送了 N 个请求,服务端返回 N 个或者 M 个响应,该特性能够充分利用 HTTP/2.0 的多路复用功能。

某个时刻,HTTP/2.0 链路上可以既有请求也有响应,实现了全双工通信(对比单行道和双向车道),双向流式RPC通信用一个简单的图表示如下:

grpc双向流.png

Read More

本文案例代码地址 : https://github.com/TaXueWWL/grpc-demo

有了前面的铺垫,我们已经对gRPC的java实现机制,代码编写手法、阻塞RPC以及双向流等内容有了全面、直观地了解。

本文我们继续本系列,为我们的gRPC添加服务注册发现。

什么是服务注册发现?

在RPC调用流程中,服务调用方需要知道服务提供方的地址和端口,才能发起RPC调用。

如果是直连式调用,则服务提供方需要提前配置服务提供方的地址和端口,也就是大白话说的 写死

这种硬编码配置方式应对变化的能力很差,如果服务提供方宕机,服务消费者无法及时更换调用的目标,即便服务提供方存在冗余的机器,消费者也需要修改配置文件,重启服务才能调用至新的服务提供方节点。

通俗地说就是,这种方式将服务提供方与服务消费方耦合在了一起,不够灵活。

因此就需要有服务注册发现机制。如下图所示:

registry.png

这里引用了dubbo框架的简易架构图。图中,服务提供方(provider)启动后会向注册中心(Registry)发起服务注册,将自己的ip、端口、其他元数据信息发送给注册中心。

注册中心维护了一个注册表,对上报的服务注册信息进行记录。

服务消费者(consumer)启动后会向注册中心(Registry)拉取服务提供方列表,也就是图中的 subscribe ,即:服务发现过程。

注意看,3.notify 是一条虚线,这里的含义是指,一旦服务提供方的注册信息发生变更,如现有节点下线(有可能是正常的关机,如版本发布;也有可能是意外宕机,都会导致服务下线。)或者新节点上线,都会造成注册中心中记录的服务注册信息发生变更,此时注册中心会通知服务消费者存在注册表信息变更,此时需要对最新的服务注册信息进行变更,一般有几种方式:

  1. 注册中心通过push方式主动推送给消费者,这种方式往往通过消费者向注册中心注册监听器方式实现;
  2. 消费者定时通过pull方式从注册中心拉取注册表信息并在本地进行更新;
  3. 消费者通过长轮询方式从注册中心拉取注册表信息(推拉结合)。

Read More

微服务如火如荼的当下,各种服务框架层出不穷。Dubbo、SpringCloud在国内Java后端微服务领域目前占据大部分份额。

但是随着云原生愈发普及,具备跨语言、高性能特性的RPC通信框架横空出世,其中gRPC与Thrift是其中的佼佼者。

本文我们将视角集中在gRPC这RPC框架。

gRPC 是Google开源的高性能、通用的RPC框架。客户端与服务端约定接口调用, 可以在各种环境中运行,具有跨语言特性, 适合构建分布式、微服务应用。

个人认为,gRPC最为杀手锏的特性就是“跨语言”,其次才是高性能。

它的跨语言特性体现在,通过定义IDL(接口定义语言),隔离了不同编程语言之间的差异,对IDL进行编译后,生成对应编程语言的nativeCode,让开发者能够集中注意在实现业务需求上,而不需要花费额外的精力在语言层面上。

官网的一张图能够很好的体现这个特点

gRPC多语言

Read More

今天的文章,继续学点儿金融知识。重点来了解下金融领域的撮合交易机制。

本文主要介绍撮合交易的概念,委托单相关的知识,以及撮合交易集合竞价与连续竞价相关的要点。

最后讲解撤单相关的知识及止盈止损相关的知识点。

什么是撮合交易?

撮合交易指的是 买方 在交易所下委托买单买入标的,如一手股票,卖方 在交易所下委托卖单卖出标的,如一手股票。

交易所按照价格优先、时间优先原则确定双方成交价格,对符合交易规则的订单进行撮合成交,按交易定单指定的标的物进行交割的交易方式。

撮合是如何成交的?

一般来说,买入和卖出价格是确定的,这种订单叫做限价单,即下单时就指定确定的价格。

买入限价,指的是至多(小于等于)通过该价格成交,买入一定数量的股票。如限价$10, 则可以买入<=$10的股票。

卖出限价,指的至少(大于等于)要通过该价格成交,卖出一定数量的股票,如限价$10,则对手方至少要付出$10买入股票。

Read More

到此,Disruptor高性能之道系列就先告一段落,关于Disruptor的实战使用将会在后续的撮合实战案例中讲解。

本文推荐一些学习Disruptor的资料,感兴趣的读者可以根据这些资料进一步学习。

官网

官网是第一手资料,建议通读

https://lmax-exchange.github.io/disruptor/user-guide/index.html#_using_the_disruptor

并发编程网:disruptor译文

虽然有些过时,但是思想是值得参考的。建议通读

http://ifeve.com/disruptor/

ifeve.PNG



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

我们接着介绍Disruptor高性能实现之道–等待策略。

等待策略waitStrategy是一种决定一个消费者如何等待生产者将event对象放入Disruptor的方式/策略。

等待策略waitStrategy是一个接口,它的所有实现都是针对消费者生效的。

Read More

Ringbuffer(环形缓冲区/环形数组)是Disruptor的核心底层数据结构。

它不同于传统的阻塞队列(如:ArrayBlockingQueue)是从某一端入队,另外一端出队,而是一种收尾相连的环形结构。

ringbuffer.png

之所以叫它 buffer,我想大概是因为这个环形队列是作为不同线程(or上下文)之间传递数据媒介,类似于一个缓冲区。

RingBuffer拥有一个序号,指向数组中下一个可用的元素,需要注意的是Disruptor中的RingBuffer没有头尾指针,而只通过序号(sequence)就实现了生产者与消费者之间的进度协调。

RingBuffer可以一直填充吗?

假如不断地填充RingBuffer,那么必然会发生sequence一直增加,直到绕过环,覆盖原有的内容。

Disruptor是通过barrier实现了是否要覆盖原有内容的判断,这部分内容后面会说到。

如何定位RingBuffer中的元素呢?

正如我们在前面所说,RingBuffer本质上是个数组,那么必然可以通过数组的偏移量offset或者说index,定位到具体的元素。

在实际的开发中,我们常通过取模运算来获取元素在数组中的偏移量。也就是 序号 % 长度 == 索引

假设有8个元素,那么元素序号为13的元素就位于:

13 % 8 = 5

对于Disruptor而言,它强制要求数组的size初始化为 2的N次方,如 1024 * 1024。

设置为2的N次方有这样的好处:可以通过位运算更快速定位到元素位置。公式为:

seq & (ringBufferSize - 1) == index

在Disruptor中, ringBufferSize-1 成为mask,即掩码。

RingBuffer中的数据是如何预热的?

RingBuffer通过预分配对象机制来降低GC的影响。在实际运行过程中,业务从RingBuffer中获取对应sequence位置的对象引用,对该引用指向的对象属性赋值,通过覆盖写方式而不是直接覆盖整个对象的方式,保证了对象引用在整个disruptor存活的周期内都存在,保证GCRoot始终存在,因此能够大幅降低GC的影响。

这也是Disruptor高性能保证的策略之一,由于Disruptor主要使用场景之一就是低延迟环境,因此必须减少运行时内存分配,从而减少垃圾回收导致的系统停顿(STW)。

这种预加载机制在其他的中间件也有使用,如RocketMQ的commitLog也是在broker启动时就创建固定1G的文件,便于启动完成便可进行写入而不需要进行运行期创建。

Disruptor的RingBuffer数据预热具体的实现,查看Disruptor源码:

Disruptor初始化过程中会初始化RingBuffer:

1
2
3
RingBuffer( EventFactory<E> eventFactory,Sequencer sequencer){
super(eventFactory, sequencer);
}

RingBuffer是RingBufferFields子类:

1
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>

初始化RingBuffer时会先调用父类构造:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
// 用于计算index的掩码,公式:seq & (ringBufferSize - 1) == index
this.indexMask = bufferSize - 1;
// 初始化RingBuffer数组
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
// 预填充RingBuffer数组
fill(eventFactory);
}

接着调用fill方法预填充数组,实现逻辑就是为数组的每个index填充一个对象实例。

1
2
3
4
5
private void fill(EventFactory<E> eventFactory){
for (int i = 0; i < bufferSize; i++){
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}

填充操作通过用户定义的eventFactory实现,该工厂一般写法为:

1
2
3
4
5
6
7
8
public class OrderEventFactory implements EventFactory<OrderEvent> {
@Override
public OrderEvent newInstance() {
// new 一个空的orderEvent对象即可
// 就是为了返回空的event对象
return new OrderEvent();
}
}



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

Fork me on GitHub