Skyframe StateMachines のガイド

問題を報告 ソースを表示 Nightly · 7.4 . 7.3 · 7.2 · 7.1 · 7.0 · 6.5

概要

Skyframe StateMachine は、ヒープ上に存在する分解された関数オブジェクトです。必要な値がすぐに利用できず、非同期で計算される場合、冗長性のない柔軟な評価をサポートします1StateMachine は待機中にスレッド リソースを占有することはできません。代わりに、一時停止して再開する必要があります。したがって、逆コンストラクションにより明示的な再入ポイントが公開され、以前の計算をスキップできます。

StateMachine は、シーケンス、分岐、構造化論理同時実行を表現するために使用でき、Skyframe の操作用に特別に調整されています。StateMachine は、より大きな StateMachine にコンポーズしてサブ StateMachine を共有できます。コンカレンスは、常に構造的に階層的であり、純粋に論理的です。すべての同時実行サブタスクは、単一の共有親 SkyFunction スレッドで実行されます。

はじめに

このセクションでは、java.com.google.devtools.build.skyframe.state パッケージにある StateMachine について簡単に説明します。

Skyframe の再起動の概要

Skyframe は、依存関係グラフの並列評価を実行するフレームワークです。グラフの各ノードは、パラメータを指定する SkyKey と結果を指定する SkyValue を持つ SkyFunction の評価に対応しています。計算モデルは、SkyFunction が SkyKey で SkyValue を検索し、追加の SkyFunction の再帰的な並列評価をトリガーするように設計されています。計算のサブグラフの一部が不完全であるために、リクエストされた SkyValue がまだ準備できていない場合、リクエスト元の SkyFunction は null getValue レスポンスを検出し、SkyValue ではなく null を返して、入力がないため不完全であることを通知します。以前にリクエストされたすべての SkyValues が利用可能になると、Skyframe は SkyFunctions を再起動します。

SkyKeyComputeState が導入される前は、再起動を処理する従来の方法は、計算を完全に再実行することでした。これは複雑さは二次関数ですが、この方法で記述された関数は、再実行するたびに、null を返すルックアップが減るため、最終的には完了します。SkyKeyComputeState を使用すると、手動で指定したチェックポイント データを SkyFunction に関連付けることができるため、再計算を大幅に削減できます。

StateMachineSkyKeyComputeState 内に存在するオブジェクトであり、停止と再開の実行フックを公開することで、SkyFunction の再起動時にほぼすべての再計算を排除します(SkyKeyComputeState がキャッシュから外れないことを前提としています)。

SkyKeyComputeState 内のステートフル計算

オブジェクト指向設計の観点からは、純粋なデータ値ではなく、計算オブジェクトを SkyKeyComputeState 内に保存することを検討するのが理にかなっています。Java では、動作を伝播するオブジェクトの最小限の記述は関数インターフェースであり、これで十分です。StateMachine には、不思議なほど再帰的な定義2 があります。

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks インターフェースは SkyFunction.Environment に似ていますが、非同期用に設計されており、論理的に同時実行されるサブタスクのサポートが追加されています3

step の戻り値は別の StateMachine であり、帰納的に一連のステップを指定できます。StateMachine が完了すると、stepDONE を返します。例:

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

は、次の出力で StateMachine を記述します。

hello
world

step2StateMachine の関数インターフェース定義を満たしているため、メソッド参照 this::step2StateMachine です。メソッド参照は、StateMachine で次の状態を指定する最も一般的な方法です。

一時停止と再開

直感的には、計算をモノリシック関数ではなく StateMachine ステップに分割することで、計算の一時停止と再開に必要なフックが提供されます。StateMachine.step が返されると、明示的な停止ポイントがあります。返された StateMachine 値で指定された連続性は、明示的な再開ポイントです。計算は中断したところから正確に再開できるため、再計算を回避できます。

コールバック、継続、非同期計算

技術的な用語では、StateMachine は継続として機能し、実行される後続の計算を決定します。StateMachine は、ブロックする代わりに、step 関数から戻ることで自発的に停止できます。これにより、制御が Driver インスタンスに戻ります。Driver は、準備が整った StateMachine に切り替えるか、Skyframe に制御を返すことができます。

従来、コールバック継続は 1 つのコンセプトに統合されていました。ただし、StateMachine は両者の区別を維持します。

  • コールバック - 非同期計算の結果を保存する場所を指定します。
  • 継続 - 次の実行状態を指定します。

非同期処理を呼び出す場合はコールバックが必要です。つまり、SkyValue 検索の場合のように、メソッド呼び出し直後に実際の処理が実行されることはありません。コールバックはできるだけシンプルにする必要があります。

継続処理StateMachineStateMachine 戻り値であり、すべての非同期計算が解決された後に続く複雑な実行をカプセル化します。この構造化されたアプローチにより、コールバックの複雑さを管理しやすくなります。

タスク

Tasks インターフェースは、SkyKey で SkyValue を検索し、同時実行サブタスクをスケジュールする API を StateMachine に提供します。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}

SkyValue ルックアップ

StateMachineTasks.lookUp オーバーロードを使用して SkyValue を検索します。SkyFunction.Environment.getValueSkyFunction.Environment.getValueOrThrow に類似しており、例外処理のセマンティクスも同様です。この実装では、ルックアップはすぐに実行されず、可能な限り多くのルックアップをバッチ処理してから実行されます。値がすぐに利用できない場合があります(Skyframe の再起動が必要な場合など)。そのため、呼び出し元はコールバックを使用して、結果の値をどのように処理するかを指定します。4

StateMachine プロセッサ(Driver と SkyFrame へのブリッジ)は、次の状態が開始される前に値が使用可能であることを保証します。以下の例を

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

上記の例では、最初のステップで new Key() を検索し、コンシューマーとして this を渡します。これは、DoesLookupConsumer<SkyValue> を実装しているため可能です。

契約により、次の状態 DoesLookup.processValue が始まる前に DoesLookup.step のすべてのルックアップが完了します。したがって、valueprocessValue でアクセスされた場合に使用できます。

サブタスク

Tasks.enqueue は、論理的に同時実行されるサブタスクの実行をリクエストします。サブタスクも StateMachine であり、通常の StateMachine ができることは何でもできます。たとえば、サブタスクを再帰的に作成したり、SkyValue を検索したりできます。lookUp と同様に、ステートマシン ドライバは、次のステップに進む前にすべてのサブタスクが完了していることを確認します。以下の例を

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

Subtask1Subtask2 は論理的には同時実行されますが、すべてが単一スレッドで実行されるため、i の「同時実行」更新に同期は必要ありません。

構造化された同時実行

すべての lookUpenqueue は、次の状態に進む前に解決する必要があります。つまり、同時実行は当然、ツリー構造に制限されます。次の例に示すように、階層型の 5 同時実行を作成できます。

構造化された同時実行

UML からは、同時実行構造がツリーを形成していることを区別するのは困難です。ツリー構造をより適切に示す代替ビューがあります。

非構造化の同時実行

構造化された同時実行は、推論がはるかに簡単です。

コンポジションと制御フロー パターン

このセクションでは、複数の StateMachine を作成する方法の例と、特定の制御フローの問題に対する解決策について説明します。

シーケンシャル ステータス

これは最も一般的でわかりやすい制御フロー パターンです。例については、SkyKeyComputeState 内のステートフル計算をご覧ください。

ブランチ

StateMachine の分岐状態は、次の例に示すように、通常の Java 制御フローを使用して異なる値を返すことで実現できます。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  
}

一部のブランチでは、早期に完了するために DONE を返すのが一般的です。

高度な連続コンポーズ

StateMachine 制御構造はメモリレスであるため、StateMachine 定義をサブタスクとして共有するのは難しい場合があります。M1M2 を、StateMachine インスタンスとし、StateMachine S を共有します。M1M2 は、それぞれシーケンス <A, S, B><X, S, Y> です。問題は、S が完了後に B に進むか Y に進むかを認識しておらず、StateMachine が呼び出しスタックを保持していないことです。このセクションでは、これを実現するための手法について説明します。

StateMachine を終端シーケンス要素として使用

これでは、最初に提示された問題は解決しません。共有 StateMachine がシーケンスの終端にある場合にのみ、シーケンシャル コンポジションを示します。

// S is the shared state machine.
class S implements StateMachine {  }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

これは、S 自体が複雑な状態マシンであっても機能します。

連続コンポジションのサブタスク

キューに登録されたサブタスクは、次の状態になる前に必ず完了するため、サブタスク メカニズムを少し悪用する6こともできます。

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

runAfter インジェクション

S の実行前に完了する必要がある他の並列サブタスクや Tasks.lookUp 呼び出しがあるため、Tasks.enqueue を悪用できない場合があります。この場合、runAfter パラメータを S に挿入して、次に何を行うかを S に通知できます。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
     // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
     // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

このアプローチは、サブタスクを不正使用するよりもクリーンです。ただし、複数の StateMachinerunAfter でネストするなど、これを過度に適用すると、コールバック ヘルルに陥る可能性があります。代わりに、連続した runAfter を通常の連続状態で分割することをおすすめします。

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

は次のように置き換えることができます。

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

禁止の代替: runAfterUnlessError

以前のドラフトでは、エラーが発生した時点で早期に中止する runAfterUnlessError を検討していました。これは、エラーが runAfter 参照を持つ StateMachine によって 1 回、runAfter マシン自体によって 1 回、合計 2 回チェックされることがあるためです。

慎重に検討した結果、エラーチェックの重複除去よりもコードの統一性が重要であると判断しました。runAfter メカニズムが tasks.enqueue メカニズムと整合して動作しない場合、エラー チェックが常に必要になります。

直接委任

正式な状態遷移が発生するたびに、メインの Driver ループが進みます。契約に基づき、状態を進めるということは、次の状態が実行される前に、以前にキューに登録されたすべての SkyValue ルックアップとサブタスクが解決されることを意味します。デリゲート StateMachine のロジックが原因で、フェーズを進める必要がなくなり、逆効果になることがあります。たとえば、委任先の最初の step が、委任元の状態の検索と並列化できる SkyKey 検索を実行する場合、フェーズ アドバンスにより、検索が順序付けられます。次の例に示すように、直接委任を行う方が適切な場合があります。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return ;
  }

  // Rest of implementation.
  

  private StateMachine complete(Tasks tasks) {
    
    return runAfter;
  }
}

データフロー

これまでの説明では、制御フローの管理に重点を置いてきました。このセクションでは、データ値の伝播について説明します。

Tasks.lookUp コールバックの実装

以下は、SkyValue ルックアップTasks.lookUp コールバックを実装する例です。このセクションでは、複数の SkyValue を処理する理由とアプローチについて説明します。

Tasks.lookUp コールバック

Tasks.lookUp メソッドは、コールバック sink をパラメータとして受け取ります。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

慣用的なアプローチは、Java ラムダを使用してこれを実装することです。

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

ここで、myValue はルックアップを行う StateMachine インスタンスのメンバー変数です。ただし、ラムダでは、StateMachine 実装で Consumer<SkyValue> インターフェースを実装する場合と比較して、追加のメモリ割り当てが必要になります。ラムダは、あいまいな複数のルックアップがある場合に便利です。

SkyFunction.Environment.getValueOrThrow と同様に、Tasks.lookUp のエラー処理のオーバーロードもあります。

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

実装例を以下に示します。

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      
      return DONE;
    }
    // Processes `value`, which is non-null.
    
  }
}

エラー処理なしのルックアップと同様に、StateMachine クラスでコールバックを直接実装することで、ラムバのメモリ割り当てを節約できます。

エラー処理ではもう少し詳しく説明しますが、基本的には、エラーの伝播と通常の値に大きな違いはありません。

複数の SkyValue を使用する

多くの場合、複数の SkyValue ルックアップが必要になります。SkyValue のタイプに切り替えるのが、ほとんどの場合で有効なアプローチです。次の例は、プロトタイプの本番環境コードから簡素化したものです。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

値の型が異なるため、Consumer<SkyValue> コールバックの実装は明確に共有できます。そうでない場合は、ラムダベースの実装または適切なコールバックを実装する完全な内部クラス インスタンスにフォールバックできます。

StateMachine 間での値の伝播

このドキュメントでは、サブタスクで作業を整理する方法のみを説明してきましたが、サブタスクは呼び出し元に値を報告する必要もあります。サブタスクは論理的に非同期であるため、結果はコールバックを使用して呼び出し元に返されます。これを実現するため、サブタスクでは、コンストラクタを介して挿入されるシンク インターフェースを定義します。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

   // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

この場合、呼び出し元の StateMachine は次のようになります。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上記の例では、いくつかの点を示しています。Caller は結果を伝播し、独自の Caller.ResultSink を定義する必要があります。CallerBarProducer.ResultSink コールバックを実装します。再開時に、processResultvalue が null かどうかを確認し、エラーが発生したかどうかを判断します。これは、サブタスクまたは SkyValue ルックアップの出力を受け入れた後の一般的な動作パターンです。

acceptBarError の実装では、エラーの再帰で必要とされるように、結果が Caller.ResultSink にエアリーフされます。

トップレベルの StateMachine の代替方法については、Driver と SkyFunctions へのブリッジをご覧ください。

エラー処理

Tasks.lookUp コールバックStateMachines 間での値のプロパゲーションに、エラー処理の例がいくつかあります。InterruptedException 以外の例外はスローされず、代わりに値としてコールバックを介して渡されます。このようなコールバックには、値またはエラーのいずれか 1 つが渡される排他的論理和のセマンティクスがよくあります。

次のセクションでは、Skyframe のエラー処理との微妙ながらも重要な操作について説明します。

エラーの再帰(--nokeep_going)

エラーの再帰処理中、リクエストされたすべての SkyValue が使用可能でなくても、SkyFunction が再起動されることがあります。このような場合、Tasks API コントラクトにより、後続の状態に到達することはありません。ただし、StateMachine は引き続き例外を伝播する必要があります。

伝播は次の状態に到達したかどうかに関係なく行われる必要があるため、エラー処理コールバックはこのタスクを実行する必要があります。内部 StateMachine の場合、これは親コールバックを呼び出すことで実現されます。

SkyFunction とインターフェースするトップレベルの StateMachine では、ValueOrExceptionProducersetException メソッドを呼び出すことでこれを行えます。その後、SkyValues が欠落している場合でも、ValueOrExceptionProducer.tryProduceValue は例外をスローします。

Driver が直接使用されている場合は、マシンが処理を完了していなくても、SkyFunction から伝播されたエラーを確認することが重要です。

イベント処理

イベントを出力する必要がある SkyFunctions の場合、StoredEventHandler が SkyKeyComputeState に挿入され、さらにそれを必要とする StateMachine に挿入されます。以前は、Skyframe が特定のイベントを再生しない限りドロップするため、StoredEventHandler が必要でしたが、これはその後修正されました。StoredEventHandler 注入は保持されます。これは、エラー処理コールバックから出力されるイベントの実装を簡素化するためです。

Driver と SkyFunctions へのブリッジ

Driver は、指定されたルート StateMachine から始まる StateMachine の実行を管理します。StateMachine はサブタスク StateMachine を再帰的にキューに追加できるため、1 つの Driver で複数のサブタスクを管理できます。これらのサブタスクは、構造化された同時実行の結果としてツリー構造を作成します。Driver は、サブタスク間で SkyValue ルックアップを一括処理して効率を高めます。

次の API を使用して、Driver を中心にいくつかのクラスが構築されています。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver は、単一のルート StateMachine をパラメータとして受け取ります。Driver.drive を呼び出すと、Skyframe を再起動せずに StateMachine が実行されます。StateMachine が完了すると true を返し、それ以外の場合は false を返します。これは、すべての値が使用可能ではなかったことを示します。

DriverStateMachine の同時実行状態を維持し、SkyKeyComputeState への埋め込みに適しています。

Driver を直接インスタンス化する

StateMachine の実装では、通常、コールバックを介して結果が通知されます。次の例に示すように、Driver を直接インスタンス化できます。

Driver は、対応する ResultSink の実装とともに SkyKeyComputeState 実装に埋め込まれます。この実装は、後ほど定義します。トップレベルでは、State オブジェクトは Driver よりも長く存続することが保証されているため、計算結果のレシーバとして適しています。

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

以下のコードでは、ResultProducer をスケッチしています。

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

   // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
      // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

結果を遅延計算するコードは次のようになります。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

Driver の埋め込み

StateMachine が値を生成し、例外を発生させない場合は、次の例に示すように、Driver を埋め込むこともできます。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
      // Implementation.
}

SkyFunction には、次のようなコードが含まれている場合があります(ここで、StateSkyKeyComputeState の関数固有の型です)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

DriverStateMachine 実装に埋め込むと、Skyframe の同期コーディング スタイルに適しています。

例外を生成する可能性がある StateMachine

それ以外の場合は、SkyKeyComputeState に埋め込める ValueOrExceptionProducer クラスと ValueOrException2Producer クラスがあり、同期 SkyFunction コードに対応する同期 API があります。

ValueOrExceptionProducer 抽象クラスには次のメソッドが含まれています。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
      // Implementation.
  }

  protected final void setValue(V value)  {   // Implementation. }
  protected final void setException(E exception) {   // Implementation. }
}

埋め込まれた Driver インスタンスが含まれており、埋め込みドライバResultProducer クラスによく似ており、SkyFunction と同様の方法でインターフェースします。実装では、ResultSink を定義する代わりに、どちらかが発生したときに setValue または setException を呼び出します。両方が発生した場合は、例外が優先されます。tryProduceValue メソッドは、非同期コールバック コードを同期コードにブリッジし、例外が設定されている場合は例外をスローします。

前述のように、エラーの発生時に、すべての入力が使用できないため、マシンがまだ完了していない場合でもエラーが発生する可能性があります。これに対応するため、tryProduceValue は、マシンが完了する前でも、設定された例外をスローします。

エピローグ: コールバックの削除

StateMachine は、非同期計算を実行するための非常に効率的な方法ですが、ボイラープレートが大量に必要になります。継続(特に ListenableFuture に渡される Runnable の形式)は Bazel コードの特定の部分で広く使用されていますが、分析 SkyFunctions では一般的ではありません。分析はほとんど CPU バウンドであり、ディスク I/O に効率的な非同期 API はありません。コールバックは習得に時間がかかり、可読性を損なうため、最終的には最適化して削除することをおすすめします。

最も有望な代替手段の 1 つは、Java 仮想スレッドです。コールバックを記述する代わりに、すべてが同期ブロッキング呼び出しに置き換えられます。これは、仮想スレッド リソースはプラットフォーム スレッドとは異なり、コストが低いと想定されているためです。ただし、仮想スレッドを使用しても、単純な同期オペレーションをスレッド作成と同期プリミティブに置き換えると、コストが高すぎます。StateMachine から Java 仮想スレッドに移行しましたが、速度が数桁遅くなり、エンドツーエンドの分析レイテンシがほぼ 3 倍になりました。仮想スレッドはまだプレビュー版の機能であるため、パフォーマンスが向上した後でこの移行を実行することもできます。

検討すべきもう 1 つの方法は、Loom コルーチンが利用可能になったら待機することです。この方法の利点は、協調的マルチタスクを使用して同期のオーバーヘッドを削減できることです。

それでも問題が解決しない場合は、低レベルのバイトコードの書き換えも有効な方法です。十分に最適化すると、手書きのコールバック コードに近いパフォーマンスを実現できる場合があります。

付録

コールバック地獄

コールバック ヘルは、コールバックを使用する非同期コードでよく見られる問題です。これは、後続のステップの継続が前のステップ内にネストされているという事実に起因しています。ステップが多い場合、このネストはかなり深くなる可能性があります。制御フローと組み合わせると、コードが管理不能になります。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

ネストされた実装の利点の一つは、外側のステップのスタック フレームを保持できることです。Java では、キャプチャされたラムダ変数は実質的に最終的なものであるため、そのような変数を使用するのが面倒になる場合があります。次のように、ラムダではなく継続としてメソッド参照を返すことで、深いネストを回避できます。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

runAfter インジェクション パターンが高密度に使用された場合にもコールバック HEll が発生することがありますが、これは、連続するステップを注入することで回避できます。

例: 連鎖した SkyValue ルックアップ

アプリケーション ロジックで、2 番目の SkyKey が最初の SkyValue に依存している場合など、SkyValue ルックアップの依存チェーンが必要なことがよくあります。単純に考えると、複雑でネストされたコールバック構造が作成されます。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

ただし、継続はメソッド参照として指定されるため、コードは状態遷移全体で手続き型に見えます。step2step1 に続くものです。ここでは、ラムダを使用して value2 を割り当てます。これにより、コードの順序が上から下の計算の順序と一致するようになります。

その他のヒント

読みやすさ: 実行順序

可読性を高めるには、StateMachine.step の実装を実行順に保ち、コールバックの実装はコード内で渡される場所の直後に配置します。制御フローが分岐する場合は、これが常に可能であるとは限りません。このような場合は、追加のコメントが役立つ場合があります。

例: 連鎖の SkyValue ルックアップでは、これを実現するために中間メソッド参照を作成しています。これにより、パフォーマンスがわずかに低下しますが、読みやすさが向上します。

世代仮説

中程度の存続期間の Java オブジェクトは、非常に短い存続期間のオブジェクトまたは永続的に存続するオブジェクトを処理するように設計された Java ガベージ コレクタの世代仮説を破ります。定義上、SkyKeyComputeState 内のオブジェクトはこの仮説に違反します。このようなオブジェクトには、Driver をルートとする、まだ実行中のすべての StateMachine の作成済みツリーが含まれます。このオブジェクトは、非同期計算が完了するのを待機しながら一時停止するため、中間の存続期間があります。

JDK19 では問題が軽減されているようですが、StateMachine を使用すると、生成される実際のガベージが大幅に減少しても、GC 時間が長くなることがあります。StateMachine は中程度の存続期間があるため、オールド ジェネレーションに昇格し、オールド ジェネレーションがより速くいっぱいになる可能性があります。そのため、クリーンアップにメジャー GC またはフル GC が必要になります。

最初の注意事項は、StateMachine 変数の使用を最小限に抑えることですが、複数の状態にわたって値が必要な場合など、必ずしも実現可能ではありません。可能であれば、ローカルスタック step 変数は若い世代の変数であり、効率的に GC されます。

StateMachine 変数の場合は、サブタスクに分割し、StateMachine 間で値を伝播する推奨パターンに従うことも役立ちます。このパターンに従うと、子 StateMachine のみが親 StateMachine を参照し、その逆は参照しません。つまり、子が完了して結果コールバックを使用して親を更新すると、子は自然にスコープから外れ、GC の対象になります。

最後に、StateMachine 変数は初期状態では必要ですが、後半の状態では必要ない場合もあります。大規模なオブジェクトが不要になったことが判明したら、その参照を null にすると便利です。

状態の命名

メソッドに名前を付ける場合、通常は、そのメソッド内で行われる動作に合わせてメソッドに名前を付けることができます。スタックが存在しないため、StateMachine で行う方法はわかりにくい。たとえば、メソッド foo がサブメソッド bar を呼び出すとします。StateMachine では、これは状態シーケンス foo に翻訳され、その後に bar が続きます。foo に動作 bar が含まれなくなりました。その結果、状態のメソッド名は範囲が狭くなる傾向があり、ローカルの動作を反映している可能性もあります。

同時実行のツリー図

以下は、ツリー構造をより良く表している構造化された同時実行の図の別のビューです。ブロックが小さな木を形成しています。

構造化された同時実行 3D


  1. 値が使用できない場合に最初から再開するという Skyframe の規則とは対照的です。 

  2. stepInterruptedException をスローできますが、この例では省略されています。Bazel コードには、この例外をスローし、StateMachine を実行する Driver(後述)に伝播するローメソッドがいくつかあります。不要なときにスローされるように宣言しなくても問題ありません。

  3. 同時実行サブタスクは、依存関係ごとに独立した処理を行う ConfiguredTargetFunction を基に作成されました。すべての依存関係を一度に処理する複雑なデータ構造を操作して非効率性が発生するのではなく、各依存関係に独自の独立した StateMachine があります。 

  4. 1 つのステップ内の複数の tasks.lookUp 呼び出しは、まとめてバッチ処理されます。追加のバッチ処理は、同時実行サブタスク内で発生するルックアップによって作成できます。 

  5. これは、Java の構造化コンカレンシーの jeps/428 に概念的に似ています。 

  6. これは、スレッドを生成して結合し、順序付きコンポジションを実現する方法に似ています。