Canonical ABI Explainer

May 29, 2026

This document defines the Canonical ABI used to convert between the values and functions of components in the Component Model and the values and functions of modules in Core WebAssembly. See the AST explainer for a walkthrough of the static structure of a component and the concurrency explainer for a high-level description of the concurrency concepts being specified here.

Introduction

The Canonical ABI specifies, for each component function signature, a corresponding core function signature and the process for reading component-level values into and out of linear memory. While a full formal specification would specify the Canonical ABI in terms of macro-expansion into Core WebAssembly instructions augmented with a new set of (spec-internal) administrative instructions, the informal presentation here instead specifies the process in terms of Python code that would be logically executed at validation- and run-time by a component model implementation. The Python code is presented by interleaving definitions with descriptions and eliding some boilerplate. For a complete listing of all Python definitions in a single executable file with a small unit test suite, see the canonical-abi directory.

The convention followed by the Python code below is that all traps are raised by explicit trap()/trap_if() calls; Python assert() statements should never fire and are only included as hints to the reader. Similarly, there should be no uncaught Python exceptions.

While the Python code for lifting and lowering values appears to create an intermediate copy when lifting linear memory into high-level Python values, a real implementation should be able to fuse lifting and lowering into a single direct copy from the source linear memory into the destination linear memory.

Lastly, independently of Python, the Canonical ABI defined below assumes that out-of-memory conditions (such as memory.grow returning -1 from within realloc) will trap (via unreachable). This significantly simplifies the Canonical ABI by avoiding the need to support the complicated protocols necessary to support recovery in the middle of nested allocations. (Note: by nature of eliminating realloc, switching to lazy lowering would obviate this issue, allowing guest wasm code to handle failure by eagerly returning some value of the declared return type to indicate failure.)

Component Instances

Once a component has been parsed/decoded and validated, it can be loaded at runtime by instantiating it to produce a component instance. The ComponentInstance class tracks all the spec-internal state that is used by the definitions below to specify the Canonical ABI. For example, all i32 handles to resources, waitables, waitable sets, error contexts and threads will index into the handles or threads fields of ComponentInstance.

class ComponentInstance:
  store: Store
  parent: Optional[ComponentInstance]
  handles: Table[ResourceHandle | Waitable | WaitableSet | ErrorContext]
  threads: Table[Thread]
  may_enter: bool
  may_leave: bool
  backpressure: int
  num_waiting_to_enter: int
  exclusive_thread: Optional[Thread]

  def __init__(self, store, parent = None):
    assert(parent is None or parent.store is store)
    self.store = store
    self.parent = parent
    self.handles = Table()
    self.threads = Table()
    self.may_enter = True
    self.may_leave = True
    self.backpressure = 0
    self.num_waiting_to_enter = 0
    self.exclusive_thread = None

Components are always instantiated in the context of a store (analogous to the Core WebAssembly store) which is saved immutably in the instance's store field. The Store class is defined below as part of the Embedding interface and logically contains all the component instances loaded by the host that can interact with each other. For example, in a browser, all component instances in the same tab that were created via WebAssembly.instantiate or ESM-integration would go into the same store.

When one component uses an instance definition to instantiate another component, the component containing the instance definition is called the parent and the component that gets instantiated is called the child. Components immutably store their parent component or, if instantiated directly by the host, None, in the parent field. Thus, the set of component instances in a store forms a forest rooted by the component instances that were instantiated directly by the host.

The ComponentInstance.may_enter_from, enter_from and leave_to methods defined here are used to guard and record execution entering and exiting a component instance. These methods are used by the Store methods and Task.request_cancellation, defined below, to ensure Component Invariant #2.

  def may_enter_from(self, caller: Optional[ComponentInstance]):
    for inst in self.entering_set(caller):
      if not inst.may_enter:
        return False
    return True

  def enter_from(self, caller: Optional[ComponentInstance]):
    for inst in self.entering_set(caller):
      assert(inst.may_enter)
      inst.may_enter = False

  def leave_to(self, caller: Optional[ComponentInstance]):
    for inst in self.entering_set(caller):
      assert(not inst.may_enter)
      inst.may_enter = True

  def entering_set(self, caller: Optional[ComponentInstance]) -> set[ComponentInstance]:
    if caller:
      return self.self_and_ancestors() - caller.self_and_ancestors()
    else:
      return self.self_and_ancestors()

  def self_and_ancestors(self) -> set[ComponentInstance]:
    s = { self }
    ancestor = self.parent
    while ancestor is not None:
      s.add(ancestor)
      ancestor = ancestor.parent
    return s

In may_enter_from, enter_from and leave_to, the caller parameter is either the caller's ComponentInstance in a component-to-component call or None for a host-to-component call. This caller is used to avoid trapping in the case of a parent component donut wrapping a child component and being reentered by a child component import call which by definition does not violate Component Invariant #2.

To distinguish and allow donut-wrapping-reentrance, we say that entering a component instance C also implicitly enters all of C's transitive parents ("ancestors") but when calling from one component into another, any component instance already entered by the caller (including itself) is subtracted from the set of component instances being entered by the callee because execution is not "entering" but rather "staying inside" those instances held in common.

For example, given a parent component instance P which contains core module instances M1 and M2 and child component instances C1 and C2, may_enter_from allows every call in this callstack to succeed:

     +-------------------------------------------------+
     |                        P                        |
     | +-----------+   +----+   +----+   +-----------+ |
host-->| M1 (in P) |-->| C1 |-->| C2 |-->| M2 (in P) | |
     | +-----------+   +----+   +----+   +-----------+ |
     +-------------------------------------------------+

In particular, when the host first calls into P (via lifted M1), P.entering_set(None) is { P }, so P.may_enter is tested and then set to False. When P calls into C1, C1.entering_set(P) is { C1 } (since P.self_and_ancestors() = { P } is subtracted from C1.self_and_ancestors() = { C1, P }) and thus C1.may_enter is tested and set to False. When C1 calls C2, C2.entering_set(C1) is { C2 }, so C2.may_enter is also set to False. And then finally when C2 calls back into P (via lifted M2), P.entering_set(C2) is empty (because C2.self_and_ancestors() = { C2, P } is subtracted from P.self_and_ancestors() = { P }) and thus there is no trap_if(not P.may_enter) (which would have otherwise failed).

If now P tries to call from M2 back into C1 (using the power of call_indirect), there would be a trap, since C1.entering_set(P) is { C1 } and C1.may_enter is already False:

     +-----------------------------------------------------------+
     |                            P                              |
     | +-----------+   +----+   +----+   +-----------+    +----+ |
host-->| M1 (in P) |-->| C1 |-->| C2 |-->| M2 (in P) |-X->| C1 | |
     | +-----------+   +----+   +----+   +-----------+    +----+ |
     +-----------------------------------------------------------+

Alternatively, let's say P also contains a third child C3 whose exports are re-exported by P so that they can be called directly by the host. Then if M2 calls back out into the host and the host tries to call C3 directly, it also traps since C3.entering_set(None) is { C3, P } and P.may_enter is already set to False:

     +-------------------------------------------------+       +--------+
     |                            P                    |       |   P    |
     | +-----------+   +----+   +----+   +-----------+ |       | +----+ |
host-->| M1 (in P) |-->| C1 |-->| C2 |-->| M2 (in P) |-->host-X->| C3 | |
     | +-----------+   +----+   +----+   +-----------+ |       | +----+ |
     +-------------------------------------------------+       +--------+
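The three scenarios above can be replayed with a small sketch. Inst is a hypothetical, stripped-down stand-in for ComponentInstance that keeps only the parent and may_enter fields and copies the entering_set logic shown earlier; the names helper and the P, C1, C2 and C3 objects exist only for illustration.

```python
class Inst:
    """Hypothetical stand-in for ComponentInstance: only parent and may_enter."""
    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent
        self.may_enter = True

    def self_and_ancestors(self):
        s = {self}
        ancestor = self.parent
        while ancestor is not None:
            s.add(ancestor)
            ancestor = ancestor.parent
        return s

    def entering_set(self, caller):
        if caller:
            return self.self_and_ancestors() - caller.self_and_ancestors()
        return self.self_and_ancestors()

    def may_enter_from(self, caller):
        return all(inst.may_enter for inst in self.entering_set(caller))

    def enter_from(self, caller):
        for inst in self.entering_set(caller):
            assert inst.may_enter
            inst.may_enter = False

def names(insts):
    return sorted(inst.name for inst in insts)

P = Inst("P")
C1, C2, C3 = Inst("C1", P), Inst("C2", P), Inst("C3", P)

# host -> P (via M1) -> C1 -> C2 -> P (via M2): every call is allowed.
log = []
for callee, caller in [(P, None), (C1, P), (C2, C1), (P, C2)]:
    allowed = callee.may_enter_from(caller)
    log.append((names(callee.entering_set(caller)), allowed))
    if allowed:
        callee.enter_from(caller)

# With that callstack still live, the two reentrant calls are rejected.
m2_into_c1 = C1.may_enter_from(P)        # C1.may_enter is already False
host_into_c3 = C3.may_enter_from(None)   # P.may_enter is already False
```

The last entry of log has an empty entering set, which is why the donut-wrapping call from C2 back into P never consults P.may_enter.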

From an optimizing compiler's perspective, the set[ComponentInstance] returned by entering_set is known statically when compiling a component-to-component trampoline and thus the compiler can fully unroll the for loops in may_enter_from, enter_from and leave_to into fixed sequences of branches and stores with fixed memory locations for the may_enter flags. Furthermore, because component-to-component reentrance is only possible via donut wrapping and donut wrapping is only possible when a parent component contains a canon lower definition, whenever the compiler sees a component with no canon lower definitions, it can mark the may_enter flags of all its direct children as optimized-out and then completely ignore them. Since donut wrapping is rare, this means that, in practice, only root component instances' may_enter flags will be tested and only for host-to-component or component-to-component calls between different root components (linked by the host). Thus, the overall cost of reentrance should be very low, in exchange for allowing the producer toolchain to not have to safely handle reentrance at every single import call.

The other fields of ComponentInstance are described below as they are used.

Concurrency

The Component Model has native concurrency support as summarized by the concurrency explainer. In the Canonical ABI, concurrency support is defined in 3 layers, defined in the next 3 subsections, resp.:

  • The base layer is Core WebAssembly stack-switching's cont.new, suspend and resume primitives, implemented here in terms of Python's standard threading library.
  • The next layer up is Thread, which stores a mutable, nullable contref, updated over time as the thread suspends and resumes execution, along with some other thread-local state.
  • The top layer is Task, which corresponds to a single cross-component concurrent function call, contains 1..N Threads, and holds the state needed to define the ABI rules for the passing of parameters and results, the dropping of borrowed handles, cancellation and backpressure.

Stack Switching

Component Model concurrency is defined in terms of the Core WebAssembly stack-switching proposal's cont.new, resume and suspend instructions so that there is a clear composition story between Component Model and Core WebAssembly concurrency in the future. Since Python does not natively provide algebraic effects, cont.new, resume and suspend are implemented in this section in terms of other Python primitives. Thus, this section can be skimmed since the semantics of stack-switching are already rigorously defined elsewhere; it's only important to understand the Python function signatures and how they correspond to the stack-switching instructions.

Since the Component Model only needs a subset of the full expressivity of stack-switching, only that subset is implemented, which significantly simplifies things. In particular, the Component Model uses stack-switching in the following restricted manner:

First, there are only two global control tags used with suspend:

(tag $block (param $switch-to (ref null $Thread)) (result $cancelled bool))
(tag $current-thread (result (ref $Thread)))

Consequently, instead of having a single generic Python suspend() function, there are block() and current_thread() Python functions that implement suspend $block and suspend $current-thread, resp.

The $block tag is used to suspend a thread until some future event. The parameters and results will be described in the next section, where they are used to define Thread.

The $current-thread tag is used to retrieve the current thread, which is semantically stored in the resume handler's local state (although an optimizing implementation would instead maintain the current thread in the VM's execution context (or a special Core WebAssembly global) so that it could be cheaply loaded and/or kept in register state).

Second, there is only a single type of continuation passed to resume that corresponds to the $block tag ($current-thread continuations are immediately resumed and never "escape"):

(type $ct (cont (func (param bool) (result (ref null $Thread)))))

Third, every resume performed by the Canonical ABI always handles both $block and $current-thread and every Canonical ABI suspend is, by construction, always scoped by a Canonical ABI resume. Thus, every Canonical ABI suspend unconditionally transfers control flow directly to the innermost enclosing Canonical ABI resume without a general handler/tag search.

Given this restricted usage, specialized versions of cont.new, resume and suspend that are "monomorphized" to the above types and tags are implemented in terms of Python's standard preemptive threading primitives, using threading.Thread to provide a native stack, threading.Lock to only allow a single threading.Thread to execute at a time, and threading.local to maintain the dynamic handler scope using thread-local storage. This could have been implemented more directly and efficiently using fibers, but the Python standard library doesn't have fibers. However, a realistic implementation is expected to use (a pool of) fibers.
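Before reading the monomorphized definitions, it may help to see the lock hand-off idea in isolation. The sketch below is not the Canonical ABI code: Coroutine, resume_lock, yield_lock and the trace list are hypothetical names, and the handler scoping via threading.local is deliberately left out. It only shows how two pre-acquired threading.Lock objects let exactly one native thread run at a time.

```python
import threading

def acquired_lock():
    lock = threading.Lock()
    lock.acquire()
    return lock

class Coroutine:
    """Hypothetical two-lock hand-off between a resumer and a body thread."""
    def __init__(self, body):
        self.resume_lock = acquired_lock()  # released by the resumer to run the body
        self.yield_lock = acquired_lock()   # released by the body to hand control back
        self.sent = None                    # value passed from body to resumer
        self.received = None                # value passed from resumer to body
        self.done = False
        def base():
            self.resume_lock.acquire()      # wait for the first resume
            body(self)
            self.sent = None
            self.done = True
            self.yield_lock.release()       # signal "returned" to the resumer
        threading.Thread(target=base).start()

    def resume(self, value=None):
        self.received = value
        self.resume_lock.release()          # let the body run
        self.yield_lock.acquire()           # wait until it blocks or returns
        return self.sent

    def block(self, value):
        self.sent = value
        self.yield_lock.release()           # hand control back to the resumer
        self.resume_lock.acquire()          # wait for the next resume
        return self.received

trace = []

def body(co):
    trace.append("body: start")
    got = co.block("first")
    trace.append(f"body: resumed with {got}")

co = Coroutine(body)
trace.append("main: created")
out = co.resume()
trace.append(f"main: body blocked with {out}")
co.resume("wake")
trace.append(f"main: done={co.done}")
```

Because each side releases one lock and immediately waits on the other, the interleaving recorded in trace is fully determined even though two native threads are involved.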

Starting with cont.new, the monomorphized version takes a function type matching $ct, as defined above:

class Cancelled(IntEnum):
  FALSE = 0
  TRUE = 1

class Continuation:
  lock: threading.Lock
  handler: Handler
  block_result: Cancelled

class Handler:
  lock: threading.Lock
  current_thread: Thread
  cont: Optional[Continuation]
  block_arg: Optional[Thread]

thread_local_handler = threading.local()

def new_already_acquired_lock() -> threading.Lock:
  lock = threading.Lock()
  lock.acquire()
  return lock

def cont_new(f: Callable[[Cancelled], Optional[Thread]]) -> Continuation:
  cont = Continuation()
  cont.lock = new_already_acquired_lock()
  def thread_base():
    cont.lock.acquire()
    thread_local_handler.value = cont.handler
    block_arg = f(cont.block_result)
    handler = thread_local_handler.value
    handler.cont = None
    handler.block_arg = block_arg
    handler.lock.release()
  threading.Thread(target = thread_base).start()
  return cont

Continuation.block_result and Continuation.handler are set by resume right before resume calls Continuation.lock.release() to transfer control flow to the continuation. After resuming the continuation, resume calls Handler.lock.acquire() to wait until the continuation signals suspension or return by calling Handler.lock.release(). The Handler is stored in thread_local_handler.value to implement the dynamic scoping that is required for suspend. Because the thread created by cont_new can be suspended and resumed many times (each time with a new Continuation and Handler, resp.), Handler must be re-loaded from thread_local_handler.value after f returns since it may have changed since the initial resume.

Next, resume is monomorphized to take a continuation of type $ct, the argument to pass to the continuation, and the Thread to use to implement the (on $current-thread) handler. The remaining (on $block) and "returned" cases join to produce a single return value, with the (on $block) case returning a Continuation + argument passed to suspend $block and the "returned" case returning None + the continuation function's return value.

def resume(cont: Continuation, block_result: Cancelled, current_thread: Thread) -> \
           tuple[Optional[Continuation], Optional[Thread]]:
  handler = Handler()
  handler.lock = new_already_acquired_lock()
  handler.current_thread = current_thread
  cont.handler = handler
  cont.block_result = block_result
  cont.lock.release()
  handler.lock.acquire()
  return (handler.cont, handler.block_arg)

Next, the block function implements suspend $block, taking its signature from the $block tag defined above. Following the locking scheme already established by cont_new and resume, the implementation passes control flow and event arguments back to the parent resume and then waits to be unblocked by a future resume that provides the event results.

def block(switch_to: Optional[Thread]) -> Cancelled:
  cont = Continuation()
  cont.lock = new_already_acquired_lock()
  handler = thread_local_handler.value
  handler.cont = cont
  handler.block_arg = switch_to
  handler.lock.release()
  cont.lock.acquire()
  thread_local_handler.value = cont.handler
  return cont.block_result

Lastly, the current_thread function implements suspend $current-thread, taking its signature from the $current-thread tag defined above. As mentioned above, the handler for $current-thread is hard-coded by the Component Model to simply return the Thread passed to resume which is logically stored as part of the handler's local state. Thus, the handler can be "inlined" at the suspend-site by simply returning the Thread stored in thread-local storage by resume. The other two functions are simply helpers for deriving the current task and component instance from the current thread.

def current_thread() -> Thread:
  return thread_local_handler.value.current_thread

def current_task() -> Task:
  return current_thread().task

def current_instance() -> ComponentInstance:
  return current_task().inst

Once Core WebAssembly gets stack-switching, the Component Model's $block and $current-thread tags would not be exposed to Core WebAssembly. Thus, an optimizing implementation would continue to be able to implement block() as a direct control flow transfer and current_thread() with implicit execution context, both without a general handler/tag search. In particular, this avoids the pathological O(N^2) behavior which would otherwise arise if Component Model cooperative threads were used in conjunction with deeply-nested Core WebAssembly handlers.

Additionally, once Core WebAssembly has stack switching, any unhandled events that originate in Core WebAssembly would turn into traps if they reach a component boundary (just like unhandled exceptions do now; see call_and_trap_on_throw below). Thus, all cross-component/cross-language stack switching would continue to be mediated by the Component Model's types and Canonical ABI, with Core WebAssembly stack-switching used to implement intra-component concurrency according to the language's own internal ABI.

Threads

As described in the concurrency explainer, threads are created both implicitly, when calling a component export (in canon_lift below), and explicitly, when core wasm code calls the thread.new-indirect built-in (in canon_thread_new_indirect below). While threads are logically created for each component export call, by design, non-async function calls and non-async-lowered calls to non-async-lifted async function calls can be implemented by the engine as a normal synchronous native function call.

Additionally, the engine can allocate the per-thread/task ABI state below lazily to avoid overhead on small cross-component calls. To assist in this optimization, threads are put into their own ComponentInstance.threads table to reduce interference from the other kinds of handles.

Threads are represented in the Canonical ABI by the Thread class defined in this section. The Thread class is implemented in terms of the stack-switching primitives defined in the previous section and contains an optional mutable continuation that is set and cleared whenever a thread is suspended and resumed, resp. On top of the basic suspend and resume operations, Thread adds the following higher-level concurrency concepts:

Introducing the Thread class in chunks, a Thread can be in one of the following 3 states:

  • running: actively executing on the stack (thus having no continuation)
  • suspended: waiting to be resumed by another thread running in the same component instance
  • waiting: waiting to be resumed nondeterministically by the host after some condition is met, with ready and non-ready sub-states, depending on whether the condition is met.

class Thread:
  cont: Optional[Continuation]
  ready_func: Optional[Callable[[], bool]]
  task: Task
  cancellable: bool
  index: Optional[int]
  storage: tuple[int,int]

  def running(self):
    return self.cont is None

  def suspended(self):
    return not self.running() and self.ready_func is None

  def waiting(self):
    return not self.running() and self.ready_func is not None

  def ready(self):
    return self.waiting() and self.ready_func()
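As a reading aid, the four predicates above reduce to a classification over the two nullable fields cont and ready_func. The sketch below assumes a hypothetical free function thread_state that takes those two values directly rather than a Thread; any non-None object stands in for a Continuation.

```python
from typing import Callable, Optional

def thread_state(cont: Optional[object],
                 ready_func: Optional[Callable[[], bool]]) -> str:
    """Classify a thread from its two nullable fields (illustrative only)."""
    if cont is None:
        return "running"                 # executing on the stack, no continuation
    if ready_func is None:
        return "suspended"               # has a continuation, no readiness test
    # Waiting threads are further split by whether their condition holds now.
    return "waiting (ready)" if ready_func() else "waiting (not ready)"

placeholder_cont = object()              # stands in for a stored Continuation
states = [
    thread_state(None, None),
    thread_state(placeholder_cont, None),
    thread_state(placeholder_cont, lambda: True),
    thread_state(placeholder_cont, lambda: False),
]
```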

When a Thread is created, a new continuation is created for thread_func (wrapping thread_func with cont_func to exactly match the function type expected by cont_new), leaving the thread initially in the suspended state.

  def __init__(self, task, thread_func):
    def cont_func(cancelled):
      assert(self.running() and not cancelled)
      thread_func()
      return None
    self.cont = cont_new(cont_func)
    self.ready_func = None
    self.task = task
    self.cancellable = False
    self.index = None
    self.storage = [0,0]
    assert(self.suspended())

The next two Thread methods are only called by Thread methods below to add a thread to, and remove it from, the Store.waiting list at the same time as setting and clearing, resp., the readiness function that Store.tick (defined below) will test repeatedly to determine when the thread is ready to be resumed.

  def start_waiting_internal(self, ready_func):
    assert(not self.waiting() and not self.ready_func)
    self.ready_func = ready_func
    self.task.inst.store.waiting.append(self)

  def stop_waiting_internal(self, cancelled):
    assert(self.waiting() and self.ready_func)
    assert(cancelled or self.ready())
    self.ready_func = None
    self.task.inst.store.waiting.remove(self)

One way to allow a newly-created thread to start executing is for core wasm to call the thread.resume-later built-in. This built-in does not immediately switch execution to the thread but instead transitions the thread to the ready waiting state, so that it can be Thread.resumed immediately.

  def resume_later(self):
    assert(self.suspended())
    self.start_waiting_internal(lambda: True)
    assert(self.ready())

Once it's time to execute a suspended or waiting thread, Thread.resume is called on that thread. This method transitions the thread to the running state by clearing and then resuming the Thread's stored continuation. If the resumed continuation suspends with a Thread to switch_to, Thread.resume will resume that Thread's continuation, and so on, repeatedly, until the continuation either returns or suspends with no thread to switch_to.

  def resume(self, cancelled = Cancelled.FALSE):
    assert(not self.running() and (self.cancellable or not cancelled))
    if self.waiting():
      self.stop_waiting_internal(cancelled)
    thread = self
    while thread is not None:
      cont = thread.cont
      thread.cont = None
      (thread.cont, switch_to) = resume(cont, cancelled, thread)
      thread = switch_to
      cancelled = Cancelled.FALSE

The Thread.resume method passes cancellation requests (from Task.request_cancellation defined below) to the continuation being resumed, allowing a thread that opted-in to being cancellable to be resumed even if it's not ready.

Note that the while loop shown above is effectively implementing the switch instruction of the stack-switching proposal since switch is just an optimization of suspend followed by resume. The non-optimized version is used here to simplify storing of the new Continuation into Thread.cont. However, an optimized implementation could do the direct switch.
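The "suspend followed by resume" shape of that loop can be illustrated with a trampoline. The sketch below swaps in Python generators as stand-ins for continuations, which is an assumption made purely for brevity (the definitions in this document use threading instead); run and worker are hypothetical names. A generator that yields another generator asks to switch to it, and one that yields None simply blocks.

```python
def run(thread):
    """Resume `thread`; keep switching while the suspended body names a successor."""
    while thread is not None:
        try:
            thread = next(thread)   # resume until the next suspension point
        except StopIteration:
            thread = None           # the body returned: nothing to switch to

def worker(name, log, switch_to=None):
    log.append(f"{name} start")
    yield switch_to                 # suspend, optionally naming a thread to run next
    log.append(f"{name} end")

log = []
b = worker("B", log)
a = worker("A", log, switch_to=b)

run(a)                              # A suspends naming B, so the loop resumes B
after_first_run = list(log)
run(a)                              # A finishes
run(b)                              # B finishes
```

A direct switch would jump from A to B without returning to the loop in between; the trampoline reaches the same order of execution with one extra hop.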

The next two Thread methods are only called by Thread methods below to suspend with the block effect (defined in the preceding section). Thread.block_internal passes no thread to switch_to and so causes Thread.resume to actually block. In contrast, Thread.switch_to_internal passes a thread to switch_to, causing the loop in Thread.resume to directly switch to that thread without blocking.

  def block_internal(self, cancellable):
    self.cancellable = cancellable
    cancelled = block(switch_to = None)
    assert(self.running() and (cancellable or not cancelled))
    return cancelled

  def switch_to_internal(self, cancellable, other):
    self.cancellable = cancellable
    cancelled = block(switch_to = other)
    assert(self.running() and (cancellable or not cancelled))
    return cancelled

The cancellable parameters in these methods indicate whether the caller is prepared to handle cancellation. If cancellable is false for all of a task's threads, the cancellation request will be stored in Task.state and delivered the next time Task.deliver_pending_cancel() is called with cancellable set by one of the Thread methods below.

Once a thread is Thread.resume()ed and starts executing, it can suspend its execution by calling the thread.suspend built-in which calls Thread.suspend here. Thread.suspend first attempts to deliver any pending cancellation requests and then otherwise simply blocks.

  def suspend(self, cancellable) -> Cancelled:
    assert(self.running())
    if self.task.deliver_pending_cancel(cancellable):
      return Cancelled.TRUE
    return self.block_internal(cancellable)

The Thread.wait_until method is used by all the synchronous blocking built-ins, as well as auto-backpressure and the callback event loop, to wait until a particular readiness condition is met. Given wait_until, "yielding" can simply be defined as waiting on a readiness condition that is already met.

  def wait_until(self, ready_func, cancellable = False) -> Cancelled:
    assert(self.running())
    if self.task.deliver_pending_cancel(cancellable):
      return Cancelled.TRUE
    if ready_func() and not DETERMINISTIC_PROFILE and random.randint(0,1):
      return Cancelled.FALSE
    self.start_waiting_internal(ready_func)
    return self.block_internal(cancellable)

  def yield_(self, cancellable) -> Cancelled:
    return self.wait_until(lambda: True, cancellable)

As with Thread.suspend, before anything else, wait_until reports any pending cancellation requests if the caller is cancellable. The random.randint conjunct on the early return when ready_func() is already True means that, at any potential suspension point, the embedder can nondeterministically decide whether to switch to another thread or keep running the current one. In particular, when a caller makes an async call to a callee which wait_untils a condition that's already met (e.g. in the case of yield), the embedder can use scheduling heuristics to decide whether or not to block the current thread.
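The early-return condition in wait_until can be isolated as a pure function to make the cases explicit. In this sketch, returns_without_blocking and its coin parameter are hypothetical; coin replaces the random.randint(0,1) call so that each branch can be exercised deliberately, and deterministic_profile stands in for the DETERMINISTIC_PROFILE flag.

```python
import random

def returns_without_blocking(ready, deterministic_profile,
                             coin=lambda: random.randint(0, 1)):
    """True when wait_until would return immediately instead of blocking."""
    return bool(ready and not deterministic_profile and coin())

always = lambda: 1    # embedder chooses to keep running the current thread
never = lambda: 0     # embedder chooses to switch to another thread

cases = {
    "ready, embedder keeps running": returns_without_blocking(True, False, always),
    "ready, embedder switches": returns_without_blocking(True, False, never),
    "ready, deterministic profile": returns_without_blocking(True, True, always),
    "not ready": returns_without_blocking(False, False, always),
}
```

Only the first case skips the block; in every other case the thread is added to the waiting list and blocks, even when its condition is already met.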

The Thread.suspend_then_resume and Thread.yield_then_resume methods immediately resume execution of some other suspended thread in the same component instance, leaving the original thread in either a suspended or ready waiting state, resp. Like other Thread methods, these methods first report any pending cancellation if the caller is cancellable.

  def suspend_then_resume(self, cancellable, other: Thread) -> Cancelled:
    assert(self.running() and other.suspended())
    if self.task.deliver_pending_cancel(cancellable):
      return Cancelled.TRUE
    return self.switch_to_internal(cancellable, other)

  def yield_then_resume(self, cancellable, other: Thread) -> Cancelled:
    assert(self.running() and other.suspended())
    if self.task.deliver_pending_cancel(cancellable):
      return Cancelled.TRUE
    self.start_waiting_internal(lambda: True)
    return self.switch_to_internal(cancellable, other)

Lastly, the Thread.suspend_then_promote and Thread.yield_then_promote methods attempt to immediately resume execution of some other thread in the same component instance if the other thread is in a ready waiting state. If so, control flow is transferred directly and the current thread is left suspended or in a ready waiting state, resp. If the other thread is not ready to run, then these operations fall back to plain suspend or yield_ behavior, resp.

  def suspend_then_promote(self, cancellable, other: Thread) -> Cancelled:
    assert(self.running())
    if other.ready():
      other.stop_waiting_internal(cancelled = False)
      return self.suspend_then_resume(cancellable, other)
    else:
      return self.suspend(cancellable)

  def yield_then_promote(self, cancellable, other: Thread) -> Cancelled:
    assert(self.running())
    if other.ready():
      other.stop_waiting_internal(cancelled = False)
      return self.yield_then_resume(cancellable, other)
    else:
      return self.yield_(cancellable)

Tasks

As described in the concurrency explainer, a "task" is created for each call to a component export (in canon_lift below). Each task contains 1..N threads that execute on behalf of the task, starting with the implicit thread that is spawned by canon_lift and transitively including additional explicit threads spawned by the implicit thread via thread.new-indirect. Tasks contain internal state that is used to ensure (via trapping guards) that guest code obeys the Canonical ABI rules.

At a high level, all cross-component calls funnel through the same FuncInst spec-level function type, where the host can be the caller, the callee or even (in rare re-export cases) both:

OnStart = Callable[[], list[any]]
OnResolve = Callable[[Optional[list[any]]], None]
OnCancel = Callable[[], None]
FuncInst = Callable[[OnStart, OnResolve, Optional[ComponentInstance]], OnCancel]

The three parameters of FuncInst are:

  • an OnStart callback that is called by the callee when it is ready to receive its arguments after waiting for any backpressure to subside;
  • an OnResolve callback that is called by the callee when it is ready to return its value or, if cancellation has been requested, None;
  • the caller's ComponentInstance, if the caller is not the host.

Critically, if the callee blocks at the wasm level, the spec-level FuncInst returns immediately to the caller while continuing to execute the callee in a separate Thread. The OnStart and OnResolve callbacks may be called at any time before or after the callee returns. If the callee returns and the OnResolve callback has not yet been called, the caller may invoke the returned OnCancel callback at most once to cooperatively request that the callee "hurry up" and call OnResolve (possibly, but not necessarily, passing None and/or skipping the call to OnStart).
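To make the calling convention concrete, here is a minimal sketch of a host-implemented callee that never blocks, together with a host caller. host_add and the results list are hypothetical; the type aliases mirror the ones above but use typing.Any, and None stands in for "the caller is the host".

```python
from typing import Any, Callable, Optional

OnStart = Callable[[], list[Any]]
OnResolve = Callable[[Optional[list[Any]]], None]
OnCancel = Callable[[], None]

def host_add(on_start: OnStart, on_resolve: OnResolve, caller: Optional[object]) -> OnCancel:
    """Hypothetical synchronous callee: takes its arguments, then resolves at once."""
    args = on_start()                   # callee is ready to receive its arguments
    on_resolve([args[0] + args[1]])     # callee is ready to return its value

    def on_cancel() -> None:
        # Nothing to hurry up: on_resolve has already been called.
        pass
    return on_cancel

# Caller side: supply the arguments lazily and record whatever gets resolved.
results = []
on_cancel = host_add(lambda: [2, 3], results.append, None)
```

Since on_resolve has already run by the time host_add returns, this caller has no reason to invoke on_cancel. A callee that blocks would instead return first and call on_resolve later from its own thread.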

When FuncInst is implemented by wasm guest code (as opposed to the host), each call creates a Task object to track the state of the call and ensure that the wasm guest code adheres to the above FuncInst calling convention (or else traps). Task is introduced in chunks, starting with fields and initialization:

class Task:
  class State(Enum):
    INITIAL = 1
    STARTED = 2
    PENDING_CANCEL = 3
    CANCEL_DELIVERED = 4
    RESOLVED = 5

  ft: FuncType
  opts: CanonicalOptions
  inst: ComponentInstance
  on_start: OnStart
  on_resolve: OnResolve
  caller: Optional[ComponentInstance]
  state: State
  num_borrows: int
  implicit_thread: Optional[Thread]
  threads: list[Thread]

  def __init__(self, ft, opts, inst, on_start, on_resolve, caller):
    self.ft = ft
    self.opts = opts
    self.inst = inst
    self.on_start = on_start
    self.on_resolve = on_resolve
    self.caller = caller
    self.state = Task.State.INITIAL
    self.num_borrows = 0
    self.implicit_thread = None
    self.threads = []

The Task.needs_exclusive predicate returns whether this task's implicit thread (Task.implicit_thread) has not opted in to multiple concurrent linear memory shadow stacks (via "stackful" lift) and thus, according to Component Invariant #3, requires serialization with all the other implicit threads in the component instance that have similarly not opted in. This question only applies to async-typed functions, since synchronous functions can't block and thus can always execute in a LIFO fashion using a single linear memory shadow stack. When needs_exclusive is true, core wasm execution is gated on acquiring the ComponentInstance.exclusive_thread lock. Due to cooperativity, the exclusive_thread "lock" is simply a mutable field holding either None, when unlocked, or, when locked, a reference to the Task.implicit_thread currently holding the lock.

  def needs_exclusive(self):
    assert(self.ft.async_)
    return not self.opts.async_ or self.opts.callback
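To make the predicate concrete, the sketch below evaluates the same expression for the three ways an async-typed function can be lifted. Opts is a hypothetical stand-in for the two CanonicalOptions fields consulted by needs_exclusive, and the result is coerced to bool only so that it prints cleanly.

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class Opts:
  # Hypothetical stand-in for the two CanonicalOptions fields used here.
  async_: bool = False
  callback: Optional[Callable] = None

def needs_exclusive(opts: Opts) -> bool:
  # Same expression as Task.needs_exclusive, coerced to bool for printing.
  return bool(not opts.async_ or opts.callback)

sync_lift = Opts(async_=False)                               # synchronous lift
stackful_lift = Opts(async_=True)                            # async lift without callback
callback_lift = Opts(async_=True, callback=lambda *args: 0)  # async lift with callback

for name, opts in [('sync', sync_lift), ('stackful', stackful_lift), ('callback', callback_lift)]:
  print(name, needs_exclusive(opts))
```

Only the stackful lift opts out of serialization; both the synchronous and the callback lifts share a single linear memory shadow stack and therefore need the exclusive_thread lock.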

The Task.enter_implicit_thread method implements backpressure between when the caller of an async-typed function initiates the call and when the callee's core wasm entry point is executed. This interstitial placement allows a component instance that has been overloaded with concurrent function invocations to avoid OOM. When backpressure is enabled, enter_implicit_thread will block new async-typed calls until backpressure is disabled. There are three sources of backpressure:

  1. Explicit backpressure is triggered by core wasm calling backpressure.{inc,dec} which modify the ComponentInstance.backpressure counter.
  2. Implicit backpressure is triggered when Task.needs_exclusive() is true and the ComponentInstance.exclusive_thread lock is already held.
  3. Residual backpressure is triggered by explicit or implicit backpressure having been enabled then disabled, but there still being tasks waiting to enter that need to be given the chance to start without getting starved by new tasks.

Note that, because non-async-typed functions ignore backpressure entirely, they may reenter core wasm when an async-typed function would have been blocked by implicit backpressure. Thus, export bindings generators must be careful to handle this possibility (e.g., while maintaining the linear-memory shadow stack pointer) for components with mixed async- and non-async- typed exports.

  def enter_implicit_thread(self):
    assert(self.state == Task.State.INITIAL)
    self.implicit_thread = current_thread()
    if self.ft.async_:
      def has_backpressure():
        return (self.inst.backpressure > 0 or
                (self.needs_exclusive() and self.inst.exclusive_thread is not None))
      if has_backpressure() or self.inst.num_waiting_to_enter > 0:
        self.inst.num_waiting_to_enter += 1
        cancelled = self.implicit_thread.wait_until(lambda: not has_backpressure(), cancellable = True)
        self.inst.num_waiting_to_enter -= 1
        if cancelled:
          self.cancel()
          return False
      if self.needs_exclusive():
        assert(self.inst.exclusive_thread is None)
        self.inst.exclusive_thread = self.implicit_thread
    self.register_thread(self.implicit_thread)
    return True

  def register_thread(self, thread):
    assert(thread not in self.threads and thread.task is self)
    self.threads.append(thread)
    assert(thread.index is None)
    thread.index = self.inst.threads.add(thread)

Since the order in which suspended threads are resumed is nondeterministic (see Store.tick below), once Task.enter_implicit_thread suspends the task's implicit thread due to backpressure, the above definition allows the host to arbitrarily select which threads to resume in which order. Additionally, the above definition ensures the following properties:

  • While a callee is waiting to enter, if the caller requests cancellation, the callee is immediately cancelled.
  • When backpressure is disabled then reenabled, no new tasks start, even tasks that were blocked and then unblocked by the first occurrence of backpressure (i.e., disabling backpressure never unleashes an unstoppable thundering herd of pending tasks).

Once a task's implicit thread has cleared the backpressure gate, it is added to the lists of threads running inside the current task and component instance by Task.register_thread() (which is also called by thread.new-indirect, below).
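The three sources of backpressure can be exercised in isolation with a reduced model of the gate. In this sketch, Inst is a hypothetical stand-in for the three ComponentInstance fields involved, and must_wait mirrors only the condition that guards the wait in enter_implicit_thread; thread suspension and cancellation are deliberately left out.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Inst:
  # Hypothetical stand-in for the ComponentInstance fields used by the gate.
  backpressure: int = 0
  exclusive_thread: Optional[str] = None
  num_waiting_to_enter: int = 0

def has_backpressure(inst: Inst, needs_exclusive: bool) -> bool:
  return inst.backpressure > 0 or (needs_exclusive and inst.exclusive_thread is not None)

def must_wait(inst: Inst, needs_exclusive: bool) -> bool:
  # Mirrors the condition guarding wait_until in enter_implicit_thread.
  return has_backpressure(inst, needs_exclusive) or inst.num_waiting_to_enter > 0

explicit = must_wait(Inst(backpressure=1), needs_exclusive=False)
implicit = must_wait(Inst(exclusive_thread='t0'), needs_exclusive=True)
stackful = must_wait(Inst(exclusive_thread='t0'), needs_exclusive=False)
residual = must_wait(Inst(num_waiting_to_enter=2), needs_exclusive=False)
idle = must_wait(Inst(), needs_exclusive=True)
```

The residual case is the one that prevents starvation: even with no active backpressure, a new task queues behind tasks that are already waiting to enter.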

Symmetrically, the Task.exit_implicit_thread method is called before a task's implicit thread returns to reverse the effects of Task.enter_implicit_thread. In particular, if the exclusive_thread lock was acquired, it is released. Task.unregister_thread (which is also called by thread.new-indirect, below) traps if the task's last thread is unregistered and the task has not yet returned a value to its caller.

  def exit_implicit_thread(self):
    assert(current_thread() is self.implicit_thread)
    self.unregister_thread(self.implicit_thread)
    if self.ft.async_ and self.needs_exclusive():
      assert(self.inst.exclusive_thread is self.implicit_thread)
      self.inst.exclusive_thread = None

  def unregister_thread(self, thread):
    assert(thread in self.threads and thread.task is self)
    self.threads.remove(thread)
    if len(self.threads) == 0:
      trap_if(self.state != Task.State.RESOLVED)
      assert(self.num_borrows == 0)
    assert(thread.index is not None)
    self.inst.threads.remove(thread.index)

The Task.request_cancellation method is called by the host or wasm caller to signal that they don't need the return value and that the callee should hurry up and call the OnResolve callback. If a task's implicit thread is waiting to start (in Task.enter_implicit_thread) due to backpressure, then it is immediately cancelled without running any guest code. Otherwise, if any of a cancelled task's threads are expecting cancellation (e.g., when an async callback export returns to the event loop or when waitable-set.wait or a thread.* built-in is called with cancellable set), request_cancellation considers resuming that thread (picking one nondeterministically if there are multiple), giving the thread the chance to handle cancellation promptly so that subtask.cancel completes without blocking.

  def request_cancellation(self):
    assert(not self.caller or self.caller is current_instance())
    if self.state == Task.State.INITIAL:
      self.state = Task.State.CANCEL_DELIVERED
      self.implicit_thread.resume(Cancelled.TRUE)
    else:
      assert(self.state == Task.State.STARTED)
      candidates = { t for t in self.threads if t.cancellable }
      if self.needs_exclusive() and self.inst.exclusive_thread not in { None, self.implicit_thread }:
        candidates.discard(self.implicit_thread)
      if candidates and self.inst.may_enter_from(self.caller):
        self.state = Task.State.CANCEL_DELIVERED
        self.inst.enter_from(self.caller)
        random.choice(list(candidates)).resume(Cancelled.TRUE)
        self.inst.leave_to(self.caller)
      else:
        self.state = Task.State.PENDING_CANCEL

As handled above, cancellation must additionally avoid resuming a cancellable thread when doing so would violate Component Invariant #2 or #3. In particular, invariant #2 requires not resuming any thread while the task's containing component instance may not be reentered and invariant #3 requires not resuming a needs_exclusive task's implicit thread while another task's implicit thread is running exclusively.

If cancellation cannot be immediately delivered by Task.request_cancellation, the request is remembered in Task.state and delivered at the next opportunity by Task.deliver_pending_cancel, which is checked at all cancellation points:

  def deliver_pending_cancel(self, cancellable) -> bool:
    if cancellable and self.state == Task.State.PENDING_CANCEL:
      self.state = Task.State.CANCEL_DELIVERED
      return True
    return False

The Task.start method is called by canon_lift to get the list of component-level values that will be subsequently lowered into the callee component's memory. Since it is called involuntarily after backpressure, before guest code runs, the definition can assert that it is called correctly.

  def start(self) -> list[any]:
    assert(self.state == Task.State.INITIAL)
    self.state = Task.State.STARTED
    return self.on_start()

The Task.return_ method is called by canon_task_return and canon_lift to return a list of lifted values to the task's caller via the OnResolve callback. There is a dynamic error if the callee has not dropped all borrowed handles by the time task.return is called, which means that the caller can assume that all its lent handles have been returned to it when it receives the SUBTASK RETURNED event. Note that the initial trap_if allows a task to return a value even after cancellation has been requested.

  def return_(self, result):
    trap_if(self.state == Task.State.RESOLVED)
    trap_if(self.num_borrows > 0)
    assert(result is not None)
    self.on_resolve(result)
    self.state = Task.State.RESOLVED

Lastly, the Task.cancel method is called by canon_task_cancel and enforces the same num_borrows condition as return_, ensuring that when the caller's OnResolve callback is called, the caller knows all borrows have been returned. The initial trap_if only allows cancellation after cancellation has been delivered to core wasm. In particular, if request_cancellation cannot synchronously deliver cancellation and sets Task.state to PENDING_CANCEL, core wasm will still trap if it tries to call task.cancel.

  def cancel(self):
    trap_if(self.state != Task.State.CANCEL_DELIVERED)
    trap_if(self.num_borrows > 0)
    self.on_resolve(None)
    self.state = Task.State.RESOLVED
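The interaction between deliver_pending_cancel, return_ and cancel can be traced with a reduced model. MiniTask, Trap and traps below are hypothetical names introduced for this sketch; MiniTask keeps only the fields that the three methods check and records resolutions in a list instead of calling an OnResolve callback.

```python
from enum import Enum

class Trap(Exception):
  pass

def trap_if(cond):
  if cond:
    raise Trap()

class State(Enum):
  INITIAL = 1
  STARTED = 2
  PENDING_CANCEL = 3
  CANCEL_DELIVERED = 4
  RESOLVED = 5

class MiniTask:
  # Hypothetical reduction of Task to the fields that return_ and cancel check.
  def __init__(self, state):
    self.state = state
    self.num_borrows = 0
    self.resolved_with = []

  def deliver_pending_cancel(self, cancellable):
    if cancellable and self.state == State.PENDING_CANCEL:
      self.state = State.CANCEL_DELIVERED
      return True
    return False

  def return_(self, result):
    trap_if(self.state == State.RESOLVED)
    trap_if(self.num_borrows > 0)
    self.resolved_with.append(result)
    self.state = State.RESOLVED

  def cancel(self):
    trap_if(self.state != State.CANCEL_DELIVERED)
    trap_if(self.num_borrows > 0)
    self.resolved_with.append(None)
    self.state = State.RESOLVED

def traps(f):
  try:
    f()
  except Trap:
    return True
  return False

pending = MiniTask(State.PENDING_CANCEL)
cancel_too_early = traps(pending.cancel)          # cancellation not yet delivered
delivered = pending.deliver_pending_cancel(True)  # reached a cancellation point
pending.cancel()                                  # now allowed
double_resolve = traps(lambda: pending.return_([1]))

late_return = MiniTask(State.PENDING_CANCEL)
late_return.return_([42])                         # returning a value is still allowed
```

The last two lines show the asymmetry described above: a task with a pending cancellation may still return a value, but it may only call cancel once the cancellation has actually been delivered.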

Embedding

A WebAssembly Component Model implementation will typically be embedded into a host environment. An embedder implements the connection between such a host environment and the Component Model semantics defined here. A full Embedding interface would contain functions for decoding, validating, instantiating and interrogating components, just like the Core WebAssembly Embedding. However, the Embedding interface here just covers the subset that is necessary to define the behavior of the Canonical ABI.

The Embedding interface is defined as the methods of the Store class, which is the Component Model's version of a Core WebAssembly store. Defining Store in chunks, the Store constructor is analogous to Core WebAssembly store_init and defines the initial state of the Store:

class Store:
  waiting: list[Thread]
  nesting_depth: int

  def __init__(self):
    self.waiting = []
    self.nesting_depth = 0

The waiting field is populated by Thread methods, as defined above, and the nesting_depth field is purely a specification device used by Store methods below to define the valid host call interleavings.

The Store.invoke method is analogous to Core WebAssembly's func_invoke and takes a FuncInst (analogous to a Core WebAssembly funcinst) along with its runtime OnStart and OnResolve arguments (which are described above alongside their definitions). The Store.nesting_depth field tracks whether there are any active Store.invoke calls for the benefit of Store.tick, defined below.

  def invoke(self, f: FuncInst, on_start: OnStart, on_resolve: OnResolve) -> OnCancel:
    self.nesting_depth += 1
    on_cancel = f(on_start, on_resolve, caller = None)
    self.nesting_depth -= 1
    return on_cancel

The FuncInst passed to Store.invoke can be either a guest function (produced by Store.lift, defined next) or (in the special case of component re-exports) a host function. Symmetrically, FuncInsts can be called either from the host (via Store.invoke) or core wasm code (via Store.lower). Store.invoke passes a None caller to signal that the host is the caller.

The Store.lift method is called for each canon lift definition in a component to wrap a core wasm CoreFuncInst into a component-level FuncInst, passing canon lift's immediate arguments as well as the containing component instance. Similarly, Store.lower is called for each canon lower definition in a component to wrap a component-level FuncInst into a core wasm CoreFuncInst. (In a complete Embedding API, Store.lift and Store.lower would be replaced by a single, higher-level Store.instantiate method of type Component -> ComponentInstance, analogous to the Core WebAssembly's module_instantiate. But for the Canonical ABI, just lift and lower are sufficient to define relevant ABI behavior.)

  CoreFuncInst = Callable[[list[CoreValType]], list[CoreValType]]

  def lift(self, f: CoreFuncInst, ft: FuncType, opts: CanonicalOptions, inst: ComponentInstance) -> FuncInst:
    def func_inst(on_start: OnStart, on_resolve: OnResolve, caller: Optional[ComponentInstance]) -> OnCancel:
      assert(not caller or caller is current_instance())
      trap_if(not inst.may_enter_from(caller))
      inst.enter_from(caller)
      on_cancel = canon_lift(f, ft, opts, inst, on_start, on_resolve, caller)
      inst.leave_to(caller)
      return on_cancel
    return func_inst

  def lower(self, f: FuncInst, ft: FuncType, opts: CanonicalOptions, inst: ComponentInstance) -> CoreFuncInst:
    def core_func_inst(args: list[CoreValType]) -> list[CoreValType]:
      assert(inst is current_instance())
      assert(all(not i.may_enter for i in inst.self_and_ancestors()))
      results = canon_lower(f, ft, opts, args)
      assert(all(not i.may_enter for i in inst.self_and_ancestors()))
      return results
    return core_func_inst

Before entering a component via core wasm export call, the FuncInst wrapper produced by Store.lift traps if entering the component would violate Component Invariant #2, and then records that the instance was entered by calling ComponentInstance.enter_from. The rest of the trampoline is defined by canon_lift below. Importantly though, canon_lift will return immediately if it blocks, thereby calling ComponentInstance.leave_to and allowing reentrance (via Store.invoke or Store.tick) without trapping.

Before temporarily leaving a component via core wasm import call, the CoreFuncInst wrapper produced by Store.lower asserts that the may_enter flags of the current component instance and all its ancestors are already False (as set by ComponentInstance.enter_from in Store.lift). Thus, by default, reentrance is disallowed. However, if the lowered FuncInst callee blocks before returning a value and the canon lower definition didn't specify the async ABI option (which opts in to the non-blocking async ABI), canon_lower will block until the callee returns (via Thread.wait_until, defined above) which will suspend the current thread and return from canon_lift to Store.lift which then calls ComponentInstance.leave_to to enable reentrance for as long as Thread.wait_until stays blocked. Thus, in accordance with Component Invariant #2, synchronous (blocking) calls to async-typed function imports may be reentered during canon_lower.

Lastly, the Store.tick method does not have an analogue in Core WebAssembly but is necessary to enable native concurrency support in the Component Model. Store.tick allows the runtime to nondeterministically resume a thread that previously blocked but is now ready. As defined above, Thread.resume will execute the thread until it either returns or blocks again. Thus, each call to Store.tick just allows a single thread to make a single quantum of cooperative progress and the expectation is that the host heuristically interleaves calls to Store.invoke with calls to Store.tick so that concurrent tasks can complete while new tasks are being started.

  def tick(self):
    assert(self.nesting_depth == 0)
    assert(all(thread.task.inst.may_enter_from(None) for thread in self.waiting))
    self.nesting_depth += 1
    candidates = { thread for thread in self.waiting if thread.ready() }
    if candidates:
      thread = random.choice(list(candidates))
      thread.task.inst.enter_from(None)
      thread.resume()
      thread.task.inst.leave_to(None)
    self.nesting_depth -= 1

As shown above, Store.nesting_depth is greater than zero while calling Store.invoke and thus the first assert prohibits the host from calling Store.tick during an active Store.invoke. This prohibition ensures that the second assert holds, which is that all component instances in the store can be (re)entered. If this were not the case, a random thread might be resumed while one of its imports' component instances was on the stack and not reenterable, leading to a spurious trap when it was called.
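The interleaving of Store.invoke and Store.tick from the host's point of view can be sketched with a toy store. MiniStore and MiniThread are hypothetical reductions introduced here: a thread is just a readiness predicate plus a resume action, and, as a simplifying assumption of this sketch, tick itself removes the resumed thread from waiting (in the definitions above, that bookkeeping belongs to Thread). Component instance entry and exit are omitted.

```python
import random

class MiniThread:
  # Hypothetical stand-in for Thread: ready when its predicate holds.
  def __init__(self, ready, on_resume):
    self.ready = ready
    self.on_resume = on_resume

class MiniStore:
  def __init__(self):
    self.waiting = []
    self.nesting_depth = 0

  def invoke(self, f):
    self.nesting_depth += 1
    f(self)
    self.nesting_depth -= 1

  def tick(self):
    assert self.nesting_depth == 0   # never called during an active invoke
    self.nesting_depth += 1
    candidates = [t for t in self.waiting if t.ready()]
    if candidates:
      thread = random.choice(candidates)
      self.waiting.remove(thread)    # assumption of this sketch, see lead-in
      thread.on_resume()
    self.nesting_depth -= 1
    return bool(candidates)

store = MiniStore()
io_done = {'flag': False}
results = []

def blocking_callee(store):
  # "Blocks" by parking a thread, then returns to the host immediately.
  store.waiting.append(MiniThread(lambda: io_done['flag'],
                                  lambda: results.append('resolved')))

store.invoke(blocking_callee)
first = store.tick()      # nothing is ready yet
io_done['flag'] = True    # the host completes the pending I/O
second = store.tick()     # the parked thread is resumed
```

Each tick makes at most one thread progress, so a host would typically keep ticking until the results it is interested in have been resolved.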

Lifting and Lowering Context

Most Canonical ABI definitions depend on some ambient information which is established by the canon lift- or canon lower-defined function that is being called:

  • the ABI options supplied via canonopt
  • the containing component instance
  • the Task or Subtask used to lower or lift, resp., borrow handles

These three pieces of ambient information are stored in a LiftLowerContext object that is threaded through all the Python functions below as the cx parameter/field.

class LiftLowerContext:
  opts: LiftLowerOptions
  inst: ComponentInstance
  borrow_scope: Optional[Task|Subtask]

  def __init__(self, opts, inst, borrow_scope = None):
    self.opts = opts
    self.inst = inst
    self.borrow_scope = borrow_scope

The borrow_scope field may be None if the types being lifted/lowered are known to not contain borrow.

Canonical ABI Options

The following classes list the various Canonical ABI options (canonopt) that can be set on various Canonical ABI definitions. The default values of the Python fields are the default values when the associated canonopt is not present in the binary or text format definition.

The MemInst class represents a core WebAssembly memory instance, with bytes corresponding to the memory's bytes and addrtype coming from the memtype.

def ptr_size(ptr_type):
  match ptr_type:
    case 'i32':
      return 4
    case 'i64':
      return 8

@dataclass
class MemInst:
  bytes: bytearray
  addrtype: Literal['i32', 'i64']

  def __getitem__(self, i):
    return self.bytes[i]

  def __setitem__(self, i, v):
    self.bytes[i] = v

  def __len__(self):
    return len(self.bytes)

  def ptr_type(self):
    return self.addrtype

  def ptr_size(self):
    return ptr_size(self.ptr_type())

The ptr_type and ptr_size methods return the core value type and byte size of memory pointers.

The LiftOptions class contains the subset of canonopt which are needed when lifting individual parameters and results:

@dataclass
class LiftOptions:
  string_encoding: str = 'utf8'
  memory: Optional[MemInst] = None

  def equal(lhs, rhs):
    return lhs.string_encoding == rhs.string_encoding and \
           lhs.memory is rhs.memory

The equal static method is used by task.return below to dynamically compare equality of just this subset of canonopt.

The LiftLowerOptions class contains the subset of canonopt which are needed when lifting or lowering individual parameters and results:

@dataclass
class LiftLowerOptions(LiftOptions):
  realloc: Optional[Callable] = None

The CanonicalOptions class contains the rest of the canonopt options that affect how an overall function is lifted/lowered:

@dataclass
class CanonicalOptions(LiftLowerOptions):
  post_return: Optional[Callable] = None
  async_: bool = False
  callback: Optional[Callable] = None
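The following sketch shows how the option classes compose and how LiftOptions.equal compares only the lifting subset. As a simplification that is an assumption of this sketch, a plain bytearray stands in for MemInst; the point is that memories are compared by identity rather than by contents.

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class LiftOptions:
  string_encoding: str = 'utf8'
  memory: Optional[bytearray] = None   # simplified: a bytearray stands in for MemInst

  def equal(lhs, rhs):
    return lhs.string_encoding == rhs.string_encoding and \
           lhs.memory is rhs.memory

@dataclass
class LiftLowerOptions(LiftOptions):
  realloc: Optional[Callable] = None

@dataclass
class CanonicalOptions(LiftLowerOptions):
  post_return: Optional[Callable] = None
  async_: bool = False
  callback: Optional[Callable] = None

mem_a = bytearray(8)
mem_b = bytearray(8)   # same contents as mem_a, but a distinct memory
lift_opts = CanonicalOptions(memory=mem_a, async_=True)

same = LiftOptions.equal(lift_opts, LiftOptions(memory=mem_a))
different_memory = LiftOptions.equal(lift_opts, LiftOptions(memory=mem_b))
different_encoding = LiftOptions.equal(lift_opts, LiftOptions('utf16', mem_a))
```

Fields outside the lifting subset, such as async_, do not affect the comparison.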

Runtime State

The following Python classes define spec-internal state and utility methods that are used to define the externally-visible behavior of Canonical ABI's lifting, lowering and built-in definitions below. These fields are chosen for simplicity over performance and thus an optimizing implementation is expected to use more optimized representations as long as it preserves the same externally-visible behavior. Some specific examples of expected optimizations are noted below.

Table State

The Table class encapsulates a mutable, growable array of opaque elements that are represented in Core WebAssembly as i32 indices into the array. Currently, every component instance contains two tables: a threads table containing all the component's threads and a handles table containing everything else (resource handles, waitables, waitable sets and error contexts).

class Table:
  array: list[any]
  free: list[int]

  MAX_LENGTH = 2**28 - 1

  def __init__(self):
    self.array = [None]
    self.free = []

  def __iter__(self):
    for e in self.array:
      if e is not None:
        yield e

  def get(self, i):
    trap_if(i >= len(self.array))
    trap_if(self.array[i] is None)
    return self.array[i]

  def add(self, e):
    if self.free:
      i = self.free.pop()
      assert(self.array[i] is None)
      self.array[i] = e
    else:
      i = len(self.array)
      trap_if(i > Table.MAX_LENGTH)
      self.array.append(e)
    return i

  def remove(self, i):
    e = self.get(i)
    self.array[i] = None
    self.free.append(i)
    return e

Table maintains a dense array of elements that can contain holes created by the remove method (defined above). When table elements are accessed (e.g., by canon_lift and resource.rep, below), both a bounds check and a hole check are thus necessary. Upon initialization, table element 0 is allocated and set to None, effectively reserving index 0 which is both useful for catching null/uninitialized accesses and allowing 0 to serve as a sentinel value.

The add and remove methods work together to maintain a free list of holes that are used in preference to growing the table. The free list is represented as a Python list here, but an optimizing implementation could instead store the free list in the free elements of array.

The limit of 2**28 ensures that the high 4 bits of table indices are unset and available for other use in guest code (e.g., for tagging, packed words or sentinel values).
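The behaviors described above (reserved index 0, hole reuse via the free list and the index limit) can be observed with a short usage sketch. It repeats the Table methods in reduced form; Trap and traps are hypothetical helpers introduced only so that the sketch can observe traps.

```python
class Trap(Exception):
  pass

def trap_if(cond):
  if cond:
    raise Trap()

class Table:
  MAX_LENGTH = 2**28 - 1

  def __init__(self):
    self.array = [None]
    self.free = []

  def get(self, i):
    trap_if(i >= len(self.array))
    trap_if(self.array[i] is None)
    return self.array[i]

  def add(self, e):
    if self.free:
      i = self.free.pop()
      self.array[i] = e
    else:
      i = len(self.array)
      trap_if(i > Table.MAX_LENGTH)
      self.array.append(e)
    return i

  def remove(self, i):
    e = self.get(i)
    self.array[i] = None
    self.free.append(i)
    return e

def traps(f):
  try:
    f()
  except Trap:
    return True
  return False

t = Table()
a = t.add('a')                            # first real index, since 0 is reserved
b = t.add('b')
t.remove(a)
c = t.add('c')                            # reuses the hole left by 'a'
null_access = traps(lambda: t.get(0))     # hole check catches the reserved slot
out_of_bounds = traps(lambda: t.get(99))  # bounds check
high_bits = Table.MAX_LENGTH >> 28        # the top 4 bits of the largest index
```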

Resource State

The ResourceHandle class defines the elements of the component instance's handles table used to represent handles to resources:

class ResourceHandle:
  rt: ResourceType
  rep: int
  own: bool
  borrow_scope: Optional[Task]
  num_lends: int

  def __init__(self, rt, rep, own, borrow_scope = None):
    self.rt = rt
    self.rep = rep
    self.own = own
    self.borrow_scope = borrow_scope
    self.num_lends = 0

The rt and rep fields of ResourceHandle store the resource type and representation of the resource. The rep field is currently fixed to be an i32, but will be generalized in the future to other types.

The own field indicates whether this element was created from an own type (or, if false, a borrow type).

The borrow_scope field stores the Task that lowered the borrowed handle as a parameter. When a component only uses sync-lifted exports, due to lack of reentrance, there is at most one Task alive in a component instance at any time and thus an optimizing implementation doesn't need to store the Task per ResourceHandle.

The num_lends field maintains a conservative approximation of the number of live handles that were lent from this handle (by calls to borrow-taking functions). This count is maintained by the Subtask bookkeeping functions (below) and is ensured to be zero when an own handle is dropped.

The ResourceType class represents a runtime instance of a resource type that has been created either by the host or a component instance (where multiple component instances of the same static component create multiple ResourceType instances). ResourceType Python object identity is used by trapping guards on the rt field of ResourceHandle (above) and thus resource type equality is not defined structurally (on the contents of ResourceType).

class ResourceType(Type):
  impl: ComponentInstance
  dtor: Optional[Callable]

  def __init__(self, impl, dtor = None):
    self.impl = impl
    self.dtor = dtor

Waitable State

A "waitable" is a concurrent activity that can be waited on by the built-ins waitable-set.wait and waitable-set.poll. Currently, there are 5 different kinds of waitables: subtasks and the 4 combinations of the readable and writable ends of futures and streams.

Waitables deliver "events" which are values of the following EventTuple type. The two int "payload" fields of EventTuple store core wasm i32s and are to be interpreted based on the EventCode. The meaning of the different EventCodes and their payloads will be introduced incrementally below by the code that produces the events (specifically, in subtask_event, stream_event or future_event).

class EventCode(IntEnum):
  NONE = 0
  SUBTASK = 1
  STREAM_READ = 2
  STREAM_WRITE = 3
  FUTURE_READ = 4
  FUTURE_WRITE = 5
  TASK_CANCELLED = 6

EventTuple = tuple[EventCode, int, int]

The Waitable class factors out the state and behavior common to all 5 kinds of waitables, which are each defined as subclasses of Waitable below.

Every Waitable can store at most one pending event in its pending_event field which will be delivered to core wasm as soon as the core wasm code explicitly waits on this Waitable (which may take an arbitrarily long time). A pending_event is represented in the Python code below as a closure so that the closure can specify behaviors that trigger right before events are delivered to core wasm and so that the closure can compute the event based on the state of the world at delivery time (as opposed to when pending_event was first set). Currently, pending_event holds a closure of the subtask_event, stream_event or future_event functions defined below. An optimizing implementation would avoid closure allocation by inlining a union containing the closure fields directly in the component instance table.

A waitable can belong to at most one "waitable set" (defined next) which is referred to by the wset field. A Waitable's pending_event is delivered (via get_pending_event) when core wasm code waits on its waitable set (via waitable-set.wait or, when using callback, by returning to the event loop).

Lastly, a waitable cannot be waited on both asynchronously (via waitable set) and synchronously (via synchronous subtask.cancel or {stream,future}.{,cancel-}{read,write}) since this raises the possibility that the waitable set "steals" events from the synchronous waiter, leaving the synchronous waiter forever waiting. This condition is asserted by the Waitable methods here and guarded via traps by the relevant built-ins below.

class Waitable:
  pending_event: Optional[Callable[[], EventTuple]]
  wset: Optional[WaitableSet]
  has_sync_waiter: bool

  def __init__(self):
    self.pending_event = None
    self.wset = None
    self.has_sync_waiter = False

  def set_pending_event(self, pending_event):
    self.pending_event = pending_event

  def has_pending_event(self):
    return bool(self.pending_event)

  def in_waitable_set(self):
    return self.wset is not None

  def wait_for_pending_event(self):
    assert(not self.in_waitable_set() and not self.has_sync_waiter)
    self.has_sync_waiter = True
    current_thread().wait_until(self.has_pending_event, cancellable = False)
    self.has_sync_waiter = False

  def get_pending_event(self) -> EventTuple:
    pending_event = self.pending_event
    self.pending_event = None
    return pending_event()

  def join(self, wset):
    assert(not self.has_sync_waiter)
    if self.wset:
      self.wset.elems.remove(self)
    self.wset = wset
    if wset:
      wset.elems.append(self)

  def drop(self):
    assert(not self.has_pending_event())
    assert(not self.has_sync_waiter)
    self.join(None)
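The reason for representing pending_event as a closure can be seen in a reduced model. MiniWaitable below is a hypothetical reduction of Waitable to its pending-event slot, and the payload values (7 and the copied count) are illustrative only, not payloads defined by this document.

```python
class MiniWaitable:
  # Hypothetical reduction of Waitable to its pending-event slot.
  def __init__(self):
    self.pending_event = None

  def set_pending_event(self, pending_event):
    self.pending_event = pending_event

  def has_pending_event(self):
    return bool(self.pending_event)

  def get_pending_event(self):
    pending_event = self.pending_event
    self.pending_event = None
    return pending_event()

STREAM_READ = 2   # EventCode.STREAM_READ

w = MiniWaitable()
progress = {'copied': 0}
w.set_pending_event(lambda: (STREAM_READ, 7, progress['copied']))
progress['copied'] = 3         # state changes after the event was set...
event = w.get_pending_event()  # ...and is observed at delivery time
```

Because the closure runs at delivery time, the delivered tuple reflects the state at that moment rather than at the moment set_pending_event was called.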

A "waitable set" contains a collection of waitables that can be waited on or polled for any element to make progress. Although the WaitableSet class below represents elems as a list and implements {has,get}_pending_event with an O(n) search, because a waitable can be associated with at most one set and can contain at most one pending event, a real implementation could instead store a list of waitables-with-pending-events as a linked list embedded directly in the component instance's table element to avoid the separate allocation while providing O(1) polling.

class WaitableSet:
  elems: list[Waitable]
  num_waiting: int

  def __init__(self):
    self.elems = []
    self.num_waiting = 0

  def has_pending_event(self):
    return any(w.has_pending_event() for w in self.elems)

  def get_pending_event(self) -> EventTuple:
    assert(self.has_pending_event())
    random.shuffle(self.elems)
    for w in self.elems:
      assert(self is w.wset)
      if w.has_pending_event():
        return w.get_pending_event()

  def wait_for_event_and(self, ready_func, cancellable) -> EventTuple:
    def ready_and_has_event():
      return ready_func() and self.has_pending_event()
    self.num_waiting += 1
    cancelled = current_thread().wait_until(ready_and_has_event, cancellable)
    if cancelled:
      event = (EventCode.TASK_CANCELLED, 0, 0)
    else:
      event = self.get_pending_event()
    self.num_waiting -= 1
    return event

  def wait_for_event(self, cancellable) -> EventTuple:
    return self.wait_for_event_and(lambda: True, cancellable)

  def poll(self, cancellable) -> EventTuple:
    if current_task().deliver_pending_cancel(cancellable):
      return (EventCode.TASK_CANCELLED, 0, 0)
    elif not self.has_pending_event():
      return (EventCode.NONE, 0, 0)
    else:
      return self.get_pending_event()

  def drop(self):
    trap_if(len(self.elems) > 0)
    trap_if(self.num_waiting > 0)

The random.shuffle in get_pending_event gives embedders the semantic freedom to schedule delivery of events nondeterministically (e.g., taking into account priorities); runtimes do not have to literally randomize event delivery.

The ready_func passed to WaitableSet.wait_for_event_and allows the caller to stipulate extra conditions that have to be met, in addition to there being an event ready for delivery. In particular, this is used by the async callback event loop to avoid overlapping callback execution.

The WaitableSet.drop method traps if dropped while it still contains elements (whose Waitable.wset field would become dangling) or if it is being waited-upon by another Task (as indicated by a non-zero num_waiting).
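Polling a waitable set can be sketched without any thread machinery. MiniWaitable and MiniWaitableSet are hypothetical reductions of the classes above; cancellation delivery is omitted from poll, and the event payloads are illustrative only.

```python
import random

NONE, SUBTASK = 0, 1   # EventCode.NONE and EventCode.SUBTASK

class MiniWaitable:
  def __init__(self):
    self.pending_event = None
    self.wset = None
  def has_pending_event(self):
    return bool(self.pending_event)
  def get_pending_event(self):
    pending_event, self.pending_event = self.pending_event, None
    return pending_event()
  def join(self, wset):
    if self.wset:
      self.wset.elems.remove(self)
    self.wset = wset
    if wset:
      wset.elems.append(self)

class MiniWaitableSet:
  def __init__(self):
    self.elems = []
  def has_pending_event(self):
    return any(w.has_pending_event() for w in self.elems)
  def poll(self):
    # Cancellation delivery is omitted from this sketch.
    if not self.has_pending_event():
      return (NONE, 0, 0)
    random.shuffle(self.elems)
    for w in self.elems:
      if w.has_pending_event():
        return w.get_pending_event()

ws = MiniWaitableSet()
w1, w2 = MiniWaitable(), MiniWaitable()
w1.join(ws)
w2.join(ws)
empty = ws.poll()
w1.pending_event = lambda: (SUBTASK, 1, 2)
w2.pending_event = lambda: (SUBTASK, 2, 2)
delivered = {ws.poll(), ws.poll()}   # order is nondeterministic, so compare as a set
drained = ws.poll()
w1.join(None)                        # leaving the set removes w1 from elems
```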

Subtask State

While canon_lift creates Task objects, canon_lower creates Subtask objects, using Subtask to contain all the state relevant to the caller. See the structured concurrency section for a summary of how Task and Subtask relate with respect to component- and host-defined callers and callees. This section introduces Subtask incrementally, starting with its fields and initialization:

class Subtask(Waitable):
  class State(IntEnum):
    STARTING = 0
    STARTED = 1
    RETURNED = 2
    CANCELLED_BEFORE_STARTED = 3
    CANCELLED_BEFORE_RETURNED = 4

  state: State
  on_cancel: Optional[OnCancel]
  lenders: Optional[list[ResourceHandle]]
  cancellation_requested: bool

  def __init__(self):
    Waitable.__init__(self)
    self.state = Subtask.State.STARTING
    self.on_cancel = None
    self.lenders = []
    self.cancellation_requested = False

The state field of Subtask tracks the callee's progression from the initial STARTING state along the subtask state machine. A Subtask is considered "resolved" if the callee has returned a value or if, after the caller requested cancellation, the callee has called task.cancel (either before or after calling OnStart):

  def resolved(self):
    match self.state:
      case (Subtask.State.STARTING |
            Subtask.State.STARTED):
        return False
      case (Subtask.State.RETURNED |
            Subtask.State.CANCELLED_BEFORE_STARTED |
            Subtask.State.CANCELLED_BEFORE_RETURNED):
        return True

The Subtask.add_lender method is called by lift_borrow (below). This method increments the num_lends counter on the handle being lifted, which is guarded to be zero by canon_resource_drop (below). The Subtask.deliver_resolve method is called right before the SUBTASK resolve event is delivered to wasm (for any of the RETURNED, CANCELLED_BEFORE_STARTED or CANCELLED_BEFORE_RETURNED states), at which point all the borrowed handles are logically returned to the caller by decrementing all the num_lends counts that were initially incremented.

  def add_lender(self, lending_handle):
    assert(not self.resolve_delivered() and not self.resolved())
    lending_handle.num_lends += 1
    self.lenders.append(lending_handle)

  def deliver_resolve(self):
    assert(not self.resolve_delivered() and self.resolved())
    for h in self.lenders:
      h.num_lends -= 1
    self.lenders = None

  def resolve_delivered(self):
    assert(self.lenders is not None or self.resolved())
    return self.lenders is None

Note, the lenders list usually has a fixed size (in all cases except when a function signature has borrows in lists or streams) and thus can be stored inline in the native stack frame.
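
To make the lend-counting protocol above concrete, here is a minimal sketch. The `Handle` and `MiniSubtask` classes are hypothetical stand-ins for `ResourceHandle` and `Subtask` (they are not part of the Canonical ABI) and keep only the bookkeeping needed to show that `num_lends` stays non-zero until the resolve event is delivered.

```python
class Handle:
    """Hypothetical stand-in for ResourceHandle; only tracks num_lends."""
    def __init__(self):
        self.num_lends = 0

class MiniSubtask:
    """Simplified model of Subtask's lender bookkeeping (illustration only)."""
    def __init__(self):
        self.lenders = []

    def add_lender(self, handle):
        handle.num_lends += 1
        self.lenders.append(handle)

    def deliver_resolve(self):
        for h in self.lenders:
            h.num_lends -= 1
        self.lenders = None  # None marks "resolve delivered"

    def resolve_delivered(self):
        return self.lenders is None

h = Handle()
subtask = MiniSubtask()
subtask.add_lender(h)       # handle lent to the callee
subtask.add_lender(h)       # the same handle lent a second time
lends_during_call = h.num_lends
subtask.deliver_resolve()   # all borrows are logically returned
lends_after_resolve = h.num_lends
```

In this sketch `lends_during_call` is 2 and `lends_after_resolve` is 0, which is the condition the text says canon_resource_drop guards on.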

The Subtask.drop method is only called for Subtasks that have been added to the current component instance's handles table and checks that the callee has been allowed to resolve and explicitly relinquish any borrowed handles.

  def drop(self):
    trap_if(not self.resolve_delivered())
    Waitable.drop(self)

Buffer State

A "buffer" is an abstract region of memory that can either be read-from or written-to. This region of memory can either be owned by the host or by wasm. Currently wasm memory is always 32-bit or 64-bit linear memory, but soon GC memory will be added. Thus, buffers provide an abstraction over at least 4 different "kinds" of memory.

(Currently, buffers are only created implicitly as part of stream and future built-ins such as stream.read. However, in the future, explicit component-level buffer types and canonical built-ins may be added to allow explicitly creating buffers and passing them between components.)

A "readable buffer" allows reading t values from the buffer's memory. A "writable buffer" allows writing t values into the buffer's memory. All buffers have an associated component-level value type t and a remain method that returns how many t values may still be read or written. Buffers mostly hide their original/complete size. However, zero-length buffers need to be treated specially (particularly when a zero-length read rendezvous with a zero-length write), so there is a special query for detecting whether a buffer is zero-length. Internally, buffers do have a maximum length of 2^28 - 1 which is independent of the type of memory backing the buffer. Based on this, buffers are represented by the following 3 abstract Python classes:

class Buffer:
  MAX_LENGTH = 2**28 - 1
  t: ValType
  remain: Callable[[], int]
  is_zero_length: Callable[[], bool]

class ReadableBuffer(Buffer):
  read: Callable[[int], list[any]]

class WritableBuffer(Buffer):
  write: Callable[[list[any]], None]

As preconditions (internally ensured by the Canonical ABI code below):

  • read may only be passed a positive number less than or equal to remain
  • write may only be passed a non-empty list of length less than or equal to remain containing values of type t

Since read and write are synchronous Python functions, buffers inherently guarantee synchronous access to a fixed-size backing memory and are thus distinguished from streams (which provide asynchronous operations for reading and writing an unbounded number of values to potentially-different regions of memory over time).
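
As a rough illustration of these interfaces and their preconditions, a host could back a buffer with an ordinary Python list. The `ListReadableBuffer` and `ListWritableBuffer` names below are hypothetical, the element type is represented by a placeholder string, and the `MAX_LENGTH` check is omitted for brevity; this is a sketch, not how any particular host implements buffers.

```python
class ListReadableBuffer:
    """Hypothetical host-side readable buffer backed by a Python list."""
    def __init__(self, t, values):
        self.t = t
        self.values = list(values)
        self.progress = 0

    def remain(self):
        return len(self.values) - self.progress

    def is_zero_length(self):
        return len(self.values) == 0

    def read(self, n):
        assert 0 < n <= self.remain()          # precondition on read
        vs = self.values[self.progress : self.progress + n]
        self.progress += n
        return vs

class ListWritableBuffer:
    """Hypothetical host-side writable buffer with a fixed capacity."""
    def __init__(self, t, capacity):
        self.t = t
        self.capacity = capacity
        self.values = []

    def remain(self):
        return self.capacity - len(self.values)

    def is_zero_length(self):
        return self.capacity == 0

    def write(self, vs):
        assert 0 < len(vs) <= self.remain()    # precondition on write
        self.values.extend(vs)

# Copy as many values as both sides allow, mirroring the rendezvous copy
# performed by the stream code later in this document.
src = ListReadableBuffer("u8", [1, 2, 3, 4, 5])
dst = ListWritableBuffer("u8", 3)
n = min(src.remain(), dst.remain())
dst.write(src.read(n))
```

After the copy, `dst` is full (`remain()` is 0) while `src` still has 2 values left for a later read.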

The ReadableBuffer and WritableBuffer abstract classes may either be implemented by the host or by another wasm component. In the latter case, these abstract classes are implemented by the concrete ReadableBufferGuestImpl and WritableBufferGuestImpl classes which eagerly check alignment and range when the buffer is constructed so that read and write are infallible operations (modulo traps):

class BufferGuestImpl(Buffer):
  cx: LiftLowerContext
  t: ValType
  ptr: int
  progress: int
  length: int

  def __init__(self, t, cx, ptr, length):
    trap_if(length > Buffer.MAX_LENGTH)
    if t and length > 0:
      trap_if(ptr != align_to(ptr, alignment(t, cx.opts.memory.ptr_type())))
      trap_if(ptr + length * elem_size(t, cx.opts.memory.ptr_type()) > len(cx.opts.memory))
    self.cx = cx
    self.t = t
    self.ptr = ptr
    self.progress = 0
    self.length = length

  def remain(self):
    return self.length - self.progress

  def is_zero_length(self):
    return self.length == 0

class ReadableBufferGuestImpl(BufferGuestImpl, ReadableBuffer):
  def read(self, n):
    assert(n <= self.remain())
    if self.t:
      vs = load_list_from_valid_range(self.cx, self.ptr, n, self.t)
      self.ptr += n * elem_size(self.t, self.cx.opts.memory.ptr_type())
    else:
      vs = n * [()]
    self.progress += n
    return vs

class WritableBufferGuestImpl(BufferGuestImpl, WritableBuffer):
  def write(self, vs):
    assert(len(vs) <= self.remain())
    if self.t:
      store_list_into_valid_range(self.cx, vs, self.ptr, self.t)
      self.ptr += len(vs) * elem_size(self.t, self.cx.opts.memory.ptr_type())
    else:
      assert(all(v == () for v in vs))
    self.progress += len(vs)

When t is None (arising from stream and future with empty element types), the core-wasm-supplied ptr is entirely ignored, while the length and progress are still semantically meaningful. Source bindings may represent this case with a generic stream/future of [unit] type or a distinct type that conveys events without values.

The load_list_from_valid_range and store_list_into_valid_range functions that do all the heavy lifting are shared with function parameter/result lifting and lowering and defined below.

Stream State

Values of stream type are represented in the Canonical ABI as i32 indices into the current component instance's handles table referring to either the readable or writable end of a stream. Reading from the readable end of a stream is achieved by calling stream.read and supplying a WritableBuffer. Conversely, writing to the writable end of a stream is achieved by calling stream.write and supplying a ReadableBuffer. The runtime waits until both a readable and writable buffer have been supplied and then performs a direct copy between the two buffers. This rendezvous-based design avoids the need for an intermediate buffer and copy (unlike, e.g., a Unix pipe; a Unix pipe would instead be implemented as a resource type owning the buffer memory and two streams: one going in and one coming out).

The result of a {stream,future}.{read,write} is communicated to the wasm guest via a CopyResult code:

class CopyResult(IntEnum):
  COMPLETED = 0
  DROPPED = 1
  CANCELLED = 2

The DROPPED code indicates that the other end has since been dropped and thus no more reads/writes are possible. The CANCELLED code is only possible after this end has performed a {stream,future}.{read,write} followed by a {stream,future}.cancel-{read,write}; CANCELLED notifies the wasm code that the cancellation finished and so ownership of the memory buffer has been returned to the wasm code. Lastly, COMPLETED indicates that the copy is done and neither DROPPED nor CANCELLED apply.

As with functions and buffers, native host code can be on either side of a stream. Thus, streams are defined in terms of abstract interfaces that can be implemented and consumed by wasm or host code (with all {wasm,host} pairings being possible and well-defined). Since a stream in a function parameter or result type always represents the transfer of the readable end of a stream, only the ReadableStream interface can be implemented by either wasm or the host; the WritableStream interface is always written to by wasm via a writable stream end created by stream.new.

ReclaimBuffer = Callable[[], None]
OnCopy = Callable[[ReclaimBuffer], None]
OnCopyDone = Callable[[CopyResult], None]

class SharedBase:
  t: ValType
  cancel: Callable[[], None]
  drop: Callable[[], None]

class ReadableStream(SharedBase):
  read: Callable[[ComponentInstance, WritableBuffer, OnCopy, OnCopyDone], None]

class WritableStream(SharedBase):
  write: Callable[[ComponentInstance, ReadableBuffer, OnCopy, OnCopyDone], None]

The key operations in these interfaces are read and write which work as follows:

  • read never blocks and returns its values by either synchronously or asynchronously writing to the given WritableBuffer and then calling the given OnCopy* callbacks to notify the caller of progress.
  • Symmetrically, write never blocks and takes the value to be written from the given ReadableBuffer, calling the given OnCopy* callbacks to notify the caller of progress.
  • OnCopyDone is called to indicate that the read or write is finished copying and that the caller has regained ownership of the buffer.
  • OnCopy is called to indicate a copy has been made to or from the buffer. However, there may be further copies made in the future, so the caller has not regained ownership of the buffer.
  • The ReclaimBuffer callback passed to OnCopy allows the caller of read or write to immediately regain ownership of the buffer once the first copy has completed.
  • cancel is non-blocking, but does not guarantee that ownership of the buffer has been returned; cancel only lets the caller request that one of the OnCopy* callbacks be called ASAP (which may or may not happen during cancel).
  • The client may not call read, write or drop while there is a previous read or write in progress.

The OnCopy* callbacks are a spec-internal detail used to specify the allowed concurrent behaviors of stream.{read,write} and not exposed directly to core wasm code. Specifically, the point of the OnCopy* callbacks is to specify that multiple reads or writes are allowed into the same Buffer up until the point where either the buffer is full or the calling core wasm code receives a STREAM_READ or STREAM_WRITE progress event (in which case ReclaimBuffer is called). This reduces the number of context-switches required by the spec, particularly when streaming between two components.

The SharedStreamImpl class implements both ReadableStream and WritableStream for streams created by wasm (via stream.new) and tracks the common state shared by both the readable and writable ends of streams (defined below).

Introducing SharedStreamImpl in chunks, starting with the fields and initialization:

class SharedStreamImpl(ReadableStream, WritableStream):
  dropped: bool
  pending_inst: Optional[ComponentInstance]
  pending_buffer: Optional[Buffer]
  pending_on_copy: Optional[OnCopy]
  pending_on_copy_done: Optional[OnCopyDone]

  def __init__(self, t):
    self.t = t
    self.dropped = False
    self.reset_pending()

  def reset_pending(self):
    self.set_pending(None, None, None, None)

  def set_pending(self, inst, buffer, on_copy, on_copy_done):
    self.pending_inst = inst
    self.pending_buffer = buffer
    self.pending_on_copy = on_copy
    self.pending_on_copy_done = on_copy_done

If set, the pending_* fields record the Buffer and OnCopy* callbacks of a read or write that is waiting to rendezvous with a complementary write or read. Dropping the readable or writable end of a stream or cancelling a read or write notifies any pending read or write via its OnCopyDone callback:

  def reset_and_notify_pending(self, result):
    pending_on_copy_done = self.pending_on_copy_done
    self.reset_pending()
    pending_on_copy_done(result)

  def cancel(self):
    self.reset_and_notify_pending(CopyResult.CANCELLED)

  def drop(self):
    if not self.dropped:
      self.dropped = True
      if self.pending_buffer:
        self.reset_and_notify_pending(CopyResult.DROPPED)

While the abstract ReadableStream and WritableStream interfaces allow cancel to return without having returned ownership of the buffer (which, in general, is necessary for various host APIs), when wasm is implementing the stream, cancel always returns ownership of the buffer immediately.

Note that cancel and drop notify in opposite directions:

  • cancel must be called on a readable or writable end with an operation pending, and thus cancel notifies the same end that called it.
  • drop must not be called on a readable or writable end with an operation pending, and thus drop notifies the opposite end.

The read method implements ReadableStream.read and is called by either stream.read or the host, depending on who is passed the readable end of the stream. If the reader is first to rendezvous, then all the parameters are stored in the pending_* fields, requiring the reader to wait for the writer to rendezvous. If the writer was first to rendezvous, then there is already a pending ReadableBuffer to read from, and so the reader copies as much as it can (which may be less than a full buffer's worth) and eagerly completes the copy without blocking. In the final special case where the pending writer has a zero-length buffer, the writer is notified, but the reader remains blocked:

  def read(self, inst, dst_buffer, on_copy, on_copy_done):
    if self.dropped:
      on_copy_done(CopyResult.DROPPED)
    elif not self.pending_buffer:
      self.set_pending(inst, dst_buffer, on_copy, on_copy_done)
    else:
      assert(self.t == dst_buffer.t == self.pending_buffer.t)
      trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
      if self.pending_buffer.remain() > 0:
        if dst_buffer.remain() > 0:
          n = min(dst_buffer.remain(), self.pending_buffer.remain())
          dst_buffer.write(self.pending_buffer.read(n))
          self.pending_on_copy(self.reset_pending)
        on_copy_done(CopyResult.COMPLETED)
      else:
        self.reset_and_notify_pending(CopyResult.COMPLETED)
        self.set_pending(inst, dst_buffer, on_copy, on_copy_done)

Currently, there is a trap when both the read and write come from the same component instance and there is a non-empty, non-number element type. This trap will be removed in a subsequent release; the reason for the trap is that when lifting and lowering can alias the same memory, interleavings can be complex and must be handled carefully. Future improvements to the Canonical ABI (lazy lowering) can greatly simplify this interleaving and be more practical to implement.

The write method implements WritableStream.write and is called by the stream.write built-in (noting that the host cannot be passed the writable end of a stream but may instead implement the ReadableStream interface and pass the readable end into a component). The steps for write are the same as read except for when a zero-length write rendezvous with a zero-length read, in which case the write eagerly completes, leaving the read pending:

  def write(self, inst, src_buffer, on_copy, on_copy_done):
    if self.dropped:
      on_copy_done(CopyResult.DROPPED)
    elif not self.pending_buffer:
      self.set_pending(inst, src_buffer, on_copy, on_copy_done)
    else:
      assert(self.t == src_buffer.t == self.pending_buffer.t)
      trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
      if self.pending_buffer.remain() > 0:
        if src_buffer.remain() > 0:
          n = min(src_buffer.remain(), self.pending_buffer.remain())
          self.pending_buffer.write(src_buffer.read(n))
          self.pending_on_copy(self.reset_pending)
        on_copy_done(CopyResult.COMPLETED)
      elif src_buffer.is_zero_length() and self.pending_buffer.is_zero_length():
        on_copy_done(CopyResult.COMPLETED)
      else:
        self.reset_and_notify_pending(CopyResult.COMPLETED)
        self.set_pending(inst, src_buffer, on_copy, on_copy_done)

Putting together the behavior of zero-length read and write above, we can see that, when both the reader and writer are zero-length, regardless of who was first, the zero-length write always completes, leaving the zero-length read pending. To avoid livelock, the Canonical ABI requires that a writer must (eventually) follow a completed zero-length write with a non-zero-length write that is allowed to block. This will break the loop, notifying the reader end and allowing it to rendezvous with a non-zero-length read and make progress. See the stream readiness section in the async explainer for more background on the purpose of zero-length reads and writes.
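
The zero-length behavior described above can be traced with a small simulation. The `Buf` and `ToyStream` classes are hypothetical simplifications of the buffer and SharedStreamImpl definitions: there are no component instances, no drop or cancel, no CopyResult codes, and the pending side is assumed to reclaim its buffer right after the first copy. Only the branch structure of read and write is kept.

```python
class Buf:
    """Hypothetical list-backed buffer standing in for Readable/WritableBuffer."""
    def __init__(self, length, data=()):
        self.length = length      # total number of values that can be copied
        self.data = list(data)    # values to read from, or values written so far
        self.progress = 0

    def remain(self):
        return self.length - self.progress

    def is_zero_length(self):
        return self.length == 0

    def read(self, n):
        vs = self.data[self.progress : self.progress + n]
        self.progress += n
        return vs

    def write(self, vs):
        self.data.extend(vs)
        self.progress += len(vs)

class ToyStream:
    """Simplified rendezvous logic (illustration only)."""
    def __init__(self):
        self.pending = None   # (buffer, on_done) of whichever side is waiting

    def _take_pending(self):
        buffer, on_done = self.pending
        self.pending = None
        return buffer, on_done

    def read(self, dst, on_done):
        if self.pending is None:
            self.pending = (dst, on_done)
        elif self.pending[0].remain() > 0:
            if dst.remain() > 0:
                src, src_done = self._take_pending()
                dst.write(src.read(min(dst.remain(), src.remain())))
                src_done()
            on_done()
        else:
            # Pending zero-length write: complete it, then wait in its place.
            self._take_pending()[1]()
            self.pending = (dst, on_done)

    def write(self, src, on_done):
        if self.pending is None:
            self.pending = (src, on_done)
        elif self.pending[0].remain() > 0:
            if src.remain() > 0:
                dst, dst_done = self._take_pending()
                dst.write(src.read(min(src.remain(), dst.remain())))
                dst_done()
            on_done()
        elif src.is_zero_length() and self.pending[0].is_zero_length():
            on_done()   # zero-length write completes; the read stays pending
        else:
            # Non-zero-length write wakes the zero-length reader, then blocks.
            self._take_pending()[1]()
            self.pending = (src, on_done)

# Case 1: zero-length read first, then zero-length write.
events = []
stream = ToyStream()
stream.read(Buf(0), lambda: events.append("read0 done"))
stream.write(Buf(0), lambda: events.append("write0 done"))
case1 = list(events)
stream.write(Buf(2, [7, 8]), lambda: events.append("write2 done"))
case1_after_write = list(events)
dst = Buf(4)
stream.read(dst, lambda: events.append("read4 done"))

# Case 2: zero-length write first, then zero-length read.
events2 = []
stream2 = ToyStream()
stream2.write(Buf(0), lambda: events2.append("write0 done"))
stream2.read(Buf(0), lambda: events2.append("read0 done"))
```

In both cases only the zero-length write completes at first. In case 1, the follow-up non-zero-length write then wakes the zero-length reader, whose next non-zero-length read receives the data.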

The none_or_number_type predicate used above includes both the integer and floating point number types:

def none_or_number_type(t):
  return t is None or isinstance(t, U8Type | U16Type | U32Type | U64Type |
                                    S8Type | S16Type | S32Type | S64Type |
                                    F32Type | F64Type)

The two ends of a stream are stored as separate elements in the component instance handles table and each end has a separate CopyState that reflects what that end is currently doing or has done. This state field is factored out into the CopyEnd class that is derived below. The two ends also share some state which is referenced by the shared field and either points to a SharedStreamImpl (for component-created streams) or something host-defined (for host-created streams).

class CopyState(Enum):
  IDLE = 1
  COPYING = 2
  CANCELLING_COPY = 3
  DONE = 4

class CopyEnd(Waitable):
  state: CopyState
  shared: SharedBase

  def __init__(self, shared):
    Waitable.__init__(self)
    self.state = CopyState.IDLE
    self.shared = shared

  def copying(self):
    match self.state:
      case CopyState.IDLE | CopyState.DONE:
        return False
      case CopyState.COPYING | CopyState.CANCELLING_COPY:
        return True
    assert(False)

  def drop(self):
    trap_if(self.copying())
    self.shared.drop()
    Waitable.drop(self)

class ReadableStreamEnd(CopyEnd):
  def copy(self, inst, dst, on_copy, on_copy_done):
    self.shared.read(inst, dst, on_copy, on_copy_done)

class WritableStreamEnd(CopyEnd):
  def copy(self, inst, src, on_copy, on_copy_done):
    self.shared.write(inst, src, on_copy, on_copy_done)

As shown in drop, attempting to drop a readable or writable end while a copy is in progress or in the process of being cancelled traps. This means that client code must take care to wait for these operations to finish (potentially cancelling them via stream.cancel-{read,write}) before dropping.

The polymorphic copy method dispatches to either ReadableStream.read or WritableStream.write and allows the implementations of stream.{read,write} to share a single definition (in stream_copy below).

Future State

Futures are similar to streams, except that instead of passing 0..N values, exactly one value is passed from the writer end to the reader end unless the reader end is explicitly dropped first.

Futures are defined in terms of abstract ReadableFuture and WritableFuture interfaces:

class ReadableFuture(SharedBase):
  read: Callable[[ComponentInstance, WritableBuffer, OnCopyDone], None]

class WritableFuture(SharedBase):
  write: Callable[[ComponentInstance, ReadableBuffer, OnCopyDone], None]

These interfaces work like ReadableStream and WritableStream except that there is no OnCopy callback passed to read or write to report partial progress (since at most 1 value is copied) and the given Buffer must have remain() == 1.

Introducing SharedFutureImpl in chunks, the first part is exactly symmetric to SharedStreamImpl in how initialization and cancellation work:

class SharedFutureImpl(ReadableFuture, WritableFuture):
  dropped: bool
  pending_inst: Optional[ComponentInstance]
  pending_buffer: Optional[Buffer]
  pending_on_copy_done: Optional[OnCopyDone]

  def __init__(self, t):
    self.t = t
    self.dropped = False
    self.reset_pending()

  def reset_pending(self):
    self.set_pending(None, None, None)

  def set_pending(self, inst, buffer, on_copy_done):
    self.pending_inst = inst
    self.pending_buffer = buffer
    self.pending_on_copy_done = on_copy_done

  def reset_and_notify_pending(self, result):
    pending_on_copy_done = self.pending_on_copy_done
    self.reset_pending()
    pending_on_copy_done(result)

  def cancel(self):
    self.reset_and_notify_pending(CopyResult.CANCELLED)

Dropping works the same in futures as in streams, except that a future writable end cannot be dropped without having written a value. This is guarded by WritableFutureEnd.drop so it can be asserted here:

  def drop(self):
    if not self.dropped:
      self.dropped = True
      if self.pending_buffer:
        assert(isinstance(self.pending_buffer, ReadableBuffer))
        self.reset_and_notify_pending(CopyResult.DROPPED)

Lastly, read and write work mostly like streams, but simplified based on the fact that we're copying at most 1 value. The only asymmetric difference is that, as mentioned above, only the writable end can observe that the readable end was dropped before receiving a value.

  def read(self, inst, dst_buffer, on_copy_done):
    assert(not self.dropped and dst_buffer.remain() == 1)
    if not self.pending_buffer:
      self.set_pending(inst, dst_buffer, on_copy_done)
    else:
      trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
      dst_buffer.write(self.pending_buffer.read(1))
      self.reset_and_notify_pending(CopyResult.COMPLETED)
      on_copy_done(CopyResult.COMPLETED)

  def write(self, inst, src_buffer, on_copy_done):
    assert(src_buffer.remain() == 1)
    if self.dropped:
      on_copy_done(CopyResult.DROPPED)
    elif not self.pending_buffer:
      self.set_pending(inst, src_buffer, on_copy_done)
    else:
      trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
      self.pending_buffer.write(src_buffer.read(1))
      self.reset_and_notify_pending(CopyResult.COMPLETED)
      on_copy_done(CopyResult.COMPLETED)

As with streams, the # temporary limitation shown above is that a future cannot be read and written from the same component instance when it has a non-empty, non-number value type.

Lastly, the {Readable,Writable}FutureEnd classes are mostly symmetric with {Readable,Writable}StreamEnd, defining a polymorphic copy method that dispatches to either ReadableFuture.read or WritableFuture.write, which allows the implementation of future.{read,write} to share a single definition (in future_copy below). The only difference is that WritableFutureEnd.drop traps if the writer hasn't successfully written a value or been notified of the reader dropping their end:

class ReadableFutureEnd(CopyEnd):
  def copy(self, inst, dst_buffer, on_copy_done):
    self.shared.read(inst, dst_buffer, on_copy_done)

class WritableFutureEnd(CopyEnd):
  def copy(self, inst, src_buffer, on_copy_done):
    self.shared.write(inst, src_buffer, on_copy_done)

  def drop(self):
    trap_if(self.state != CopyState.DONE)
    CopyEnd.drop(self)

Despecialization

In the explainer, component value types are classified as either fundamental or specialized, where the specialized value types are defined by expansion into fundamental value types. In most cases, the canonical ABI of a specialized value type is the same as its expansion so, to avoid repetition, the other definitions below use the following despecialize function to replace specialized value types with their expansion:

def despecialize(t):
  match t:
    case TupleType(ts)       : return RecordType([ FieldType(str(i), t) for i,t in enumerate(ts) ])
    case EnumType(labels)    : return VariantType([ CaseType(l, None) for l in labels ])
    case OptionType(t)       : return VariantType([ CaseType("none", None), CaseType("some", t) ])
    case ResultType(ok, err) : return VariantType([ CaseType("ok", ok), CaseType("error", err) ])
    case MapType(k, v)       : return ListType(despecialize(TupleType([k, v])))
    case _                   : return t

The specialized value types string and flags are missing from this list because they are given specialized canonical ABI representations distinct from their respective expansions.

Type Predicates

The contains_borrow and contains_async_value predicates return whether the given type contains a borrow or future/stream, respectively.

def contains_borrow(t):
  return contains(t, lambda u: isinstance(u, BorrowType))

def contains_async_value(t):
  return contains(t, lambda u: isinstance(u, StreamType | FutureType))

def contains(t, p):
  t = despecialize(t)
  match t:
    case None:
      return False
    case PrimValType() | OwnType() | BorrowType():
      return p(t)
    case ListType(u) | StreamType(u) | FutureType(u):
      return p(t) or contains(u, p)
    case RecordType(fields):
      return p(t) or any(contains(f.t, p) for f in fields)
    case VariantType(cases):
      return p(t) or any(contains(c.t, p) for c in cases)
    case FuncType():
      return any(p(u) for u in t.param_types() + t.result_type())
    case _:
      assert(False)

Alignment

Each value type is assigned an alignment which is used by subsequent Canonical ABI definitions. Presenting the definition of alignment piecewise, we start with the top-level case analysis:

def alignment(t, ptr_type):
  match despecialize(t):
    case BoolType()                  : return 1
    case S8Type() | U8Type()         : return 1
    case S16Type() | U16Type()       : return 2
    case S32Type() | U32Type()       : return 4
    case S64Type() | U64Type()       : return 8
    case F32Type()                   : return 4
    case F64Type()                   : return 8
    case CharType()                  : return 4
    case StringType()                : return ptr_size(ptr_type)
    case ErrorContextType()          : return 4
    case ListType(t, l)              : return alignment_list(t, l, ptr_type)
    case RecordType(fields)          : return alignment_record(fields, ptr_type)
    case VariantType(cases)          : return alignment_variant(cases, ptr_type)
    case FlagsType(labels)           : return alignment_flags(labels)
    case OwnType() | BorrowType()    : return 4
    case StreamType() | FutureType() : return 4

List alignment is the same as tuple alignment when the length is fixed and otherwise uses the alignment of pointers.

def alignment_list(elem_type, maybe_length, ptr_type):
  if maybe_length is not None:
    return alignment(elem_type, ptr_type)
  return ptr_size(ptr_type)

Record alignment is tuple alignment, with the definitions split for reuse below:

def alignment_record(fields, ptr_type):
  a = 1
  for f in fields:
    a = max(a, alignment(f.t, ptr_type))
  return a

As an optimization, variant discriminants are represented by the smallest integer covering the number of cases in the variant (with cases numbered in order from 0 to len(cases)-1). Depending on the payload type, this can allow more compact representations of variants in memory. This smallest integer type is selected by the following function, used above and below:

def alignment_variant(cases, ptr_type):
  return max(alignment(discriminant_type(cases), ptr_type), max_case_alignment(cases, ptr_type))

def discriminant_type(cases):
  n = len(cases)
  assert(0 < n < (1 << 32))
  match math.ceil(math.log2(n)/8):
    case 0: return U8Type()
    case 1: return U8Type()
    case 2: return U16Type()
    case 3: return U32Type()

def max_case_alignment(cases, ptr_type):
  a = 1
  for c in cases:
    if c.t is not None:
      a = max(a, alignment(c.t, ptr_type))
  return a
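
To see where the discriminant width changes, the following sketch restates the selection with integer thresholds. The `discriminant_bytes` helper is a hypothetical name that returns the byte width rather than a type object, and it is only claimed to agree with discriminant_type for the case counts exercised here.

```python
def discriminant_bytes(num_cases):
    """Byte width of the discriminant for a variant with `num_cases` cases."""
    assert 0 < num_cases < (1 << 32)
    if num_cases <= (1 << 8):
        return 1   # u8
    if num_cases <= (1 << 16):
        return 2   # u16
    return 4       # u32

widths = {n: discriminant_bytes(n) for n in [1, 2, 256, 257, 65536, 65537]}
```

A variant with up to 256 cases uses a 1-byte discriminant, 257 through 65536 cases use 2 bytes, and larger variants use 4 bytes.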

As an optimization, flags are represented as packed bit-vectors. Like variant discriminants, flags use the smallest integer that fits all the bits.

def alignment_flags(labels):
  n = len(labels)
  assert(0 < n <= 32)
  if n <= 8: return 1
  if n <= 16: return 2
  return 4

Element Size

Each value type is also assigned an elem_size which is the number of bytes used when values of the type are stored as elements of a list. Having this byte size be a static property of the type instead of attempting to use a variable-length element-encoding scheme both simplifies the implementation and maps well to languages which represent lists as random-access arrays. Empty types, such as records with no fields, are not permitted, to avoid complications in source languages.

def elem_size(t, ptr_type):
  match despecialize(t):
    case BoolType()                  : return 1
    case S8Type() | U8Type()         : return 1
    case S16Type() | U16Type()       : return 2
    case S32Type() | U32Type()       : return 4
    case S64Type() | U64Type()       : return 8
    case F32Type()                   : return 4
    case F64Type()                   : return 8
    case CharType()                  : return 4
    case StringType()                : return 2 * ptr_size(ptr_type)
    case ErrorContextType()          : return 4
    case ListType(t, l)              : return elem_size_list(t, l, ptr_type)
    case RecordType(fields)          : return elem_size_record(fields, ptr_type)
    case VariantType(cases)          : return elem_size_variant(cases, ptr_type)
    case FlagsType(labels)           : return elem_size_flags(labels)
    case OwnType() | BorrowType()    : return 4
    case StreamType() | FutureType() : return 4

def elem_size_list(elem_type, maybe_length, ptr_type):
  if maybe_length is not None:
    return maybe_length * elem_size(elem_type, ptr_type)
  return 2 * ptr_size(ptr_type)

def elem_size_record(fields, ptr_type):
  s = 0
  for f in fields:
    s = align_to(s, alignment(f.t, ptr_type))
    s += elem_size(f.t, ptr_type)
  assert(s > 0)
  return align_to(s, alignment_record(fields, ptr_type))

def align_to(ptr, alignment):
  return math.ceil(ptr / alignment) * alignment

def elem_size_variant(cases, ptr_type):
  s = elem_size(discriminant_type(cases), ptr_type)
  s = align_to(s, max_case_alignment(cases, ptr_type))
  cs = 0
  for c in cases:
    if c.t is not None:
      cs = max(cs, elem_size(c.t, ptr_type))
  s += cs
  return align_to(s, alignment_variant(cases, ptr_type))

def elem_size_flags(labels):
  n = len(labels)
  assert(0 < n <= 32)
  if n <= 8: return 1
  if n <= 16: return 2
  return 4
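
The record and variant size rules above can be checked on a few small layouts. In this sketch, types are reduced to hypothetical `(elem_size, alignment)` pairs taken from the primitive cases above, `align_to` is written with integer arithmetic (a choice of this sketch), and `variant_layout` assumes a 1-byte discriminant unless told otherwise.

```python
def align_to(offset, alignment):
    """Round `offset` up to the next multiple of `alignment`."""
    return -(-offset // alignment) * alignment

# (elem_size, alignment) pairs for a few primitive types
U8, U16, U32, U64 = (1, 1), (2, 2), (4, 4), (8, 8)

def record_layout(fields):
    """Return (elem_size, alignment) for a record of (size, align) fields."""
    size, align = 0, 1
    for f_size, f_align in fields:
        size = align_to(size, f_align) + f_size   # pad, then place the field
        align = max(align, f_align)
    return align_to(size, align), align           # pad the tail

def variant_layout(payloads, discriminant=U8):
    """Return (elem_size, alignment) for a variant.

    `payloads` holds one (size, align) pair or None per case.
    """
    d_size, d_align = discriminant
    present = [p for p in payloads if p is not None]
    case_align = max((a for _, a in present), default=1)
    case_size = max((s for s, _ in present), default=0)
    align = max(d_align, case_align)
    size = align_to(d_size, case_align) + case_size
    return align_to(size, align), align

padded = record_layout([U8, U32, U16])      # u8, 3 bytes padding, u32, u16, 2 bytes padding
compact = record_layout([U32, U16, U8])     # same fields, less padding
option_u64 = variant_layout([None, U64])    # like option<u64>
option_u8 = variant_layout([None, U8])      # like option<u8>
```

Reordering the same three fields shrinks the record from 12 to 8 bytes, and the option-like variant over a 64-bit payload occupies 16 bytes because the payload must start at an 8-byte-aligned offset after the 1-byte discriminant.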

Loading

The load function defines how to read a value of a given value type t out of linear memory starting at offset ptr, returning the value represented as a Python value. Presenting the definition of load piecewise, we start with the top-level case analysis:

def load(cx, ptr, t):
  assert(ptr == align_to(ptr, alignment(t, cx.opts.memory.ptr_type())))
  assert(ptr + elem_size(t, cx.opts.memory.ptr_type()) <= len(cx.opts.memory))
  match despecialize(t):
    case BoolType()         : return convert_int_to_bool(load_int(cx, ptr, 1))
    case U8Type()           : return load_int(cx, ptr, 1)
    case U16Type()          : return load_int(cx, ptr, 2)
    case U32Type()          : return load_int(cx, ptr, 4)
    case U64Type()          : return load_int(cx, ptr, 8)
    case S8Type()           : return load_int(cx, ptr, 1, signed = True)
    case S16Type()          : return load_int(cx, ptr, 2, signed = True)
    case S32Type()          : return load_int(cx, ptr, 4, signed = True)
    case S64Type()          : return load_int(cx, ptr, 8, signed = True)
    case F32Type()          : return decode_i32_as_float(load_int(cx, ptr, 4))
    case F64Type()          : return decode_i64_as_float(load_int(cx, ptr, 8))
    case CharType()         : return convert_i32_to_char(cx, load_int(cx, ptr, 4))
    case StringType()       : return load_string(cx, ptr)
    case ErrorContextType() : return lift_error_context(cx, load_int(cx, ptr, 4))
    case ListType(t, l)     : return load_list(cx, ptr, t, l)
    case RecordType(fields) : return load_record(cx, ptr, fields)
    case VariantType(cases) : return load_variant(cx, ptr, cases)
    case FlagsType(labels)  : return load_flags(cx, ptr, labels)
    case OwnType()          : return lift_own(cx, load_int(cx, ptr, 4), t)
    case BorrowType()       : return lift_borrow(cx, load_int(cx, ptr, 4), t)
    case StreamType(t)      : return lift_stream(cx, load_int(cx, ptr, 4), t)
    case FutureType(t)      : return lift_future(cx, load_int(cx, ptr, 4), t)

Integers are loaded directly from memory, with their high-order bit interpreted according to the signedness of the type.

def load_int(cx, ptr, nbytes, signed = False):
  return int.from_bytes(cx.opts.memory[ptr : ptr+nbytes], 'little', signed = signed)
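
For a concrete feel of how signedness changes the result, the sketch below applies the same `int.from_bytes` call to a plain `bytearray` standing in for linear memory. The `load_int_sketch` helper is a hypothetical variant of load_int that takes the memory directly instead of a context object.

```python
# A bytearray stands in for linear memory in this sketch.
memory = bytearray([0xFE, 0xFF, 0xFF, 0xFF, 0x34, 0x12])

def load_int_sketch(memory, ptr, nbytes, signed=False):
    return int.from_bytes(memory[ptr : ptr + nbytes], 'little', signed=signed)

as_u32 = load_int_sketch(memory, 0, 4)                # high bit is part of the magnitude
as_s32 = load_int_sketch(memory, 0, 4, signed=True)   # high bit is the sign
as_u16 = load_int_sketch(memory, 4, 2)                # little-endian: 0x34 is the low byte
```

The same four bytes load as 4294967294 when unsigned and as -2 when signed, while the two bytes at offset 4 load as 0x1234.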

Integer-to-boolean conversion treats 0 as false and all other bit-patterns as true:

def convert_int_to_bool(i):
  assert(i >= 0)
  return bool(i)

Floats are loaded directly from memory, with the sign and payload information of NaN values discarded. Consequently, there is only one unique NaN value per floating-point type. This reflects the practical reality that some languages and protocols do not preserve these bits. In the Python code below, this is expressed as canonicalizing NaNs to a particular bit pattern.

See the comments about lowering of float values for a discussion of possible optimizations.

DETERMINISTIC_PROFILE = False # or True
CANONICAL_FLOAT32_NAN = 0x7fc00000
CANONICAL_FLOAT64_NAN = 0x7ff8000000000000

def canonicalize_nan32(f):
  if math.isnan(f):
    f = core_f32_reinterpret_i32(CANONICAL_FLOAT32_NAN)
    assert(math.isnan(f))
  return f

def canonicalize_nan64(f):
  if math.isnan(f):
    f = core_f64_reinterpret_i64(CANONICAL_FLOAT64_NAN)
    assert(math.isnan(f))
  return f

def decode_i32_as_float(i):
  return canonicalize_nan32(core_f32_reinterpret_i32(i))

def decode_i64_as_float(i):
  return canonicalize_nan64(core_f64_reinterpret_i64(i))

def core_f32_reinterpret_i32(i):
  return struct.unpack('<f', struct.pack('<I', i))[0] # f32.reinterpret_i32

def core_f64_reinterpret_i64(i):
  return struct.unpack('<d', struct.pack('<Q', i))[0] # f64.reinterpret_i64
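
The effect of canonicalization on lifting can be sketched for `f32` as follows. This is a minimal standalone version that assumes the same canonical bit pattern as above; the helper names `f32_from_bits`, `f32_to_bits` and `decode_f32` are introduced only for this example.

```python
import math
import struct

CANONICAL_FLOAT32_NAN = 0x7fc00000

def f32_from_bits(i):
    # Reinterpret a 32-bit integer as an f32 (held in a Python float)
    return struct.unpack('<f', struct.pack('<I', i))[0]

def f32_to_bits(f):
    # Reinterpret an f32-representable Python float as a 32-bit integer
    return struct.unpack('<I', struct.pack('<f', f))[0]

def decode_f32(i):
    # Every NaN input, whatever its sign and payload, becomes the canonical NaN
    f = f32_from_bits(i)
    if math.isnan(f):
        f = f32_from_bits(CANONICAL_FLOAT32_NAN)
    return f

negative_nan_with_payload = 0xffc12345
lifted = decode_f32(negative_nan_with_payload)
one = decode_f32(0x3f800000)   # non-NaN values pass through unchanged
```

Two NaNs that differ only in sign or payload are therefore indistinguishable after lifting.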

An i32 is converted to a char (a Unicode Scalar Value) by dynamically testing that its unsigned integral value is in the valid Unicode Code Point range and not a Surrogate:

def convert_i32_to_char(cx, i):
  assert(i >= 0)
  trap_if(i >= 0x110000)
  trap_if(0xD800 <= i <= 0xDFFF)
  return chr(i)
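
The two range checks can be read as a single predicate over code points. The sketch below assumes a hypothetical `Trap` exception standing in for the spec's `trap()` and uses the illustrative names `is_unicode_scalar_value` and `i32_to_char`.

```python
class Trap(Exception):
    # Stand-in for the trap raised by trap()/trap_if() (assumption for this sketch)
    pass

def is_unicode_scalar_value(i):
    # A scalar value is any code point except the surrogate range D800..DFFF
    return 0 <= i < 0x110000 and not (0xD800 <= i <= 0xDFFF)

def i32_to_char(i):
    if not is_unicode_scalar_value(i):
        raise Trap()
    return chr(i)

letter = i32_to_char(0x41)
last = i32_to_char(0x10FFFF)
```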

Strings are loaded from two pointer-sized values: a pointer (offset in linear memory) and a number of code units. There are three supported string encodings in canonopt: UTF-8, UTF-16 and latin1+utf16. This last option allows a dynamic choice between Latin-1 and UTF-16, indicated by the high bit of the second pointer-sized value (32nd bit on 32-bit memories, 64th bit on 64-bit memories). String values include their original encoding and length in tagged code units as a "hint" that enables store_string (defined below) to make better up-front allocation size choices in many cases. Thus, the value produced by load_string isn't simply a Python str, but a tuple containing a str, the original encoding and the number of source code units.

The MAX_STRING_BYTE_LENGTH constant limits the byte length of a string when loading. This limit is low enough to ensure that when storing the string via store_string, the number of bytes allocated stays below REALLOC_I32_MAX even when converting to other encodings. This means that any loaded string will be short enough to be stored in 32-bit components. The worst case inflation in string byte length is by a factor of 2 (e.g. this could occur when converting from UTF-8 to UTF-16).

String = tuple[str, str, int]

def load_string(cx, ptr) -> String:
  begin = load_int(cx, ptr, cx.opts.memory.ptr_size())
  tagged_code_units = load_int(cx, ptr + cx.opts.memory.ptr_size(), cx.opts.memory.ptr_size())
  return load_string_from_range(cx, begin, tagged_code_units)

def utf16_tag(ptr_type):
  return 1 << (ptr_size(ptr_type) * 8 - 1)

REALLOC_I32_MAX = 2**32 - 1
MAX_STRING_BYTE_LENGTH = (1 << 28) - 1
assert(REALLOC_I32_MAX > 2 * MAX_STRING_BYTE_LENGTH)

def load_string_from_range(cx, ptr, tagged_code_units) -> String:
  tag = utf16_tag(cx.opts.memory.ptr_type())
  match cx.opts.string_encoding:
    case 'utf8':
      alignment = 1
      byte_length = tagged_code_units
      encoding = 'utf-8'
    case 'utf16':
      alignment = 2
      byte_length = 2 * tagged_code_units
      encoding = 'utf-16-le'
    case 'latin1+utf16':
      alignment = 2
      if bool(tagged_code_units & tag):
        byte_length = 2 * (tagged_code_units ^ tag)
        encoding = 'utf-16-le'
      else:
        byte_length = tagged_code_units
        encoding = 'latin-1'

  trap_if(byte_length > MAX_STRING_BYTE_LENGTH)
  trap_if(ptr != align_to(ptr, alignment))
  trap_if(ptr + byte_length > len(cx.opts.memory))
  try:
    s = cx.opts.memory[ptr : ptr+byte_length].decode(encoding)
  except UnicodeError:
    trap()

  return (s, cx.opts.string_encoding, tagged_code_units)
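
To make the latin1+utf16 tagging concrete, the following sketch decodes a string from a byte buffer given its tagged code-unit count. It omits the length, alignment and bounds traps shown above, and the function name `decode_tagged` and its `ptr_bits` parameter are assumptions for this example.

```python
def decode_tagged(memory, ptr, tagged_code_units, ptr_bits=32):
    # The high bit of the pointer-sized length selects UTF-16 over Latin-1
    tag = 1 << (ptr_bits - 1)
    if tagged_code_units & tag:
        byte_length = 2 * (tagged_code_units ^ tag)
        encoding = 'utf-16-le'
    else:
        byte_length = tagged_code_units
        encoding = 'latin-1'
    return memory[ptr : ptr + byte_length].decode(encoding)

latin1_mem = 'caf\u00e9'.encode('latin-1')            # 4 code units, 4 bytes
utf16_mem = 'caf\u00e9\u4e2d'.encode('utf-16-le')     # 5 code units, 10 bytes

from_latin1 = decode_tagged(latin1_mem, 0, 4)
from_utf16 = decode_tagged(utf16_mem, 0, 5 | (1 << 31))
```

Note that the untagged count is a number of code units, so the UTF-16 case reads twice as many bytes as the count.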

Error context values are lifted directly from the current component instance's handles table:

def lift_error_context(cx, i):
  errctx = cx.inst.handles.get(i)
  trap_if(not isinstance(errctx, ErrorContext))
  return errctx

Lists and records are loaded by recursively loading their elements/fields. The byte length of a list is limited to MAX_LIST_BYTE_LENGTH which, similar to MAX_STRING_BYTE_LENGTH, keeps list lengths well below the 32-bit address space limit. The worst case inflation of a list length in bytes is by a factor of 2 (which could occur when pointer sizes increase from 4 to 8 bytes) so MAX_LIST_BYTE_LENGTH is small enough that the byte length passed to realloc stays below REALLOC_I32_MAX even when doubled.

MAX_LIST_BYTE_LENGTH = (1 << 28) - 1
assert(REALLOC_I32_MAX > 2 * MAX_LIST_BYTE_LENGTH)

def load_list(cx, ptr, elem_type, maybe_length):
  if maybe_length is not None:
    return load_list_from_valid_range(cx, ptr, maybe_length, elem_type)
  begin = load_int(cx, ptr, cx.opts.memory.ptr_size())
  length = load_int(cx, ptr + cx.opts.memory.ptr_size(), cx.opts.memory.ptr_size())
  return load_list_from_range(cx, begin, length, elem_type)

def load_list_from_range(cx, ptr, length, elem_type):
  trap_if(length * elem_size(elem_type, cx.opts.memory.ptr_type()) > MAX_LIST_BYTE_LENGTH)
  trap_if(ptr != align_to(ptr, alignment(elem_type, cx.opts.memory.ptr_type())))
  trap_if(ptr + length * elem_size(elem_type, cx.opts.memory.ptr_type()) > len(cx.opts.memory))
  return load_list_from_valid_range(cx, ptr, length, elem_type)

def load_list_from_valid_range(cx, ptr, length, elem_type):
  a = []
  for i in range(length):
    a.append(load(cx, ptr + i * elem_size(elem_type, cx.opts.memory.ptr_type()), elem_type))
  return a

def load_record(cx, ptr, fields):
  record = {}
  for field in fields:
    ptr = align_to(ptr, alignment(field.t, cx.opts.memory.ptr_type()))
    record[field.label] = load(cx, ptr, field.t)
    ptr += elem_size(field.t, cx.opts.memory.ptr_type())
  return record

As a technical detail: the align_to in the loop in load_record is guaranteed to be a no-op on the first iteration because the record as a whole starts out aligned (as asserted at the top of load).
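
The align-then-advance loop in load_record can be illustrated by computing field offsets alone. In this sketch each field is given as a hypothetical `(label, alignment, size)` triple with hard-coded values, rather than deriving them from a type via alignment and elem_size.

```python
def align_to(ptr, alignment):
    # Round ptr up to the next multiple of alignment
    return (ptr + alignment - 1) // alignment * alignment

def field_offsets(fields, base=0):
    # Mirrors the pointer arithmetic of load_record without loading anything
    offsets = {}
    ptr = base
    for label, alignment, size in fields:
        ptr = align_to(ptr, alignment)
        offsets[label] = ptr
        ptr += size
    return offsets

# Illustrative layout for a record with fields a: u8, b: u32, c: u16
offsets = field_offsets([('a', 1, 1), ('b', 4, 4), ('c', 2, 2)])
```

The field `b` lands at offset 4 rather than 1 because three bytes of padding are skipped to satisfy its 4-byte alignment.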

Variants are loaded using the order of the cases in the type to determine the case index, assigning 0 to the first case, 1 to the next case, etc. While the code below appears to perform case-label lookup at runtime, a normal implementation can build the appropriate index tables at compile-time so that variant-passing is always O(1) and involves no string operations.

def load_variant(cx, ptr, cases):
  disc_size = elem_size(discriminant_type(cases), cx.opts.memory.ptr_type())
  case_index = load_int(cx, ptr, disc_size)
  ptr += disc_size
  trap_if(case_index >= len(cases))
  c = cases[case_index]
  ptr = align_to(ptr, max_case_alignment(cases, cx.opts.memory.ptr_type()))
  if c.t is None:
    return { c.label: None }
  return { c.label: load(cx, ptr, c.t) }

Flags are converted from a bit-vector to a dictionary whose keys are derived from the ordered labels of the flags type. The code here takes advantage of Python's support for integers of arbitrary width.

def load_flags(cx, ptr, labels):
  i = load_int(cx, ptr, elem_size_flags(labels))
  return unpack_flags_from_int(i, labels)

def unpack_flags_from_int(i, labels):
  record = {}
  for l in labels:
    record[l] = bool(i & 1)
    i >>= 1
  return record

own handles are lifted by removing the handle from the current component instance's handles table so that ownership is transferred to the lowering component. The lifting operation fails if unique ownership of the handle isn't possible, for example if the index was actually a borrow or if the own handle is currently being lent out as borrows.

def lift_own(cx, i, t):
  h = cx.inst.handles.remove(i)
  trap_if(not isinstance(h, ResourceHandle))
  trap_if(h.rt is not t.rt)
  trap_if(h.num_lends != 0)
  trap_if(not h.own)
  return h.rep

The abstract lifted value for handle types is currently just the internal resource representation i32, which is kept opaque from the receiving component (it's stored in the handles table and only accessed indirectly via index). (This assumes that resource representations are immutable. If representations were to become mutable, the address of the mutable cell would be passed as the lifted value instead.)

In contrast to own, borrow handles are lifted by reading the representation from the source handle, leaving the source handle intact in the current component instance's handles table:

def lift_borrow(cx, i, t):
  assert(isinstance(cx.borrow_scope, Subtask))
  h = cx.inst.handles.get(i)
  trap_if(not isinstance(h, ResourceHandle))
  trap_if(h.rt is not t.rt)
  cx.borrow_scope.add_lender(h)
  return h.rep

Subtask.add_lender participates in the enforcement of the dynamic borrow rules, which keep the source handle alive until the end of the call (as a conservative upper bound on how long the borrow handle can be held). Note that add_lender is also called when the source handle is itself a borrow, so that it too must be kept alive until the subtask completes, which in turn prevents the current task from task.returning while its non-returned subtask still holds a transitively-borrowed handle.

Streams and futures are entirely symmetric, transferring ownership of the readable end from the lifting component to the host or lowering component and trapping if the readable end is in the middle of copying (which would create a dangling-pointer situation) or is in the DONE state (in which case the only valid operation is {stream,future}.drop-{readable,writable}).

def lift_stream(cx, i, t):
  return lift_async_value(ReadableStreamEnd, cx, i, t)

def lift_future(cx, i, t):
  return lift_async_value(ReadableFutureEnd, cx, i, t)

def lift_async_value(ReadableEndT, cx, i, t):
  assert(not contains_borrow(t))
  e = cx.inst.handles.remove(i)
  trap_if(not isinstance(e, ReadableEndT))
  trap_if(e.shared.t != t)
  trap_if(e.state != CopyState.IDLE)
  return e.shared

Storing

The store function defines how to write a value v of a given value type t into linear memory starting at offset ptr. Presenting the definition of store piecewise, we start with the top-level case analysis:

def store(cx, v, t, ptr):
  assert(ptr == align_to(ptr, alignment(t, cx.opts.memory.ptr_type())))
  assert(ptr + elem_size(t, cx.opts.memory.ptr_type()) <= len(cx.opts.memory))
  match despecialize(t):
    case BoolType()         : store_int(cx, int(bool(v)), ptr, 1)
    case U8Type()           : store_int(cx, v, ptr, 1)
    case U16Type()          : store_int(cx, v, ptr, 2)
    case U32Type()          : store_int(cx, v, ptr, 4)
    case U64Type()          : store_int(cx, v, ptr, 8)
    case S8Type()           : store_int(cx, v, ptr, 1, signed = True)
    case S16Type()          : store_int(cx, v, ptr, 2, signed = True)
    case S32Type()          : store_int(cx, v, ptr, 4, signed = True)
    case S64Type()          : store_int(cx, v, ptr, 8, signed = True)
    case F32Type()          : store_int(cx, encode_float_as_i32(v), ptr, 4)
    case F64Type()          : store_int(cx, encode_float_as_i64(v), ptr, 8)
    case CharType()         : store_int(cx, char_to_i32(v), ptr, 4)
    case StringType()       : store_string(cx, v, ptr)
    case ErrorContextType() : store_int(cx, lower_error_context(cx, v), ptr, 4)
    case ListType(t, l)     : store_list(cx, v, ptr, t, l)
    case RecordType(fields) : store_record(cx, v, ptr, fields)
    case VariantType(cases) : store_variant(cx, v, ptr, cases)
    case FlagsType(labels)  : store_flags(cx, v, ptr, labels)
    case OwnType()          : store_int(cx, lower_own(cx, v, t), ptr, 4)
    case BorrowType()       : store_int(cx, lower_borrow(cx, v, t), ptr, 4)
    case StreamType(t)      : store_int(cx, lower_stream(cx, v, t), ptr, 4)
    case FutureType(t)      : store_int(cx, lower_future(cx, v, t), ptr, 4)

Integers are stored directly into memory. Because the input domain is exactly the integers in range for the given type, no extra range checks are necessary; the signed parameter is only present to ensure that the internal range checks of int.to_bytes are satisfied.

def store_int(cx, v, ptr, nbytes, signed = False):
  cx.opts.memory[ptr : ptr+nbytes] = int.to_bytes(v, nbytes, 'little', signed = signed)
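
The role of the `signed` parameter can be seen by running the same encoding against a plain `bytearray`. The helper name `store_int_into` is an assumption for this sketch; the `OverflowError` is Python's own range check inside `int.to_bytes`, not a trap defined by the Canonical ABI.

```python
def store_int_into(memory, v, ptr, nbytes, signed=False):
    # Same little-endian encoding as store_int, over a plain bytearray
    memory[ptr : ptr + nbytes] = int.to_bytes(v, nbytes, 'little', signed=signed)

memory = bytearray(4)
store_int_into(memory, -2, 0, 2, signed=True)   # s16 value -2 -> fe ff
store_int_into(memory, 0xBEEF, 2, 2)            # u16 value    -> ef be

# Without signed=True, int.to_bytes rejects a negative value
try:
    store_int_into(bytearray(1), -1, 0, 1)
    raised = False
except OverflowError:
    raised = True
```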

Floats are stored directly into memory, with the sign and payload bits of NaN values modified nondeterministically. This reflects the practical reality that different languages, protocols and CPUs have different effects on NaNs.

Although this nondeterminism is expressed in the Python code below as generating a "random" NaN bit-pattern, native implementations do not need to use the same "random" algorithm, or even any random algorithm at all. Hosts may instead choose to canonicalize to an arbitrary fixed NaN value, or even to the original value of the NaN before lifting, allowing them to optimize away both the canonicalization of lifting and the randomization of lowering.

When a host implements the deterministic profile, NaNs are canonicalized to a particular NaN bit-pattern.

def maybe_scramble_nan32(f):
  if math.isnan(f):
    if DETERMINISTIC_PROFILE:
      f = core_f32_reinterpret_i32(CANONICAL_FLOAT32_NAN)
    else:
      f = core_f32_reinterpret_i32(random_nan_bits(32, 8))
    assert(math.isnan(f))
  return f

def maybe_scramble_nan64(f):
  if math.isnan(f):
    if DETERMINISTIC_PROFILE:
      f = core_f64_reinterpret_i64(CANONICAL_FLOAT64_NAN)
    else:
      f = core_f64_reinterpret_i64(random_nan_bits(64, 11))
    assert(math.isnan(f))
  return f

def random_nan_bits(total_bits, exponent_bits):
  fraction_bits = total_bits - exponent_bits - 1
  bits = random.getrandbits(total_bits)
  bits |= ((1 << exponent_bits) - 1) << fraction_bits
  bits |= 1 << random.randrange(fraction_bits - 1)
  return bits

def encode_float_as_i32(f):
  return core_i32_reinterpret_f32(maybe_scramble_nan32(f))

def encode_float_as_i64(f):
  return core_i64_reinterpret_f64(maybe_scramble_nan64(f))

def core_i32_reinterpret_f32(f):
  return struct.unpack('<I', struct.pack('<f', f))[0] # i32.reinterpret_f32

def core_i64_reinterpret_f64(f):
  return struct.unpack('<Q', struct.pack('<d', f))[0] # i64.reinterpret_f64
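
As a sanity check on random_nan_bits, the sketch below repeats its definition and tests the generated patterns against the IEEE 754 encoding of NaN (all exponent bits set and a nonzero fraction). The checker `is_nan_bits` is a hypothetical helper added for this example.

```python
import random

def random_nan_bits(total_bits, exponent_bits):
    fraction_bits = total_bits - exponent_bits - 1
    bits = random.getrandbits(total_bits)
    bits |= ((1 << exponent_bits) - 1) << fraction_bits   # force exponent to all ones
    bits |= 1 << random.randrange(fraction_bits - 1)      # force a nonzero fraction
    return bits

def is_nan_bits(bits, total_bits, exponent_bits):
    # A pattern encodes NaN iff the exponent is all ones and the fraction is nonzero
    fraction_bits = total_bits - exponent_bits - 1
    exponent = (bits >> fraction_bits) & ((1 << exponent_bits) - 1)
    fraction = bits & ((1 << fraction_bits) - 1)
    return exponent == (1 << exponent_bits) - 1 and fraction != 0

all_f32_nan = all(is_nan_bits(random_nan_bits(32, 8), 32, 8) for _ in range(1000))
all_f64_nan = all(is_nan_bits(random_nan_bits(64, 11), 64, 11) for _ in range(1000))
```

The second `|=` is what rules out the infinities, which share the all-ones exponent but have a zero fraction.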

The integral value of a char (a Unicode Scalar Value) is a valid unsigned i32 and thus no runtime conversion or checking is necessary:

def char_to_i32(c):
  i = ord(c)
  assert(0 <= i <= 0xD7FF or 0xE000 <= i <= 0x10FFFF)
  return i

Storing strings is complicated by the goal of attempting to optimize the different transcoding cases. In particular, one challenge is choosing the linear memory allocation size before examining the contents of the string. The reason for this constraint is that, in some settings where single-pass iterators are involved (host calls and post-MVP adapter functions), examining the contents of a string more than once would require making an engine-internal temporary copy of the whole string, which the component model specifically aims not to do. To avoid multiple passes, the canonical ABI instead uses a realloc approach to update the allocation size during the single copy. A blind realloc approach would normally suffer from multiple reallocations per string (e.g., using the standard doubling-growth strategy). However, as already shown in load_string above, string values come with two useful hints: their original encoding and number of source code units. From this hint data, store_string can do a much better job minimizing the number of reallocations.

We start with a case analysis to enumerate all the meaningful encoding combinations, subdividing the latin1+utf16 encoding into either latin1 or utf16 based on the utf16_tag flag set by load_string:

def store_string(cx, v: String, ptr):
  begin, tagged_code_units = store_string_into_range(cx, v)
  store_int(cx, begin, ptr, cx.opts.memory.ptr_size())
  store_int(cx, tagged_code_units, ptr + cx.opts.memory.ptr_size(), cx.opts.memory.ptr_size())

def store_string_into_range(cx, v: String):
  src, src_encoding, src_tagged_code_units = v
  tag = utf16_tag(cx.opts.memory.ptr_type())

  if src_encoding == 'latin1+utf16':
    if bool(src_tagged_code_units & tag):
      src_simple_encoding = 'utf16'
      src_code_units = src_tagged_code_units ^ tag
    else:
      src_simple_encoding = 'latin1'
      src_code_units = src_tagged_code_units
  else:
    src_simple_encoding = src_encoding
    src_code_units = src_tagged_code_units

  match cx.opts.string_encoding:
    case 'utf8':
      match src_simple_encoding:
        case 'utf8'         : return store_string_copy(cx, src, src_code_units, 1, 1, 'utf-8')
        case 'utf16'        : return store_utf16_to_utf8(cx, src, src_code_units)
        case 'latin1'       : return store_latin1_to_utf8(cx, src, src_code_units)
    case 'utf16':
      match src_simple_encoding:
        case 'utf8'         : return store_utf8_to_utf16(cx, src, src_code_units)
        case 'utf16'        : return store_string_copy(cx, src, src_code_units, 2, 2, 'utf-16-le')
        case 'latin1'       : return store_string_copy(cx, src, src_code_units, 2, 2, 'utf-16-le')
    case 'latin1+utf16':
      match src_encoding:
        case 'utf8'         : return store_string_to_latin1_or_utf16(cx, src, src_code_units)
        case 'utf16'        : return store_string_to_latin1_or_utf16(cx, src, src_code_units)
        case 'latin1+utf16' :
          match src_simple_encoding:
            case 'latin1'   : return store_string_copy(cx, src, src_code_units, 1, 2, 'latin-1')
            case 'utf16'    : return store_probably_utf16_to_latin1_or_utf16(cx, src, src_code_units)

The simplest 4 cases above can compute the exact destination size and then copy with a simple loop (that possibly inflates Latin-1 to UTF-16 by injecting a 0 byte after every Latin-1 byte).

def store_string_copy(cx, src, src_code_units, dst_code_unit_size, dst_alignment, dst_encoding):
  dst_byte_length = dst_code_unit_size * src_code_units
  assert(dst_byte_length <= REALLOC_I32_MAX)
  ptr = cx.opts.realloc(0, 0, dst_alignment, dst_byte_length)
  trap_if(ptr != align_to(ptr, dst_alignment))
  trap_if(ptr + dst_byte_length > len(cx.opts.memory))
  encoded = src.encode(dst_encoding)
  assert(dst_byte_length == len(encoded))
  cx.opts.memory[ptr : ptr+len(encoded)] = encoded
  return (ptr, src_code_units)

The 2 cases of transcoding into UTF-8 share an algorithm that starts by optimistically assuming that each code unit of the source string fits in a single UTF-8 byte and then, failing that, reallocates to a worst-case size, completes the copy, and ends with a shrinking reallocation.

def store_utf16_to_utf8(cx, src, src_code_units):
  worst_case_size = src_code_units * 3
  return store_string_to_utf8(cx, src, src_code_units, worst_case_size)

def store_latin1_to_utf8(cx, src, src_code_units):
  worst_case_size = src_code_units * 2
  return store_string_to_utf8(cx, src, src_code_units, worst_case_size)

def store_string_to_utf8(cx, src, src_code_units, worst_case_size):
  assert(src_code_units <= REALLOC_I32_MAX)
  ptr = cx.opts.realloc(0, 0, 1, src_code_units)
  trap_if(ptr + src_code_units > len(cx.opts.memory))
  for i,code_point in enumerate(src):
    if ord(code_point) < 2**7:
      cx.opts.memory[ptr + i] = ord(code_point)
    else:
      assert(worst_case_size <= REALLOC_I32_MAX)
      ptr = cx.opts.realloc(ptr, src_code_units, 1, worst_case_size)
      trap_if(ptr + worst_case_size > len(cx.opts.memory))
      encoded = src.encode('utf-8')
      cx.opts.memory[ptr+i : ptr+len(encoded)] = encoded[i : ]
      if worst_case_size > len(encoded):
        ptr = cx.opts.realloc(ptr, worst_case_size, 1, len(encoded))
        trap_if(ptr + len(encoded) > len(cx.opts.memory))
      return (ptr, len(encoded))
  return (ptr, src_code_units)
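
The worst-case multipliers used above (3 bytes per UTF-16 code unit, 2 bytes per Latin-1 code unit) can be checked empirically on a few representative code points. The helper `code_unit_counts` and the sample strings are chosen for this illustration only.

```python
def code_unit_counts(s):
    # Returns (number of UTF-16 code units, number of UTF-8 bytes) for s
    utf16_units = len(s.encode('utf-16-le')) // 2
    utf8_bytes = len(s.encode('utf-8'))
    return (utf16_units, utf8_bytes)

samples = {
    'ascii':  'A',            # 1 UTF-16 unit -> 1 UTF-8 byte
    'latin1': '\u00e9',       # 1 UTF-16 unit -> 2 UTF-8 bytes
    'bmp':    '\u4e2d',       # 1 UTF-16 unit -> 3 UTF-8 bytes (the worst case)
    'astral': '\U0001F600',   # 2 UTF-16 units (surrogate pair) -> 4 UTF-8 bytes
}
counts = {name: code_unit_counts(s) for name, s in samples.items()}
within_bound = all(utf8 <= 3 * units for units, utf8 in counts.values())
```

Code points outside the Basic Multilingual Plane need 4 UTF-8 bytes but also occupy 2 UTF-16 code units, so they stay under the 3x bound.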

Converting from UTF-8 to UTF-16 performs an initial worst-case size allocation (assuming each UTF-8 byte encodes a whole code point that inflates into a two-byte UTF-16 code unit) and then does a shrinking reallocation at the end if multiple UTF-8 bytes were collapsed into a single 2-byte UTF-16 code unit:

def store_utf8_to_utf16(cx, src, src_code_units):
  worst_case_size = 2 * src_code_units
  assert(worst_case_size <= REALLOC_I32_MAX)
  ptr = cx.opts.realloc(0, 0, 2, worst_case_size)
  trap_if(ptr != align_to(ptr, 2))
  trap_if(ptr + worst_case_size > len(cx.opts.memory))
  encoded = src.encode('utf-16-le')
  cx.opts.memory[ptr : ptr+len(encoded)] = encoded
  if len(encoded) < worst_case_size:
    ptr = cx.opts.realloc(ptr, worst_case_size, 2, len(encoded))
    trap_if(ptr != align_to(ptr, 2))
    trap_if(ptr + len(encoded) > len(cx.opts.memory))
  code_units = int(len(encoded) / 2)
  return (ptr, code_units)

The next transcoding case handles latin1+utf16 encoding, where the general goal is to fit the incoming string into Latin-1 if possible based on the code points of the incoming string. The algorithm speculates that all code points do fit into Latin-1 and then falls back to a worst-case allocation size when a code point is found outside Latin-1. In this fallback case, the previously-copied Latin-1 bytes are inflated in place, inserting a 0 byte after every Latin-1 byte (iterating in reverse to avoid clobbering later bytes):

def store_string_to_latin1_or_utf16(cx, src, src_code_units):
  assert(src_code_units <= REALLOC_I32_MAX)
  ptr = cx.opts.realloc(0, 0, 2, src_code_units)
  trap_if(ptr != align_to(ptr, 2))
  trap_if(ptr + src_code_units > len(cx.opts.memory))
  dst_byte_length = 0
  for usv in src:
    if ord(usv) < (1 << 8):
      cx.opts.memory[ptr + dst_byte_length] = ord(usv)
      dst_byte_length += 1
    else:
      worst_case_size = 2 * src_code_units
      assert(worst_case_size <= REALLOC_I32_MAX)
      ptr = cx.opts.realloc(ptr, src_code_units, 2, worst_case_size)
      trap_if(ptr != align_to(ptr, 2))
      trap_if(ptr + worst_case_size > len(cx.opts.memory))
      for j in range(dst_byte_length-1, -1, -1):
        cx.opts.memory[ptr + 2*j] = cx.opts.memory[ptr + j]
        cx.opts.memory[ptr + 2*j + 1] = 0
      encoded = src.encode('utf-16-le')
      cx.opts.memory[ptr+2*dst_byte_length : ptr+len(encoded)] = encoded[2*dst_byte_length : ]
      if worst_case_size > len(encoded):
        ptr = cx.opts.realloc(ptr, worst_case_size, 2, len(encoded))
        trap_if(ptr != align_to(ptr, 2))
        trap_if(ptr + len(encoded) > len(cx.opts.memory))
      tagged_code_units = int(len(encoded) / 2) | utf16_tag(cx.opts.memory.ptr_type())
      return (ptr, tagged_code_units)
  if dst_byte_length < src_code_units:
    ptr = cx.opts.realloc(ptr, src_code_units, 2, dst_byte_length)
    trap_if(ptr != align_to(ptr, 2))
    trap_if(ptr + dst_byte_length > len(cx.opts.memory))
  return (ptr, dst_byte_length)
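
The reason the in-place inflation iterates in reverse can be seen in isolation. The following sketch extracts that loop into a hypothetical helper, `inflate_latin1_in_place`, and contrasts it with a forward-iterating variant that overwrites bytes before they have been read.

```python
def inflate_latin1_in_place(memory, ptr, n):
    # Walk backwards so that each source byte is read before it is overwritten
    for j in range(n - 1, -1, -1):
        memory[ptr + 2*j] = memory[ptr + j]
        memory[ptr + 2*j + 1] = 0

def inflate_forward_buggy(memory, ptr, n):
    # Deliberately wrong: writing index 1 destroys the second source byte
    for j in range(n):
        memory[ptr + 2*j] = memory[ptr + j]
        memory[ptr + 2*j + 1] = 0

good = bytearray(b'abc' + bytes(3))   # 3 Latin-1 bytes plus room to grow
inflate_latin1_in_place(good, 0, 3)

bad = bytearray(b'abc' + bytes(3))
inflate_forward_buggy(bad, 0, 3)
```

After the reverse pass, `good` holds the UTF-16-LE encoding of the original text, while the forward pass preserves only the first character.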

The final transcoding case takes advantage of the extra heuristic information that the incoming UTF-16 bytes were intentionally chosen over Latin-1 by the producer, indicating that they probably contain code points outside Latin-1 and thus probably require inflation. Based on this information, the transcoding algorithm pessimistically allocates storage for UTF-16, deflating at the end if indeed no non-Latin-1 code points were encountered. This Latin-1 deflation ensures that if a group of components are all using latin1+utf16 and one component over-uses UTF-16, other components can recover the Latin-1 compression. (The Latin-1 check can be inexpensively fused with the UTF-16 validate+copy loop.)

def store_probably_utf16_to_latin1_or_utf16(cx, src, src_code_units):
  src_byte_length = 2 * src_code_units
  assert(src_byte_length <= REALLOC_I32_MAX)
  ptr = cx.opts.realloc(0, 0, 2, src_byte_length)
  trap_if(ptr != align_to(ptr, 2))
  trap_if(ptr + src_byte_length > len(cx.opts.memory))
  encoded = src.encode('utf-16-le')
  cx.opts.memory[ptr : ptr+len(encoded)] = encoded
  if any(ord(c) >= (1 << 8) for c in src):
    tagged_code_units = int(len(encoded) / 2) | utf16_tag(cx.opts.memory.ptr_type())
    return (ptr, tagged_code_units)
  latin1_size = int(len(encoded) / 2)
  for i in range(latin1_size):
    cx.opts.memory[ptr + i] = cx.opts.memory[ptr + 2*i]
  ptr = cx.opts.realloc(ptr, src_byte_length, 1, latin1_size)
  trap_if(ptr + latin1_size > len(cx.opts.memory))
  return (ptr, latin1_size)

Error context values are lowered by storing them directly into the current component instance's handles table and passing the i32 index to wasm:

def lower_error_context(cx, v):
  return cx.inst.handles.add(v)

Lists and records are stored by recursively storing their elements and are symmetric to the loading functions. Unlike strings, lists can simply allocate based on the up-front knowledge of length and static element size. Storing a list that exceeds the size of a 32-bit memory traps even when storing on a 64-bit platform, to avoid having interfaces that 32-bit components can't use.

def store_list(cx, v, ptr, elem_type, maybe_length):
  if maybe_length is not None:
    assert(maybe_length == len(v))
    store_list_into_valid_range(cx, v, ptr, elem_type)
    return
  begin, length = store_list_into_range(cx, v, elem_type)
  store_int(cx, begin, ptr, cx.opts.memory.ptr_size())
  store_int(cx, length, ptr + cx.opts.memory.ptr_size(), cx.opts.memory.ptr_size())

def store_list_into_range(cx, v, elem_type):
  byte_length = len(v) * elem_size(elem_type, cx.opts.memory.ptr_type())
  assert(byte_length <= REALLOC_I32_MAX)
  ptr = cx.opts.realloc(0, 0, alignment(elem_type, cx.opts.memory.ptr_type()), byte_length)
  trap_if(ptr != align_to(ptr, alignment(elem_type, cx.opts.memory.ptr_type())))
  trap_if(ptr + byte_length > len(cx.opts.memory))
  store_list_into_valid_range(cx, v, ptr, elem_type)
  return (ptr, len(v))

def store_list_into_valid_range(cx, v, ptr, elem_type):
  for i,e in enumerate(v):
    store(cx, e, elem_type, ptr + i * elem_size(elem_type, cx.opts.memory.ptr_type()))

def store_record(cx, v, ptr, fields):
  for f in fields:
    ptr = align_to(ptr, alignment(f.t, cx.opts.memory.ptr_type()))
    store(cx, v[f.label], f.t, ptr)
    ptr += elem_size(f.t, cx.opts.memory.ptr_type())

Variant values are represented as Python dictionaries containing exactly one entry whose key is the label of the lifted case and whose value is the (optional) case payload. While this code appears to do an O(n) search of the variant type for a matching case label, a normal implementation can statically fuse store_variant with its matching load_variant to ultimately build a dense array that maps the producer's case indices to the consumer's case indices.

def store_variant(cx, v, ptr, cases):
  case_index, case_value = match_case(v, cases)
  disc_size = elem_size(discriminant_type(cases), cx.opts.memory.ptr_type())
  store_int(cx, case_index, ptr, disc_size)
  ptr += disc_size
  ptr = align_to(ptr, max_case_alignment(cases, cx.opts.memory.ptr_type()))
  c = cases[case_index]
  if c.t is not None:
    store(cx, case_value, c.t, ptr)

def match_case(v, cases):
  [label] = v.keys()
  [index] = [i for i,c in enumerate(cases) if c.label == label]
  [value] = v.values()
  return (index, value)
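
For example, match_case can be exercised on its own with a two-case variant shaped like an option. The `Case` namedtuple and the string `'u32'` used as a payload type are stand-ins assumed for this sketch, not the spec's actual type representations.

```python
from collections import namedtuple

# Hypothetical stand-in for the spec's case type: a label plus an optional payload type
Case = namedtuple('Case', ['label', 't'])

def match_case(v, cases):
    [label] = v.keys()      # the destructuring requires exactly one entry
    [index] = [i for i, c in enumerate(cases) if c.label == label]
    [value] = v.values()
    return (index, value)

cases = [Case('none', None), Case('some', 'u32')]

some_match = match_case({'some': 42}, cases)
none_match = match_case({'none': None}, cases)
```

The single-element list patterns double as assertions: a dictionary with zero or several entries, or a label missing from `cases`, raises a `ValueError` rather than silently picking a case.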

Flags are converted from a dictionary to a bit-vector by iterating through the labels of the flags type in the order they were listed in the type definition and OR-ing all the bits together. Flag lifting/lowering can be statically fused into array/integer operations (with a simple byte copy when the label lists are the same) to avoid any string operations in a similar manner to variants.

def store_flags(cx, v, ptr, labels):
  i = pack_flags_into_int(v, labels)
  store_int(cx, i, ptr, elem_size_flags(labels))

def pack_flags_into_int(v, labels):
  i = 0
  shift = 0
  for l in labels:
    i |= (int(bool(v[l])) << shift)
    shift += 1
  return i
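
A quick round trip shows that packing and unpacking are inverses for a given label order. This sketch restates both helpers in compact form (using `enumerate` for the shift) so that it runs standalone; the label names are illustrative.

```python
def pack_flags_into_int(v, labels):
    # Bit k of the result holds the flag named by labels[k]
    i = 0
    for shift, l in enumerate(labels):
        i |= int(bool(v[l])) << shift
    return i

def unpack_flags_from_int(i, labels):
    return {l: bool((i >> shift) & 1) for shift, l in enumerate(labels)}

labels = ['read', 'write', 'exec']
value = {'read': True, 'write': False, 'exec': True}

packed = pack_flags_into_int(value, labels)
round_tripped = unpack_flags_from_int(packed, labels)
```

The first label occupies the least significant bit, so this value packs to `0b101`.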

Finally, own and borrow handles are lowered by initializing new handle elements in the current component instance's handles table. The increment of num_borrows is complemented by a decrement in canon_resource_drop and ensures that all borrowed handles are dropped before the end of the task.

def lower_own(cx, rep, t):
  h = ResourceHandle(t.rt, rep, own = True)
  return cx.inst.handles.add(h)

def lower_borrow(cx, rep, t):
  assert(isinstance(cx.borrow_scope, Task))
  if cx.inst is t.rt.impl:
    return rep
  h = ResourceHandle(t.rt, rep, own = False, borrow_scope = cx.borrow_scope)
  h.borrow_scope.num_borrows += 1
  return cx.inst.handles.add(h)

The special case in lower_borrow is an optimization, recognizing that, when a borrowed handle is passed to the component that implemented the resource type, the only thing the borrowed handle is good for is calling resource.rep, so lowering might as well avoid the overhead of creating an intermediate borrow handle.

Lowering a stream or future is entirely symmetric and simply adds a new readable end to the current component instance's handles table, passing the index of the new element to core wasm:

def lower_stream(cx, v, t):
  assert(isinstance(v, ReadableStream))
  assert(not contains_borrow(t))
  return cx.inst.handles.add(ReadableStreamEnd(v))

def lower_future(cx, v, t):
  assert(isinstance(v, ReadableFuture))
  assert(not contains_borrow(t))
  return cx.inst.handles.add(ReadableFutureEnd(v))

Flattening

With only the definitions above, the Canonical ABI would be forced to place all parameters and results in linear memory. While this is necessary in the general case, in many cases performance can be improved by passing small-enough values in registers by using core function parameters and results. To support this optimization, the Canonical ABI defines flatten_functype to map component function types to core function types by attempting to decompose all the non-dynamically-sized component value types into core value types.

For a variety of practical reasons, we need to limit the total number of flattened parameters and results, falling back to storing everything in linear memory. The number of flattened results is currently limited to 1 due to various parts of the toolchain (notably the C ABI) not yet being able to express multi-value returns. Hopefully this limitation is temporary and can be lifted before the Component Model is fully standardized.

When there are too many flat values, in general, a single i32 pointer can be passed instead (pointing to a tuple in linear memory). When lowering into linear memory, this requires the Canonical ABI to call realloc (in lower below) to allocate space to put the tuple. As an optimization, when lowering the return value of an imported function (via canon lower), the caller can have already allocated space for the return value (e.g., efficiently on the stack), passing in an i32 pointer as a parameter instead of returning an i32 as a return value.

Given all this, the top-level definition of flatten_functype is:

MAX_FLAT_PARAMS = 16
MAX_FLAT_ASYNC_PARAMS = 4
MAX_FLAT_RESULTS = 1

def flatten_functype(opts, ft, context):
  flat_params = flatten_types(ft.param_types(), opts)
  flat_results = flatten_types(ft.result_type(), opts)
  if not opts.async_:
    if len(flat_params) > MAX_FLAT_PARAMS:
      flat_params = [opts.memory.ptr_type()]
    if len(flat_results) > MAX_FLAT_RESULTS:
      match context:
        case 'lift':
          flat_results = [opts.memory.ptr_type()]
        case 'lower':
          flat_params += [opts.memory.ptr_type()]
          flat_results = []
    return CoreFuncType(flat_params, flat_results)
  else:
    match context:
      case 'lift':
        if len(flat_params) > MAX_FLAT_PARAMS:
          flat_params = [opts.memory.ptr_type()]
        if opts.callback:
          flat_results = ['i32']
        else:
          flat_results = []
      case 'lower':
        if len(flat_params) > MAX_FLAT_ASYNC_PARAMS:
          flat_params = [opts.memory.ptr_type()]
        if len(flat_results) > 0:
          flat_params += [opts.memory.ptr_type()]
        flat_results = ['i32']
    return CoreFuncType(flat_params, flat_results)

def flatten_types(ts, opts):
  return [ft for t in ts for ft in flatten_type(t, opts)]

As shown here, the core signatures of async-lowered functions use a lower limit on the maximum number of parameters (4) and results (0) passed as scalars before falling back to passing through memory.
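To make the limits concrete, the following minimal sketch applies the same spilling rules to lists of core types that have already been flattened. It is a simplification: spill_sync is a hypothetical helper (not part of the Canonical ABI), it covers only the synchronous case, and it assumes a 32-bit memory so the pointer type is hard-coded to 'i32'.

```python
# Illustrative sketch only: mirrors the synchronous branch of flatten_functype
# on plain lists of core type names. PTR is an assumption (32-bit memory).
MAX_FLAT_PARAMS = 16
MAX_FLAT_RESULTS = 1
PTR = 'i32'

def spill_sync(flat_params, flat_results, context):
  assert context in ('lift', 'lower')
  flat_params = list(flat_params)
  flat_results = list(flat_results)
  if len(flat_params) > MAX_FLAT_PARAMS:
    flat_params = [PTR]          # all parameters passed via one pointer
  if len(flat_results) > MAX_FLAT_RESULTS:
    if context == 'lift':
      flat_results = [PTR]       # callee returns a pointer to the results
    else:
      flat_params += [PTR]       # caller passes an out-pointer instead
      flat_results = []
  return flat_params, flat_results

# A function taking 17 u32 values and returning a string (pointer + length):
params = ['i32'] * 17
results = ['i32', 'i32']
lifted = spill_sync(params, results, 'lift')    # (['i32'], ['i32'])
lowered = spill_sync(params, results, 'lower')  # (['i32', 'i32'], [])
```

In the 'lower' case the second i32 parameter is the caller-allocated return buffer described above, which is why the core function has no results at all.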

Presenting the definition of flatten_type piecewise, we start with the top-level case analysis:

def flatten_type(t, opts):
  match despecialize(t):
    case BoolType()                       : return ['i32']
    case U8Type() | U16Type() | U32Type() : return ['i32']
    case S8Type() | S16Type() | S32Type() : return ['i32']
    case S64Type() | U64Type()            : return ['i64']
    case F32Type()                        : return ['f32']
    case F64Type()                        : return ['f64']
    case CharType()                       : return ['i32']
    case StringType()                     : return [opts.memory.ptr_type(), opts.memory.ptr_type()]
    case ErrorContextType()               : return ['i32']
    case ListType(t, l)                   : return flatten_list(t, l, opts)
    case RecordType(fields)               : return flatten_record(fields, opts)
    case VariantType(cases)               : return flatten_variant(cases, opts)
    case FlagsType(labels)                : return ['i32']
    case OwnType() | BorrowType()         : return ['i32']
    case StreamType() | FutureType()      : return ['i32']

List flattening of a fixed-length list uses the same flattening as a tuple (via flatten_record below).

def flatten_list(elem_type, maybe_length, opts):
  if maybe_length is not None:
    return flatten_type(elem_type, opts) * maybe_length
  return [opts.memory.ptr_type(), opts.memory.ptr_type()]

Record flattening simply flattens each field in sequence.

def flatten_record(fields, opts):
  flat = []
  for f in fields:
    flat += flatten_type(f.t, opts)
  return flat

Variant flattening is more involved due to the fact that each case payload can have a totally different flattening. Rather than giving up when there is a type mismatch, the Canonical ABI relies on the fact that the 4 core value types can be easily bit-cast between each other and defines a join operator to pick the tightest approximation. What this means is that, regardless of the dynamic case, all flattened variants are passed with the same static set of core types, which may involve, e.g., reinterpreting an f32 as an i32 or zero-extending an i32 into an i64.

def flatten_variant(cases, opts):
  flat = []
  for c in cases:
    if c.t is not None:
      for i,ft in enumerate(flatten_type(c.t, opts)):
        if i < len(flat):
          flat[i] = join(flat[i], ft)
        else:
          flat.append(ft)
  return flatten_type(discriminant_type(cases), opts) + flat

def join(a, b):
  if a == b: return a
  if (a == 'i32' and b == 'f32') or (a == 'f32' and b == 'i32'): return 'i32'
  return 'i64'
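As a worked example of the join, the sketch below runs a simplified version of flatten_variant directly on per-case payload flattenings. The helper flatten_variant_payloads is hypothetical and takes lists of core type names rather than component-level types; it also assumes the discriminant flattens to a single i32.

```python
# Illustrative sketch: join applied position-by-position across case payloads.
def join(a, b):
  if a == b: return a
  if {a, b} == {'i32', 'f32'}: return 'i32'
  return 'i64'

def flatten_variant_payloads(payloads):
  flat = []
  for payload in payloads:
    for i, ft in enumerate(payload):
      if i < len(flat):
        flat[i] = join(flat[i], ft)   # widen the slot to fit both cases
      else:
        flat.append(ft)               # first case to use this slot
  return ['i32'] + flat               # assumed: discriminant is one i32

# variant { a(u32), b(f32), c(tuple<f64, u8>), d }
flat = flatten_variant_payloads([['i32'], ['f32'], ['f64', 'i32'], []])
# flat == ['i32', 'i64', 'i32']
```

The first payload slot starts as i32, stays i32 after joining with f32, and is widened to i64 once the f64 case is seen; the second slot is only used by case c.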

Flat Lifting

Values are lifted by iterating over a list of parameter or result Core WebAssembly values:

class CoreValueIter:
  values: list[int|float]
  i: int

  def __init__(self, vs):
    self.values = vs
    self.i = 0

  def next(self, t):
    v = self.values[self.i]
    self.i += 1
    match t:
      case 'i32': assert(isinstance(v, int) and 0 <= v < 2**32)
      case 'i64': assert(isinstance(v, int) and 0 <= v < 2**64)
      case 'f32': assert(isinstance(v, (int,float)))
      case 'f64': assert(isinstance(v, (int,float)))
      case _    : assert(False)
    return v

  def done(self):
    return self.i == len(self.values)

The match is only used for spec-level assertions; no runtime typecase is required.

The lift_flat function defines how to convert a list of core values into a single high-level value of type t. Presenting the definition of lift_flat piecewise, we start with the top-level case analysis:

def lift_flat(cx, vi, t):
  match despecialize(t):
    case BoolType()         : return convert_int_to_bool(vi.next('i32'))
    case U8Type()           : return lift_flat_unsigned(vi, 32, 8)
    case U16Type()          : return lift_flat_unsigned(vi, 32, 16)
    case U32Type()          : return lift_flat_unsigned(vi, 32, 32)
    case U64Type()          : return lift_flat_unsigned(vi, 64, 64)
    case S8Type()           : return lift_flat_signed(vi, 32, 8)
    case S16Type()          : return lift_flat_signed(vi, 32, 16)
    case S32Type()          : return lift_flat_signed(vi, 32, 32)
    case S64Type()          : return lift_flat_signed(vi, 64, 64)
    case F32Type()          : return canonicalize_nan32(vi.next('f32'))
    case F64Type()          : return canonicalize_nan64(vi.next('f64'))
    case CharType()         : return convert_i32_to_char(cx, vi.next('i32'))
    case StringType()       : return lift_flat_string(cx, vi)
    case ErrorContextType() : return lift_error_context(cx, vi.next('i32'))
    case ListType(t, l)     : return lift_flat_list(cx, vi, t, l)
    case RecordType(fields) : return lift_flat_record(cx, vi, fields)
    case VariantType(cases) : return lift_flat_variant(cx, vi, cases)
    case FlagsType(labels)  : return lift_flat_flags(vi, labels)
    case OwnType()          : return lift_own(cx, vi.next('i32'), t)
    case BorrowType()       : return lift_borrow(cx, vi.next('i32'), t)
    case StreamType(t)      : return lift_stream(cx, vi.next('i32'), t)
    case FutureType(t)      : return lift_future(cx, vi.next('i32'), t)

Integers are lifted from core i32 or i64 values using the signedness of the target type to interpret the high-order bit. When the target type is narrower than an i32, the Canonical ABI ignores the unused high bits (like load_int). The conversion logic here assumes that i32 values are always represented as unsigned Python ints and thus lifting to a signed type performs a manual 2s complement conversion in the Python (which would be a no-op in hardware).

def lift_flat_unsigned(vi, core_width, t_width):
  i = vi.next('i' + str(core_width))
  assert(0 <= i < (1 << core_width))
  return i % (1 << t_width)

def lift_flat_signed(vi, core_width, t_width):
  i = vi.next('i' + str(core_width))
  assert(0 <= i < (1 << core_width))
  i %= (1 << t_width)
  if i >= (1 << (t_width - 1)):
    return i - (1 << t_width)
  return i
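The effect of ignoring the unused high bits can be seen in a small standalone sketch. The functions below repeat the arithmetic of lift_flat_unsigned and lift_flat_signed but take the core value directly instead of a CoreValueIter; the names lift_unsigned and lift_signed are hypothetical.

```python
# Illustrative sketch: same arithmetic as above, without the value iterator.
def lift_unsigned(i, core_width, t_width):
  assert 0 <= i < (1 << core_width)
  return i % (1 << t_width)

def lift_signed(i, core_width, t_width):
  assert 0 <= i < (1 << core_width)
  i %= (1 << t_width)
  if i >= (1 << (t_width - 1)):
    return i - (1 << t_width)
  return i

as_u8 = lift_unsigned(0x1234_56FF, 32, 8)   # 255: the high 24 bits are dropped
as_s8 = lift_signed(0x1234_56FF, 32, 8)     # -1: same low byte, read as signed
as_s32 = lift_signed(0xFFFF_FFFF, 32, 32)   # -1
```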

The contents of strings and variable-length lists are stored in memory so lifting these types is essentially the same as loading them from memory; the only difference is that the pointer and length come from ptr-sized values instead of from linear memory. Fixed-length lists are lifted the same way as a tuple.

def lift_flat_string(cx, vi):
  ptr = vi.next(cx.opts.memory.ptr_type())
  packed_length = vi.next(cx.opts.memory.ptr_type())
  return load_string_from_range(cx, ptr, packed_length)

def lift_flat_list(cx, vi, elem_type, maybe_length):
  if maybe_length is not None:
    a = []
    for i in range(maybe_length):
      a.append(lift_flat(cx, vi, elem_type))
    return a
  ptr = vi.next(cx.opts.memory.ptr_type())
  length = vi.next(cx.opts.memory.ptr_type())
  return load_list_from_range(cx, ptr, length, elem_type)

Records are lifted by recursively lifting their fields:

def lift_flat_record(cx, vi, fields):
  record = {}
  for f in fields:
    record[f.label] = lift_flat(cx, vi, f.t)
  return record

Variants are also lifted recursively. Lifting a variant must carefully follow the definition of flatten_variant above, consuming the exact same core types regardless of the dynamic case payload being lifted. Because of the join performed by flatten_variant, we need a more-permissive value iterator that reinterprets between the different types appropriately and also wraps i64 values to 32-bit when needed:

def lift_flat_variant(cx, vi, cases):
  flat_types = flatten_variant(cases, cx.opts)
  assert(flat_types.pop(0) == 'i32')
  case_index = vi.next('i32')
  trap_if(case_index >= len(cases))
  class CoerceValueIter:
    def next(self, want):
      have = flat_types.pop(0)
      x = vi.next(have)
      match (have, want):
        case ('i32', 'f32') : return decode_i32_as_float(x)
        case ('i64', 'i32') : return wrap_i64_to_i32(x)
        case ('i64', 'f32') : return decode_i32_as_float(wrap_i64_to_i32(x))
        case ('i64', 'f64') : return decode_i64_as_float(x)
        case _              : assert(have == want); return x
  c = cases[case_index]
  if c.t is None:
    v = None
  else:
    v = lift_flat(cx, CoerceValueIter(), c.t)
  for have in flat_types:
    _ = vi.next(have)
  return { c.label: v }

def wrap_i64_to_i32(i):
  assert(0 <= i < (1 << 64))
  return i % (1 << 32)
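To illustrate the coercions, consider a variant whose payload slot was joined to i64 but whose dynamic case carries an f32. The sketch below shows the ('i64', 'f32') path in isolation. Here decode_i32_as_float is an assumed stand-in, written with Python's struct module, for the helper of the same name defined earlier in this document.

```python
import struct

def wrap_i64_to_i32(i):
  assert 0 <= i < (1 << 64)
  return i % (1 << 32)

def decode_i32_as_float(i):
  # Assumed stand-in: reinterpret the 32 bits as an IEEE 754 binary32 value.
  return struct.unpack('<f', struct.pack('<I', i))[0]

# variant { a(u64), b(f32) } flattens to ['i32', 'i64'].
# Lifting case b receives the f32 bit pattern inside the i64 slot.
slot = 0x0000_0000_3FC0_0000          # bit pattern of 1.5 as an f32
value = decode_i32_as_float(wrap_i64_to_i32(slot))   # 1.5

# The upper 32 bits of the slot do not affect the lifted value.
noisy = decode_i32_as_float(wrap_i64_to_i32(0xDEAD_BEEF_3FC0_0000))  # 1.5
```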

Finally, flags are lifted by lifting to a record the same way as when loading flags from linear memory.

def lift_flat_flags(vi, labels):
  assert(0 < len(labels) <= 32)
  i = vi.next('i32')
  return unpack_flags_from_int(i, labels)

Flat Lowering

The lower_flat function defines how to convert a value v of a given type t into zero or more core values. Presenting the definition of lower_flat piecewise, we start with the top-level case analysis:

def lower_flat(cx, v, t):
  match despecialize(t):
    case BoolType()         : return [int(v)]
    case U8Type()           : return [v]
    case U16Type()          : return [v]
    case U32Type()          : return [v]
    case U64Type()          : return [v]
    case S8Type()           : return lower_flat_signed(v, 32)
    case S16Type()          : return lower_flat_signed(v, 32)
    case S32Type()          : return lower_flat_signed(v, 32)
    case S64Type()          : return lower_flat_signed(v, 64)
    case F32Type()          : return [maybe_scramble_nan32(v)]
    case F64Type()          : return [maybe_scramble_nan64(v)]
    case CharType()         : return [char_to_i32(v)]
    case StringType()       : return lower_flat_string(cx, v)
    case ErrorContextType() : return [lower_error_context(cx, v)]
    case ListType(t, l)     : return lower_flat_list(cx, v, t, l)
    case RecordType(fields) : return lower_flat_record(cx, v, fields)
    case VariantType(cases) : return lower_flat_variant(cx, v, cases)
    case FlagsType(labels)  : return lower_flat_flags(v, labels)
    case OwnType()          : return [lower_own(cx, v, t)]
    case BorrowType()       : return [lower_borrow(cx, v, t)]
    case StreamType(t)      : return [lower_stream(cx, v, t)]
    case FutureType(t)      : return [lower_future(cx, v, t)]

Since component-level values are assumed in-range and, as previously stated, core i32 values are always internally represented as unsigned ints, unsigned integer values need no extra conversion. Signed integer values are converted to unsigned core i32 or i64 values by 2s complement arithmetic (which again would be a no-op in hardware):

def lower_flat_signed(i, core_bits):
  if i < 0:
    i += (1 << core_bits)
  return [i]
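A brief sketch of the conversion, using a hypothetical standalone copy of lower_flat_signed named lower_signed, shows that a negative value of a narrow type still fills the whole core value with sign bits:

```python
# Illustrative sketch: signed values become unsigned core values.
def lower_signed(i, core_bits):
  if i < 0:
    i += (1 << core_bits)
  return [i]

lowered_s8 = lower_signed(-128, 32)    # [0xFFFFFF80]
lowered_s64 = lower_signed(-1, 64)     # [0xFFFFFFFFFFFFFFFF]
lowered_pos = lower_signed(42, 32)     # [42], non-negative values are unchanged
```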

Since strings and variable-length lists are stored in linear memory, lowering can reuse the previous definitions; only the resulting pointers are returned differently (as flat values instead of as a pair in linear memory). Fixed-length lists are lowered the same way as tuples.

def lower_flat_string(cx, v):
  ptr, packed_length = store_string_into_range(cx, v)
  return [ptr, packed_length]

def lower_flat_list(cx, v, elem_type, maybe_length):
  if maybe_length is not None:
    assert(maybe_length == len(v))
    flat = []
    for e in v:
      flat += lower_flat(cx, e, elem_type)
    return flat
  (ptr, length) = store_list_into_range(cx, v, elem_type)
  return [ptr, length]

Records are lowered by recursively lowering their fields:

def lower_flat_record(cx, v, fields):
  flat = []
  for f in fields:
    flat += lower_flat(cx, v[f.label], f.t)
  return flat

Variants are also lowered recursively. Symmetric to lift_flat_variant above, lower_flat_variant must consume all flattened types of flatten_variant, manually coercing the otherwise-incompatible type pairings allowed by join:

def lower_flat_variant(cx, v, cases):
  case_index, case_value = match_case(v, cases)
  flat_types = flatten_variant(cases, cx.opts)
  assert(flat_types.pop(0) == 'i32')
  c = cases[case_index]
  if c.t is None:
    payload = []
  else:
    payload = lower_flat(cx, case_value, c.t)
    for i,(fv,have) in enumerate(zip(payload, flatten_type(c.t, cx.opts))):
      want = flat_types.pop(0)
      match (have, want):
        case ('f32', 'i32') : payload[i] = encode_float_as_i32(fv)
        case ('i32', 'i64') : payload[i] = fv
        case ('f32', 'i64') : payload[i] = encode_float_as_i32(fv)
        case ('f64', 'i64') : payload[i] = encode_float_as_i64(fv)
        case _              : assert(have == want)
  for _ in flat_types:
    payload.append(0)
  return [case_index] + payload

Finally, flags are lowered by packing the flags into an i32 bitvector.

def lower_flat_flags(v, labels):
  assert(0 < len(labels) <= 32)
  return [pack_flags_into_int(v, labels)]
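The round trip between a flags record and its i32 bitvector can be sketched as follows. The two helpers here are assumed reimplementations of pack_flags_into_int and unpack_flags_from_int (which are defined earlier in this document); in particular, the sketch assumes that the first label maps to the least-significant bit.

```python
# Illustrative sketch; bit order (first label = bit 0) is an assumption.
def pack_flags_into_int(v, labels):
  i = 0
  for shift, label in enumerate(labels):
    i |= int(bool(v[label])) << shift
  return i

def unpack_flags_from_int(i, labels):
  record = {}
  for label in labels:
    record[label] = bool(i & 1)
    i >>= 1
  return record

labels = ['read', 'write', 'exec']
value = {'read': True, 'write': False, 'exec': True}
flat = [pack_flags_into_int(value, labels)]        # [0b101]
lifted = unpack_flags_from_int(flat[0], labels)    # equal to value
```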

Lifting and Lowering Values

The lift_flat_values function defines how to lift a list of core parameters or results (given by the CoreValueIter vi) into a tuple of component-level values with types ts.

def lift_flat_values(cx, max_flat, vi, ts):
  flat_types = flatten_types(ts, cx.opts)
  if len(flat_types) > max_flat:
    ptr = vi.next(cx.opts.memory.ptr_type())
    tuple_type = TupleType(ts)
    trap_if(ptr != align_to(ptr, alignment(tuple_type, cx.opts.memory.ptr_type())))
    trap_if(ptr + elem_size(tuple_type, cx.opts.memory.ptr_type()) > len(cx.opts.memory))
    return list(load(cx, ptr, tuple_type).values())
  else:
    return [ lift_flat(cx, vi, t) for t in ts ]
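The two trap conditions on the spilled-tuple pointer can be isolated in a short sketch. The function spilled_tuple_traps is hypothetical, and align_to is an assumed integer-only equivalent of the helper defined earlier in this document; the tuple's alignment and size are passed in as plain numbers rather than computed from a type.

```python
# Illustrative sketch of the alignment and bounds checks only.
def align_to(ptr, alignment):
  # Round ptr up to the next multiple of alignment.
  return -(-ptr // alignment) * alignment

def spilled_tuple_traps(ptr, tuple_alignment, tuple_size, memory_len):
  if ptr != align_to(ptr, tuple_alignment):
    return True                       # misaligned pointer
  if ptr + tuple_size > memory_len:
    return True                       # tuple would extend past end of memory
  return False

PAGE = 65536                          # one wasm page, used as the memory size
ok = spilled_tuple_traps(64, 8, 24, PAGE)                  # False
misaligned = spilled_tuple_traps(68, 8, 24, PAGE)          # True
out_of_bounds = spilled_tuple_traps(PAGE - 16, 8, 24, PAGE)  # True
```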

Symmetrically, the lower_flat_values function defines how to lower a list of component-level values vs of types ts into a list of core values. As already described for flatten_functype above, lowering handles the greater-than-max_flat case by either allocating storage with realloc or accepting a caller-allocated buffer as an out-param:

def lower_flat_values(cx, max_flat, vs, ts, out_param = None):
  cx.inst.may_leave = False
  flat_types = flatten_types(ts, cx.opts)
  if len(flat_types) > max_flat:
    tuple_type = TupleType(ts)
    tuple_value = {str(i): v for i,v in enumerate(vs)}
    if out_param is None:
      ptr = cx.opts.realloc(0, 0, alignment(tuple_type, cx.opts.memory.ptr_type()), elem_size(tuple_type, cx.opts.memory.ptr_type()))
      flat_vals = [ptr]
    else:
      ptr = out_param.next(cx.opts.memory.ptr_type())
      flat_vals = []
    trap_if(ptr != align_to(ptr, alignment(tuple_type, cx.opts.memory.ptr_type())))
    trap_if(ptr + elem_size(tuple_type, cx.opts.memory.ptr_type()) > len(cx.opts.memory))
    store(cx, tuple_value, tuple_type, ptr)
  else:
    flat_vals = []
    for i in range(len(vs)):
      flat_vals += lower_flat(cx, vs[i], ts[i])
  cx.inst.may_leave = True
  return flat_vals

The may_leave flag is guarded by canon_lower below to prevent a component from calling out of the component while in the middle of lowering, ensuring that the relative ordering of the side effects of lifting followed by lowering cannot be observed and thus an implementation may reliably fuse lifting with lowering when making a cross-component call to avoid the intermediate copy.

Canonical Definitions

Using the above supporting definitions, we can describe the static and dynamic semantics of component-level canon definitions. The following subsections cover each of these canon cases.

canonopt Validation

Canonical options, often referred to as $opts in the definitions below, can be specified at most once in any particular list of options. For example, specifying string-encoding=utf8 twice is an error. Each individual option, if present, is validated as follows:

  • string-encoding=N - can be passed at most once, regardless of N.
  • memory - this is a subtype of (memory 0)
    • 🐘 memory may also be a subtype of (memory i64 0)
  • realloc - the function has type (func (param addr addr addr addr) (result addr)) where addr is the address type coming from the memtype of the memory canonopt (restricted to i32, but with 🐘 may also be i64).
  • if realloc is present then memory must be present
  • post-return - only allowed on canon lift, which has rules for validation
  • 🔀 async - is only allowed when used with an async function type in canon lift or canon lower and cannot be present with post-return
  • 🔀,not(🚟) async - callback must also be present. Note that with the 🚟 feature (the "stackful" ABI), this restriction is lifted.
  • 🔀 callback - the function has type (func (param i32 i32 i32) (result i32)) and cannot be present without async and is only allowed with canon lift

Additionally, some options are required depending on the lift/lower operations performed for a component. These are defined as:

  • lower(T)

    • requires memory if T contains a list or string
  • lift(T)

    • requires realloc if T contains a list or string

canon lift

For a canonical definition:

(canon lift $callee:<funcidx> $opts:<canonopt>* (func $f (type $ft)))

In addition to general validation of $opts, the following additional validation is performed:

  • $callee must have type flatten_functype($opts, $ft, 'lift')
  • $f is given type $ft
  • if a post-return is present, it has type (func (param flatten_functype($opts, $ft, 'lift').results))
  • requires options based on lift(param) for all parameters in ft
  • requires options based on lower(result) if ft has a result
  • if len(flatten_types(ft.param_types())) > MAX_FLAT_PARAMS, realloc is required
  • if len(flatten_types(ft.result_type())) > max (where max = MAX_FLAT_RESULTS for sync lifts, and max = MAX_FLAT_PARAMS for async lifts), memory is required

Note that an async-lifted function whose result type requires a memory to lift (either because it contains lists or strings or because the number of flattened types exceeds MAX_FLAT_PARAMS) must include a memory option, and that option must exactly match that of the task.return built-in called at runtime.

When instantiating a component instance, the runtime calls Store.lift (defined above) to capture the $callee, $ft and $opts immediates of canon lift along with the component instance being instantiated. These are then passed into canon_lift every time the generated FuncInst is called, along with the runtime on_start, on_resolve and caller arguments.

Based on this, canon_lift is defined in chunks as follows. The whole call executes in a new implicit thread defined here by thread_func. The first thing this implicit thread does is to wait for any backpressure, as defined by Task.enter_implicit_thread above:

def canon_lift(callee, ft, opts, inst, on_start, on_resolve, caller) -> OnCancel:
  def thread_func():
    if not task.enter_implicit_thread():
      return

Once the backpressure gate is cleared, the arguments are lowered into core wasm values and memory according to the canonopt immediates of canon lift (as defined by lower_flat_values above). Note that if the caller cancels a task while the task is still waiting on backpressure, the call will be aborted before the arguments are lowered and thus owned handles are not transferred to the callee. The CANCELLED_BEFORE_{STARTED,RETURNED} return codes from subtask.cancel are thus necessary for the caller to distinguish whether their arguments were lowered or not.

    cx = LiftLowerContext(opts, inst, task)
    args = task.start()
    flat_args = lower_flat_values(cx, MAX_FLAT_PARAMS, args, ft.param_types())
    flat_ft = flatten_functype(opts, ft, 'lift')
    assert(types_match_values(flat_ft.params, flat_args))

If the async canonopt is not specified, a lifted function then calls the core wasm callee, passing the lowered arguments in core function parameters and receiving the return value as core function results. Once the core results are lifted according to lift_flat_values above, the optional post-return function (specified as a canonopt immediate of canon lift) is called, passing the same core wasm results as parameters so that the post-return function can free any associated allocations.

    if not opts.async_:
      flat_results = call_and_trap_on_throw(callee, flat_args)
      assert(types_match_values(flat_ft.results, flat_results))
      result = lift_flat_values(cx, MAX_FLAT_RESULTS, CoreValueIter(flat_results), ft.result_type())
      task.return_(result)
      if opts.post_return is not None:
        inst.may_leave = False
        [] = call_and_trap_on_throw(opts.post_return, flat_results)
        inst.may_leave = True
      task.exit_implicit_thread()
      return

By clearing may_leave for the duration of the post-return call, the Canonical ABI ensures that synchronously-lowered calls to synchronously-lifted functions can always be implemented by a plain synchronous function call without the need for fibers which would otherwise be necessary if the post-return function performed a blocking operation.

In both of the async cases below (with or without callback), the task.return built-in must be called, providing the return value as core wasm parameters to the task.return built-in (rather than as core function results as in the synchronous case). If task.return is not called by the time the Task's last Thread exits, there is a trap (in Task.unregister_thread).

In the async non-callback ("stackful async") case, there is a single call to the core wasm callee which must return empty core results. Waiting for async I/O happens by the callee synchronously calling built-ins like waitable-set.wait. Note that, since Task.enter_implicit_thread does not acquire the exclusive_thread lock for stackful async functions, calls to waitable-set.wait made by a stackful async function do not prevent any other threads from starting or resuming in the same component instance.

    if not opts.callback:
      [] = call_and_trap_on_throw(callee, flat_args)
      assert(types_match_values(flat_ft.results, []))
      task.exit_implicit_thread()
      return

Lastly, in the async callback ("stackless async") case, waiting happens by first calling the core wasm callee and then repeatedly calling the callback function (specified as a funcidx immediate in canon lift) until the EXIT code (0) is returned:

    [packed] = call_and_trap_on_throw(callee, flat_args)
    code,si = unpack_callback_result(packed)
    while code != CallbackCode.EXIT:
      assert(task.needs_exclusive() and inst.exclusive_thread is task.implicit_thread)
      inst.exclusive_thread = None
      match code:
        case CallbackCode.YIELD:
          cancelled = thread.wait_until(lambda: not inst.exclusive_thread, cancellable = True)
          if cancelled:
            event = (EventCode.TASK_CANCELLED, 0, 0)
          else:
            event = (EventCode.NONE, 0, 0)
        case CallbackCode.WAIT:
          wset = inst.handles.get(si)
          trap_if(not isinstance(wset, WaitableSet))
          event = wset.wait_for_event_and(lambda: not inst.exclusive_thread, cancellable = True)
        case _:
          trap()
      assert(inst.exclusive_thread is None)
      inst.exclusive_thread = task.implicit_thread
      event_code, p1, p2 = event
      [packed] = call_and_trap_on_throw(opts.callback, [event_code, p1, p2])
      code,si = unpack_callback_result(packed)
    task.exit_implicit_thread()
    return

The Thread.wait_until and WaitableSet.wait_for_event_and methods called by the event loop are the same methods called by the thread.yield and waitable-set.wait built-ins. Thus, the main difference between stackful and stackless async is whether these suspending operations are performed from an empty or non-empty core wasm callstack (with the former allowing additional engine optimization).

The event loop releases ComponentInstance.exclusive_thread (which was acquired by Task.enter_implicit_thread) before potentially blocking the thread to allow other needs_exclusive tasks to execute in the interim. However, the exclusive_thread lock is held throughout each core wasm invocation from the event loop to maintain Component Invariant #3. Thus, async callback-lifted tasks allow more concurrency than synchronously-lifted tasks (which only release the exclusive_thread lock after they've returned) but less concurrency than (stackful) non-callback async-lifted tasks, which entirely ignore exclusive_thread.

The end of canon_lift creates a new task/thread pair for the call and then calls Thread.resume on the new thread to synchronously transfer control flow to it (jumping to the top of thread_func above). The new thread executes until it either returns from thread_func or blocks by (transitively) calling Thread.block_internal. If a non-async-typed call blocks before the implicit thread has returned a value and there are no other ready threads in the same component instance, canon_lift traps, since non-async-typed calls may not block. Otherwise, canon_lift switches to a thread (nondeterministically, if multiple are ready), as if the guest code had done so itself using a built-in like thread.suspend-then-promote. This allows fully-synchronous components to still use cooperative pthreads that interleave via threading built-ins (e.g., thread.yield) and even perform blocking I/O as long as the blocking I/O does not transitively block returning a value to the caller (as would also be expressible with a CPS transform like Asyncify). Lastly, canon_lift returns Task.request_cancellation, bound to the call's new task, as the OnCancel return value of FuncInst.

  task = Task(ft, opts, inst, on_start, on_resolve, caller)
  thread = Thread(task, thread_func)
  thread.resume()
  if not ft.async_:
    while task.state != Task.State.RESOLVED:
      candidates = { t for t in inst.threads if t.ready() and t is not inst.exclusive_thread }
      trap_if(not candidates)
      random.choice(list(candidates)).resume()
  return task.request_cancellation

The special case that excludes any thread (created by a previous blocked async call) holding the instance's exclusive_thread lock is necessary to preserve Component Invariant #3, which might otherwise be violated if the current synchronous call is using the single global linear memory shadow stack.

Note that, because non-async-typed functions can't block, they do not actually require a separate thread/fiber/stack to implement the above specified behavior and a normal synchronous native function call can be used instead. Even if the non-async-typed callee uses cooperative threads, since control flow must inevitably make its way back to the implicit thread to return a value (task.return is not allowed), the caller's stack can always be used for the callee's implicit thread, knowing that control flow must switch back before returning to the caller (or else trap and tear down the whole store).

The bit-packing scheme used for the i32 packed return value is defined as follows:

class CallbackCode(IntEnum):
  EXIT = 0
  YIELD = 1
  WAIT = 2
  MAX = 2

def unpack_callback_result(packed):
  code = packed & 0xf
  trap_if(code > CallbackCode.MAX)
  assert(packed < 2**32)
  assert(Table.MAX_LENGTH < 2**28)
  waitable_set_index = packed >> 4
  return (CallbackCode(code), waitable_set_index)
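Seen from the guest side, producing this packed value is the inverse operation. The following sketch uses a hypothetical helper, pack_callback_result, that is not part of the Canonical ABI; it simply places the code in the low 4 bits and the waitable set index in the remaining bits, matching what unpack_callback_result expects.

```python
from enum import IntEnum

class CallbackCode(IntEnum):
  EXIT = 0
  YIELD = 1
  WAIT = 2

def pack_callback_result(code, waitable_set_index=0):
  # Hypothetical guest-side inverse of unpack_callback_result.
  assert 0 <= waitable_set_index < 2**28
  return int(code) | (waitable_set_index << 4)

packed = pack_callback_result(CallbackCode.WAIT, 5)   # 0x52
code = CallbackCode(packed & 0xf)                     # CallbackCode.WAIT
index = packed >> 4                                   # 5
```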

The ability to asynchronously wait, yield and exit is thus available to both the callback and non-callback cases, making callback just an optimization to avoid allocating stacks for async languages that have avoided the need for stackful coroutines by design (e.g., async/await in JS, Python, C# and Rust).

Uncaught Core WebAssembly exceptions or, in a future with stack-switching, unhandled events, result in a trap at component boundaries. Thus, if a component wishes to signal an error, it must use some sort of explicit type such as result (whose error case particular language bindings may choose to map to and from exceptions):

def call_and_trap_on_throw(callee, args):
  try:
    return callee(args)
  except CoreWebAssemblyException:
    trap()

canon lower

For a canonical definition:

(canon lower $callee:<funcidx> $opts:<canonopt>* (core func $f))

In addition to general validation of $opts, additional validation is performed where $callee has type $ft:

  • $f is given type flatten_functype($opts, $ft, 'lower')
  • requires options based on lower(param) for all parameters in ft
  • requires options based on lift(result) if ft has a result
  • if len(flatten_types(ft.param_types())) > max_flat_params, memory is required
  • if len(flatten_types(ft.result_type())) > max_flat_results, realloc is required
  • 🔀 if async is specified, memory must be present

When instantiating a component instance, the runtime calls Store.lower (defined above) to capture the $callee, $ft and $opts immediates of canon lower along with the component instance being instantiated. These are then passed into canon_lower every time the generated CoreFuncInst is called, along with the runtime Core WebAssembly arguments.

Based on this, canon_lower is defined in chunks as follows. First, each call to canon_lower creates a new Subtask. However, this Subtask is only added to the current component instance's handles table (below) if async is specified and callee blocks. In any case, this Subtask is used as the LiftLowerContext.borrow_scope for borrow arguments, ensuring that owned handles are not dropped before Subtask.deliver_resolve is called (below).

def canon_lower(callee, ft, opts, flat_args: list[CoreValType]) -> list[CoreValType]:
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  subtask = Subtask()
  cx = LiftLowerContext(opts, thread.task.inst, subtask)

The next chunk makes the call to callee using the opts immediates of the canon lower definition to configure lift_flat_values and lower_flat_values (both defined above) and the current instance as the caller.

  flat_ft = flatten_functype(opts, ft, 'lower')
  assert(types_match_values(flat_ft.params, flat_args))
  flat_args = CoreValueIter(flat_args)

  if not opts.async_:
    max_flat_params = MAX_FLAT_PARAMS
    max_flat_results = MAX_FLAT_RESULTS
  else:
    max_flat_params = MAX_FLAT_ASYNC_PARAMS
    max_flat_results = 0

  on_progress = lambda:()
  flat_results = None

  def on_start():
    on_progress()
    assert(subtask.state == Subtask.State.STARTING)
    subtask.state = Subtask.State.STARTED
    return lift_flat_values(cx, max_flat_params, flat_args, ft.param_types())

  def on_resolve(result):
    on_progress()
    if result is None:
      assert(subtask.cancellation_requested)
      if subtask.state == Subtask.State.STARTING:
        subtask.state = Subtask.State.CANCELLED_BEFORE_STARTED
      else:
        assert(subtask.state == Subtask.State.STARTED)
        subtask.state = Subtask.State.CANCELLED_BEFORE_RETURNED
    else:
      assert(subtask.state == Subtask.State.STARTED)
      subtask.state = Subtask.State.RETURNED
      nonlocal flat_results
      flat_results = lower_flat_values(cx, max_flat_results, result, ft.result_type(), flat_args)

  subtask.on_cancel = callee(on_start, on_resolve, caller = thread.task.inst)
  assert(ft.async_ or subtask.state == Subtask.State.RETURNED)

The Subtask.state field is updated by the callbacks to keep track of the call progress. The on_progress variable starts as a no-op, but is used by the async case below to trigger event delivery.

According to the FuncInst calling contract, the call to callee should never "block" (i.e., wait on I/O). If the callee would block, it will instead return an OnCancel callback which is stored in the Subtask (so that it can be used to request cancellation in the future). Furthermore, if the function type does not have the async effect, the function must have returned a value.

In the synchronous case (when the async canonopt is not set), if the callee blocked before calling on_resolve, the synchronous caller's thread is non-cancellably suspended until the callee calls on_resolve to return a value. Note that just because the callee called on_resolve doesn't mean that the callee has finished execution: async functions are allowed to keep executing after returning their value. However, if the callee is also synchronous then (since post-return is prevented from blocking via may_leave) the callee cannot keep executing concurrently after returning a value and thus the implementation can avoid the creation of any Thread and use a plain synchronous function call instead, as expected.

  if not opts.async_:
    if not subtask.resolved():
      thread.wait_until(subtask.resolved)
    assert(types_match_values(flat_ft.results, flat_results))
    subtask.deliver_resolve()
    return flat_results

The call to Subtask.deliver_resolve decrements the counters on handles that were lent for borrowed parameters during the call. These counters are necessary even during a synchronous call to prevent a concurrent async task from dropping lent handles while the synchronous call is blocked.

In the async case, if the callee already called on_resolve, then the RETURNED code is eagerly returned to the core wasm caller without needing to add a Subtask to the current component instance's handles table. Otherwise, the index of a new Subtask is returned, bit-packed with the current state of the Subtask (which will either be STARTING or STARTED). STARTING tells the caller that they need to keep the memory for both the arguments and results allocated; STARTED tells the caller that the arguments have been read and thus any argument memory can be reused, but the result buffer has to be kept reserved.

  else:
    if subtask.resolved():
      assert(flat_results == [])
      subtask.deliver_resolve()
      return [Subtask.State.RETURNED]
    else:
      subtaski = thread.task.inst.handles.add(subtask)
      def on_progress():
        def subtask_event():
          if subtask.resolved():
            subtask.deliver_resolve()
          return (EventCode.SUBTASK, subtaski, subtask.state)
        subtask.set_pending_event(subtask_event)
      assert(0 < subtaski <= Table.MAX_LENGTH < 2**28)
      assert(0 <= subtask.state < 2**4)
      return [subtask.state | (subtaski << 4)]

When on_start and on_resolve are called after this initial async-lowered call returns, the on_progress callback (called by on_start and on_resolve) will set a pending event on the Subtask (which derives Waitable) so that it can be waited on via waitable-set.{wait,poll} or, if a callback is used, by returning to the event loop. If on_start is called followed by on_resolve before core wasm receives the first event, core wasm will only receive the second event, not two events. Note Subtask.drop prevents (via trap) a Subtask from being dropped before on_resolve is called and the event is delivered to core wasm to ensure that Subtask.deliver_resolve always performs its lend-count accounting.
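
The bit-packing used by the async case above can be sketched from the core wasm caller's point of view. The helper names `pack_async_call_result` and `unpack_async_call_result` are hypothetical and not part of the Canonical ABI; the state is treated as an opaque 4-bit code rather than as a specific `Subtask.State` value:

```python
def pack_async_call_result(state, subtaski):
  # Mirrors `subtask.state | (subtaski << 4)` from the async branch above.
  assert(0 <= state < 2**4)
  assert(0 < subtaski < 2**28)
  return state | (subtaski << 4)

def unpack_async_call_result(packed):
  # Low 4 bits: subtask state; remaining high bits: subtask index.
  # An index of 0 means no Subtask was added to the handles table,
  # which is the case when RETURNED is returned eagerly.
  state = packed & 0xf
  subtaski = packed >> 4
  return (state, subtaski)
```

Under these assumptions, a caller would first inspect the state and only treat the index as a waitable when it is non-zero.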

canon resource.new

For a canonical definition:

(canon resource.new $rt (core func $f))

validation specifies:

  • $rt must refer to a locally-defined (not imported) resource type
  • $f is given type (func (param $rt.rep) (result i32)), where $rt.rep is fixed to be i32
    • 🐘 - $rt.rep may be i32 or i64

Calling $f invokes the following function, which adds an owning handle containing the given resource representation to the current component instance's handles table:

def canon_resource_new(rt, rep):
  inst = current_instance()
  trap_if(not inst.may_leave)
  h = ResourceHandle(rt, rep, own = True)
  i = inst.handles.add(h)
  return [i]

canon resource.drop

For a canonical definition:

(canon resource.drop $rt $async? (core func $f))

validation specifies:

  • $rt must refer to a resource type
  • $f is given type (func (param i32))

Calling $f invokes the following function, which removes the handle from the current component instance's handles table and, if the handle was owning, calls the resource's destructor.

def canon_resource_drop(rt, i):
  inst = current_instance()
  trap_if(not inst.may_leave)
  h = inst.handles.remove(i)
  trap_if(not isinstance(h, ResourceHandle))
  trap_if(h.rt is not rt)
  trap_if(h.num_lends != 0)
  if h.own:
    assert(h.borrow_scope is None)
    opts = CanonicalOptions(async_ = False)
    ft = FuncType([U32Type()], [], async_ = False)
    dtor = rt.dtor or (lambda rep: [])
    callee = inst.store.lift(dtor, ft, opts, rt.impl)
    caller = inst.store.lower(callee, ft, opts, inst)
    caller([h.rep])
  else:
    h.borrow_scope.num_borrows -= 1
  return []

The call to a resource's destructor passes the i32 representation value that was previously supplied to resource.new. The call works like a normal non-async cross-component call, using the same canon_lift and canon_lower rules to, for example, catch reentrance. Because the type, lifting and lowering are all non-async, the destructor may not block. However, the destructor may spawn a cooperative thread that does.

In particular, Store.lift may trap (if rt.impl.may_enter_from(inst) is False) if the call to the destructor would reenter the destructor's instance in a way that violates Component Invariant #2. In the special case where the current_instance is the same as the destructor's instance, may_enter_from will always return True (because the set of instances being freshly entered is empty) and so, as one might expect, component instances can resource.drop the owned handles of the resources they implement.

canon resource.rep

For a canonical definition:

(canon resource.rep $rt (core func $f))

validation specifies:

  • $rt must refer to a locally-defined (not imported) resource type
  • $f is given type (func (param i32) (result $rt.rep)), where $rt.rep is fixed to be i32
    • 🐘 - $rt.rep may be i32 or i64

Calling $f invokes the following function, which extracts the resource representation from the handle in the current component instance's handles table:

def canon_resource_rep(rt, i):
  h = current_instance().handles.get(i)
  trap_if(not isinstance(h, ResourceHandle))
  trap_if(h.rt is not rt)
  return [h.rep]

Note that the "locally-defined" requirement above ensures that only the component instance defining a resource can access its representation.
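
As a rough illustration of how resource.new, resource.rep and resource.drop interact, the following is a minimal sketch using a simplified stand-in for the handles table. `ToyTable`, `ToyHandle` and `toy_resource_lifecycle` are hypothetical names; the sketch omits the may_leave, resource type, num_lends and borrow-scope checks, and calls the destructor directly instead of going through lifting and lowering:

```python
class ToyHandle:
  def __init__(self, rt, rep, own):
    self.rt = rt
    self.rep = rep
    self.own = own

class ToyTable:
  def __init__(self):
    self.array = [None]   # index 0 is reserved and never handed out
    self.free = []

  def add(self, h):
    if self.free:
      i = self.free.pop()
      self.array[i] = h
    else:
      i = len(self.array)
      self.array.append(h)
    return i

  def get(self, i):
    if not (0 < i < len(self.array)) or self.array[i] is None:
      raise RuntimeError("trap: invalid handle index")
    return self.array[i]

  def remove(self, i):
    h = self.get(i)
    self.array[i] = None
    self.free.append(i)
    return h

def toy_resource_lifecycle(rt, rep, dtor):
  table = ToyTable()
  i = table.add(ToyHandle(rt, rep, own = True))   # like resource.new
  seen_rep = table.get(i).rep                      # like resource.rep
  h = table.remove(i)                              # like resource.drop
  if h.own:
    dtor(h.rep)   # the destructor receives the original representation
  return (i, seen_rep)
```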

🔀 canon context.get

For a canonical definition:

(canon context.get $t $i (core func $f))

validation specifies:

  • $t must be i32 (see here).
    • 🐘 - $t may also be i64. All context.get and context.set built-ins defined in a single component must specify the same $t.
  • $i must be less than 2
  • $f is given type (func (result $t))

Calling $f invokes the following function, which reads the thread-local storage of the current thread.

def canon_context_get(t, i):
  thread = current_thread()
  assert(t == 'i32' or t == 'i64')
  assert(i < len(thread.storage))
  result = thread.storage[i]
  assert(result < (2 ** (ptr_size(t) * 8)))
  return [result]

🔀 canon context.set

For a canonical definition:

(canon context.set $t $i (core func $f))

validation specifies:

  • $t must be i32 (see here)
    • 🐘 - $t may also be i64. All context.get and context.set built-ins defined in a single component must specify the same $t.
  • $i must be less than 2
  • $f is given type (func (param $v $t))

Calling $f invokes the following function, which writes to the thread-local storage of the current thread:

def canon_context_set(t, i, v):
  thread = current_thread()
  assert(t == 'i32' or t == 'i64')
  assert(i < len(thread.storage))
  assert(v < (2 ** (ptr_size(t) * 8)))
  thread.storage[i] = v
  return []
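
The per-thread nature of this storage can be sketched with a toy model. `ToyThread`, `toy_context_set` and `toy_context_get` are hypothetical names, and the initial slot value of 0 is an assumption of this sketch:

```python
class ToyThread:
  def __init__(self):
    # Two slots, matching the "$i must be less than 2" validation rule.
    # The initial value 0 is assumed here for illustration.
    self.storage = [0, 0]

def toy_context_set(thread, i, v, bits = 32):
  assert(i < len(thread.storage))
  assert(0 <= v < 2**bits)
  thread.storage[i] = v

def toy_context_get(thread, i):
  assert(i < len(thread.storage))
  return thread.storage[i]
```

In this sketch, a value written by one thread is not visible to another thread, since each thread owns its own storage list.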

🔀✕ canon backpressure.set

This built-in is deprecated in favor of backpressure.{inc,dec} and will be removed once producer tools have transitioned. Producer tools should avoid emitting calls to both set and inc/dec since set will clobber the counter.

For a canonical definition:

(canon backpressure.set (core func $f))

validation specifies:

  • $f is given type (func (param $enabled i32))

Calling $f invokes the following function, which sets the backpressure counter to 1 or 0. Task.enter_implicit_thread waits for backpressure to be 0 before allowing new async-typed tasks to start.

def canon_backpressure_set(flat_args):
  assert(len(flat_args) == 1)
  current_instance().backpressure = int(bool(flat_args[0]))
  return []

🔀 canon backpressure.{inc,dec}

For a canonical definition:

(canon backpressure.inc (core func $inc))
(canon backpressure.dec (core func $dec))

validation specifies:

  • $inc/$dec are given type (func)

Calling $inc or $dec invokes one of the following functions:

def canon_backpressure_inc():
  inst = current_instance()
  assert(0 <= inst.backpressure < 2**16)
  inst.backpressure += 1
  trap_if(inst.backpressure == 2**16)
  return []

def canon_backpressure_dec():
  inst = current_instance()
  assert(0 <= inst.backpressure < 2**16)
  inst.backpressure -= 1
  trap_if(inst.backpressure < 0)
  return []

Task.enter_implicit_thread waits for backpressure to return to 0 before allowing new async-typed tasks to start, implementing backpressure.
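
The counter semantics can be sketched as follows. `Trap`, `ToyInstance` and `may_start_new_task` are hypothetical stand-ins; in particular, `may_start_new_task` only models the "counter is 0" condition and not the actual waiting performed by Task.enter_implicit_thread:

```python
class Trap(Exception):
  pass

class ToyInstance:
  def __init__(self):
    self.backpressure = 0

def toy_backpressure_inc(inst):
  inst.backpressure += 1
  if inst.backpressure == 2**16:
    raise Trap("backpressure counter overflow")

def toy_backpressure_dec(inst):
  inst.backpressure -= 1
  if inst.backpressure < 0:
    raise Trap("backpressure counter underflow")

def may_start_new_task(inst):
  # New async-typed tasks are only admitted once every inc has been
  # balanced by a matching dec.
  return inst.backpressure == 0
```

Using a counter rather than a boolean lets independent parts of a component each request backpressure without clobbering one another.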

🔀 canon task.return

For a canonical definition:

(canon task.return (result $t)? $opts (core func $f))

In addition to general validation of $opts, validation specifies:

  • $f is given type flatten_functype($opts, (func (param $t)?), 'lower')
  • $opts may only contain memory and string-encoding
  • lift($f.result) above defines required options

Calling $f invokes the following function which lifts the results from core wasm state and passes them to the current task's caller via Task.return_:

def canon_task_return(result_type, opts: LiftOptions, flat_args):
  task = current_task()
  trap_if(not task.inst.may_leave)
  trap_if(not task.opts.async_)
  trap_if(result_type != task.ft.result)
  trap_if(not LiftOptions.equal(opts, task.opts))
  cx = LiftLowerContext(opts, task.inst, task)
  result = lift_flat_values(cx, MAX_FLAT_PARAMS, CoreValueIter(flat_args), task.ft.result_type())
  task.return_(result)
  return []

The trap_if(not task.opts.async_) prevents task.return from being called by synchronously-lifted functions (which return their value by returning from the lifted core function).

The trap_if(result_type != task.ft.result) guard ensures that, in a component with multiple exported functions of different types, task.return is not called with a mismatched result type (which, due to indirect control flow, can in general only be caught dynamically).

The trap_if(not LiftOptions.equal(opts, task.opts)) guard ensures that the return value is lifted the same way as the canon lift from which this task.return is returning. This ensures that AOT fusion of canon lift and canon lower can generate a thunk that is indirectly called by task.return after these guards. Inside LiftOptions.equal, opts.memory is compared with task.opts.memory via object identity of the mutable memory instance. Since memory refers to a mutable instance of memory, this comparison is not concerned with the static memory indices (in canon lift and canon task.return), only the identity of the memories created at instantiation-/ run-time. In Core WebAssembly spec terms, the test is on the equality of the memaddr values stored in the instance's memaddrs table which is indexed by the static memidx.
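
The difference between comparing memories by identity and by contents can be sketched in plain Python. `ToyOptions` and `toy_options_equal` are hypothetical; the sketch assumes only the memory and string-encoding options, since those are the options task.return may contain, and does not reproduce the actual definition of LiftOptions.equal:

```python
class ToyOptions:
  def __init__(self, memory, string_encoding):
    self.memory = memory
    self.string_encoding = string_encoding

def toy_options_equal(a, b):
  # `is` compares object identity; `==` on two bytearrays would instead
  # compare their contents, which is not what the guard is after.
  return a.memory is b.memory and a.string_encoding == b.string_encoding
```

Two distinct memories with identical contents therefore do not compare equal in this sketch, while two option sets that refer to the same memory instance do.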

🔀 canon task.cancel

For a canonical definition:

(canon task.cancel (core func $f))

validation specifies:

  • $f is given type (func)

Calling $f cancels the current task, confirming a previous subtask.cancel request made by a supertask and claiming that all borrow handles lent to the current task have already been dropped (and trapping in Task.cancel if not).

def canon_task_cancel():
  task = current_task()
  trap_if(not task.inst.may_leave)
  trap_if(not task.opts.async_)
  task.cancel()
  return []

The trap_if(not task.opts.async_) prevents task.cancel from being called by synchronously-lifted functions (which must always return a value by returning from the lifted core function).

Task.cancel also traps if the cancellation has not been delivered to the task (in which case the callee should not yet know to cancel) or if the task has already returned a value or already called task.cancel.

🔀 canon waitable-set.new

For a canonical definition:

(canon waitable-set.new (core func $f))

validation specifies:

  • $f is given type (func (result i32))

Calling $f invokes the following function, which adds an empty waitable set to the current component instance's handles table:

def canon_waitable_set_new():
  inst = current_instance()
  trap_if(not inst.may_leave)
  i = inst.handles.add(WaitableSet())
  return [i]

🔀 canon waitable-set.wait

For a canonical definition:

(canon waitable-set.wait $cancellable? (memory $mem) (core func $f))

validation specifies:

  • $f is given type (func (param $si i32) (param $ptr T) (result i32)) where T is i32
    • 🐘 - T is i32 or i64 as determined by the address type of $mem

Calling $f invokes the following function, which waits for progress to be made on a Waitable in the given waitable set (indicated by index $si), then returns its EventCode and writes the payload values into linear memory:

def canon_waitable_set_wait(cancellable, mem, si, ptr):
  task = current_task()
  trap_if(not task.inst.may_leave)
  wset = task.inst.handles.get(si)
  trap_if(not isinstance(wset, WaitableSet))
  event = wset.wait_for_event(cancellable)
  return unpack_event(mem, task.inst, ptr, event)

def unpack_event(mem, inst, ptr, e: EventTuple):
  event, p1, p2 = e
  cx = LiftLowerContext(LiftLowerOptions(memory = mem), inst)
  store(cx, p1, U32Type(), ptr)
  store(cx, p2, U32Type(), ptr + 4)
  return [event]

If cancellable is set, then waitable-set.wait will return whether the supertask has already or concurrently requested cancellation. waitable-set.wait (and other cancellable operations) will only indicate cancellation once and thus, if a caller is not prepared to propagate cancellation, they can omit cancellable so that cancellation is instead delivered at a later cancellable call.
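
The memory layout produced by unpack_event can be sketched with a bytearray standing in for linear memory. `toy_unpack_event` and `guest_read_payload` are hypothetical names; the sketch assumes the little-endian integer encoding used by store and omits its bounds and alignment checks:

```python
import struct

def toy_unpack_event(mem, ptr, e):
  event, p1, p2 = e
  # Two little-endian u32 payloads are written at ptr and ptr + 4;
  # the event code itself is returned as the core function result.
  struct.pack_into('<I', mem, ptr, p1)
  struct.pack_into('<I', mem, ptr + 4, p2)
  return event

def guest_read_payload(mem, ptr):
  # What the guest would read back after waitable-set.wait returns.
  return struct.unpack_from('<II', mem, ptr)
```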

🔀 canon waitable-set.poll

For a canonical definition:

(canon waitable-set.poll $cancellable? (memory $mem) (core func $f))

validation specifies:

  • $f is given type (func (param $si i32) (param $ptr T) (result i32)) where T is i32
    • 🐘 - T is i32 or i64 as determined by the address type of $mem

Calling $f invokes the following function, which either returns an event that was pending on one of the waitables in the given waitable set (the same way as waitable-set.wait) or, if there is none, returns 0.

def canon_waitable_set_poll(cancellable, mem, si, ptr):
  inst = current_instance()
  trap_if(not inst.may_leave)
  wset = inst.handles.get(si)
  trap_if(not isinstance(wset, WaitableSet))
  event = wset.poll(cancellable)
  return unpack_event(mem, inst, ptr, event)

If cancellable is set, then waitable-set.poll will return whether the supertask has already or concurrently requested cancellation. waitable-set.poll (and other cancellable operations) will only indicate cancellation once and thus, if a caller is not prepared to propagate cancellation, they can omit cancellable so that cancellation is instead delivered at a later cancellable call.

🔀 canon waitable-set.drop

For a canonical definition:

(canon waitable-set.drop (core func $f))

validation specifies:

  • $f is given type (func (param i32))

Calling $f invokes the following function, which removes the indicated waitable set from the current component instance's handles table, performing the guards defined by WaitableSet.drop above:

def canon_waitable_set_drop(i):
  inst = current_instance()
  trap_if(not inst.may_leave)
  wset = inst.handles.remove(i)
  trap_if(not isinstance(wset, WaitableSet))
  wset.drop()
  return []

Note that WaitableSet.drop will trap if the waitable set is non-empty or if there is a concurrent waitable-set.wait or async callback currently using this waitable set.

🔀 canon waitable.join

For a canonical definition:

(canon waitable.join (core func $f))

validation specifies:

  • $f is given type (func (param $wi i32) (param $si i32))

Calling $f invokes the following function which adds the waitable indicated by the index wi to the waitable set indicated by the index si, removing the waitable from any waitable set that it is currently a member of.

def canon_waitable_join(wi, si):
  inst = current_instance()
  trap_if(not inst.may_leave)
  w = inst.handles.get(wi)
  trap_if(not isinstance(w, Waitable))
  trap_if(w.has_sync_waiter)
  if si == 0:
    w.join(None)
  else:
    wset = inst.handles.get(si)
    trap_if(not isinstance(wset, WaitableSet))
    w.join(wset)
  return []

As described with the definition of Waitable above, to prevent surprising deadlocks, a waitable that is currently being synchronously waited on traps if added to a waitable set.

Note that tables do not allow elements at index 0, so 0 is a valid sentinel that tells join to remove the given waitable from any set that it is currently a part of. Waitables can be a member of at most one set, so if the given waitable is already in one set, it will be transferred.
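
The "at most one set" rule can be sketched with a toy model of the membership bookkeeping. `ToyWaitable` and `ToyWaitableSet` are hypothetical and only model membership, not pending events or the has_sync_waiter guard:

```python
class ToyWaitableSet:
  def __init__(self):
    self.elems = []

class ToyWaitable:
  def __init__(self):
    self.wset = None

  def join(self, wset):
    # Leave the current set (if any) before joining the new one, so a
    # waitable is never a member of two sets at once.
    if self.wset is not None:
      self.wset.elems.remove(self)
    self.wset = wset
    if wset is not None:
      wset.elems.append(self)
```

Passing None here plays the role of the index-0 sentinel: the waitable is removed from its current set and joins no other.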

🔀 canon subtask.cancel

For a canonical definition:

(canon subtask.cancel async? (core func $f))

validation specifies:

  • $f is given type (func (param i32) (result i32))
  • 🚝 - async is allowed (otherwise it must be absent)

Calling $f sends a request to a nondeterministically-chosen thread of the subtask at the given index to cancel the subtask ASAP. This request is cooperative and the subtask may take arbitrarily long to receive and confirm the request. If the subtask doesn't immediately confirm the cancellation request, subtask.cancel returns BLOCKED and the caller must wait for a SUBTASK progress update using waitable-set methods as usual.

When cancellation is confirmed the supertask will receive the final state of the subtask which is one of:

  • RETURNED, if the subtask successfully returned a value via task.return;
  • CANCELLED_BEFORE_STARTED, if the subtask was cancelled before receiving its arguments (and thus no own handles were transferred); or
  • CANCELLED_BEFORE_RETURNED, if the subtask called task.cancel instead of task.return.

This state is either returned by subtask.cancel, if the subtask resolved without blocking, or, if subtask.cancel returns BLOCKED, then as part of the event payload of a future SUBTASK event.

BLOCKED = 0xffff_ffff

def canon_subtask_cancel(async_, i):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  subtask = thread.task.inst.handles.get(i)
  trap_if(not isinstance(subtask, Subtask))
  trap_if(subtask.resolve_delivered())
  trap_if(subtask.cancellation_requested)
  trap_if(subtask.in_waitable_set() and not async_)
  if subtask.resolved():
    assert(subtask.has_pending_event())
  else:
    subtask.cancellation_requested = True
    subtask.on_cancel()
    if not subtask.resolved():
      if not async_:
        subtask.wait_for_pending_event()
      else:
        return [BLOCKED]
  code,index,payload = subtask.get_pending_event()
  assert(code == EventCode.SUBTASK and index == i and payload == subtask.state)
  assert(subtask.resolve_delivered())
  return [subtask.state]

subtask.cancel starts by trapping if called twice for the same subtask, if the supertask has already been notified that the subtask has returned, or if it is called synchronously while the subtask is already being asynchronously waited on via a waitable set.

A race condition handled by the above code is that it's possible for a subtask to have already resolved (by calling task.return or task.cancel) and updated the state stored in the Subtask (such that Subtask.resolved() is True) but this fact has not yet been delivered to the supertask by the supertask calling get_pending_event on the Subtask in its table. This distinction is captured by Subtask.resolved vs. Subtask.resolve_delivered.
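
From the caller's side, the two possible outcomes of subtask.cancel can be distinguished as in the following minimal sketch. `interpret_cancel_result` and the returned tags are hypothetical, and the state is left as an opaque code:

```python
BLOCKED = 0xffff_ffff

def interpret_cancel_result(ret):
  if ret == BLOCKED:
    # The subtask has not yet confirmed cancellation; its final state
    # arrives later as the payload of a SUBTASK event.
    return ('wait-for-subtask-event', None)
  # Otherwise the subtask resolved without blocking and `ret` is its
  # final state.
  return ('resolved', ret)
```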

🔀 canon subtask.drop

For a canonical definition:

(canon subtask.drop (core func $f))

validation specifies:

  • $f is given type (func (param i32))

Calling $f removes the subtask at the given index from the current component instance's handles table, performing the guards and bookkeeping defined by Subtask.drop().

def canon_subtask_drop(i):
  inst = current_instance()
  trap_if(not inst.may_leave)
  s = inst.handles.remove(i)
  trap_if(not isinstance(s, Subtask))
  s.drop()
  return []

🔀 canon {stream,future}.new

For canonical definitions:

(canon stream.new $stream_t (core func $f))
(canon future.new $future_t (core func $f))

validation specifies:

  • $f is given type (func (result i64))
  • $stream_t/$future_t must be a type of the form (stream $t?)/(future $t?)

Calling $f calls canon_{stream,future}_new which adds two elements to the current component instance's handles table and returns their indices packed into a single i64. The first element (in the low 32 bits) is the readable end (of the new {stream, future}) and the second element (in the high 32 bits) is the writable end. The expectation is that, after calling {stream,future}.new, the readable end is subsequently transferred to another component (or the host) via stream or future parameter/result type (see lift_{stream,future} above).

def canon_stream_new(stream_t):
  inst = current_instance()
  trap_if(not inst.may_leave)
  shared = SharedStreamImpl(stream_t.t)
  ri = inst.handles.add(ReadableStreamEnd(shared))
  wi = inst.handles.add(WritableStreamEnd(shared))
  return [ ri | (wi << 32) ]

def canon_future_new(future_t):
  inst = current_instance()
  trap_if(not inst.may_leave)
  shared = SharedFutureImpl(future_t.t)
  ri = inst.handles.add(ReadableFutureEnd(shared))
  wi = inst.handles.add(WritableFutureEnd(shared))
  return [ ri | (wi << 32) ]
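
The packing of the two indices into a single i64 can be sketched from the caller's side as follows. `pack_ends` and `unpack_ends` are hypothetical helper names used only for illustration:

```python
def pack_ends(ri, wi):
  # Mirrors `ri | (wi << 32)` from canon_{stream,future}_new above.
  assert(0 < ri < 2**32 and 0 < wi < 2**32)
  return ri | (wi << 32)

def unpack_ends(packed):
  ri = packed & 0xffff_ffff   # low 32 bits: readable end
  wi = packed >> 32           # high 32 bits: writable end
  return (ri, wi)
```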

🔀 canon stream.{read,write}

For canonical definitions:

(canon stream.read $stream_t $opts (core func $f))
(canon stream.write $stream_t $opts (core func $f))

In addition to general validation of $opts, validation specifies:

  • $f is given type (func (param i32 T T) (result T)) where T is i32
    • 🐘 - T is i32 or i64 as determined by the address type of memory from $opts (or i32 by default if no memory is present)
  • $stream_t must be a type of the form (stream $t?)
  • If $t is present:
    • lower($t) above defines required options for stream.write
    • lift($t) above defines required options for stream.read
    • memory is required to be present
  • 🚝 - async is allowed to be omitted, otherwise it must be present

The implementation of these built-ins funnels down to a single stream_copy function that is parameterized by the direction of the copy:

def canon_stream_read(stream_t, opts, i, ptr, n):
  return stream_copy(ReadableStreamEnd, WritableBufferGuestImpl, EventCode.STREAM_READ,
                     stream_t, opts, i, ptr, n)

def canon_stream_write(stream_t, opts, i, ptr, n):
  return stream_copy(WritableStreamEnd, ReadableBufferGuestImpl, EventCode.STREAM_WRITE,
                     stream_t, opts, i, ptr, n)

Introducing the stream_copy function in chunks, first, the element at index i is checked to be of the right type and allowed to start a new copy. (In the future, the "trap if not IDLE" condition could be relaxed to allow multiple pipelined reads or writes.) There is also a trap if attempting to synchronously read from or write to a stream end that is already being asynchronously waited on via a waitable set.

def stream_copy(EndT, BufferT, event_code, stream_t, opts, i, ptr, n):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  e = thread.task.inst.handles.get(i)
  trap_if(not isinstance(e, EndT))
  trap_if(e.shared.t != stream_t.t)
  trap_if(e.state != CopyState.IDLE)
  trap_if(e.in_waitable_set() and not opts.async_)

Then a readable or writable buffer is created which (in Buffer's constructor) eagerly checks the alignment and bounds of (ptr, n). (In the future, the restriction on futures/streams containing borrows could be relaxed by maintaining sufficient bookkeeping state to ensure that borrowed handles or streams/futures of borrowed handles could not outlive their originating call. Additionally, stream<char> will be allowed and defined to encode and decode according to the string-encoding.)

  assert(not isinstance(stream_t, CharType))
  assert(not contains_borrow(stream_t))
  cx = LiftLowerContext(opts, thread.task.inst, borrow_scope = None)
  buffer = BufferT(stream_t.t, cx, ptr, n)

Next, the copy method of {Readable,Writable}{Stream,Future}End is called to perform the actual read/write. The on_copy* callbacks passed to copy bind and store a stream_event closure on the readable/writable end (via the inherited Waitable.set_pending_event) which will be called right before the event is delivered to core wasm. stream_event first calls reclaim_buffer to regain ownership of buffer and prevent any further partial reads/writes. Thus, up until event delivery, the other end of the stream is free to repeatedly read/write from/to buffer, ideally filling it up and minimizing context switches. Next, the stream's state is updated based on the result being delivered to core wasm so that, once a stream end has been notified that the other end dropped, calling anything other than stream.drop-* traps. Lastly, stream_event packs the CopyResult and number of elements copied up until this point into a single i32 or i64-sized payload for core wasm. The size is determined by the addrtype coming from the memtype of the memory immediate. Note that even though the number of elements copied is packed into an addrtype, the maximum length of the buffer is fixed at 2^28 - 1 independently of the addrtype.

  def stream_event(result, reclaim_buffer):
    reclaim_buffer()
    assert(e.copying())
    if result == CopyResult.DROPPED:
      e.state = CopyState.DONE
    else:
      e.state = CopyState.IDLE
    assert(0 <= result < 2**4)
    assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28)
    packed_result = result | (buffer.progress << 4)
    return (event_code, i, packed_result)

  def on_copy(reclaim_buffer):
    e.set_pending_event(partial(stream_event, CopyResult.COMPLETED, reclaim_buffer))

  def on_copy_done(result):
    e.set_pending_event(partial(stream_event, result, reclaim_buffer = lambda:()))

  e.state = CopyState.COPYING
  e.copy(thread.task.inst, buffer, on_copy, on_copy_done)

When this copy makes progress, a stream_event is set on the stream end's Waitable base object. If stream.{read,write} is called synchronously, the call suspends the current thread until an event is set, so that the event can be returned. Otherwise, asynchronous calls deliver the event if it was produced synchronously and return BLOCKED if not:

  if not e.has_pending_event():
    if not opts.async_:
      e.wait_for_pending_event()
    else:
      return [BLOCKED]
  code,index,payload = e.get_pending_event()
  assert(code == event_code and index == i and payload != BLOCKED)
  return [payload]
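
The value returned by stream.{read,write} can be decoded by the core wasm caller as in the following minimal sketch. `unpack_copy_result` is a hypothetical helper, the result is treated as an opaque 4-bit code, and a 32-bit address type is assumed so that BLOCKED is the 0xffff_ffff value defined above:

```python
BLOCKED = 0xffff_ffff

def unpack_copy_result(ret):
  if ret == BLOCKED:
    # No event was produced synchronously; the caller must wait for a
    # later stream event on this end.
    return None
  result = ret & 0xf     # low 4 bits: the CopyResult code
  progress = ret >> 4    # remaining bits: number of elements copied
  return (result, progress)
```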

🔀 canon future.{read,write}

For canonical definitions:

(canon future.read $future_t $opts (core func $f))
(canon future.write $future_t $opts (core func $f))

In addition to general validation of $opts, validation specifies:

  • $f is given type (func (param i32 T) (result i32)) where T is i32
    • 🐘 - T is i32 or i64 as determined by the address type of memory from $opts (or i32 by default if no memory is present)
  • $future_t must be a type of the form (future $t?)
  • If $t is present:
    • lift($t) above defines required options for future.read
    • lower($t) above defines required options for future.write
    • memory is required to be present
  • 🚝 - async is allowed to be omitted, otherwise it must be present

The implementation of these built-ins funnels down to a single future_copy function that is parameterized by the direction of the copy:

def canon_future_read(future_t, opts, i, ptr):
  return future_copy(ReadableFutureEnd, WritableBufferGuestImpl, EventCode.FUTURE_READ,
                     future_t, opts, i, ptr)

def canon_future_write(future_t, opts, i, ptr):
  return future_copy(WritableFutureEnd, ReadableBufferGuestImpl, EventCode.FUTURE_WRITE,
                     future_t, opts, i, ptr)

Introducing the future_copy function in chunks, future_copy starts with the same set of guards on the element i as stream_copy, except checking for a future end instead of a stream end:

def future_copy(EndT, BufferT, event_code, future_t, opts, i, ptr):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  e = thread.task.inst.handles.get(i)
  trap_if(not isinstance(e, EndT))
  trap_if(e.shared.t != future_t.t)
  trap_if(e.state != CopyState.IDLE)
  trap_if(e.in_waitable_set() and not opts.async_)

Next, a readable or writable buffer is created, as with streams, except that the buffer length is fixed to 1 and there is no validation-time prohibition on future<char>:

  assert(not contains_borrow(future_t))
  cx = LiftLowerContext(opts, thread.task.inst, borrow_scope = None)
  buffer = BufferT(future_t.t, cx, ptr, 1)

Next, the copy method of {Readable,Writable}FutureEnd is called to perform the actual read/write. Other than the simplifications allowed by the absence of repeated partial copies, the main difference in the following code from the stream code is that future_event transitions the end to the DONE state (in which the only valid operation is to call future.drop-*) on either the DROPPED or the COMPLETED result. This ensures that futures are read/written at most once and that futures are only passed to other components in a state where they are ready to be read/written. Another important difference is that, since the buffer length is always implied by the CopyResult, the number of elements copied is not packed in the high 28 bits; they're always zero.

  def future_event(result):
    assert((buffer.remain() == 0) == (result == CopyResult.COMPLETED))
    assert(e.copying())
    if result == CopyResult.DROPPED or result == CopyResult.COMPLETED:
      e.state = CopyState.DONE
    else:
      e.state = CopyState.IDLE
    return (event_code, i, result)

  def on_copy_done(result):
    assert(result != CopyResult.DROPPED or event_code == EventCode.FUTURE_WRITE)
    e.set_pending_event(partial(future_event, result))

  e.state = CopyState.COPYING
  e.copy(thread.task.inst, buffer, on_copy_done)

The end of future_copy is exactly the same as that of stream_copy: waiting if called synchronously and returning either the progress made or BLOCKED.

  if not e.has_pending_event():
    if not opts.async_:
      e.wait_for_pending_event()
    else:
      return [BLOCKED]
  code,index,payload = e.get_pending_event()
  assert(code == event_code and index == i)
  return [payload]
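
The difference between the state transitions performed by stream_event and future_event can be summarized in a small sketch. The enum definitions below are local stand-ins: the CANCELLED member and all numeric values are assumptions of this sketch, and `state_after_event` is a hypothetical helper:

```python
from enum import Enum

class CopyState(Enum):
  IDLE = 1
  COPYING = 2
  CANCELLING_COPY = 3
  DONE = 4

class CopyResult(Enum):
  COMPLETED = 0
  DROPPED = 1
  CANCELLED = 2   # assumed name for the remaining result

def state_after_event(is_future, result):
  # Both streams and futures are finished once the other end dropped.
  if result == CopyResult.DROPPED:
    return CopyState.DONE
  # A future is also finished after a single completed copy, whereas a
  # stream end returns to IDLE and may start another copy.
  if is_future and result == CopyResult.COMPLETED:
    return CopyState.DONE
  return CopyState.IDLE
```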

🔀 canon {stream,future}.cancel-{read,write}

For canonical definitions:

(canon stream.cancel-read $stream_t $async? (core func $f))
(canon stream.cancel-write $stream_t $async? (core func $f))
(canon future.cancel-read $future_t $async? (core func $f))
(canon future.cancel-write $future_t $async? (core func $f))

validation specifies:

  • $f is given type (func (param i32) (result i32))
  • $stream_t/$future_t must be a type of the form (stream $t?)/(future $t?)
  • 🚝 - async is allowed (otherwise it must be absent)

The implementations of these four built-ins all funnel down to a single parameterized cancel_copy function:

def canon_stream_cancel_read(stream_t, async_, i):
  return cancel_copy(ReadableStreamEnd, EventCode.STREAM_READ, stream_t, async_, i)

def canon_stream_cancel_write(stream_t, async_, i):
  return cancel_copy(WritableStreamEnd, EventCode.STREAM_WRITE, stream_t, async_, i)

def canon_future_cancel_read(future_t, async_, i):
  return cancel_copy(ReadableFutureEnd, EventCode.FUTURE_READ, future_t, async_, i)

def canon_future_cancel_write(future_t, async_, i):
  return cancel_copy(WritableFutureEnd, EventCode.FUTURE_WRITE, future_t, async_, i)

def cancel_copy(EndT, event_code, stream_or_future_t, async_, i):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  e = thread.task.inst.handles.get(i)
  trap_if(not isinstance(e, EndT))
  trap_if(e.shared.t != stream_or_future_t.t)
  trap_if(e.state != CopyState.COPYING or e.has_sync_waiter)
  trap_if(e.in_waitable_set() and not async_)
  e.state = CopyState.CANCELLING_COPY
  if not e.has_pending_event():
    e.shared.cancel()
    if not e.has_pending_event():
      if not async_:
        e.wait_for_pending_event()
      else:
        return [BLOCKED]
  code,index,payload = e.get_pending_event()
  assert(not e.copying() and code == event_code and index == i)
  return [payload]

Cancellation traps if there is not currently an async copy in progress (sync copies do not expect or check for cancellation and thus cannot be cancelled, and repeatedly cancelling the same async copy after the first call blocked is not allowed). There is also a trap if attempting to synchronously cancel a stream operation when the stream end is already being asynchronously waited on by a waitable set.

The first check for e.has_pending_event() catches the case where the copy has already racily finished, in which case we must not call cancel(). Calling cancel() may, but is not required to, recursively call one of the on_* callbacks (passed by canon_{stream,future}_{read,write} above) which will set a pending event that is caught by the second check for e.has_pending_event().

If there is still no pending event after calling cancel(), the synchronous case suspends the thread to wait for one of the on_* callbacks to eventually be called (which will set the pending event).

The asynchronous case simply returns BLOCKED and the client code must wait as usual for a {STREAM,FUTURE}_{READ,WRITE} event. In this case, cancellation has served only to asynchronously request that the host relinquish the buffer ASAP without waiting for anything to be read or written.

If BLOCKED is not returned, the pending event (which is necessarily a stream_event or future_event) is eagerly delivered to core wasm as the return value, thereby saving an additional turn of the event loop. In this case, the core wasm caller can assume that ownership of the buffer has been returned.
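The three possible outcomes of cancel_copy described above (the copy already finished, cancel() completed it eagerly, or the caller must keep waiting) can be sketched with a toy model. This is only an illustration, not the reference definition: ToyEnd, toy_cancel_copy, the string payloads, the IDLE state and the placeholder BLOCKED sentinel are all hypothetical stand-ins for the real definitions given earlier in this document.

```python
from enum import Enum

class CopyState(Enum):
    IDLE = 1             # assumed name for "no copy in progress" in this toy
    COPYING = 2
    CANCELLING_COPY = 3

BLOCKED = "BLOCKED"      # placeholder sentinel; the real constant is defined elsewhere

class ToyEnd:
    """Hypothetical stand-in for a stream/future end with one async copy in flight."""
    def __init__(self, cancel_completes_eagerly):
        self.state = CopyState.COPYING
        self.pending = None
        self.cancel_completes_eagerly = cancel_completes_eagerly

    def finish(self, payload):
        # Models an on_* callback setting the pending event.
        self.pending = payload

    def cancel(self):
        # The other side may, but need not, complete the copy during cancel().
        if self.cancel_completes_eagerly:
            self.finish("cancelled")

    def wait_for_pending_event(self):
        # Toy stand-in for suspending the thread until an on_* callback fires.
        self.finish("cancelled")

def toy_cancel_copy(e, async_):
    assert e.state == CopyState.COPYING
    e.state = CopyState.CANCELLING_COPY
    if e.pending is None:            # first check: did the copy already finish?
        e.cancel()
        if e.pending is None:        # second check: did cancel() finish it?
            if not async_:
                e.wait_for_pending_event()
            else:
                return BLOCKED       # caller waits for the event as usual
    payload, e.pending = e.pending, None
    e.state = CopyState.IDLE         # buffer ownership is back with the caller
    return payload
```

In this toy, an end whose copy raced to completion returns its original payload without cancel() ever being called, while an end that returns BLOCKED stays in CANCELLING_COPY until its event is later delivered.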

🔀 canon {stream,future}.drop-{readable,writable}

For canonical definitions:

(canon stream.drop-readable $stream_t (core func $f))
(canon stream.drop-writable $stream_t (core func $f))
(canon future.drop-readable $future_t (core func $f))
(canon future.drop-writable $future_t (core func $f))

validation specifies:

  • $f is given type (func (param i32))
  • $stream_t/$future_t must be a type of the form (stream $t?)/(future $t?)

Calling $f removes the readable or writable end of the stream or future at the given index from the current component instance's handles table, performing the guards and bookkeeping defined by {Readable,Writable}{Stream,Future}End.drop() above.

def canon_stream_drop_readable(stream_t, i):
  return drop(ReadableStreamEnd, stream_t, i)

def canon_stream_drop_writable(stream_t, hi):
  return drop(WritableStreamEnd, stream_t, hi)

def canon_future_drop_readable(future_t, i):
  return drop(ReadableFutureEnd, future_t, i)

def canon_future_drop_writable(future_t, hi):
  return drop(WritableFutureEnd, future_t, hi)

def drop(EndT, stream_or_future_t, hi):
  inst = current_instance()
  trap_if(not inst.may_leave)
  e = inst.handles.remove(hi)
  trap_if(not isinstance(e, EndT))
  trap_if(e.shared.t != stream_or_future_t.t)
  e.drop()
  return []

🧵 canon thread.index

For a canonical definition:

(canon thread.index (core func $index))

validation specifies:

  • $index is given type (func (result i32))

Calling $index invokes the following function, which extracts the index of the current thread:

def canon_thread_index():
  thread = current_thread()
  assert(thread.index is not None)
  return [thread.index]

🧵 canon thread.new-indirect

For a canonical definition:

(canon thread.new-indirect $ft $ftbl (core func $new_indirect))

validation specifies:

  • $ft must refer to the type (func (param $c T)) where T is i32
    • 🐘 - T may be i32 or i64
  • $ftbl must refer to a table whose element type matches funcref
  • $new_indirect is given type (func (param $fi U) (param $c T) (result i32)) where T comes from $ft, as described above, and U is i32
    • 🐘 - U is i32 or i64 as determined by $ftbl's address type

Calling $new_indirect invokes the following function which reads a funcref from $ftbl (trapping if out-of-bounds, null or the wrong type), creates a new suspended thread that will call the funcref passing the closure parameter $c when resumed, and returns the index of the new thread in the current component instance's threads table.

@dataclass
class CoreFuncRef:
  t: CoreFuncType
  callee: Callable[[list[CoreValType]], list[CoreValType]]

def canon_thread_new_indirect(ft, ftbl: Table[CoreFuncRef], fi, c):
  task = current_task()
  trap_if(not task.inst.may_leave)
  f = ftbl.get(fi)
  assert(ft == CoreFuncType(['i32'], []) or ft == CoreFuncType(['i64'], []))
  trap_if(f.t != ft)
  def thread_func():
    [] = call_and_trap_on_throw(f.callee, [c])
    task.unregister_thread(new_thread)
  new_thread = Thread(task, thread_func)
  assert(new_thread.suspended())
  task.register_thread(new_thread)
  return [new_thread.index]

The newly-created thread starts out in a "suspended" state and so, to actually start executing, Core WebAssembly code must call one of the other thread.* built-ins defined below.

🧵 canon thread.resume-later

For a canonical definition:

(canon thread.resume-later (core func $resume-later))

validation specifies:

  • $resume-later is given type (func (param $i i32))

Calling $resume-later invokes the following function which loads a thread at index $i from the current component instance's threads table, traps if it's not suspended, and then marks that thread as ready to run at some nondeterministic point in the future chosen by the embedder.

def canon_thread_resume_later(i):
  inst = current_instance()
  trap_if(not inst.may_leave)
  other_thread = inst.threads.get(i)
  trap_if(not other_thread.suspended())
  other_thread.resume_later()
  return []

thread.resume-later never suspends the current thread and so there is no possibility of cancellation and thus no cancellable immediate.
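The two-step lifecycle (create a suspended thread with thread.new-indirect, then make it runnable with thread.resume-later) can be illustrated with a small toy scheduler. This is a sketch under simplifying assumptions: ToyInstance, ToyThread, the string state names and run_ready are hypothetical, threads run to completion instead of suspending, and a real embedder chooses the execution order nondeterministically.

```python
from collections import deque

class Trap(Exception):
    pass

class ToyThread:
    def __init__(self, func, arg):
        self.func, self.arg = func, arg
        self.state = "suspended"      # new threads always start out suspended

class ToyInstance:
    def __init__(self):
        self.threads = []             # toy threads table: index -> ToyThread
        self.ready = deque()          # threads the embedder may run next

    def new_indirect(self, table, fi, c):
        # Trap on an out-of-bounds or null table entry, then create the thread.
        if not (0 <= fi < len(table)) or table[fi] is None:
            raise Trap("out-of-bounds or null table entry")
        self.threads.append(ToyThread(table[fi], c))
        return len(self.threads) - 1

    def resume_later(self, i):
        t = self.threads[i]
        if t.state != "suspended":
            raise Trap("thread is not suspended")
        t.state = "ready"             # the caller keeps running; nothing executes yet
        self.ready.append(t)

    def run_ready(self):
        # Stand-in for the embedder's scheduler (FIFO here purely for simplicity).
        while self.ready:
            t = self.ready.popleft()
            t.state = "running"
            t.func(t.arg)
            t.state = "exited"
```

Neither new_indirect nor resume_later runs the thread's function in this model; only the scheduler step does, which mirrors why thread.resume-later needs no cancellable immediate.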

🧵 canon thread.suspend

For a canonical definition:

(canon thread.suspend $cancellable? (core func $suspend))

validation specifies:

  • $suspend is given type (func (result i32))

Calling $suspend invokes the following function which suspends the current thread, immediately returning control flow to any transitive async-lowered calling component.

def canon_thread_suspend(cancellable):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  cancelled = thread.suspend(cancellable)
  return [cancelled]

If cancellable is set, then thread.suspend will return a Cancelled value to indicate whether the supertask has already or concurrently requested cancellation. thread.suspend (and other cancellable operations) will only indicate cancellation once and thus, if a caller is not prepared to propagate cancellation, they can omit cancellable so that cancellation is instead delivered at a later cancellable call.
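The "delivered only once" rule can be sketched with a toy task that tracks a pending cancellation request. This is a hypothetical model, not the Thread.suspend definition: ToyTask and its fields are invented for illustration, the calls return immediately rather than suspending, and the integers 0 and 1 are assumed stand-ins for the Cancelled value.

```python
class ToyTask:
    def __init__(self):
        self.cancel_requested = False   # set when the supertask requests cancellation
        self.cancel_delivered = False   # set once cancellation has been reported

    def request_cancellation(self):
        self.cancel_requested = True

    def suspend(self, cancellable):
        # Toy version: reports cancellation instead of actually suspending.
        if cancellable and self.cancel_requested and not self.cancel_delivered:
            self.cancel_delivered = True
            return 1                    # cancelled (assumed encoding)
        return 0                        # not cancelled, or not observable here
```

A non-cancellable call leaves the request pending, so the next cancellable call is the one that observes it, and any later calls report no cancellation.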

🧵 canon thread.yield

For a canonical definition:

(canon thread.yield $cancellable? (core func $yield))

validation specifies:

  • $yield is given type (func (result i32))

Calling $yield invokes the following function which yields execution so that other threads can execute, leaving the current thread ready to run at some nondeterministic point in the future chosen by the embedder. This allows a long-running computation that is not otherwise performing I/O to avoid starving other threads in a cooperative setting.

def canon_thread_yield(cancellable):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  cancelled = thread.yield_(cancellable)
  return [cancelled]

If cancellable is set, then thread.yield will return a Cancelled value indicating whether the supertask has already or concurrently requested cancellation. thread.yield (and other cancellable operations) will only indicate cancellation once and thus, if a caller is not prepared to propagate cancellation, they can omit cancellable so that cancellation is instead delivered at a later cancellable call.

🧵 canon thread.suspend-then-resume

For a canonical definition:

(canon thread.suspend-then-resume $cancellable? (core func $suspend-then-resume))

validation specifies:

  • $suspend-then-resume is given type (func (param $i i32) (result i32))

Calling $suspend-then-resume invokes the following function which loads a thread at index $i from the current component instance's threads table, traps if it's not suspended, and then switches to that thread, leaving the current thread suspended.

def canon_thread_suspend_then_resume(cancellable, i):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  other_thread = thread.task.inst.threads.get(i)
  trap_if(not other_thread.suspended())
  cancelled = thread.suspend_then_resume(cancellable, other_thread)
  return [cancelled]

If cancellable is set, then thread.suspend-then-resume will return a Cancelled value to indicate whether the supertask has already or concurrently requested cancellation. thread.suspend-then-resume (and other cancellable operations) will only indicate cancellation once and thus, if a caller is not prepared to propagate cancellation, they can omit cancellable so that cancellation is instead delivered at a later cancellable call.

🧵 canon thread.yield-then-resume

For a canonical definition:

(canon thread.yield-then-resume $cancellable? (core func $yield-then-resume))

validation specifies:

  • $yield-then-resume is given type (func (param $i i32) (result i32))

Calling $yield-then-resume invokes the following function which loads a thread at index $i from the current component instance's threads table, traps if it's not suspended, and then switches to that thread, leaving the current thread ready to run at some nondeterministic point in the future chosen by the embedder.

def canon_thread_yield_then_resume(cancellable, i):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  other_thread = thread.task.inst.threads.get(i)
  trap_if(not other_thread.suspended())
  cancelled = thread.yield_then_resume(cancellable, other_thread)
  return [cancelled]

If cancellable is set, then thread.yield-then-resume will return a Cancelled value indicating whether the supertask has already or concurrently requested cancellation. thread.yield-then-resume (and other cancellable operations) will only indicate cancellation once and thus, if a caller is not prepared to propagate cancellation, they can omit cancellable so that cancellation is instead delivered at a later cancellable call.

🧵 canon thread.suspend-then-promote

For a canonical definition:

(canon thread.suspend-then-promote $cancellable? (core func $suspend-then-promote))

validation specifies:

  • $suspend-then-promote is given type (func (param $i i32) (result i32))

Calling $suspend-then-promote invokes the following function which loads a thread at index $i from the current component instance's threads table and then calls Thread.suspend_then_promote to resume the other_thread if it's ready and, in any case, leave the current thread suspended.

def canon_thread_suspend_then_promote(cancellable, i):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  other_thread = thread.task.inst.threads.get(i)
  cancelled = thread.suspend_then_promote(cancellable, other_thread)
  return [cancelled]

If cancellable is set, then thread.suspend-then-promote will return a Cancelled value indicating whether the supertask has already or concurrently requested cancellation. thread.suspend-then-promote (and other cancellable operations) will only indicate cancellation once and thus, if a caller is not prepared to propagate cancellation, they can omit cancellable so that cancellation is instead delivered at a later cancellable call.

🧵 canon thread.yield-then-promote

For a canonical definition:

(canon thread.yield-then-promote $cancellable? (core func $yield-then-promote))

validation specifies:

  • $yield-then-promote is given type (func (param $i i32) (result i32))

Calling $yield-then-promote invokes the following function which loads a thread at index $i from the current component instance's threads table and then calls Thread.yield_then_promote to resume the other_thread if it's ready and, in any case, leave the current thread ready to run at some nondeterministic point in the future chosen by the embedder.

def canon_thread_yield_then_promote(cancellable, i):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  other_thread = thread.task.inst.threads.get(i)
  cancelled = thread.yield_then_promote(cancellable, other_thread)
  return [cancelled]

If cancellable is set, then thread.yield-then-promote will return a Cancelled value indicating whether the supertask has already or concurrently requested cancellation. thread.yield-then-promote (and other cancellable operations) will only indicate cancellation once and thus, if a caller is not prepared to propagate cancellation, they can omit cancellable so that cancellation is instead delivered at a later cancellable call.

📝 canon error-context.new

For a canonical definition:

(canon error-context.new $opts (core func $f))

validation specifies:

  • $f is given type (func (param $ptr T) (param $units T) (result i32)) where T is i32
    • 🐘 - T is i32 or i64 as determined by the memory field of $opts
  • async is not present
  • memory must be present

Calling $f calls the following function which uses the $opts immediate to (nondeterministically) lift the debug message, create a new ErrorContext value, store it in the current component instance's handles table and return its index.

@dataclass
class ErrorContext:
  debug_message: String

def canon_error_context_new(opts, ptr, tagged_code_units):
  inst = current_instance()
  trap_if(not inst.may_leave)
  if DETERMINISTIC_PROFILE or random.randint(0,1):
    s = String(('', 'utf8', 0))
  else:
    cx = LiftLowerContext(opts, inst)
    s = load_string_from_range(cx, ptr, tagged_code_units)
    s = host_defined_transformation(s)
  i = inst.handles.add(ErrorContext(s))
  return [i]

Supporting the requirement (introduced in the explainer) that wasm code does not depend on the contents of error-context values for behavioral correctness, the debug message is completely discarded nondeterministically or, in the deterministic profile, always. Importantly (for performance), when the debug message is discarded, it is not even lifted and thus the O(N) well-formedness conditions are not checked. (Note that host_defined_transformation is not defined by the Canonical ABI and stands for an arbitrary host-defined function.)
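The consequence of not lifting a discarded message can be shown with a small sketch. It is a simplified illustration only: toy_error_context_new is a hypothetical helper that assumes a UTF-8 encoded message in a flat bytes buffer, omits the handles table and host_defined_transformation, and surfaces malformed input as a Python UnicodeDecodeError where the real lifting code would trap.

```python
import random

def toy_error_context_new(memory, ptr, length, deterministic, rng=random):
    # Discard path: the message bytes are never read, so they are never validated.
    if deterministic or rng.randint(0, 1):
        return ""
    # Lift path: decoding performs the O(N) well-formedness check.
    return bytes(memory[ptr:ptr + length]).decode("utf-8")

# Malformed UTF-8 goes unnoticed when the message is discarded.
bad_memory = bytearray(b"\xff\xfe")
discarded = toy_error_context_new(bad_memory, 0, 2, deterministic=True)
```

Because either path may be taken outside the deterministic profile, guest code cannot rely on receiving back the message it supplied.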

📝 canon error-context.debug-message

For a canonical definition:

(canon error-context.debug-message $opts (core func $f))

validation specifies:

  • $f is given type (func (param i32) (param $ptr T)) where T is i32
    • 🐘 - T is i32 or i64 as determined by the address type of memory from $opts
  • async is not present
  • memory must be present
  • realloc must be present

Calling $f calls the following function which uses the $opts immediate to lower the ErrorContext's debug message. While producing an error-context value may nondeterministically discard or transform the debug message, a single error-context value must return the same debug message from error-context.debug-message over time.

def canon_error_context_debug_message(opts, i, ptr):
  inst = current_instance()
  trap_if(not inst.may_leave)
  errctx = inst.handles.get(i)
  trap_if(not isinstance(errctx, ErrorContext))
  cx = LiftLowerContext(opts, inst)
  store_string(cx, errctx.debug_message, ptr)
  return []

Note that ptr points to a region of memory (8 bytes for memory32, 16 bytes for memory64) into which will be stored the pointer and length of the debug string (allocated via opts.realloc).
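As a rough illustration of that layout, the following sketch reads the result back on the caller's side. It assumes a 32-bit memory and the utf8 string encoding, so the 8-byte region holds a little-endian i32 pointer followed by a little-endian i32 byte length; read_debug_message and the sample offsets are hypothetical and not part of the Canonical ABI.

```python
import struct

def read_debug_message(memory, ret_ptr):
    # Assumption: memory32 + utf8, so the region is two little-endian u32 values.
    str_ptr, byte_len = struct.unpack_from("<II", memory, ret_ptr)
    return bytes(memory[str_ptr:str_ptr + byte_len]).decode("utf-8")

# Simulate what the built-in would have written into linear memory.
memory = bytearray(64)
message = b"oops"
memory[32:32 + len(message)] = message               # string bytes (realloc'd region)
struct.pack_into("<II", memory, 8, 32, len(message))  # (ptr, len) pair at ret_ptr = 8
result = read_debug_message(memory, 8)
```

With memory64, the same idea would apply to a 16-byte region holding two 64-bit values.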

📝 canon error-context.drop

For a canonical definition:

(canon error-context.drop (core func $f))

validation specifies:

  • $f is given type (func (param i32))

Calling $f calls the following function, which drops the error context value at the given index from the current component instance's handles table:

def canon_error_context_drop(i):
  inst = current_instance()
  trap_if(not inst.may_leave)
  errctx = inst.handles.remove(i)
  trap_if(not isinstance(errctx, ErrorContext))
  return []

🧵② canon thread.spawn-ref

For a canonical definition:

(canon thread.spawn-ref shared? $ft (core func $spawn_ref))

validation specifies:

  • $ft must refer to the type (shared? (func (param $c T))) where T is i32
    • 🐘 - T may be i32 or i64
  • $spawn_ref is given type (shared? (func (param $f (ref null $ft)) (param $c T) (result $e i32))) where T comes from $ft as defined above

When the shared immediate is not present, the spawned thread is cooperative, only switching at specific program points. When the shared immediate is present, the spawned thread is preemptive and able to execute in parallel with all other threads.

Note: ideally, a thread could be spawned with arbitrary thread parameters. Currently, that would require additional work in the toolchain to support so, for simplicity, the current proposal simply fixes a single i32 or i64 parameter type. However, thread.spawn-ref could be extended to allow arbitrary thread parameters in the future, once it's concretely beneficial to the toolchain. The inclusion of $ft ensures backwards compatibility for when arbitrary parameters are allowed.

Calling $spawn_ref invokes the following function which simply fuses the thread.new-ref and thread.resume-later built-ins, allowing thread-creation to skip the intermediate "suspended" state transition.

def canon_thread_spawn_ref(shared, ft, f, c):
  trap_if(not current_instance().may_leave)
  if DETERMINISTIC_PROFILE:
    return [0]
  [new_thread_index] = canon_thread_new_ref(shared, ft, f, c)
  [] = canon_thread_resume_later(shared, new_thread_index)
  return [new_thread_index]

Note: canon_thread_new_ref has not yet been defined, but will be added as part of adding a GC ABI option to the Canonical ABI and would work like canon_thread_new_indirect minus the table access and type check.

🧵② canon thread.spawn-indirect

For a canonical definition:

(canon thread.spawn-indirect shared? $ft $tbl (core func $spawn_indirect))

validation specifies:

  • $ft must refer to the type (shared? (func (param $c T))) where T is i32
    • 🐘 - T may be i32 or i64
  • $tbl must refer to a shared table whose element type matches (ref null (shared? func))
  • $spawn_indirect is given type (shared? (func (param $i U) (param $c T) (result $e i32))) where T comes from $ft as defined above and U is i32
    • 🐘 - U is i32 or i64 as determined by $tbl's address type

When the shared immediate is not present, the spawned thread is cooperative, only switching at specific program points. When the shared immediate is present, the spawned thread is preemptive and able to execute in parallel with all other threads.

Calling $spawn_indirect invokes the following function which simply fuses the thread.new-indirect and thread.resume-later built-ins, allowing thread-creation to skip the intermediate "suspended" state transition.

def canon_thread_spawn_indirect(shared, ft, ftbl: Table[CoreFuncRef], fi, c):
  trap_if(not current_instance().may_leave)
  if DETERMINISTIC_PROFILE:
    return [0]
  [new_thread_index] = canon_thread_new_indirect(shared, ft, ftbl, fi, c)
  [] = canon_thread_resume_later(shared, new_thread_index)
  return [new_thread_index]

Note: canon_thread_new_indirect has not yet been extended to take a shared parameter, but will be as shared-everything-threads progresses.

🧵② canon thread.available-parallelism

For a canonical definition:

(canon thread.available-parallelism shared? (core func $f))

validation specifies:

  • $f is given type (func shared? (result i32)).

Calling $f returns the number of threads the underlying hardware can be expected to execute in parallel. This value can be artificially limited by engine configuration and is not allowed to change over the lifetime of a component instance.

def canon_thread_available_parallelism():
  if DETERMINISTIC_PROFILE:
    return [1]
  else:
    return [NUM_ALLOWED_THREADS]