Skyframe StateMachines のガイド

問題を報告 ソースを表示

概要

Skyframe StateMachine は、ヒープ上に存在する分解された関数オブジェクトです。必要な値がすぐに利用できないものの、非同期で計算される場合に、冗長性のない柔軟な評価1をサポートします。StateMachine は待機中にスレッド リソースを結合できませんが、代わりに一時停止して再開する必要があります。したがって、分解により明示的な再エントリポイントが公開されるため、以前の計算をスキップできます。

StateMachine は、シーケンス、分岐、構造化論理同時実行を表現するために使用でき、Skyframe のインタラクションに合わせて調整されています。StateMachine は、より大きな StateMachine にコンポーズし、サブ StateMachine を共有できます。同時実行は常に構成によって階層化され、純粋に論理的です。すべての同時実行サブタスクは、単一の共有の親 SkyFunction スレッドで実行されます。

はじめに

このセクションでは、java.com.google.devtools.build.skyframe.state パッケージにある StateMachine について簡単に説明します。

Skyframe の再起動の概要

Skyframe は、依存関係グラフを並行して評価するフレームワークです。グラフ内の各ノードは、パラメータを指定する SkyKey とその結果を指定する SkyValue による SkyFunction の評価に対応しています。この計算モデルでは、SkyFunction が SkyKey で SkyValues を検索し、追加の SkyFunction の再帰的な並列評価をトリガーします。計算のサブグラフが不完全であるために要求された SkyValue の準備がまだできていない場合、ブロックではなく、要求された SkyFunction が null getValue レスポンスを監視し、SkyValue の代わりに null を返し、入力がないために不完全であることを知らせます。以前にリクエストされた SkyValues がすべて使用可能になると、SkyFrame は SkyFunctions を再起動します。

SkyKeyComputeState が導入される前は、再起動を処理する従来の方法は、計算を完全に再実行することでした。これには 2 次的な複雑さがありますが、このように記述された関数は、再実行のたびに null を返すルックアップが少なくなるため、最終的に完了します。SkyKeyComputeState を使用すると、手動で指定したチェックポイント データを SkyFunction に関連付けることで、再計算を大幅に削減できます。

StateMachineSkyKeyComputeState 内に存在するオブジェクトです。一時停止フックと再開実行フックを公開することで、SkyFunction の再起動時に(SkyKeyComputeState がキャッシュから抜けないことが前提)、実質的にすべての再計算を回避できます。

SkyKeyComputeState 内のステートフル計算

オブジェクト指向の設計の観点からは、純粋なデータ値ではなく、SkyKeyComputeState 内に計算オブジェクトを格納することは理にかなっています。Java では、オブジェクトを処理する動作の最低限の記述は関数インターフェースであり、これだけで十分であることがわかりました。StateMachine には、不思議なほど再帰的な定義2 があります。

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks インターフェースは SkyFunction.Environment に似ていますが、非同期に対応するように設計されており、論理的に同時実行するサブタスクのサポートが追加されています3

step の戻り値は別の StateMachine であり、帰納的に一連のステップを指定できます。StateMachine が完了すると、stepDONE を返します。次に例を示します。

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

次の出力で StateMachine を記述します。

hello
world

step2StateMachine の関数インターフェース定義を満たしているため、メソッド参照 this::step2StateMachine です。メソッド参照は、StateMachine で次の状態を指定する最も一般的な方法です。

一時停止と再開

直感的には、計算をモノリシック関数ではなく StateMachine ステップに分割することで、計算の一時停止と再開suspendに必要なフックが提供されます。suspendStateMachine.step が返されると、明示的な一時停止ポイントがあります。返される StateMachine 値で指定された継続は、明示的な再開ポイントです。中断したところから計算を再開できるため、再計算を回避できます。

コールバック、継続、非同期計算

技術的には、StateMachine は継続continuationとして機能し、後に実行する計算を決定します。StateMachine は、ブロックを行う代わりに step 関数から戻ることで自発的にsuspendできます。この関数は制御を Driver インスタンスに戻します。その後、Driver は準備完了の StateMachine に切り替えるか、制御を Skyframe に戻すことができます。

従来、コールバックと継続は 1 つのコンセプトに統合されていました。callbackscallbacksただし、StateMachine は両者の区別を維持します。

  • コールバック - 非同期計算の結果を保存する場所を記述します。
  • 継続 - 次の実行状態を指定します。

非同期処理を呼び出す場合はコールバックが必要です。つまり、SkyValue 検索の場合のように、メソッド呼び出し直後に実際の処理が実行されることはありません。コールバックはできる限りシンプルにする必要があります。

継続は StateMachineStateMachine 戻り値で、すべての非同期計算が解決された後に続く複雑な実行をカプセル化します。この構造化されたアプローチにより、コールバックの複雑さを管理しやすくなります。

タスク

Tasks インターフェースは、SkyKey で SkyValues を検索し、同時実行サブタスクをスケジュールするための API を StateMachine に提供します。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}

SkyValue ルックアップ

StateMachine は、Tasks.lookUp オーバーロードを使用して SkyValues を検索します。これらは SkyFunction.Environment.getValueSkyFunction.Environment.getValueOrThrow に類似しており、例外処理のセマンティクスも類似しています。この実装では、ルックアップはすぐには実行されず、その前に可能な限り多くのルックアップ4 がバッチ処理されます。Skyframe の再起動が必要な場合など、値はすぐに利用できない場合があるため、呼び出し元はコールバックを使用して結果の値をどのように処理するかを指定します。

StateMachine プロセッサ(Driver と SkyFrame へのブリッジ)は、次の状態が開始される前に値が使用可能であることを保証します。以下に例を示します。

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

上記の例では、最初のステップで new Key() を検索し、コンシューマとして this を渡します。これが可能なのは、DoesLookupConsumer<SkyValue> を実装しているためです。

契約により、次の状態 DoesLookup.processValue が始まる前に DoesLookup.step のすべてのルックアップが完了します。したがって、valueprocessValue でアクセスされるときに使用できます。

サブタスク

Tasks.enqueue は、論理的に同時実行されるサブタスクの実行をリクエストします。サブタスクも StateMachine であり、追加のサブタスクの再帰的な作成や SkyValues の検索など、通常の StateMachine で実行できるすべての操作を実行できます。lookUp と同様に、ステートマシン ドライバは、次のステップに進む前にすべてのサブタスクが完了していることを確認します。以下に例を示します。

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

Subtask1Subtask2 は論理的には同時ですが、すべてが 1 つのスレッドで実行されるため、i の「同時」更新に同期は必要ありません。

構造化された同時実行

すべての lookUpenqueue は次の状態に進む前に解決する必要があるため、同時実行は当然ながらツリー構造に限定されます。次の例に示すように、階層型5 同時実行を作成できます。

構造化同時実行

UML からは、同時実行構造がツリーを形成していることを区別するのは困難です。ツリー構造をより適切に示す代替ビューもあります。

非構造化同時実行

構造化された同時実行は、簡単に推測できます。

構成と制御フローのパターン

このセクションでは、複数の StateMachine を作成する方法の例と、特定の制御フローの問題に対する解決策を紹介します。

シーケンシャル ステータス

これは最も一般的でわかりやすい制御フロー パターンです。例については、SkyKeyComputeState 内のステートフル計算をご覧ください。

分岐

StateMachine の分岐状態は、通常の Java 制御フローを使用して異なる値を返すことで実現できます。次の例をご覧ください。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  …
}

一部のブランチでは、早期完了のために DONE を返すのが一般的です。

高度な順次合成

StateMachine 制御構造はメモリレスであるため、StateMachine 定義をサブタスクとして共有するのは難しい場合があります。M1M2StateMachineS を共有する StateMachine インスタンスとします。M1M2 はそれぞれシーケンス <A, S, B><X, S, Y> です。問題は、S が完了後に BY のどちらを継続するのかわからず、StateMachine がコールスタックを完全に保持しないことです。このセクションでは、これを実現するためのいくつかの手法について説明します。

終端シーケンス要素としての StateMachine

これでは最初に提起された問題を解決できません。共有 StateMachine がシーケンスの終端である場合にのみ、シーケンシャル コンポジションを示します。

// S is the shared state machine.
class S implements StateMachine { … }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

これは、S 自体が複雑なステートマシンであっても機能します。

シーケンシャル コンポジションのサブタスク

キューに登録されたサブタスクは次の状態の前に完了することが保証されるため、サブタスク メカニズムがわずかに乱れる可能性があります6

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

runAfter インジェクション

S の実行前に他の並列サブタスクや Tasks.lookUp 呼び出しを完了しなければならないため、Tasks.enqueue の乱用が不可能な場合もあります。この場合、runAfter パラメータを S に挿入することで、次に行うべきことを S に通知できます。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
    … // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
    … // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

このアプローチは、サブタスクを乱用するよりもクリーンです。しかし、複数の StateMachinerunAfter でネストするなど、かなり広く適用することは、Callback Hell への道のりです。代わりに、連続した runAfter を通常の連続状態で分割することをおすすめします。

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

これを次のように置き換えることができます。

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

禁止の代替: runAfterUnlessError

以前のドラフトでは、エラー発生時に早期に中止する runAfterUnlessError を検討していました。これは、エラーが runAfter 参照を持つ StateMachine によって 1 回、runAfter マシン自体によって 1 回、合計 2 回チェックされることがよくあるためです。

検討を重ねた結果、エラーチェックの重複除去よりもコードの均一性の方が重要であると判断しました。runAfter メカニズムが tasks.enqueue メカニズム(常にエラーチェックを必要とする)と一貫して動作しないと、混乱を招きます。

直接委任

正式な状態遷移が発生するたびに、メイン Driver ループが進行します。契約に従い、状態が進行すると、以前にキューに登録された SkyValue ルックアップとサブタスクはすべて、次の状態が実行される前に解決されます。デリゲート StateMachine のロジックが原因で、フェーズを進める必要がなくなり、逆効果になることがあります。たとえば、デリゲートの最初の step が委任状態のルックアップと並列化できる SkyKey ルックアップを実行する場合、フェーズを進めると、これらはシーケンシャルになります。次の例に示すように、直接委任のほうが合理的です。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    …
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return …;
  }

  // Rest of implementation.
  …

  private StateMachine complete(Tasks tasks) {
    …
    return runAfter;
  }
}

データフロー

ここまでの説明では、制御フローの管理に焦点を当ててきました。このセクションでは、データ値の伝播について説明します。

Tasks.lookUp コールバックの実装

以下は、SkyValue ルックアップTasks.lookUp コールバックを実装する例です。このセクションでは、複数の SkyValue を処理する理由を説明し、提案します。

Tasks.lookUp コールバック

Tasks.lookUp メソッドは、コールバック sink をパラメータとして受け取ります。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

慣用的なアプローチは、Java ラムダを使用してこれを実装することです。

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

myValue は、ルックアップを行う StateMachine インスタンスのメンバー変数です。ただし、ラムダには、StateMachine 実装で Consumer<SkyValue> インターフェースを実装する場合と比較して、追加のメモリ割り当てが必要です。このラムダは、あいまいになるような複数のルックアップがある場合にも有用です。

また、SkyFunction.Environment.getValueOrThrow と同様に、Tasks.lookUp のオーバーロードのエラー処理もあります。

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

実装例を以下に示します。

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      …
      return DONE;
    }
    // Processes `value`, which is non-null.
    …
  }
}

エラー処理なしのルックアップと同様に、StateMachine クラスでコールバックを直接実装することで、ラムバのメモリ割り当てを節約できます。

エラー処理ではもう少し詳しく説明しますが、基本的には、エラーの伝播と通常の値に大きな違いはありません。

複数の SkyValue を使用する

多くの場合、複数の SkyValue ルックアップが必要になります。SkyValue のタイプに切り替えるのが、ほとんどの場合で有効なアプローチです。以下は、プロトタイプの本番環境コードから簡略化した例です。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

値の型が異なるため、Consumer<SkyValue> コールバックの実装は明確に共有できます。そうでない場合は、ラムダベースの実装または適切なコールバックを実装する完全な内部クラス インスタンスにフォールバックできます。

StateMachine 間の値の伝播

ここまで、このドキュメントではサブタスクでの作業の配置方法について説明しましたが、サブタスクでも呼び出し元に値を報告する必要があります。サブタスクは論理的に非同期であるため、結果はコールバックを使用して呼び出し元に返されます。これを機能させるために、サブタスクはコンストラクタを介して挿入されるシンク インターフェースを定義します。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

  … // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

この場合、呼び出し元の StateMachine は次のようになります。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上記の例では、いくつかの点を示しています。Caller は結果を伝播し、独自の Caller.ResultSink を定義する必要があります。CallerBarProducer.ResultSink コールバックを実装します。再開時に、processResultvalue が null かどうかを確認し、エラーが発生したかどうかを判断します。これは、サブタスクまたは SkyValue ルックアップからの出力を受け入れた後の一般的な動作パターンです。

acceptBarError の実装は、エラーバブリングで要求されているように、結果を Caller.ResultSink に積極的に転送します。

トップレベルの StateMachine の代替方法については、Driver と SkyFunctions へのブリッジをご覧ください。

エラー処理

すでに Tasks.lookUp コールバックにエラー処理の例と StateMachines 間での値の伝播の例がいくつか含まれています。InterruptedException 以外の例外はスローされず、代わりにコールバックを介して値として渡されます。多くの場合、このようなコールバックには排他的またはセマンティクスがあり、値またはエラーのいずれか 1 つのみが渡されます。

次のセクションでは、Skyframe のエラー処理との微妙ながらも重要な操作について説明します。

エラーバブリング(--nokeep_going)

エラーバブリング中に、リクエストされたすべての SkyValue が使用可能でなくても、SkyFunction が再起動されることがあります。この場合、Tasks API コントラクトにより、後続の状態に到達することはありません。ただし、StateMachine は引き続き例外を伝播する必要があります。

伝播は次の状態に到達したかどうかに関係なく行われる必要があるため、エラー処理コールバックはこのタスクを実行する必要があります。内部 StateMachine の場合、これは親コールバックを呼び出すことで実現できます。

SkyFunction とインターフェースする最上位の StateMachine では、ValueOrExceptionProducersetException メソッドを呼び出します。SkyValues が欠落している場合でも、ValueOrExceptionProducer.tryProduceValue は例外をスローします。

Driver を直接使用している場合は、マシンが処理を完了していなくても、SkyFunction から伝播されたエラーを確認することが重要です。

イベント処理

イベントを出力する必要がある SkyFunctions の場合、StoredEventHandler が SkyKeyComputeState に挿入され、さらにそれを必要とする StateMachine に挿入されます。これまで、Skyframe で特定のイベントがリプレイされない限りドロップすることが原因で、StoredEventHandler が必要でしたが、これは後で修正されました。StoredEventHandler インジェクションが保持されるのは、エラー処理コールバックから出力されるイベントの実装が簡素化されるためです。

Driver と SkyFunctions へのブリッジ

Driver は、指定されたルート StateMachine で始まる StateMachine の実行を管理します。StateMachine はサブタスク StateMachine を再帰的にキューに登録できるため、1 つの Driver で多数のサブタスクを管理できます。これらのサブタスクは、構造化された同時実行の結果であるツリー構造を作成します。Driver は、効率向上のためにサブタスク間で SkyValue ルックアップをバッチ処理します。

次の API を使用して、Driver を中心にいくつかのクラスが構築されています。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver は、単一のルート StateMachine をパラメータとして受け取ります。Driver.drive を呼び出すと、Skyframe の再起動なしで可能な限り StateMachine が実行されます。StateMachine が完了した場合は true を返し、完了しない場合は false を返します。これは、すべての値が使用可能ではなかったことを示します。

DriverStateMachine の同時実行状態を維持できるため、SkyKeyComputeState への埋め込みに適しています。

Driver の直接インスタンス化

StateMachine 実装は通常、コールバックを介して結果を伝えます。次の例に示すように、Driver を直接インスタンス化できます。

Driver は、対応する ResultSink の実装とともに SkyKeyComputeState の実装に埋め込まれます。この実装は後ほど定義します。トップレベルでは、State オブジェクトは Driver よりも長く存続することが保証されているため、計算結果のレシーバとして適しています。

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

以下のコードでは、ResultProducer をスケッチしています。

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

  … // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

この場合、結果を遅延計算するコードは次のようになります。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

Driver の埋め込み

StateMachine が値を生成し、例外を発生させない場合は、次の例に示すように、Driver を埋め込むこともできます。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
}

SkyFunction には、次のようなコードが含まれている場合があります(StateSkyKeyComputeState の関数固有の型です)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

DriverStateMachine 実装に埋め込むと、Skyframe の同期コーディング スタイルに適しています。

例外を生成する可能性がある StateMachine

それ以外の場合は、同期 SkyFunction コードと一致する同期 API を持つ SkyKeyComputeState 埋め込み可能な ValueOrExceptionProducer クラスと ValueOrException2Producer クラスがあります。

ValueOrExceptionProducer 抽象クラスには、次のメソッドが含まれます。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
    …  // Implementation.
  }

  protected final void setValue(V value)  {  … // Implementation. }
  protected final void setException(E exception) {  … // Implementation. }
}

これには埋め込みの Driver インスタンスが含まれており、埋め込みドライバResultProducer クラスによく似ており、同様の方法で SkyFunction とインターフェースがあります。実装では、ResultSink を定義する代わりに、setValue または setException のいずれかが発生したときに呼び出します。両方が発生した場合は、例外が優先されます。tryProduceValue メソッドは、非同期コールバック コードを同期コードにブリッジし、設定されると例外をスローします。

前述のように、エラーバブリング中は、すべての入力が使用可能ではないため、マシンがまだ完了していなくてもエラーが発生する可能性があります。これに対応するため、tryProduceValue は、マシンが完了する前であっても、設定された例外をスローします。

エピローグ: 最終的にコールバックを削除

StateMachine は、非同期計算を実行するための非常に効率的ですが、ボイラープレートを多用する方法です。継続(特に ListenableFuture に渡される Runnable の形式)は、Bazel コードの特定部分では広く使用されていますが、SkyFunctions 分析ではあまり使用されていません。分析のほとんどは CPU の制約を受けており、ディスク I/O 用の効率的な非同期 API はありません。最終的には、コールバックは習得に時間がかかり、読みやすさを損なうため、最適化することをおすすめします。

最も有望な選択肢の一つは、Java 仮想スレッドです。コールバックを書き込む必要はなく、すべて同期のブロッキング呼び出しに置き換えられます。これが可能になるのは、プラットフォーム スレッドとは異なり、仮想スレッド リソースのタイアップは安価であるためです。ただし、仮想スレッドを使用する場合でも、単純な同期オペレーションをスレッド作成と同期のプリミティブに置き換えると、コストがかかりすぎることになります。StateMachine から Java 仮想スレッドへの移行を実行したところ、桁違いに速度が低下し、エンドツーエンドの分析レイテンシが約 3 倍増加しました。仮想スレッドはまだプレビュー機能であるため、パフォーマンスが改善した後、この移行を実施できる可能性があります。

考慮すべきもう 1 つのアプローチは、Loom コルーチンが使用可能になるのを待つことです。この方法の利点は、協調型マルチタスクを使用することで同期のオーバーヘッドを削減できる可能性があることです。

他のどれでもうまくいかない場合は、低レベルのバイトコードの書き換えが有効な代替手段になる可能性があります。十分な最適化を行うことで、手書きのコールバック コードに近いパフォーマンスを実現できる場合があります。

付録

コールバック地獄

コールバックの地獄は、コールバックを使用する非同期コードにおける悪名高い問題です。これは、後続のステップの継続が前のステップ内にネストされていることに起因しています。多数のステップがある場合、このネストは非常に深くなる可能性があります。制御フローと組み合わせると、コードを管理できなくなります。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

ネストされた実装の利点の一つは、外側のステップのスタック フレームを保持できることです。Java では、キャプチャされたラムダ変数は実質的に最終的なものであるため、そのような変数を使用するのが面倒な場合があります。次のように、メソッド参照をラムダではなく継続として返すことで、深いネストを回避できます。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

runAfter インジェクション パターンを密かに使用しすぎた場合にもコールバック ヘルルが発生する可能性がありますが、これは、連続したステップで注入を散在させることで回避できます。

例: 連鎖されている SkyValue ルックアップ

多くの場合、アプリ ロジックで SkyValue ルックアップの依存チェーンが必要となることがあります。たとえば、2 つ目の SkyKey が 1 つ目の SkyValue に依存している場合などです。単純なことを考えてみると、複雑で深くネストされたコールバック構造になる可能性があります。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

ただし、継続はメソッド参照として指定されるため、コードは状態遷移をまたいで手続き型に見えるため、step2step1 に従います。ここでは、ラムダを使用して value2 を割り当てます。これにより、コードの順序が上から下の順に計算の順序と一致します。

その他のヒント

読みやすさ: 実行順序

読みやすくするために、StateMachine.step 実装を実行順序に維持し、コールバック実装をコード内で渡される場所の直後に配置するようにしてください。これは、制御フローが分岐する場所では常に実現できるとは限りません。そのような場合は、追加のコメントがあると役に立つかもしれません。

例: 連鎖の SkyValue ルックアップでは、これを実現するために中間メソッド参照を作成しています。これにより、読みやすさと引き換えに若干のパフォーマンスがもたらされます。

世代別仮説

存続期間が中程度の Java オブジェクトは、Java ガベージ コレクタの世代的な仮説を破ります。これは、非常に短い期間存続するオブジェクトや永久に存続するオブジェクトを処理するように設計されています。定義上、SkyKeyComputeState 内のオブジェクトはこの仮説に違反します。Driver をルートとして、実行中のすべての StateMachine の構成ツリーを含むこのようなオブジェクトは、非同期計算の完了を待機する中間の存続期間を持ちます。

JDK19 ではそれほど問題ないように見えますが、StateMachine を使用すると、実際に生成されるガベージが劇的に減少しても、GC 時間が増加することがあります。StateMachine の存続期間は中程度であるため、古い世代に昇格させるとすぐに満杯になり、より高価なメジャー GC またはフル GC をクリーンアップする必要があります。

最初の予防策は StateMachine 変数の使用を最小限に抑えることですが、複数の状態にわたって値が必要な場合などは、常に実現できるとは限りません。可能な場合、ローカル スタックの step 変数は新しい世代の変数であり、効率的に GC されます。

StateMachine 変数の場合は、物事をサブタスクに分割し、StateMachine 間の値の伝播の推奨パターンに従うことも有効です。このパターンに従うと、子 StateMachine のみが親 StateMachine を参照しており、その逆はないことに注目してください。つまり、子が結果のコールバックを使用して親を完了して更新すると、子は自然にスコープ外になり、GC の対象になります。

最後に、以前の状態では StateMachine 変数が必要で、後の状態では必要ない場合があります。大きなオブジェクトが不要であることが判明したら、そのオブジェクトの参照を null にすると有益な場合があります。

状態に名前を付ける

メソッドに名前を付ける場合、通常は、そのメソッド内で行われる動作に合わせてメソッドに名前を付けることができます。スタックが存在しないため、StateMachine で行う方法はわかりにくい。たとえば、メソッド foo がサブメソッド bar を呼び出すとします。StateMachine では、これを状態シーケンス foo、その後に bar に変換できます。foobar の動作が含まれなくなりました。その結果、状態のメソッド名は範囲が狭くなる傾向があり、ローカルの動作を反映している可能性もあります。

同時実行ツリーの図

以下は、ツリー構造をより良く表している構造化された同時実行の図の別のビューです。ブロックは小さな木を形成します。

構造化同時実行 3D


  1. 値が使用できない場合に最初から再開するという Skyframe の規則とは対照的です。 

  2. stepInterruptedException をスローできますが、この例では省略されています。Bazel コードには、この例外をスローし、StateMachine を実行する Driver(後述)に伝播するローメソッドがいくつかあります。不要なときにスローされるように宣言しなくても問題ありません。

  3. 同時実行サブタスクは、依存関係ごとに独立した処理を実行する ConfiguredTargetFunction によって動作します。すべての依存関係を一度に処理する複雑なデータ構造を操作して非効率性が発生するのではなく、各依存関係に独自の独立した StateMachine があります。

  4. 1 つのステップ内の複数の tasks.lookUp 呼び出しは、まとめてバッチ処理されます。同時実行サブタスク内で行われるルックアップによって、追加のバッチ処理を作成できます。 

  5. これは概念的には、Java の構造化された同時実行 jeps/428 に類似しています。 

  6. これは、スレッドを生成して結合してシーケンシャル コンポジションを実現する方法と似ています。