概要
Skyframe StateMachine
は、ヒープにある分解された関数オブジェクトです。必要な値がすぐに利用できず、非同期で計算される場合、冗長性のない柔軟な評価をサポートします1。StateMachine
は待機中にスレッド リソースを結合できませんが、代わりに一時停止して再開する必要があります。したがって、逆コンストラクションにより明示的な再入ポイントが公開され、以前の計算をスキップできます。
StateMachine
は、シーケンス、分岐、構造化論理同時実行を表現するために使用でき、Skyframe のインタラクションに合わせて調整されています。StateMachine
は、より大きな StateMachine
にコンポーズし、サブ StateMachine
を共有できます。コンカレンスは、常に構造的に階層的であり、純粋に論理的です。すべての同時実行サブタスクは、単一の共有親 SkyFunction スレッドで実行されます。
はじめに
このセクションでは、java.com.google.devtools.build.skyframe.state
パッケージにある StateMachine
について簡単に説明します。
Skyframe の再起動の概要
Skyframe は、依存関係グラフを並列評価するフレームワークです。グラフの各ノードは、パラメータを指定する SkyKey と結果を指定する SkyValue を持つ SkyFunction の評価に対応しています。計算モデルは、SkyFunction が SkyKey で SkyValue を検索し、追加の SkyFunction の再帰的な並列評価をトリガーするように設計されています。計算の一部のサブグラフが不完全であるために要求された SkyValue の準備ができていない場合、ブロックではなく、要求された SkyFunction が null
getValue
レスポンスを監視し、SkyValue の代わりに null
を返し、入力がないために不完全であることを知らせます。以前にリクエストされたすべての SkyValues が利用可能になると、Skyframe は SkyFunctions を再起動します。
SkyKeyComputeState
の導入前は、再起動を処理する従来の方法は、計算を完全に再実行することでした。これは複雑さは二次関数ですが、この方法で記述された関数は、再実行するたびに、null
を返すルックアップが減るため、最終的には完了します。SkyKeyComputeState
を使用すると、手動で指定したチェックポイント データを SkyFunction に関連付けることができるため、再計算を大幅に削減できます。
StateMachine
は SkyKeyComputeState
内に存在するオブジェクトであり、停止と再開の実行フックを公開することで、SkyFunction の再起動時にほぼすべての再計算を排除します(SkyKeyComputeState
がキャッシュから外れないことを前提としています)。
SkyKeyComputeState
内のステートフル計算
オブジェクト指向設計の観点からは、純粋なデータ値ではなく、計算オブジェクトを SkyKeyComputeState
内に保存することを検討するのが理にかなっています。Java では、動作を伝播するオブジェクトの最小限の記述は関数インターフェースであり、これで十分です。StateMachine
には、次の再帰的な定義があります2。
@FunctionalInterface
public interface StateMachine {
StateMachine step(Tasks tasks) throws InterruptedException;
}
Tasks
インターフェースは SkyFunction.Environment
に似ていますが、非同期用に設計されており、論理的に同時実行されるサブタスクのサポートが追加されています3。
step
の戻り値は別の StateMachine
であり、一連のステップを帰納的に指定できます。StateMachine
が完了すると、step
は DONE
を返します。例:
class HelloWorld implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
System.out.println("hello");
return this::step2; // The next step is HelloWorld.step2.
}
private StateMachine step2(Tasks tasks) {
System.out.println("world");
// DONE is special value defined in the `StateMachine` interface signaling
// that the computation is done.
return DONE;
}
}
は、次の出力で StateMachine
を記述します。
hello
world
step2
が StateMachine
の関数インターフェース定義を満たしているため、メソッド参照 this::step2
も StateMachine
です。メソッド参照は、StateMachine
で次の状態を指定する最も一般的な方法です。
直感的に、計算をモノリシックな関数ではなく StateMachine
ステップに分割すると、計算を停止して再開するために必要なフックが提供されます。StateMachine.step
が返されると、明示的な停止ポイントがあります。返された StateMachine
値で指定された連続性は、明示的な再開ポイントです。計算は中断したところから正確に再開できるため、再計算を回避できます。
コールバック、継続、非同期計算
技術的な用語では、StateMachine
は継続として機能し、実行される後続の計算を決定します。StateMachine
は、ブロックする代わりに、step
関数から戻ることで自発的に停止できます。これにより、制御が Driver
インスタンスに戻ります。Driver
は、準備が整った StateMachine
に切り替えるか、Skyframe に制御を返すことができます。
従来、コールバックと継続は 1 つのコンセプトに統合されていました。ただし、StateMachine
では 2 つの区別が維持されます。
- コールバック - 非同期計算の結果を保存する場所を指定します。
- 継続 - 次の実行状態を指定します。
非同期オペレーションを呼び出す場合はコールバックが必要です。つまり、SkyValue ルックアップの場合と同様に、メソッドを呼び出した直後に実際のオペレーションは行われません。コールバックはできるだけシンプルにする必要があります。
継続は StateMachine
の StateMachine
戻り値で、すべての非同期計算が解決された後に続く複雑な実行をカプセル化します。この構造化されたアプローチにより、コールバックの複雑さを管理しやすくなります。
タスク
Tasks
インターフェースは、SkyKey で SkyValues を検索し、同時実行サブタスクをスケジュールするための API を StateMachine
に提供します。
interface Tasks {
void enqueue(StateMachine subtask);
void lookUp(SkyKey key, Consumer<SkyValue> sink);
<E extends Exception>
void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);
// lookUp overloads for 2 and 3 exception types exist, but are elided here.
}
StateMachine
SkyValue ルックアップ
StateMachine
は、Tasks.lookUp
オーバーロードを使用して SkyValues を検索します。これらは SkyFunction.Environment.getValue
や SkyFunction.Environment.getValueOrThrow
に類似しており、例外処理のセマンティクスも類似しています。この実装では、ルックアップはすぐに実行されず、可能な限り多くのルックアップをバッチ処理してから実行されます。値がすぐに利用できない場合があります(Skyframe の再起動が必要な場合など)。そのため、呼び出し元はコールバックを使用して、結果の値をどのように処理するかを指定します。4
StateMachine
プロセッサ(Driver
と SkyFrame へのブリッジング)は、次の状態が開始される前に値が使用可能であることを保証します。以下の例を
class DoesLookup implements StateMachine, Consumer<SkyValue> {
private Value value;
@Override
public StateMachine step(Tasks tasks) {
tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
return this::processValue;
}
// The `lookUp` call in `step` causes this to be called before `processValue`.
@Override // Implementation of Consumer<SkyValue>.
public void accept(SkyValue value) {
this.value = (Value)value;
}
private StateMachine processValue(Tasks tasks) {
System.out.println(value); // Prints the string representation of `value`.
return DONE;
}
}
上記の例では、最初のステップで new Key()
を検索し、コンシューマーとして this
を渡します。これが可能なのは、DoesLookup
が Consumer<SkyValue>
を実装しているためです。
契約により、次の状態 DoesLookup.processValue
が開始される前に、DoesLookup.step
のすべてのルックアップが完了します。したがって、value
は processValue
でアクセスされた場合に使用できます。
サブタスク
Tasks.enqueue
は、論理的に同時実行されるサブタスクの実行をリクエストします。サブタスクも StateMachine
であり、通常の StateMachine
ができることは何でもできます。たとえば、サブタスクを再帰的に作成したり、SkyValue を検索したりできます。lookUp
と同様に、ステートマシン ドライバは、次のステップに進む前にすべてのサブタスクが完了していることを確認します。以下の例を
class Subtasks implements StateMachine {
private int i = 0;
@Override
public StateMachine step(Tasks tasks) {
tasks.enqueue(new Subtask1());
tasks.enqueue(new Subtask2());
// The next step is Subtasks.processResults. It won't be called until both
// Subtask1 and Subtask 2 are complete.
return this::processResults;
}
private StateMachine processResults(Tasks tasks) {
System.out.println(i); // Prints "3".
return DONE; // Subtasks is done.
}
private class Subtask1 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
i += 1;
return DONE; // Subtask1 is done.
}
}
private class Subtask2 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
i += 2;
return DONE; // Subtask2 is done.
}
}
}
Subtask1
と Subtask2
は論理的には同時実行されますが、すべてが単一スレッドで実行されるため、i
の「同時実行」更新に同期は必要ありません。
構造化された同時実行
すべての lookUp
と enqueue
は、次の状態に進む前に解決する必要があります。つまり、同時実行は当然、ツリー構造に制限されます。次の例に示すように、階層型の 5 同時実行を作成できます。
UML からは、同時実行構造がツリーを形成していることを区別するのは困難です。ツリー構造をよりわかりやすく示す代替ビューもあります。
構造化された同時実行は、簡単に推測できます。
コンポジションと制御フロー パターン
このセクションでは、複数の StateMachine
をコンポーズする方法の例と、特定の制御フローの問題の解決策について説明します。
シーケンシャル ステータス
これは、最も一般的な直感的な制御フロー パターンです。例については、SkyKeyComputeState
内のステートフル計算をご覧ください。
ブランチ
StateMachine
の分岐状態は、次の例に示すように、通常の Java 制御フローを使用して異なる値を返すことで実現できます。
class Branch implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
// Returns different state machines, depending on condition.
if (shouldUseA()) {
return this::performA;
}
return this::performB;
}
…
}
特定のブランチが早期に完了するために DONE
を返すことは非常に一般的です。
高度な連続コンポーズ
StateMachine
制御構造はメモリレスであるため、StateMachine
定義をサブタスクとして共有すると不便な場合があります。M1 と M2 を、StateMachine
インスタンスとし、StateMachine
S を共有します。M1 と M2 は、それぞれシーケンス <A, S, B> と <X, S, Y> です。問題は、S が完了後に B に進むか Y に進むかを認識しておらず、StateMachine
が呼び出しスタックを保持していないことです。このセクションでは、これを実現するための手法について説明します。
StateMachine
を終端シーケンス要素として使用
これでは、最初に提示された問題は解決しません。共有 StateMachine
がシーケンスの終端にある場合にのみ、シーケンシャル コンポジションを示します。
// S is the shared state machine.
class S implements StateMachine { … }
class M1 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
performA();
return new S();
}
}
class M2 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
performX();
return new S();
}
}
これは、S 自体が複雑なステートマシンであっても機能します。
連続コンポジションのサブタスク
キューに登録されたサブタスクは、次の状態になる前に必ず完了するため、サブタスク メカニズムを少し悪用する6こともできます。
class M1 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
performA();
// S starts after `step` returns and by contract must complete before `doB`
// begins. It is effectively sequential, inducing the sequence < A, S, B >.
tasks.enqueue(new S());
return this::doB;
}
private StateMachine doB(Tasks tasks) {
performB();
return DONE;
}
}
class M2 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
performX();
// Similarly, this induces the sequence < X, S, Y>.
tasks.enqueue(new S());
return this::doY;
}
private StateMachine doY(Tasks tasks) {
performY();
return DONE;
}
}
runAfter
インジェクション
S の実行前に完了する必要がある他の並列サブタスクや Tasks.lookUp
呼び出しがあるため、Tasks.enqueue
を悪用できない場合があります。この場合、runAfter
パラメータを S に挿入することで、次に行うべきことを S に通知できます。
class S implements StateMachine {
// Specifies what to run after S completes.
private final StateMachine runAfter;
@Override
public StateMachine step(Tasks tasks) {
… // Performs some computations.
return this::processResults;
}
@Nullable
private StateMachine processResults(Tasks tasks) {
… // Does some additional processing.
// Executes the state machine defined by `runAfter` after S completes.
return runAfter;
}
}
class M1 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
performA();
// Passes `this::doB` as the `runAfter` parameter of S, resulting in the
// sequence < A, S, B >.
return new S(/* runAfter= */ this::doB);
}
private StateMachine doB(Tasks tasks) {
performB();
return DONE;
}
}
class M2 implements StateMachine {
@Override
public StateMachine step(Tasks tasks) {
performX();
// Passes `this::doY` as the `runAfter` parameter of S, resulting in the
// sequence < X, S, Y >.
return new S(/* runAfter= */ this::doY);
}
private StateMachine doY(Tasks tasks) {
performY();
return DONE;
}
}
このアプローチは、サブタスクを不正使用するよりもクリーンです。ただし、複数の StateMachine
を runAfter
でネストするなど、十分にこれを適用することは、Callback Hell への道のりになります。代わりに、通常の連続状態を使用して連続した runAfter
を分割することをおすすめします。
return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))
これを次のように置き換えることができます。
private StateMachine step1(Tasks tasks) {
doStep1();
return new S(/* runAfter= */ this::intermediateStep);
}
private StateMachine intermediateStep(Tasks tasks) {
return new T(/* runAfter= */ this::nextStep);
}
禁止の代替: runAfterUnlessError
以前のドラフトでは、エラー発生時に早期に中止する runAfterUnlessError
を検討していました。これは、エラーが 2 回チェックされることがよくあるという事実に基づいています。1 回は runAfter
参照を持つ StateMachine
によって、もう 1 回は runAfter
マシン自体によってチェックされます。
慎重に検討した結果、エラーチェックの重複除去よりもコードの統一性が重要であると判断しました。runAfter
メカニズムが tasks.enqueue
メカニズムと整合して動作しない場合、エラー チェックが常に必要になります。
直接委任
正式な状態遷移が発生するたびに、メイン Driver
ループが進行します。契約に基づき、状態を進めるということは、次の状態が実行される前に、以前にキューに登録されたすべての SkyValue ルックアップとサブタスクが解決されることを意味します。デリゲート StateMachine
のロジックにより、フェーズ進展が不要または逆効果になることがあります。たとえば、委任先の最初の step
が、委任元の状態の検索と並列化できる SkyKey 検索を実行する場合、フェーズ アドバンスにより、検索が順序付けられます。次の例に示すように、直接委任を行う方が適切な場合があります。
class Parent implements StateMachine {
@Override
public StateMachine step(Tasks tasks ) {
tasks.lookUp(new Key1(), this);
// Directly delegates to `Delegate`.
//
// The (valid) alternative:
// return new Delegate(this::afterDelegation);
// would cause `Delegate.step` to execute after `step` completes which would
// cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
return new Delegate(this::afterDelegation).step(tasks);
}
private StateMachine afterDelegation(Tasks tasks) {
…
}
}
class Delegate implements StateMachine {
private final StateMachine runAfter;
Delegate(StateMachine runAfter) {
this.runAfter = runAfter;
}
@Override
public StateMachine step(Tasks tasks) {
tasks.lookUp(new Key2(), this);
return …;
}
// Rest of implementation.
…
private StateMachine complete(Tasks tasks) {
…
return runAfter;
}
}
データフロー
これまでの説明では、制御フローの管理に重点を置いてきました。このセクションでは、データ値の伝播について説明します。
Tasks.lookUp
コールバックの実装
SkyValue ルックアップで Tasks.lookUp
コールバックを実装する例があります。このセクションでは、複数の SkyValue を処理する理由とアプローチについて説明します。
Tasks.lookUp
コールバック
Tasks.lookUp
メソッドは、コールバック sink
をパラメータとして受け取ります。
void lookUp(SkyKey key, Consumer<SkyValue> sink);
慣用的な方法では、Java ラムダを使用してこれを実装します。
tasks.lookUp(key, value -> myValue = (MyValueClass)value);
ここで、myValue
はルックアップを行う StateMachine
インスタンスのメンバー変数です。ただし、ラムダでは、StateMachine
実装で Consumer<SkyValue>
インターフェースを実装する場合と比較して、追加のメモリ割り当てが必要になります。このラムダは、あいまいになるような複数のルックアップがある場合にも有用です。
SkyFunction.Environment.getValueOrThrow
と同様に、Tasks.lookUp
のエラー処理のオーバーロードもあります。
<E extends Exception> void lookUp(
SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);
interface ValueOrExceptionSink<E extends Exception> {
void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
}
実装例を以下に示します。
class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
private MyValue value;
private MyException error;
@Override
public StateMachine step(Tasks tasks) {
tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
return this::processResult;
}
@Override
public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
if (value != null) {
this.value = (MyValue)value;
return;
}
if (exception != null) {
this.error = exception;
return;
}
throw new IllegalArgumentException("Both parameters were unexpectedly null.");
}
private StateMachine processResult(Tasks tasks) {
if (exception != null) {
// Handles the error.
…
return DONE;
}
// Processes `value`, which is non-null.
…
}
}
エラー処理のないルックアップと同様に、StateMachine
クラスがコールバックを直接実装すると、ラムバのメモリ割り当てを節約できます。
エラー処理ではもう少し詳しく説明しますが、基本的には、エラーの伝播と通常の値に大きな違いはありません。
複数の SkyValue を使用する
多くの場合、複数の SkyValue ルックアップが必要になります。ほとんどの場合、SkyValue のタイプをオンにすると問題が解決します。以下は、プロトタイプの本番環境コードから簡略化した例です。
@Nullable
private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
var configurationKey = configuredTarget.getConfigurationKey();
if (configurationKey != null) {
tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
}
var packageId = configuredTarget.getLabel().getPackageIdentifier();
tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);
return this::constructResult;
}
@Override // Implementation of `Consumer<SkyValue>`.
public void accept(SkyValue value) {
if (value instanceof BuildConfigurationValue) {
this.configurationValue = (BuildConfigurationValue) value;
return;
}
if (value instanceof PackageValue) {
this.pkg = ((PackageValue) value).getPackage();
return;
}
throw new IllegalArgumentException("unexpected value: " + value);
}
値の型が異なるため、Consumer<SkyValue>
コールバックの実装は明確に共有できます。そうでない場合は、ラムダベースの実装または適切なコールバックを実装する完全な内部クラス インスタンスにフォールバックできます。
StateMachine
間での値の伝播
このドキュメントでは、サブタスクで作業を整理する方法のみを説明してきましたが、サブタスクは呼び出し元に値を報告する必要もあります。サブタスクは論理的に非同期であるため、その結果はコールバックを使用して呼び出し元に通知されます。これを実現するため、サブタスクでは、コンストラクタを介して挿入されるシンク インターフェースを定義します。
class BarProducer implements StateMachine {
// Callers of BarProducer implement the following interface to accept its
// results. Exactly one of the two methods will be called by the time
// BarProducer completes.
interface ResultSink {
void acceptBarValue(Bar value);
void acceptBarError(BarException exception);
}
private final ResultSink sink;
BarProducer(ResultSink sink) {
this.sink = sink;
}
… // StateMachine steps that end with this::complete.
private StateMachine complete(Tasks tasks) {
if (hasError()) {
sink.acceptBarError(getError());
return DONE;
}
sink.acceptBarValue(getValue());
return DONE;
}
}
呼び出し元 StateMachine
は次のようになります。
class Caller implements StateMachine, BarProducer.ResultSink {
interface ResultSink {
void acceptCallerValue(Bar value);
void acceptCallerError(BarException error);
}
private final ResultSink sink;
private Bar value;
Caller(ResultSink sink) {
this.sink = sink;
}
@Override
@Nullable
public StateMachine step(Tasks tasks) {
tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
return this::processResult;
}
@Override
public void acceptBarValue(Bar value) {
this.value = value;
}
@Override
public void acceptBarError(BarException error) {
sink.acceptCallerError(error);
}
private StateMachine processResult(Tasks tasks) {
// Since all enqueued subtasks resolve before `processResult` starts, one of
// the `BarResultSink` callbacks must have been called by this point.
if (value == null) {
return DONE; // There was a previously reported error.
}
var finalResult = computeResult(value);
sink.acceptCallerValue(finalResult);
return DONE;
}
}
上記の例は、いくつかの点を示しています。Caller
は結果を伝播し、独自の Caller.ResultSink
を定義する必要があります。Caller
は BarProducer.ResultSink
コールバックを実装します。再開時に、processResult
は value
が null かどうかを確認し、エラーが発生したかどうかを判断します。これは、サブタスクまたは SkyValue ルックアップの出力を受け入れた後の一般的な動作パターンです。
acceptBarError
の実装では、エラーの再帰で必要とされるように、結果が Caller.ResultSink
にエアリーフされます。
トップレベルの StateMachine
の代替方法については、Driver
と SkyFunctions へのブリッジをご覧ください。
エラー処理
Tasks.lookUp
コールバックと StateMachines
間での値のプロパゲーションに、エラー処理の例がいくつかあります。InterruptedException
以外の例外はスローされず、代わりにコールバックを介して値として渡されます。多くの場合、このようなコールバックには排他的またはセマンティクスがあり、値またはエラーのいずれか 1 つのみが渡されます。
次のセクションでは、Skyframe エラー処理との微妙な、しかし重要な相互作用について説明します。
エラーの再帰(--nokeep_going)
エラーの再帰処理中、リクエストされたすべての SkyValue が使用可能でなくても、SkyFunction が再起動されることがあります。このような場合、Tasks
API コントラクトにより、後続の状態に到達することはありません。ただし、StateMachine
は引き続き例外を伝播する必要があります。
次の状態に到達したかどうかにかかわらず伝播が行われる必要があるため、エラー処理コールバックがこのタスクを実行する必要があります。内部 StateMachine
の場合、これは親コールバックを呼び出すことで実現できます。
SkyFunction とインターフェースする最上位の StateMachine
では、ValueOrExceptionProducer
の setException
メソッドを呼び出します。その後、SkyValues が欠落している場合でも、ValueOrExceptionProducer.tryProduceValue
は例外をスローします。
Driver
が直接使用されている場合は、マシンが処理を完了していなくても、SkyFunction から伝播されたエラーを確認することが重要です。
イベント処理
イベントを出力する必要がある SkyFunctions の場合、StoredEventHandler
が SkyKeyComputeState に挿入され、さらにそれを必要とする StateMachine
に挿入されます。以前は、SkyFrame で特定のイベントがリプレイされない限りドロップすることが原因で、StoredEventHandler
が必要でしたが、これは後で修正されました。StoredEventHandler
インジェクションが保持されるのは、エラー処理コールバックから出力されるイベントの実装が簡素化されるためです。
Driver
と SkyFunctions へのブリッジ
Driver
は、指定されたルート StateMachine
で始まる StateMachine
の実行を管理します。StateMachine
はサブタスク StateMachine
を再帰的にキューに追加できるため、1 つの Driver
で複数のサブタスクを管理できます。これらのサブタスクは、構造化された同時実行の結果であるツリー構造を作成します。Driver
は、サブタスク間で SkyValue ルックアップを一括処理して効率を高めます。
Driver
を中心に構築されたクラスはいくつかあり、次の API があります。
public final class Driver {
public Driver(StateMachine root);
public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}
Driver
は、単一のルート StateMachine
をパラメータとして受け取ります。Driver.drive
を呼び出すと、Skyframe を再起動せずに StateMachine
が実行されます。StateMachine
が完了した場合は true を返し、完了しない場合は false を返します。これは、すべての値が使用可能ではなかったことを示します。
Driver
は StateMachine
の同時実行状態を維持し、SkyKeyComputeState
への埋め込みに適しています。
Driver
を直接インスタンス化する
StateMachine
の実装では、通常、コールバックを介して結果が通知されます。次の例に示すように、Driver
を直接インスタンス化できます。
Driver
は、対応する ResultSink
の実装とともに SkyKeyComputeState
実装に埋め込まれます。この実装は、後ほど定義します。最上位レベルでは、State
オブジェクトは Driver
よりも長く存続することが保証されているため、計算結果の適切なレシーバーです。
class State implements SkyKeyComputeState, ResultProducer.ResultSink {
// The `Driver` instance, containing the full tree of all `StateMachine`
// states. Responsible for calling `StateMachine.step` implementations when
// asynchronous values are available and performing batched SkyFrame lookups.
//
// Non-null while `result` is being computed.
private Driver resultProducer;
// Variable for storing the result of the `StateMachine`
//
// Will be non-null after the computation completes.
//
private ResultType result;
// Implements `ResultProducer.ResultSink`.
//
// `ResultProducer` propagates its final value through a callback that is
// implemented here.
@Override
public void acceptResult(ResultType result) {
this.result = result;
}
}
以下のコードは、ResultProducer
をスケッチします。
class ResultProducer implements StateMachine {
interface ResultSink {
void acceptResult(ResultType value);
}
private final Parameters parameters;
private final ResultSink sink;
… // Other internal state.
ResultProducer(Parameters parameters, ResultSink sink) {
this.parameters = parameters;
this.sink = sink;
}
@Override
public StateMachine step(Tasks tasks) {
… // Implementation.
return this::complete;
}
private StateMachine complete(Tasks tasks) {
sink.acceptResult(getResult());
return DONE;
}
}
この場合、結果を遅延計算するコードは次のようになります。
@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
throws InterruptedException {
if (state.result != null) {
return state.result;
}
if (state.resultProducer == null) {
state.resultProducer = new Driver(new ResultProducer(
new Parameters(), (ResultProducer.ResultSink)state));
}
if (state.resultProducer.drive(env)) {
// Clears the `Driver` instance as it is no longer needed.
state.resultProducer = null;
}
return state.result;
}
エンベディング Driver
StateMachine
が値を生成し、例外を発生させない場合は、次の例に示すように、Driver
を埋め込むこともできます。
class ResultProducer implements StateMachine {
private final Parameters parameters;
private final Driver driver;
private ResultType result;
ResultProducer(Parameters parameters) {
this.parameters = parameters;
this.driver = new Driver(this);
}
@Nullable // Null when a Skyframe restart is needed.
public ResultType tryProduceValue( SkyFunction.Environment env)
throws InterruptedException {
if (!driver.drive(env)) {
return null;
}
return result;
}
@Override
public StateMachine step(Tasks tasks) {
… // Implementation.
}
SkyFunction には、次のようなコードが含まれている場合があります(ここで、State
は SkyKeyComputeState
の関数固有の型です)。
@Nullable // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
throws InterruptedException {
if (state.result != null) {
return state.result;
}
if (state.resultProducer == null) {
state.resultProducer = new ResultProducer(new Parameters());
}
var result = state.resultProducer.tryProduceValue(env);
if (result == null) {
return null;
}
state.resultProducer = null;
return state.result = result;
}
Driver
を StateMachine
実装に埋め込むことは、Skyframe の同期コーディング スタイルに適しています。
例外を生成する可能性がある StateMachine
それ以外の場合は、SkyKeyComputeState
に埋め込める ValueOrExceptionProducer
クラスと ValueOrException2Producer
クラスがあり、同期 SkyFunction コードに対応する同期 API があります。
ValueOrExceptionProducer
抽象クラスには次のメソッドが含まれています。
public abstract class ValueOrExceptionProducer<V, E extends Exception>
implements StateMachine {
@Nullable
public final V tryProduceValue(Environment env)
throws InterruptedException, E {
… // Implementation.
}
protected final void setValue(V value) { … // Implementation. }
protected final void setException(E exception) { … // Implementation. }
}
これには埋め込みの Driver
インスタンスが含まれており、埋め込みドライバの ResultProducer
クラスによく似ており、同様の方法で SkyFunction とインターフェースがあります。実装では、ResultSink
を定義する代わりに、どちらかが発生したときに setValue
または setException
を呼び出します。両方が発生した場合は、例外が優先されます。tryProduceValue
メソッドは、非同期コールバック コードを同期コードにブリッジし、例外が設定されている場合は例外をスローします。
前述のように、エラーの発生時に、すべての入力が使用できないため、マシンがまだ完了していない場合でもエラーが発生する可能性があります。これに対応するため、tryProduceValue
は、マシンが完了する前でも、設定された例外をスローします。
エピローグ: コールバックの削除
StateMachine
は、非同期計算を実行するための非常に効率的ですが、ボイラープレートを多用する方法です。継続(特に ListenableFuture
に渡される Runnable
の形式)は Bazel コードの特定の部分で広く使用されていますが、分析 SkyFunctions では一般的ではありません。分析はほとんど CPU バウンドであり、ディスク I/O に効率的な非同期 API はありません。最終的には、コールバックは習得に時間がかかり、読みやすさを損なうため、最適化することをおすすめします。
最も有望な選択肢の一つは、Java 仮想スレッドです。コールバックを記述する代わりに、すべてが同期ブロッキング呼び出しに置き換えられます。これが可能になるのは、プラットフォーム スレッドとは異なり、仮想スレッド リソースのタイアップは安価であるためです。ただし、仮想スレッドを使用する場合でも、単純な同期オペレーションをスレッド作成と同期のプリミティブに置き換えると、コストがかかりすぎることになります。StateMachine
から Java 仮想スレッドに移行しましたが、速度が数桁遅くなり、エンドツーエンドの分析レイテンシがほぼ 3 倍になりました。仮想スレッドはまだプレビュー機能であるため、後日、パフォーマンスが改善されたときにこの移行を実施できる可能性があります。
検討すべきもう 1 つの方法は、Loom コルーチンが利用可能になったら待機することです。この方法の利点は、協調的マルチタスクを使用して同期のオーバーヘッドを削減できることです。
それでも問題が解決しない場合は、低レベルのバイトコードの書き換えも有効な方法です。十分に最適化すると、手書きのコールバック コードに近いパフォーマンスを実現できる場合があります。
付録
コールバック地獄
コールバック ヘルは、コールバックを使用する非同期コードでよく見られる問題です。これは、後続のステップの継続が前のステップ内にネストされているという事実に起因しています。ステップが多い場合、このネストはかなり深くなる可能性があります。制御フローと組み合わせると、コードが管理不能になります。
class CallbackHell implements StateMachine {
@Override
public StateMachine step(Tasks task) {
doA();
return (t, l) -> {
doB();
return (t1, l2) -> {
doC();
return DONE;
};
};
}
}
ネストされた実装の利点の 1 つは、外側のステップのスタックフレームを保持できることです。Java では、キャプチャされたラムダ変数は実質的に最終的である必要があるため、このような変数を使用するのは面倒です。次のように、メソッド参照をラムダではなく継続として返すことで、深いネストを回避できます。
class CallbackHellAvoided implements StateMachine {
@Override
public StateMachine step(Tasks task) {
doA();
return this::step2;
}
private StateMachine step2(Tasks tasks) {
doB();
return this::step3;
}
private StateMachine step3(Tasks tasks) {
doC();
return DONE;
}
}
runAfter
挿入パターンが過密に使用されている場合にも、コールバック ヘルが発生することがあります。これは、挿入を連続したステップに散りばめることで回避できます。
例: 連鎖した SkyValue ルックアップ
多くの場合、アプリ ロジックで SkyValue ルックアップの依存チェーンが必要となることがあります。たとえば、2 つ目の SkyKey が 1 つ目の SkyValue に依存している場合などです。単純に考えると、複雑でネストされたコールバック構造が作成されます。
private ValueType1 value1;
private ValueType2 value2;
private StateMachine step1(...) {
tasks.lookUp(key1, (Consumer<SkyValue>) this); // key1 has type KeyType1.
return this::step2;
}
@Override
public void accept(SkyValue value) {
this.value1 = (ValueType1) value;
}
private StateMachine step2(...) {
KeyType2 key2 = computeKey(value1);
tasks.lookup(key2, this::acceptValueType2);
return this::step3;
}
private void acceptValueType2(SkyValue value) {
this.value2 = (ValueType2) value;
}
ただし、継続はメソッド参照として指定されるため、コードは状態遷移全体で手続き型に見えます。step2
は step1
に続くものです。ここでは、ラムダを使用して value2
を割り当てます。これにより、コードの順序が上から下の計算の順序と一致するようになります。
その他のヒント
読みやすさ: 実行順序
読みやすくするために、StateMachine.step
実装を実行順序に維持し、コールバック実装をコード内で渡される場所の直後に配置するようにしてください。制御フローが分岐する場合は、これが常に可能であるとは限りません。そのような場合は、追加のコメントがあると役に立つかもしれません。
例: 連鎖された SkyValue ルックアップでは、この目的で中間メソッド参照が作成されます。これにより、パフォーマンスがわずかに低下しますが、読みやすさが向上します。
世代仮説
中程度の存続期間の Java オブジェクトは、非常に短い存続期間のオブジェクトまたは永続的に存続するオブジェクトを処理するように設計された Java ガベージ コレクタの世代仮説を破ります。定義上、SkyKeyComputeState
内のオブジェクトはこの仮説に違反します。このようなオブジェクトには、Driver
をルートとする、まだ実行中のすべての StateMachine
の作成済みツリーが含まれます。このオブジェクトは、非同期計算が完了するのを待機しながら一時停止するため、中間の存続期間があります。
JDK19 では問題が軽減されているようですが、StateMachine
を使用すると、生成される実際のガベージが大幅に減少しても、GC 時間が長くなることがあります。StateMachine
の存続期間は中程度であるため、古い世代に昇格させるとすぐにいっぱいになります。そのため、より高価なメジャー GC またはフル GC をクリーンアップする必要があります。
最初の注意事項は、StateMachine
変数の使用を最小限に抑えることですが、複数の状態にわたって値が必要な場合など、必ずしも実現可能ではありません。可能であれば、ローカルスタック step
変数は若い世代の変数であり、効率的に GC されます。
StateMachine
変数の場合は、サブタスクに分割し、StateMachine
間で値を伝播する推奨パターンに従うことも役立ちます。このパターンに従うと、子 StateMachine
のみが親 StateMachine
を参照し、その逆は参照しません。つまり、子が完了して結果コールバックを使用して親を更新すると、子は自然にスコープから外れ、GC の対象になります。
最後に、StateMachine
変数は初期状態では必要ですが、後半の状態では必要ない場合もあります。大規模なオブジェクトが不要になったことが判明したら、その参照を null にすると便利です。
状態に名前を付ける
メソッドに名前を付けるときは、通常、そのメソッド内で発生する動作にメソッド名を付けることができます。スタックがないため、StateMachine
でこれを行う方法は明確ではありません。たとえば、メソッド foo
がサブメソッド bar
を呼び出すとします。StateMachine
では、これは状態シーケンス foo
に翻訳され、その後に bar
が続きます。foo
に動作 bar
が含まれなくなりました。その結果、状態のメソッド名は範囲が狭くなる傾向があり、ローカルの動作を反映している可能性もあります。
同時実行ツリーの図
次の図は、構造化同時実行の図の別のビューで、ツリー構造をより適切に示しています。ブロックは小さな木を形成します。
-
これは、値が使用できない場合に最初から再起動する Skyframe の規則とは対照的です。 ↩
-
step
はInterruptedException
をスローできますが、この例では省略しています。Bazel コードには、この例外をスローし、StateMachine
を実行するDriver
(後述)に伝播するローメソッドがいくつかあります。不要なときにスローされるように宣言しなくても問題ありません。↩ -
同時実行サブタスクは、依存関係ごとに独立した処理を実行する
ConfiguredTargetFunction
によって動作します。すべての依存関係を一度に処理する複雑なデータ構造を操作して非効率性を導入するのではなく、各依存関係に独自の独立したStateMachine
があります。 ↩ -
1 つのステップ内の複数の
tasks.lookUp
呼び出しはまとめてバッチ処理されます。追加のバッチ処理は、同時実行サブタスク内で発生するルックアップによって作成できます。 ↩ -
これは、スレッドを生成して結合し、順序付きコンポジションを実現する方法に似ています。 ↩