Skyframe StateMachine 指南

报告问题 查看来源 Nightly · 8.3 · 8.2 · 8.1 · 8.0 · 7.6

概览

Skyframe StateMachine 是位于堆上的解构函数对象。当所需值不是立即可用而是异步计算时,它支持灵活的评估,且不会出现冗余1StateMachine 在等待时无法占用线程资源,而必须暂停和恢复。因此,解构会公开显式重新进入点,以便跳过先前的计算。

StateMachine 可用于表达序列、分支、结构化逻辑并发,并且专门针对 Skyframe 交互量身定制。StateMachine 可以组合成更大的 StateMachine,并共享子 StateMachine。并发始终是分层结构,并且是纯逻辑的。每个并发子任务都在单个共享父 SkyFunction 线程中运行。

简介

本部分简要介绍了 java.com.google.devtools.build.skyframe.state 软件包中的 StateMachine

简要介绍 Skyframe 重启

Skyframe 是一个可并行评估依赖关系图的框架。图中的每个节点都对应于对 SkyFunction 的评估,其中 SkyKey 指定其参数,SkyValue 指定其结果。计算模型是这样的:SkyFunction 可以通过 SkyKey 查找 SkyValue,从而触发对其他 SkyFunction 的递归并行评估。如果请求的 SkyValue 尚未就绪(因为计算的某个子图不完整),请求的 SkyFunction 会观察到 null getValue 响应,并应返回 null 而不是 SkyValue,以表明由于缺少输入而导致不完整。当所有之前请求的 SkyValue 都可用时,Skyframe 会重新启动 SkyFunction。

在引入 SkyKeyComputeState 之前,处理重启的传统方式是完全重新运行计算。虽然这种方式具有二次复杂度,但以这种方式编写的函数最终会完成,因为每次重新运行时,返回 null 的查找次数都会减少。借助 SkyKeyComputeState,可以将手动指定的检查点数据与 SkyFunction 相关联,从而节省大量重新计算。

StateMachine是位于 SkyKeyComputeState 内的对象,通过公开暂停和恢复执行钩子,可在 SkyFunction 重新启动时消除几乎所有重新计算。SkyKeyComputeState

SkyKeyComputeState 中的有状态计算

从面向对象的设计角度来看,将计算对象存储在 SkyKeyComputeState 中而不是纯数据值中是有意义的。在 Java 中,携带行为的对象的最低限度描述是函数式接口,事实证明这已经足够了。StateMachine 具有以下令人好奇的递归定义2

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks 接口类似于 SkyFunction.Environment,但它是为异步性设计的,并增加了对逻辑并发子任务的支持3

step 的返回值是另一个 StateMachine,从而可以归纳地指定一系列步骤。当 StateMachine 完成时,step 会返回 DONE。例如:

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

描述了 StateMachine,输出如下。

hello
world

请注意,由于 step2 满足 StateMachine 的功能接口定义,因此方法引用 this::step2 也是一个 StateMachine。方法引用是指定 StateMachine 中下一个状态的最常见方式。

暂停和恢复

直观地说,将计算分解为 StateMachine 个步骤,而不是使用单一函数,可提供暂停和恢复计算所需的钩子。StateMachine.step 返回时,会有一个明确的暂停点。返回的 StateMachine 值指定的延续是显式恢复点。这样一来,由于计算可以从中断处继续进行,因此可以避免重新计算。

回调、延续和异步计算

从技术角度来说,StateMachine 可作为延续,用于确定要执行的后续计算。StateMachine 可以通过从 step 函数返回来主动挂起,从而将控制权转移回 Driver 实例。然后,Driver 可以切换到就绪的 StateMachine 或将控制权交还给 Skyframe。

传统上,回调延续被混为一谈。 不过,StateMachine 会区分这两者。

  • 回调 - 描述了存储异步计算结果的位置。
  • 延续 - 指定下一个执行状态。

调用异步操作时需要回调,这意味着实际操作不会像 SkyValue 查找那样在调用方法后立即发生。回调应尽可能简单。

延续StateMachineStateMachine 返回值,用于封装在所有异步计算完成后的复杂执行。这种结构化方法有助于控制回调的复杂性。

Tasks

Tasks 接口为 StateMachine 提供了一个 API,用于按 SkyKey 查找 SkyValue 和安排并发子任务。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}

SkyValue 查询

StateMachine使用 Tasks.lookUp 重载来查找 SkyValue。它们类似于 SkyFunction.Environment.getValueSkyFunction.Environment.getValueOrThrow,并且具有类似的异常处理语义。该实现不会立即执行查找,而是尽可能批量处理查找。该值可能不会立即提供,例如需要重新启动 Skyframe,因此调用方使用回调指定如何处理结果值。

StateMachine 处理器(Driver 和桥接到 SkyFrame)可保证在下一个状态开始之前,该值可用。示例如下。

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

在上面的示例中,第一步会查找 new Key(),并将 this 作为使用方传递。之所以能实现这一点,是因为 DoesLookup 实现了 Consumer<SkyValue>

根据合同,在下一个状态 DoesLookup.processValue 开始之前,所有 DoesLookup.step 的查找都已完成。因此,当在 processValue 中访问 value 时,value 可用。

子任务

Tasks.enqueue 请求执行逻辑上并发的子任务。子任务也是 StateMachine,可以执行常规 StateMachine 可以执行的任何操作,包括递归创建更多子任务或查找 SkyValue。 与 lookUp 类似,状态机驱动程序可确保所有子任务都已完成,然后再继续执行下一步。示例如下。

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

虽然 Subtask1Subtask2 在逻辑上是并发的,但所有内容都在单个线程中运行,因此 i 的“并发”更新不需要任何同步。

结构化并发

由于每个 lookUpenqueue 都必须在进入下一个状态之前得到解决,这意味着并发自然会限制为树结构。可以创建分层5并发,如以下示例所示。

结构化并发

UML 中很难看出并发结构形成树。您还可以使用其他视图,该视图可以更好地显示树状结构。

非结构化并发

结构化并发更易于推断。

组合和控制流模式

本部分介绍了如何组合多个 StateMachine,并提供了某些控制流问题的解决方案。

顺序状态

这是最常见、最直接的控制流模式。SkyKeyComputeState 内的有状态计算中展示了一个相关示例。

分支

通过使用常规 Java 控制流返回不同的值,可以在 StateMachine 中实现分支状态,如以下示例所示。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  
}

某些分支返回 DONE 以提前完成是很常见的。

高级顺序组合

由于 StateMachine 控制结构是无记忆的,因此将 StateMachine 定义作为子任务共享有时会很麻烦。假设 M1M2 是共享 StateMachineStateMachine 实例,S,其中 M1M2 分别是序列 <A, S, B><X, S, Y>。问题在于,S 在完成之后不知道是继续执行 B 还是 Y,而 StateMachine 并不完全保留调用堆栈。本部分将介绍一些实现此目标的技巧。

StateMachine 作为终端序列元素

这并不能解决最初提出的问题。仅当共享 StateMachine 是序列中的终端时,它才演示顺序组合。

// S is the shared state machine.
class S implements StateMachine {  }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

即使 S 本身是一个复杂的状态机,此方法也适用。

顺序组合的子任务

由于入队子任务保证在下一个状态之前完成,因此有时可以稍微滥用6 子任务机制。

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

runAfter 注入

有时,滥用 Tasks.enqueue 是不可能的,因为在执行 S 之前,必须先完成其他并行子任务或 Tasks.lookUp 调用。在这种情况下,将 runAfter 参数注入 S 可用于告知 S 下一步该做什么。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
     // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
     // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

这种方法比滥用子任务更简洁。不过,如果过度使用此功能,例如通过嵌套多个 StateMachinerunAfter,则会导致回调地狱。最好使用普通的顺序状态来分隔连续的 runAfter

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

可以替换为以下内容。

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

禁止替代项:runAfterUnlessError

在之前的草稿中,我们曾考虑过一种 runAfterUnlessError,它会在出现错误时提前中止。这是因为错误通常会被检查两次,一次由具有 runAfter 引用的 StateMachine 检查,另一次由 runAfter 机器本身检查。

经过一番考虑,我们认为代码的统一性比重复数据删除错误检查更重要。如果 runAfter 机制与始终需要进行错误检查的 tasks.enqueue 机制不能以一致的方式运行,则会造成混淆。

直接委托

每次发生正式状态转换时,主 Driver 循环都会推进。 根据合同,推进状态意味着所有之前已加入队列的 SkyValue 查找和子任务都会在执行下一个状态之前得到解决。有时,委托 StateMachine 的逻辑会使阶段推进变得不必要或适得其反。例如,如果委托的第一个 step 执行的 SkyKey 查找可以与委托状态的查找并行化,那么阶段推进会使它们按顺序执行。执行直接委托可能更有意义,如下例所示。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return ;
  }

  // Rest of implementation.
  

  private StateMachine complete(Tasks tasks) {
    
    return runAfter;
  }
}

数据流

之前的讨论重点在于管理控制流。本部分介绍了数据值的传播。

实现 Tasks.lookUp 回调

SkyValue 查找中有一个实现 Tasks.lookUp 回调的示例。本部分提供了处理多个 SkyValue 的理由和建议方法。

Tasks.lookUp 回调

Tasks.lookUp 方法接受回调 sink 作为参数。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

惯用方法是使用 Java lambda 来实现此目的:

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

其中,myValue 是执行查找的 StateMachine 实例的成员变量。不过,与在 StateMachine 实现中实现 Consumer<SkyValue> 接口相比,lambda 需要额外的内存分配。如果存在多个可能会造成歧义的查找,lambda 仍然非常有用。

Tasks.lookUp 也有类似于 SkyFunction.Environment.getValueOrThrow 的错误处理重载。

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

下面展示了一个实现示例。

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      
      return DONE;
    }
    // Processes `value`, which is non-null.
    
  }
}

与不含错误处理的查找一样,让 StateMachine 类直接实现回调可节省 lambda 的内存分配。

错误处理提供了更多详细信息,但从本质上讲,错误和正常值的传播没有太大区别。

使用多个 SkyValue

通常需要多次 SkyValue 查找。一种在大多数情况下都适用的方法是切换 SkyValue 的类型。以下示例是从原型生产代码中简化而来的。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

由于值类型不同,因此可以明确地共享 Consumer<SkyValue> 回调实现。如果不是这种情况,则可以回退到基于 lambda 的实现或实现相应回调的完整内部类实例。

StateMachine 之间传播值

到目前为止,本文档仅介绍了如何在子任务中安排工作,但子任务还需要向调用方报告值。由于子任务在逻辑上是异步的,因此其结果会通过回调传回给调用方。为了实现此目的,子任务定义了一个通过其构造函数注入的接收器接口。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

   // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

调用方 StateMachine 将如下所示。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上述示例演示了以下几点。Caller 必须将结果向后传播,并定义自己的 Caller.ResultSinkCaller 实现 BarProducer.ResultSink 回调。恢复时,processResult 会检查 value 是否为 null,以确定是否发生了错误。这是接受子任务或 SkyValue 查找的输出后常见的行为模式。

请注意,acceptBarError 的实现会根据错误冒泡的要求,急切地将结果转发给 Caller.ResultSink

有关顶级 StateMachine 的替代方案,请参阅 Driver 和桥接到 SkyFunctions

错误处理

Tasks.lookUp 回调StateMachines 之间传播值中,已经有几个错误处理示例。除了 InterruptedException 之外的异常不会被抛出,而是通过回调作为值传递。此类回调通常具有异或语义,即只传递值或错误中的一个。

下一部分将介绍与 Skyframe 错误处理的细微但重要的互动。

错误冒泡 (--nokeep_going)

在错误冒泡期间,即使并非所有请求的 SkyValue 都可用,SkyFunction 也可能会重启。在这种情况下,由于 Tasks API 协定,永远不会达到后续状态。不过,StateMachine 仍应传播异常。

由于无论是否达到下一个状态,都必须进行传播,因此错误处理回调必须执行此任务。对于内部 StateMachine,可通过调用父回调来实现此目的。

在与 SkyFunction 交互的顶级 StateMachine 中,可以通过调用 ValueOrExceptionProducersetException 方法来完成此操作。然后,即使缺少 SkyValue,ValueOrExceptionProducer.tryProduceValue 也会抛出异常。

如果直接使用 Driver,则必须检查 SkyFunction 中传播的错误,即使机器尚未完成处理也是如此。

事件处理

对于需要发出事件的 SkyFunction,系统会将 StoredEventHandler 注入到 SkyKeyComputeState 中,并进一步注入到需要它们的 StateMachine 中。从历史上看,由于 Skyframe 会舍弃某些事件(除非重新播放),因此需要 StoredEventHandler,但此问题后来已得到修复。保留了 StoredEventHandler 注入,因为它可以简化从错误处理回调发出的事件的实现。

Driver 和桥接到 SkyFunctions

Driver 负责管理 StateMachine 的执行,从指定的根 StateMachine 开始。由于 StateMachine 可以递归地将子任务 StateMachine 排入队列,因此单个 Driver 可以管理众多子任务。这些子任务会创建树状结构,这是结构化并发的结果。Driver 批量处理各个子任务的 SkyValue 查找,以提高效率。

有许多类围绕 Driver 构建,具有以下 API。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver 接受单个根 StateMachine 作为参数。调用 Driver.drive 会尽可能执行 StateMachine,而无需重启 Skyframe。当 StateMachine 完成时,它会返回 true,否则返回 false,表示并非所有值都可用。

Driver 可维护 StateMachine 的并发状态,非常适合嵌入到 SkyKeyComputeState 中。

直接实例化 Driver

StateMachine 实现通常通过回调来传达其结果。您可以直接实例化 Driver,如以下示例所示。

Driver 嵌入在 SkyKeyComputeState 实现中,同时还嵌入了相应 ResultSink 的实现(稍后会进一步定义)。在顶层,State 对象是计算结果的合适接收器,因为它可以保证比 Driver 存活更长时间。

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

以下代码简要介绍了 ResultProducer

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

   // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
      // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

然后,用于延迟计算结果的代码可能如下所示。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

嵌入 Driver

如果 StateMachine 生成一个值且未引发任何异常,则嵌入 Driver 是另一种可能的实现方式,如以下示例所示。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
      // Implementation.
}

SkyFunction 可能包含如下所示的代码(其中 StateSkyKeyComputeState 的特定于函数的类型)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

StateMachine 实现中嵌入 Driver 更符合 Skyframe 的同步编码风格。

可能会产生异常的 StateMachine

否则,还有 SkyKeyComputeState 可嵌入的 ValueOrExceptionProducerValueOrException2Producer 类,它们具有同步 API 来匹配同步 SkyFunction 代码。

ValueOrExceptionProducer 抽象类包含以下方法。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
      // Implementation.
  }

  protected final void setValue(V value)  {   // Implementation. }
  protected final void setException(E exception) {   // Implementation. }
}

它包含一个嵌入式 Driver 实例,与嵌入驱动程序中的 ResultProducer 类非常相似,并以类似的方式与 SkyFunction 交互。实现会调用 setValuesetException,而不是定义 ResultSink。如果两者同时发生,则以异常为准。tryProduceValue 方法将异步回调代码桥接到同步代码,并在设置异步回调代码时抛出异常。

如前所述,在错误冒泡期间,即使机器尚未完成,也可能会发生错误,因为并非所有输入都可用。为了适应这一点,tryProduceValue 会抛出任何设置的异常,即使在机器完成之前也是如此。

后记:最终移除回调

StateMachine 是一种高效但样板代码密集的方式,用于执行异步计算。延续(特别是以传递给 ListenableFutureRunnable 的形式)在 Bazel 代码的某些部分很常见,但在分析 SkyFunction 中并不普遍。分析主要受 CPU 限制,并且没有用于磁盘 I/O 的高效异步 API。最终,最好优化掉回调,因为回调有学习曲线,并且会影响可读性。

其中一种最有前景的替代方案是 Java 虚拟线程。无需编写回调,一切都替换为同步阻塞调用。之所以能这么做,是因为绑定虚拟线程资源(与平台线程不同)的成本应该很低。不过,即使使用虚拟线程,用线程创建和同步原语替换简单的同步操作的开销也太高。我们从 StateMachine 迁移到 Java 虚拟线程,但速度慢了几个数量级,导致端到端分析延迟时间增加了近 3 倍。由于虚拟线程仍是一项预览功能,因此可能会在性能有所提升后,于日后执行此迁移。

另一种可考虑的方法是等待 Loom 协程(如果它们最终可用)。这样做的好处是,可以通过协作式多任务处理来减少同步开销。

如果所有其他方法都失败了,低级字节码重写也可能是一种可行的替代方案。经过充分优化后,或许可以实现接近手写回调代码的性能。

附录

回调地狱

回调地狱是使用回调的异步代码中一个臭名昭著的问题。这是因为后续步骤的延续嵌套在之前的步骤中。如果步骤很多,这种嵌套可能会非常深。如果与控制流耦合,代码将变得难以管理。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

嵌套实现的一个优势在于,可以保留外部步骤的堆栈帧。在 Java 中,捕获的 lambda 变量必须是有效的 final 变量,因此使用此类变量可能很麻烦。通过返回方法引用作为续传来避免深度嵌套,而不是返回 lambda,如下所示。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

如果 runAfter 注入模式使用得过于密集,也可能会出现回调地狱,但可以通过在注入之间穿插顺序步骤来避免这种情况。

示例:链式 SkyValue 查找

应用逻辑通常需要依赖的 SkyValue 查找链,例如,如果第二个 SkyKey 依赖于第一个 SkyValue。从简单的角度来看,这会产生一个复杂的深层嵌套回调结构。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

不过,由于延续指定为方法引用,因此代码在状态转换过程中看起来是过程式的:step2 紧跟在 step1 之后。请注意,此处使用 lambda 来分配 value2。这样一来,代码的顺序就与从上到下的计算顺序一致了。

其他提示

可读性:执行顺序

为了提高可读性,请尽量使 StateMachine.step 实现按执行顺序排列,并使回调实现紧随其在代码中传递的位置。在控制流分支的情况下,这并不总是可行。在这种情况下,添加注释可能会有所帮助。

示例:链式 SkyValue 查找中,创建了一个中间方法引用来实现此目的。这会牺牲少量性能来换取可读性,在此处可能值得。

代际假设

中等生命周期的 Java 对象会打破 Java 垃圾收集器的代际假设,该假设旨在处理生命周期很短的对象或生命周期无限长的对象。根据定义,SkyKeyComputeState 中的对象违反了这一假设。此类对象包含以 Driver 为根的所有仍在运行的 StateMachine 的构建树,由于它们会暂停,等待异步计算完成,因此具有中间生命周期。

在 JDK19 中,这种情况似乎有所改善,但使用 StateMachine 时,有时可能会发现 GC 时间增加,即使实际生成的垃圾大幅减少也是如此。由于 StateMachine 的生命周期介于两者之间,因此可能会被提升为旧代,导致旧代更快填满,从而需要执行更昂贵的主要 GC 或完整 GC 来清理。

最初的预防措施是尽量减少 StateMachine 变量的使用,但有时这是不可行的,例如,如果需要在多个状态中使用某个值。在可能的情况下,本地堆栈 step 变量是年轻代变量,可以高效地进行垃圾回收。

对于 StateMachine 变量,将任务分解为子任务并遵循StateMachine 之间传播值的推荐模式也很有帮助。请注意,按照此模式,只有子 StateMachine 具有对父 StateMachine 的引用,反之则不然。这意味着,当子级完成并使用结果回调更新父级时,子级自然会超出范围并符合垃圾回收条件。

最后,在某些情况下,较早的状态需要 StateMachine 变量,但较晚的状态不需要。一旦确定不再需要大型对象,将其引用设为 null 会很有益。

命名状态

在命名方法时,通常可以根据方法内的行为来命名方法。在 StateMachine 中,由于没有堆栈,因此如何执行此操作不太明确。例如,假设方法 foo 调用子方法 bar。在 StateMachine 中,这可以转换为状态序列 foo,然后是 barfoo 不再包含行为 bar。因此,状态的方法名称往往范围较窄,可能反映的是局部行为。

并发树状图

下图是结构化并发中图表的另一种视图,可更好地描绘树状结构。 这些块构成了一棵小树。

结构化并发 3D


  1. 与 Skyframe 在值不可用时从头开始重新启动的惯例形成对比。 

  2. 请注意,step 允许抛出 InterruptedException,但示例中省略了这一点。Bazel 代码中有一些低级方法会抛出此异常,并且该异常会传播到运行 StateMachineDriver(稍后会介绍)。如果不需要,可以不声明它会抛出异常。 

  3. 并发子任务的动机是 ConfiguredTargetFunction,它会为每个依赖项执行独立的工作。每个依赖项都有自己的独立 StateMachine,而不是操作一次性处理所有依赖项的复杂数据结构,从而避免效率低下。 

  4. 单个步骤中的多个 tasks.lookUp 调用会分批处理。在并发子任务中进行查找时,可以创建额外的批处理。 

  5. 从概念上讲,这与 Java 的结构化并发 jeps/428 类似。 

  6. 这样做类似于生成一个线程并将其加入以实现顺序组合。