多线程之FutureTask源码解读
网站首页 文章专栏 多线程之FutureTask源码解读
多线程之FutureTask源码解读
编辑时间:2019-05-22 10:52 作者:毛毛小妖 浏览量:103 评论数:0

上一篇我们解读了Future的源码,这一篇我们来解读一下他的一个唯一的实现类FutureTask。FutureTask是可取消的异步计算,这个类提供了Future的基本实现,通过实现的方法来启动和取消计算任务,判断计算是否完成并返回计算结果。只有当计算结束才会返回结果,在计算还没完成时,get方法会一直阻塞。一旦计算完成,计算就不能被重启或取消了,除非调用runAndReset方法。

FutureTask可以用来覆盖Callable或者Runnable对象。因为FutureTask实现了Runnable接口,因此他可以使用线程池Executor。

除了作为独立的类,这个类还提供了protect修饰的方法,当创建特定的任务时是十分有用的。下面我们据欸他看一下这个类里的属性和方法

1、状态state

state变量表示任务运行的状态,初始化为NEW。状态的转换使用set,setException和cancel方法。在任务运行期间,这个状态可能会出现瞬时的COMPLETING或者INTERRUPTING状态,状态的转换使用简单的顺序写入,因为这些状态值都是唯一的并且不会被修改。可能的状态转换有以下几种。

1>NEW -> COMPLETING -> NORMAL

2>NEW -> COMPLETING -> EXCEPTIONAL

3>NEW -> CANCELLED

4>NEW -> INTERRUPTING -> INTERRUPTED

各状态的定义

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

2、创建FutureTask

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

这个方法我们可以指定callable

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

这个方法我们可以指定Runnable,在计算完成时返回执行的结果。

3、获取执行结果

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

4、执行方法

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

5、设置状态

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}


protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

6、重启任务

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

 

来说两句吧
最新评论
    还没有人评论哦,快来坐沙发吧!