SkyFrame StateMachines 指南

回報問題 查看來源

總覽

SkyFrame StateMachine 是位於堆積上的「已拆解」函式物件。當無法立即取得必要值,並以非同步方式計算時,可支援彈性和評估,而且無須備援1StateMachine 在等待期間無法連結執行緒資源,但必須暫停並重新啟用。解構作業因此會公開明確的重新進入點,以便略過先前的運算作業。

StateMachine 可用來表示序列、分支、結構化邏輯並行,專為 SkyFrame 互動而設計。StateMachine 可組成更大的 StateMachine 並共用子 StateMachine。並行是一種結構性且純粹的邏輯。每個並行子工作都會在單一共用父項 SkyFunction 執行緒中執行。

簡介

本節會簡要說明並介紹 java.com.google.devtools.build.skyframe.state 套件中的 StateMachine

SkyFrame 重新啟動的簡短說明

SkyFrame 是對依附元件圖表執行平行評估的架構。圖表中的每個節點都會對應至一個 SkyFunction 的評估結果,該函式會使用 SkyKey 指定參數,以及指定其結果的 SkyValue。運算模型會讓 SkyFunction 查詢 SkyValue 來查詢 SkyValues,以觸發其他 SkyFunction 的遞迴且平行評估。當要求的 SkyValue 尚未完成,且要求的 SkyValue 尚未準備就緒時,不會封鎖,也不會封鎖該執行緒,而是會觀察 null getValue 回應,並應傳回 null,而非 SkyValue,表示因為缺少輸入內容,而導致該回應不完整。先前所有要求的 SkyValues 可供使用時,SkyFrame 會重新啟動 SkyFunction。

SkyKeyComputeState 導入之前,傳統的重新啟動處理方式是重新執行運算。雖然這具有二次複雜性,但由於每次重新執行時,以這種方式寫入的函式最終會完成,但傳回的查詢較少 null。使用 SkyKeyComputeState 可將手動指定的查核點資料與 SkyFunction 建立關聯,以節省大量重新計算費用。

StateMachine 是位於 SkyKeyComputeState 中的物件,當 SkyFunction 重新啟動時 (假設 SkyKeyComputeState 不會從快取中消失),會公開暫停和繼續執行掛鉤,這類物件幾乎所有重新運算作業。

SkyKeyComputeState 內的有狀態運算

從物件導向的設計的角度來看,考慮將運算物件儲存在 SkyKeyComputeState 中,而不是只儲存純資料值,在 Java 中,攜帶物件的行為最低描述是「功能介面」,而這已足夠。StateMachine 具有以下又好奇又遞迴的定義2

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks 介面與 SkyFunction.Environment 類似,但其設計是針對非同步作業,並新增支援邏輯並行子工作3

step 的傳回值是另一個 StateMachine,允許指定一系列步驟。StateMachine 完成時,step 會傳回 DONE。例如:

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

說明具有下列輸出內容的 StateMachine

hello
world

請注意,由於 step2 符合 StateMachine 的功能介面定義,因此方法參照 this::step2 也是 StateMachine。如要在 StateMachine 中指定下一個狀態,最常見的方法參照方法參照。

暫停與繼續

以直覺來說,將運算作業細分為 StateMachine 步驟,而非單體式函式,會提供「暫停」和「恢復」suspend運算所需的掛鉤。suspendStateMachine.step 傳回時,會有明確的暫停點。回傳的 StateMachine 值所指定的接續點是明確的「恢復」點。因此可以避免重新計算,因為系統可以直接從上次中斷的地方開始計算。

回呼、接續與非同步運算

就技術層面而言,StateMachine 可做為「接續」continuation,用於決定後續要執行的運算作業。StateMachine 可以透過從 step 函式傳回,這會將控制權移回 Driver 例項,而不是封鎖,而自行「暫停」suspend。接著,Driver 可以切換至準備就緒的 StateMachine,或放棄控制項返回 SkyFrame。

一般來說,「回呼」callbacks和「連續」callbacks會聯想到一個概念。不過,StateMachine 會保留兩者。

  • 回呼 - 說明儲存非同步運算結果的儲存位置。
  • 接續 - 指定下一個執行狀態。

叫用非同步作業時需要回呼,這表示在呼叫方法時,實際作業不會立即發生,就像使用 SkyValue 查詢時一樣。回呼應盡可能保持簡單。

接續StateMachine 回傳的 StateMachine 值,並封裝所有非同步運算解決後的複雜執行作業。這種結構化方法有助於維持回呼的複雜度。

工作

Tasks 介面提供 StateMachine,其中的 API 可依 SkyKey 查詢 SkyValues,並安排並行子工作。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}

SkyValue 查詢

StateMachine 會使用 Tasks.lookUp 超載查詢 SkyValues。這類似於 SkyFunction.Environment.getValueSkyFunction.Environment.getValueOrThrow,並且具有類似的例外狀況處理語意。實作不會立即執行查詢,而是在執行前盡可能批次執行4查詢作業。此值可能無法立即取得 (例如需要重新啟動 SkyFrame),因此呼叫端可使用回呼指定要如何處理產生的值。

StateMachine 處理器 (Driver 並橋接至 SkyFrame) 會保證該值會在下一個狀態開始前可用。範例如下。

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

在上述範例中,第一個步驟會查詢 new Key(),並將 this 做為取用端傳遞。可以這麼做的原因是 DoesLookup 實作了 Consumer<SkyValue>

根據合約,在下一個狀態 DoesLookup.processValue 開始之前,DoesLookup.step 的所有查詢均已完成。因此,可從 processValue 存取 value

子工作

Tasks.enqueue 會要求在邏輯上並行執行子工作。子工作也是 StateMachine,可以執行一般 StateMachine 可執行的任何操作,包括以遞迴方式建立更多子工作或查詢 SkyValues。與 lookUp 類似,狀態機器驅動程式可確保所有子工作都已完成,再繼續進行下一步。範例如下。

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

雖然 Subtask1Subtask2 在邏輯上是並行的,但所有項目皆在單一執行緒中執行,因此 i 的「並行」更新不需要任何同步處理。

結構化並行

由於每個 lookUpenqueue 都必須解析到下一個狀態,因此並行作業自然受限於樹狀結構。如以下範例所示,您可以建立階層5並行。

結構化並行

UML 可得知並行結構形成了樹狀結構。利用替代檢視畫面,可以更妥善地顯示樹狀結構結構。

非結構化並行

結構化並行就是容易理解的。

組合和控制流程模式

本節舉例說明如何撰寫多個 StateMachine,並針對特定控制流程問題提供解決方案。

依序狀態

這是最常見也最直接的控制流程模式。如需相關範例,請參閱 SkyKeyComputeState 內的有狀態運算

分支版本

使用一般的 Java 控制流程傳回不同值,即可達到 StateMachine 中的分支狀態,如以下範例所示。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  …
}

某些分支版本在提早完成時傳回 DONE 是很常見的情況。

進階依序組合

由於 StateMachine 控制結構屬於無記憶體的結構,因此將 StateMachine 定義做為子工作有時可能會不太容易。讓 M1M2 都是共用 StateMachineSStateMachine 執行個體,M1M2 分別是序列 <A、S、B><X, S, Y>。問題在於 S 無法知道完成後要繼續執行 B 還是 Y,且 StateMachine 不會完全保留呼叫堆疊。本節會說明達成這個目標的幾種技巧。

使用 StateMachine 做為終端機序列元素

無法解決最初提出的問題。只有在共用的 StateMachine 為序列中的終端機時,才會示範依序組合。

// S is the shared state machine.
class S implements StateMachine { … }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

即使 S 本身是複雜的狀態機器,還是可以運作。

依序組合的子工作

由於新增至佇列的子工作保證會在下一個狀態前完成,因此有時可能會稍微濫用6子工作機制。

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

runAfter注射

有時候,您無法濫用 Tasks.enqueue,因為還有其他平行子工作或 Tasks.lookUp 呼叫必須在 S 執行前完成。在這種情況下,在 S 中插入 runAfter 參數可用來通知 S 後續步驟。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
    … // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
    … // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

此方法比濫用子工作更簡潔。不過,過於謹慎地套用這種做法 (例如使用 runAfter 建立多個 StateMachine 的巢狀結構),就是通往 Callback Hell 的道路。最好改用一般序列狀態拆分序列的 runAfter

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

可以替換成下列內容

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

已禁止的替代方案:runAfterUnlessError

在先前的草稿中,我們考慮了會在錯誤時迅速取消的 runAfterUnlessError。這項範例的動機在於錯誤最終通常會檢查兩次,一次是含有 runAfter 參照的 StateMachine,一次是由 runAfter 機器本身執行一次。

經過審慎評估後,我們認為程式碼的統一性比複製錯誤檢查更重要。如果 runAfter 機制未能與 tasks.enqueue 機制一致運作,而這類機制一律需要進行錯誤檢查,就會令人感到困惑。

直接委派

每次有正式狀態轉換時,主要 Driver 迴圈都會推進。根據合約,進階狀態表示所有先前加入佇列的 SkyValue 查詢和子工作都會在執行下一個狀態前解析。有時,委派 StateMachine 的邏輯會導致階段提前為不必要或反效果。舉例來說,如果委派代表的第一個 step 執行能與委派狀態查詢平行處理的 SkyKey 查詢,則推進階段會讓這些查詢依序執行。而系統更應該執行直接委派,如以下範例所示。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    …
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return …;
  }

  // Rest of implementation.
  …

  private StateMachine complete(Tasks tasks) {
    …
    return runAfter;
  }
}

資料流程

前文討論的重點在於管理控制流程。本節說明資料值的傳播。

實作 Tasks.lookUp 回呼

範例說明如何在 SkyValue 查詢中實作 Tasks.lookUp 回呼。本節將說明原因,並建議處理多個 SkyValue 的方法。

Tasks.lookUp 回呼

Tasks.lookUp 方法會將回呼 sink 做為參數。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

慣用的方法是使用 Java lambda 來實作:

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

其中 myValue 是執行查詢的 StateMachine 執行個體成員變數。不過,相較於在 StateMachine 實作中實作 Consumer<SkyValue> 介面, lambda 需要額外的記憶體分配。當有多筆查詢可能模稜兩可時,lambda 仍然非常實用。

也會錯誤處理 Tasks.lookUp 的超載,類似於 SkyFunction.Environment.getValueOrThrow

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

以下是實作範例:

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      …
      return DONE;
    }
    // Processes `value`, which is non-null.
    …
  }
}

與不含錯誤處理的查詢一樣,讓 StateMachine 類別直接實作回呼,即可為 lambda 省下記憶體配置。

「錯誤處理」可提供更多細節,但基本上,錯誤與一般值之間沒有太多差異。

使用多個 SkyValue

通常需要多次查詢 SkyValue。很多時間都可行的方法,就是切換 SkyValue 的類型。以下範例已從原型製作程式碼簡化。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

Consumer<SkyValue> 回呼實作可以明確共用,因為值類型不同。如果情況不同,您可以改回使用以 lambda 為基礎的實作,或實作適當回呼的完整內部類別例項。

StateMachine 秒之間傳播值

到目前為止,本文件只說明如何安排子工作中的工作,但子工作也需要將值回報給呼叫端。由於子工作在邏輯上屬於非同步性質,因此子工作結果會透過回呼將結果傳遞給呼叫端。為執行這項作業,子工作會定義透過其建構函式插入的接收器介面。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

  … // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

呼叫端 StateMachine 看起來會如下所示。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上述範例示範了幾個要點。Caller 必須傳回其結果,並定義自己的 Caller.ResultSinkCaller 會實作 BarProducer.ResultSink 回呼。在恢復時,processResult 會檢查 value 是否為空值,判斷是否發生錯誤。在接受子工作或 SkyValue 查詢的輸出之後,這是常見的行為模式。

請注意,實作 acceptBarError 會主動將結果轉送至 Caller.ResultSink,如同錯誤泡泡的必要條件。

如要瞭解頂層 StateMachine 的替代做法,請參閱 Driver 並橋接至 SkyFunctions

處理錯誤

Tasks.lookUp 回呼StateMachines 之間的值之前有幾個錯誤處理範例。系統不會擲回 InterruptedException 以外的例外狀況,而會改為透過回呼作為值傳遞。這類回呼通常具有專屬或語意,而且只會傳遞一個值或錯誤。

下一節將說明不巧但重要的互動方式與 SkyFrame 錯誤處理。

錯誤啟動 (--nokeep_going)

在錯誤啟動期間,即使並非所有要求的 SkyValue 可用,SkyFunction 仍可能重新啟動。在這種情況下,基於 Tasks API 合約,一律不會達到後續狀態。但 StateMachine 仍應傳播例外狀況。

因為無論達到下一個狀態,作業都必須發生,因此錯誤處理回呼必須執行這項工作。對內部 StateMachine 來說,是由叫用父項回呼來達成。

在頂層 StateMachine (包含與 SkyFunction 的介面) 中,呼叫 ValueOrExceptionProducersetException 方法即可達到這個效果。即使缺少 SkyValues,ValueOrExceptionProducer.tryProduceValue 也會擲回例外狀況。

如果直接使用 Driver,請務必檢查 SkyFunction 是否傳播錯誤,即便機器尚未完成處理作業。

事件處理

針對需要發出事件的 SkyFunctions 時,StoredEventHandler 會插入 SkyKeyComputeState,並進一步插入需要這些函式的 StateMachine。以往,由於 SkyFrame 會捨棄某些事件 (除非重新播放),但之後已修正,因此需要 StoredEventHandler。 系統會保留 StoredEventHandler 插入功能,因為這樣可以簡化從處理錯誤處理回呼時發出的事件實作方式。

Driver 並橋接至 SkyFunctions

Driver 負責管理 StateMachine 的執行作業,從指定的根層級 StateMachine 開始。StateMachine 能以遞迴方式將子工作 StateMachine 排入佇列,因此一個 Driver 可以管理大量子工作。這些子工作會建立樹狀結構,而結果為結構化並行Driver 會批次處理跨子工作 SkyValue 的查詢,提高效率。

許多使用 Driver 建構的類別,且使用下列 API。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver 會將單一根層級 StateMachine 做為參數。呼叫 Driver.drive 會在不重新啟動 SkyFrame 的情況下盡可能執行 StateMachine。當 StateMachine 完成時會傳回 true,否則會傳回 false,表示沒有可用的值。

Driver 會維護 StateMachine 的並行狀態,非常適合在 SkyKeyComputeState 中嵌入。

直接為 Driver 執行個體化

StateMachine 實作項目通常會透過回呼溝通結果。您可以直接將 Driver 例項化,如以下範例所示。

Driver 嵌入 SkyKeyComputeState 實作項目,同時實作對應的 ResultSink,以便進一步定義。在頂層,State 物件是運算結果的適當接收器,因為其保證會超過 Driver

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

下方程式碼是 ResultProducer 的草圖。

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

  … // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

然後延遲計算結果的程式碼可能如下所示。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

正在嵌入「Driver

如果 StateMachine 產生值且未引發例外狀況,則嵌入 Driver 是另一種可行的實作方式,如以下範例所示。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
}

SkyFunction 的程式碼可能如下所示 (其中 StateSkyKeyComputeState 的函式專屬類型)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

Driver 嵌入 StateMachine 實作項目,更適合 SkyFrame 的同步程式設計樣式。

可能會產生例外狀況的 StateMachines

否則,也有 SkyKeyComputeState 可嵌入的 ValueOrExceptionProducerValueOrException2Producer 類別,具有同步 API 來比對同步的 SkyFunction 程式碼。

ValueOrExceptionProducer 抽象類別包含下列方法。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
    …  // Implementation.
  }

  protected final void setValue(V value)  {  … // Implementation. }
  protected final void setException(E exception) {  … // Implementation. }
}

其中包含嵌入的 Driver 例項,與嵌入驅動程式中的 ResultProducer 類別和 SkyFunction 的介面非常類似。而是會在發生任一情形時呼叫 setValuesetException,而不是定義 ResultSink。當這兩種情況發生時,以例外狀況為優先。tryProduceValue 方法會將非同步回呼程式碼橋接至同步程式碼,並在設定時擲回例外狀況。

如先前所述,在錯誤泡泡期間,即使機器因無法取得所有輸入而尚未完成,還是有可能發生錯誤。為因應這種情況,tryProduceValue 會擲回任何已設定的例外狀況,甚至在機器完成之前亦然。

Epilogue:最終移除回呼

StateMachine 是效率高,但要執行非同步運算的樣板密集方式。連續 (尤其是傳遞至 ListenableFutureRunnable 形式) 在 Bazel 程式碼的特定部分中廣泛,但在分析 SkyFunctions 時則較不盛行。分析主要取決於 CPU 限制,因此磁碟 I/O 沒有高效率的非同步 API。最後,建議您把回呼最佳化,因為回呼會經過學習,導致可讀性不佳。

Java 虛擬執行緒是最具潛力的替代方案之一。您不需要寫入回呼,所有內容都會替換為同步的封鎖呼叫。可以這麼做的原因是,相較於平台執行緒,綁定虛擬執行緒資源應該沒什麼成本。但是,即使使用虛擬執行緒,以建立執行緒和同步處理基元取代簡單的同步作業,也所費不貲。我們從 StateMachine 遷移至 Java 虛擬執行緒,執行速度較慢,因此端對端分析延遲時間幾乎增加了 3 倍。由於虛擬執行緒仍為預先發布版功能,因此效能提升後,可能會在日後再執行這項遷移作業。

另一種建議做法是等待 Loom 協同程式 (如果有的話)。這種做法的好處是可以採用合作式多工處理來減少同步處理作業負擔。

如果其他方法都失敗,低階位元碼重寫也可以是可行的替代方法。如果充分最佳化,也許就能利用手動編寫的回呼程式碼達到最佳效能。

附錄

回歸地獄

回呼堆積是使用回呼的非同步程式碼中嚴重的問題。原因在於後續步驟的接續程序是在上一個步驟中嵌入。如果有多個步驟,這個巢狀結構可能非常深層。如果加上控制流程,程式碼就無法管理。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

巢狀實作有一個優點之一,就是可以保留外部步驟的堆疊框架。在 Java 中,擷取的 lambda 變數必須是最終版本,因此使用這類變數可能會相當麻煩。如要避免深度巢狀結構,請將方法參照做為接續傳回,而非 lambda,如下所示。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

過度使用 runAfter 插入模式時,也可能會發生回呼堆積,但使用依序步驟的插補插入可以避免這種情況。

範例:鏈結 SkyValue 查詢

應用程式邏輯需要有 SkyValue 查詢的相依鏈結,例如,如果第二個 SkyKey 依附第一個 SkyValue,則屬於這種情況。無論如何,這將導致複雜且深層的回呼結構。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

不過,由於接續會指定為方法參照,因此程式碼在狀態轉換期間看起來會很複雜:step2 後面會跟著 step1。請注意,在這裡,lambda 會用於指派 value2。如此一來,程式碼的順序就會與由上至下計算的運算順序相符。

其他訣竅

可讀性:執行作業排序

為提升可讀性,請盡力將 StateMachine.step 實作項目維持在執行順序,且回呼實作項目必須緊接在程式碼中的位置。控制流程分支版本並非每次都有機會發生。如果是這種情況,其他意見可能有所幫助。

範例:鏈結 SkyValue 查詢中,系統會建立中繼方法參照來達成此目的。這麼做會交換少量的效能提升可讀性,值得參考。

生成假設

中型 Java 物件破壞了 Java 垃圾收集器的生成假設,這類收集器是專為處理短暫的物件或永久存在的物件所設計。根據定義,SkyKeyComputeState 中的物件違反這個假設。這類物件包含所有仍在執行中 StateMachine 的建構樹狀結構,根層級為 Driver,在暫停時具有中間壽命,會等候非同步運算完成。

在 JDK19 中似乎不太糟,但使用 StateMachine 時,有時可能會觀察到 GC 時間增加,即便實際產生的垃圾量會急劇減少。由於 StateMachine 具有中長的生命週期,可以升級為舊版,因此會較快填滿,因此需要更多昂貴的主要或完整 GC 清除所用資源。

一開始的預防措施是盡量減少使用 StateMachine 變數,但有時並非可行,例如在多個狀態中需要一個值時。如果可以,本機堆疊 step 變數為年輕產生變數,且有效率的 GC。

對於 StateMachine 變數,將工作拆成數個子工作,並按照建議的模式StateMachine 之間傳播值,也很有幫助。請注意,按照這個模式操作時,只有子項 StateMachine 能參照父項 StateMachine,反之亦然。也就是說,當子項使用結果回呼完成及更新父項時,子項便會自動排除在範圍內,並且符合 GC 的使用資格。

最後,在某些情況下,先前的狀態需要 StateMachine 變數,但在後續狀態下則不需要。如果已知大型物件的參照已不再需要,將其中的參照設為空值會很有用。

命名狀態

命名方法時,通常可以為該方法內發生的行為命名方法。由於沒有任何堆疊,因此在 StateMachine 中執行此操作的方式比較不明確。舉例來說,假設 foo 方法呼叫了子方法 bar。在 StateMachine 中,這可能會轉譯為狀態序列 foo,後接 barfoo 不再包含行為 bar。因此,狀態的方法名稱通常範圍較小,可反映本機行為。

並行樹狀圖

以下是結構化並行中圖表的替代檢視畫面,可更清楚描述樹狀結構。方塊會形成一棵小樹。

結構化並行 3D


  1. 在沒有可用的值時,SkyFrame 的慣例做法是從從頭開始重新啟動。 

  2. 請注意,step 可以擲回 InterruptedException,但範例會省略這個屬性。Bazel 程式碼中有幾種低的方法會擲回這個例外狀況,而該方法會傳播至 Driver,稍後才會執行 StateMachine。您可以不宣告在不必要的情況下擲回。

  3. 並行子工作源自 ConfiguredTargetFunction,這個元件會針對各個依附元件執行獨立工作。每個依附元件都有自己的獨立 StateMachine,而不是操控會一次處理所有依附元件的複雜資料結構,造成效率不彰的情形。

  4. 單一步驟中的多個 tasks.lookUp 呼叫會批次處理。您也可以在並行子工作內進行查詢,藉此建立其他批次。 

  5. 這在概念上與 Java 的結構化並行 jeps/428 相似。 

  6. 這與產生執行緒並彙整來達到依序組合的方式類似。