文章目录
  1. 1. 核心流程
  2. 2. 代码实现
    1. 2.1. 卖个关子,先跑一下样例看看效果
    2. 2.2. 这么优秀的功能,是如何实现的呢?
    3. 2.3. 数据加载监听器:DataCheckingOnLoadHashListener
    4. 2.4. 数据一致性定义监听器:DataCheckingConsistenceListener
    5. 2.5. 对账核心处理器:DataCheckingDefaultProcessor
      1. 2.5.1. handleCheckByHashStrategy对账核心逻辑
    6. 2.6. 为什么这样设计?
    7. 2.7. 总结

在笔者的公众号上发布了关于 对账 业务分析的一篇文章,, 该文也是笔者新书中的一节内容。

本文作为补充,我们从实战角度,从代码角度呈现一个对账框架的实现。

注意:本文中提供的对账框架为广义上的对账,也就是说不局限于支付、交易领域的对账场景,凡是需要通过数据比对进行数据校准、比对、核准的场景,均能够采用本文提供的思路进行实现。

核心流程

首先提供一张框架的核心流程图:

framework.png

通过该流程图可以看到,我们将获取对账基准数据、获取对账目标数据、定义平账依据、异常数据修复等功能通过接口方式提供了扩展点给用户,方便用户根据自己的业务特点进行定制化的开发,这里是对“开闭原则”的实践。

图中绿色部分为调用者实现的代码逻辑,橙色部分为框架自身的业务逻辑。具体的实现细节在接下来的代码实现中将会详细分析。

代码实现

最好的描述实际上还是代码,我们根据上文中的流程图,提供代码实现。

卖个关子,先跑一下样例看看效果

我们先通过运行样例代码,看一下框架实际的使用效果。

具体的调用代码如下:

public static void main(String[] args) {
    // 声明对账处理器
    DataCheckingDefaultProcessor processor = new DataCheckingDefaultProcessor();
    // 执行对账操作
    testHashStrategy(processor);
}

private static void testHashStrategy(DataCheckingDefaultProcessor processor) {
    // 加载Map对账监听器实现
    DataCheckingOnLoadHashListener dataCheckingOnLoadHashListener = new DataCheckingOnLoadHashListenerImpl();
    // 加载数据一致性对比监听器实现
    DataCheckingConsistenceListener dataCheckingConsistenceListener = new DataCheckingConsistenceListenerImpl();

    // 依赖注入
    processor.setDataCheckingConsistenceListener(dataCheckingConsistenceListener);
    processor.setDataCheckingOnLoadHashListener(dataCheckingOnLoadHashListener);

    // 执行对账
    processor.execute();
    // 打印结果
    System.out.println(processor.printCheckResult("测试对账"));
}

具体调用方式,注释写的比较清楚了,我们直接关注一下控制台输出:

模拟执行数据修复
[测试对账],successCount:34,failureCount:66,doubleCheckSuccessCount:0,doubleCheckFailureCount:0

Process finished with exit code 0

可见输出了对账的结果,有34个成功,66个不一致

这么优秀的功能,是如何实现的呢?

通过样例代码可以看出,用户只需要实现对基准数据、目标数据的获取,以及定义平账的依据,框架就能够自动的对基准数据和目标数据进行比对。

并且如果我们定义了数据修复能力,框架还能够实现对数据的修复功能。

数据加载监听器:DataCheckingOnLoadHashListener

public interface DataCheckingOnLoadHashListener<K> {

    /**
    * 加载对账数据到Map,中,如果采用hash结构对账,则实现该方法
    * Map:
    *      key:    关联id
    *      value:  BasicCheckData 基准对账实体
    * @return
    */
    Map<K, BasicCheckData> loadBasicData2Map();

    /**
    * 加载对账数据到Map,中,如果采用hash结构对账,则实现该方法
    * Map:
    *      key:    关联id
    *      value:  TargetCheckData 目标对账实体
    * @return
    */
    Map<K, TargetCheckData> loadTargeDataMap();
}

该接口用于加载对账基准数据与对账的目标数据到内容中,加载之后的数据结构为Map,key为业务唯一标识。

数据一致性定义监听器:DataCheckingConsistenceListener

public interface DataCheckingConsistenceListener<T> {

    /**
    * 是否一致
    * @param basicCheckEntity
    * @param targetCheckEntity
    * @return
    */
    boolean isCheckConsistent(BasicCheckData basicCheckEntity, TargetCheckData targetCheckEntity);

    /**
    * 数据修复
    */
    void fixData();

    /**
    * 是否需要二次对账
    * @return
    */
    boolean needDoubleCheck();
}

DataCheckingConsistenceListener为数据一致性声明的监听器接口,主要方法为 isCheckConsistent ,该方法需要调用者实现,根据具体业务场景定义对账是否成功,成功返回true,不一致则返回false。

对账核心处理器:DataCheckingDefaultProcessor

DataCheckingDefaultProcessor为对账的核心处理器。

public DataCheckingDefaultProcessor(DataCheckingOnLoadHashListener dataCheckingOnLoadHashListener,
                                    DataCheckingConsistenceListener dataCheckingConsistenceListener) {
    Preconditions.checkNotNull(dataCheckingOnLoadHashListener);
    Preconditions.checkNotNull(dataCheckingConsistenceListener);
    this.dataCheckingOnLoadHashListener = dataCheckingOnLoadHashListener;
    this.dataCheckingConsistenceListener = dataCheckingConsistenceListener;
}

首先是DataCheckingDefaultProcessor的构造方法,它接收DataCheckingOnLoadHashListener和DataCheckingConsistenceListener的实例。

调用者通过实现接口,并将接口的实现通过该构造方法注入到DataCheckingDefaultProcessor之中。

/**
 * 执行对账
 */
public void execute() {
    check();
}

/**
 * 执行对账
 */
private void check() {
    // 对账前数据准备
    Map<String, BasicCheckData> basicCheckDataMap = dataCheckingOnLoadHashListener.loadBasicData2Map();
    Map<String, TargetCheckData> targetCheckDataMap = dataCheckingOnLoadHashListener.loadTargeDataMap();
    Preconditions.checkNotNull(basicCheckDataMap);
    Preconditions.checkNotNull(targetCheckDataMap);

    // 执行对账
    handleCheckByHashStrategy(basicCheckDataMap, targetCheckDataMap, successCount, failureCount);

    // 需要二次校验则二次校验
    if (dataCheckingConsistenceListener.needDoubleCheck()) {
        handleCheckByHashStrategy(basicCheckDataMap, targetCheckDataMap, doubleCheckSuccessCount, doubleCheckFailureCount);
    }

    // 数据修复
    dataCheckingConsistenceListener.fixData();
}

check()方法为核心的对账逻辑。

  1. 首先通过DataCheckingOnLoadHashListener加载调用者回传的对账基准Map,以及对账目标Map;两个Map的key均为业务唯一标识;
  2. 接着通过 handleCheckByHashStrategy 执行对账操作,对两个Map进行比对;
  3. 如果调用方允许二次校验,则再次执行一次对账操作;
  4. 执行业务方实现的数据修复接口,根据业务方的数据修复逻辑进行数据修复。这里实际上是需要通过一个上下文对象将待修复的数据回传给框架,暂时未实现,就留给读者自行实现吧。(提示: 通过定义一个上下文对象,通过fixData方法入参传递给调用者)

handleCheckByHashStrategy对账核心逻辑

我们重点关注一下handleCheckByHashStrategy对账核心逻辑:

/**
 * hash结构对账逻辑
 * @param basicCheckDataMap
 * @param targetCheckDataMap
 * @param successCount
 * @param failureCount
 */
private void handleCheckByHashStrategy(Map<String, BasicCheckData> basicCheckDataMap,
                                       Map<String, TargetCheckData> targetCheckDataMap,
                                       AtomicLong successCount,
                                       AtomicLong failureCount) {
    for (Map.Entry<String, BasicCheckData> checkEntry : basicCheckDataMap.entrySet()) {
        String checkEntryKey = checkEntry.getKey();

        BasicCheckData basicCheckData = checkEntry.getValue();
        if (basicCheckData == null) {
            failureCount.incrementAndGet();
            continue;
        }

        TargetCheckData targetCheckData = targetCheckDataMap.get(checkEntryKey);
        if (targetCheckData == null) {
            failureCount.incrementAndGet();
            continue;
        }

        // 校验checkEntryKey是否与对账实体的id一致
        String basicCheckBizId = basicCheckData.getCheckBizId();
        String targetCheckBizId = targetCheckData.getCheckBizId();
        if (!isCheckBizIdEqual(checkEntryKey, basicCheckBizId, targetCheckBizId)) {
            throw new DataCheckRuntimeException("checkEntryKey must equals basicCheckBizId and checkEntryKey must equals targetCheckBizId!");
        }

        // 执行对账
        if (!dataCheckingConsistenceListener.isCheckConsistent(basicCheckData, targetCheckData)) {
            failureCount.incrementAndGet();
            continue;
        }

        successCount.incrementAndGet();
    }
}

通过逻辑,我们能够清楚的看到,对账核心逻辑实际上是通过对基准对账Map进行迭代,去key(对账唯一依据)所在的目标对账Map查找对应的目标对账实体,并将基准对账实体与目标对账实体进行比对。

比对的核心逻辑为用户实现的 DataCheckingConsistenceListener.isCheckConsistent 方法,框架根据用户返回的数据是否一致的标识,决定是对账成功或者失败,并进行相关数据的统计。

为什么这样设计?

实际上,我在开发这个对账框架的时候,是参考了RocketMQ中的某些思想。

RocketMQ在进行消息消费时,允许用户通过实现消费监听器接口,来完成不同种类的消费逻辑,比如说通过实现MessageListenerConcurrently接口,实现并发消息消费。

我开发的对账框架中,也通过定义监听器接口,让用户自定义数据加载、数据比对的具体逻辑,将业务相关的逻辑交给用户,框架本身只关注不变的东西。

这里实际上就是一种依赖倒置思想的集中体现,即:

1、上层模块不应该依赖底层模块,它们都应该依赖于抽象。

2、抽象不应该依赖于细节,细节应该依赖于抽象。

框架通过依赖抽象,做到了对用户具体逻辑的灵活适配。对框架而言,通过依赖抽象,提高了扩展性,完全不需要耦合具体的实现;对使用者而言,只需要实现框架提供的回调方法,不需要关注框架是如何对自己编写的业务代码进行调用的,这实际上就是依赖倒置带来的优越性。做到了对扩展开放。

实际上,回调接口/方法就是一种介于框架和使用者之间的协议,只要遵从该协议进行开发,就能够达到完成业务逻辑的目的。

总结

本文中,我们通过编写一个简单的对账框架,实现了广义对账场景。

同时通过对代码进行分析,也体会到了面向对象设计原则带来的好处,希望本文能够对你有所帮助,更多精彩的内容敬请持续关注我,也欢迎你关注公众号“分布式朝闻道”,与我一同感受代码之美,技术之貌。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. 核心流程
  2. 2. 代码实现
    1. 2.1. 卖个关子,先跑一下样例看看效果
    2. 2.2. 这么优秀的功能,是如何实现的呢?
    3. 2.3. 数据加载监听器:DataCheckingOnLoadHashListener
    4. 2.4. 数据一致性定义监听器:DataCheckingConsistenceListener
    5. 2.5. 对账核心处理器:DataCheckingDefaultProcessor
      1. 2.5.1. handleCheckByHashStrategy对账核心逻辑
    6. 2.6. 为什么这样设计?
    7. 2.7. 总结
Fork me on GitHub