Skyframe StateMachines のガイド

問題を報告する ソースを表示 夜間 · 7.3 · 7.2 · 7.1 · 7.0 · 6.5

概要

Skyframe StateMachine は、以下に存在する分解された関数オブジェクトです。 表示されます。冗長性のない柔軟な評価に対応1 必要な値はすぐには使用できませんが、非同期で計算されます。「 StateMachine は待機中にスレッド リソースを結合できませんが、解放する必要があります。 一時停止および再開できますしたがって、分解により明示的な再入口が 事前の計算をスキップできるようにする必要があります。

StateMachine は、シーケンス、分岐、構造化論理を表現するために使用できます。 SkyFrame での操作に合わせて調整されています。 StateMachine をより大規模な StateMachine に構成して共有できます。 サブ StateMachine。同時実行は常に構成によって階層化され、 説明しました。すべての同時実行サブタスクは単一の共有親で実行される SkyFunction スレッド。

はじめに

このセクションでは、StateMachine java.com.google.devtools.build.skyframe.state パッケージ化されています。

Skyframe の再起動の概要

Skyframe は、依存関係グラフを並行して評価するフレームワークです。 グラフ内の各ノードは、SkyFunction の評価に対応しています。 パラメータを指定する SkyKey と結果を指定する SkyValue です。「 計算モデルでは、SkyFunction が SkyKey で SkyValues を検索する、 追加の SkyFunction の再帰的な並列評価をトリガーします。以前の ブロッキング。これは、要求された SkyValue がまだリクエストされていない場合に、スレッドを結合します。 まだ完了していません。リクエストしている計算の一部のサブグラフが不完全で、 SkyFunction が null getValue レスポンスを監視し、null を返す必要がある SkyValue ではなく、入力が欠けているために不完全であることを知らせます。 以前にリクエストされたすべての SkyValues がある場合、Skyframe が SkyFunctions を再起動します。 利用できるようになります

SkyKeyComputeState を導入する前、従来の処理方法 計算を完全に再実行することでした。これは二次関数ですが、 この方法で記述された関数は最終的に完成します。これは、 これが少ないと、null が返されます。SkyKeyComputeState を使用すると、以下のことが可能になります。 手動で指定したチェックポイント データを SkyFunction に関連付けて、 行われます。

StateMachineSkyKeyComputeState 内に存在するオブジェクトで、 SkyFunction が再起動すると、ほぼすべての再計算が行われます( SkyKeyComputeState がキャッシュから抜けないようにする) 使用できます。

SkyKeyComputeState 内のステートフル計算

オブジェクト指向の設計の観点からは、ストレージクラスを 計算オブジェクトを SkyKeyComputeState 内部に置く必要がありました。 Java では、オブジェクトを保持する動作の最小限の記述として、 機能インターフェースがあり、これで十分であることがわかりました。StateMachine は 次の不思議な再帰の定義2

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks インターフェースは SkyFunction.Environment に似ていますが、 非同期用に設計されており、論理的に同時実行されるサブタスクのサポートが追加されています3

step の戻り値は別の StateMachine であるため、 一連のステップを帰納的に学習させるものです次の場合、stepDONE を返します。 StateMachine が完了しました。例:

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

次の出力で StateMachine を記述します。

hello
world

なお、次の理由により、メソッド参照 this::step2StateMachine です。 StateMachine の関数インターフェース定義を満たす step2。方法 参照は、特定のコンポーネントで次の状態を指定する最も一般的な StateMachine

一時停止と再開

直感的には、計算を StateMachine ステップに分割し、 モノリシック関数であり、インスタンスの一時停止再開に必要なフックを提供します。 説明します。StateMachine.step が返されると、明示的な一時停止になります。 あります返される StateMachine 値で指定された継続は、 明示的な再開ポイント。したがって、再計算は回避できます。つまり、 中断したところから 再開できます

コールバック、継続、非同期計算

専門用語では、StateMachine は継続として機能し、 後続の計算が実行されます。StateMachine は、ブロックする代わりに次のことができます。 step 関数から戻り、自発的に一時停止し、 Driver インスタンスに戻すことができます。Driver は、 その後、使用可能な StateMachine に切り替えるか、制御を Skyframe に戻します。

従来は「コールバック」と「継続」を 1 つのコンセプトにまとめています。 ただし、StateMachine は両者の区別を維持します。

  • Callback - 非同期処理の結果を格納する場所を記述します。 説明します。
  • 継続 - 次の実行状態を指定します。

非同期オペレーションを呼び出す場合はコールバックが必要です。つまり、 実際のオペレーションは、 SkyValue ルックアップですコールバックはできる限りシンプルにする必要があります。

継続は、StateMachineStateMachine 戻り値で、 すべての非同期処理に続く複雑な実行を 計算が解決します。この構造化されたアプローチにより、インフラストラクチャの複雑さを コールバックを管理しやすくなります。

タスク

Tasks インターフェースは、SkyValues を検索する API を StateMachine に提供します。 同時実行サブタスクのスケジュールを設定できます。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}

SkyValue ルックアップ

StateMachine は、Tasks.lookUp オーバーロードを使用して SkyValues を検索します。内容は次のとおりです。 SkyFunction.Environment.getValue に似ています。また、 SkyFunction.Environment.getValueOrThrow であり、同様の例外処理があります。 学びました。実装はすぐに検索を行うのではなく、 その前に、できるだけ多くの検索をバッチ処理4します。この値 スカイフレームの再起動が必要な場合や、 そのため、呼び出し元は、コールバックを使用して、生成された値の処理方法を指定します。

StateMachine プロセッサ(Driver と SkyFrame など)では、値の使用を 次の状態が開始されます。以下の例を

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

上記の例では、最初のステップで new Key() を検索し、 コンシューマとして thisDoesLookup が実装されているため、これが可能です。 Consumer<SkyValue>

契約により、次のステータス DoesLookup.processValue が開始される前に、すべての DoesLookup.step のルックアップが完了しました。したがって、value は次の場合に利用できます。 processValue でアクセスします。

サブタスク

Tasks.enqueue は、論理的に同時実行されるサブタスクの実行をリクエストします。 サブタスクも StateMachine で、通常の StateMachine であれば何でもできます サブタスクを再帰的に作成したり SkyValues を検索したりすることもできます。 lookUp と同様に、ステートマシン ドライバは、すべてのサブタスクが 完了してから次のステップに進みます。以下の例を

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

Subtask1Subtask2 は論理的には同時ですが、すべてが 「同時」で実行されるため、i のアップデートは何も必要ありません 同期できます。

構造化された同時実行

すべての lookUpenqueue を解決してから次のレベルに進む必要があるためです。 つまり、同時実行は当然ながらツリー構造に限定されます。です。 次に示すように、階層型5 同時実行を作成できます。 例です。

構造化同時実行

UML では、同時実行構造がツリーを形成していることを区別するのは困難です。 より適切に表示される別のビューがある 学びます。

非構造化同時実行

構造化された同時実行は、簡単に推測できます。

構成と制御フローのパターン

このセクションでは、複数の StateMachine の構成例を示します。 特定の制御フローの問題を解決できます

シーケンシャル ステータス

これは最も一般的でわかりやすい制御フロー パターンです。たとえば これについては、Google Cloud 内の SkyKeyComputeState

分岐

StateMachine の分岐状態は、 値を渡すには、通常の Java 制御フローを使用します。次の例をご覧ください。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  …
}

一部のブランチでは、早期完了のために DONE を返すのが一般的です。

高度な順次合成

StateMachine 制御構造はメモリレスであるため、StateMachine を共有する サブタスクとして定義するのは面倒な場合があります。M1M2StateMachineインスタンスのStateMachineSM1 および M2 は、<A, S, B> および <X, S, Y> になります。問題は、S はトレーニング中にデータを 完了後に B または Y に進み、StateMachine で あります。このセクションでは、これを実現するためのいくつかの手法について説明します。

終端シーケンス要素としての StateMachine

これでは最初に提起された問題を解決できません。シーケンシャルなもののみを示しています 共有 StateMachine がシーケンスの終端である場合のコンポジション。

// S is the shared state machine.
class S implements StateMachine { … }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

これは、S 自体が複雑なステートマシンであっても機能します。

シーケンシャル コンポジションのサブタスク

キューに登録されたサブタスクは次の状態の前に完了することが保証されているため、 サブタスク メカニズムをわずかに乱用する可能性もあります6

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

runAfter インジェクション

場合によっては、Tasks.enqueue を不正使用することが不可能なこともあります。これは、 S の前に完了する必要がある並列サブタスクまたは Tasks.lookUp 呼び出し 実行されます。この場合、SrunAfter パラメータを注入することで、 S に次にすべきことを伝えます。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
    … // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
    … // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

このアプローチは、サブタスクを乱用するよりもクリーンです。しかしこれも適用すれば たとえば、複数の StateMachinerunAfter でネストすると、次のようになります。 Callback Hell への道のりです。シーケンシャルな表現は分割し、 代わりに、通常の連続した状態を持つ runAfter を作成します。

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

これを次のように置き換えることができます。

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

禁止の代替: runAfterUnlessError

以前のドラフトでは、中止する runAfterUnlessError を検討していました。 エラーを早期に発見できますこれは、エラーが発生することが多いというのが理由です。 2 回チェックされます。1 回目は runAfter 参照を持つ StateMachine によるチェックと、 runAfter マシン自体によって 1 回だけ実行されます。

検討を重ねた結果、コードの均一性を優先して エラーチェックの重複除去よりも重要な意味を持ちます。もし、 runAfter メカニズムは、 tasks.enqueue メカニズム。常にエラーチェックが必要です。

直接委任

正式な状態遷移が発生するたびに、メイン Driver ループが進行します。 契約に従い、ステータスが進むと、以前にキューに追加されたすべての SkyValue ルックアップとサブタスクは、次の状態が実行される前に解決されます。場合によってはロジックが デリゲート StateMachine を使用すると、フェーズを進める必要がなくなり、 逆効果です。たとえば、委任の最初の step が次の処理を実行すると、 委任状態のルックアップと並列化できる SkyKey ルックアップ フェーズを進めると、順番になります。新しい P-MAX キャンペーンを 直接委任を実行します。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    …
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return …;
  }

  // Rest of implementation.
  …

  private StateMachine complete(Tasks tasks) {
    …
    return runAfter;
  }
}

データフロー

ここまでの説明では、制御フローの管理に焦点を当ててきました。この セクションでは、データ値の伝播について説明します。

Tasks.lookUp コールバックの実装

Tasks.lookUp コールバックを SkyValue に実装する例を次に示します。 ルックアップ。このセクションでは、その理由と推奨事項を示します。 複数の SkyValues を処理できます。

Tasks.lookUp コールバック

Tasks.lookUp メソッドは、コールバック sink をパラメータとして受け取ります。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

慣用的なアプローチは、Java ラムダを使用してこれを実装することです。

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

myValueStateMachine インスタンスのメンバー変数で、 あります。ただし、ラムダを使用するには、 StateMachineConsumer<SkyValue> インターフェースを実装する 説明します。ラムダは、複数のルックアップを実行する場合にも 曖昧になります

次のように、Tasks.lookUp のオーバーロードのエラー処理もあります。 SkyFunction.Environment.getValueOrThrow

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

実装例を以下に示します。

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      …
      return DONE;
    }
    // Processes `value`, which is non-null.
    …
  }
}

エラー処理なしのルックアップと同様に、StateMachine クラスを直接使用する コールバックを実装することで、ラムバのメモリ割り当てを節約できます。

エラー処理ではもう少し詳しく説明しますが エラーの伝播と通常の値に大きな違いはありません。

複数の SkyValue を使用する

多くの場合、複数の SkyValue ルックアップが必要になります。この手法は、インフラストラクチャの SkyValue のタイプをオンにします。次の例は、 プロトタイプの本番環境コードから簡略化されています。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

Consumer<SkyValue> コールバックの実装を明確に共有できる 値の型が異なるためです。そうでない場合は ラムダベースの実装、または 適切にコールバックできるかどうかを判断します。

StateMachine 間の値の伝播

ここまで、このドキュメントではサブタスクでの作業の配置方法について説明しましたが、 サブタスクも呼び出し元に値を報告する必要があります。サブタスクは 論理的に非同期で、結果が呼び出し元に返されるように、 コールバック。これを機能させるために、サブタスクでシンク インターフェースを定義します。 挿入されます。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

  … // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

この場合、呼び出し元の StateMachine は次のようになります。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上記の例では、いくつかの点を示しています。Caller が伝播する必要がある 独自の Caller.ResultSink を定義します。Caller は、 BarProducer.ResultSink コールバック。再開時に、processResult は次のことを確認します。 value は null であり、エラーが発生したかどうかを判断します。これは一般的な動作です。 サブタスクまたは SkyValue ルックアップからの出力を受け入れた後のパターンを返します。

acceptBarError を実装すると、結果が積極的に Caller.ResultSinkエラーバブリングで必須)

トップレベルの StateMachine の代替手段については、Driver と SkyFunctions へのブリッジを行います。

エラー処理

エラー処理の例は、Tasks.lookUp にすでにいくつかあります。 と、イベント間の値の伝播StateMachines。例外 InterruptedException はスローされず、そのまま渡されます。 値として使用されます。多くの場合、このようなコールバックには排他的またはセマンティクスがあり、 値またはエラーのいずれか一方だけを 渡さなければなりません

次のセクションでは、Skyframe との微妙ながらも重要な Skyframe の操作について説明します。 エラー処理を行います

エラーバブリング(--nokeep_going)

エラーバブリング中に、リクエストされていないものであっても SkyFunction が再起動することがある SkyValues が利用可能です。この場合、後続の状態は Tasks API 契約に基づき、この制限数に達しました。ただし、StateMachine は 引き続き例外を伝播します。

伝播は、次の状態に到達したかどうかに関係なく行われる必要があるため、 このタスクを実行する必要があります。内部 StateMachine の場合、 これを行うには親コールバックを呼び出します。

SkyFunction とやり取りする最上位の StateMachine では、次のことができます。 これを行うには、ValueOrExceptionProducersetException メソッドを呼び出します。 その後、ValueOrExceptionProducer.tryProduceValue は例外をスローします。 SkyValues が欠落している場合。

Driver を直接使用している場合は、 マシンが終了していなくても、SkyFunction からエラーを伝播 あります。

イベント処理

イベントを出力する必要がある SkyFunctions では、StoredEventHandler が挿入されます。 SkyKeyComputeState に読み込まれ、さらに必要な StateMachine に できます。これまでは、スカイフレームのドロップにより StoredEventHandler が必要でした。 これは後で修正されましたが、 StoredEventHandler インジェクションは、 エラー処理コールバックから出力されるイベントの実装。

Driver と SkyFunctions へのブリッジ

Driver は、StateMachine の実行を管理します。 指定されたルート StateMachine で始まる。StateMachine さんが可能な範囲 サブタスク StateMachine を再帰的にキューに追加する(単一の Driver で管理可能) 多数のサブタスクを実行できますこれらのサブタスクはツリー構造を作成し、 構造化された同時実行Driver は SkyValue をバッチ処理します。 サブタスク間でルックアップを行い、効率を向上させます。

次の API を使用して、Driver を中心にいくつかのクラスが構築されています。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver は、単一のルート StateMachine をパラメータとして受け取ります。発信中 Driver.drive は、StateMachine を スカイフレームの再起動。StateMachine が完了すると true、false が返されます。 すべての値が利用できないことを示します。

DriverStateMachine の同時状態を維持しており、状態は良好です。 SkyKeyComputeState への埋め込みに適しています。

Driver の直接インスタンス化

StateMachine の実装では、通常、以下を介して結果を伝えます。 使用します。以下に示すように、Driver を直接インスタンス化できます。 見てみましょう。

Driver は、SkyKeyComputeState の実装に埋め込まれます。 対応する ResultSink の実装(もう少し詳しく定義) できます。トップレベルでは、State オブジェクトは Driver よりも長く存続することが保証されているため、この計算の結果よりも優れています。

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

以下のコードでは、ResultProducer をスケッチしています。

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

  … // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

この場合、結果を遅延計算するコードは次のようになります。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

Driver の埋め込み

StateMachine が値を生成し、例外を生成しない場合、埋め込みは 次の例に示すように、Driver も可能な実装です。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
}

SkyFunction には、次のようなコードが含まれている場合があります(State は 関数固有の型の SkyKeyComputeState)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

DriverStateMachine 実装に埋め込むと、より適している SkyFrame の同期コーディング スタイル。

例外を生成する可能性がある StateMachine

それ以外の場合は、SkyKeyComputeState 埋め込み可能な ValueOrExceptionProducer があります。 同期 API を持つ ValueOrException2Producer クラスは、 同期 SkyFunction コード。

ValueOrExceptionProducer 抽象クラスには、次のメソッドが含まれます。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
    …  // Implementation.
  }

  protected final void setValue(V value)  {  … // Implementation. }
  protected final void setException(E exception) {  … // Implementation. }
}

これには埋め込みの Driver インスタンスが含まれており、 エンベディング ドライバとインターフェースの ResultProducer クラス 同様の方法で SkyFunction で作成できます。ResultSink を定義する代わりに、 実装は、setValue または setException のいずれかが発生したときに呼び出します。 両方が発生した場合は、例外が優先されます。tryProduceValue メソッド 非同期コールバック コードを同期コードにブリッジし、 設定時は例外です。

前述のように、エラーバブリング中に、 すべての入力を利用できるわけではないため、機械がまだ完了していなくても宛先 これに対応するため、tryProduceValue は、 完了です。

エピローグ: 最終的にコールバックを削除

StateMachine は、非常に効率的ですが、ボイラープレートを多用する実行方法です。 非同期計算です継続(特に Runnable の形式) ListenableFuture に渡されるもの)が Bazel コードの特定の部分で広く使用されています。 SkyFunctions の分析ではあまり使われていません。分析のほとんどは CPU の制約を受けており、 ディスク I/O 用の効率的な非同期 API はありません。最終的には、 習得に時間がかかり、学習を妨げるため、コールバックの最適化に最適 できます。

最も有望な選択肢の一つは、Java 仮想スレッドです。以前の すべての処理が同期ブロック ブロックに置き換えられます。 できます。これが可能になるのは、 安価に利用できるはずですただし 仮想スレッドを使用する場合でも 単純な同期オペレーションをスレッドの作成と同期に置き換える コストが高すぎますStateMachine から Java 仮想スレッドであり、桁違いに低速であるため、 エンドツーエンドの分析レイテンシが 3 倍近く増加しました仮想スレッドは まだプレビュー機能であるため、この移行は 後でパフォーマンスが改善します。

もう 1 つの検討すべきアプローチは、Loom コルーチンがある場合にそれを待つことです。 利用できるようになりますこの方法の利点は、リソースの マルチタスクによる同期のオーバーヘッドを低減できます。

他のどれでもうまくいかない場合は、低レベルのバイトコードの書き換えも有効です。 あります。十分に最適化すれば、次の目標を達成できる可能性があります。 優れたパフォーマンスを実現できるからです

付録

コールバック地獄

コールバックの地獄は、コールバックを使用する非同期コード内の悪名高い問題です。 これは、後続のステップの継続がネストされていることに起因します。 確認してください。ステップ数が多い場合、このネストは 深く理解できます。制御フローと組み合わせると、コードを管理できなくなります。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

ネストされた実装の利点の一つは、 保持できます。Java では、キャプチャされたラムダ変数は 変数を使用するのは面倒な作業になりかねません。深いネストは、 メソッド参照をラムダではなく継続として返すことで回避できます。 できます。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

runAfter インジェクションの実行時にもコールバック HEll が発生することがある 使用されるのが高密度ですが、これは、注入を散在させることで回避できます。 示しています

例: 連鎖されている SkyValue ルックアップ

多くの場合、アプリケーション ロジックには、アプリケーション リソースの依存チェーン SkyValue ルックアップ。たとえば、2 つ目の SkyKey が 1 つ目の SkyValue に依存している場合など。 単純なことを考えてみると、複雑で深くネストされた 呼び出すことができます。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

ただし、継続はメソッド参照として指定されるため、コードは 状態遷移全体のプロシージャル: step2step1 に従います。ここでは、 ラムダを使用して value2 を割り当てます。これにより、コードの順序が 計算の順序付けを行います。

その他のヒント

読みやすさ: 実行順序

読みやすくするために、StateMachine.step の実装を維持するよう努めます。 コールバック実装がその直後にある 渡されます。制御フローがすべてのレイヤで 作成します。そのような場合は、追加のコメントがあると役に立つかもしれません。

例: 連鎖の SkyValue ルックアップに、 これを実現するために、中間メソッド リファレンスが作成されます。これにより、少額の パフォーマンスも向上します これはおそらく 価値があるでしょう

世代別仮説

存続期間が中程度の Java オブジェクトは Java の世代仮説を破る ガベージ コレクタという 2 つの方法は、 オブジェクト。定義上、プロジェクト内のオブジェクトは SkyKeyComputeState はこの仮説に違反しています。このようなオブジェクトには、 Driver をルートとする、実行中のすべての StateMachine の構築されたツリーが、 一時停止して非同期計算を待機している間の存続期間 完了するまでに時間がかかります

JDK19 ではそれほど問題ないように思えるが、StateMachine を使用すると 大幅な減少があっても、GC 時間が 生成される実際のガベージの割合ですStateMachine の有効期間は中間であるため 旧世代に昇格できるため、すぐに満員になる クリーンアップするにはより高価なメジャー GC またはフル GC が必要です

最初の予防措置は StateMachine 変数の使用を最小限にすることですが、 それが可能なとは限りません。たとえば、1 つの値が複数の 構成されます。可能な場合、ローカル スタックの step 変数は若い世代です。 実行され、効率的に GC されます。

StateMachine 変数の場合は、サブタスクに分割して、 キャンペーン間での値を StateMachine も役立ちます。次のことを確認します。 パターンに従うと、子 StateMachine のみが親への参照を持ちます。 StateMachine です。その逆はできません。つまり 子が学んでいて 結果コールバックを使用して親を更新すると、子は自然に GC の対象になります。

最後に、以前の状態で StateMachine 変数が必要になる場合があります。 後で行うことはできません。大規模なデータセットの参照を null にすると、 そのオブジェクトが不要であることがわかっていると、

状態に名前を付ける

メソッドに名前を付ける場合、通常は、その動作に対応するメソッドに名前を付けることができます。 そのメソッド内で発生しますやり方がわかりにくいので、 StateMachine。スタックが存在しないため。たとえば、メソッド foo があるとします。 サブメソッド bar を呼び出す。StateMachine では、これは次のように変換できます。 状態シーケンス foo、その後に barfoo に次の動作が含まれなくなりました bar。その結果、状態のメソッド名はスコープが狭くなりがちですが、 地域的な行動を反映している可能性があります

同時実行ツリーの図

以下は [Structured 同時実行を使用して、ツリー構造をより的確に表しています。 ブロックは小さな木を形成します。

構造化同時実行 3D


  1. SkyFrame の規則では最初からやり直しますが、 値は使用できません。 

  2. stepInterruptedException をスローできますが、 この例では省略されています。Bazel コードには、スローするメソッドがいくつかあります。 この例外は、後述の Driver まで伝播します。 StateMachine を実行する関数です。次の場合にスローすることを宣言しなくても問題ありません。 不要です

  3. 同時実行サブタスクは、ConfiguredTargetFunction によって 依存関係ごとに独立した作業を実行します。データを操作する代わりに、 すべての依存関係を一度に処理する複雑なデータ構造 非効率をもたらすため、各依存関係には独自の独立した依存関係が StateMachine

  4. 1 つのステップ内の複数の tasks.lookUp 呼び出しは、まとめてバッチ処理されます。 同時実行内で行われるルックアップによって、追加のバッチ処理を作成できます。 サブタスクを管理できます。 

  5. これは概念的には、Java の構造化された同時実行と jeps/428。 

  6. これは、スレッドを生成して参加させることに 連続的な合成です。