SkyFrame StateMachines 指南

回報問題 查看原始碼 Nightly · 8.0 7.4 . 7.3 · 7.2 · 7.1 · 7.0 · 6.5

總覽

Skyframe StateMachine 是位於堆疊上的解構函式物件。當必要值無法立即取得,但會以非同步方式計算時,此方法可支援彈性且不重複的評估1StateMachine 在等待期間無法綁定執行緒資源,而是必須暫停及恢復。因此,解構會公開明確的重新進入點,以便略過先前的運算。

StateMachine 可用於表示序列、分支、結構化邏輯並行作業,並專門針對 Skyframe 互動進行調整。StateMachine 可組合成較大的 StateMachine,並共用子 StateMachine。並行作業一律是透過建構和純粹邏輯來建立階層。每個並行子工作都會在單一共用父項 SkyFunction 執行緒中執行。

簡介

本節將簡要說明 StateMachine 的用途,並介紹 java.com.google.devtools.build.skyframe.state 套件中的 StateMachine

Skyframe 重新啟動功能簡介

Skyframe 是可並行評估依附元件圖表的架構。圖表中的每個節點都對應至 SkyFunction 的評估結果,其中 SkyKey 會指定參數,而 SkyValue 會指定結果。計算模型的運作方式是,SkyFunction 可以透過 SkyKey 查詢 SkyValue,觸發對其他 SkyFunction 的遞迴並行評估。當要求的 SkyValue 尚未就緒,因為某些運算子圖未完成,而非阻斷 (這會綁定執行緒),要求的 SkyFunction 會觀察 null getValue 回應,並應傳回 null 而非 SkyValue,表示因缺少輸入內容而未完成。當先前要求的所有 SkyValues 都已可用時,Skyframe 會重新啟動 SkyFunction。

SkyKeyComputeState 推出之前,處理重新啟動作業的傳統方式是完全重新執行運算。雖然這會產生二次方複雜度,但以這種方式編寫的函式最終會完成,因為每次重跑時,返回 null 的查詢會減少。有了 SkyKeyComputeState,您就可以將手動指定的檢查點資料與 SkyFunction 建立關聯,大幅減少重新運算的次數。

StateMachineSkyKeyComputeState 中存在的物件,可透過公開暫停和繼續執行掛鉤,在 SkyFunction 重新啟動時消除幾乎所有重新計算作業 (假設 SkyKeyComputeState 不會從快取中移除)。

SkyKeyComputeState 中的具狀態運算

從以物件為導向的設計角度來看,建議您考慮將運算物件儲存在 SkyKeyComputeState 中,而非純粹的資料值。在 Java 中,行為載入物件的最低說明是功能介面,而這項說明已足夠。StateMachine 有以下有趣的遞迴定義2

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks 介面類似 SkyFunction.Environment,但專為非同步作業而設計,並新增對邏輯上並行的子工作支援3

step 的傳回值是另一個 StateMachine,可讓您以歸納法指定一連串步驟。StateMachine 完成後,step 會傳回 DONE。例如:

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

以下輸出內容說明 StateMachine

hello
world

請注意,由於 step2 符合 StateMachine 的函式介面定義,因此方法參照 this::step2 也是 StateMachine。方法參照是指定 StateMachine 中下一個狀態最常見的方式。

暫停和繼續使用

直覺來說,將運算分解為 StateMachine 步驟 (而非單一函式),可提供暫停恢復運算所需的鉤子。StateMachine.step 傳回時,會有明確的暫停點。由傳回的 StateMachine 值指定的續行點是明確的「resume」點。因此,系統可以從上次結束的地方繼續運算,因此可避免重新運算。

回呼、繼續運作和非同步運算

從技術層面來說,StateMachine接續,用於決定要執行的後續運算。StateMachine 可以透過從 step 函式傳回,將控制權轉移回 Driver 例項,進而自願暫停,而非阻斷。Driver 接著可以切換為已就緒的 StateMachine,或將控制權交回 Skyframe。

傳統上,回呼續接會混淆為一個概念。不過,StateMachine 會保留兩者之間的差異。

  • 回呼:說明非同步運算的結果儲存位置。
  • 接續:指定下一個執行狀態。

呼叫非同步作業時必須使用回呼,這表示在呼叫方法時不會立即執行實際作業,例如 SkyValue 查詢。回呼應盡可能簡單。

接續動作StateMachineStateMachine 傳回值,並且在所有非同步運算解析後,封裝後續的複雜執行作業。這種結構化方法有助於將回呼的複雜度控制在可管理的範圍內。

工作

Tasks 介面會為 StateMachine 提供 API,以便透過 SkyKey 查詢 SkyValue,並排定並行子工作。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}

SkyValue 查詢

StateMachine 會使用 Tasks.lookUp 超載來查詢 SkyValues。它們與 SkyFunction.Environment.getValueSkyFunction.Environment.getValueOrThrow 相似,且具有類似的例外狀況處理語意。實作項目不會立即執行查詢,而是會先將查詢分批處理4。值可能無法立即使用,例如需要 Skyframe 重新啟動,因此調用端會使用回呼指定如何處理結果值。

StateMachine 處理器 (Driver 和連結至 SkyFrame) 可確保在下一個狀態開始前提供值。以下提供範例。

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

在上例中,第一個步驟會為 new Key() 執行查詢,並將 this 傳遞給消費者。這是因為 DoesLookup 實作了 Consumer<SkyValue>

根據合約,在下一個狀態 DoesLookup.processValue 開始之前,所有 DoesLookup.step 的查詢都會完成。因此,在 processValue 中存取 value 時,即可使用 value

子工作

Tasks.enqueue 要求執行邏輯上並行的子工作。子工作也是 StateMachine,可以執行一般 StateMachine 可執行的任何操作,包括遞迴建立更多子工作或查詢 SkyValues。與 lookUp 類似,狀態機器人驅動程式會確保所有子工作都完成,再繼續進行下一個步驟。以下提供範例。

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

雖然 Subtask1Subtask2 在邏輯上是並行,但所有內容都會在單一執行緒中執行,因此 i 的「並行」更新作業不需要任何同步處理。

結構化並行

由於每個 lookUpenqueue 都必須在進到下一個狀態前解析,因此並行作業自然會限制在樹狀結構中。您可以建立階層式5 並行作業,如以下範例所示。

結構化並行

UML 中很難判斷並行結構是否形成樹狀結構。我們提供其他檢視畫面,可更清楚顯示樹狀結構。

非結構化並行

結構化並行功能更容易分析。

組合和控制流程模式

本節將提供範例,說明如何組合多個 StateMachine,以及解決特定控制流程問題的解決方案。

順序狀態

這是最常見且最直接的控制流程模式。SkyKeyComputeState 內部的有狀態運算中顯示了這項功能的範例。

分支

您可以使用一般 Java 控制流程傳回不同的值,藉此在 StateMachine 中分支狀態,如以下範例所示。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  
}

某些分支會為了提早完成而傳回 DONE,這很常見。

進階順序組合

由於 StateMachine 控制結構沒有記憶體,因此將 StateMachine 定義做為子工作來共用,有時會不方便。讓 M1M2 分別代表共用 StateMachine SStateMachine 例項,其中 M1M2 分別是序列 <A, S, B><X, S, Y>。問題在於 S 不知道是否要繼續執行 BY,且 StateMachine 並未保留呼叫堆疊。本節將介紹幾種實現此目標的方法。

StateMachine 做為終端序列元素

這無法解決提出的問題。只有在共用 StateMachine 是序列終端時,才會示範順序組合。

// S is the shared state machine.
class S implements StateMachine {  }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

即使 S 本身是複雜的狀態機器,這項做法也適用。

依序組合的子工作

由於排入佇列的子工作保證會在下一個狀態前完成,因此有時可能會稍微濫用6子工作機制。

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

runAfter 注入

有時,濫用 Tasks.enqueue 是不可能的,因為在 S 執行前,必須先完成其他並行子工作或 Tasks.lookUp 呼叫。在這種情況下,您可以將 runAfter 參數插入 S,以便通知 S 接下來要執行的動作。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
     // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
     // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

這種做法比濫用子工作更簡潔。不過,如果過度使用這個方法,例如使用 runAfter 巢狀多個 StateMachine,就會導致回呼地獄。建議您改用一般序列狀態來分割序列 runAfter

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

可以換成下列指令。

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

禁止的替代做法:runAfterUnlessError

在先前的草稿中,我們考慮了 runAfterUnlessError,這個函式會在發生錯誤時提早中止。這是因為錯誤通常會檢查兩次:一次由具有 runAfter 參照的 StateMachine 檢查,另一次由 runAfter 機器本身檢查。

經過一番思考後,我們認為代碼的一致性比重複執行錯誤檢查更重要。如果 runAfter 機制無法以與 tasks.enqueue 機制一致的方式運作,就會造成混淆,因為 tasks.enqueue 機制一律需要進行錯誤檢查。

直接委派

每次發生正式狀態轉換時,主 Driver 迴圈就會推進。根據合約規定,進階狀態表示所有先前排入佇列的 SkyValue 查詢和子工作,會在下一個狀態執行前完成。有時委派物件 StateMachine 的邏輯會導致階段提前,造成不必要或不利的結果。舉例來說,如果委派函的首個 step 執行 SkyKey 查詢,且可與委派狀態的查詢並行,則階段提前會使其依序執行。執行直接委派作業可能更有意義,如以下範例所示。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return ;
  }

  // Rest of implementation.
  

  private StateMachine complete(Tasks tasks) {
    
    return runAfter;
  }
}

資料流程

先前的討論重點是控制流程管理。本節說明資料值的傳播方式。

實作 Tasks.lookUp 回呼

以下是實作 SkyValue 查詢中的 Tasks.lookUp 回呼的範例。本節將說明處理多個 SkyValue 的理由,並提供相關處理方式。

Tasks.lookUp 回呼

Tasks.lookUp 方法會將回呼 sink 做為參數。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

慣用做法是使用 Java lambda 實作這項操作:

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

其中 myValue 是執行查詢的 StateMachine 例項成員變數。不過,與在 StateMachine 實作中實作 Consumer<SkyValue> 介面相比,lambda 需要額外的記憶體配置。在有多個查詢會造成歧義的情況下,lambda 仍有其用處。

Tasks.lookUp 也有錯誤處理超載,類似 SkyFunction.Environment.getValueOrThrow

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

實作範例如下所示。

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      
      return DONE;
    }
    // Processes `value`, which is non-null.
    
  }
}

與未經過錯誤處理的查詢一樣,讓 StateMachine 類別直接實作回呼,可為 lambda 節省記憶體配置。

錯誤處理會提供更多詳細資訊,但基本上,錯誤和正常值的傳播方式並無太大差異。

使用多個 SkyValue

通常需要多次查詢 SkyValue。大部分情況下,開啟 SkyValue 類型的方法都能解決問題。以下是從原型正式版程式碼簡化而成的範例。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

由於值類型不同,因此可以明確共用 Consumer<SkyValue> 回呼實作項目。若非如此,則可改用以 lambda 為基礎的實作項目,或實作適當回呼的完整內部類別例項。

StateMachine 之間傳播值

到目前為止,本文件只說明如何在子工作中安排工作,但子工作也需要將值回報給呼叫端。由於子工作在邏輯上是異步的,因此會透過回呼將結果傳回給呼叫端。為使這項作業順利進行,子工作會定義透過建構函式插入的接收端介面。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

   // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

呼叫端 StateMachine 的內容如下所示。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上述範例說明瞭幾件事。Caller 必須將結果傳回,並定義自己的 Caller.ResultSinkCaller 會實作 BarProducer.ResultSink 回呼。恢復後,processResult 會檢查 value 是否為空值,以判斷是否發生錯誤。這是接受子任務或 SkyValue 查詢的輸出內容後的常見行為模式。

請注意,acceptBarError 的實作會根據 錯誤浮現的規定,急切地將結果轉送至 Caller.ResultSink

如要瞭解頂層 StateMachine 的替代方案,請參閱「Driver 和 SkyFunctions 連線」一文。

處理錯誤

Tasks.lookUp 回呼StateMachines 之間傳播值中,已經有幾個錯誤處理範例。除了 InterruptedException 以外的例外狀況不會擲回,而是會透過回呼以值的形式傳遞。這類回呼通常具有或非語意,會傳遞一個值或錯誤。

下一節將說明與 Skyframe 錯誤處理相關的微妙但重要的互動。

錯誤冒泡 (--nokeep_going)

在錯誤冒泡期間,即使並非所有要求的 SkyValues 都可用,SkyFunction 仍可能會重新啟動。在這種情況下,由於 Tasks API 合約,系統永遠不會達到後續狀態。不過,StateMachine 仍應傳播例外狀況。

無論是否已達到下一個狀態,傳播作業都必須執行,因此錯誤處理回呼必須執行這項工作。對於內部 StateMachine,這項操作是透過叫用父項回呼來完成。

在與 SkyFunction 介面的頂層 StateMachine 中,您可以呼叫 ValueOrExceptionProducersetException 方法來執行此操作。ValueOrExceptionProducer.tryProduceValue 隨後會擲回例外狀況,即使有缺少的 SkyValues 也一樣。

如果直接使用 Driver,請務必檢查 SkyFunction 是否傳播錯誤,即使機器尚未完成處理也一樣。

事件處理

對於需要發出事件的 SkyFunction,系統會將 StoredEventHandler 插入 SkyKeyComputeState,並進一步插入需要這些事件的 StateMachine。過去,如果沒有重播,Skyframe 會捨棄特定事件,因此需要使用 StoredEventHandler,但這個問題已在後續修正。StoredEventHandler 插入作業會保留,因為這可簡化從錯誤處理回呼發出的事件實作。

Driver 和 SkyFunctions 的橋接

Driver 負責管理 StateMachine 的執行作業,從指定的根 StateMachine 開始。由於 StateMachine 可遞迴地將子工作 StateMachine 排入佇列,因此單一 Driver 可管理多個子工作。這些子工作會建立樹狀結構,這是結構化並行的結果。Driver 會將 SkyValue 查詢資料批次化,並在各個子工作中執行,以提高效率。

有許多類別是圍繞 Driver 建構,並使用下列 API。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver 會採用單一根目錄 StateMachine 做為參數。呼叫 Driver.drive 會在 Skyframe 重新啟動之前,盡可能執行 StateMachine。在 StateMachine 完成時傳回 true,否則傳回 false,表示並非所有值皆可用。

Driver 會保留 StateMachine 的並行狀態,非常適合嵌入 SkyKeyComputeState

直接例項化 Driver

StateMachine 實作通常會透過回呼傳遞結果。您可以直接將 Driver 例項化,如以下範例所示。

Driver 會嵌入 SkyKeyComputeState 實作項目,並在稍後定義對應的 ResultSink 實作項目。在頂層,State 物件是運算結果的適當接收器,因為它保證會比 Driver 長壽。

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

以下程式碼會勾勒 ResultProducer

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

   // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
      // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

接著,用於延後計算結果的程式碼可能會如下所示。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

嵌入 Driver

如果 StateMachine 產生值且不會擲回例外狀況,則嵌入 Driver 是另一種可能的實作方式,如以下範例所示。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
      // Implementation.
}

SkyFunction 的程式碼可能如下所示 (其中 StateSkyKeyComputeState 的函式特定類型)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

Driver 嵌入 StateMachine 實作項目,更適合 Skyframe 的同步程式碼樣式。

可能產生例外的 StateMachine

否則,SkyKeyComputeState 可嵌入的 ValueOrExceptionProducerValueOrException2Producer 類別會提供同步 API,以符合同步 SkyFunction 程式碼。

ValueOrExceptionProducer 抽象類別包含下列方法。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
      // Implementation.
  }

  protected final void setValue(V value)  {   // Implementation. }
  protected final void setException(E exception) {   // Implementation. }
}

它包含一個嵌入的 Driver 例項,並與「嵌入驅動程式」中的 ResultProducer 類別相似,並以類似方式與 SkyFunction 建立介面。實作會在發生上述任一情況時呼叫 setValuesetException,而非定義 ResultSink。如果兩者同時發生,則以例外狀況為優先。tryProduceValue 方法會將非同步回呼程式碼連結至同步程式碼,並在設定回呼程式碼時擲回例外狀況。

如先前所述,在錯誤冒泡期間,即使機器尚未完成,但由於並非所有輸入都可用,因此可能會發生錯誤。為因應此情況,tryProduceValue 會擲回任何已設定的例外狀況,即使在機器完成運作之前也是如此。

結尾:最終移除回呼

StateMachine 是執行非同步運算的一種高效能方法,但會產生大量程式碼。在 Bazel 程式碼的特定部分,延續 (尤其是傳遞至 ListenableFutureRunnable 形式) 相當普遍,但在分析 SkyFunction 中並不常見。分析作業大多受限於 CPU,且沒有有效的異步 API 可用於磁碟 I/O。最終,建議您將回呼最佳化,因為回呼有學習曲線,且會影響可讀性。

Java 虛擬執行緒是最有希望的替代方案之一。不必撰寫回呼,而是以同步的封鎖呼叫取代所有內容。這是因為綁定虛擬執行緒資源 (不像平台執行緒) 應該是低成本的。不過,即使使用虛擬執行緒,以執行緒建立和同步處理原始碼取代簡單的同步作業,成本也太高。我們從 StateMachine 遷移至 Java 虛擬執行緒,但速度卻慢了好幾個數量級,導致端對端分析延遲時間增加了近 3 倍。由於虛擬執行緒仍是預覽功能,因此這項遷移作業可能會在日後效能改善時執行。

另一種可考慮的方法是等待 Loom 協同程式 (如果有提供的話)。這項做法的優點是,您可以透過合作多工處理來減少同步作業的額外負擔。

如果其他方法全都失效,低階位元碼重寫也可能是可行的替代方案。經過充分最佳化後,效能可能會接近手寫回呼程式碼。

附錄

回呼地獄

回呼地獄是使用回呼的非同步程式碼中眾所皆知的問題。這是因為後續步驟的後續動作會嵌套在前一個步驟中。如果有許多步驟,這個巢狀結構可能會非常深。如果與控制流程耦合,程式碼就會變得難以管理。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

巢狀實作的優點之一,就是可以保留外部步驟的堆疊框架。在 Java 中,擷取的 lambda 變數必須有效地設為最終變數,因此使用這類變數可能會很麻煩。如要避免深層巢狀結構,請將方法參照做為連續動作傳回,而非 lambda,如下所示。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

如果 runAfter 插入模式使用過於密集,也可能會發生回呼地獄,但只要將插入作業與順序步驟交錯使用,即可避免這種情況。

範例:鏈結 SkyValue 查詢

應用程式邏輯通常需要 SkyValue 查詢的從屬鏈結,例如,如果第二個 SkyKey 依附於第一個 SkyValue。從直觀的角度來看,這會導致複雜的深層巢狀回呼結構。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

不過,由於接續會指定為方法參照,因此程式碼會在狀態轉換期間看起來是程序化的:step2 會接在 step1 之後。請注意,此處使用 lambda 指派 value2。這樣一來,程式碼的順序就會與由上而下的運算順序相符。

其他提示

可讀性:執行順序

為提升可讀性,請盡量讓 StateMachine.step 實作項目保持在執行順序中,並在程式碼中傳遞回呼時立即實作。但在控制流程分支時,這不一定可行。在這種情況下,提供其他意見可能會有所幫助。

在「範例:鏈結 SkyValue 查詢」中,我們會建立中間方法參照,以便達成這項目標。這麼做會犧牲一點效能,換取可讀性,在這種情況下,這可能值得。

世代假說

中期存活的 Java 物件會破壞 Java 垃圾收集器的世代假設,後者旨在處理存活時間極短或永久存活的物件。根據定義,SkyKeyComputeState 中的物件違反了這個假設。這類物件包含所有仍在執行的 StateMachine 所建構的樹狀結構,其根目錄為 Driver,在暫停時具有中間生命週期,等待非同步運算完成。

在 JDK19 中,這個問題似乎沒有那麼嚴重,但使用 StateMachine 時,有時 GC 時間會增加,即使實際產生的垃圾減少幅度相當大也一樣。由於 StateMachine 具有中間壽命,因此可能會升級至舊版,導致舊版更快填滿,因此需要進行更耗費資源的主要或完整 GC 才能清理。

最初的預防措施是盡量減少 StateMachine 變數的使用,但這不一定可行,例如如果需要在多個狀態中使用值。在可行情況下,本機堆疊 step 變數是新生代變數,且可有效 GC。

針對 StateMachine 變數,將工作細分為子工作,並遵循StateMachine 之間傳播值的建議模式,也相當實用。請注意,在遵循這個模式時,只有子 StateMachine 有上層 StateMachine 的參照,反之亦然。也就是說,當子項使用結果回呼完成並更新父項時,子項就會自然脫離範圍,並符合 GC 的資格。

最後,在某些情況下,需要在較早的狀態中使用 StateMachine 變數,但在較晚的狀態中則不需要。一旦知道不再需要大型物件時,將大型物件的參照設為空值可能會很有幫助。

命名狀態

命名方法時,通常可以為該方法內發生的行為命名。由於 StateMachine 沒有堆疊,因此不太清楚如何在其中執行這項操作。舉例來說,假設方法 foo 會呼叫子方法 bar。在 StateMachine 中,這可以轉譯為狀態序列 foo,後面接著 barfoo 不再包含 bar 行為。因此,狀態的方法名稱通常範圍較窄,可能會反映本機行為。

並行作業樹狀圖

以下是結構化並行性中圖表的其他檢視畫面,可更清楚呈現樹狀結構。這些區塊會形成一個小樹狀結構。

結構化並行 3D


  1. 這與 Skyframe 慣例相反,後者會在無法取得值時從一開始重新啟動。 

  2. 請注意,step 可擲回 InterruptedException,但範例略過這項操作。Bazel 程式碼中有一些低層方法會擲回此例外狀況,並向上傳播至執行 StateMachineDriver (稍後會加以說明)。在不需要時,不宣告擲回錯誤也沒關係。 

  3. 並行子工作的動機來自 ConfiguredTargetFunction,後者會針對每個依附元件執行獨立的工作。每個依附元件都有各自的獨立 StateMachine,因此不需要操控複雜的資料結構來一次處理所有依附元件,以免造成效率低落。 

  4. 單一步驟中的多個 tasks.lookUp 呼叫會一起批次處理。在並行子工作中發生的查詢,可以建立額外的批次。 

  5. 這在概念上與 Java 的結構化並行處理相似,請參閱 jeps/428。 

  6. 這麼做就像產生執行緒,然後將其加入以實現順序組合。