Skyframe StateMachine 指南

报告问题 查看来源 每晚 · 7.2。 · 7.1敬上 · 7.0。 · 6.5 · 6.4

概览

Skyframe StateMachine 是驻留在 上的解构函数对象 堆。当出现以下情况时,它支持灵活评估,且无需冗余1 所需值不能立即使用,而是异步计算。通过 StateMachine 在等待时无法占用线程资源,而是必须 暂停和恢复。因此,解构会暴露出显式重新输入 这样可以跳过之前的计算。

StateMachine 可用于表示序列、分支、结构化逻辑 并专门针对 Skyframe 交互进行了定制。 StateMachine 可以组合成更大的 StateMachine 并共享 子 StateMachine。并发始终按构造和 逻辑。每个并发子任务在单个共享父任务中运行 SkyFunction 线程。

简介

本部分简要激励并介绍了 StateMachine,可在 java.com.google.devtools.build.skyframe.state 软件包。

Skyframe 重启简介

Skyframe 是一个可对依赖关系图执行并行评估的框架。 图中的每个节点都对应一个 SkyFunction 的求值, SkyKey 可指定其参数,SkyValue 用于指定其结果。通过 这样,SkyFunction 就可以通过 SkyKey 查找 SkyValues, 触发对其他 SkyFunction 的递归、并行评估。而不是 当请求的 SkyValue 尚不存在时,阻塞线程, 由于计算的某些子图不完整,因此请求 SkyFunction 会观察 null getValue 响应,并应返回 null 而不是 SkyValue,表明因缺少输入而未完成。 在先前请求的所有 SkyValues 后,SkyFrame 会重启 SkyFunction 可用。

在引入 SkyKeyComputeState 之前, 以便完全重新运行计算虽然这有二次方程的 以这种方式编写的函数最终都会完成,因为每次重新运行, 返回 null 的查找更少。借助 SkyKeyComputeState,您可以 将手动指定的检查点数据与 SkyFunction 关联, 重新计算。

StateMachine 是位于 SkyKeyComputeState 内的对象,可消除 在 SkyFunction 重启时,几乎所有重新计算操作(假设 (例如,SkyKeyComputeState 不会从缓存中退出) 执行钩子

SkyKeyComputeState 中的有状态计算

从面向对象的设计的角度来看,考虑存储 SkyKeyComputeState 内的计算对象,而不是纯数据值。 在 Java 中,对携带对象的行为的最低要求是 功能接口,事实证明这已经足够了。StateMachine具有 奇怪的递归定义2

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks 接口类似于 SkyFunction.Environment, 专为异步设计而设计,并添加了对逻辑并发子任务的支持3

step 的返回值是另一个 StateMachine,符合规范 一系列步骤。在以下情况下,step 会返回 DONEStateMachine 已完成。例如:

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

描述了 StateMachine,其输出如下。

hello
world

请注意,方法引用 this::step2 也为 StateMachine,因为 step2,并且符合 StateMachine 的函数接口定义。方法 引用是用于指定 StateMachine

暂停和恢复

直观地说,将计算分解为 StateMachine 个步骤,而不是 单体式函数,提供挂起恢复 计算。当 StateMachine.step 返回时,存在明确的暂停 。由返回的 StateMachine 值指定的接续是 显式恢复点。因此可以避免重新计算,因为 就可以准确地从中断的地方继续计算

回调、接续和异步计算

从技术上讲,StateMachine 起到延续的作用, 要执行的后续计算。StateMachine除了会阻塞 通过从 step 函数返回自愿挂起,这会将 Driver 实例。Driver可以 然后切换到已就绪的 StateMachine 或将控制权交还给 Skyframe。

传统上,回调和接续会混淆成一个概念。 不过,StateMachine 保留了这两者之间的区别。

  • Callback - 描述将异步结果存储到什么位置 计算。
  • 接续 - 指定下一个执行状态。

调用异步操作时需要回调,这意味着 实际操作并不会在调用该方法后立即执行,如 SkyValue 查询的例子回调应尽可能简单。

接续StateMachineStateMachine 返回值, 封装所有异步脚本之后的复杂执行, 计算解析度。这种结构化方法有助于将 回调易于管理

Tasks

Tasks 接口为 StateMachine 提供用于查找 SkyValues 的 API 以及调度并发子任务。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}

SkyValue 查询

StateMachine 使用 Tasks.lookUp 重载来查找 SkyValues。它们分别是 类似于 SkyFunction.Environment.getValueSkyFunction.Environment.getValueOrThrow,并且具有类似的异常处理 语义信息。该实现不会立即执行查询, 而是先批量执行尽可能多的查找操作4,然后再执行此操作。值 可能无法立即使用,例如需要重启 Skyframe, 因此调用方会使用回调指定如何处理结果值。

StateMachine 处理器(Driver 并桥接至 SkyFrame),可确保该值 下一个状态开始。相关示例如下。

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

在上面的示例中,第一步是查找 new Key(),并传递 this。之所以能这么做,是因为 DoesLookup 实现了 Consumer<SkyValue>

根据协定,在下一个状态 DoesLookup.processValue 开始之前, 对“DoesLookup.step”的查询已完成。因此,当value 它在 processValue 中被访问。

子任务

Tasks.enqueue 请求执行逻辑上并发的子任务。 子任务也是 StateMachine,可以执行常规 StateMachine 的任何操作 包括以递归方式创建更多子任务或查找 SkyValues。 与 lookUp 类似,状态机驱动程序可确保所有子任务 完成后,才能继续执行下一步。相关示例如下。

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

虽然 Subtask1Subtask2 在逻辑上是并发的,但一切都是在 因此“并发”i 的更新不需要任何 同步。

结构化并发

由于每个 lookUpenqueue 必须先解析,然后才能推进到下一个 则并发会自然受限于树结构。时间是 创建分层5并发,如下所示: 示例。

结构化并发

通过 UML 很难判断并发结构是否构成了树。 备用视图可以更好地显示 树形结构。

非结构化并发

结构化并发更容易推断。

组合和控制流模式

本部分举例说明了如何组合多个 StateMachine 并解决某些控制流问题。

顺序状态

这是最常见且最直接的控制流模式。示例 如这篇有状态计算 SkyKeyComputeState

分支

StateMachine 中的分支状态可以通过返回不同的 值,如下例所示。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  …
}

某些分支返回 DONE 表示提前完成的情况很常见。

高级顺序组合

由于 StateMachine 控件结构是无内存的,因此共享 StateMachine 子任务定义有时可能会很棘手。让 M1M2StateMachine实例共享 StateMachineSM1M2 为序列 <A, S, B><X, S, Y>。问题在于,S 不知道是否 在完成后继续执行 BY,并且 StateMachine 没有将 调用堆栈。本部分将介绍实现这一目标的一些技巧。

StateMachine 作为终端序列元素

这并不能解决最初出现的问题。它仅按顺序 当共享的 StateMachine 在序列中结束时。

// S is the shared state machine.
class S implements StateMachine { … }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

即使 S 本身就是一个复杂的状态机,上述方法仍然适用。

依序组合的子任务

因为入队的子任务必定会在下一个状态前完成, 有时可能会稍微滥用6子任务机制。

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

runAfter 注入

有时,滥用 Tasks.enqueue 是不可能的, 并行子任务或 Tasks.lookUp 调用(必须在 S 之前完成) 执行。在这种情况下,将 runAfter 参数注入 S 可用于 告知 S 下一步做什么。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
    … // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
    … // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

此方法比滥用子任务更简洁。不过,同样地 例如,通过使用 runAfter 嵌套多个 StateMachine, 通往 Callback Hell 的道路。最好是按顺序拆分 具有普通顺序状态的 runAfter

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

可以替换为以下内容。

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

禁止访问替代方案:runAfterUnlessError

在之前的草稿中,我们考虑过会中止的 runAfterUnlessError 以发现错误。之所以这样做,是因为错误通常最终 检查了两次,一次是由具有 runAfter 引用的 StateMachine 进行检查, 由 runAfter 机器本身触发一次。

经过一番考虑,我们觉得代码的一致性 这比删除重复的错误检查更加重要。如果使用 runAfter 机制的运作方式与 tasks.enqueue 机制,该机制始终需要错误检查。

直接委托

每次出现正式状态转换时,主 Driver 循环都会前进。 根据合同,推进状态意味着之前将所有 SkyValue 加入队列 查询和子任务会在下一个状态执行之前解析。逻辑有时 StateMachine 会使阶段推进不必要,或 会适得其反。例如,如果委托的第一个 step 执行 可与委托状态的查找并行处理的 SkyKey 查找 那么阶段推进会使它们依序运行。更合理的做法是, 执行直接委托,如下例所示。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    …
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return …;
  }

  // Rest of implementation.
  …

  private StateMachine complete(Tasks tasks) {
    …
    return runAfter;
  }
}

数据流

前面讨论的重点一直是控制流的管理。这个 部分介绍了数据值的传播。

实现 Tasks.lookUp 回调

您可以在 SkyValue 中实现 Tasks.lookUp 回调的示例 lookups。这一部分介绍了 处理多个 SkyValues 的方法

Tasks.lookUp 回调

Tasks.lookUp 方法接受回调 sink 作为参数。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

惯用的方法是使用 Java lambda 来实现这一点:

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

其中 myValue 是执行相应任务的 StateMachine 实例的成员变量 查询。不过,与相比,lambda 需要额外的内存分配, 在 StateMachine 中实现 Consumer<SkyValue> 接口 实施。进行多次查找时,lambda 仍然非常有用 会有歧义。

还有错误处理 Tasks.lookUp 过载,类似于 SkyFunction.Environment.getValueOrThrow

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

实现示例如下所示。

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      …
      return DONE;
    }
    // Processes `value`, which is non-null.
    …
  }
}

与不处理错误的查询一样,直接使用 StateMachine 类 实现回调可以保存 lamba 的内存分配。

错误处理提供了更多详细信息,但实质上 误差的传播和正常值的传播没有太大区别。

使用多个 Sky 值

通常需要多次查找 SkyValue 值。这种方法在很大程度上 打开 SkyValue 类型以下是一个 从原型生产代码开始简化。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

可以明确共享 Consumer<SkyValue> 回调实现 因为值类型不同。如果不是这样,则回退到 基于 lambda 的实现或实现 适当的回调

StateMachine 之间传播值

到目前为止,本文档仅解释了如何在子任务中排列工作, 子任务还需要将值报告给调用方。由于子任务 它们的结果在逻辑上异步异步执行, 回调。为此,子任务定义了一个接收器接口, 通过其构造函数注入。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

  … // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

调用方 StateMachine 将如下所示。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上面的示例演示了几个方面。Caller 必须将其 返回并定义自己的 Caller.ResultSinkCaller 会实现 BarProducer.ResultSink 回调。恢复后,processResult 会检查: value 为 null 以确定是否发生了错误。这是一种常见情况 接受子任务或 SkyValue 查询的输出。

请注意,acceptBarError 的实现会尽快将结果转发给 Caller.ResultSink(按照错误气泡的要求)。

DriverStateMachine 桥接至 SkyFunction

错误处理

Tasks.lookUp 中已经有几个错误处理示例 回调StateMachines。以下例外情况除外: 系统不会抛出 InterruptedException,而是会传递 回调作为值。此类回调通常具有独占或语义, 传递的值或错误中的一个。

下一部分将介绍如何与 Skyframe 进行细微而重要的交互 错误处理。

错误提示 (--nokeep_going)

在错误冒泡期间,即使 SkyFunction 没有全部请求,也可能会重新启动 提供天空值。在此类情况下,后续状态永远不会 因为Tasks API 合同的期限限制。不过,StateMachine 应 仍会传播异常。

由于无论是否达到下一个状态,都必须进行传播, 错误处理回调必须执行此任务。对于内部 StateMachine, 这是通过调用父级回调实现的

在与 SkyFunction 连接的顶级 StateMachine 中,可以执行以下操作: 通过调用 ValueOrExceptionProducersetException 方法来完成。 然后,ValueOrExceptionProducer.tryProduceValue 会抛出异常,即使 如果缺少 SkyValues 的话。

如果直接使用 Driver,请务必检查 通过 SkyFunction 传播错误,即使机器尚未完成 处理。

事件处理

对于需要发出事件的 SkyFunctions 会注入 StoredEventHandler 并注入到 SkyKeyComputeState 中,并进一步注入到需要StateMachine 。过去,由于 Skyframe 丢失,需要 StoredEventHandler 某些事件,除非重播,但随后此问题得到了修复。 StoredEventHandler 注入得以保留,因为它简化了 错误处理回调所发出的事件的实现。

Driver 和桥接 SkyFunction

Driver 负责管理 StateMachine 的执行。 以指定的根 StateMachine 开头。StateMachine 递归地将子任务 StateMachine 加入队列,单个 Driver 可以管理 大量子任务。这些子任务会创建一个树形结构, 结构化并发Driver 对 SkyValue 进行批处理 以便提高效率。

使用以下 API 围绕 Driver 构建了许多类。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver 接受单个根 StateMachine 作为参数。正在呼叫 Driver.drive 会在没有StateMachine Skyframe 已重启。此方法在 StateMachine 完成时返回 true,返回 false 否则,表示并非所有值都可用。

Driver 可保持 StateMachine 的并发状态, 适用于在 SkyKeyComputeState 中嵌入。

正在直接实例化 Driver

StateMachine实现通常会通过 回调。您可以直接实例化 Driver,如 示例。

DriverSkyKeyComputeState 实现一起嵌入到 相应 ResultSink 的实现可进一步定义 。在顶层,State 对象是 计算结果,因为它一定会比 Driver 存在。

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

以下代码描绘了 ResultProducer

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

  … // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

用于延迟计算结果的代码可能如下所示。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

嵌入 Driver

如果 StateMachine 生成一个值并且没有引发任何异常,则使用 Driver 是另一种可能的实现,如以下示例所示。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
}

SkyFunction 的代码可能如下所示(其中 State 为 特定于函数类型的 SkyKeyComputeState)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

StateMachine 实现中嵌入 Driver 更适合 Skyframe 的同步编码样式。

可能会产生异常的 StateMachines

否则,有可嵌入 SkyKeyComputeStateValueOrExceptionProducer 以及具有要匹配的同步 API 的 ValueOrException2Producer 类 同步 SkyFunction 代码

ValueOrExceptionProducer 抽象类包括以下方法。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
    …  // Implementation.
  }

  protected final void setValue(V value)  {  … // Implementation. }
  protected final void setException(E exception) {  … // Implementation. }
}

它包含一个嵌入式 Driver 实例,与 嵌入式驱动程序和接口中的 ResultProducer 类 以类似方式将其与 SkyFunction 搭配使用。您不必定义 ResultSink, 当发生这两种情况时,实现会调用 setValuesetException。 如果两种情况都发生,系统会优先处理异常。tryProduceValue 方法 将异步回调代码桥接到同步代码并抛出 就会出现异常

如前所述,在错误冒出期间,可能会出现 即使机器尚未完成,因为并非所有输入都可用。接收者 为了适应这一限制,tryProduceValue 会抛出任何设置的异常,甚至在 运行完毕。

结语:最终移除回调

StateMachine 是一种非常高效但样板密集的执行方式 异步计算。接续(特别是以 Runnable 的形式) 传递给 ListenableFuture)在 Bazel 代码的某些部分中广泛应用, 但在分析 SkyFunction 中并不常用。分析主要受 CPU 限制, 不存在针对磁盘 I/O 的高效异步 API。最终,它就会是 非常适合优化离开回调,因为它们有一定的学习曲线, 可读性

Java 虚拟线程是最有潜力的替代方案之一。而不是 必须写入回调,一切都被同步、阻塞取代 调用。之所以能够这么做,是因为与 应该很便宜不过,即使使用虚拟线程 用线程创建和同步代替简单的同步操作 基元太贵了。我们已执行了从 StateMachineJava 虚拟线程,而且速度要慢几个数量级, 将端到端分析延迟时间增加到将近 3 倍。由于虚拟线程是 还只是一个预览版功能,此迁移可以在 在效果有所提升时进行。

另一种可以考虑的方法是等待 Loom 协程(如果有的话) 可用。这种方法的优势在于 减少同步开销。

如果其他方法都失败,低级别字节码重写也可能是可行的 经过充分优化后,就可能实现 性能接近手写回调代码的性能。

附录

回调地狱

回调地狱是使用回调的异步代码中一个令人恶名的问题。 它源于这样一个事实,即后续步骤的接续是 创建 Deployment 清单如果有很多步骤,这种嵌套 深度。如果与控制流结合使用,代码将变得无法管理。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

嵌套实现的优势之一是,代码的堆栈帧 外部步骤。在 Java 中,捕获的 lambda 变量必须是 因此使用此类变量可能会很麻烦。深度嵌套 可避免将方法引用作为接续(而不是像 lambda)返回 如下所示。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

如果 runAfter 注入 模式使用过于密集,但可以通过穿插注入来避免这种情况 和顺序步骤。

示例:链式 SkyValue 查询

通常,应用逻辑需要 例如,如果第二个 SkyKey 依赖于第一个 SkyValue,则可进行 SkyValue 查询。 简单地考虑一下,这会导致产生一个复杂、深层嵌套的 回调结构。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

不过,由于接续已指定为方法引用,因此代码如下所示 跨状态转换过程:step2 遵循 step1。请注意,在这里, lambda 用于分配 value2。这样,代码的顺序就会与 的计算顺序。

其他提示

可读性:执行排序

为了提高可读性,请尽量保留 StateMachine.step 实现 执行顺序,并且回调实现紧跟在 会传递到代码中控制流并不总是可行的, 分支。在这种情况下,其他注释可能会有所帮助。

示例:链接的 SkyValue 查询中, 为此,会创建中间方法引用。这只会导致 提高可读性,这在这方面很有价值。

生成假设

存在时间中等的 Java 对象打破了 Java 的生成假设 垃圾回收器,旨在处理 或将永远存在的对象。根据定义, SkyKeyComputeState违反了这个假设。这类对象包含 所有仍在运行的 StateMachine(位于 Driver 根位置)的构造树, 应用挂起时的中间生命周期,等待异步计算 操作完成。

在 JDK19 中似乎没有那么糟糕,但在使用 StateMachine 时, GC 时间可能会增加, 实际产生的垃圾数。由于 StateMachine 的有效期处于中间阶段 它们可能会被提升到旧世代,导致存储空间用完 需要进行更昂贵的 Major 或完整 GC 进行清理。

最初的预防措施是尽量减少使用 StateMachine 变量,但 这并非总是可行,例如,如果多个 状态。如果可能,本地堆栈 step 变量为新生代 并高效地进行垃圾回收。

对于 StateMachine 变量,将任务细分为多个子任务, 建议使用的模式在两者之间传播值 StateMachine 也很有用。请注意, 遵循该模式,只有子级 StateMachine 引用了父级 StateMachine,反之亦然。也就是说,当子代完成 使用结果回调更新父项时,子项自然会从中 作用域,并且符合进行 GC 的条件。

最后,在某些情况下,早期状态下需要 StateMachine 变量 但在以后的状态下则不行将大型资源引用设为 null 对象。

命名状态

为方法命名时,通常可以基于行为命名方法 此方法内发生的事件目前尚不清楚如何在 StateMachine,因为没有堆栈。例如,假设方法为 foo 会调用子方法 bar。在 StateMachine 中,可以将其转换为 状态序列 foo,后跟 barfoo 不再包含该行为 bar。因此,状态的方法名称范围往往较小 并可能体现本地行为

并发树图

下面是该图表的结构化 并发来更好地描述树结构。 这些木块构成了一棵小树。

结构化并发 3D


  1. 这与 SkyFrame 值不可用。 

  2. 请注意,允许 step 抛出 InterruptedException,但 示例省略此值。Bazel 代码中有一些简单的方法会导致 此异常会传播到 Driver,稍后将进行说明, 运行 StateMachine。在以下情况下,可以不声明 。

  3. 并发子任务的驱动因素是 ConfiguredTargetFunction, 为每个依赖项执行独立工作。你不必处理 同时处理所有依赖项的复杂数据结构, 这会降低效率,每个依赖项都有自己的独立 StateMachine

  4. 单个步骤中的多个 tasks.lookUp 调用会一起进行批处理。 通过在并发作业中发生的查询,可以创建额外的批处理 子任务。 

  5. 它在概念上类似于 Java 的结构化并发机制, jeps/428。 

  6. 这类似于生成线程,然后联接线程来实现 依序组合。