Skyframe StateMachine 指南

报告问题 查看源代码

概览

Skyframe StateMachine 是位于堆上的解构函数对象。当所需值不是立即可用,而是异步计算时,它支持灵活和无冗余的评估1StateMachine 在等待时不能占用线程资源,而必须挂起和恢复。因此,解构会公开明确的重新入口点,以便跳过之前的计算。

StateMachine 可用于表示序列、分支、结构化逻辑并发,专为 Skyframe 互动量身打造。StateMachine 可以组合成更大的 StateMachine 并共享子 StateMachine。并发始终按结构进行分层,并且完全采用逻辑方式。每个并发子任务都在单个共享的父级 SkyFunction 线程中运行。

简介

本部分简要激励并介绍了 StateMachine(可在 java.com.google.devtools.build.skyframe.state 软件包中找到)。

Skyframe 重启简介

Skyframe 是一个可对依赖关系图执行并行评估的框架。图中的每个节点都对应一个 SkyFunction 的评估,其中 SkyKey 指定了其参数,而 SkyValue 指定了其结果。在这种计算模型中,SkyFunction 可以通过 SkyKey 查找 SkyValues,从而触发对其他 SkyFunction 的递归并行评估。当请求的 SkyValue 因某个计算子图未完成而尚未就绪时,便会观察到 null getValue 响应,并应返回 null(而非 SkyValue),表明由于缺少输入,请求 SkyValue 尚未就绪,而非阻塞线程。当先前请求的所有 Sky 值都可用时,SkyFrame 会重启 SkyFunction。

在引入 SkyKeyComputeState 之前,处理重启的传统方法是完全重新运行计算。尽管这具有二次复杂性,但以这种方式编写的函数最终会完成,因为每次重新运行,返回 null 的查询更少。借助 SkyKeyComputeState,可以将手动指定的检查点数据与 SkyFunction 相关联,从而节省大量重新计算。

StateMachine 是位于 SkyKeyComputeState 内的对象,通过公开挂起和恢复执行钩子,在 SkyFunction 重启时,几乎消除了所有重新计算(假设 SkyKeyComputeState 没有从缓存中移出)。

SkyKeyComputeState 中的有状态计算

从面向对象的设计的角度来看,可以考虑将计算对象存储在 SkyKeyComputeState 中,而不是存储纯数据值。在 Java 中,对具有行为的对象的最低描述是功能接口,事实证明此类描述已足够。StateMachine 具有以下奇怪递归定义2

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks 接口类似于 SkyFunction.Environment,但它是专为异步而设计的,并添加了对逻辑并发子任务的支持3

step 的返回值是另一个 StateMachine,允许以归纳方式指定一系列步骤。当 StateMachine 完成后,step 会返回 DONE。例如:

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

描述了 StateMachine,其输出如下。

hello
world

请注意,由于 step2 满足 StateMachine 的函数接口定义,因此方法引用 this::step2 也是一个 StateMachine。方法引用是在 StateMachine 中指定下一个状态的最常用方法。

暂停和恢复

直观地说,将计算分解为 StateMachine 步(而不是单体式函数)可提供挂起和恢复计算所需的钩子。suspendsuspendStateMachine.step 返回时,有一个明确的挂起点。由返回的 StateMachine 值指定的接续是显式 resume 点。因此,可以避免重新计算,因为可以从中断的地方继续进行计算。

回调、接续和异步计算

在技术术语中,StateMachine 充当了“延续”,用于确定要执行的后续计算。continuationStateMachine 可以从 step 函数返回,从而主动suspend,后者会将控制权转回给 Driver 实例。然后,Driver 可以切换到就绪的 StateMachine,或者将控制权移回给 Skyframe。

传统上,回调和接续会混淆成一个概念。callbackscallbacks不过,StateMachine 保留了这两者之间的区别。

  • 回调 - 描述异步计算结果的存储位置。
  • 接续 - 指定下一个执行状态。

调用异步操作时需要回调,这意味着实际操作不会在调用该方法后立即发生,就像在 SkyValue 查询中一样。回调应尽可能简单。

接续StateMachineStateMachine 返回值,可封装所有异步计算都解析完成后进行的复杂执行。这种结构化方法有助于让回调的复杂性保持在可管理的范围内。

任务

Tasks 接口为 StateMachine 提供了用于通过 SkyKey 查找 SkyValues 并调度并发子任务的 API。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}

SkyValue 查询

StateMachine 使用 Tasks.lookUp 重载来查找 SkyValues。它们类似于 SkyFunction.Environment.getValueSkyFunction.Environment.getValueOrThrow,并且具有类似的异常处理语义。该实现不会立即执行查找,而是会尽可能多地批量执行4查询。该值可能不会立即可用(例如,需要重启 Skyframe),因此调用方会使用回调指定如何处理结果值。

StateMachine 处理器(Driver 并桥接至 SkyFrame)可保证值在下一个状态开始之前可用。相关示例如下。

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

在上面的示例中,第一步是查找 new Key(),并将 this 作为使用方传递。之所以能这么做,是因为 DoesLookup 实现了 Consumer<SkyValue>

根据协定,在下一个状态 DoesLookup.processValue 开始之前,对 DoesLookup.step 的所有查询都已完成。因此,在 processValue 中对其进行访问时,value 可用。

子任务

Tasks.enqueue 请求执行逻辑上并发的子任务。子任务也是 StateMachine,可以执行常规 StateMachine 可以执行的任何操作,包括以递归方式创建更多子任务或查找 SkyValues。与 lookUp 类似,状态机驱动程序会确保所有子任务都已完成,然后再继续执行下一步。相关示例如下。

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

虽然 Subtask1Subtask2 在逻辑上是并发的,但所有内容都在单个线程中运行,因此 i 的“并发”更新不需要任何同步。

结构化并发

由于每个 lookUpenqueue 都必须在进入下一个状态之前进行解析,这意味着并发自然受限于树结构。可以创建分层5 并发,如以下示例所示。

结构化并发

UML 很难判断并发结构构成了树。 我们有一个备用视图,可以更好地显示树结构。

非结构化并发

结构化并发更容易推断。

组合和控制流模式

本部分通过几个示例来说明如何组合多个 StateMachine,以及解决某些控制流问题。

顺序状态

这是最常见且最直接的控制流模式。相关示例请参见 SkyKeyComputeState 内的有状态计算

分支

StateMachine 中的分支状态可通过使用常规 Java 控制流返回不同的值来实现,如以下示例所示。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  …
}

某些分支返回 DONE 表示提前完成的情况很常见。

高级顺序组合

由于 StateMachine 控制结构是无内存的,因此将 StateMachine 定义共享为子任务有时可能会很尴尬。让 M1M2 为共用 StateMachineSStateMachine 实例,其中 M1M2 分别是序列 <A, S, B><X, S, Y>。问题在于,S 不知道在完成后是继续执行 B 还是 Y,并且 StateMachine 没有完全保留调用堆栈。本部分将介绍实现这一目标的一些技巧。

StateMachine 作为终端序列元素

这并不能解决最初出现的问题。仅当共享 StateMachine 为序列中的终端时,它才会演示顺序组合。

// S is the shared state machine.
class S implements StateMachine { … }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

即使 S 本身就是一个复杂的状态机,上述方法仍然适用。

依序组合的子任务

由于已加入队列的子任务保证在下一个状态之前完成,因此有时可能会稍微滥用6子任务机制。

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

runAfter 注入

有时,无法滥用 Tasks.enqueue,因为在执行 S 之前,必须完成其他并行子任务或 Tasks.lookUp 调用。在这种情况下,将 runAfter 参数注入 S 可用于告知 S 后续步骤。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
    … // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
    … // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

此方法比滥用子任务更简洁。不过,过于宽松地采用此方法(例如,使用 runAfter 嵌套多个 StateMachine)才是通向 Callback Hell 的通道。最好改为拆分具有普通顺序状态的序列 runAfter

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

可以替换为以下内容。

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

禁止访问替代方案:runAfterUnlessError

在之前的草稿中,我们考虑过会在出错时提前中止的 runAfterUnlessError。这是因为错误通常最终会被检查两次,一次是由具有 runAfter 引用的 StateMachine 检查,另一次是由 runAfter 机器本身检查。

经过一番考虑,我们认为代码的一致性比删除重复的错误检查更加重要。如果 runAfter 机制与始终需要错误检查的 tasks.enqueue 机制不一致,则会令人困惑。

直接委托

每次出现正式状态转换时,主 Driver 循环都会前进。根据合同,推进状态意味着之前加入队列的所有 SkyValue 查询和子任务在下一个状态执行之前解决。有时,委托 StateMachine 的逻辑会使阶段推进变得不必要或会适得其反。例如,如果委托的第一个 step 执行可以与委托状态查找并行处理的 SkyKey 查找,则阶段推进会使这些查询依序进行。更合理的做法是执行直接委托,如以下示例所示。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    …
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return …;
  }

  // Rest of implementation.
  …

  private StateMachine complete(Tasks tasks) {
    …
    return runAfter;
  }
}

数据流

前面讨论的重点一直是控制流的管理。本部分介绍了数据值的传播。

实现 Tasks.lookUp 回调

您可以在 SkyValue 查询中实现 Tasks.lookUp 回调的示例。本部分提供处理多个 SkyValues 的理由并推荐了几种方法。

Tasks.lookUp 回调

Tasks.lookUp 方法接受回调 sink 作为参数。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

惯用的方法是使用 Java lambda 来实现这一点:

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

其中 myValue 是执行查找的 StateMachine 实例的成员变量。不过,与在 StateMachine 实现中实现 Consumer<SkyValue> 接口相比,lambda 需要额外的内存分配。当多次查找模糊不清时,lambda 仍然非常有用。

此外,还有错误处理 Tasks.lookUp 过载,它类似于 SkyFunction.Environment.getValueOrThrow

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

实现示例如下所示。

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      …
      return DONE;
    }
    // Processes `value`, which is non-null.
    …
  }
}

与不处理错误的查询一样,让 StateMachine 类直接实现回调可以节省 lamba 的内存分配。

错误处理提供了更多详细信息,但从本质上讲,错误的传播与正常值的传播没有太大区别。

使用多个 Sky 值

通常需要多次查找 SkyValue 值。启用 SkyValue 的类型是一种在大部分时间里行之有效的方法。以下是从原型设计正式版代码中简化的示例。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

Consumer<SkyValue> 回调实现可以明确共享,因为值类型不同。如果情况并非如此,则回退到基于 lambda 的实现或实现相应回调的完整内部类实例是可行的。

StateMachine 之间传播值

到目前为止,本文档仅介绍了如何排列子任务中的工作,但子任务还需要将值报告给调用方。由于子任务在逻辑上是异步的,因此系统会使用回调将其结果传回给调用方。为此,子任务会定义通过其构造函数注入的接收器接口。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

  … // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

调用方 StateMachine 将如下所示。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上面的示例演示了几个方面。Caller 必须回传其结果并定义自己的 Caller.ResultSinkCaller 会实现 BarProducer.ResultSink 回调。恢复后,processResult 会检查 value 是否为 null,以确定是否发生了错误。这是接受子任务或 SkyValue 查询的输出后的一种常见行为模式。

请注意,根据错误冒泡的要求,acceptBarError 的实现会立即将结果转发到 Caller.ResultSink

Driver 与 SkyFunction 的桥接中介绍了顶级 StateMachine 的替代方案。

错误处理

Tasks.lookUp 回调StateMachines 之间传播值中已有几个错误处理示例。系统不会抛出 InterruptedException 以外的异常,而是作为值通过回调进行传递。此类回调通常具有独占或语义,且只传递值或错误之一。

下一部分将介绍与 Skyframe 错误处理之间的一种微妙但非常重要的交互。

错误提示 (--nokeep_going)

在错误冒泡期间,即使并非所有请求的 SkyValues 都可用,SkyFunction 也可能会重新启动。在这种情况下,根据 Tasks API 协定,后续状态将永远不会达到。不过,StateMachine 应仍会传播异常。

由于无论是否达到下一个状态,都必须进行传播,因此错误处理回调必须执行此任务。对于内部 StateMachine,这可以通过调用父回调来实现。

在与 SkyFunction 对接的顶级 StateMachine 中,可通过调用 ValueOrExceptionProducersetException 方法来完成此操作。然后,即使缺少 SkyValues,ValueOrExceptionProducer.tryProduceValue 也会抛出异常。

如果直接使用 Driver,请务必检查 SkyFunction 中是否已传播错误,即使机器尚未完成处理也是如此。

事件处理

对于需要发出事件的 SkyFunction,会将 StoredEventHandler 注入 SkyKeyComputeState,并进一步注入需要这些事件的 StateMachine。过去,需要使用 StoredEventHandler,因为 Skyframe 会丢弃某些事件,除非重播,但随后此问题得到了修复。StoredEventHandler 注入得以保留,因为它简化了从错误处理回调发出的事件的实现。

Driver 和桥接 SkyFunction

Driver 负责管理从指定的根 StateMachine 开始的 StateMachine 的执行。由于 StateMachine 能够以递归方式将子任务 StateMachine 加入队列,因此单个 Driver 可以管理大量子任务。借助结构化并发机制,这些子任务会创建树结构。Driver 可跨子任务批量进行 SkyValue 查找,以提高效率。

使用以下 API 围绕 Driver 构建了许多类。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver 接受单个根 StateMachine 作为参数。调用 Driver.drive 会在不重启 Skyframe 的情况下,尽可能执行 StateMachine。它在 StateMachine 完成时返回 true,否则返回 false,这表示并非所有值都可用。

Driver 可保持 StateMachine 的并发状态,它非常适合嵌入 SkyKeyComputeState 中。

正在直接实例化 Driver

StateMachine 实现通常会通过回调来传达其结果。您可以直接实例化 Driver,如以下示例所示。

Driver 与相应 ResultSink 的实现(进一步定义)一起嵌入到 SkyKeyComputeState 实现中。在顶层,State 对象是适合计算结果的接收器,因为它的存在时间肯定比 Driver 更长。

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

以下代码描绘了 ResultProducer

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

  … // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

用于延迟计算结果的代码可能如下所示。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

嵌入 Driver

如果 StateMachine 生成一个值并且未引发任何异常,则嵌入 Driver 是另一种可能的实现,如以下示例所示。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
}

SkyFunction 的代码可能如下所示(其中 State 是特定于函数的 SkyKeyComputeState 类型)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

StateMachine 实现中嵌入 Driver 更适合 Skyframe 的同步编码样式。

可能会产生异常的 StateMachines

否则,存在 SkyKeyComputeState 嵌入的 ValueOrExceptionProducerValueOrException2Producer 类,它们具有与同步 SkyFunction 代码匹配的同步 API。

ValueOrExceptionProducer 抽象类包括以下方法。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
    …  // Implementation.
  }

  protected final void setValue(V value)  {  … // Implementation. }
  protected final void setException(E exception) {  … // Implementation. }
}

它包含一个嵌入式 Driver 实例,与嵌入驱动程序中的 ResultProducer 类非常相似,并以类似方式与 SkyFunction 进行交互。当出现上述任何一种情况时,实现都会调用 setValuesetException,而不是定义 ResultSink。如果两种情况都发生,系统会优先处理异常。tryProduceValue 方法将异步回调代码桥接到同步代码,并在设置了同步代码时抛出异常。

如前所述,在错误冒泡期间,即使机器尚未完成,也可能发生错误,因为并非所有输入都可用。为了适应这种情况,tryProduceValue 会抛出任何设置的异常,即使在机器完成之前也是如此。

结语:最终移除回调

StateMachine 是一种执行异步计算的高效但样板密集型方法。接续(特别是以传递给 ListenableFutureRunnable 的形式)在 Bazel 代码的某些部分很常见,但在分析 SkyFunction 中并不普遍。分析主要受 CPU 限制,对于磁盘 I/O,没有高效的异步 API。最终,最好优化退出回调,因为此类回调有一定的学习曲线,会影响可读性。

Java 虚拟线程是最有潜力的替代方案之一。您不必编写回调,而是会全部替换为同步的阻塞调用。之所以能够这样做,是因为与平台线程不同,绑定虚拟线程资源的费用应该更低。但是,即使使用虚拟线程,用线程创建和同步基元替换简单的同步操作也代价过高。我们执行了从 StateMachineJava 虚拟线程的迁移,这些线程速度减慢了几个数量级,导致端到端分析延迟时间几乎增加了 3 倍。由于虚拟线程仍是一项预览版功能,因此可以在性能有所提升的晚些时候执行此迁移。

另一种可以考虑的方法是等待 Loom 协程(如果它们变得可用)。这种方法的优势在于,您可以使用协作式多任务处理来减少同步开销。

如果所有这些都失败,低级别字节码重写也可能是一种可行的替代方案。通过充分优化,或许可以实现接近手写回调代码的性能。

附录

回调地狱

回调地狱是使用回调的异步代码中一个令人恶名的问题。这源自后续步骤的接续嵌套在前一步中。如果步骤有很多,这种嵌套可能会非常深层。如果与控制流结合使用,代码将变得无法管理。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

嵌套实现的优势之一是可以保留外部步骤的堆栈帧。在 Java 中,捕获的 lambda 变量必须是有效的最终变量,因此使用此类变量可能会很麻烦。通过将方法引用作为接续而不是 lambda 返回,可以避免深度嵌套,如下所示。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

如果 runAfter 注入模式使用得过于密集,也可能会发生回调情况,但可以通过按顺序依次进行注入来避免这种情况。

示例:链式 SkyValue 查询

通常的情况是,应用逻辑需要 SkyValue 查询的从属链,例如,如果第二个 SkyKey 依赖于第一个 SkyValue。简单地考虑一下,这会导致产生一个复杂的深度嵌套回调结构。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

不过,由于接续被指定为方法引用,因此代码在状态转换之间看起来是程序的:step2 跟在 step1 后面。请注意,此处使用 lambda 分配 value2。这使代码的顺序与计算从上到下的顺序一致。

其他提示

可读性:执行排序

为了提高可读性,请尽量让 StateMachine.step 实现按执行顺序,并且回调实现紧跟在代码中传递之后的位置。在控制流分支时,这并不总是可行。在这种情况下,其他注释可能会有所帮助。

示例:链接的 SkyValue 查询中,为实现此目标,创建了一个中间方法引用。这样会牺牲少量的性能来换取可读性,在这里或许很值得。

生成假设

存在时间中等的 Java 对象打破了 Java 垃圾回收器的生成假设,后者旨在处理存在时间很短的对象或永久存在的对象。根据定义,SkyKeyComputeState 中的对象违反了这个假设。此类对象包含由所有仍在运行的 StateMachine 构成的树,并且位于 Driver 根位置,在挂起时,它们在等待异步计算完成时具有中间生命周期。

这在 JDK19 中似乎没有那么糟糕,但在使用 StateMachine 时,有时可能会观察到 GC 时间增加,即使实际生成的垃圾回收量大幅减少。由于 StateMachine 的有效期处于中间阶段,因此可以升级到旧代,导致其用时更快,因此需要清理开销更高的主要 GC 或完整 GC。

最初的预防措施是尽量减少使用 StateMachine 变量,但这并非总是可行的,例如,在多个状态中需要某个值时。在可能的情况下,本地堆栈 step 变量是新生代变量,并且会进行高效的 GC。

对于 StateMachine 变量,将内容分解为子任务并遵循StateMachine 之间传播值的推荐模式也很有帮助。请注意,如果遵循该模式,只有子级 StateMachine 引用父级 StateMachine,反之亦然。这意味着,当子项使用结果回调完成并使用结果回调更新父项时,子项会自然地超出作用域,符合 GC 的条件。

最后,在某些情况下,早期状态需要 StateMachine 变量,但后期状态不需要该变量。一旦确定不再需要大型对象的引用,将其设为 null 会非常有用。

命名状态

为方法命名时,通常可以针对该方法内发生的行为命名方法。目前尚不清楚如何在 StateMachine 中执行此操作,因为没有堆栈。例如,假设方法 foo 调用子方法 bar。在 StateMachine 中,可将其转换为状态序列 foo,后跟 barfoo 不再包含 bar 行为。因此,状态的方法名称往往范围较小,可能反映了本地行为。

并发树图

下面是结构化并发示意图的另一种视图,该图更好地描述了树结构。这些木块构成了一棵小树。

结构化并发 3D


  1. 这与 SkyFrame 在值不可用时从头开始重启的惯例相反。

  2. 请注意,允许 step 抛出 InterruptedException,但示例省略了这个设置。Bazel 代码中有一些较低的方法会抛出此异常,并且这些异常会传播到运行 StateMachineDriver(稍后将进行说明)。可以不将其声明为在不需要时抛出。

  3. 并发子任务由 ConfiguredTargetFunction 驱动,后者为每个依赖项执行“独立”工作。每个依赖项都有自己独立的 StateMachine,而不是操纵可同时处理所有依赖项的复杂数据结构,从而导致效率低下。

  4. 单个步骤中的多个 tasks.lookUp 调用会一起进行批处理。通过在并发子任务中发生的查询可以创建其他批处理。

  5. 这在概念上与 Java 的结构化并发 jeps/428 类似。

  6. 这类似于生成线程,然后联接线程以实现顺序组合。