文章目录
  1. 1. 1.Aeron是什么
  2. 2. 2.Aeron架构
    1. 2.1. 基本代码案例
      1. 2.1.1. 完整的代码
      2. 2.1.2. 运行结果
    2. 2.2. 代码剖析
      1. 2.2.1. 参数构造
      2. 2.2.2. 构造Aeron对象
      3. 2.2.3. 发送消息
      4. 2.2.4. 接收消息
    3. 2.3. 下一篇

本系列基于Aeron cookbook编写,通过对原版英文文档进行翻译,加入笔者的一些思考,努力为Aeron提供一份可参考可阅读的中文参考资料。

话不多说,开始我们的学习之旅。

1.Aeron是什么

Aeron是一个开源高性能消息传输机制(单向),它支持高效可靠的UDP单播、UDP多播和IPC消息传输。

Aeron 以可预测的延迟,有效地跨进程或跨网络边界复制有序日志缓冲区。
Aeron API 提供了三个主要交互点

  1. Media Driver 媒体驱动程序
  2. Publications 发布
  3. Subscriptions 订阅

Media Driver 当前支持通过 UDP 和 IPC 进行传输。

UDP套接字交互可以卸载到 Open Onload,如果必要的驱动程序和硬件可供使用,

除了核心 IPC 和 UDP 功能外,Aeron 还提供:

  • Aeron Archive,它为流提供磁盘支持的存储
  • Aeron Cluster,允许开发人员构建基于 RAFT 协议的容错服务

思考
由于Aeron天然支持RAFT协议,并且具备高性能的消息处理能力,因此多使用在金融交易领域,它同时提供了高性能和高可靠的特性。(事实上,Aeron正是real-logic开发的,该团队的另一款代表作disruptor在金融高频交易场景被大规模应用,其超高性能超低延时得到了业界的普遍赞誉。)

从这里也可以想到,Aeron本身属于有状态服务的范畴,所以学习曲线、开发曲线、运维难度都高于传统的无状态服务框架;

2.Aeron架构

Media Driver 和 Aeron 客户端 API 构成了 Aeron 的核心。

Aeron Archive 和 Aeron Cluster 添加其他组件,如存档概览集群概览中所述。

基本代码案例

下面是一个完整的跨IPC发送消息的方法。

为了让它尽可能简单,发布者和订阅者在同一个线程上——但这在现实世界的应用程序中是没有意义的。

尽管如此,它还是很有用的,因为它显示了最小 Aeron 应用程序的完整设置。

该代码可以在 Aeron-ipc 项目的 SimplestCase.java 文件中找到。

作为示例的一部分,我们将构建 4 个部分:

  • Aeron API;
  • Media Driver 媒体驱动程序;
  • a Subscription; 一个发布者
  • a Publication 一个订阅者

完整的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class AeronSimpleDemo {
public static void main(String[] args) {
String channel = "aeron:ipc";
String message = "this is a new Message.";
IdleStrategy idleStrategy = new SleepingIdleStrategy();
UnsafeBuffer unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocate(256));
try (
MediaDriver driver = MediaDriver.launch();
Aeron aeron = Aeron.connect();
Subscription sub = aeron.addSubscription(channel, 10);
Publication pub = aeron.addPublication(channel, 10);
) {
while (!pub.isConnected()) {
// 连接失败,阻塞
idleStrategy.idle();
}
unsafeBuffer.putStringAscii(0, message);
System.out.println("sending:" + message);
// 尝试发布
while (pub.offer(unsafeBuffer) < 0) {
idleStrategy.idle();
}
FragmentHandler handler = ((buffer, offset, length, header) -> {
System.out.println("received:" + buffer.getStringAscii(offset));
});
while (sub.poll(handler, 1) <= 0) {
idleStrategy.idle();
}
}
}
}

运行结果

1
2
sending:this is a new Message.
received:this is a new Message.

代码剖析

参数构造

首先进行参数构造,主要是这几行

1
2
3
4
String channel = "aeron:ipc";
String message = "this is a new Message.";
IdleStrategy idleStrategy = new SleepingIdleStrategy();
UnsafeBuffer unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocate(256));
  • 第 1 行指定 Aeron 发布者和订阅者进行通信的通道。这是使用 Aeron URL 方案定义的,本示例中的通道为IPC 通道。另请参阅有关频道配置的 Aeron Wiki https://github.com/real-logic/aeron/wiki/Channel-Configuration
  • 第 2 行指定发布者要发送的消息内容。
  • 第 3 行指定要使用的空闲策略,在本例中是 SleepingIdleStrategy,该策略将线程停顿 1 微秒(默认)。
  • 第 4 行提供了一个堆外缓冲区,该缓冲区(UnsafeBuffer)是不安全的缓冲区,它主要用来保存要发送的消息。为简单起见,我们为它分配了 256 个字节,当然,这个大小是可以调整的。

另请参阅:代理和空闲策略-Agents & Idle Strategies

构造Aeron对象

1
2
3
4
5
try (
MediaDriver driver = MediaDriver.launch();
Aeron aeron = Aeron.connect();
Subscription sub = aeron.addSubscription(channel, 10);
Publication pub = aeron.addPublication(channel, 10);

这几行代码创建了用于发送和接收数据的关键Aeron对象。

  • 第 1 行创建媒体驱动程序(MediaDriver)。 Media Driver负责所有的IPC和网络活动,后面会详细讨论
  • 第 2 行创建 Aeron 对象。这是应用程序中用于与 Aeron 交互的主要 API
  • 第 3 行创建消息的订阅者,用于轮询接收消息
  • 第 4 行创建消息发布者,用于消息的发布

另请参阅:媒体驱动程序-Media Driver

发送消息

1
2
3
4
5
6
7
8
9
10
1. while (!pub.isConnected()) {
// 连接失败,阻塞
2. idleStrategy.idle();
3. }
4. unsafeBuffer.putStringAscii(0, message);
5. System.out.println("sending:" + message);
6. // 尝试发布
7. while (pub.offer(unsafeBuffer) < 0) {
8. idleStrategy.idle();
9. }

这些代码行负责发布消息。

首先,应用程序等待发布者(Subscription)达到连接状态。 当连接成功之后,将消息写入不安全缓冲区(unsafeBuffer),最后将缓冲区提供给发布者。

下面对关键代码行进行解释:

  • 仅在连接建立成功后,第 1 行(pub.isConnected())才返回 true。 这是在一个while循环中轮询的过程,每个循环操作之间有1微秒的暂停,直到连接成功(底层代码为LockSupport.park实现的)。
  • 第 7 行(pub.offer(unsafeBuffer) < 0)为发布者提供了缓冲区,发布者通过offer方法将message发布出去。 当返回的值小于零时,某种机制会阻止发布者往缓冲区写入数据。 空闲策略再次轮询直到发布者可以继续写入缓冲区。

接收消息

1
2
3
4
5
6
1. FragmentHandler handler = ((buffer, offset, length, header) -> {
2. System.out.println("received:" + buffer.getStringAscii(offset));
3. });
4. while (sub.poll(handler, 1) <= 0) {
5. idleStrategy.idle();
6. }

代码的最后一部分负责接受来自订阅的消息。

首先,声明一个 FragmentHandler 用于接受消息,然后订阅者持续进行轮询,直到 Aeron将消息传递过来。

下面对关键行进行分析:

  • 第 1-2 行构造了 FragmentHandler。 在这个场景下,我们知道它是一个简单的小消息体,因此我们使用了最基本的FragmentHandler来接受消息。 如果消息体比较大,那么可以使用FragmentAssembler重新组装消息。
  • 第 4 行轮询数据订阅。 与发布过程一样,poll的返回值若等于或小于零表明没有收到消息,则以微秒空闲时间进行轮询。

下一篇

Aeron-CookBook学习之路-MediaDriver概述



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. 1.Aeron是什么
  2. 2. 2.Aeron架构
    1. 2.1. 基本代码案例
      1. 2.1.1. 完整的代码
      2. 2.1.2. 运行结果
    2. 2.2. 代码剖析
      1. 2.2.1. 参数构造
      2. 2.2.2. 构造Aeron对象
      3. 2.2.3. 发送消息
      4. 2.2.4. 接收消息
    3. 2.3. 下一篇
Fork me on GitHub