Skyframe 状态机指南

报告问题 查看源代码

概览

Skyframe StateMachine 是位于堆上的解构函数对象。当所需的值不是立即可用而采用异步计算方式时,它支持灵活且无冗余的评估1StateMachine 在等待时无法绑定线程资源,但必须暂停和恢复。因此,解构公开了明确的重新入口点,以便可以跳过之前的计算。

StateMachine 可用于表达序列、分支、结构化逻辑并发,并且是专为 Skyframe 交互而定制的。StateMachine 可组合成更大的 StateMachine 并共享子 StateMachine。并发始终具有构造层次结构,并且完全是逻辑的。每个并发子任务都在单个共享的父级 Skyky 线程中运行。

简介

本部分简要介绍并引入了 java.com.google.devtools.build.skyframe.state 软件包中的 StateMachine

Skyframe 重启简介

Skyframe 是对依赖关系图并行执行评估的框架。 图中的每个节点对应于一个 Sky Functions 的求值,SkyKey 指定其参数,SkyValue 指定其结果。此计算模型允许 Sky Functions 通过 SkyKey 查找 SkyValues,从而触发以递归方式并行评估其他 SkyFunction。当所请求的 SkyValue 由于一些计算子图尚未完成时,而不是阻塞(这会连接线程),所请求的 Skyky 函数会观察到 null getValue 响应,并应返回 null(而不是 SkyValue),表示由于输入缺失而导致该调用不完整。当之前请求的所有 SkyValue 均可用时,Skyframe 会重启 SkyFunction。

在引入 SkyKeyComputeState 之前,处理重启的传统方法是完全重新运行计算。虽然这种方法存在二次复杂性,但以这种方式编写的函数最终会完成,因为每次重新运行查找的次数较少都会返回 null。借助 SkyKeyComputeState,您可以将手动指定的检查点数据与 SkyFunction 相关联,从而节省大量重新计算时间。

StateMachine 是位于 SkyKeyComputeState 内的对象,当 SkyFunction 重启(假设 SkyKeyComputeState 不会超出缓存)时,会通过公开挂起和恢复执行钩子来移除几乎所有重新计算。

SkyKeyComputeState 内的有状态计算

从面向对象的设计角度来看,可以考虑将计算对象存储在 SkyKeyComputeState 内,而不是存储在纯数据值中。在 Java 中,对对象携带行为的极简描述是函数接口,事实证明就足够了。StateMachine 具有以下奇怪的递归定义2

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks 接口类似于 SkyFunction.Environment,但它是为异步设计的,并且添加了对逻辑并发子任务的支持3

step 的返回值是另一个 StateMachine,允许归纳规范一系列步骤。step 会在 StateMachine 完成后返回 DONE。例如:

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

描述具有以下输出的 StateMachine

hello
world

请注意,由于 step2 满足 StateMachine 的功能接口定义,因此方法引用 this::step2 也属于 StateMachine。方法引用是在 StateMachine 中指定下一个状态的最常用方法。

暂停和恢复

直观上,将计算细分为 StateMachine 步骤(而不是单体函数),可提供暂停恢复计算所需的钩子。当 StateMachine.step 返回时,存在明确的挂起点。返回的 StateMachine 值指定的延续是明确的恢复点。这样就可以避免重新计算,因为可以从上次停止的地方继续计算。

回调、接续和异步计算

从技术角度而言,StateMachine 充当后续操作,决定了要执行的后续计算。StateMachine 可以通过从 step 函数返回来挂起(而不是挂起),后者会将控件转移回 Driver 实例。然后,Driver 可以切换到就绪的 StateMachine,或放弃控制权,回到 Skyframe。

过去,“回调”和“延续”往往会整合到一个概念中。不过,StateMachine 对这两者是有区别的。

  • 回调 - 描述异步计算结果的存储位置。
  • 接续 - 指定下一个执行状态。

调用异步操作时需要进行回调,这意味着在调用该方法时,实际操作不会立即发生,像 SkyValue 查询一样。回调应尽可能简单。

延续是 StateMachineStateMachine 返回值,它会封装所有异步计算解析后的复杂执行。这种结构化方法有助于确保回调的复杂程度易于管理。

任务

Tasks 接口为 StateMachine 提供了一个 API,以便通过 SkyKey 查找 SkyValues 并安排并发子任务。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}

SkyValue 查询

StateMachine 使用 Tasks.lookUp 过载来查找 SkyValues。它们类似于 SkyFunction.Environment.getValueSkyFunction.Environment.getValueOrThrow,并且具有类似的异常处理语义。实现不会立即执行查询,而是在执行查询前尽可能批量查询4。此值可能无法立即使用,例如需要重启 Skyframe,因此调用方会使用回调指定对生成的值执行什么操作。

StateMachine 处理器(Driver 以及与桥接桥接)可保证该值在下一个状态开始之前可用。请参阅以下示例。

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

在上面的示例中,第一步是查找 new Key(),并将 this 作为使用方传递。之所以能这么做,是因为 DoesLookup 实现了 Consumer<SkyValue>

根据合同规定,在下一个状态 DoesLookup.processValue 开始前,已完成 DoesLookup.step 的所有查找。因此,在 processValue 中访问 value 时可用。

子任务

Tasks.enqueue 用于请求执行逻辑并发子任务。子任务也是 StateMachine,可以执行常规 StateMachine 可以执行的任何操作,包括以递归方式创建更多子任务,或者查询 SkyValues。与 lookUp 非常相似,状态机驱动程序可确保在完成下一步之前所有子任务都已完成。请参阅以下示例。

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

虽然 Subtask1Subtask2 在逻辑上是并发的,但所有内容都在单个线程中运行,因此 i 的“并发”更新不需要任何同步。

结构化并发

由于每个 lookUpenqueue 都必须在进入下一个状态之前进行解析,这意味着并发自然地受限于树结构。如以下示例所示,可以创建分层 5 并发。

结构化并发

很难从 UML 中看出并发结构形成一棵树。系统提供了备用视图,可以更好地显示树结构。

非结构化并发

结构化并发的推理要容易得多。

组合和控制流模式

本部分将举例说明如何组合多个 StateMachine 以及某些控制流问题的解决方案。

顺序状态

这是最常见、最直接的控制流模式。SkyKeyComputeState 内的有状态计算中展示了此操作的一个示例。

分支

通过使用常规 Java 控制流返回不同的值,可以在 StateMachine 中实现分支状态,如以下示例所示。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  …
}

对于某些分支,为提前完成而返回 DONE 的情况很常见。

高级序列组合

由于 StateMachine 控件结构是无内存的,因此将 StateMachine 定义作为子任务共享有时可能会显得比较棘手。以 M1M2StateMachineS 实例,其中 M1M2 分别是序列 <A, S, B><X, S, Y>StateMachine问题在于,S 不知道在完成之后是继续处于 B 还是 Y,且 StateMachine 没有完全保留调用堆栈。本部分将介绍实现这一目标的一些技巧。

使用 StateMachine 作为终端序列元素

这并不能解决初始问题。仅当序列中的共享 StateMachine 是终端时,它才会显示依序组合。

// S is the shared state machine.
class S implements StateMachine { … }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

即使 S 本身是一个复杂的状态机,这种方法也适用。

用于依序组合的子任务

由于加入队列的子任务一定会在下一个状态之前完成,因此有时可能会滥用6子任务机制。

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

runAfter 注射

有时,滥用 Tasks.enqueue 是无法实现的,因为在 S 执行之前必须完成其他并行子任务或 Tasks.lookUp 调用。在这种情况下,将 runAfter 参数注入 S 可用于告知 S 后续步骤。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
    … // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
    … // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

这种方法比滥用子任务更简洁。不过,如果应用过于自由(例如,使用 runAfter 嵌套多个 StateMachine,则是指向 Callback Hell 的路)。最好改为拆分具有普通序列状态的序列 runAfter

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

可以替换为以下内容。

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

禁止访问替代方案:runAfterUnlessError

在之前的草稿中,我们考虑过会在错误时提前中止的 runAfterUnlessError。原因在于,系统通常会对错误进行两次检查,一次由具有 runAfter 引用的 StateMachine,一次由 runAfter 机器本身检查。

经过一些考虑后,我们判定代码的一致性比删除错误检查结果更重要。如果 runAfter 机制与 tasks.enqueue 机制的运行方式不一致,可能会造成混淆,因为后者始终需要错误检查。

直接委托

每当有正式状态转换时,主 Driver 循环都会前进。根据合同,推进状态意味着之前加入队列的所有 SkyValue 查询和子任务会在下一个状态执行之前解析。有时,委托 StateMachine 的逻辑会使阶段推进变得不必要的或适得其反。例如,如果委托的第一个 step 执行与委托状态查找并行化的 SkyKey 查找,那么相位进阶会使它们依序执行。下例中所示的可能是直接执行委托。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    …
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return …;
  }

  // Rest of implementation.
  …

  private StateMachine complete(Tasks tasks) {
    …
    return runAfter;
  }
}

数据流

前面的讨论侧重于如何管理控制流。本部分介绍数据值的传播。

实现 Tasks.lookUp 回调

下面是一个在 SkyValue 查询中实现 Tasks.lookUp 回调的示例。本部分介绍了根本原因,并提供了处理多个 SkyValues 的方法。

Tasks.lookUp 回调

Tasks.lookUp 方法接受回调 sink 作为参数。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

惯用方法是使用 Java lambda 来实现这一点:

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

其中,myValue 是执行查询的 StateMachine 实例的成员变量。不过,与在 StateMachine 实现中实现 Consumer<SkyValue> 接口相比,lambda 需要额外的内存分配。如果存在多个不明确的查询,lambda 仍非常有用。

此外,还存在处理 Tasks.lookUp 过载的错误,这些情况类似于 SkyFunction.Environment.getValueOrThrow

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

示例实现如下所示。

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      …
      return DONE;
    }
    // Processes `value`, which is non-null.
    …
  }
}

与没有错误处理的查找一样,让 StateMachine 类直接实现回调可为 lamba 节省内存分配。

错误处理提供了更多详细信息,但从本质上讲,错误的传播和正常值的差异不大。

使用多个 SkyValues

通常需要多次 SkyValue 查询。在大多数情况下,其中一种方法是启用 SkyValue 类型。以下是一个从原型生产代码简化的示例。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

由于值类型不同,因此 Consumer<SkyValue> 回调实现可以明确共享。如果不是这样,回退到基于 lambda 的实现或回退到实现适当回调的完整内部类实例。

StateMachine 之间传播值

到目前为止,本文档仅介绍了如何在子任务中安排工作,但子任务还需要向调用方报告值。由于子任务在逻辑上是异步执行的,因此它们的结果会使用回调进行告知给调用方。为此,子任务定义了通过其构造函数注入的接收器接口。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

  … // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

那么,调用方 StateMachine 将如下所示。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上面的示例演示了一些设置。Caller 必须传播其结果并定义自己的 Caller.ResultSinkCaller 会实现 BarProducer.ResultSink 回调。恢复时,processResult 会检查 value 是否为 null,以确定是否发生了错误。这是在接受子任务或 SkyValue 查找的输出后的常见行为模式。

请注意,根据错误气泡的要求,acceptBarError 的实现会急切地将结果转发到 Caller.ResultSink

Driver 以及与 SkyFunctions 桥接中介绍了顶级 StateMachine 的替代方法。

错误处理

Tasks.lookUp 回调StateMachines 之间传播值中提供了几个错误处理示例。系统不会抛出 InterruptedException 以外的异常,而是将它们作为值通过回调进行传递。此类回调通常具有排他性或语义,且只会传递一个值或错误。

下一部分将介绍与 Skyframe 错误处理之间的细微但非常重要的互动。

冒泡错误 (--nokeep_going)

在出现错误的气泡中,即使并非请求的所有 Skyky 值都可用,SkyFunction 也可能会重启。在这种情况下,由于 Tasks API 协定,绝不会达到后续状态。但是,StateMachine 应该仍然会传播该异常。

由于无论是否实现了下一个状态,传播都必须发生,因此错误处理回调必须执行此任务。对于内部 StateMachine,这是通过调用父回调实现的。

在顶层 StateMachine(与 SkyFunction 进行交互)中,这可以通过调用 ValueOrExceptionProducersetException 方法完成。然后,即使缺少 SkyValues,ValueOrExceptionProducer.tryProduceValue 也会抛出异常。

如果 Driver 被直接使用,就必须检查是否存在来自 Sky Functions 的传播错误,即使机器尚未完成处理也是如此。

事件处理

对于需要发出事件的 SkyFunction,StoredEventHandler 会注入到 SkyKeyComputeState 中,然后又注入需要它们的 StateMachine 中。过去,需要 StoredEventHandler,因为 Skyframe 会丢弃某些事件,除非重放这些事件,但随后修复了此问题。保留 StoredEventHandler 注入,因为它简化了从错误处理回调发出的事件的实现。

Driver 和桥接至 SkyFunctions

Driver 负责管理 StateMachine 的执行,从指定的根 StateMachine 开始。由于 StateMachine 可以递归地将子任务 StateMachine 加入队列,因此单个 Driver 可以管理众多子任务。这些子任务是通过结构化并发创建树结构。Driver 可跨子任务批量处理 SkyValue 查询,以提高效率。

围绕 Driver 构建了一些类,具有以下类。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver 接受单个根 StateMachine 作为参数。调用 Driver.drive 会尽可能多地执行 StateMachine,而不使用 Skyframe 重启。当 StateMachine 完成时返回 true,否则返回 false,表示并非所有值都可用。

Driver 会维护 StateMachine 的并发状态,这种方法非常适合嵌入 SkyKeyComputeState

直接实例化 Driver

StateMachine 实现通常会通过回调传达其结果。您可以直接实例化 Driver,如以下示例所示。

Driver 与相应 ResultSink 的实现一起嵌入到 SkyKeyComputeState 实现中,后者的定义会更底层。在顶层,State 对象是计算结果的适当接收器,因为它肯定存在 Driver

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

以下代码展示了 ResultProducer 的草图。

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

  … // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

那么,用于延迟计算结果的代码可能如下所示。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

嵌入“Driver

如果 StateMachine 生成一个值并且没有引发异常,则嵌入 Driver 是另一种可能的实现,如以下示例所示。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
}

SkyFunction 的代码可能如下所示(其中 State 是函数专用的 SkyKeyComputeState 类型)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

StateMachine 实现中嵌入 Driver 更适合 Skyky 的同步编码样式。

可能引发异常的 StateMachine

否则,便可嵌入 SkyKeyComputeStateValueOrExceptionProducerValueOrException2Producer 类,这些类具有与同步的 SkyFunction 代码匹配的同步 API。

ValueOrExceptionProducer 抽象类包含以下方法。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
    …  // Implementation.
  }

  protected final void setValue(V value)  {  … // Implementation. }
  protected final void setException(E exception) {  … // Implementation. }
}

它包含一个嵌入式 Driver 实例,类似于嵌入驱动程序中的 ResultProducer 类,并采用类似方式与 Sky Functions 进行交互。无论发生何种情况,实现都会调用 setValuesetException,而不是定义 ResultSink。如果两种情况均出现,则优先考虑例外情况。tryProduceValue 方法可将异步回调代码桥接到同步代码,并在已设置该代码时抛出异常。

如前所述,在出错气泡期间,即使机器尚未完成,也可能会出错,因为并非所有输入都可用。为了适应这一点,tryProduceValue 甚至在机器完成之前还会抛出所有已设置的异常。

结语:最终移除回调

StateMachine 是执行异步计算的高效方式,但样板密集型方式执行。延续(具体而言,形式为传递给 ListenableFutureRunnable)在 Bazel 代码的某些部分广泛传播,但在分析 SkyFunctions 中并不普遍。分析主要受 CPU 限制,并且没有适用于磁盘 I/O 的高效异步 API。最终,最好优化回调,因为它们有学习曲线,并且会阻碍可读性。

最具潜力的替代方案之一是 Java 虚拟线程。所有代码都无需编写回调,而是全部替换为同步阻塞调用。之所以能这么做,是因为与平台线程不同,将虚拟线程资源捆绑到本地应该很便宜。但是,即使对于虚拟线程,使用线程创建和同步基元替换简单的同步操作开销也非常高昂。我们执行了从 StateMachineJava 虚拟线程的迁移,而且迁移速度慢了几个数量级,这使得端到端分析延迟时间几乎增加了 3 倍。由于虚拟线程仍然是预览功能,因此有可能在以后性能提升时执行此迁移。

另一种可考虑的方法是等待 Loom 协程可用(如果可用)。这种方法的优势在于,可以通过使用协同多任务处理来减少同步开销。

如果所有其他方法都失败,也可使用低级别的字节码重写方式。通过充分优化,或许能够达到接近手写回调的性能。

附录

回头见

回调问题属于使用回调的异步代码存在的问题。这源于如下事实:后续步骤的延续嵌套在上一步中。如果嵌套的步骤有很多,这种嵌套可能会极其深层。如果与控制流结合使用,代码会变得无法管理。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

嵌套实现的好处之一是,可以保留外部步骤的堆栈帧。在 Java 中,捕获的 lambda 变量必须是最终变量,因此使用此类变量可能会比较麻烦。通过将方法引用作为延续(而不是 lambda)返回,可以避免深度嵌套,如下所示。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

如果 runAfter 注入模式使用得过于密集,也可能会出现回调地现象,但可以通过交错注入依序执行的步骤来避免这种情况。

示例:链式 SkyValue 查询

通常,应用逻辑需要依赖的 Skyy 查询链,例如,第二个 SkyKey 依赖于第一个 SkyValue。简单来讲,这会产生一个复杂、深层嵌套的回调结构。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

不过,由于接续被指定为方法引用,因此代码在各状态转换中看起来像是程序:step2 跟在 step1 后面。请注意,此处使用 lambda 分配 value2。这会使代码的顺序与从上到下计算的顺序保持一致。

其他提示

可读性:执行顺序

为了提高可读性,请尽量将 StateMachine.step 实现保持在执行顺序中,并使回调实现紧跟在它们在代码中的传入位置之后。控制流分支并不总是能够做到这一点。在这种情况下,额外的评论可能有用。

示例:链式 SkyValue 查找中,我们创建了中间方法引用来实现这一点。这样做只会牺牲少量性能,以提高可读性,因此可能值得一试。

生成假设

中期存在的 Java 对象突破了 Java 垃圾回收器的代代假设,这种假设旨在处理持续时间很短的对象或永久存在的对象。根据定义,SkyKeyComputeState 中的对象违反了此假设。此类包含包含所有仍在运行且位于根 DriverStateMachine 的构造树在暂停时会具有中间生命周期,等待异步计算完成。

在 JDK19 中,这样做似乎不太好,但使用 StateMachine 时,有时可能会观察到 GC 时间增加,即使实际垃圾回收次数大幅减少也是如此。由于 StateMachine 具有中间生命周期,因此可能会提升到旧代,从而导致其填充速度变快,从而导致清理成本较高的主要 GC 或完整 GC。

最初的预防措施是尽量减少使用 StateMachine 变量,但这有时并不可行,例如,如果需要跨多个状态需要某个值。如果可能,本地堆栈 step 变量是新生代变量,实际上是 GC 的变量。

对于 StateMachine 变量,将事项分解为子任务并遵循StateMachine 之间传播值的建议模式也会很有帮助。请注意,在遵循该模式之后,只有子 StateMachine 对父 StateMachine 进行引用,反之亦然。这意味着,当子项使用结果回调完成并更新子项时,子项自然会超出范围并符合使用 GC 的条件。

最后,在某些情况下,需要在早期状态下使用 StateMachine 变量,但在后期阶段则不需要。在知道不再需要大型对象后,清空这些对象会很有帮助。

命名状态

为方法命名时,通常就可以为该方法中发生的行为命名方法。我们不清楚如何在 StateMachine 中执行此操作,因为没有堆栈。例如,假设方法 foo 调用子方法 bar。在 StateMachine 中,这可以转换为状态序列 foo,后跟 barfoo 不再包含 bar 行为。因此,状态的方法名称往往需要更严格的范围,这样可能反映了本地行为。

并发树图

以下是结构化并发示意图的另一种视图,更好地描述了树结构。方块会形成一棵小树。

结构化并发 3D


  1. Skyframe 不支持在值不可用时从头开始重启。 

  2. 请注意,允许 step 抛出 InterruptedException,但这些示例中省略了这一点。Bazel 代码中有一些会抛出此异常并传播到 Driver(稍后将介绍)的低方法,该方法会运行 StateMachine。您也可以在不必要时将其声明为抛出。🎁?

  3. ConfiguredTargetFunction 是并发子任务的,它会为每个依赖项执行独立的任务。每个依赖项都具有自己的独立 StateMachine,而不是操控同时处理所有依赖项的复杂数据结构。🎁?

  4. 一个步骤中的多个 tasks.lookUp 调用会批量处理。通过并发子任务中进行查找,可以创建其他批处理操作。

  5. 它在概念上类似于 Java 的结构化并发 jeps/428

  6. 这样做类似于衍生线程并连接该线程以实现顺序组合。