使用双异步后,如何保证数据一致性?

发布时间:2025-05-18 00:31:21 作者:益华网络 来源:undefined 浏览量(1) 点赞(1)
摘要:来源:哪吒编程 unsetunset一、前情提要unsetunset 在上一篇文章中,我们使用双异步后,从 191s 优化到 2s,有个小伙伴在评论区问我,如何保证插入后数据的一致性呢? 很简单,通过对比Excel文件行数和入库数量是否相等即

来源:哪吒编程

unsetunset一、前情提要unsetunset

在上一篇文章中,我们使用双异步后,从 191s 优化到 2s,有个小伙伴在评论区问我,如何保证插入后数据的一致性呢?

很简单,通过对比Excel文件行数和入库数量是否相等即可。

那么,如何获取异步线程的返回值呢?

unsetunset二、通过Future获取异步返回值unsetunset

我们可以通过给异步方法添加Future返回值的方式获取结果。

FutureTask 除了实现 Future 接口外,还实现了 Runnable 接口。因此,FutureTask 可以交给 Executor 执行,也可以由调用线程直接执行FutureTask.run()。

1、FutureTask 是基于 AbstractQueuedSynchronizer实现的

AbstractQueuedSynchronizer简称AQS,它是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及 维护被阻塞线程的队列。基于 AQS 实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。

基于 AQS实现的同步器包含两种操作:

acquire,阻塞调用线程,直到AQS的状态允许这个线程继续执行,在FutureTask中,get()就是这个方法;release,改变AQS的状态,使state变为非阻塞状态,在FutureTask中,可以通过run()和cancel()实现。

2、FutureTask执行流程

执行@Async异步方法;建立新线程async-executor-X,执行Runnable的run()方法,(FutureTask实现RunnableFuture,RunnableFuture实现Runnable);判断状态state;如果未新建或者不处于AQS,直接返回;否则进入COMPLETING状态,执行异步线程代码;如果执行cancel()方法改变AQS的状态时,会唤醒AQS等待队列中的第一个线程线程async-executor-1;线程async-executor-1被唤醒后将自己从AQS队列中移除;然后唤醒next线程async-executor-2;改变线程async-executor-1的state;等待get()线程取值。next等待线程被唤醒后,循环线程async-executor-1的步骤被唤醒从AQS队列中移除唤醒next线程改变异步线程状态新建线程async-executor-N,监听异步方法的state如果处于EXCEPTIONAL以上状态,抛出异常;如果处于COMPLETING状态,加入AQS队列等待;如果处于NORMAL状态,返回结果;

3、get()方法执行流程

get()方法通过判断状态state观测异步线程是否已结束,如果结束直接将结果返回,否则会将等待节点扔进等待队列自旋,阻塞住线程。

自旋直至异步线程执行完毕,获取另一边的线程计算出结果或取消后,将等待队列里的所有节点依次唤醒并移除队列。

如果state小于等于COMPLETING,表示任务还在执行中;计算超时时间;如果超时,则从等待队列中移除等待节点WaitNode,返回当前状态state;阻塞队列nanos毫秒。如果已有等待节点WaitNode,将线程置空;返回当前状态;如果线程被中断,从等待队列中移除等待节点WaitNode,抛出中断异常;如果state大于COMPLETING;如果任务正在执行,让出时间片;如果还未构造等待节点,则new一个新的等待节点;如果未入队列,CAS尝试入队;如果有超时时间参数;否则阻塞队列;如果state大于COMPLETING;如果执行完毕,返回结果;如果大于等于取消状态,则抛出异常。

很多小朋友对读源码,嗤之以鼻,工作3年、5年,还是没认真读过任何源码,觉得读了也没啥用,或者读了也看不懂~

其实,只要把源码的执行流程通过画图的形式呈现出来,你就会幡然醒悟,原来是这样的~

简而言之:

1. 如果异步线程还没执行完,则进入CAS自旋;

2. 其它线程获取结果或取消后,重新唤醒CAS队列中等待的线程;

3. 再通过get()判断状态state;

4. 直至返回结果或(取消、超时、异常)为止。

unsetunset三、FutureTask源码具体分析unsetunset

1、FutureTask源码

通过定义整形状态值,判断state大小,这个思想很有意思,值得学习。

public interface RunnableFuture<Vextends RunnableFuture<V

{

    

/**

     * Sets this Future to the result of its computation

     * unless it has been cancelled.

     */
    void run()

;

}

public class FutureTask<Vimplements RunnableFuture<V

{

 // 最初始的状态是new 新建状态 private volatile int

 state;

    private static final int NEW          = 0// 新建状态    private static final int COMPLETING   = 1// 完成中    private static final int NORMAL       = 2// 正常执行完    private static final int EXCEPTIONAL  = 3// 异常    private static final int CANCELLED    = 4// 取消    private static final int INTERRUPTING = 5// 正在中断    private static final int INTERRUPTED  = 6// 已中断 public V get() throws InterruptedException, ExecutionException 

{

     int

 s = state;

     // 任务还在执行中     if

 (s <= COMPLETING)

         s = awaitDone(false0L

);

     return

 report(s);

 }

 private int awaitDone(boolean timed, long nanos)        throws InterruptedException 

{

        final long deadline = timed ? System.nanoTime() + nanos : 0L

;

        WaitNode q = null

;

        boolean queued = false

;

        for

 (;;) {

         // 线程被中断,从等待队列中移除等待节点WaitNode,抛出中断异常            if

 (Thread.interrupted()) {

                removeWaiter(q);

                throw new

 InterruptedException();

            }

            int

 s = state;

            // 任务已执行完毕或取消            if

 (s > COMPLETING) {

             // 如果已有等待节点WaitNode,将线程置空                if (q != null

)

                    q.thread = null

;

                return

 s;

            }

            // 任务正在执行,让出时间片            else if (s == COMPLETING) // cannot time out yet

                Thread.yield();

            // 还未构造等待节点,则new一个新的等待节点            else if (q == null

)

                q = new

 WaitNode();

            // 未入队列,CAS尝试入队            else if

 (!queued)

                queued = UNSAFE.compareAndSwapObject(this

, waitersOffset,

                                                     q.next = waiters, q);

            // 如果有超时时间参数            else if

 (timed) {

             // 计算超时时间

                nanos = deadline - System.nanoTime();

                // 如果超时,则从等待队列中移除等待节点WaitNode,返回当前状态state                if (nanos <= 0L

) {

                    removeWaiter(q);

                    return

 state;

                }

                // 阻塞队列nanos毫秒                LockSupport.parkNanos(this

, nanos);

            }

            else             // 阻塞队列                LockSupport.park(this

);

        }

    }

 private V report(int s) throws ExecutionException 

{

  // 获取outcome中记录的返回结果

        Object x = outcome;

        // 如果执行完毕,返回结果        if

 (s == NORMAL)

            return

 (V)x;

            // 如果大于等于取消状态,则抛出异常        if

 (s >= CANCELLED)

            throw new

 CancellationException();

        throw new

 ExecutionException((Throwable)x);

    }

}

2、将异步方法的返回值改为Future<Integer>,将返回值放到new AsyncResult<>();中;@Async("async-executor"

)

public void readXls(String filePath, String filename) 

{

    try

 {

     // 此代码为简化关键性代码        List<Future<Integer>> futureList = new

 ArrayList<>();

        for (int time = 0

; time < times; time++) {

            Future sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();

            futureList.add(sumFuture);

        }

    }catch

 (Exception e){

        logger.error("readXlsCacheAsync---插入数据异常:"

,e);

    }

}

@Async("async-executor"

)

public Future<Integer> readXlsCacheAsync() 

{

    try

 {

        // 此代码为简化关键性代码        return new

 AsyncResult<>(sum);

    }catch

 (Exception e){

        return new AsyncResult<>(0

);

    }

}

3、通过Future<Integer>.get()获取返回值:

public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow)

{

    int[] futureSumArr = new int

[futureList.size()];

    for (int i = 0

;i

        try

 {

            Future future = futureList.get(i);

            while (true

) {

                if

 (future.isDone() && !future.isCancelled()) {

                    Integer futureSum = future.get();

                    logger.info("获取Future返回值成功"+"----Future:"

 + future

                            + ",Result:"

 + futureSum);

                    futureSumArr[i] += futureSum;

                    break

;

                } else

 {

                    logger.info("Future正在执行---获取Future返回值中---等待3秒"

);

                    Thread.sleep(3000

);

                }

            }

        } catch

 (Exception e) {

            logger.error("获取Future返回值异常: "

, e);

        }

    }

    boolean

 insertFlag = getInsertSum(futureSumArr, excelRow);

    logger.info("获取所有异步线程Future的返回值成功,Excel插入结果="

+insertFlag);

    return

 insertFlag;

}

4、这里也可以通过新线程+Future获取Future返回值

不过感觉多此一举了,就当练习Future异步取返回值了~

public static Future<Boolean> getFutureResultThreadFuture(List<Future<Integer>> futureList, int excelRow) 

{

    ExecutorService service = Executors.newSingleThreadExecutor();

    final boolean[] insertFlag = {false

};

    service.execute(new

 Runnable() {

        public void run() 

{

            try

 {

                insertFlag[0

] = getFutureResult(futureList, excelRow);

            } catch

 (Exception e) {

                logger.error("新线程+Future获取Future返回值异常: "

, e);

                insertFlag[0] = false

;

            }

        }

    });

    service.shutdown();

    return new AsyncResult<>(insertFlag[0

]);

}

获取异步线程结果后,我们可以通过添加事务的方式,实现Excel入库操作的数据一致性。

但Future会造成主线程的阻塞,这个就很不友好了,有没有更优解呢?

二维码

扫一扫,关注我们

声明:本文由【益华网络】编辑上传发布,转载此文章须经作者同意,并请附上出处【益华网络】及本页链接。如内容、图片有任何版权问题,请联系我们进行处理。

感兴趣吗?

欢迎联系我们,我们愿意为您解答任何有关网站疑难问题!

您身边的【网站建设专家】

搜索千万次不如咨询1次

主营项目:网站建设,手机网站,响应式网站,SEO优化,小程序开发,公众号系统,软件开发等

立即咨询 15368564009
在线客服
嘿,我来帮您!