Skyframe StateMachine 指南

报告问题 查看源代码

概览

Skyframe StateMachine 是位于堆上的解构函数对象。当所需值无法立即获得但会进行异步计算时,它支持在没有冗余的情况下进行灵活和评估1StateMachine 在等待时无法绑定线程资源,而必须挂起和恢复。因此,解构会公开显式重新入口点,以便跳过之前的计算。

StateMachine 可用于表示序列、分支、结构化逻辑并发,专为 Skyframe 交互量身定制。StateMachine 可以组合成较大的 StateMachine 并共用子 StateMachine。并发始终按构造进行分层,并且纯粹是逻辑。每个并发子任务都在单个共享父级 SkyFunction 线程中运行。

简介

本部分简要介绍了 java.com.google.devtools.build.skyframe.state 软件包中的 StateMachine

Skyframe 重启简介

Skyframe 是用于对依赖关系图执行并行评估的框架。 图中的每个节点都与 SkyFunction 的评估相对应,SkyKey 指定了其参数,而 SkyValue 指定了其结果。计算模型使得 SkyFunction 可通过 SkyKey 查找 SkyValues,从而触发对其他 SkyFunction 的递归并行评估。当请求的 SkyValue 因计算的某些子图不完整而尚未就绪时,阻塞线程会阻塞线程,而是会发出调用请求的 SkyFunction 观察到 null getValue 响应,并应返回 null 而非 SkyValue,表明因缺少输入而未完成。 当之前请求的所有 SkyValues 可用时,Skyframe 将重启 SkyFunction。

在引入 SkyKeyComputeState 之前,处理重启的传统方法是完全重新运行计算。虽然这种方法具有二次复杂性,但以这种方式编写的函数最终会完成,因为每次重新运行时,返回 null 的查询会更少。借助 SkyKeyComputeState,可以将手动指定的检查点数据与 SkyFunction 相关联,从而节省大量的重新计算工作。

StateMachine 是位于 SkyKeyComputeState 内的对象,通过公开挂起和恢复执行钩子,当 SkyFunction 重启(假设 SkyKeyComputeState 未从缓存中)重启时,几乎所有重新计算都消除了。

SkyKeyComputeState 内的有状态计算

从面向对象的设计角度来看,有必要考虑将计算对象存储在 SkyKeyComputeState 中,而不是将纯数据值存储在其中。在 Java 中,对行为承载对象的最基本描述是功能接口,事实证明这已经足够。StateMachine 具有以下定义(奇怪的是递归定义)。2

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks 接口类似于 SkyFunction.Environment,但它是专为异步设计的,添加了对逻辑并发子任务的支持3

step 的返回值是另一个 StateMachine,允许以归纳方式指定一系列步骤。当 StateMachine 完成时,step 会返回 DONE。例如:

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

描述具有以下输出的 StateMachine

hello
world

请注意,方法引用 this::step2 也是 StateMachine,因为 step2 满足 StateMachine 的函数接口定义。方法引用是在 StateMachine 中指定下一个状态的最常用方法。

暂停和恢复

直观地说,将计算细分为 StateMachine 步骤而不是单体式函数,就提供了挂起和恢复计算所需的钩子。suspendsuspendStateMachine.step 返回时,会出现明确的暂停点。返回的 StateMachine 值指定的接续点是一个明确的恢复点。因此,可以避免重新计算,因为计算可以从上次停止的位置继续。

回调、接续和异步计算

从技术角度来讲,StateMachine 充当着“延续”continuation,可确定要执行的后续计算。StateMachine 可以从 step 函数返回(这会将控制权转回 Driver 实例)主动suspend,而不是阻塞。然后,Driver 可以切换到准备就绪的 StateMachine,或将控制权交还给 Skyframe。

一直以来,“回调”和“接续”都是一种概念。callbackscallbacks 不过,StateMachine 对两者进行了区分。

  • 回调 - 描述异步计算结果的存储位置。
  • 接续 - 指定下一个执行状态。

调用异步操作时需要回调,这意味着实际操作不会在调用该方法时立即发生,如 SkyValue 查询为例。回调应尽可能简单。

接续StateMachineStateMachine 返回值,用于封装所有异步计算解析完成后接下来的复杂执行。这种结构化方法有助于将回调的复杂性保持在可管理范围内。

待办事项

Tasks 接口为 StateMachine 提供 API,用于按 SkyKey 查找 SkyValues,以及安排并发子任务。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}

SkyValue 查询

StateMachine 使用 Tasks.lookUp 过载来查找 SkyValues。它们类似于 SkyFunction.Environment.getValueSkyFunction.Environment.getValueOrThrow,并且具有类似的异常处理语义。实现不会立即执行查找,而是先批量执行4查找尽可能多的查询。该值可能不会立即可用(例如需要重启 Skyframe),因此调用方会使用回调指定要对结果值执行什么操作。

StateMachine 处理器(Driver 和桥接至 SkyFrame)可保证在下一个状态开始之前该值可用。示例如下。

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

在上面的示例中,第一步会查找 new Key(),并将 this 作为使用方传递。之所以能够这么做,是因为 DoesLookup 实现了 Consumer<SkyValue>

根据合同,在下一个状态 DoesLookup.processValue 开始之前,DoesLookup.step 的所有查找操作已经完成。因此,在 processValue 中访问 value 时,它可用。

子任务

Tasks.enqueue 请求执行逻辑上并发的子任务。子任务也是 StateMachine,可以执行常规 StateMachine 可以执行的任何操作,包括以递归方式创建更多子任务或查找 SkyValues。与 lookUp 类似,状态机驱动程序可确保所有子任务都已完成,然后再继续执行下一步。示例如下。

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

虽然 Subtask1Subtask2 在逻辑上是并发的,但所有内容都在单个线程中运行,因此 i 的“并发”更新不需要任何同步。

结构化并发

由于每个 lookUpenqueue 都必须在进入下一个状态之前进行解析,这意味着并发自然局限于树结构。可以创建分层5 并发,如以下示例所示。

结构化并发

UML 中很难看出并发结构形成了树。有一种备用视图可以更好地显示树结构。

非结构化并发

结构化并发更易于推断。

组合和控制流模式

本部分举例说明了如何组合多个 StateMachine 以及如何解决某些控制流问题。

顺序状态

这是最常见且最直接的控制流模式。SkyKeyComputeState 内的有状态计算中对此进行了显示。

分支

您可以使用常规 Java 控制流返回不同的值,从而实现 StateMachine 中的分支状态,如以下示例所示。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  …
}

某些分支常常返回 DONE,以便提前完成。

高级顺序合成

由于 StateMachine 控件结构是无内存的,因此将 StateMachine 定义作为子任务共享有时可能会很尴尬。让 M1M2 成为共用 StateMachineSStateMachine 实例,其中 M1M2 分别是序列 <A, S, B><X, S, Y>。问题在于,S 不知道在它完成后是继续执行 B 还是 Y,并且 StateMachine 并没有完全保留调用堆栈。本部分将介绍一些实现这一点的技巧。

StateMachine 作为终端序列元素

这并不能解决最初提出的问题。它仅在共享 StateMachine 是序列的终端时演示顺序组合。

// S is the shared state machine.
class S implements StateMachine { … }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

即使 S 本身是一个复杂的状态机,这也同样适用。

顺序合成的子任务

由于已加入队列的子任务必定会在下一个状态之前完成,因此有时可能会略微滥用6子任务机制。

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

runAfter 注入

有时,滥用 Tasks.enqueue 是不可能的,因为在执行 S 之前必须完成其他并行子任务或 Tasks.lookUp 调用。在这种情况下,将 runAfter 参数注入 S 可用于通知 S 后续步骤。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
    … // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
    … // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

这种方法比滥用子任务更简洁。不过,过度使用(例如,使用 runAfter 嵌套多个 StateMachine)会进入 Callback Hell。最好改为拆分具有普通序列状态的顺序 runAfter

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

可替换为以下内容。

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

禁止替代项:runAfterUnlessError

在之前的草稿中,我们考虑过 runAfterUnlessError,它会在遇到错误时中止。造成这种结果的原因是,错误通常最终会被检查两次,一次由具有 runAfter 引用的 StateMachine 检查,一次由 runAfter 机器本身检查。

经过一番考虑之后,我们认为代码的均匀性比对错误检查进行去重处理更重要。如果 runAfter 机制与 tasks.enqueue 机制的运作方式不一致(后者始终需要进行错误检查),则令人感到困惑。

直接委托

每当出现正式的状态转换时,主 Driver 循环都会推进。根据合同,推进状态意味着,先前加入队列的所有 SkyValue 查询和子任务都会在下一个状态执行之前解析。有时,代理 StateMachine 的逻辑会使阶段推进不必要或适得其反。例如,如果委托的第一个 step 执行可与委托状态查找并行处理的 SkyKey 查找,则阶段推进会使这些查询按顺序执行。如以下示例所示,执行直接委托可能更合理。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    …
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return …;
  }

  // Rest of implementation.
  …

  private StateMachine complete(Tasks tasks) {
    …
    return runAfter;
  }
}

数据流

前面讨论的重点是管理控制流。本部分介绍了数据值的传播。

实现 Tasks.lookUp 回调

您可以在 SkyValue 查询中实现 Tasks.lookUp 回调的示例。本部分介绍了处理多个 SkyValues 的原理和方法建议。

Tasks.lookUp 回调

Tasks.lookUp 方法接受回调 sink 作为参数。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

惯用方法是使用 Java lambda 来实现此:

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

myValue 是执行查找的 StateMachine 实例的成员变量。不过,与在 StateMachine 实现中实现 Consumer<SkyValue> 接口相比,lambda 需要额外的内存分配。当存在多个可能不明确的查询时,lambda 仍然有用。

还有一些错误处理 Tasks.lookUp 过载,这与 SkyFunction.Environment.getValueOrThrow 类似。

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

下面展示了一个实现示例。

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      …
      return DONE;
    }
    // Processes `value`, which is non-null.
    …
  }
}

与不进行错误处理的查找一样,让 StateMachine 类直接实现回调可以节省 lamba 的内存分配。

错误处理要略微详细一些,但从本质上讲,错误的传播与正常值的传播之间没有太大区别。

使用多个 SkyValues

通常需要进行多次 SkyValue 查询。一种在大部分情况下都有效的方法是打开 SkyValue 的类型。以下是从原型生产代码中简化的示例。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

Consumer<SkyValue> 回调实现可以明确共享,因为值类型是不同的。如果不可用,则可以回退到基于 lambda 的实现或实现相应回调的完整内部类实例。

StateMachine 之间传播值

到目前为止,本文档仅说明了如何在子任务中安排工作,但子任务也需要将值报告给调用方。由于子任务在逻辑上是异步的,因此其结果会通过回调传达给调用方。为了实现此功能,子任务会定义一个通过其构造函数注入的接收器接口。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

  … // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

调用方 StateMachine 将如下所示。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上面的示例演示了一些内容。Caller 必须将其结果传回并定义自己的 Caller.ResultSinkCaller 会实现 BarProducer.ResultSink 回调。恢复后,processResult 会检查 value 是否为 null,以确定是否发生了错误。这是在接受子任务或 SkyValue 查询的输出之后的一种常见行为模式。

请注意,acceptBarError 的实现会按照错误气泡的要求将结果即时转发给 Caller.ResultSink

Driver 与 SkyFunctions 桥接中介绍了顶级 StateMachine 的替代方案。

错误处理

Tasks.lookUp 回调StateMachines 之间传播值中已提供了几个错误处理示例。系统不会抛出 InterruptedException 以外的异常,而是作为值通过回调进行传递。此类回调通常具有独占性或语义,且恰好传递了一个值或错误之一。

下一部分将介绍与 Skyframe 错误处理机制之间的细微但重要的交互。

错误显示 (--nokeep_going)

在错误冒泡期间,即使请求的所有 SkyValues 不可用,SkyFunction 也可能会重启。在这种情况下,由于 Tasks API 协定,永远不会达到后续状态。不过,StateMachine 应该仍会传播异常。

由于无论是否达到下一个状态,都必须进行传播,因此错误处理回调必须执行此任务。对于内部 StateMachine,这通过调用父级回调来实现。

在与 SkyFunction 进行交互的顶层 StateMachine 中,可通过调用 ValueOrExceptionProducersetException 方法来完成此操作。然后,即使缺少 SkyValues,ValueOrExceptionProducer.tryProduceValue 也会抛出异常。

如果直接使用 Driver,即使机器尚未完成处理,也必须检查 SkyFunction 中传播的错误。

事件处理

对于需要发出事件的 SkyFunction,StoredEventHandler 会注入到 SkyKeyComputeState 中,并进一步注入到需要它们的 StateMachine 中。过去,之所以需要 StoredEventHandler,是因为 Skyframe 会丢弃某些事件(除非它们被重放),但此问题随后得到了修复。保留了 StoredEventHandler 注入,因为它可以简化错误处理回调发出的事件的实现。

Driver 与 SkyFunctions 桥接

Driver 负责管理 StateMachine 的执行,从指定的根 StateMachine 开始。由于 StateMachine 可以递归方式将子任务 StateMachine 加入队列,因此单个 Driver 可以管理多个子任务。这些子任务会创建树结构,这是结构化并发的结果。Driver 可跨子任务批量执行 SkyValue 查询,以提高效率。

有许多围绕 Driver 构建的类包含以下 API。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver 接受一个根 StateMachine 作为参数。调用 Driver.drive 会尽可能在不重启 Skyframe 的情况下执行 StateMachine。当 StateMachine 完成时,它返回 true,否则返回 false,表示并非所有值都可用。

Driver 可保持 StateMachine 的并发状态,非常适合嵌入 SkyKeyComputeState

直接实例化 Driver

StateMachine 实现通常通过回调来传达其结果。可以直接实例化 Driver,如以下示例所示。

Driver 与要进一步定义的相应 ResultSink 的实现一起嵌入到 SkyKeyComputeState 实现中。在顶层,State 对象是计算结果的适当接收器,因为它保证比 Driver 存在更长的时间。

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

以下代码会大致描绘 ResultProducer

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

  … // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

那么,用于延迟计算结果的代码可能如下所示。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

嵌入“Driver

如果 StateMachine 生成一个值并且没有引发任何异常,则嵌入 Driver 是另一种可能的实现,如以下示例所示。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
}

SkyFunction 可能包含如下所示的代码(其中 State 是特定于函数类型的 SkyKeyComputeState)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

StateMachine 实现中嵌入 Driver 更适合 Skyframe 的同步编码样式。

可能会产生异常的 StateMachine

否则,存在 SkyKeyComputeState 嵌入的 ValueOrExceptionProducerValueOrException2Producer 类,这些类具有与同步 SkyFunction 代码匹配的同步 API。

ValueOrExceptionProducer 抽象类包括以下方法。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
    …  // Implementation.
  }

  protected final void setValue(V value)  {  … // Implementation. }
  protected final void setException(E exception) {  … // Implementation. }
}

它包含一个嵌入式 Driver 实例,与嵌入驱动程序中的 ResultProducer 类非常相似,并以类似的方式与 SkyFunction 进行交互。当出现以下任一情况时,实现会调用 setValuesetException,而不是定义 ResultSink。如果两种情况都有,则会优先处理例外情况。tryProduceValue 方法将异步回调代码桥接到同步代码,并在设置了异常时抛出异常。

如前所述,在错误冒泡期间,即使机器尚未完成运行,也可能会出现错误,因为并非所有输入都可用。为了应对这种情况,tryProduceValue 会抛出任何设定的异常,甚至在机器完成之前也会抛出。

结语:最终移除回调

StateMachine 是一种执行异步计算的高效但样板密集型方式。延续功能(特别是传递给 ListenableFutureRunnable 形式)广泛应用于 Bazel 代码的某些部分,但在分析 SkyFunction 中并不常见。分析主要受 CPU 限制,并且没有用于磁盘 I/O 的高效异步 API。最终,最好是优化远离回调,因为它们具有学习曲线,会妨碍可读性。

Java 虚拟线程是最具潜力的替代方案之一。完全被同步阻塞调用所取代,而不必编写回调。之所以能够这么做,是因为与平台线程不同,绑定虚拟线程资源应该很便宜。但是,即使使用虚拟线程,用线程创建和同步基元取代简单的同步操作成本也太高。我们执行了从 StateMachineJava 虚拟线程的迁移,结果速度减慢了几个数量级,导致端到端分析延迟时间增加了近 3 倍。由于虚拟线程仍是预览版功能,因此日后性能有所提升时,还可以执行此迁移。

另一种需要考虑的方法是等待 Loom 协程(如果它们可用)。这种方法的优势在于,使用协作式多任务处理或许可以降低同步开销。

如果所有其他方法都失败了,低级别字节码重写也可能是可行的替代方案。通过充分的优化,或许可以实现接近手写回调代码的性能。

附录

回调地狱

回调地狱是使用回调的异步代码中的一个令人担忧的问题。源于一个事实,即后续步骤的延续操作嵌套在前一个步骤中。如果涉及许多步骤,则此嵌套可能会非常深。如果与控制流结合,则代码将变得难以管理。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

嵌套实现的优点之一是可以保留外部步骤的堆栈帧。在 Java 中,捕获的 lambda 变量必须是最终的,因此使用此类变量可能会很麻烦。通过将方法引用作为接续(而不是 lambda)返回以避免深层嵌套,如下所示。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

如果 runAfter 注入模式使用过于密集,也可能会出现回调异常,但通过按顺序步骤穿插注入可以避免这种情况。

示例:链式 SkyValue 查询

通常,应用逻辑需要依赖的 SkyValue 查询链,例如,如果第二个 SkyKey 依赖于第一个 SkyValue,就会出现这种情况。简单来说,这会产生一个复杂的深层嵌套回调结构。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

不过,由于接续被指定为方法引用,因此代码在状态转换之间看起来是程序化的:step2 遵循 step1。请注意,这里使用 lambda 分配 value2。这使代码顺序与计算从上到下的顺序一致。

其他提示

可读性:执行排序

为了提高可读性,请尽量确保 StateMachine.step 实现在代码中的传递位置之后紧跟执行顺序和回调实现。在控制流分支时,这并非总是可行。在这种情况下,其他意见可能会有帮助。

示例:链式 SkyValue 查找中,为实现此目的,创建了一个中间方法引用。这会牺牲少量性能来换取可读性,这在这里可能是值得的。

生成假设

适度存在的 Java 对象打破了 Java 垃圾回收器的分代假设,该回收器用于处理生命周期极短的对象或永久存在的对象。根据定义,SkyKeyComputeState 中的对象违反了这一假设。此类对象包含位于 Driver 的所有仍在运行的 StateMachine 的构造树,在它们挂起时具有中间生命周期,等待异步计算完成。

在 JDK19 中,这似乎不太糟糕,但在使用 StateMachine 时,即使实际生成的垃圾回收量大幅减少,也可能观察到 GC 时间增加。由于 StateMachine 具有中间生命周期,因此它们可能会被提升到旧代,导致其填充速度更快,因此需要清理费用更高的主要 GC 或完整 GC。

最初的预防措施是尽量减少使用 StateMachine 变量,但有时不可行,例如需要跨多个状态指定值时。在可能的情况下,本地堆栈 step 变量是新生代变量,可以高效地进行 GC 操作。

对于 StateMachine 变量,将内容分解为子任务并遵循StateMachine 之间传播值的建议模式也很有用。请注意,遵循此模式时,只有子 StateMachine 引用了父 StateMachine,反之亦然。这意味着,当子级使用结果回调完成和更新父级时,子级会自然地超出作用域,并符合 GC 的条件。

最后,在某些情况下,早期状态需要 StateMachine 变量,但后状态不需要。如果已经知道不再需要对大型对象的引用,将其清零,这样做会很有帮助。

命名状态

为方法命名时,通常可以针对该方法中发生的行为命名。目前不太清楚如何在 StateMachine 中执行此操作,因为没有堆栈。例如,假设方法 foo 调用子方法 bar。在 StateMachine 中,它可以转换为状态序列 foo,后跟 barfoo 不再包含 bar 行为。因此,状态的方法名称范围往往较小,有可能反映出本地行为。

并发树图

下面是结构化并发部分中的图表的替代视图,可以更好地描述树结构。这些方块构成了一棵小树。

结构化并发 3D


  1. Skyframe 的惯例是在没有值时从头开始重启。

  2. 请注意,step 可以抛出 InterruptedException,但这些示例省略了此操作。Bazel 代码中有一些低级方法会抛出此异常,并会将此异常传播到运行 StateMachineDriver(稍后将对其进行介绍)。无需声明在不需要时抛出。

  3. 并发子任务由 ConfiguredTargetFunction 驱动,后者为每个依赖项执行独立工作。每个依赖项都有自己的独立 StateMachine,而不是操纵同时处理所有依赖项的复杂数据结构,从而导致效率低下。

  4. 一个步骤中的多个 tasks.lookUp 调用会一起进行批处理。通过在并发子任务中执行的查询,可以创建额外的批处理。

  5. 这在概念上类似于 Java 的结构化并发 jeps/428

  6. 此操作与生成线程并联接该线程以实现顺序组合类似。