Canonical ABI Explainer
May 29, 2026
This document defines the Canonical ABI used to convert between the values and functions of components in the Component Model and the values and functions of modules in Core WebAssembly. See the AST explainer for a walkthrough of the static structure of a component and the concurrency explainer for a high-level description of the concurrency concepts being specified here.
- Introduction
- Component Instances
- Concurrency
- Embedding
- Lifting and Lowering Context
- Canonical ABI Options
- Runtime State
- Despecialization
- Type Predicates
- Alignment
- Element Size
- Loading
- Storing
- Flattening
- Flat Lifting
- Flat Lowering
- Lifting and Lowering Values
- Canonical definitions
  - canonopt Validation
  - canon lift
  - canon lower
  - canon resource.new
  - canon resource.drop
  - canon resource.rep
  - canon context.get
  - canon context.set
  - canon backpressure.set
  - canon backpressure.{inc,dec}
  - canon task.return
  - canon task.cancel
  - canon waitable-set.new
  - canon waitable-set.wait
  - canon waitable-set.poll
  - canon waitable-set.drop
  - canon waitable.join
  - canon subtask.cancel
  - canon subtask.drop
  - canon {stream,future}.new
  - canon stream.{read,write}
  - canon future.{read,write}
  - canon {stream,future}.cancel-{read,write}
  - canon {stream,future}.drop-{readable,writable}
  - canon thread.index
  - canon thread.new-indirect
  - canon thread.resume-later
  - canon thread.suspend
  - canon thread.yield
  - canon thread.suspend-then-resume
  - canon thread.yield-then-resume
  - canon thread.suspend-then-promote
  - canon thread.yield-then-promote
  - canon error-context.new
  - canon error-context.debug-message
  - canon error-context.drop
  - canon thread.spawn-ref
  - canon thread.spawn-indirect
  - canon thread.available-parallelism
Introduction
The Canonical ABI specifies, for each component function signature, a
corresponding core function signature and the process for reading
component-level values into and out of linear memory. While a full formal
specification would specify the Canonical ABI in terms of macro-expansion into
Core WebAssembly instructions augmented with a new set of (spec-internal)
administrative instructions, the informal presentation here instead specifies
the process in terms of Python code that would be logically executed at
validation- and run-time by a component model implementation. The Python code
is presented by interleaving definitions with descriptions and eliding some
boilerplate. For a complete listing of all Python definitions in a single
executable file with a small unit test suite, see the
canonical-abi directory.
The convention followed by the Python code below is that all traps are raised
by explicit trap()/trap_if() calls; Python assert() statements should
never fire and are only included as hints to the reader. Similarly, there
should be no uncaught Python exceptions.
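The trap convention can be pictured with a minimal sketch. Only the names `trap()` and `trap_if()` come from the text above; the `Trap` exception class, the function bodies and the `checked_index` helper are assumptions made for illustration.

```python
class Trap(Exception):
  """Hypothetical exception type standing in for a wasm trap."""

def trap():
  raise Trap()

def trap_if(cond):
  if cond:
    raise Trap()

def checked_index(table, i):
  # A guest-controlled index is validated with trap_if, not assert.
  trap_if(i < 0 or i >= len(table))
  elem = table[i]
  # The assert only documents an invariant that should already hold.
  assert(elem is not None)
  return elem
```

Under this reading, a failing `trap_if` is guest-visible behavior, whereas a failing `assert` would indicate a bug in the specification code itself.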
While the Python code for lifting and lowering values appears to create an intermediate copy when lifting linear memory into high-level Python values, a real implementation should be able to fuse lifting and lowering into a single direct copy from the source linear memory into the destination linear memory.
Lastly, independently of Python, the Canonical ABI defined below assumes that
out-of-memory conditions (such as memory.grow returning -1 from within
realloc) will trap (via unreachable). This significantly simplifies the
Canonical ABI by avoiding the need to support the complicated protocols
necessary to support recovery in the middle of nested allocations. (Note: by
nature of eliminating realloc, switching to lazy lowering would obviate
this issue, allowing guest wasm code to handle failure by eagerly returning
some value of the declared return type to indicate failure.)
Component Instances
Once a component has been parsed/decoded and validated, it can be loaded at
runtime by instantiating it to produce a component instance. The
ComponentInstance class tracks all the spec-internal state that is used by the
definitions below to specify the Canonical ABI. For example, all i32 handles
to resources, waitables, waitable sets, error contexts and threads will index
into the handles or threads fields of ComponentInstance.
class ComponentInstance:
  store: Store
  parent: Optional[ComponentInstance]
  handles: Table[ResourceHandle | Waitable | WaitableSet | ErrorContext]
  threads: Table[Thread]
  may_enter: bool
  may_leave: bool
  backpressure: int
  num_waiting_to_enter: int
  exclusive_thread: Optional[Thread]

  def __init__(self, store, parent = None):
    assert(parent is None or parent.store is store)
    self.store = store
    self.parent = parent
    self.handles = Table()
    self.threads = Table()
    self.may_enter = True
    self.may_leave = True
    self.backpressure = 0
    self.num_waiting_to_enter = 0
    self.exclusive_thread = None
Components are always instantiated in the context of a store (analogous to the
Core WebAssembly store) which is saved immutably in the instance's store
field. The Store class is defined below as part of the Embedding
interface and logically contains all the component instances loaded by the host
that can interact with each other. For example, in a browser, all component
instances in the same tab that were created via WebAssembly.instantiate or
ESM-integration would go into the same store.
When one component uses an instance definition to instantiate another
component, the component containing the instance definition is called the
parent and the component that gets instantiated is called the child.
Components immutably store their parent component or, if instantiated directly
by the host, None, in the parent field. Thus, the set of component instances
in a store forms a forest rooted by the component instances that were
instantiated directly by the host.
The ComponentInstance.may_enter_from, enter_from and leave_to methods
defined here are used to guard and record execution entering and exiting a
component instance. These methods are used by the Store methods and
Task.request_cancellation, defined below, to ensure Component Invariant #2.
  def may_enter_from(self, caller: Optional[ComponentInstance]):
    for inst in self.entering_set(caller):
      if not inst.may_enter:
        return False
    return True

  def enter_from(self, caller: Optional[ComponentInstance]):
    for inst in self.entering_set(caller):
      assert(inst.may_enter)
      inst.may_enter = False

  def leave_to(self, caller: Optional[ComponentInstance]):
    for inst in self.entering_set(caller):
      assert(not inst.may_enter)
      inst.may_enter = True

  def entering_set(self, caller: Optional[ComponentInstance]) -> set[ComponentInstance]:
    if caller:
      return self.self_and_ancestors() - caller.self_and_ancestors()
    else:
      return self.self_and_ancestors()

  def self_and_ancestors(self) -> set[ComponentInstance]:
    s = { self }
    ancestor = self.parent
    while ancestor is not None:
      s.add(ancestor)
      ancestor = ancestor.parent
    return s
In may_enter_from, enter_from and leave_to, the caller parameter is
either the caller's ComponentInstance in a component-to-component call or
None for a host-to-component call. This caller is used to avoid trapping in
the case of a parent component donut wrapping a child component and being
reentered by a child component import call which by definition does not violate
Component Invariant #2.
To distinguish and allow donut-wrapping reentrance, entering a component instance C is defined to also implicitly enter all of C's transitive parents ("ancestors"). However, when one component calls into another, any component instance already entered by the caller (including the caller itself) is subtracted from the set of component instances being entered by the callee, because execution is not "entering" those shared instances but rather "staying inside" them.
For example, given a parent component instance P which contains core module
instances M1 and M2 and child component instances C1 and C2,
may_enter_from allows every call in this callstack to succeed:
     +-------------------------------------------------+
     | P                                               |
     | +-----------+   +----+   +----+   +-----------+ |
host-->| M1 (in P) |-->| C1 |-->| C2 |-->| M2 (in P) | |
     | +-----------+   +----+   +----+   +-----------+ |
     +-------------------------------------------------+
In particular, when the host first calls into P (via lifted M1),
P.entering_set(None) is { P }, so P.may_enter is tested and then set to
False. When P calls into C1, C1.entering_set(P) is { C1 } (since
P.self_and_ancestors() = { P } is subtracted from C1.self_and_ancestors() = { C1, P }) and thus C1.may_enter is tested and set to False. When C1
calls C2, C2.entering_set(C1) is { C2 }, so C2.may_enter is also set to
False. And then finally when C2 calls back into P (via lifted M2),
P.entering_set(C2) is empty (because C2.self_and_ancestors() = { C2, P } is
subtracted from P.self_and_ancestors() = { P }) and thus there is no
trap_if(not P.may_enter) (which would have otherwise failed).
If now P tries to call from M2 back into C1 (using the power of
call_indirect), there would be a trap, since C1.entering_set(P) is
{ C1 } and C1.may_enter is already False:
     +-----------------------------------------------------------+
     | P                                                         |
     | +-----------+   +----+   +----+   +-----------+    +----+ |
host-->| M1 (in P) |-->| C1 |-->| C2 |-->| M2 (in P) |-X->| C1 | |
     | +-----------+   +----+   +----+   +-----------+    +----+ |
     +-----------------------------------------------------------+
Alternatively, let's say P also contains a third child C3 whose exports are
re-exported by P so that they can be called directly by the host. Then if M2
calls back out into the host and the host tries to call C3 directly, it also
traps since C3.entering_set(None) is { C3, P } and P.may_enter is already
set to False:
     +-------------------------------------------------+       +--------+
     | P                                               |       | P      |
     | +-----------+   +----+   +----+   +-----------+ |       | +----+ |
host-->| M1 (in P) |-->| C1 |-->| C2 |-->| M2 (in P) |-->host-X->| C3 | |
     | +-----------+   +----+   +----+   +-----------+ |       | +----+ |
     +-------------------------------------------------+       +--------+
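The three scenarios above can be replayed with a stripped-down model. This is a sketch, not the specification code: `Inst` is a hypothetical stand-in for ComponentInstance that keeps only the parent and may_enter fields, and `names`, `trace`, `reenter_c1` and `host_to_c3` are illustrative names.

```python
class Inst:
  """Hypothetical stand-in for ComponentInstance (parent/may_enter only)."""
  def __init__(self, name, parent = None):
    self.name = name
    self.parent = parent
    self.may_enter = True

  def self_and_ancestors(self):
    s, ancestor = { self }, self.parent
    while ancestor is not None:
      s.add(ancestor)
      ancestor = ancestor.parent
    return s

  def entering_set(self, caller):
    if caller:
      return self.self_and_ancestors() - caller.self_and_ancestors()
    return self.self_and_ancestors()

  def may_enter_from(self, caller):
    return all(inst.may_enter for inst in self.entering_set(caller))

  def enter_from(self, caller):
    for inst in self.entering_set(caller):
      inst.may_enter = False

def names(insts):
  return sorted(inst.name for inst in insts)

P = Inst('P')
C1, C2, C3 = Inst('C1', P), Inst('C2', P), Inst('C3', P)

# host -> P (M1) -> C1 -> C2 -> P (M2): every call is allowed.
trace = []
for callee, caller in [(P, None), (C1, P), (C2, C1), (P, C2)]:
  ok = callee.may_enter_from(caller)
  trace.append((names(callee.entering_set(caller)), ok))
  if ok:
    callee.enter_from(caller)

reenter_c1 = C1.may_enter_from(P)     # M2 -> C1 again: would trap
host_to_c3 = C3.may_enter_from(None)  # host -> C3 while P is entered: would trap
```

The last call from C2 back into P has an empty entering set, which is why no flag is tested there, while both follow-up calls find a flag that is already cleared.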
From an optimizing compiler's perspective, the set[ComponentInstance] returned
by entering_set is known statically when compiling a component-to-component
trampoline and thus the compiler can fully unroll the for loops in
may_enter_from, enter_from and leave_to into fixed sequences of branches
and stores with fixed memory locations for the may_enter flags. Furthermore,
because component-to-component reentrance is only possible via donut wrapping
and donut wrapping is only possible when a parent component contains a canon lower definition, whenever the compiler sees a component with no canon lower
definitions, it can mark the may_enter flags of all its direct children as
optimized-out and then completely ignore them. Since donut wrapping is rare,
this means that, in practice, only root component instances' may_enter flags
will be tested and only for host-to-component or component-to-component calls
between different root components (linked by the host). Thus, the overall cost
of reentrance should be very low, in exchange for allowing the producer
toolchain to not have to safely handle reentrance at every single import call.
The other fields of ComponentInstance are described below as they are used.
Concurrency
The Component Model has native concurrency support as summarized by the concurrency explainer. In the Canonical ABI, concurrency support is defined in 3 layers, defined in the next 3 subsections, resp.:
- The base layer is Core WebAssembly stack-switching's cont.new, suspend and resume primitives, implemented here in terms of Python's standard threading library.
- The next layer up is Thread, which stores a mutable, nullable contref, updated over time as the thread suspends and resumes execution, along with some other thread-local state.
- The top layer is Task, which corresponds to a single cross-component concurrent function call, contains 1..N Threads, and holds the state needed to define the ABI rules for the passing of parameters and results, the dropping of borrowed handles, cancellation and backpressure.
Stack Switching
Component Model concurrency is defined in terms of the Core WebAssembly
stack-switching proposal's cont.new, resume and suspend instructions so
that there is a clear composition story between Component Model and Core
WebAssembly concurrency in the future. Since Python does not natively provide
algebraic effects, cont.new, resume and suspend are implemented in this
section in terms of other Python primitives. Thus, this section can be skimmed
since the semantics of stack-switching are already rigorously defined elsewhere;
it's only important to understand the Python function signatures and how they
correspond to the stack-switching instructions.
Since the Component Model only needs a subset of the full expressivity of stack-switching, only that subset is implemented, which significantly simplifies things. In particular, the Component Model uses stack-switching in the following restricted manner:
First, there are only two global control tags used with suspend:
(tag $block (param $switch-to (ref null $Thread)) (result $cancelled bool))
(tag $current-thread (result (ref $Thread)))
Consequently, instead of having a single generic Python suspend() function,
there are block() and current_thread() Python functions that implement
suspend $block and suspend $current-thread, resp.
The $block tag is used to suspend a thread until some future event. The
parameters and results will be described in the next section, where they are
used to define Thread.
The $current-thread tag is used to retrieve the current thread, which is
semantically stored in the resume handler's local state (although an
optimizing implementation would instead maintain the current thread in the VM's
execution context (or a special Core WebAssembly global) so that it could be
cheaply loaded and/or kept in register state).
Second, there is only a single type of continuation passed to resume that
corresponds to the $block tag ($current-thread continuations are
immediately resumed and never "escape"):
(type $ct (cont (func (param bool) (result (ref null $Thread)))))
Third, every resume performed by the Canonical ABI always handles both
$block and $current-thread and every Canonical ABI suspend is, by
construction, always scoped by a Canonical ABI resume. Thus, every Canonical
ABI suspend unconditionally transfers control flow directly to the innermost
enclosing Canonical ABI resume without a general handler/tag search.
Given this restricted usage, specialized versions of cont.new, resume and
suspend that are "monomorphized" to the above types and tags are implemented
in terms of Python's standard preemptive threading primitives, using
threading.Thread to provide a native stack, threading.Lock to only allow
a single threading.Thread to execute at a time, and threading.local to
maintain the dynamic handler scope using thread-local storage. This could have
been implemented more directly and efficiently using fibers, but the Python
standard library doesn't have fibers. However, a realistic implementation is
expected to use (a pool of) fibers.
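The lock-handoff idea can be seen in isolation in the following sketch, which makes two preemptive threads behave like a pair of coroutines. It is a simplified illustration, not the definitions that follow: the names `to_child`, `to_parent`, `child` and `log` are hypothetical, and two fixed locks are reused instead of allocating a fresh one per suspension.

```python
import threading

def acquired_lock():
  lock = threading.Lock()
  lock.acquire()
  return lock

log = []
to_child = acquired_lock()   # released by the parent to run the child
to_parent = acquired_lock()  # released by the child to hand control back

def child():
  for step in range(3):
    to_child.acquire()       # wait until "resumed"
    log.append(f'child {step}')
    to_parent.release()      # "suspend": hand control back to the parent

t = threading.Thread(target = child)
t.start()
for step in range(3):
  log.append(f'parent {step}')
  to_child.release()         # "resume" the child
  to_parent.acquire()        # block until the child suspends again
t.join()
```

Because each side releases the other's lock and then immediately blocks on its own, only one of the two threads makes progress at any time, so the interleaving is deterministic even though the underlying threads are preemptive.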
Starting with cont.new, the monomorphized version takes a function type
matching $ct, as defined above:
class Cancelled(IntEnum):
  FALSE = 0
  TRUE = 1

class Continuation:
  lock: threading.Lock
  handler: Handler
  block_result: Cancelled

class Handler:
  lock: threading.Lock
  current_thread: Thread
  cont: Optional[Continuation]
  block_arg: Optional[Thread]

thread_local_handler = threading.local()

def new_already_acquired_lock() -> threading.Lock:
  lock = threading.Lock()
  lock.acquire()
  return lock

def cont_new(f: Callable[[Cancelled], Optional[Thread]]) -> Continuation:
  cont = Continuation()
  cont.lock = new_already_acquired_lock()
  def thread_base():
    cont.lock.acquire()
    thread_local_handler.value = cont.handler
    block_arg = f(cont.block_result)
    handler = thread_local_handler.value
    handler.cont = None
    handler.block_arg = block_arg
    handler.lock.release()
  threading.Thread(target = thread_base).start()
  return cont
Continuation.block_result and Continuation.handler are set by resume right
before resume calls Continuation.lock.release() to transfer control flow to
the continuation. After resuming the continuation, resume calls
Handler.lock.acquire() to wait until the continuation signals suspension or
return by calling Handler.lock.release(). The Handler is stored in
thread_local_handler.value to implement the dynamic scoping that is required
for suspend. Because the thread created by cont_new can be suspended and
resumed many times (each time with a new Continuation and Handler, resp.),
Handler must be re-loaded from thread_local_handler.value after f returns
since it may have changed since the initial resume.
Next, resume is monomorphized to take a continuation of type $ct, the
argument to pass to the continuation, and the Thread to use to implement the
(on $current-thread) handler. The remaining (on $block) and "returned" cases
join to produce a single return value, with the (on $block) case returning a
Continuation + argument passed to suspend $block and the "returned" case
returning None + the continuation function's return value.
def resume(cont: Continuation, block_result: Cancelled, current_thread: Thread) -> \
    tuple[Optional[Continuation], Optional[Thread]]:
  handler = Handler()
  handler.lock = new_already_acquired_lock()
  handler.current_thread = current_thread
  cont.handler = handler
  cont.block_result = block_result
  cont.lock.release()
  handler.lock.acquire()
  return (handler.cont, handler.block_arg)
Next, the block function implements suspend $block, taking its signature
from the $block tag defined above. Following the locking scheme already
established by cont_new and resume, the implementation passes control flow
and event arguments back to the parent resume and then waits to be unblocked
by a future resume that provides the event results.
def block(switch_to: Optional[Thread]) -> Cancelled:
  cont = Continuation()
  cont.lock = new_already_acquired_lock()
  handler = thread_local_handler.value
  handler.cont = cont
  handler.block_arg = switch_to
  handler.lock.release()
  cont.lock.acquire()
  thread_local_handler.value = cont.handler
  return cont.block_result
Lastly, the current_thread function implements suspend $current-thread,
taking its signature from the $current-thread tag defined above. As mentioned
above, the handler for $current-thread is hard-coded by the Component Model
to simply return the Thread passed to resume which is logically stored as
part of the handler's local state. Thus, the handler can be "inlined" at the
suspend-site by simply returning the Thread stored in thread-local storage
by resume. The other two functions are simply helpers for deriving the
current task and component instance from the current thread.
def current_thread() -> Thread:
  return thread_local_handler.value.current_thread

def current_task() -> Task:
  return current_thread().task

def current_instance() -> ComponentInstance:
  return current_task().inst
Once Core WebAssembly gets stack-switching, the Component Model's $block and
$current-thread tags would not be exposed to Core WebAssembly. Thus, an
optimizing implementation would continue to be able to implement block() as a
direct control flow transfer and current_thread() with implicit execution
context, both without a general handler/tag search. In particular, this avoids
the pathological O(N²) behavior which would otherwise arise if
Component Model cooperative threads were used in conjunction with deeply-nested
Core WebAssembly handlers.
Additionally, once Core WebAssembly has stack switching, any unhandled events
that originate in Core WebAssembly would turn into traps if they reach a
component boundary (just like unhandled exceptions do now; see
call_and_trap_on_throw below). Thus, all cross-component/cross-language stack
switching would continue to be mediated by the Component Model's types and
Canonical ABI, with Core WebAssembly stack-switching used to implement
intra-component concurrency according to the language's own internal ABI.
Threads
As described in the concurrency explainer, threads are created both
implicitly, when calling a component export (in canon_lift below), and
explicitly, when core wasm code calls the thread.new-indirect built-in (in
canon_thread_new_indirect below). While threads are logically created for
each component export call, by design, non-async function calls and
non-async-lowered calls to non-async-lifted async function calls can be
implemented by the engine as a normal synchronous native function call.
Additionally, the engine can allocate the per-thread/task ABI state below
lazily to avoid overhead on small cross-component calls. To assist in this
optimization, threads are put into their own ComponentInstance.threads table
to reduce interference from the other kinds of handles.
Threads are represented in the Canonical ABI by the Thread class defined in
this section. The Thread class is implemented in terms of the stack-switching
primitives defined in the previous section and contains an optional mutable
continuation that is set and cleared whenever a thread is suspended and resumed,
resp. On top of the basic suspend and resume operations, Thread adds the
following higher-level concurrency concepts:
- waiting on external I/O and yielding
- async call stack
- cancellation
- thread index
- thread-local storage
Introducing the Thread class in chunks, a Thread can be in one of the
following 3 states:
- running: actively executing on the stack (thus having no continuation)
- suspended: waiting to be resumed by another thread running in the same component instance
- waiting: waiting to be resumed nondeterministically by the host after some condition is met, with ready and non-ready sub-states, depending on whether the condition is met.
class Thread:
  cont: Optional[Continuation]
  ready_func: Optional[Callable[[], bool]]
  task: Task
  cancellable: bool
  index: Optional[int]
  storage: tuple[int,int]

  def running(self):
    return self.cont is None

  def suspended(self):
    return not self.running() and self.ready_func is None

  def waiting(self):
    return not self.running() and self.ready_func is not None

  def ready(self):
    return self.waiting() and self.ready_func()
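As a quick check of how the states follow from the cont and ready_func fields, the sketch below classifies a thread from those two values alone. The `thread_state` function, the `some_cont` placeholder and the `flag` dictionary are hypothetical helpers, not part of the Canonical ABI definitions.

```python
def thread_state(cont, ready_func):
  """Classify a thread from the two fields the predicates above inspect."""
  if cont is None:
    return 'running'
  if ready_func is None:
    return 'suspended'
  return 'waiting (ready)' if ready_func() else 'waiting (not ready)'

some_cont = object()  # placeholder for a Continuation
flag = {'done': False}

states = [
  thread_state(None, None),                       # no continuation
  thread_state(some_cont, None),                  # continuation, no condition
  thread_state(some_cont, lambda: flag['done']),  # condition not yet met
]
flag['done'] = True
states.append(thread_state(some_cont, lambda: flag['done']))
```

Note that readiness is recomputed on every query, so a waiting thread can move between its two sub-states without any field of the thread changing.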
When a Thread is created, a new continuation is created for thread_func
(wrapping the thread_func with cont_func to exactly match the function type
expected by cont_new) and leaving the thread initially in the suspended
state.
  def __init__(self, task, thread_func):
    def cont_func(cancelled):
      assert(self.running() and not cancelled)
      thread_func()
      return None
    self.cont = cont_new(cont_func)
    self.ready_func = None
    self.task = task
    self.cancellable = False
    self.index = None
    self.storage = [0,0]
    assert(self.suspended())
The next two Thread methods are only called by Thread methods below to add
and remove a thread to the Store.waiting list at the same time as setting and
clearing, resp., the readiness function that Store.tick (defined below) will
test repeatedly to determine when the thread is ready to be resumed.
  def start_waiting_internal(self, ready_func):
    assert(not self.waiting() and not self.ready_func)
    self.ready_func = ready_func
    self.task.inst.store.waiting.append(self)

  def stop_waiting_internal(self, cancelled):
    assert(self.waiting() and self.ready_func)
    assert(cancelled or self.ready())
    self.ready_func = None
    self.task.inst.store.waiting.remove(self)
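To make the role of the waiting list concrete, here is a hedged sketch of a polling step over a list of waiters. It is not the Store.tick definition given later in this document: the `Waiter` class, the `tick` function shown here and the `io_done` flag are assumptions chosen only to illustrate how readiness functions could be tested repeatedly.

```python
import random

class Waiter:
  """Hypothetical stand-in for a waiting Thread: a name plus a ready_func."""
  def __init__(self, name, ready_func):
    self.name = name
    self.ready_func = ready_func

def tick(waiting, resumed):
  """Resume one arbitrarily chosen ready waiter, if any; report success."""
  ready = [w for w in waiting if w.ready_func()]
  if not ready:
    return False
  chosen = random.choice(ready)  # the choice among ready waiters is arbitrary
  waiting.remove(chosen)         # mirrors stop_waiting_internal
  chosen.ready_func = None
  resumed.append(chosen.name)
  return True

io_done = {'x': False}
waiting = [Waiter('yielder', lambda: True),
           Waiter('reader', lambda: io_done['x'])]
resumed = []

tick(waiting, resumed)         # only 'yielder' is ready
idle = tick(waiting, resumed)  # nothing is ready, so nothing is resumed
io_done['x'] = True
tick(waiting, resumed)         # now 'reader' is ready
```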
One way to allow a newly-created thread to start executing is for core wasm to
call the thread.resume-later built-in. This built-in does not immediately
switch execution to the thread but instead transitions the thread to the ready
waiting state, so that it is immediately eligible for Thread.resume.
  def resume_later(self):
    assert(self.suspended())
    self.start_waiting_internal(lambda: True)
    assert(self.ready())
Once it's time to execute a suspended or waiting thread, Thread.resume
is called on that thread. This method transitions the thread to the
running state by clearing and then resuming the Thread's stored
continuation. If the resumed continuation suspends with a Thread to
switch_to, Thread.resume will resume that Thread's continuation, and
so on, repeatedly, until the continuation either returns or suspends with no
thread to switch_to.
  def resume(self, cancelled = Cancelled.FALSE):
    assert(not self.running() and (self.cancellable or not cancelled))
    if self.waiting():
      self.stop_waiting_internal(cancelled)
    thread = self
    while thread is not None:
      cont = thread.cont
      thread.cont = None
      (thread.cont, switch_to) = resume(cont, cancelled, thread)
      thread = switch_to
      cancelled = Cancelled.FALSE
The Thread.resume method passes cancellation requests (from
Task.request_cancellation defined below) to the continuation being resumed,
allowing a thread that opted-in to being cancellable to be resumed even if
it's not ready.
Note that the while loop shown above is effectively implementing the switch
instruction of the stack-switching proposal since switch is just an
optimization of suspend followed by resume. The non-optimized version is
used here to simplify storing of the new Continuation into Thread.cont.
However, an optimized implementation could do the direct switch.
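The shape of this loop can be illustrated with plain Python generators standing in for continuations. This is only an analogy under stated assumptions: `worker`, `run`, the thread names and the use of `yield` to model a suspension with a switch target are hypothetical and do not appear in the Canonical ABI.

```python
def worker(me, switch_targets, log):
  # Each yield plays the role of suspending with a thread to switch to.
  for target in switch_targets:
    log.append(me)
    yield target
  log.append(me + ' done')

def run(start, threads):
  """Resume `start`, then keep switching until a thread blocks or returns."""
  name = start
  while name is not None:
    try:
      name = next(threads[name])  # a thread name, or None to block
    except StopIteration:
      name = None                 # the thread function returned

log = []
threads = {
  'a': worker('a', ['b', None], log),
  'b': worker('b', ['a'], log),
}
run('a', threads)  # a switches to b, b switches back to a, a blocks
run('b', threads)  # b runs to completion
```

As in Thread.resume, the caller of `run` regains control only when the currently running thread suspends without naming a successor or returns.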
The next two Thread methods are only called by Thread methods below to
suspend with the block effect (defined in the preceding section).
Thread.block_internal passes no thread to switch_to and so causes
Thread.resume to actually block. In contrast, Thread.switch_to_internal
passes a thread to switch_to, causing the loop in Thread.resume to directly
switch to that thread without blocking.
  def block_internal(self, cancellable):
    self.cancellable = cancellable
    cancelled = block(switch_to = None)
    assert(self.running() and (cancellable or not cancelled))
    return cancelled

  def switch_to_internal(self, cancellable, other):
    self.cancellable = cancellable
    cancelled = block(switch_to = other)
    assert(self.running() and (cancellable or not cancelled))
    return cancelled
The cancellable parameters in these methods indicate whether the caller is
prepared to handle cancellation. If cancellable is false for all of a task's
threads, the cancellation request will be stored in Task.state and delivered
the next time Task.deliver_pending_cancel() is called with cancellable set
by one of the Thread methods below.
Once a thread has been resumed by Thread.resume() and starts executing, it can suspend its
execution by calling the thread.suspend built-in which calls Thread.suspend
here. Thread.suspend first attempts to deliver any pending cancellation
requests and then otherwise simply blocks.
  def suspend(self, cancellable) -> Cancelled:
    assert(self.running())
    if self.task.deliver_pending_cancel(cancellable):
      return Cancelled.TRUE
    return self.block_internal(cancellable)
The Thread.wait_until method is used by all the synchronous blocking
built-ins, as well as auto-backpressure and the callback event loop, to wait
until a particular readiness condition is met. Given wait_until, "yielding"
can simply be defined as waiting on a readiness condition that is already met.
  def wait_until(self, ready_func, cancellable = False) -> Cancelled:
    assert(self.running())
    if self.task.deliver_pending_cancel(cancellable):
      return Cancelled.TRUE
    if ready_func() and not DETERMINISTIC_PROFILE and random.randint(0,1):
      return Cancelled.FALSE
    self.start_waiting_internal(ready_func)
    return self.block_internal(cancellable)

  def yield_(self, cancellable) -> Cancelled:
    return self.wait_until(lambda: True, cancellable)
As with Thread.suspend, before anything else, wait_until reports any pending
cancellation requests if the caller is cancellable. The random.randint conjunct
on the early return if ready_func() is already True means that, at any
potential suspension point, the embedder can nondeterministically decide whether
to switch to another thread or keep running the current one. In particular, when
a caller makes an async call to a callee which calls wait_until on a condition
that's already met (e.g. in the case of yield), the embedder can use
scheduling heuristics to decide whether or not to block the current thread.
The Thread.suspend_then_resume and Thread.yield_then_resume methods
immediately resume execution of some other suspended thread in the same
component instance, leaving the original thread in either a suspended or
ready waiting state, resp. Like other Thread methods, these methods first
report any pending cancellation if the caller is cancellable.
  def suspend_then_resume(self, cancellable, other: Thread) -> Cancelled:
    assert(self.running() and other.suspended())
    if self.task.deliver_pending_cancel(cancellable):
      return Cancelled.TRUE
    return self.switch_to_internal(cancellable, other)

  def yield_then_resume(self, cancellable, other: Thread) -> Cancelled:
    assert(self.running() and other.suspended())
    if self.task.deliver_pending_cancel(cancellable):
      return Cancelled.TRUE
    self.start_waiting_internal(lambda: True)
    return self.switch_to_internal(cancellable, other)
Lastly, the Thread.suspend_then_promote and Thread.yield_then_promote
methods attempt to immediately resume execution of some other thread in the
same component instance if the other thread is in a ready waiting state.
If so, control flow is transferred directly and the current thread is left
suspended or in a ready waiting state, resp. If the other thread is
not ready to run, then these operations fall back to plain suspend or
yield_ behavior, resp.
  def suspend_then_promote(self, cancellable, other: Thread) -> Cancelled:
    assert(self.running())
    if other.ready():
      other.stop_waiting_internal(cancelled = False)
      return self.suspend_then_resume(cancellable, other)
    else:
      return self.suspend(cancellable)

  def yield_then_promote(self, cancellable, other: Thread) -> Cancelled:
    assert(self.running())
    if other.ready():
      other.stop_waiting_internal(cancelled = False)
      return self.yield_then_resume(cancellable, other)
    else:
      return self.yield_(cancellable)
Tasks
As described in the concurrency explainer, a "task" is created for each call
to a component export (in canon_lift below). Each task contains 1..N threads
that execute on behalf of the task, starting with the implicit thread that is
spawned by canon_lift and transitively including additional explicit threads
spawned by the implicit thread via thread.new-indirect. Tasks contain internal
state that is used to ensure (via trapping guards) that guest code obeys the
Canonical ABI rules.
At a high-level, all cross-component calls funnel through the same FuncInst
spec-level function type, where the host can be the caller, the callee or even
(in rare re-export cases) both:
OnStart = Callable[[], list[any]]
OnResolve = Callable[[Optional[list[any]]], None]
OnCancel = Callable[[], None]
FuncInst = Callable[[OnStart, OnResolve, Optional[ComponentInstance]], OnCancel]
The three parameters of FuncInst are:
- an OnStart callback that is called by the callee when it is ready to receive its arguments after waiting for any backpressure to subside;
- an OnResolve callback that is called by the callee when it is ready to return its value or, if cancellation has been requested, None;
- the caller's ComponentInstance, if the caller is not the host.
Critically, if the callee blocks at the wasm level, the spec-level FuncInst
returns immediately to the caller while continuing to execute the callee in a
separate Thread. The OnStart and OnResolve callbacks may be called at any
time before or after the callee returns. If the callee returns and the
OnResolve callback has not yet been called, the caller may invoke the
returned OnCancel callback at most once to cooperatively request that the
callee "hurry up" and call OnResolve (possibly, but not necessarily, passing
None and/or skipping the call to OnStart).
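The calling convention can be sketched with two host-implemented callees. Everything named here is hypothetical: `host_add` resolves before it returns, while `host_pending` stays unresolved until it is cancelled. `Any` from the typing module is used in the aliases purely so the sketch type-checks on its own.

```python
from typing import Any, Callable, Optional

OnStart = Callable[[], list[Any]]
OnResolve = Callable[[Optional[list[Any]]], None]
OnCancel = Callable[[], None]

def host_add(on_start: OnStart, on_resolve: OnResolve, caller = None) -> OnCancel:
  """Hypothetical callee that resolves synchronously, before returning."""
  args = on_start()        # the callee is ready to receive its arguments
  on_resolve([sum(args)])  # the callee is ready to return its value
  def on_cancel():
    pass                   # already resolved, so there is nothing to hurry up
  return on_cancel

def host_pending(on_start: OnStart, on_resolve: OnResolve, caller = None) -> OnCancel:
  """Hypothetical callee that stays unresolved until it is cancelled."""
  def on_cancel():
    on_resolve(None)       # cancelled: resolve with None, skipping on_start
  return on_cancel

results = []
host_add(lambda: [2, 3], results.append)

cancelled = []
cancel = host_pending(lambda: [1], cancelled.append)
cancel()                   # invoked once, since on_resolve had not been called
```

In the first call the caller has no reason to use the returned OnCancel because OnResolve already ran; in the second, the single call to OnCancel is what triggers resolution.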
When FuncInst is implemented by wasm guest code (as opposed to the host), each
call creates a Task object to track the state of the call and ensure that the
wasm guest code adheres to the above FuncInst calling convention (or else
traps). Task is introduced in chunks, starting with fields and initialization:
class Task:
  class State(Enum):
    INITIAL = 1
    STARTED = 2
    PENDING_CANCEL = 3
    CANCEL_DELIVERED = 4
    RESOLVED = 5

  ft: FuncType
  opts: CanonicalOptions
  inst: ComponentInstance
  on_start: OnStart
  on_resolve: OnResolve
  caller: Optional[ComponentInstance]
  state: State
  num_borrows: int
  implicit_thread: Optional[Thread]
  threads: list[Thread]

  def __init__(self, ft, opts, inst, on_start, on_resolve, caller):
    self.ft = ft
    self.opts = opts
    self.inst = inst
    self.on_start = on_start
    self.on_resolve = on_resolve
    self.caller = caller
    self.state = Task.State.INITIAL
    self.num_borrows = 0
    self.implicit_thread = None
    self.threads = []
The Task.needs_exclusive predicate returns whether this task's implicit thread
(Task.implicit_thread) has not opted in to multiple concurrent linear memory
shadow stacks (via "stackful" lift) and thus, according to Component Invariant
#3, requires serialization with all the other implicit threads in the component
instance that have similarly not opted in. This question only applies to
async-typed functions, since synchronous functions can't block and thus can
always execute in a LIFO fashion using a single linear memory shadow stack. When
needs_exclusive is true, core wasm execution is gated on acquiring the
ComponentInstance.exclusive_thread lock. Due to cooperativity, the
exclusive_thread "lock" is simply a mutable field holding either None, when
unlocked, or, when locked, a reference to the Task.implicit_thread currently
holding the lock.
def needs_exclusive(self):
assert(self.ft.async_)
return not self.opts.async_ or self.opts.callback
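The predicate can be tabulated over the three ways an async-typed function may be lifted. This is only a sketch: Opts is a hypothetical stand-in for the two CanonicalOptions fields that the predicate reads, and the result is coerced to a plain bool for readability.

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class Opts:
    # Stand-in for the two CanonicalOptions fields the predicate reads.
    async_: bool = False
    callback: Optional[Callable] = None

def needs_exclusive(opts: Opts) -> bool:
    # Same expression as Task.needs_exclusive, made to return a plain bool.
    return (not opts.async_) or (opts.callback is not None)

def dummy_callback(*args):
    return 0

sync_lift = needs_exclusive(Opts(async_=False))                              # True
callback_lift = needs_exclusive(Opts(async_=True, callback=dummy_callback))  # True
stackful_lift = needs_exclusive(Opts(async_=True))                           # False
```

Only the stackful async lift (async without callback) opts out of exclusivity.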
The Task.enter_implicit_thread method implements backpressure between when
the caller of an async-typed function initiates the call and when the callee's
core wasm entry point is executed. This interstitial placement allows a
component instance that has been overloaded with concurrent function invocations
to avoid OOM. When backpressure is enabled, enter_implicit_thread will block
new async-typed calls until backpressure is disabled. There are three sources
of backpressure:
- Explicit backpressure is triggered by core wasm calling backpressure.{inc,dec} which modify the ComponentInstance.backpressure counter.
- Implicit backpressure triggered when Task.needs_exclusive() is true and the ComponentInstance.exclusive_thread lock is already held.
- Residual backpressure triggered by explicit or implicit backpressure having been enabled then disabled, but there still being tasks waiting to enter that need to be given the chance to start without getting starved by new tasks.
Note that, because non-async-typed functions ignore backpressure entirely,
they may reenter core wasm when an async-typed function would have been
blocked by implicit backpressure. Thus, export bindings generators must be
careful to handle this possibility (e.g., while maintaining the linear-memory
shadow stack pointer) for components with mixed async- and non-async- typed
exports.
def enter_implicit_thread(self):
assert(self.state == Task.State.INITIAL)
self.implicit_thread = current_thread()
if self.ft.async_:
def has_backpressure():
return (self.inst.backpressure > 0 or
(self.needs_exclusive() and self.inst.exclusive_thread is not None))
if has_backpressure() or self.inst.num_waiting_to_enter > 0:
self.inst.num_waiting_to_enter += 1
cancelled = self.implicit_thread.wait_until(lambda: not has_backpressure(), cancellable = True)
self.inst.num_waiting_to_enter -= 1
if cancelled:
self.cancel()
return False
if self.needs_exclusive():
assert(self.inst.exclusive_thread is None)
self.inst.exclusive_thread = self.implicit_thread
self.register_thread(self.implicit_thread)
return True
def register_thread(self, thread):
assert(thread not in self.threads and thread.task is self)
self.threads.append(thread)
assert(thread.index is None)
thread.index = self.inst.threads.add(thread)
Since the order in which suspended threads are resumed is nondeterministic (see
Store.tick below), once Task.enter_implicit_thread suspends the task's
implicit thread due to backpressure, the above definition allows the host to
arbitrarily select which threads to resume in which order. Additionally, the
above definition ensures the following properties:
- While a callee is waiting to enter, if the caller requests cancellation, the callee is immediately cancelled.
- When backpressure is disabled then reenabled, no new tasks start, even tasks that were blocked and then unblocked by the first occurrence of backpressure (i.e., disabling backpressure never unleashes an unstoppable thundering herd of pending tasks).
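The gate condition can be modelled without any threads to see why a newcomer still waits after backpressure has been disabled. GateSketch and must_wait are hypothetical names for this illustration; only the boolean logic is taken from enter_implicit_thread.

```python
class GateSketch:
    """Hypothetical, single-threaded model of the backpressure gate."""
    def __init__(self):
        self.backpressure = 0          # explicit counter (backpressure.{inc,dec})
        self.exclusive_held = False    # stands in for exclusive_thread is not None
        self.num_waiting_to_enter = 0  # tasks already parked at the gate

    def has_backpressure(self, needs_exclusive: bool) -> bool:
        return self.backpressure > 0 or (needs_exclusive and self.exclusive_held)

    def must_wait(self, needs_exclusive: bool) -> bool:
        # A new task waits for explicit or implicit backpressure, and also for
        # "residual" backpressure: earlier tasks that have not entered yet.
        return (self.has_backpressure(needs_exclusive)
                or self.num_waiting_to_enter > 0)

gate = GateSketch()
enters_immediately = not gate.must_wait(needs_exclusive=True)

gate.backpressure += 1                  # explicit backpressure enabled
first_waits = gate.must_wait(needs_exclusive=False)
gate.num_waiting_to_enter += 1          # the first task parks at the gate

gate.backpressure -= 1                  # disabled, but the parked task has not resumed
newcomer_waits = gate.must_wait(needs_exclusive=False)
```

The last check is the residual case: backpressure is off, yet the newcomer queues behind the task that is already waiting.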
Once a task's implicit thread has cleared the backpressure gate, it is added to
the lists of threads running inside the current task and component instance by
Task.register_thread() (which is also called by thread.new-indirect, below).
Symmetrically, the Task.exit_implicit_thread method is called before a task's
implicit thread returns to reverse the effects of Task.enter_implicit_thread.
In particular, if the exclusive_thread lock was acquired, it is released.
Task.unregister_thread (which is also called by thread.new-indirect, below)
traps if the task's last thread is unregistered and the task has not yet
returned a value to its caller.
def exit_implicit_thread(self):
assert(current_thread() is self.implicit_thread)
self.unregister_thread(self.implicit_thread)
if self.ft.async_ and self.needs_exclusive():
assert(self.inst.exclusive_thread is self.implicit_thread)
self.inst.exclusive_thread = None
def unregister_thread(self, thread):
assert(thread in self.threads and thread.task is self)
self.threads.remove(thread)
if len(self.threads) == 0:
trap_if(self.state != Task.State.RESOLVED)
assert(self.num_borrows == 0)
assert(thread.index is not None)
self.inst.threads.remove(thread.index)
The Task.request_cancellation method is called by the host or wasm caller to
signal that they don't need the return value and that the callee should hurry up
and call the OnResolve callback. If a task's implicit thread is waiting to
start (in Task.enter_implicit_thread) due to backpressure, then it is
immediately cancelled without running any guest code. Otherwise, if any of a
cancelled task's threads are expecting cancellation (e.g., when an async callback export returns to the event loop or when waitable-set.wait or a
thread.* built-in is called with cancellable set), request_cancellation
considers resuming that thread (picking one nondeterministically if there are
multiple), giving the thread the chance to handle cancellation promptly so that
subtask.cancel completes without blocking.
def request_cancellation(self):
assert(not self.caller or self.caller is current_instance())
if self.state == Task.State.INITIAL:
self.state = Task.State.CANCEL_DELIVERED
self.implicit_thread.resume(Cancelled.TRUE)
else:
assert(self.state == Task.State.STARTED)
candidates = { t for t in self.threads if t.cancellable }
if self.needs_exclusive() and self.inst.exclusive_thread not in { None, self.implicit_thread }:
candidates.discard(self.implicit_thread)
if candidates and self.inst.may_enter_from(self.caller):
self.state = Task.State.CANCEL_DELIVERED
self.inst.enter_from(self.caller)
random.choice(list(candidates)).resume(Cancelled.TRUE)
self.inst.leave_to(self.caller)
else:
self.state = Task.State.PENDING_CANCEL
As handled above, cancellation must additionally avoid resuming a cancellable
thread when doing so would violate Component Invariant #2 or #3. In
particular, invariant #2 requires not resuming any thread while the task's
containing component instance may not be reentered and invariant #3 requires not
resuming a needs_exclusive task's implicit thread while another task's
implicit thread is running exclusively.
If cancellation cannot be immediately delivered by Task.request_cancellation,
the request is remembered in Task.state and delivered at the next opportunity
by Task.deliver_pending_cancel, which is checked at all cancellation points:
def deliver_pending_cancel(self, cancellable) -> bool:
if cancellable and self.state == Task.State.PENDING_CANCEL:
self.state = Task.State.CANCEL_DELIVERED
return True
return False
The Task.start method is called by canon_lift to get the list of
component-level values that will be subsequently lowered into the callee
component's memory. Since it is called involuntarily after backpressure, before
guest code runs, the definition can assert that it is called correctly.
def start(self) -> list[any]:
assert(self.state == Task.State.INITIAL)
self.state = Task.State.STARTED
return self.on_start()
The Task.return_ method is called by canon_task_return and canon_lift to
return a list of lifted values to the task's caller via the OnResolve
callback. There is a dynamic error if the callee has not dropped all borrowed
handles by the time task.return is called which means that the caller can
assume that all its lent handles have been returned to it when it receives the
SUBTASK RETURNED event. Note that the initial trap_if allows a task to
return a value even after cancellation has been requested.
def return_(self, result):
trap_if(self.state == Task.State.RESOLVED)
trap_if(self.num_borrows > 0)
assert(result is not None)
self.on_resolve(result)
self.state = Task.State.RESOLVED
Lastly, the Task.cancel method is called by canon_task_cancel and
enforces the same num_borrows condition as return_, ensuring that when
the caller's OnResolve callback is called, the caller knows all borrows
have been returned. The initial trap_if only allows cancellation after
cancellation has been delivered to core wasm. In particular, if
request_cancellation cannot synchronously deliver cancellation and sets
Task.state to PENDING_CANCEL, core wasm will still trap if it tries to
call task.cancel.
def cancel(self):
trap_if(self.state != Task.State.CANCEL_DELIVERED)
trap_if(self.num_borrows > 0)
self.on_resolve(None)
self.state = Task.State.RESOLVED
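Putting the methods above together, the cancellation-related transitions of Task.state can be exercised in isolation. The following is a reduced sketch, not the spec's definition: TaskSketch is a hypothetical class, the num_borrows guard and all thread handling are omitted, and the can_deliver_now flag stands in for the candidate and reentrance checks in request_cancellation.

```python
from enum import Enum

class Trap(Exception):
    pass

class State(Enum):
    INITIAL = 1
    STARTED = 2
    PENDING_CANCEL = 3
    CANCEL_DELIVERED = 4
    RESOLVED = 5

class TaskSketch:
    """Hypothetical reduction of Task to its state transitions only."""
    def __init__(self):
        self.state = State.STARTED   # assume the task already passed the gate
        self.resolved_with = []

    def request_cancellation(self, can_deliver_now: bool):
        assert self.state == State.STARTED
        self.state = (State.CANCEL_DELIVERED if can_deliver_now
                      else State.PENDING_CANCEL)

    def deliver_pending_cancel(self, cancellable: bool) -> bool:
        if cancellable and self.state == State.PENDING_CANCEL:
            self.state = State.CANCEL_DELIVERED
            return True
        return False

    def return_(self, result):
        if self.state == State.RESOLVED:
            raise Trap("already resolved")
        self.resolved_with.append(result)
        self.state = State.RESOLVED

    def cancel(self):
        if self.state != State.CANCEL_DELIVERED:
            raise Trap("cancellation not delivered")
        self.resolved_with.append(None)
        self.state = State.RESOLVED

task = TaskSketch()
task.request_cancellation(can_deliver_now=False)            # -> PENDING_CANCEL
try:
    task.cancel()                                           # too early: traps
    trapped_while_pending = False
except Trap:
    trapped_while_pending = True
delivered = task.deliver_pending_cancel(cancellable=True)   # -> CANCEL_DELIVERED
task.cancel()                                               # -> RESOLVED
```

Calling return_ instead of cancel at the last step would also have been accepted, since return_ only traps once the task is already resolved.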
Embedding
A WebAssembly Component Model implementation will typically be embedded into a host environment. An embedder implements the connection between such a host environment and the Component Model semantics defined here. A full Embedding interface would contain functions for decoding, validating, instantiating and interrogating components, just like the Core WebAssembly Embedding. However, the Embedding interface here just covers the subset that is necessary to define the behavior of the Canonical ABI.
The Embedding interface is defined as the methods of the Store class, which is
the Component Model's version of a Core WebAssembly store. Defining Store
in chunks, the Store constructor is analogous to Core WebAssembly
store_init and defines the initial state of the Store:
class Store:
waiting: list[Thread]
nesting_depth: int
def __init__(self):
self.waiting = []
self.nesting_depth = 0
The waiting field is populated by Thread methods, as defined above, and the
nesting_depth field is purely a specification device used by Store methods
below to define the valid host call interleavings.
The Store.invoke method is analogous to Core WebAssembly's func_invoke and
takes a FuncInst (analogous to a Core WebAssembly funcinst) along with its
runtime OnStart and OnResolve arguments (which are described above alongside
their definitions). The Store.nesting_depth field tracks whether there are any
active Store.invoke calls for the benefit of Store.tick, defined below.
def invoke(self, f: FuncInst, on_start: OnStart, on_resolve: OnResolve) -> OnCancel:
self.nesting_depth += 1
on_cancel = f(on_start, on_resolve, caller = None)
self.nesting_depth -= 1
return on_cancel
The FuncInst passed to Store.invoke can be either a guest function (produced
by Store.lift, defined next) or (in the special case of component re-exports)
a host function. Symmetrically, FuncInsts can be called either from the host
(via Store.invoke) or core wasm code (via Store.lower). Store.invoke
passes a None caller to signal that the host is the caller.
The Store.lift method is called for each canon lift definition in a
component to wrap a core wasm CoreFuncInst into a component-level FuncInst,
passing canon lift's immediate arguments as well as the containing component
instance. Similarly, Store.lower is called for each canon lower definition
in a component to wrap a component-level FuncInst into a core wasm
CoreFuncInst. (In a complete Embedding API, Store.lift and Store.lower
would be replaced by a single, higher-level Store.instantiate method of type
Component -> ComponentInstance, analogous to Core WebAssembly's
module_instantiate. But for the Canonical ABI, just lift and lower are
sufficient to define relevant ABI behavior.)
CoreFuncInst = Callable[[list[CoreValType]], list[CoreValType]]
def lift(self, f: CoreFuncInst, ft: FuncType, opts: CanonicalOptions, inst: ComponentInstance) -> FuncInst:
def func_inst(on_start: OnStart, on_resolve: OnResolve, caller: Optional[ComponentInstance]) -> OnCancel:
assert(not caller or caller is current_instance())
trap_if(not inst.may_enter_from(caller))
inst.enter_from(caller)
on_cancel = canon_lift(f, ft, opts, inst, on_start, on_resolve, caller)
inst.leave_to(caller)
return on_cancel
return func_inst
def lower(self, f: FuncInst, ft: FuncType, opts: CanonicalOptions, inst: ComponentInstance) -> CoreFuncInst:
def core_func_inst(args: list[CoreValType]) -> list[CoreValType]:
assert(inst is current_instance())
assert(all(not i.may_enter for i in inst.self_and_ancestors()))
results = canon_lower(f, ft, opts, args)
assert(all(not i.may_enter for i in inst.self_and_ancestors()))
return results
return core_func_inst
Before entering a component via core wasm export call, the FuncInst wrapper
produced by Store.lift traps if entering the component would violate
Component Invariant #2, and then records that the instance was entered by
calling ComponentInstance.enter_from. The rest of the trampoline is defined by
canon_lift below. Importantly though, canon_lift will return immediately if
it blocks, thereby calling ComponentInstance.leave_to and allowing
reentrance (via Store.invoke or Store.tick) without trapping.
Before temporarily leaving a component via core wasm import call, the
CoreFuncInst wrapper produced by Store.lower asserts that the may_enter
flags of the current component instance and all its ancestors are already
False (as set by ComponentInstance.enter_from in Store.lift). Thus,
by default, reentrance is disallowed. However, if the lowered FuncInst
callee blocks before returning a value and the canon lower definition didn't
specify the async ABI option (which opts in to the non-blocking async ABI),
canon_lower will block until the callee returns (via Thread.wait_until,
defined above) which will suspend the current thread and return from
canon_lift to Store.lift which then calls ComponentInstance.leave_to to
enable reentrance for as long as Thread.wait_until stays blocked. Thus,
in accordance with Component Invariant #2, synchronous (blocking) calls to
async-typed function imports may be reentered during canon_lower.
Lastly, the Store.tick method does not have an analogue in Core WebAssembly
but is necessary to enable native concurrency support in the Component Model.
Store.tick allows the runtime to nondeterministically resume a thread that
previously blocked but is now ready. As defined above, Thread.resume will
execute the thread until it either returns or blocks again. Thus, each call to
Store.tick just allows a single thread to make a single quantum of cooperative
progress and the expectation is that the host heuristically interleaves calls to
Store.invoke with calls to Store.tick so that concurrent tasks can complete
while new tasks are being started.
def tick(self):
assert(self.nesting_depth == 0)
assert(all(thread.task.inst.may_enter_from(None) for thread in self.waiting))
self.nesting_depth += 1
candidates = { thread for thread in self.waiting if thread.ready() }
if candidates:
thread = random.choice(list(candidates))
thread.task.inst.enter_from(None)
thread.resume()
thread.task.inst.leave_to(None)
self.nesting_depth -= 1
As shown above, Store.nesting_depth is greater than zero while calling
Store.invoke and thus the first assert prohibits the host from calling
Store.tick during an active Store.invoke. This prohibition ensures that the
second assert holds, which is that all component instances in the store can be
(re)entered. If this were not the case, a random thread might be resumed while
one of its imports' component instances was on the stack and not reenterable,
leading to a spurious trap when it was called.
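The role of nesting_depth can be shown with a reduced store that keeps only the counter. StoreSketch and host_function_that_ticks are hypothetical names; thread selection and instance entry are left out.

```python
class StoreSketch:
    """Hypothetical reduction of Store to the nesting_depth bookkeeping."""
    def __init__(self):
        self.nesting_depth = 0

    def invoke(self, f):
        self.nesting_depth += 1
        result = f()
        self.nesting_depth -= 1
        return result

    def tick(self):
        assert self.nesting_depth == 0, "tick during an active invoke"
        self.nesting_depth += 1
        # (a ready thread would be resumed here)
        self.nesting_depth -= 1

store = StoreSketch()
store.tick()                      # fine: no invoke is active

def host_function_that_ticks():
    try:
        store.tick()              # the host re-enters tick while invoke is active
        return "allowed"
    except AssertionError:
        return "rejected"

outcome = store.invoke(host_function_that_ticks)
```

In the spec the assert expresses a precondition on the host rather than a runtime check, so a real embedding is expected to never reach the rejected branch.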
Lifting and Lowering Context
Most Canonical ABI definitions depend on some ambient information which is
established by the canon lift- or canon lower-defined function that is
being called:
- the ABI options supplied via canonopt
- the containing component instance
- the Task or Subtask used to lower or lift, resp., borrow handles
These three pieces of ambient information are stored in a LiftLowerContext
object that is threaded through all the Python functions below as the cx
parameter/field.
class LiftLowerContext:
opts: LiftLowerOptions
inst: ComponentInstance
borrow_scope: Optional[Task|Subtask]
def __init__(self, opts, inst, borrow_scope = None):
self.opts = opts
self.inst = inst
self.borrow_scope = borrow_scope
The borrow_scope field may be None if the types being lifted/lowered are
known to not contain borrow.
Canonical ABI Options
The following classes list the various Canonical ABI options (canonopt)
that can be set on various Canonical ABI definitions. The default values of
the Python fields are the default values when the associated canonopt is
not present in the binary or text format definition.
The MemInst class represents a core WebAssembly memory instance, with
bytes corresponding to the memory's bytes and addrtype coming from the
memtype.
def ptr_size(ptr_type):
match ptr_type:
case 'i32':
return 4
case 'i64':
return 8
@dataclass
class MemInst:
bytes: bytearray
addrtype: Literal['i32', 'i64']
def __getitem__(self, i):
return self.bytes[i]
def __setitem__(self, i, v):
self.bytes[i] = v
def __len__(self):
return len(self.bytes)
def ptr_type(self):
return self.addrtype
def ptr_size(self):
return ptr_size(self.ptr_type())
The ptr_type and ptr_size methods return the core value type and byte
size of memory pointers.
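To illustrate how the pointer size follows the memory's address type, here is a small sketch that reads one pointer-sized integer from a memory. MemInstSketch is a condensed copy kept only so the example runs standalone, and load_ptr is a hypothetical helper, not a function defined by this document; the little-endian byte order is that of Core WebAssembly linear memory.

```python
from dataclasses import dataclass
from typing import Literal

@dataclass
class MemInstSketch:
    # Condensed copy of MemInst, kept only so this example runs standalone.
    bytes: bytearray
    addrtype: Literal['i32', 'i64']

    def ptr_size(self) -> int:
        return 4 if self.addrtype == 'i32' else 8

def load_ptr(mem: MemInstSketch, offset: int) -> int:
    # Hypothetical helper: read one little-endian pointer-sized integer.
    size = mem.ptr_size()
    return int.from_bytes(mem.bytes[offset : offset + size], 'little')

mem32 = MemInstSketch(bytearray(16), 'i32')
mem32.bytes[0:4] = (0x1234).to_bytes(4, 'little')

mem64 = MemInstSketch(bytearray(16), 'i64')
mem64.bytes[0:8] = (2**40 + 7).to_bytes(8, 'little')

p32 = load_ptr(mem32, 0)   # reads 4 bytes
p64 = load_ptr(mem64, 0)   # reads 8 bytes
```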
The LiftOptions class contains the subset of canonopt which are needed
when lifting individual parameters and results:
@dataclass
class LiftOptions:
string_encoding: str = 'utf8'
memory: Optional[MemInst] = None
def equal(lhs, rhs):
return lhs.string_encoding == rhs.string_encoding and \
lhs.memory is rhs.memory
The equal static method is used by task.return below to dynamically
compare equality of just this subset of canonopt.
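One detail worth spelling out is that equal compares the memory by identity, whereas the == generated by @dataclass would compare field values. The sketch below uses a condensed copy of the class, with bytearray objects standing in for MemInst; the @staticmethod decorator is added here only so the call syntax is unambiguous.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class LiftOptionsSketch:
    # Condensed copy of LiftOptions; memory is any object standing in for MemInst.
    string_encoding: str = 'utf8'
    memory: Optional[object] = None

    @staticmethod
    def equal(lhs, rhs):
        return (lhs.string_encoding == rhs.string_encoding
                and lhs.memory is rhs.memory)

mem_a = bytearray(8)
mem_b = bytearray(8)          # same contents, different memory instance

a1 = LiftOptionsSketch('utf8', mem_a)
a2 = LiftOptionsSketch('utf8', mem_a)
b = LiftOptionsSketch('utf8', mem_b)
c = LiftOptionsSketch('utf16', mem_a)

same = LiftOptionsSketch.equal(a1, a2)
different_memory = LiftOptionsSketch.equal(a1, b)
different_encoding = LiftOptionsSketch.equal(a1, c)
value_equal = (a1 == b)       # dataclass == compares contents instead
```

Here a1 and b are equal by value but not according to equal, because they refer to distinct memories.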
The LiftLowerOptions class contains the subset of canonopt which are
needed when lifting or lowering individual parameters and results:
@dataclass
class LiftLowerOptions(LiftOptions):
realloc: Optional[Callable] = None
The CanonicalOptions class contains the rest of the canonopt
options that affect how an overall function is lifted/lowered:
@dataclass
class CanonicalOptions(LiftLowerOptions):
post_return: Optional[Callable] = None
async_: bool = False
callback: Optional[Callable] = None
Runtime State
The following Python classes define spec-internal state and utility methods that are used to define the externally-visible behavior of Canonical ABI's lifting, lowering and built-in definitions below. These fields are chosen for simplicity over performance and thus an optimizing implementation is expected to use more optimized representations as long as it preserves the same externally-visible behavior. Some specific examples of expected optimizations are noted below.
Table State
The Table class encapsulates a mutable, growable array of opaque elements
that are represented in Core WebAssembly as i32 indices into the array.
Currently, every component instance contains two tables: a threads table
containing all the component's threads and a handles
table containing everything else (resource handles,
waitables and waitable sets and
error contexts).
class Table:
array: list[any]
free: list[int]
MAX_LENGTH = 2**28 - 1
def __init__(self):
self.array = [None]
self.free = []
def __iter__(self):
for e in self.array:
if e is not None:
yield e
def get(self, i):
trap_if(i >= len(self.array))
trap_if(self.array[i] is None)
return self.array[i]
def add(self, e):
if self.free:
i = self.free.pop()
assert(self.array[i] is None)
self.array[i] = e
else:
i = len(self.array)
trap_if(i >= Table.MAX_LENGTH)
self.array.append(e)
return i
def remove(self, i):
e = self.get(i)
self.array[i] = None
self.free.append(i)
return e
Table maintains a dense array of elements that can contain holes created by
the remove method (defined above). When table elements are accessed (e.g., by
canon_lift and resource.rep, below), there are thus both a bounds check and
hole check necessary. Upon initialization, table element 0 is allocated and
set to None, effectively reserving index 0 which is both useful for
catching null/uninitialized accesses and allowing 0 to serve as a sentinel
value.
The add and remove methods work together to maintain a free list of holes
that are used in preference to growing the table. The free list is represented
as a Python list here, but an optimizing implementation could instead store the
free list in the free elements of array.
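A short usage sketch makes the index behavior concrete: the first allocated index is 1, removed slots trap on access, and a freed slot is reused before the array grows. TableSketch is a condensed copy of Table (the MAX_LENGTH check is omitted), and Trap and traps are hypothetical helpers for this example.

```python
class Trap(Exception):
    pass

def trap_if(cond):
    if cond:
        raise Trap()

class TableSketch:
    # Condensed copy of Table (MAX_LENGTH check omitted for brevity).
    def __init__(self):
        self.array = [None]   # index 0 is reserved
        self.free = []

    def get(self, i):
        trap_if(i >= len(self.array))    # bounds check
        trap_if(self.array[i] is None)   # hole check
        return self.array[i]

    def add(self, e):
        if self.free:
            i = self.free.pop()
            self.array[i] = e
        else:
            i = len(self.array)
            self.array.append(e)
        return i

    def remove(self, i):
        e = self.get(i)
        self.array[i] = None
        self.free.append(i)
        return e

table = TableSketch()
a = table.add("a")        # first real index is 1, since 0 is reserved
b = table.add("b")
table.remove(a)           # leaves a hole at index 1
c = table.add("c")        # reuses the hole instead of growing the array

def traps(i):
    try:
        table.get(i)
        return False
    except Trap:
        return True
```

Both traps(0) (the reserved slot) and traps(3) (out of bounds) report a trap, while index 1 is valid again after the reuse.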
The limit of 2**28 ensures that the high 4 bits of table indices are unset
and available for other use in guest code (e.g., for tagging, packed words or
sentinel values).
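As one possible use of the spare bits, guest code could pack a 4-bit tag alongside a table index in a single i32. The encoding below is purely hypothetical (pack, unpack and the tag layout are not defined by the Canonical ABI); it only relies on indices fitting in 28 bits.

```python
TABLE_INDEX_BITS = 28
INDEX_MASK = (1 << TABLE_INDEX_BITS) - 1

def pack(tag: int, index: int) -> int:
    # Hypothetical guest-side encoding: 4-bit tag in the high bits of an i32.
    assert 0 <= tag < 16 and 0 <= index <= INDEX_MASK
    return (tag << TABLE_INDEX_BITS) | index

def unpack(word: int) -> tuple[int, int]:
    return word >> TABLE_INDEX_BITS, word & INDEX_MASK

word = pack(0b1010, 12345)
tag, index = unpack(word)
```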
Resource State
The ResourceHandle class defines the elements of the component instance's
handles table used to represent handles to resources:
class ResourceHandle:
rt: ResourceType
rep: int
own: bool
borrow_scope: Optional[Task]
num_lends: int
def __init__(self, rt, rep, own, borrow_scope = None):
self.rt = rt
self.rep = rep
self.own = own
self.borrow_scope = borrow_scope
self.num_lends = 0
The rt and rep fields of ResourceHandle store the resource type and
representation of the resource. The rep field is currently fixed to be an
i32, but will be generalized in the future to other types.
The own field indicates whether this element was created from an own type
(or, if false, a borrow type).
The borrow_scope field stores the Task that lowered the borrowed handle as a
parameter. When a component only uses sync-lifted exports, due to lack of
reentrance, there is at most one Task alive in a component instance at any
time and thus an optimizing implementation doesn't need to store the Task
per ResourceHandle.
The num_lends field maintains a conservative approximation of the number of
live handles that were lent from this handle (by calls to borrow-taking
functions). This count is maintained by the Subtask bookkeeping functions
(below) and is ensured to be zero when an own handle is dropped.
The ResourceType class represents a runtime instance of a resource type that
has been created either by the host or a component instance (where multiple
component instances of the same static component create multiple ResourceType
instances). ResourceType Python object identity is used by trapping guards on
the rt field of ResourceHandle (above) and thus resource type equality is
not defined structurally (on the contents of ResourceType).
class ResourceType(Type):
impl: ComponentInstance
dtor: Optional[Callable]
def __init__(self, impl, dtor = None):
self.impl = impl
self.dtor = dtor
Waitable State
A "waitable" is a concurrent activity that can be waited on by the built-ins
waitable-set.wait and waitable-set.poll. Currently, there are 5 different
kinds of waitables: subtasks and the 4 combinations of the readable and
writable ends of futures and streams.
Waitables deliver "events" which are values of the following EventTuple type.
The two int "payload" fields of EventTuple store core wasm i32s and are
to be interpreted based on the EventCode. The meaning of the different
EventCodes and their payloads will be introduced incrementally below by the
code that produces the events (specifically, in subtask_event, stream_event
or future_event).
class EventCode(IntEnum):
NONE = 0
SUBTASK = 1
STREAM_READ = 2
STREAM_WRITE = 3
FUTURE_READ = 4
FUTURE_WRITE = 5
TASK_CANCELLED = 6
EventTuple = tuple[EventCode, int, int]
The Waitable class factors out the state and behavior common to all 5 kinds
of waitables, which are each defined as subclasses of Waitable below.
Every Waitable can store at most one pending event in its pending_event
field which will be delivered to core wasm as soon as the core wasm code
explicitly waits on this Waitable (which may take an arbitrarily long time).
A pending_event is represented in the Python code below as a closure so
that the closure can specify behaviors that trigger right before events are
delivered to core wasm and so that the closure can compute the event based on
the state of the world at delivery time (as opposed to when pending_event was
first set). Currently, pending_event holds a closure of the subtask_event,
stream_event or future_event functions defined below. An optimizing
implementation would avoid closure allocation by inlining a union containing
the closure fields directly in the component instance table.
A waitable can belong to at most one "waitable set" (defined next) which is
referred to by the wset field. A Waitable's pending_event is delivered
(via get_pending_event) when core wasm code waits on its waitable set (via
waitable-set.wait or, when using callback, by returning to the event loop).
Lastly, a waitable cannot be waited on both asynchronously (via
waitable set) and synchronously (via synchronous subtask.cancel or
{stream,future}.{,cancel-}{read,write}) since this raises the possibility that
the waitable set "steals" events from the synchronous waiter, leaving the
synchronous waiter forever waiting. This condition is asserted by the Waitable
methods here and guarded via traps by the relevant built-ins below.
class Waitable:
pending_event: Optional[Callable[[], EventTuple]]
wset: Optional[WaitableSet]
has_sync_waiter: bool
def __init__(self):
self.pending_event = None
self.wset = None
self.has_sync_waiter = False
def set_pending_event(self, pending_event):
self.pending_event = pending_event
def has_pending_event(self):
return bool(self.pending_event)
def in_waitable_set(self):
return self.wset is not None
def wait_for_pending_event(self):
assert(not self.in_waitable_set() and not self.has_sync_waiter)
self.has_sync_waiter = True
current_thread().wait_until(self.has_pending_event, cancellable = False)
self.has_sync_waiter = False
def get_pending_event(self) -> EventTuple:
pending_event = self.pending_event
self.pending_event = None
return pending_event()
def join(self, wset):
assert(not self.has_sync_waiter)
if self.wset:
self.wset.elems.remove(self)
self.wset = wset
if wset:
wset.elems.append(self)
def drop(self):
assert(not self.has_pending_event())
assert(not self.has_sync_waiter)
self.join(None)
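The reason for storing pending_event as a closure can be seen in a small sketch where progress accumulates between the time the event is set and the time it is delivered. CopySketch, note_progress and the payload layout are hypothetical and much simpler than the real stream_event; WaitableSketch is a condensed copy of the pending-event part of Waitable.

```python
class WaitableSketch:
    # Condensed copy of the pending-event part of Waitable.
    def __init__(self):
        self.pending_event = None

    def set_pending_event(self, pending_event):
        self.pending_event = pending_event

    def has_pending_event(self):
        return bool(self.pending_event)

    def get_pending_event(self):
        pending_event = self.pending_event
        self.pending_event = None
        return pending_event()

STREAM_READ = 2   # numeric value of EventCode.STREAM_READ

class CopySketch(WaitableSketch):
    """Hypothetical waitable that counts elements copied so far."""
    def __init__(self, index):
        super().__init__()
        self.index = index
        self.progress = 0

    def note_progress(self, n):
        self.progress += n
        def event():
            # Runs at delivery time, so it reports the accumulated total
            # and resets it as a delivery side effect.
            total, self.progress = self.progress, 0
            return (STREAM_READ, self.index, total)
        self.set_pending_event(event)

w = CopySketch(index=7)
w.note_progress(3)
w.note_progress(2)          # replaces the closure; still only one pending event
event = w.get_pending_event()
```

The delivered event reports 5 elements even though the first closure was created when only 3 had been copied.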
A "waitable set" contains a collection of waitables that can be waited on or
polled for any element to make progress. Although the WaitableSet class
below represents elems as a list and implements {has,get}_pending_event
with an O(n) search, because a waitable can be associated with at most one set
and can contain at most one pending event, a real implementation could instead
store a list of waitables-with-pending-events as a linked list embedded
directly in the component instance's table element to avoid the separate
allocation while providing O(1) polling.
class WaitableSet:
elems: list[Waitable]
num_waiting: int
def __init__(self):
self.elems = []
self.num_waiting = 0
def has_pending_event(self):
return any(w.has_pending_event() for w in self.elems)
def get_pending_event(self) -> EventTuple:
assert(self.has_pending_event())
random.shuffle(self.elems)
for w in self.elems:
assert(self is w.wset)
if w.has_pending_event():
return w.get_pending_event()
def wait_for_event_and(self, ready_func, cancellable) -> EventTuple:
def ready_and_has_event():
return ready_func() and self.has_pending_event()
self.num_waiting += 1
cancelled = current_thread().wait_until(ready_and_has_event, cancellable)
if cancelled:
event = (EventCode.TASK_CANCELLED, 0, 0)
else:
event = self.get_pending_event()
self.num_waiting -= 1
return event
def wait_for_event(self, cancellable) -> EventTuple:
return self.wait_for_event_and(lambda: True, cancellable)
def poll(self, cancellable) -> EventTuple:
if current_task().deliver_pending_cancel(cancellable):
return (EventCode.TASK_CANCELLED, 0, 0)
elif not self.has_pending_event():
return (EventCode.NONE, 0, 0)
else:
return self.get_pending_event()
def drop(self):
trap_if(len(self.elems) > 0)
trap_if(self.num_waiting > 0)
The random.shuffle in get_pending_event gives embedders the semantic freedom
to schedule delivery of events nondeterministically (e.g., taking into account
priorities); runtimes do not have to literally randomize event delivery.
The ready_func passed to WaitableSet.wait_for_event_and allows the caller to
stipulate extra conditions that have to be met, in addition to there being an
event ready for delivery. In particular, this is used by the async callback
event loop to avoid overlapping callback execution.
The WaitableSet.drop method traps if dropped while it still contains elements
(whose Waitable.wset field would become dangling) or if it is being
waited-upon by another Task (as indicated by a non-zero num_waiting).
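The O(1) polling optimization mentioned earlier can be sketched by having each set track its ready members explicitly. This is one possible design under stated assumptions, not the spec's definition: ReadySetSketch and ReadyWaitableSketch are hypothetical, a Python dict stands in for the embedded linked list, and synchronous waiting and cancellation are not modelled.

```python
class ReadySetSketch:
    """Hypothetical waitable set that tracks ready members explicitly."""
    def __init__(self):
        self.elems = set()
        self.ready = {}            # dict used as an insertion-ordered set

    def has_pending_event(self):
        return bool(self.ready)    # O(1) instead of scanning elems

    def get_pending_event(self):
        w = next(iter(self.ready)) # any ready member is a valid choice
        del self.ready[w]
        event, w.pending_event = w.pending_event, None
        return event()

class ReadyWaitableSketch:
    def __init__(self):
        self.pending_event = None
        self.wset = None

    def join(self, wset):
        if self.wset is not None:
            self.wset.elems.discard(self)
            self.wset.ready.pop(self, None)
        self.wset = wset
        if wset is not None:
            wset.elems.add(self)
            if self.pending_event:
                wset.ready[self] = None

    def set_pending_event(self, pending_event):
        self.pending_event = pending_event
        if self.wset is not None:
            self.wset.ready[self] = None

ws = ReadySetSketch()
w1, w2 = ReadyWaitableSketch(), ReadyWaitableSketch()
w1.join(ws)
w2.set_pending_event(lambda: (1, 42, 0))   # event set before joining
w2.join(ws)
had_event = ws.has_pending_event()
event = ws.get_pending_event()
```

Always taking the first ready member is allowed because the spec's shuffle only grants nondeterminism; it does not require randomness.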
Subtask State
While canon_lift creates Task objects, canon_lower creates Subtask
objects, using Subtask to contain all the state relevant to the caller. See
the structured concurrency section for a summary of how Task and Subtask
relate with respect to component- and host-defined callers and callees. This
section introduces Subtask incrementally, starting with its fields and
initialization:
class Subtask(Waitable):
class State(IntEnum):
STARTING = 0
STARTED = 1
RETURNED = 2
CANCELLED_BEFORE_STARTED = 3
CANCELLED_BEFORE_RETURNED = 4
state: State
on_cancel: Optional[OnCancel]
lenders: Optional[list[ResourceHandle]]
cancellation_requested: bool
def __init__(self):
Waitable.__init__(self)
self.state = Subtask.State.STARTING
self.on_cancel = None
self.lenders = []
self.cancellation_requested = False
The state field of Subtask tracks the callee's progression from the initial
STARTING state along the subtask state machine.
A Subtask is considered "resolved" if it has returned a value or if, after
having had cancellation requested by the caller, it has called task.cancel
(either before or after calling OnStart):
def resolved(self):
match self.state:
case (Subtask.State.STARTING |
Subtask.State.STARTED):
return False
case (Subtask.State.RETURNED |
Subtask.State.CANCELLED_BEFORE_STARTED |
Subtask.State.CANCELLED_BEFORE_RETURNED):
return True
The Subtask.add_lender method is called by lift_borrow (below). This method
increments the num_lends counter on the handle being lifted, which is guarded
to be zero by canon_resource_drop (below). The Subtask.deliver_resolve
method is called right before the SUBTASK resolve event is delivered to
wasm (for any of the RETURNED, CANCELLED_BEFORE_STARTED or
CANCELLED_BEFORE_RETURNED states), at which point all the borrowed handles
are logically returned to the caller by decrementing all the num_lends counts
that were initially incremented.
def add_lender(self, lending_handle):
assert(not self.resolve_delivered() and not self.resolved())
lending_handle.num_lends += 1
self.lenders.append(lending_handle)
def deliver_resolve(self):
assert(not self.resolve_delivered() and self.resolved())
for h in self.lenders:
h.num_lends -= 1
self.lenders = None
def resolve_delivered(self):
assert(self.lenders is not None or self.resolved())
return self.lenders is None
Note, the lenders list usually has a fixed size (in all cases except when a
function signature has borrows in lists or streams) and thus can be
stored inline in the native stack frame.
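The lend bookkeeping can be followed with two subtasks borrowing from the same handle. HandleSketch, SubtaskSketch and can_drop are hypothetical reductions: the state assertions are omitted, and can_drop only mirrors the num_lends guard that is applied when an own handle is dropped.

```python
class HandleSketch:
    def __init__(self):
        self.num_lends = 0

class SubtaskSketch:
    # Condensed copy of the lender bookkeeping; state checks are omitted.
    def __init__(self):
        self.lenders = []

    def add_lender(self, h):
        h.num_lends += 1
        self.lenders.append(h)

    def deliver_resolve(self):
        for h in self.lenders:
            h.num_lends -= 1
        self.lenders = None

    def resolve_delivered(self):
        return self.lenders is None

def can_drop(h):
    # Mirrors the guard described above: an own handle may only be dropped
    # once nothing is lent from it.
    return h.num_lends == 0

h = HandleSketch()
s1, s2 = SubtaskSketch(), SubtaskSketch()
s1.add_lender(h)
s2.add_lender(h)
droppable_during_calls = can_drop(h)
s1.deliver_resolve()
droppable_after_first = can_drop(h)
s2.deliver_resolve()
droppable_after_both = can_drop(h)
```

The handle only becomes droppable once the resolve event of every borrowing subtask has been delivered.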
The Subtask.drop method is only called for Subtasks that have been added to
the current component instance's handles table and checks that the callee has
been allowed to resolve and explicitly relinquish any borrowed handles.
  def drop(self):
    trap_if(not self.resolve_delivered())
    Waitable.drop(self)
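The lend accounting described above can be exercised in isolation. The following is a minimal sketch rather than the definition used by this document: Handle and MiniSubtask are hypothetical stand-ins that keep only the num_lends and lenders bookkeeping, and a plain resolved flag replaces the real subtask state machine.

```python
class Handle:
    """Hypothetical stand-in for a resource handle; only tracks its lend count."""
    def __init__(self):
        self.num_lends = 0

class MiniSubtask:
    """Hypothetical reduction of Subtask to its lender bookkeeping."""
    def __init__(self):
        self.lenders = []
        self.resolved = False  # stands in for the RETURNED / CANCELLED_* states

    def add_lender(self, lending_handle):
        assert self.lenders is not None and not self.resolved
        lending_handle.num_lends += 1
        self.lenders.append(lending_handle)

    def deliver_resolve(self):
        assert self.lenders is not None and self.resolved
        for h in self.lenders:
            h.num_lends -= 1
        self.lenders = None  # None marks "resolve has been delivered"

    def resolve_delivered(self):
        return self.lenders is None

h = Handle()
sub = MiniSubtask()
sub.add_lender(h)          # the same handle may be lent more than once
sub.add_lender(h)
lends_during_call = h.num_lends

sub.resolved = True        # the callee has returned or cancelled
sub.deliver_resolve()      # all lends are given back at once
lends_after_resolve = h.num_lends
```

While lends_during_call is non-zero, a drop of the lent handle would be rejected; once the resolve has been delivered the count is back to zero and the subtask itself may be dropped.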
Buffer State
A "buffer" is an abstract region of memory that can either be read-from or written-to. This region of memory can either be owned by the host or by wasm. Currently wasm memory is always 32-bit or 64-bit linear memory, but soon GC memory will be added. Thus, buffers provide an abstraction over at least 4 different "kinds" of memory.
(Currently, buffers are only created implicitly as part of stream and future
built-ins such as stream.read. However, in the future, explicit
component-level buffer types and canonical built-ins may be added to allow
explicitly creating buffers and passing them between components.)
A "readable buffer" allows reading t values from the buffer's memory. A
"writable buffer" allows writing t values into the buffer's memory. All
buffers have an associated component-level value type t and a remain method
that returns how many t values may still be read or written. Buffers mostly
hide their original/complete size. However, zero-length buffers need to be
treated specially (particularly when a zero-length read rendezvouses with a
zero-length write), so there is a special query for detecting whether a buffer
is zero-length. Internally, buffers do have a maximum length of 2^28 - 1 which
is independent of the type of memory backing the buffer. Based on this, buffers
are represented by the following 3 abstract Python classes:
class Buffer:
  MAX_LENGTH = 2**28 - 1
  t: ValType
  remain: Callable[[], int]
  is_zero_length: Callable[[], bool]

class ReadableBuffer(Buffer):
  read: Callable[[int], list[any]]

class WritableBuffer(Buffer):
  write: Callable[[list[any]]]
As preconditions (internally ensured by the Canonical ABI code below):
- read may only be passed a positive number less than or equal to remain
- write may only be passed a non-empty list of length less than or equal to remain containing values of type t
Since read and write are synchronous Python functions, buffers inherently
guarantee synchronous access to a fixed-size backing memory and are thus
distinguished from streams (which provide asynchronous operations for reading
and writing an unbounded number of values to potentially-different regions of
memory over time).
The ReadableBuffer and WritableBuffer abstract classes may either be
implemented by the host or by another wasm component. In the latter case, these
abstract classes are implemented by the concrete ReadableBufferGuestImpl and
WritableBufferGuestImpl classes which eagerly check alignment and range
when the buffer is constructed so that read and write are infallible
operations (modulo traps):
class BufferGuestImpl(Buffer):
  cx: LiftLowerContext
  t: ValType
  ptr: int
  progress: int
  length: int

  def __init__(self, t, cx, ptr, length):
    trap_if(length > Buffer.MAX_LENGTH)
    if t and length > 0:
      trap_if(ptr != align_to(ptr, alignment(t, cx.opts.memory.ptr_type())))
      trap_if(ptr + length * elem_size(t, cx.opts.memory.ptr_type()) > len(cx.opts.memory))
    self.cx = cx
    self.t = t
    self.ptr = ptr
    self.progress = 0
    self.length = length

  def remain(self):
    return self.length - self.progress

  def is_zero_length(self):
    return self.length == 0

class ReadableBufferGuestImpl(BufferGuestImpl):
  def read(self, n):
    assert(n <= self.remain())
    if self.t:
      vs = load_list_from_valid_range(self.cx, self.ptr, n, self.t)
      self.ptr += n * elem_size(self.t, self.cx.opts.memory.ptr_type())
    else:
      vs = n * [()]
    self.progress += n
    return vs

class WritableBufferGuestImpl(BufferGuestImpl, WritableBuffer):
  def write(self, vs):
    assert(len(vs) <= self.remain())
    if self.t:
      store_list_into_valid_range(self.cx, vs, self.ptr, self.t)
      self.ptr += len(vs) * elem_size(self.t, self.cx.opts.memory.ptr_type())
    else:
      assert(all(v == () for v in vs))
    self.progress += len(vs)
When t is None (arising from stream and future with empty element
types), the core-wasm-supplied ptr is entirely ignored, while the length
and progress are still semantically meaningful. Source bindings may represent
this case with a generic stream/future of [unit] type or a distinct type that
conveys events without values.
The load_list_from_valid_range and store_list_into_valid_range functions
that do all the heavy lifting are shared with function parameter/result lifting
and lowering and defined below.
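Since the abstract buffer classes may also be implemented by the host, it can help to see what a host-side implementation might look like. The sketch below is an assumption-laden illustration, not part of the Canonical ABI: ListReadableBuffer and ListWritableBuffer are hypothetical names, the backing store is a plain Python list, the element type is a placeholder string, and the MAX_LENGTH limit is not enforced.

```python
class ListReadableBuffer:
    """Hypothetical host-owned readable buffer backed by a Python list."""
    def __init__(self, t, values):
        self.t = t                    # placeholder for a component-level value type
        self.values = list(values)
        self.progress = 0

    def remain(self):
        return len(self.values) - self.progress

    def is_zero_length(self):
        return len(self.values) == 0

    def read(self, n):
        # Precondition from the text: a positive count no larger than remain().
        assert 0 < n <= self.remain()
        vs = self.values[self.progress : self.progress + n]
        self.progress += n
        return vs

class ListWritableBuffer:
    """Hypothetical host-owned writable buffer with a fixed capacity."""
    def __init__(self, t, capacity):
        self.t = t
        self.capacity = capacity
        self.values = []

    def remain(self):
        return self.capacity - len(self.values)

    def is_zero_length(self):
        return self.capacity == 0

    def write(self, vs):
        # Precondition from the text: a non-empty list no longer than remain().
        assert 0 < len(vs) <= self.remain()
        self.values.extend(vs)

src = ListReadableBuffer('u8', [1, 2, 3, 4, 5])
dst = ListWritableBuffer('u8', 3)
n = min(src.remain(), dst.remain())   # copy only as much as both sides allow
dst.write(src.read(n))
```

Note that a full buffer (remain() == 0) is distinct from a zero-length buffer: dst above is full after the copy, but is_zero_length() still reports False.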
Stream State
Values of stream type are represented in the Canonical ABI as i32 indices
into the current component instance's handles table referring to either the
readable or writable end of a stream. Reading from the readable end of a
stream is achieved by calling stream.read and supplying a WritableBuffer.
Conversely, writing to the writable end of a stream is achieved by calling
stream.write and supplying a ReadableBuffer. The runtime waits until both
a readable and writable buffer have been supplied and then performs a direct
copy between the two buffers. This rendezvous-based design avoids the need
for an intermediate buffer and copy (unlike, e.g., a Unix pipe; a Unix pipe
would instead be implemented as a resource type owning the buffer memory and
two streams; one going in and one coming out).
The result of a {stream,future}.{read,write} is communicated to the wasm
guest via a CopyResult code:
class CopyResult(IntEnum):
  COMPLETED = 0
  DROPPED = 1
  CANCELLED = 2
The DROPPED code indicates that the other end has since been dropped and
thus no more reads/writes are possible. The CANCELLED code is only possible
after this end has performed a {stream,future}.{read,write} followed by a
{stream,future}.cancel-{read,write}; CANCELLED notifies the wasm code
that the cancellation finished and so ownership of the memory buffer has been
returned to the wasm code. Lastly, COMPLETED indicates that the copy is done
and neither DROPPED nor CANCELLED apply.
As with functions and buffers, native host code can be on either side of a
stream. Thus, streams are defined in terms of abstract interfaces that can be
implemented and consumed by wasm or host code (with all {wasm,host} pairings
being possible and well-defined). Since a stream in a function parameter or
result type always represents the transfer of the readable end of a stream,
only the ReadableStream interface can be implemented by either wasm or the
host; the WritableStream interface is always written to by wasm via a
writable stream end created by stream.new.
ReclaimBuffer = Callable[[], None]
OnCopy = Callable[[ReclaimBuffer], None]
OnCopyDone = Callable[[CopyResult], None]

class SharedBase:
  t: ValType
  cancel: Callable[[], None]
  drop: Callable[[], None]

class ReadableStream(SharedBase):
  read: Callable[[ComponentInstance, WritableBuffer, OnCopy, OnCopyDone], None]

class WritableStream(SharedBase):
  write: Callable[[ComponentInstance, ReadableBuffer, OnCopy, OnCopyDone], None]
The key operations in these interfaces are read and write which work as
follows:
- read never blocks and returns its values by either synchronously or asynchronously writing to the given WritableBuffer and then calling the given OnCopy* callbacks to notify the caller of progress.
- Symmetrically, write never blocks and takes the value to be written from the given ReadableBuffer, calling the given OnCopy* callbacks to notify the caller of progress.
- OnCopyDone is called to indicate that the read or write is finished copying and that the caller has regained ownership of the buffer.
- OnCopy is called to indicate a copy has been made to or from the buffer. However, there may be further copies made in the future, so the caller has not regained ownership of the buffer.
- The ReclaimBuffer callback passed to OnCopy allows the caller of read or write to immediately regain ownership of the buffer once the first copy has completed.
- cancel is non-blocking, but does not guarantee that ownership of the buffer has been returned; cancel only lets the caller request that one of the OnCopy* callbacks be called ASAP (which may or may not happen during cancel).
- The client may not call read, write or drop while there is a previous read or write in progress.
The OnCopy* callbacks are a spec-internal detail used to specify the allowed
concurrent behaviors of stream.{read,write} and not exposed directly to core
wasm code. Specifically, the point of the OnCopy* callbacks is to specify that
multiple reads or writes are allowed into the same Buffer up until the point
where either the buffer is full or the calling core wasm code receives a
STREAM_READ or STREAM_WRITE progress event (in which case ReclaimBuffer is
called). This reduces the number of context-switches required by the spec,
particularly when streaming between two components.
The SharedStreamImpl class implements both ReadableStream and
WritableStream for streams created by wasm (via stream.new) and tracks the
common state shared by both the readable and writable ends of streams (defined
below).
Introducing SharedStreamImpl in chunks, starting with the fields and initialization:
class SharedStreamImpl(ReadableStream, WritableStream):
  dropped: bool
  pending_inst: Optional[ComponentInstance]
  pending_buffer: Optional[Buffer]
  pending_on_copy: Optional[OnCopy]
  pending_on_copy_done: Optional[OnCopyDone]

  def __init__(self, t):
    self.t = t
    self.dropped = False
    self.reset_pending()

  def reset_pending(self):
    self.set_pending(None, None, None, None)

  def set_pending(self, inst, buffer, on_copy, on_copy_done):
    self.pending_inst = inst
    self.pending_buffer = buffer
    self.pending_on_copy = on_copy
    self.pending_on_copy_done = on_copy_done
If set, the pending_* fields record the Buffer and OnCopy* callbacks of a
read or write that is waiting to rendezvous with a complementary write or
read. Dropping the readable or writable end of a stream or cancelling a
read or write notifies any pending read or write via its OnCopyDone
callback:
  def reset_and_notify_pending(self, result):
    pending_on_copy_done = self.pending_on_copy_done
    self.reset_pending()
    pending_on_copy_done(result)

  def cancel(self):
    self.reset_and_notify_pending(CopyResult.CANCELLED)

  def drop(self):
    if not self.dropped:
      self.dropped = True
      if self.pending_buffer:
        self.reset_and_notify_pending(CopyResult.DROPPED)
While the abstract ReadableStream and WritableStream interfaces allow
cancel to return without having returned ownership of the buffer (which, in
general, is necessary for various host APIs), when wasm is
implementing the stream, cancel always returns ownership of the buffer
immediately.
Note that cancel and drop notify in opposite directions:
- cancel must be called on a readable or writable end with an operation pending, and thus cancel notifies the same end that called it.
- drop must not be called on a readable or writable end with an operation pending, and thus drop notifies the opposite end.
The read method implements ReadableStream.read and is called by either
stream.read or the host, depending on who is passed the readable end of the
stream. If the reader is first to rendezvous, then all the parameters are
stored in the pending_* fields, requiring the reader to wait for the writer
to rendezvous. If the writer was first to rendezvous, then there is already a
pending ReadableBuffer to read from, and so the reader copies as much as it
can (which may be less than a full buffer's worth) and eagerly completes the
copy without blocking. In the final special case where the pending writer has a
zero-length buffer, the writer is notified, but the reader remains blocked:
  def read(self, inst, dst_buffer, on_copy, on_copy_done):
    if self.dropped:
      on_copy_done(CopyResult.DROPPED)
    elif not self.pending_buffer:
      self.set_pending(inst, dst_buffer, on_copy, on_copy_done)
    else:
      assert(self.t == dst_buffer.t == self.pending_buffer.t)
      trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
      if self.pending_buffer.remain() > 0:
        if dst_buffer.remain() > 0:
          n = min(dst_buffer.remain(), self.pending_buffer.remain())
          dst_buffer.write(self.pending_buffer.read(n))
          self.pending_on_copy(self.reset_pending)
        on_copy_done(CopyResult.COMPLETED)
      else:
        self.reset_and_notify_pending(CopyResult.COMPLETED)
        self.set_pending(inst, dst_buffer, on_copy, on_copy_done)
Currently, there is a trap when both the read and write come from the same
component instance and there is a non-empty, non-number element type. This trap
will be removed in a subsequent release; the reason for the trap is that when
lifting and lowering can alias the same memory, interleavings can be complex
and must be handled carefully. Future improvements to the Canonical ABI (lazy
lowering) can greatly simplify this interleaving and be more practical to
implement.
The write method implements WritableStream.write and is called by the
stream.write built-in (noting that the host cannot be passed the writable end
of a stream but may instead implement the ReadableStream interface and pass
the readable end into a component). The steps for write are the same as
read except for when a zero-length write rendezvouses with a zero-length
read, in which case the write eagerly completes, leaving the read
pending:
  def write(self, inst, src_buffer, on_copy, on_copy_done):
    if self.dropped:
      on_copy_done(CopyResult.DROPPED)
    elif not self.pending_buffer:
      self.set_pending(inst, src_buffer, on_copy, on_copy_done)
    else:
      assert(self.t == src_buffer.t == self.pending_buffer.t)
      trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
      if self.pending_buffer.remain() > 0:
        if src_buffer.remain() > 0:
          n = min(src_buffer.remain(), self.pending_buffer.remain())
          self.pending_buffer.write(src_buffer.read(n))
          self.pending_on_copy(self.reset_pending)
        on_copy_done(CopyResult.COMPLETED)
      elif src_buffer.is_zero_length() and self.pending_buffer.is_zero_length():
        on_copy_done(CopyResult.COMPLETED)
      else:
        self.reset_and_notify_pending(CopyResult.COMPLETED)
        self.set_pending(inst, src_buffer, on_copy, on_copy_done)
Putting together the behavior of zero-length read and write above, we can
see that, when both the reader and writer are zero-length, regardless of who
was first, the zero-length write always completes, leaving the zero-length
read pending. To avoid livelock, the Canonical ABI requires that a writer
must (eventually) follow a completed zero-length write with a
non-zero-length write that is allowed to block. This will break the loop,
notifying the reader end and allowing it to rendezvous with a non-zero-length
read and make progress. See the stream readiness section in the async
explainer for more background on the purpose of zero-length reads and writes.
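The zero-length rendezvous rules can be traced with a small simulation. This is a simplified sketch under stated assumptions, not the definition above: MiniStream and ListBuffer are hypothetical names, the component-instance argument and the temporary same-instance trap are omitted, element types are not checked, and the callbacks only record which notification fired.

```python
from enum import IntEnum

class CopyResult(IntEnum):
    COMPLETED = 0
    DROPPED = 1
    CANCELLED = 2

class ListBuffer:
    """Hypothetical list-backed buffer, usable as a source or a destination."""
    def __init__(self, length, values=()):
        self.length = length        # fixed total length of the buffer
        self.values = list(values)  # pre-filled for a source, empty for a destination
        self.progress = 0
    def remain(self):
        return self.length - self.progress
    def is_zero_length(self):
        return self.length == 0
    def read(self, n):
        vs = self.values[self.progress : self.progress + n]
        self.progress += n
        return vs
    def write(self, vs):
        self.values.extend(vs)
        self.progress += len(vs)

class MiniStream:
    """Hypothetical reduction of SharedStreamImpl (no instance or type checks)."""
    def __init__(self):
        self.dropped = False
        self.pending = None  # (buffer, on_copy, on_copy_done) of the waiting side

    def reset_pending(self):
        self.pending = None

    def reset_and_notify_pending(self, result):
        on_copy_done = self.pending[2]
        self.reset_pending()
        on_copy_done(result)

    def read(self, dst, on_copy, on_copy_done):
        if self.dropped:
            on_copy_done(CopyResult.DROPPED)
        elif self.pending is None:
            self.pending = (dst, on_copy, on_copy_done)
        else:
            src, pending_on_copy, _ = self.pending
            if src.remain() > 0:
                if dst.remain() > 0:
                    n = min(dst.remain(), src.remain())
                    dst.write(src.read(n))
                    pending_on_copy(self.reset_pending)
                on_copy_done(CopyResult.COMPLETED)
            else:
                self.reset_and_notify_pending(CopyResult.COMPLETED)
                self.pending = (dst, on_copy, on_copy_done)

    def write(self, src, on_copy, on_copy_done):
        if self.dropped:
            on_copy_done(CopyResult.DROPPED)
        elif self.pending is None:
            self.pending = (src, on_copy, on_copy_done)
        else:
            dst, pending_on_copy, _ = self.pending
            if dst.remain() > 0:
                if src.remain() > 0:
                    n = min(src.remain(), dst.remain())
                    dst.write(src.read(n))
                    pending_on_copy(self.reset_pending)
                on_copy_done(CopyResult.COMPLETED)
            elif src.is_zero_length() and dst.is_zero_length():
                on_copy_done(CopyResult.COMPLETED)
            else:
                self.reset_and_notify_pending(CopyResult.COMPLETED)
                self.pending = (src, on_copy, on_copy_done)

events = []
def on_copy(who):
    return lambda reclaim: events.append(who + ": on_copy")
def on_done(who):
    return lambda result: events.append(who + ": " + result.name)

s = MiniStream()
s.read(ListBuffer(0), on_copy("reader"), on_done("reader"))           # zero-length read waits
s.write(ListBuffer(0), on_copy("writer"), on_done("writer"))          # zero-length write completes
s.write(ListBuffer(2, [7, 8]), on_copy("writer"), on_done("writer"))  # wakes the reader, then waits
dst = ListBuffer(4)
s.read(dst, on_copy("reader"), on_done("reader"))                     # copies both values
```

In this trace the zero-length write completes immediately while the zero-length read stays pending; only the follow-up non-zero-length write notifies the reader, which then issues a non-zero-length read and receives the data.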
The none_or_number_type predicate used above includes both the integer and
floating point number types:
def none_or_number_type(t):
  return t is None or isinstance(t, U8Type | U16Type | U32Type | U64Type |
                                    S8Type | S16Type | S32Type | S64Type |
                                    F32Type | F64Type)
The two ends of a stream are stored as separate elements in the component
instance handles table and each end has a separate CopyState that reflects
what that end is currently doing or has done. This state field is factored
out into the CopyEnd class that is derived below. The two ends also share some
state which is referenced by the shared field and either points to a
SharedStreamImpl (for component-created streams) or something host-defined
(for host-created streams).
class CopyState(Enum):
  IDLE = 1
  COPYING = 2
  CANCELLING_COPY = 3
  DONE = 4

class CopyEnd(Waitable):
  state: CopyState
  shared: SharedBase

  def __init__(self, shared):
    Waitable.__init__(self)
    self.state = CopyState.IDLE
    self.shared = shared

  def copying(self):
    match self.state:
      case CopyState.IDLE | CopyState.DONE:
        return False
      case CopyState.COPYING | CopyState.CANCELLING_COPY:
        return True
    assert(False)

  def drop(self):
    trap_if(self.copying())
    self.shared.drop()
    Waitable.drop(self)

class ReadableStreamEnd(CopyEnd):
  def copy(self, inst, dst, on_copy, on_copy_done):
    self.shared.read(inst, dst, on_copy, on_copy_done)

class WritableStreamEnd(CopyEnd):
  def copy(self, inst, src, on_copy, on_copy_done):
    self.shared.write(inst, src, on_copy, on_copy_done)
As shown in drop, attempting to drop a readable or writable end while a copy
is in progress or in the process of being cancelled traps. This means that
client code must take care to wait for these operations to finish (potentially
cancelling them via stream.cancel-{read,write}) before dropping.
The polymorphic copy method dispatches to either ReadableStream.read or
WritableStream.write and allows the implementations of stream.{read,write}
to share a single definition (in stream_copy below).
Future State
Futures are similar to streams, except that instead of passing 0..N values, exactly one value is passed from the writer end to the reader end unless the reader end is explicitly dropped first.
Futures are defined in terms of abstract ReadableFuture and WritableFuture
interfaces:
class ReadableFuture(SharedBase):
  read: Callable[[ComponentInstance, WritableBuffer, OnCopyDone], None]

class WritableFuture(SharedBase):
  write: Callable[[ComponentInstance, ReadableBuffer, OnCopyDone], None]
These interfaces work like ReadableStream and WritableStream except that
there is no OnCopy callback passed to read or write to report partial
progress (since at most 1 value is copied) and the given Buffer must have
remain() == 1.
Introducing SharedFutureImpl in chunks, the first part is exactly
symmetric to SharedStreamImpl in how initialization and cancellation work:
class SharedFutureImpl(ReadableFuture, WritableFuture):
  dropped: bool
  pending_inst: Optional[ComponentInstance]
  pending_buffer: Optional[Buffer]
  pending_on_copy_done: Optional[OnCopyDone]

  def __init__(self, t):
    self.t = t
    self.dropped = False
    self.reset_pending()

  def reset_pending(self):
    self.set_pending(None, None, None)

  def set_pending(self, inst, buffer, on_copy_done):
    self.pending_inst = inst
    self.pending_buffer = buffer
    self.pending_on_copy_done = on_copy_done

  def reset_and_notify_pending(self, result):
    pending_on_copy_done = self.pending_on_copy_done
    self.reset_pending()
    pending_on_copy_done(result)

  def cancel(self):
    self.reset_and_notify_pending(CopyResult.CANCELLED)
Dropping works the same in futures as in streams, except that a future
writable end cannot be dropped without having written a value. This is guarded
by WritableFutureEnd.drop so it can be asserted here:
  def drop(self):
    if not self.dropped:
      self.dropped = True
      if self.pending_buffer:
        assert(isinstance(self.pending_buffer, WritableBuffer))
        self.reset_and_notify_pending(CopyResult.DROPPED)
Lastly, read and write work mostly like streams, but simplified based on
the fact that we're copying at most 1 value. The only asymmetric difference is
that, as mentioned above, only the writable end can observe that the readable
end was dropped before receiving a value.
  def read(self, inst, dst_buffer, on_copy_done):
    assert(not self.dropped and dst_buffer.remain() == 1)
    if not self.pending_buffer:
      self.set_pending(inst, dst_buffer, on_copy_done)
    else:
      trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
      dst_buffer.write(self.pending_buffer.read(1))
      self.reset_and_notify_pending(CopyResult.COMPLETED)
      on_copy_done(CopyResult.COMPLETED)

  def write(self, inst, src_buffer, on_copy_done):
    assert(src_buffer.remain() == 1)
    if self.dropped:
      on_copy_done(CopyResult.DROPPED)
    elif not self.pending_buffer:
      self.set_pending(inst, src_buffer, on_copy_done)
    else:
      trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
      self.pending_buffer.write(src_buffer.read(1))
      self.reset_and_notify_pending(CopyResult.COMPLETED)
      on_copy_done(CopyResult.COMPLETED)
As with streams, the # temporary limitation shown above is that a future
cannot be read and written from the same component instance when it has a
non-empty, non-number value type.
Lastly, the {Readable,Writable}FutureEnd classes are mostly symmetric with
{Readable,Writable}StreamEnd, defining a polymorphic copy method that
dispatches to either ReadableFuture.read or WritableFuture.write, which
allows the implementation of future.{read,write} to share a single
definition (in future_copy below). The only difference is that
WritableFutureEnd.drop traps if the writer hasn't successfully written a value
or been notified of the reader dropping their end:
class ReadableFutureEnd(CopyEnd):
  def copy(self, inst, dst_buffer, on_copy_done):
    self.shared.read(inst, dst_buffer, on_copy_done)

class WritableFutureEnd(CopyEnd):
  def copy(self, inst, src_buffer, on_copy_done):
    self.shared.write(inst, src_buffer, on_copy_done)

  def drop(self):
    trap_if(self.state != CopyState.DONE)
    CopyEnd.drop(self)
Despecialization
In the explainer, component value types are classified as
either fundamental or specialized, where the specialized value types are
defined by expansion into fundamental value types. In most cases, the canonical
ABI of a specialized value type is the same as its expansion so, to avoid
repetition, the other definitions below use the following despecialize
function to replace specialized value types with their expansion:
def despecialize(t):
  match t:
    case TupleType(ts)       : return RecordType([ FieldType(str(i), t) for i,t in enumerate(ts) ])
    case EnumType(labels)    : return VariantType([ CaseType(l, None) for l in labels ])
    case OptionType(t)       : return VariantType([ CaseType("none", None), CaseType("some", t) ])
    case ResultType(ok, err) : return VariantType([ CaseType("ok", ok), CaseType("error", err) ])
    case MapType(k, v)       : return ListType(despecialize(TupleType([k, v])))
    case _                   : return t
The specialized value types string and flags are missing from this list
because they are given specialized canonical ABI representations distinct from
their respective expansions.
Type Predicates
The contains_borrow and contains_async_value predicates return whether the
given type contains a borrow or future/stream, respectively.
def contains_borrow(t):
  return contains(t, lambda u: isinstance(u, BorrowType))

def contains_async_value(t):
  return contains(t, lambda u: isinstance(u, StreamType | FutureType))

def contains(t, p):
  t = despecialize(t)
  match t:
    case None:
      return False
    case PrimValType() | OwnType() | BorrowType():
      return p(t)
    case ListType(u) | StreamType(u) | FutureType(u):
      return p(t) or contains(u, p)
    case RecordType(fields):
      return p(t) or any(contains(f.t, p) for f in fields)
    case VariantType(cases):
      return p(t) or any(contains(c.t, p) for c in cases)
    case FuncType():
      return any(p(u) for u in t.param_types() + t.result_type())
    case _:
      assert(False)
Alignment
Each value type is assigned an alignment which is used by subsequent
Canonical ABI definitions. Presenting the definition of alignment piecewise,
we start with the top-level case analysis:
def alignment(t, ptr_type):
  match despecialize(t):
    case BoolType()                  : return 1
    case S8Type() | U8Type()         : return 1
    case S16Type() | U16Type()       : return 2
    case S32Type() | U32Type()       : return 4
    case S64Type() | U64Type()       : return 8
    case F32Type()                   : return 4
    case F64Type()                   : return 8
    case CharType()                  : return 4
    case StringType()                : return ptr_size(ptr_type)
    case ErrorContextType()          : return 4
    case ListType(t, l)              : return alignment_list(t, l, ptr_type)
    case RecordType(fields)          : return alignment_record(fields, ptr_type)
    case VariantType(cases)          : return alignment_variant(cases, ptr_type)
    case FlagsType(labels)           : return alignment_flags(labels)
    case OwnType() | BorrowType()    : return 4
    case StreamType() | FutureType() : return 4
List alignment is the same as tuple alignment when the length is fixed and otherwise uses the alignment of pointers.
def alignment_list(elem_type, maybe_length, ptr_type):
  if maybe_length is not None:
    return alignment(elem_type, ptr_type)
  return ptr_size(ptr_type)
Record alignment is tuple alignment, with the definitions split for reuse below:
def alignment_record(fields, ptr_type):
  a = 1
  for f in fields:
    a = max(a, alignment(f.t, ptr_type))
  return a
As an optimization, variant discriminants are represented by the smallest integer
covering the number of cases in the variant (with cases numbered in order from
0 to len(cases)-1). Depending on the payload type, this can allow more
compact representations of variants in memory. This smallest integer type is
selected by the following function, used above and below:
def alignment_variant(cases, ptr_type):
  return max(alignment(discriminant_type(cases), ptr_type), max_case_alignment(cases, ptr_type))

def discriminant_type(cases):
  n = len(cases)
  assert(0 < n < (1 << 32))
  match math.ceil(math.log2(n)/8):
    case 0: return U8Type()
    case 1: return U8Type()
    case 2: return U16Type()
    case 3: return U32Type()

def max_case_alignment(cases, ptr_type):
  a = 1
  for c in cases:
    if c.t is not None:
      a = max(a, alignment(c.t, ptr_type))
  return a
As an optimization, flags are represented as packed bit-vectors. Like variant
discriminants, flags use the smallest integer that fits all the bits.
def alignment_flags(labels):
  n = len(labels)
  assert(0 < n <= 32)
  if n <= 8: return 1
  if n <= 16: return 2
  return 4
Element Size
Each value type is also assigned an elem_size which is the number of bytes
used when values of the type are stored as elements of a list. Having this
byte size be a static property of the type instead of attempting to use a
variable-length element-encoding scheme both simplifies the implementation and
maps well to languages which represent lists as random-access arrays. Empty
types, such as records with no fields, are not permitted, to avoid
complications in source languages.
def elem_size(t, ptr_type):
  match despecialize(t):
    case BoolType()                  : return 1
    case S8Type() | U8Type()         : return 1
    case S16Type() | U16Type()       : return 2
    case S32Type() | U32Type()       : return 4
    case S64Type() | U64Type()       : return 8
    case F32Type()                   : return 4
    case F64Type()                   : return 8
    case CharType()                  : return 4
    case StringType()                : return 2 * ptr_size(ptr_type)
    case ErrorContextType()          : return 4
    case ListType(t, l)              : return elem_size_list(t, l, ptr_type)
    case RecordType(fields)          : return elem_size_record(fields, ptr_type)
    case VariantType(cases)          : return elem_size_variant(cases, ptr_type)
    case FlagsType(labels)           : return elem_size_flags(labels)
    case OwnType() | BorrowType()    : return 4
    case StreamType() | FutureType() : return 4

def elem_size_list(elem_type, maybe_length, ptr_type):
  if maybe_length is not None:
    return maybe_length * elem_size(elem_type, ptr_type)
  return 2 * ptr_size(ptr_type)

def elem_size_record(fields, ptr_type):
  s = 0
  for f in fields:
    s = align_to(s, alignment(f.t, ptr_type))
    s += elem_size(f.t, ptr_type)
  assert(s > 0)
  return align_to(s, alignment_record(fields, ptr_type))

def align_to(ptr, alignment):
  return math.ceil(ptr / alignment) * alignment

def elem_size_variant(cases, ptr_type):
  s = elem_size(discriminant_type(cases), ptr_type)
  s = align_to(s, max_case_alignment(cases, ptr_type))
  cs = 0
  for c in cases:
    if c.t is not None:
      cs = max(cs, elem_size(c.t, ptr_type))
  s += cs
  return align_to(s, alignment_variant(cases, ptr_type))

def elem_size_flags(labels):
  n = len(labels)
  assert(0 < n <= 32)
  if n <= 8: return 1
  if n <= 16: return 2
  return 4
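The record and variant size rules can be checked by hand on small examples. The sketch below is a simplification, not the definitions above: fields and cases are given directly as hypothetical (size, alignment) pairs instead of value types, align_to uses integer arithmetic, and the discriminant's alignment is assumed equal to its size (true for u8, u16 and u32).

```python
def align_to(ptr, alignment):
    # Integer ceiling division; the sketch avoids floating point here.
    return -(-ptr // alignment) * alignment

def record_layout(fields):
    """fields: list of (name, size, alignment). Returns (offsets, size, alignment)."""
    offsets, s, a = {}, 0, 1
    for name, size, align in fields:
        s = align_to(s, align)   # pad up to the field's alignment
        offsets[name] = s
        s += size
        a = max(a, align)
    return offsets, align_to(s, a), a   # trailing padding to the record alignment

def variant_size(disc_size, case_layouts):
    """case_layouts: (size, alignment) for each payload-carrying case."""
    max_align = max((a for _, a in case_layouts), default=1)
    s = align_to(disc_size, max_align)                      # payload starts aligned
    s += max((sz for sz, _ in case_layouts), default=0)     # room for the largest payload
    return align_to(s, max(disc_size, max_align))

# record { a: u8, b: u32, c: u16 }
offsets, rec_size, rec_align = record_layout([("a", 1, 1), ("b", 4, 4), ("c", 2, 2)])

# option<u32>: one-byte discriminant, a single u32 payload
option_u32_size = variant_size(1, [(4, 4)])

# result<u64, u8>: one-byte discriminant, payloads of 8 and 1 bytes
result_size = variant_size(1, [(8, 8), (1, 1)])
```

The record example shows both kinds of padding: three bytes before b, and two trailing bytes so that consecutive list elements keep b aligned.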
Loading
The load function defines how to read a value of a given value type t
out of linear memory starting at offset ptr, returning the value represented
as a Python value. Presenting the definition of load piecewise, we start with
the top-level case analysis:
def load(cx, ptr, t):
  assert(ptr == align_to(ptr, alignment(t, cx.opts.memory.ptr_type())))
  assert(ptr + elem_size(t, cx.opts.memory.ptr_type()) <= len(cx.opts.memory))
  match despecialize(t):
    case BoolType()         : return convert_int_to_bool(load_int(cx, ptr, 1))
    case U8Type()           : return load_int(cx, ptr, 1)
    case U16Type()          : return load_int(cx, ptr, 2)
    case U32Type()          : return load_int(cx, ptr, 4)
    case U64Type()          : return load_int(cx, ptr, 8)
    case S8Type()           : return load_int(cx, ptr, 1, signed = True)
    case S16Type()          : return load_int(cx, ptr, 2, signed = True)
    case S32Type()          : return load_int(cx, ptr, 4, signed = True)
    case S64Type()          : return load_int(cx, ptr, 8, signed = True)
    case F32Type()          : return decode_i32_as_float(load_int(cx, ptr, 4))
    case F64Type()          : return decode_i64_as_float(load_int(cx, ptr, 8))
    case CharType()         : return convert_i32_to_char(cx, load_int(cx, ptr, 4))
    case StringType()       : return load_string(cx, ptr)
    case ErrorContextType() : return lift_error_context(cx, load_int(cx, ptr, 4))
    case ListType(t, l)     : return load_list(cx, ptr, t, l)
    case RecordType(fields) : return load_record(cx, ptr, fields)
    case VariantType(cases) : return load_variant(cx, ptr, cases)
    case FlagsType(labels)  : return load_flags(cx, ptr, labels)
    case OwnType()          : return lift_own(cx, load_int(cx, ptr, 4), t)
    case BorrowType()       : return lift_borrow(cx, load_int(cx, ptr, 4), t)
    case StreamType(t)      : return lift_stream(cx, load_int(cx, ptr, 4), t)
    case FutureType(t)      : return lift_future(cx, load_int(cx, ptr, 4), t)
Integers are loaded directly from memory, with their high-order bit interpreted according to the signedness of the type.
def load_int(cx, ptr, nbytes, signed = False):
  return int.from_bytes(cx.opts.memory[ptr : ptr+nbytes], 'little', signed = signed)
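For instance, the same two bytes yield different values depending on signedness. This is a small sketch in which a plain bytearray stands in for cx.opts.memory, so load_int takes the memory directly rather than a context (an assumption made only for the example).

```python
# A bytearray stands in for linear memory in this sketch.
memory = bytearray([0xFE, 0xFF, 0x34, 0x12])

def load_int(memory, ptr, nbytes, signed=False):
    return int.from_bytes(memory[ptr : ptr + nbytes], 'little', signed=signed)

u16_value = load_int(memory, 0, 2)               # bytes FE FF read as unsigned
s16_value = load_int(memory, 0, 2, signed=True)  # same bytes, two's complement
next_u16 = load_int(memory, 2, 2)                # little-endian: 0x34 is the low byte
```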
Integer-to-boolean conversion treats 0 as false and all other bit-patterns
as true:
def convert_int_to_bool(i):
  assert(i >= 0)
  return bool(i)
Floats are loaded directly from memory, with the sign and payload information of NaN values discarded. Consequently, there is only one unique NaN value per floating-point type. This reflects the practical reality that some languages and protocols do not preserve these bits. In the Python code below, this is expressed as canonicalizing NaNs to a particular bit pattern.
See the comments about lowering of float values for a discussion of possible optimizations.
DETERMINISTIC_PROFILE = False # or True
CANONICAL_FLOAT32_NAN = 0x7fc00000
CANONICAL_FLOAT64_NAN = 0x7ff8000000000000

def canonicalize_nan32(f):
  if math.isnan(f):
    f = core_f32_reinterpret_i32(CANONICAL_FLOAT32_NAN)
    assert(math.isnan(f))
  return f

def canonicalize_nan64(f):
  if math.isnan(f):
    f = core_f64_reinterpret_i64(CANONICAL_FLOAT64_NAN)
    assert(math.isnan(f))
  return f

def decode_i32_as_float(i):
  return canonicalize_nan32(core_f32_reinterpret_i32(i))

def decode_i64_as_float(i):
  return canonicalize_nan64(core_f64_reinterpret_i64(i))

def core_f32_reinterpret_i32(i):
  return struct.unpack('<f', struct.pack('<I', i))[0] # f32.reinterpret_i32

def core_f64_reinterpret_i64(i):
  return struct.unpack('<d', struct.pack('<Q', i))[0] # f64.reinterpret_i64
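The effect of canonicalization can be observed by decoding two NaNs that differ in sign and payload. This is an illustrative sketch for the 32-bit case only; f32_from_bits and bits_from_f32 are hypothetical helper names, and the exact bits produced when re-encoding a NaN are left unasserted since they depend on the platform's float conversion.

```python
import math
import struct

CANONICAL_FLOAT32_NAN = 0x7fc00000

def f32_from_bits(i):
    # Reinterpret a 32-bit integer as an IEEE 754 single-precision float.
    return struct.unpack('<f', struct.pack('<I', i))[0]

def bits_from_f32(f):
    # Reinterpret a float as the 32-bit integer holding its encoding.
    return struct.unpack('<I', struct.pack('<f', f))[0]

def decode_i32_as_float(i):
    f = f32_from_bits(i)
    if math.isnan(f):
        # Any NaN, whatever its sign or payload, is replaced by the canonical one.
        f = f32_from_bits(CANONICAL_FLOAT32_NAN)
    return f

nan_with_payload = decode_i32_as_float(0x7fc00001)  # quiet NaN, payload bit set
negative_nan = decode_i32_as_float(0xffc12345)      # sign bit set, other payload
one = decode_i32_as_float(0x3f800000)               # ordinary values pass through
```

Both NaN inputs go through the same replacement, so they are indistinguishable after lifting, while non-NaN values are unchanged.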
An i32 is converted to a char (a Unicode Scalar Value) by dynamically
testing that its unsigned integral value is in the valid Unicode Code Point
range and not a Surrogate:
def convert_i32_to_char(cx, i):
  assert(i >= 0)
  trap_if(i >= 0x110000)
  trap_if(0xD800 <= i <= 0xDFFF)
  return chr(i)
Strings are loaded from two pointer-sized values: a pointer (offset in linear
memory) and a number of code units. There are three supported string encodings
in canonopt: UTF-8, UTF-16 and latin1+utf16. This last option allows a
dynamic choice between Latin-1 and UTF-16, indicated by the high bit of the
second pointer-sized value (32nd bit on 32-bit memories, 64th bit on 64-bit
memories). String values include their original encoding and length in tagged
code units as a "hint" that enables store_string (defined below) to make
better up-front allocation size choices in many cases. Thus, the value produced
by load_string isn't simply a Python str, but a tuple containing a str,
the original encoding and the number of source code units.
The MAX_STRING_BYTE_LENGTH constant limits the byte length of a string when
loading. This limit is low enough to ensure that when storing the string via
store_string, the number of bytes allocated stays below REALLOC_I32_MAX even
when converting to other encodings. This means that any loaded string will be
short enough to be stored in 32-bit components. The worst case inflation in
string byte length is by a factor of 2 (e.g. this could occur when converting
from UTF-8 to UTF-16).
String = tuple[str, str, int]

def load_string(cx, ptr) -> String:
    begin = load_int(cx, ptr, cx.opts.memory.ptr_size())
    tagged_code_units = load_int(cx, ptr + cx.opts.memory.ptr_size(), cx.opts.memory.ptr_size())
    return load_string_from_range(cx, begin, tagged_code_units)

def utf16_tag(ptr_type):
    return 1 << (ptr_size(ptr_type) * 8 - 1)

REALLOC_I32_MAX = 2**32 - 1
MAX_STRING_BYTE_LENGTH = (1 << 28) - 1
assert(REALLOC_I32_MAX > 2 * MAX_STRING_BYTE_LENGTH)

def load_string_from_range(cx, ptr, tagged_code_units) -> String:
    tag = utf16_tag(cx.opts.memory.ptr_type())
    match cx.opts.string_encoding:
        case 'utf8':
            alignment = 1
            byte_length = tagged_code_units
            encoding = 'utf-8'
        case 'utf16':
            alignment = 2
            byte_length = 2 * tagged_code_units
            encoding = 'utf-16-le'
        case 'latin1+utf16':
            alignment = 2
            if bool(tagged_code_units & tag):
                byte_length = 2 * (tagged_code_units ^ tag)
                encoding = 'utf-16-le'
            else:
                byte_length = tagged_code_units
                encoding = 'latin-1'

    trap_if(byte_length > MAX_STRING_BYTE_LENGTH)
    trap_if(ptr != align_to(ptr, alignment))
    trap_if(ptr + byte_length > len(cx.opts.memory))
    try:
        s = cx.opts.memory[ptr : ptr+byte_length].decode(encoding)
    except UnicodeError:
        trap()

    return (s, cx.opts.string_encoding, tagged_code_units)
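To make the `latin1+utf16` tagging concrete, here is a small standalone sketch of how the high bit of the second pointer-sized value selects the decoding. It assumes a plain `bytes` object in place of `cx.opts.memory`, and the function names are hypothetical; it also omits the `MAX_STRING_BYTE_LENGTH` check and the trap on invalid encodings.

```python
def sketch_utf16_tag(ptr_size_bytes: int) -> int:
    # High bit of a pointer-sized integer: bit 31 for 4-byte pointers, bit 63 for 8-byte.
    return 1 << (ptr_size_bytes * 8 - 1)

def sketch_decode_latin1_or_utf16(memory: bytes, ptr: int, tagged_code_units: int,
                                  ptr_size_bytes: int = 4) -> str:
    tag = sketch_utf16_tag(ptr_size_bytes)
    if tagged_code_units & tag:
        byte_length = 2 * (tagged_code_units ^ tag)   # clear the tag; 2 bytes per code unit
        encoding = 'utf-16-le'
    else:
        byte_length = tagged_code_units               # 1 byte per Latin-1 code unit
        encoding = 'latin-1'
    assert ptr % 2 == 0 and ptr + byte_length <= len(memory)
    return memory[ptr : ptr + byte_length].decode(encoding)

latin1_memory = 'caf\u00e9'.encode('latin-1')         # 4 bytes, 4 code units
utf16_memory = '\u65e5\u672c'.encode('utf-16-le')     # 4 bytes, 2 code units

assert sketch_decode_latin1_or_utf16(latin1_memory, 0, 4) == 'caf\u00e9'
assert sketch_decode_latin1_or_utf16(utf16_memory, 0, 2 | sketch_utf16_tag(4)) == '\u65e5\u672c'
```

Both buffers are 4 bytes long; only the tag bit tells the loader whether they hold four Latin-1 code units or two UTF-16 code units.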
Error context values are lifted directly from the current component instance's
handles table:
def lift_error_context(cx, i):
    errctx = cx.inst.handles.get(i)
    trap_if(not isinstance(errctx, ErrorContext))
    return errctx
Lists and records are loaded by recursively loading their elements/fields. The
byte length of a list is limited to MAX_LIST_BYTE_LENGTH which, similar to
MAX_STRING_BYTE_LENGTH, keeps list lengths well below the 32-bit address space
limit. The worst case inflation of a list length in bytes is by a factor of 2
(which could occur when pointer sizes increase from 4 to 8 bytes) so
MAX_LIST_BYTE_LENGTH is small enough that the byte length passed to realloc
stays below REALLOC_I32_MAX even when doubled.
MAX_LIST_BYTE_LENGTH = (1 << 28) - 1
assert(REALLOC_I32_MAX > 2 * MAX_LIST_BYTE_LENGTH)

def load_list(cx, ptr, elem_type, maybe_length):
    if maybe_length is not None:
        return load_list_from_valid_range(cx, ptr, maybe_length, elem_type)
    begin = load_int(cx, ptr, cx.opts.memory.ptr_size())
    length = load_int(cx, ptr + cx.opts.memory.ptr_size(), cx.opts.memory.ptr_size())
    return load_list_from_range(cx, begin, length, elem_type)

def load_list_from_range(cx, ptr, length, elem_type):
    trap_if(length * elem_size(elem_type, cx.opts.memory.ptr_type()) > MAX_LIST_BYTE_LENGTH)
    trap_if(ptr != align_to(ptr, alignment(elem_type, cx.opts.memory.ptr_type())))
    trap_if(ptr + length * elem_size(elem_type, cx.opts.memory.ptr_type()) > len(cx.opts.memory))
    return load_list_from_valid_range(cx, ptr, length, elem_type)

def load_list_from_valid_range(cx, ptr, length, elem_type):
    a = []
    for i in range(length):
        a.append(load(cx, ptr + i * elem_size(elem_type, cx.opts.memory.ptr_type()), elem_type))
    return a

def load_record(cx, ptr, fields):
    record = {}
    for field in fields:
        ptr = align_to(ptr, alignment(field.t, cx.opts.memory.ptr_type()))
        record[field.label] = load(cx, ptr, field.t)
        ptr += elem_size(field.t, cx.opts.memory.ptr_type())
    return record
As a technical detail: the align_to in the loop in load_record is
guaranteed to be a no-op on the first iteration because the record as
a whole starts out aligned (as asserted at the top of load).
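The align-then-advance loop in load_record determines where each field sits. The following sketch computes field offsets for a record of unsigned integers, assuming a hypothetical `SKETCH_SIZE_ALIGN` table of (size, alignment) pairs and a local `sketch_align_to` helper rather than the spec's `elem_size`, `alignment` and `align_to`.

```python
def sketch_align_to(ptr: int, alignment: int) -> int:
    # Round ptr up to the next multiple of alignment.
    return -(-ptr // alignment) * alignment

# Hypothetical (size, alignment) table covering only a few integer types.
SKETCH_SIZE_ALIGN = {'u8': (1, 1), 'u16': (2, 2), 'u32': (4, 4), 'u64': (8, 8)}

def sketch_field_offsets(field_types, base=0):
    offsets = []
    ptr = base
    for t in field_types:
        size, alignment = SKETCH_SIZE_ALIGN[t]
        ptr = sketch_align_to(ptr, alignment)   # skip padding before the field
        offsets.append(ptr)
        ptr += size                             # advance past the field
    return offsets

print(sketch_field_offsets(['u8', 'u32', 'u16', 'u64']))   # [0, 4, 8, 16]
```

Here the `u32` is preceded by 3 bytes of padding and the `u64` by 6, while the first field needs none because the record's base address is already aligned.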
Variants are loaded using the order of the cases in the type to determine the
case index, assigning 0 to the first case, 1 to the next case, etc.
While the code below appears to perform case-label lookup at runtime, a normal
implementation can build the appropriate index tables at compile-time so that
variant-passing is always O(1) and does not involve string operations.
def load_variant(cx, ptr, cases):
    disc_size = elem_size(discriminant_type(cases), cx.opts.memory.ptr_type())
    case_index = load_int(cx, ptr, disc_size)
    ptr += disc_size
    trap_if(case_index >= len(cases))
    c = cases[case_index]
    ptr = align_to(ptr, max_case_alignment(cases, cx.opts.memory.ptr_type()))
    if c.t is None:
        return { c.label: None }
    return { c.label: load(cx, ptr, c.t) }
Flags are converted from a bit-vector to a dictionary whose keys are
derived from the ordered labels of the flags type. The code here takes
advantage of Python's support for integers of arbitrary width.
def load_flags(cx, ptr, labels):
    i = load_int(cx, ptr, elem_size_flags(labels))
    return unpack_flags_from_int(i, labels)

def unpack_flags_from_int(i, labels):
    record = {}
    for l in labels:
        record[l] = bool(i & 1)
        i >>= 1
    return record
own handles are lifted by removing the handle from the current component
instance's handles table so that ownership is transferred to the lowering
component. The lifting operation fails if unique ownership of the handle isn't
possible, for example if the index was actually a borrow or if the own
handle is currently being lent out as borrows.
def lift_own(cx, i, t):
    h = cx.inst.handles.remove(i)
    trap_if(not isinstance(h, ResourceHandle))
    trap_if(h.rt is not t.rt)
    trap_if(h.num_lends != 0)
    trap_if(not h.own)
    return h.rep
The abstract lifted value for handle types is currently just the internal
resource representation i32, which is kept opaque from the receiving
component (it's stored in the handles table and only accessed indirectly via
index). (This assumes that resource representations are immutable. If
representations were to become mutable, the address of the mutable cell would
be passed as the lifted value instead.)
In contrast to own, borrow handles are lifted by reading the representation
from the source handle, leaving the source handle intact in the current
component instance's handles table:
def lift_borrow(cx, i, t):
    assert(isinstance(cx.borrow_scope, Subtask))
    h = cx.inst.handles.get(i)
    trap_if(not isinstance(h, ResourceHandle))
    trap_if(h.rt is not t.rt)
    cx.borrow_scope.add_lender(h)
    return h.rep
Subtask.add_lender participates in the enforcement of the dynamic borrow
rules, which keep the source handle alive until the end of the call (as a
conservative upper bound on how long the borrow handle can be held). Note
that add_lender is also called when the source handle is itself a borrow, so
that it too must be kept alive until the subtask completes, which in turn
prevents the current task from calling task.return while a non-returned
subtask still holds a transitively-borrowed handle.
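The difference between lifting own and borrow handles can be illustrated with a toy handles table. Everything in this sketch is hypothetical: `SketchHandleTable`, `SketchHandle` and the `lenders` list only approximate the spec's table, `ResourceHandle` and `Subtask`, and it assumes that add_lender increments `num_lends` and that the count is decremented once the subtask finishes. Resource-type checks are omitted, and the ownership check runs before removal so that the toy table survives a failed attempt (in the spec a trap ends execution, so the order is unobservable).

```python
class SketchTrap(Exception):
    """Hypothetical stand-in for a trap."""

class SketchHandle:
    def __init__(self, rep, own):
        self.rep = rep
        self.own = own
        self.num_lends = 0

class SketchHandleTable:
    def __init__(self):
        self.slots = {}
        self.next_index = 1

    def add(self, h):
        i = self.next_index
        self.next_index += 1
        self.slots[i] = h
        return i

    def get(self, i):
        if i not in self.slots:
            raise SketchTrap(f"no handle at index {i}")
        return self.slots[i]

    def remove(self, i):
        h = self.get(i)
        del self.slots[i]
        return h

def sketch_lift_borrow(table, i, lenders):
    h = table.get(i)            # the source handle stays in the table
    h.num_lends += 1            # assumed effect of Subtask.add_lender
    lenders.append(h)
    return h.rep

def sketch_lift_own(table, i):
    h = table.get(i)
    if not h.own or h.num_lends != 0:
        raise SketchTrap("handle is a borrow or is currently lent out")
    table.remove(i)             # ownership leaves this component instance
    return h.rep

def sketch_subtask_done(lenders):
    # Assumed cleanup once the call that borrowed the handles has finished.
    for h in lenders:
        h.num_lends -= 1
    lenders.clear()

table = SketchHandleTable()
idx = table.add(SketchHandle(rep=42, own=True))
lenders = []
assert sketch_lift_borrow(table, idx, lenders) == 42   # handle is now lent out
try:
    sketch_lift_own(table, idx)
    lent_out_traps = False
except SketchTrap:
    lent_out_traps = True
sketch_subtask_done(lenders)
assert sketch_lift_own(table, idx) == 42               # succeeds once the lend has ended
```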
Streams and futures are entirely symmetric, transferring ownership of the
readable end from the lifting component to the host or lowering component and
trapping if the readable end is in the middle of copying (which would create
a dangling-pointer situation) or is in the DONE state (in which case the only
valid operation is {stream,future}.drop-{readable,writable}).
def lift_stream(cx, i, t):
    return lift_async_value(ReadableStreamEnd, cx, i, t)

def lift_future(cx, i, t):
    return lift_async_value(ReadableFutureEnd, cx, i, t)

def lift_async_value(ReadableEndT, cx, i, t):
    assert(not contains_borrow(t))
    e = cx.inst.handles.remove(i)
    trap_if(not isinstance(e, ReadableEndT))
    trap_if(e.shared.t != t)
    trap_if(e.state != CopyState.IDLE)
    return e.shared
Storing
The store function defines how to write a value v of a given value type
t into linear memory starting at offset ptr. Presenting the definition of
store piecewise, we start with the top-level case analysis:
def store(cx, v, t, ptr):
    assert(ptr == align_to(ptr, alignment(t, cx.opts.memory.ptr_type())))
    assert(ptr + elem_size(t, cx.opts.memory.ptr_type()) <= len(cx.opts.memory))
    match despecialize(t):
        case BoolType()         : store_int(cx, int(bool(v)), ptr, 1)
        case U8Type()           : store_int(cx, v, ptr, 1)
        case U16Type()          : store_int(cx, v, ptr, 2)
        case U32Type()          : store_int(cx, v, ptr, 4)
        case U64Type()          : store_int(cx, v, ptr, 8)
        case S8Type()           : store_int(cx, v, ptr, 1, signed = True)
        case S16Type()          : store_int(cx, v, ptr, 2, signed = True)
        case S32Type()          : store_int(cx, v, ptr, 4, signed = True)
        case S64Type()          : store_int(cx, v, ptr, 8, signed = True)
        case F32Type()          : store_int(cx, encode_float_as_i32(v), ptr, 4)
        case F64Type()          : store_int(cx, encode_float_as_i64(v), ptr, 8)
        case CharType()         : store_int(cx, char_to_i32(v), ptr, 4)
        case StringType()       : store_string(cx, v, ptr)
        case ErrorContextType() : store_int(cx, lower_error_context(cx, v), ptr, 4)
        case ListType(t, l)     : store_list(cx, v, ptr, t, l)
        case RecordType(fields) : store_record(cx, v, ptr, fields)
        case VariantType(cases) : store_variant(cx, v, ptr, cases)
        case FlagsType(labels)  : store_flags(cx, v, ptr, labels)
        case OwnType()          : store_int(cx, lower_own(cx, v, t), ptr, 4)
        case BorrowType()       : store_int(cx, lower_borrow(cx, v, t), ptr, 4)
        case StreamType(t)      : store_int(cx, lower_stream(cx, v, t), ptr, 4)
        case FutureType(t)      : store_int(cx, lower_future(cx, v, t), ptr, 4)
Integers are stored directly into memory. Because the input domain is exactly
the integers in range for the given type, no extra range checks are necessary;
the signed parameter is only present to ensure that the internal range checks
of int.to_bytes are satisfied.
def store_int(cx, v, ptr, nbytes, signed = False):
    cx.opts.memory[ptr : ptr+nbytes] = int.to_bytes(v, nbytes, 'little', signed = signed)
Floats are stored directly into memory, with the sign and payload bits of NaN values modified nondeterministically. This reflects the practical reality that different languages, protocols and CPUs have different effects on NaNs.
Although this nondeterminism is expressed in the Python code below as generating a "random" NaN bit-pattern, native implementations do not need to use the same "random" algorithm, or even any random algorithm at all. Hosts may instead choose to canonicalize to an arbitrary fixed NaN value, or even to the original value of the NaN before lifting, allowing them to optimize away both the canonicalization of lifting and the randomization of lowering.
When a host implements the deterministic profile, NaNs are canonicalized to a particular NaN bit-pattern.
def maybe_scramble_nan32(f):
    if math.isnan(f):
        if DETERMINISTIC_PROFILE:
            f = core_f32_reinterpret_i32(CANONICAL_FLOAT32_NAN)
        else:
            f = core_f32_reinterpret_i32(random_nan_bits(32, 8))
        assert(math.isnan(f))
    return f

def maybe_scramble_nan64(f):
    if math.isnan(f):
        if DETERMINISTIC_PROFILE:
            f = core_f64_reinterpret_i64(CANONICAL_FLOAT64_NAN)
        else:
            f = core_f64_reinterpret_i64(random_nan_bits(64, 11))
        assert(math.isnan(f))
    return f

def random_nan_bits(total_bits, exponent_bits):
    fraction_bits = total_bits - exponent_bits - 1
    bits = random.getrandbits(total_bits)
    bits |= ((1 << exponent_bits) - 1) << fraction_bits
    bits |= 1 << random.randrange(fraction_bits - 1)
    return bits

def encode_float_as_i32(f):
    return core_i32_reinterpret_f32(maybe_scramble_nan32(f))

def encode_float_as_i64(f):
    return core_i64_reinterpret_f64(maybe_scramble_nan64(f))

def core_i32_reinterpret_f32(f):
    return struct.unpack('<I', struct.pack('<f', f))[0] # i32.reinterpret_f32

def core_i64_reinterpret_f64(f):
    return struct.unpack('<Q', struct.pack('<d', f))[0] # i64.reinterpret_f64
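The two `|=` steps in random_nan_bits rely on the IEEE 754 rule that a bit pattern is a NaN exactly when its exponent field is all ones and its fraction field is nonzero. As a rough check of that rule, the hypothetical helper below classifies bit patterns by their fields and compares the answer with `math.isnan` on a handful of 32-bit patterns; the helper names are assumptions made for this sketch.

```python
import math
import struct

def sketch_is_nan_bits(bits: int, total_bits: int, exponent_bits: int) -> bool:
    fraction_bits = total_bits - exponent_bits - 1
    exponent = (bits >> fraction_bits) & ((1 << exponent_bits) - 1)
    fraction = bits & ((1 << fraction_bits) - 1)
    # NaN: exponent all ones and at least one fraction bit set (sign is ignored).
    return exponent == (1 << exponent_bits) - 1 and fraction != 0

def sketch_f32_from_bits(bits: int) -> float:
    return struct.unpack('<f', struct.pack('<I', bits))[0]

samples = [
    0x7fc00000,   # quiet NaN
    0xffc00001,   # NaN with sign bit and extra payload bit set
    0x7f800001,   # NaN with only the lowest fraction bit set
    0x7f800000,   # +infinity: exponent all ones but fraction zero
    0x3f800000,   # 1.0
]
for bits in samples:
    assert sketch_is_nan_bits(bits, 32, 8) == math.isnan(sketch_f32_from_bits(bits))
```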
The integral value of a char (a Unicode Scalar Value) is a valid unsigned
i32 and thus no runtime conversion or checking is necessary:
def char_to_i32(c):
    i = ord(c)
    assert(0 <= i <= 0xD7FF or 0xE000 <= i <= 0x10FFFF)
    return i
Storing strings is complicated by the goal of attempting to optimize the
different transcoding cases. In particular, one challenge is choosing the
linear memory allocation size before examining the contents of the string.
The reason for this constraint is that, in some settings where single-pass
iterators are involved (host calls and post-MVP adapter functions), examining
the contents of a string more than once would require making an engine-internal
temporary copy of the whole string, which the component model specifically aims
not to do. To avoid multiple passes, the canonical ABI instead uses a realloc
approach to update the allocation size during the single copy. A blind
realloc approach would normally suffer from multiple reallocations per string
(e.g., using the standard doubling-growth strategy). However, as already shown
in load_string above, string values come with two useful hints: their
original encoding and number of source code units. From this hint data,
store_string can do a much better job minimizing the number of reallocations.
We start with a case analysis to enumerate all the meaningful encoding
combinations, subdividing the latin1+utf16 encoding into either latin1 or
utf16 based on the utf16_tag flag set by load_string:
def store_string(cx, v: String, ptr):
    begin, tagged_code_units = store_string_into_range(cx, v)
    store_int(cx, begin, ptr, cx.opts.memory.ptr_size())
    store_int(cx, tagged_code_units, ptr + cx.opts.memory.ptr_size(), cx.opts.memory.ptr_size())

def store_string_into_range(cx, v: String):
    src, src_encoding, src_tagged_code_units = v
    tag = utf16_tag(cx.opts.memory.ptr_type())

    if src_encoding == 'latin1+utf16':
        if bool(src_tagged_code_units & tag):
            src_simple_encoding = 'utf16'
            src_code_units = src_tagged_code_units ^ tag
        else:
            src_simple_encoding = 'latin1'
            src_code_units = src_tagged_code_units
    else:
        src_simple_encoding = src_encoding
        src_code_units = src_tagged_code_units

    match cx.opts.string_encoding:
        case 'utf8':
            match src_simple_encoding:
                case 'utf8'   : return store_string_copy(cx, src, src_code_units, 1, 1, 'utf-8')
                case 'utf16'  : return store_utf16_to_utf8(cx, src, src_code_units)
                case 'latin1' : return store_latin1_to_utf8(cx, src, src_code_units)
        case 'utf16':
            match src_simple_encoding:
                case 'utf8'   : return store_utf8_to_utf16(cx, src, src_code_units)
                case 'utf16'  : return store_string_copy(cx, src, src_code_units, 2, 2, 'utf-16-le')
                case 'latin1' : return store_string_copy(cx, src, src_code_units, 2, 2, 'utf-16-le')
        case 'latin1+utf16':
            match src_encoding:
                case 'utf8'   : return store_string_to_latin1_or_utf16(cx, src, src_code_units)
                case 'utf16'  : return store_string_to_latin1_or_utf16(cx, src, src_code_units)
                case 'latin1+utf16' :
                    match src_simple_encoding:
                        case 'latin1' : return store_string_copy(cx, src, src_code_units, 1, 2, 'latin-1')
                        case 'utf16'  : return store_probably_utf16_to_latin1_or_utf16(cx, src, src_code_units)
The simplest 4 cases above can compute the exact destination size and then copy with a simple loop (that possibly inflates Latin-1 to UTF-16 by injecting a 0 byte after every Latin-1 byte).
def store_string_copy(cx, src, src_code_units, dst_code_unit_size, dst_alignment, dst_encoding):
    dst_byte_length = dst_code_unit_size * src_code_units
    assert(dst_byte_length <= REALLOC_I32_MAX)
    ptr = cx.opts.realloc(0, 0, dst_alignment, dst_byte_length)
    trap_if(ptr != align_to(ptr, dst_alignment))
    trap_if(ptr + dst_byte_length > len(cx.opts.memory))
    encoded = src.encode(dst_encoding)
    assert(dst_byte_length == len(encoded))
    cx.opts.memory[ptr : ptr+len(encoded)] = encoded
    return (ptr, src_code_units)
The 2 cases of transcoding into UTF-8 share an algorithm that starts by optimistically assuming that each code unit of the source string fits in a single UTF-8 byte. If that assumption fails, the algorithm reallocates to a worst-case size, finishes the copy, and ends with a shrinking reallocation.
def store_utf16_to_utf8(cx, src, src_code_units):
    worst_case_size = src_code_units * 3
    return store_string_to_utf8(cx, src, src_code_units, worst_case_size)

def store_latin1_to_utf8(cx, src, src_code_units):
    worst_case_size = src_code_units * 2
    return store_string_to_utf8(cx, src, src_code_units, worst_case_size)

def store_string_to_utf8(cx, src, src_code_units, worst_case_size):
    assert(src_code_units <= REALLOC_I32_MAX)
    ptr = cx.opts.realloc(0, 0, 1, src_code_units)
    trap_if(ptr + src_code_units > len(cx.opts.memory))
    for i,code_point in enumerate(src):
        if ord(code_point) < 2**7:
            cx.opts.memory[ptr + i] = ord(code_point)
        else:
            assert(worst_case_size <= REALLOC_I32_MAX)
            ptr = cx.opts.realloc(ptr, src_code_units, 1, worst_case_size)
            trap_if(ptr + worst_case_size > len(cx.opts.memory))
            encoded = src.encode('utf-8')
            cx.opts.memory[ptr+i : ptr+len(encoded)] = encoded[i : ]
            if worst_case_size > len(encoded):
                ptr = cx.opts.realloc(ptr, worst_case_size, 1, len(encoded))
                trap_if(ptr + len(encoded) > len(cx.opts.memory))
            return (ptr, len(encoded))
    return (ptr, src_code_units)
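The optimistic-then-worst-case strategy can be exercised against a toy memory. The sketch below is not the spec's harness: `SketchMemory` is a hypothetical bump allocator standing in for `cx.opts.memory` and `cx.opts.realloc`, it counts realloc calls, and it omits the bounds traps and assertions.

```python
class SketchMemory:
    """Toy linear memory with a bump allocator (hypothetical, for illustration)."""
    def __init__(self, size):
        self.bytes = bytearray(size)
        self.next_free = 0
        self.realloc_calls = 0

    def realloc(self, old_ptr, old_size, alignment, new_size):
        self.realloc_calls += 1
        if new_size <= old_size:
            return old_ptr                              # shrink in place
        new_ptr = -(-self.next_free // alignment) * alignment
        assert new_ptr + new_size <= len(self.bytes), "toy memory exhausted"
        # Move the old contents, as a real realloc would.
        self.bytes[new_ptr : new_ptr + old_size] = self.bytes[old_ptr : old_ptr + old_size]
        self.next_free = new_ptr + new_size
        return new_ptr

def sketch_store_to_utf8(mem, src, src_code_units, worst_case_size):
    ptr = mem.realloc(0, 0, 1, src_code_units)          # optimistic: 1 byte per code unit
    for i, ch in enumerate(src):
        if ord(ch) < 2**7:
            mem.bytes[ptr + i] = ord(ch)                # ASCII copies through unchanged
        else:
            ptr = mem.realloc(ptr, src_code_units, 1, worst_case_size)
            encoded = src.encode('utf-8')
            mem.bytes[ptr + i : ptr + len(encoded)] = encoded[i:]
            if worst_case_size > len(encoded):
                ptr = mem.realloc(ptr, worst_case_size, 1, len(encoded))
            return (ptr, len(encoded))
    return (ptr, src_code_units)

ascii_mem = SketchMemory(64)
ascii_result = sketch_store_to_utf8(ascii_mem, 'hello', 5, 10)

latin1_mem = SketchMemory(64)
ptr, length = sketch_store_to_utf8(latin1_mem, 'na\u00efve', 5, 10)

assert ascii_mem.realloc_calls == 1      # pure ASCII: the first guess was exact
assert latin1_mem.realloc_calls == 3     # allocate, grow to worst case, shrink
assert bytes(latin1_mem.bytes[ptr : ptr + length]) == 'na\u00efve'.encode('utf-8')
```

In this toy run a pure-ASCII string costs a single allocation, while a string containing a non-ASCII character costs three realloc calls regardless of how many such characters follow.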
Converting from UTF-8 to UTF-16 performs an initial worst-case size allocation (assuming each UTF-8 byte encodes a whole code point that inflates into a two-byte UTF-16 code unit) and then does a shrinking reallocation at the end if multiple UTF-8 bytes were collapsed into a single 2-byte UTF-16 code unit:
def store_utf8_to_utf16(cx, src, src_code_units):
    worst_case_size = 2 * src_code_units
    assert(worst_case_size <= REALLOC_I32_MAX)
    ptr = cx.opts.realloc(0, 0, 2, worst_case_size)
    trap_if(ptr != align_to(ptr, 2))
    trap_if(ptr + worst_case_size > len(cx.opts.memory))
    encoded = src.encode('utf-16-le')
    cx.opts.memory[ptr : ptr+len(encoded)] = encoded
    if len(encoded) < worst_case_size:
        ptr = cx.opts.realloc(ptr, worst_case_size, 2, len(encoded))
        trap_if(ptr != align_to(ptr, 2))
        trap_if(ptr + len(encoded) > len(cx.opts.memory))
    code_units = int(len(encoded) / 2)
    return (ptr, code_units)
The next transcoding case handles latin1+utf16 encoding, where the general
goal is to fit the incoming string into Latin-1 if possible based on the code
points of the incoming string. The algorithm speculates that all code points
do fit into Latin-1 and then falls back to a worst-case allocation size when
a code point is found outside Latin-1. In this fallback case, the
previously-copied Latin-1 bytes are inflated in place, inserting a 0 byte
after every Latin-1 byte (iterating in reverse to avoid clobbering later
bytes):
def store_string_to_latin1_or_utf16(cx, src, src_code_units):
    assert(src_code_units <= REALLOC_I32_MAX)
    ptr = cx.opts.realloc(0, 0, 2, src_code_units)
    trap_if(ptr != align_to(ptr, 2))
    trap_if(ptr + src_code_units > len(cx.opts.memory))
    dst_byte_length = 0
    for usv in src:
        if ord(usv) < (1 << 8):
            cx.opts.memory[ptr + dst_byte_length] = ord(usv)
            dst_byte_length += 1
        else:
            worst_case_size = 2 * src_code_units
            assert(worst_case_size <= REALLOC_I32_MAX)
            ptr = cx.opts.realloc(ptr, src_code_units, 2, worst_case_size)
            trap_if(ptr != align_to(ptr, 2))
            trap_if(ptr + worst_case_size > len(cx.opts.memory))
            for j in range(dst_byte_length-1, -1, -1):
                cx.opts.memory[ptr + 2*j] = cx.opts.memory[ptr + j]
                cx.opts.memory[ptr + 2*j + 1] = 0
            encoded = src.encode('utf-16-le')
            cx.opts.memory[ptr+2*dst_byte_length : ptr+len(encoded)] = encoded[2*dst_byte_length : ]
            if worst_case_size > len(encoded):
                ptr = cx.opts.realloc(ptr, worst_case_size, 2, len(encoded))
                trap_if(ptr != align_to(ptr, 2))
                trap_if(ptr + len(encoded) > len(cx.opts.memory))
            tagged_code_units = int(len(encoded) / 2) | utf16_tag(cx.opts.memory.ptr_type())
            return (ptr, tagged_code_units)
    if dst_byte_length < src_code_units:
        ptr = cx.opts.realloc(ptr, src_code_units, 2, dst_byte_length)
        trap_if(ptr != align_to(ptr, 2))
        trap_if(ptr + dst_byte_length > len(cx.opts.memory))
    return (ptr, dst_byte_length)
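The reason the in-place inflation iterates in reverse can be seen on a tiny buffer. This sketch uses a plain `bytearray` and hypothetical helper names; it compares the reverse loop used above with a forward loop, which overwrites Latin-1 bytes before they have been read.

```python
def sketch_inflate_reverse(buf: bytearray, n: int) -> None:
    # Widen n Latin-1 bytes at the start of buf into n UTF-16LE code units.
    for j in range(n - 1, -1, -1):
        buf[2*j] = buf[j]
        buf[2*j + 1] = 0

def sketch_inflate_forward(buf: bytearray, n: int) -> None:
    # Same assignments in ascending order: shown only to illustrate the clobbering.
    for j in range(n):
        buf[2*j] = buf[j]
        buf[2*j + 1] = 0

good = bytearray(b'abc' + bytes(3))    # 3 Latin-1 bytes plus room to grow
sketch_inflate_reverse(good, 3)
assert bytes(good) == 'abc'.encode('utf-16-le')

bad = bytearray(b'abc' + bytes(3))
sketch_inflate_forward(bad, 3)
assert bytes(bad) != 'abc'.encode('utf-16-le')   # 'b' and 'c' were overwritten too early
```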
The final transcoding case takes advantage of the extra heuristic
information that the incoming UTF-16 bytes were intentionally chosen over
Latin-1 by the producer, indicating that they probably contain code points
outside Latin-1 and thus probably require inflation. Based on this
information, the transcoding algorithm pessimistically allocates storage for
UTF-16, deflating at the end if indeed no non-Latin-1 code points were
encountered. This Latin-1 deflation ensures that if a group of components
are all using latin1+utf16 and one component over-uses UTF-16, other
components can recover the Latin-1 compression. (The Latin-1 check can be
inexpensively fused with the UTF-16 validate+copy loop.)
def store_probably_utf16_to_latin1_or_utf16(cx, src, src_code_units):
    src_byte_length = 2 * src_code_units
    assert(src_byte_length <= REALLOC_I32_MAX)
    ptr = cx.opts.realloc(0, 0, 2, src_byte_length)
    trap_if(ptr != align_to(ptr, 2))
    trap_if(ptr + src_byte_length > len(cx.opts.memory))
    encoded = src.encode('utf-16-le')
    cx.opts.memory[ptr : ptr+len(encoded)] = encoded
    if any(ord(c) >= (1 << 8) for c in src):
        tagged_code_units = int(len(encoded) / 2) | utf16_tag(cx.opts.memory.ptr_type())
        return (ptr, tagged_code_units)
    latin1_size = int(len(encoded) / 2)
    for i in range(latin1_size):
        cx.opts.memory[ptr + i] = cx.opts.memory[ptr + 2*i]
    ptr = cx.opts.realloc(ptr, src_byte_length, 1, latin1_size)
    trap_if(ptr + latin1_size > len(cx.opts.memory))
    return (ptr, latin1_size)
Error context values are lowered by storing them directly into the current
component instance's handles table and passing the i32 index to wasm:
def lower_error_context(cx, v):
    return cx.inst.handles.add(v)
Lists and records are stored by recursively storing their elements and are symmetric to the loading functions. Unlike strings, lists can simply allocate based on the up-front knowledge of length and static element size. Storing a list that exceeds the size of a 32-bit memory traps even when storing on a 64-bit platform, to avoid having interfaces that 32-bit components can't use.
def store_list(cx, v, ptr, elem_type, maybe_length):
    if maybe_length is not None:
        assert(maybe_length == len(v))
        store_list_into_valid_range(cx, v, ptr, elem_type)
        return
    begin, length = store_list_into_range(cx, v, elem_type)
    store_int(cx, begin, ptr, cx.opts.memory.ptr_size())
    store_int(cx, length, ptr + cx.opts.memory.ptr_size(), cx.opts.memory.ptr_size())

def store_list_into_range(cx, v, elem_type):
    byte_length = len(v) * elem_size(elem_type, cx.opts.memory.ptr_type())
    assert(byte_length <= REALLOC_I32_MAX)
    ptr = cx.opts.realloc(0, 0, alignment(elem_type, cx.opts.memory.ptr_type()), byte_length)
    trap_if(ptr != align_to(ptr, alignment(elem_type, cx.opts.memory.ptr_type())))
    trap_if(ptr + byte_length > len(cx.opts.memory))
    store_list_into_valid_range(cx, v, ptr, elem_type)
    return (ptr, len(v))

def store_list_into_valid_range(cx, v, ptr, elem_type):
    for i,e in enumerate(v):
        store(cx, e, elem_type, ptr + i * elem_size(elem_type, cx.opts.memory.ptr_type()))

def store_record(cx, v, ptr, fields):
    for f in fields:
        ptr = align_to(ptr, alignment(f.t, cx.opts.memory.ptr_type()))
        store(cx, v[f.label], f.t, ptr)
        ptr += elem_size(f.t, cx.opts.memory.ptr_type())
Variant values are represented as Python dictionaries containing exactly one
entry whose key is the label of the lifted case and whose value is the
(optional) case payload. While this code appears to do an O(n) search of the
variant type for a matching case label, a normal implementation can
statically fuse store_variant with its matching load_variant to ultimately
build a dense array that maps producer's case indices to the consumer's case
indices.
def store_variant(cx, v, ptr, cases):
    case_index, case_value = match_case(v, cases)
    disc_size = elem_size(discriminant_type(cases), cx.opts.memory.ptr_type())
    store_int(cx, case_index, ptr, disc_size)
    ptr += disc_size
    ptr = align_to(ptr, max_case_alignment(cases, cx.opts.memory.ptr_type()))
    c = cases[case_index]
    if c.t is not None:
        store(cx, case_value, c.t, ptr)

def match_case(v, cases):
    [label] = v.keys()
    [index] = [i for i,c in enumerate(cases) if c.label == label]
    [value] = v.values()
    return (index, value)
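As a small illustration of the single-entry dictionary representation, the sketch below runs the same destructuring logic as match_case on an option-like variant. `SketchCase` is a hypothetical stand-in for the spec's case type, and the string `'u32'` merely marks a payload type for this example.

```python
from collections import namedtuple

# Hypothetical stand-in for a variant case: a label plus an optional payload type.
SketchCase = namedtuple('SketchCase', ['label', 't'])

def sketch_match_case(v, cases):
    [label] = v.keys()       # destructuring fails unless the dict has exactly one entry
    [index] = [i for i, c in enumerate(cases) if c.label == label]
    [value] = v.values()
    return (index, value)

cases = [SketchCase('none', None), SketchCase('some', 'u32')]

print(sketch_match_case({'none': None}, cases))   # (0, None)
print(sketch_match_case({'some': 42}, cases))     # (1, 42)
```

The list destructuring doubles as validation: a dictionary with zero or several entries, or a label that matches no case, raises a `ValueError` in this sketch.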
Flags are converted from a dictionary to a bit-vector by iterating through the labels of the flags type in the order they were listed in the type definition and OR-ing all the bits together. Flag lifting/lowering can be statically fused into array/integer operations (with a simple byte copy when the label lists are the same) to avoid any string operations, in a similar manner to variants.
def store_flags(cx, v, ptr, labels):
    i = pack_flags_into_int(v, labels)
    store_int(cx, i, ptr, elem_size_flags(labels))

def pack_flags_into_int(v, labels):
    i = 0
    shift = 0
    for l in labels:
        i |= (int(bool(v[l])) << shift)
        shift += 1
    return i
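Packing and unpacking flags are inverses of each other, with the first label mapped to the least significant bit. The following sketch shows the round trip with compact, hypothetical helpers that mirror the bit order of the loops in the spec code.

```python
def sketch_unpack_flags(i: int, labels):
    # Bit n of i corresponds to the n-th label.
    return {l: bool((i >> n) & 1) for n, l in enumerate(labels)}

def sketch_pack_flags(v, labels) -> int:
    return sum(int(bool(v[l])) << n for n, l in enumerate(labels))

labels = ['read', 'write', 'exec']
flags = sketch_unpack_flags(0b101, labels)

print(flags)                              # {'read': True, 'write': False, 'exec': True}
print(sketch_pack_flags(flags, labels))   # 5
```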
Finally, own and borrow handles are lowered by initializing new handle
elements in the current component instance's handles table. The increment of
num_borrows is complemented by a decrement in canon_resource_drop and
ensures that all borrowed handles are dropped before the end of the task.
def lower_own(cx, rep, t):
    h = ResourceHandle(t.rt, rep, own = True)
    return cx.inst.handles.add(h)

def lower_borrow(cx, rep, t):
    assert(isinstance(cx.borrow_scope, Task))
    if cx.inst is t.rt.impl:
        return rep
    h = ResourceHandle(t.rt, rep, own = False, borrow_scope = cx.borrow_scope)
    h.borrow_scope.num_borrows += 1
    return cx.inst.handles.add(h)
The special case in lower_borrow is an optimization, recognizing that, when
a borrowed handle is passed to the component that implemented the resource
type, the only thing the borrowed handle is good for is calling
resource.rep, so lowering might as well avoid the overhead of creating an
intermediate borrow handle.
Lowering a stream or future is entirely symmetric and simply adds a new
readable end to the current component instance's handles table, passing the
index of the new element to core wasm:
def lower_stream(cx, v, t):
    assert(isinstance(v, ReadableStream))
    assert(not contains_borrow(t))
    return cx.inst.handles.add(ReadableStreamEnd(v))

def lower_future(cx, v, t):
    assert(isinstance(v, ReadableFuture))
    assert(not contains_borrow(t))
    return cx.inst.handles.add(ReadableFutureEnd(v))
Flattening
With only the definitions above, the Canonical ABI would be forced to place all
parameters and results in linear memory. While this is necessary in the general
case, in many cases performance can be improved by passing small-enough values
in registers by using core function parameters and results. To support this
optimization, the Canonical ABI defines flatten_functype to map component
function types to core function types by attempting to decompose all the
non-dynamically-sized component value types into core value types.
For a variety of practical reasons, we need to limit the total number of flattened parameters and results, falling back to storing everything in linear memory. The number of flattened results is currently limited to 1 due to various parts of the toolchain (notably the C ABI) not yet being able to express multi-value returns. Hopefully this limitation is temporary and can be lifted before the Component Model is fully standardized.
When there are too many flat values, in general, a single i32 pointer can be
passed instead (pointing to a tuple in linear memory). When lowering into
linear memory, this requires the Canonical ABI to call realloc (in lower
below) to allocate space to put the tuple. As an optimization, when lowering
the return value of an imported function (via canon lower), the caller can
have already allocated space for the return value (e.g., efficiently on the
stack), passing in an i32 pointer as a parameter instead of returning an
i32 as a return value.
Given all this, the top-level definition of flatten_functype is:
MAX_FLAT_PARAMS = 16
MAX_FLAT_ASYNC_PARAMS = 4
MAX_FLAT_RESULTS = 1

def flatten_functype(opts, ft, context):
    flat_params = flatten_types(ft.param_types(), opts)
    flat_results = flatten_types(ft.result_type(), opts)
    if not opts.async_:
        if len(flat_params) > MAX_FLAT_PARAMS:
            flat_params = [opts.memory.ptr_type()]
        if len(flat_results) > MAX_FLAT_RESULTS:
            match context:
                case 'lift':
                    flat_results = [opts.memory.ptr_type()]
                case 'lower':
                    flat_params += [opts.memory.ptr_type()]
                    flat_results = []
        return CoreFuncType(flat_params, flat_results)
    else:
        match context:
            case 'lift':
                if len(flat_params) > MAX_FLAT_PARAMS:
                    flat_params = [opts.memory.ptr_type()]
                if opts.callback:
                    flat_results = ['i32']
                else:
                    flat_results = []
            case 'lower':
                if len(flat_params) > MAX_FLAT_ASYNC_PARAMS:
                    flat_params = [opts.memory.ptr_type()]
                if len(flat_results) > 0:
                    flat_params += [opts.memory.ptr_type()]
                flat_results = ['i32']
        return CoreFuncType(flat_params, flat_results)

def flatten_types(ts, opts):
    return [ft for t in ts for ft in flatten_type(t, opts)]
As shown here, the core signatures of async-lowered functions use a lower
limit on the maximum number of parameters (4) and results (0) passed as scalars
before falling back to passing through memory.
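To see the synchronous limits in action, the sketch below applies the same rules to parameter and result lists that have already been flattened. It is a simplification made for illustration: `sketch_flatten_sync` is a hypothetical helper, it covers only the non-async branch, and it takes the pointer type as a plain string defaulting to `'i32'`.

```python
SKETCH_MAX_FLAT_PARAMS = 16
SKETCH_MAX_FLAT_RESULTS = 1

def sketch_flatten_sync(flat_params, flat_results, context, ptr_type='i32'):
    flat_params = list(flat_params)
    flat_results = list(flat_results)
    if len(flat_params) > SKETCH_MAX_FLAT_PARAMS:
        flat_params = [ptr_type]             # all params passed via one pointer
    if len(flat_results) > SKETCH_MAX_FLAT_RESULTS:
        if context == 'lift':
            flat_results = [ptr_type]        # callee returns a pointer to the results
        else:
            flat_params += [ptr_type]        # caller passes in a return pointer
            flat_results = []
    return (flat_params, flat_results)

# A function taking a string and returning a string: each string flattens to (ptr, len).
string_flat = ['i32', 'i32']
print(sketch_flatten_sync(string_flat, string_flat, 'lift'))    # (['i32', 'i32'], ['i32'])
print(sketch_flatten_sync(string_flat, string_flat, 'lower'))   # (['i32', 'i32', 'i32'], [])

# Seventeen scalar parameters exceed the limit and collapse to a single pointer.
print(sketch_flatten_sync(['i32'] * 17, [], 'lift'))            # (['i32'], [])
```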
Presenting the definition of flatten_type piecewise, we start with the
top-level case analysis:
def flatten_type(t, opts):
    match despecialize(t):
        case BoolType()                       : return ['i32']
        case U8Type() | U16Type() | U32Type() : return ['i32']
        case S8Type() | S16Type() | S32Type() : return ['i32']
        case S64Type() | U64Type()            : return ['i64']
        case F32Type()                        : return ['f32']
        case F64Type()                        : return ['f64']
        case CharType()                       : return ['i32']
        case StringType()                     : return [opts.memory.ptr_type(), opts.memory.ptr_type()]
        case ErrorContextType()               : return ['i32']
        case ListType(t, l)                   : return flatten_list(t, l, opts)
        case RecordType(fields)               : return flatten_record(fields, opts)
        case VariantType(cases)               : return flatten_variant(cases, opts)
        case FlagsType(labels)                : return ['i32']
        case OwnType() | BorrowType()         : return ['i32']
        case StreamType() | FutureType()      : return ['i32']
A fixed-length list flattens the same way as a tuple with one field per
element: the flattening of the element type is repeated once per element.
def flatten_list(elem_type, maybe_length, opts):
    if maybe_length is not None:
        return flatten_type(elem_type, opts) * maybe_length
    return [opts.memory.ptr_type(), opts.memory.ptr_type()]
Record flattening simply flattens each field in sequence.
def flatten_record(fields, opts):
    flat = []
    for f in fields:
        flat += flatten_type(f.t, opts)
    return flat
Variant flattening is more involved due to the fact that each case payload can
have a totally different flattening. Rather than giving up when there is a type
mismatch, the Canonical ABI relies on the fact that the 4 core value types can
be easily bit-cast between each other and defines a join operator to pick the
tightest approximation. What this means is that, regardless of the dynamic
case, all flattened variants are passed with the same static set of core types,
which may involve, e.g., reinterpreting an f32 as an i32 or zero-extending
an i32 into an i64.
def flatten_variant(cases, opts):
    flat = []
    for c in cases:
        if c.t is not None:
            for i,ft in enumerate(flatten_type(c.t, opts)):
                if i < len(flat):
                    flat[i] = join(flat[i], ft)
                else:
                    flat.append(ft)
    return flatten_type(discriminant_type(cases), opts) + flat

def join(a, b):
    if a == b: return a
    if (a == 'i32' and b == 'f32') or (a == 'f32' and b == 'i32'): return 'i32'
    return 'i64'
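A couple of worked cases may help with the join rule. The sketch below operates on case payloads that are already flattened into core type names, with the discriminant's flat type passed in directly; both simplifications, and the helper names, are assumptions made for this example.

```python
def sketch_join(a, b):
    if a == b:
        return a
    if {a, b} == {'i32', 'f32'}:
        return 'i32'           # both are 32 bits wide, so a bit-cast suffices
    return 'i64'               # any other mismatch needs a 64-bit container

def sketch_flatten_variant(disc_flat, case_payloads):
    flat = []
    for payload in case_payloads:
        for i, ft in enumerate(payload):
            if i < len(flat):
                flat[i] = sketch_join(flat[i], ft)   # position shared by several cases
            else:
                flat.append(ft)                      # position first used by this case
    return [disc_flat] + flat

# Payloads f32 and u32: the shared position is 32 bits either way.
print(sketch_flatten_variant('i32', [['f32'], ['i32']]))          # ['i32', 'i32']

# Payloads f64 and string: the first position widens to i64 and the
# string's length adds a second position.
print(sketch_flatten_variant('i32', [['f64'], ['i32', 'i32']]))   # ['i32', 'i64', 'i32']
```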
Flat Lifting
Values are lifted by iterating over a list of parameter or result Core WebAssembly values:
class CoreValueIter:
    values: list[int|float]
    i: int

    def __init__(self, vs):
        self.values = vs
        self.i = 0

    def next(self, t):
        v = self.values[self.i]
        self.i += 1
        match t:
            case 'i32': assert(isinstance(v, int) and 0 <= v < 2**32)
            case 'i64': assert(isinstance(v, int) and 0 <= v < 2**64)
            case 'f32': assert(isinstance(v, (int,float)))
            case 'f64': assert(isinstance(v, (int,float)))
            case _    : assert(False)
        return v

    def done(self):
        return self.i == len(self.values)
The match is only used for spec-level assertions; no runtime typecase is
required.
The lift_flat function defines how to convert a list of core values into a
single high-level value of type t. Presenting the definition of lift_flat
piecewise, we start with the top-level case analysis:
def lift_flat(cx, vi, t):
    match despecialize(t):
        case BoolType()         : return convert_int_to_bool(vi.next('i32'))
        case U8Type()           : return lift_flat_unsigned(vi, 32, 8)
        case U16Type()          : return lift_flat_unsigned(vi, 32, 16)
        case U32Type()          : return lift_flat_unsigned(vi, 32, 32)
        case U64Type()          : return lift_flat_unsigned(vi, 64, 64)
        case S8Type()           : return lift_flat_signed(vi, 32, 8)
        case S16Type()          : return lift_flat_signed(vi, 32, 16)
        case S32Type()          : return lift_flat_signed(vi, 32, 32)
        case S64Type()          : return lift_flat_signed(vi, 64, 64)
        case F32Type()          : return canonicalize_nan32(vi.next('f32'))
        case F64Type()          : return canonicalize_nan64(vi.next('f64'))
        case CharType()         : return convert_i32_to_char(cx, vi.next('i32'))
        case StringType()       : return lift_flat_string(cx, vi)
        case ErrorContextType() : return lift_error_context(cx, vi.next('i32'))
        case ListType(t, l)     : return lift_flat_list(cx, vi, t, l)
        case RecordType(fields) : return lift_flat_record(cx, vi, fields)
        case VariantType(cases) : return lift_flat_variant(cx, vi, cases)
        case FlagsType(labels)  : return lift_flat_flags(vi, labels)
        case OwnType()          : return lift_own(cx, vi.next('i32'), t)
        case BorrowType()       : return lift_borrow(cx, vi.next('i32'), t)
        case StreamType(t)      : return lift_stream(cx, vi.next('i32'), t)
        case FutureType(t)      : return lift_future(cx, vi.next('i32'), t)
Integers are lifted from core i32 or i64 values using the signedness of the
target type to interpret the high-order bit. When the target type is narrower
than an i32, the Canonical ABI ignores the unused high bits (like load_int).
The conversion logic here assumes that i32 values are always represented as
unsigned Python ints and thus lifting to a signed type performs a manual two's
complement conversion in Python (which would be a no-op in hardware).
def lift_flat_unsigned(vi, core_width, t_width):
  i = vi.next('i' + str(core_width))
  assert(0 <= i < (1 << core_width))
  return i % (1 << t_width)

def lift_flat_signed(vi, core_width, t_width):
  i = vi.next('i' + str(core_width))
  assert(0 <= i < (1 << core_width))
  i %= (1 << t_width)
  if i >= (1 << (t_width - 1)):
    return i - (1 << t_width)
  return i
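As a rough illustration of the arithmetic above, the following standalone sketch applies the same steps to plain Python ints instead of reading from a CoreValueIter. The names lift_unsigned and lift_signed are hypothetical simplifications introduced only for this example.

```python
def lift_unsigned(i, core_width, t_width):
  # i is a core value represented as an unsigned Python int.
  assert 0 <= i < (1 << core_width)
  # Discard any bits above the target width.
  return i % (1 << t_width)

def lift_signed(i, core_width, t_width):
  assert 0 <= i < (1 << core_width)
  i %= (1 << t_width)
  # A set high-order bit means the value is negative in two's complement.
  if i >= (1 << (t_width - 1)):
    return i - (1 << t_width)
  return i

print(lift_unsigned(0x1FF, 32, 8))      # unused high bits are ignored
print(lift_signed(0xFFFFFFFF, 32, 32))  # all ones reinterpreted as signed
print(lift_signed(0x80, 32, 8))         # high bit of an s8 is set
```

Under these assumptions, a core i32 of 0x1FF lifts to the u8 value 255, and 0xFFFFFFFF lifts to the s32 value -1.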
The contents of strings and variable-length lists are stored in memory so lifting these types is essentially the same as loading them from memory; the only difference is that the pointer and length come from ptr-sized values instead of from linear memory. Fixed-length lists are lifted the same way as a tuple.
def lift_flat_string(cx, vi):
  ptr = vi.next(cx.opts.memory.ptr_type())
  packed_length = vi.next(cx.opts.memory.ptr_type())
  return load_string_from_range(cx, ptr, packed_length)

def lift_flat_list(cx, vi, elem_type, maybe_length):
  if maybe_length is not None:
    a = []
    for i in range(maybe_length):
      a.append(lift_flat(cx, vi, elem_type))
    return a
  ptr = vi.next(cx.opts.memory.ptr_type())
  length = vi.next(cx.opts.memory.ptr_type())
  return load_list_from_range(cx, ptr, length, elem_type)
Records are lifted by recursively lifting their fields:
def lift_flat_record(cx, vi, fields):
  record = {}
  for f in fields:
    record[f.label] = lift_flat(cx, vi, f.t)
  return record
Variants are also lifted recursively. Lifting a variant must carefully follow
the definition of flatten_variant above, consuming the exact same core types
regardless of the dynamic case payload being lifted. Because of the join
performed by flatten_variant, we need a more-permissive value iterator that
reinterprets between the different types appropriately and also wraps
i64 values to 32-bit when needed:
def lift_flat_variant(cx, vi, cases):
  flat_types = flatten_variant(cases, cx.opts)
  assert(flat_types.pop(0) == 'i32')
  case_index = vi.next('i32')
  trap_if(case_index >= len(cases))
  class CoerceValueIter:
    def next(self, want):
      have = flat_types.pop(0)
      x = vi.next(have)
      match (have, want):
        case ('i32', 'f32') : return decode_i32_as_float(x)
        case ('i64', 'i32') : return wrap_i64_to_i32(x)
        case ('i64', 'f32') : return decode_i32_as_float(wrap_i64_to_i32(x))
        case ('i64', 'f64') : return decode_i64_as_float(x)
        case _              : assert(have == want); return x
  c = cases[case_index]
  if c.t is None:
    v = None
  else:
    v = lift_flat(cx, CoerceValueIter(), c.t)
  for have in flat_types:
    _ = vi.next(have)
  return { c.label: v }

def wrap_i64_to_i32(i):
  assert(0 <= i < (1 << 64))
  return i % (1 << 32)
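The coercions performed by CoerceValueIter can be tried out in isolation. The sketch below is only an approximation: it covers three of the four coercions, and it defines decode_i32_as_float with Python's struct module, which is an assumption made here for self-containment rather than the definition used elsewhere in this document. The coerce helper is hypothetical.

```python
import struct

def decode_i32_as_float(i):
  # Reinterpret the 32 bits of an unsigned int as an IEEE 754 single.
  return struct.unpack('<f', struct.pack('<I', i))[0]

def wrap_i64_to_i32(i):
  assert 0 <= i < (1 << 64)
  return i % (1 << 32)

def coerce(have, want, x):
  # Mirrors three of the cases matched in CoerceValueIter.next.
  if (have, want) == ('i32', 'f32'):
    return decode_i32_as_float(x)
  if (have, want) == ('i64', 'i32'):
    return wrap_i64_to_i32(x)
  if (have, want) == ('i64', 'f32'):
    return decode_i32_as_float(wrap_i64_to_i32(x))
  assert have == want
  return x

bits_of_1_5 = 0x3FC00000  # bit pattern of the f32 value 1.5
print(coerce('i32', 'f32', bits_of_1_5))
print(coerce('i64', 'f32', (0xDEADBEEF << 32) | bits_of_1_5))
print(coerce('i64', 'i32', (1 << 32) + 7))
```

The second call shows that the upper 32 bits of an i64 slot are dropped before the remaining bits are reinterpreted as an f32.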
Finally, flags are lifted by lifting to a record the same way as when loading flags from linear memory.
def lift_flat_flags(vi, labels):
  assert(0 < len(labels) <= 32)
  i = vi.next('i32')
  return unpack_flags_from_int(i, labels)
Flat Lowering
The lower_flat function defines how to convert a value v of a given type
t into zero or more core values. Presenting the definition of lower_flat
piecewise, we start with the top-level case analysis:
def lower_flat(cx, v, t):
  match despecialize(t):
    case BoolType() : return [int(v)]
    case U8Type() : return [v]
    case U16Type() : return [v]
    case U32Type() : return [v]
    case U64Type() : return [v]
    case S8Type() : return lower_flat_signed(v, 32)
    case S16Type() : return lower_flat_signed(v, 32)
    case S32Type() : return lower_flat_signed(v, 32)
    case S64Type() : return lower_flat_signed(v, 64)
    case F32Type() : return [maybe_scramble_nan32(v)]
    case F64Type() : return [maybe_scramble_nan64(v)]
    case CharType() : return [char_to_i32(v)]
    case StringType() : return lower_flat_string(cx, v)
    case ErrorContextType() : return [lower_error_context(cx, v)]
    case ListType(t, l) : return lower_flat_list(cx, v, t, l)
    case RecordType(fields) : return lower_flat_record(cx, v, fields)
    case VariantType(cases) : return lower_flat_variant(cx, v, cases)
    case FlagsType(labels) : return lower_flat_flags(v, labels)
    case OwnType() : return [lower_own(cx, v, t)]
    case BorrowType() : return [lower_borrow(cx, v, t)]
    case StreamType(t) : return [lower_stream(cx, v, t)]
    case FutureType(t) : return [lower_future(cx, v, t)]
Since component-level values are assumed in-range and, as previously stated,
core i32 values are always internally represented as unsigned ints,
unsigned integer values need no extra conversion. Signed integer values are
converted to unsigned core i32 or i64 values by two's complement arithmetic
(which again would be a no-op in hardware):
def lower_flat_signed(i, core_bits):
  if i < 0:
    i += (1 << core_bits)
  return [i]
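To see that this lowering is the inverse of signed lifting, the following sketch lowers a few s8 values into a 32-bit core value and lifts them back. Both helpers are simplified, hypothetical variants that work on plain ints rather than on lists and iterators.

```python
def lower_signed(i, core_bits):
  # Negative values map onto the upper half of the unsigned range.
  if i < 0:
    i += (1 << core_bits)
  return i

def lift_signed(i, t_width):
  # Keep the low t_width bits, then reinterpret the high bit as the sign.
  i %= (1 << t_width)
  return i - (1 << t_width) if i >= (1 << (t_width - 1)) else i

for v in (-128, -1, 0, 127):
  core = lower_signed(v, 32)
  print(v, hex(core), lift_signed(core, 8))
```

Each printed line shows the original value, its unsigned 32-bit core representation and the value recovered by lifting.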
Since strings and variable-length lists are stored in linear memory, lowering can reuse the previous definitions; only the resulting pointers are returned differently (as flat values instead of as a pair in linear memory). Fixed-length lists are lowered the same way as tuples.
def lower_flat_string(cx, v):
  ptr, packed_length = store_string_into_range(cx, v)
  return [ptr, packed_length]

def lower_flat_list(cx, v, elem_type, maybe_length):
  if maybe_length is not None:
    assert(maybe_length == len(v))
    flat = []
    for e in v:
      flat += lower_flat(cx, e, elem_type)
    return flat
  (ptr, length) = store_list_into_range(cx, v, elem_type)
  return [ptr, length]
Records are lowered by recursively lowering their fields:
def lower_flat_record(cx, v, fields):
  flat = []
  for f in fields:
    flat += lower_flat(cx, v[f.label], f.t)
  return flat
Variants are also lowered recursively. Symmetric to lift_flat_variant above,
lower_flat_variant must consume all flattened types of flatten_variant,
manually coercing the otherwise-incompatible type pairings allowed by join:
def lower_flat_variant(cx, v, cases):
  case_index, case_value = match_case(v, cases)
  flat_types = flatten_variant(cases, cx.opts)
  assert(flat_types.pop(0) == 'i32')
  c = cases[case_index]
  if c.t is None:
    payload = []
  else:
    payload = lower_flat(cx, case_value, c.t)
    for i,(fv,have) in enumerate(zip(payload, flatten_type(c.t, cx.opts))):
      want = flat_types.pop(0)
      match (have, want):
        case ('f32', 'i32') : payload[i] = encode_float_as_i32(fv)
        case ('i32', 'i64') : payload[i] = fv
        case ('f32', 'i64') : payload[i] = encode_float_as_i32(fv)
        case ('f64', 'i64') : payload[i] = encode_float_as_i64(fv)
        case _              : assert(have == want)
  for _ in flat_types:
    payload.append(0)
  return [case_index] + payload
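The effect of the coercion and zero-padding can be sketched on a hypothetical variant with three cases: an f32 payload, a (u64, u32) tuple payload and no payload. Applying join to these cases is assumed to give the payload flat types ['i64', 'i32']. The lower_case helper and the struct-based encoders below are illustrative stand-ins, not the definitions used by this document.

```python
import struct

def encode_float_as_i32(f):
  return struct.unpack('<I', struct.pack('<f', f))[0]

def encode_float_as_i64(f):
  return struct.unpack('<Q', struct.pack('<d', f))[0]

def lower_case(case_index, payload, payload_types, joined_types):
  # payload_types: flat types of the active case.
  # joined_types: flat types of the whole variant payload after join.
  out = []
  for fv, have, want in zip(payload, payload_types, joined_types):
    if (have, want) in (('f32', 'i32'), ('f32', 'i64')):
      fv = encode_float_as_i32(fv)
    elif (have, want) == ('f64', 'i64'):
      fv = encode_float_as_i64(fv)
    else:
      assert have == want or (have, want) == ('i32', 'i64')
    out.append(fv)
  # Unused trailing slots are filled with zeros.
  out += [0] * (len(joined_types) - len(payload))
  return [case_index] + out

joined = ['i64', 'i32']
print(lower_case(0, [1.5], ['f32'], joined))
print(lower_case(1, [5, 6], ['i64', 'i32'], joined))
print(lower_case(2, [], [], joined))
```

Every case produces the same number of core values, which is what allows the lifting side to consume a fixed list of flat types regardless of the dynamic case.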
Finally, flags are lowered by packing the flags into an i32 bitvector.
def lower_flat_flags(v, labels):
  assert(0 < len(labels) <= 32)
  return [pack_flags_into_int(v, labels)]
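A small round trip may make the bitvector layout more concrete. The two helpers below restate pack_flags_into_int and unpack_flags_from_int in simplified form, assuming that the first label occupies the least-significant bit. The flags type used here is hypothetical.

```python
def pack_flags_into_int(v, labels):
  i = 0
  for shift, label in enumerate(labels):
    # Each label contributes one bit at its position in the label list.
    i |= int(bool(v[label])) << shift
  return i

def unpack_flags_from_int(i, labels):
  return { label: bool((i >> shift) & 1) for shift, label in enumerate(labels) }

labels = ['read', 'write', 'exec']  # hypothetical flags type
v = { 'read': True, 'write': False, 'exec': True }
core = pack_flags_into_int(v, labels)
print(bin(core))
print(unpack_flags_from_int(core, labels) == v)
```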
Lifting and Lowering Values
The lift_flat_values function defines how to lift a list of core
parameters or results (given by the CoreValueIter vi) into a tuple
of component-level values with types ts.
def lift_flat_values(cx, max_flat, vi, ts):
  flat_types = flatten_types(ts, cx.opts)
  if len(flat_types) > max_flat:
    ptr = vi.next(cx.opts.memory.ptr_type())
    tuple_type = TupleType(ts)
    trap_if(ptr != align_to(ptr, alignment(tuple_type, cx.opts.memory.ptr_type())))
    trap_if(ptr + elem_size(tuple_type, cx.opts.memory.ptr_type()) > len(cx.opts.memory))
    return list(load(cx, ptr, tuple_type).values())
  else:
    return [ lift_flat(cx, vi, t) for t in ts ]
Symmetrically, the lower_flat_values function defines how to lower a
list of component-level values vs of types ts into a list of core
values. As already described for flatten_functype above,
lowering handles the greater-than-max_flat case by either allocating
storage with realloc or accepting a caller-allocated buffer as an
out-param:
def lower_flat_values(cx, max_flat, vs, ts, out_param = None):
  cx.inst.may_leave = False
  flat_types = flatten_types(ts, cx.opts)
  if len(flat_types) > max_flat:
    tuple_type = TupleType(ts)
    tuple_value = {str(i): v for i,v in enumerate(vs)}
    if out_param is None:
      ptr = cx.opts.realloc(0, 0, alignment(tuple_type, cx.opts.memory.ptr_type()), elem_size(tuple_type, cx.opts.memory.ptr_type()))
      flat_vals = [ptr]
    else:
      ptr = out_param.next(cx.opts.memory.ptr_type())
      flat_vals = []
    trap_if(ptr != align_to(ptr, alignment(tuple_type, cx.opts.memory.ptr_type())))
    trap_if(ptr + elem_size(tuple_type, cx.opts.memory.ptr_type()) > len(cx.opts.memory))
    store(cx, tuple_value, tuple_type, ptr)
  else:
    flat_vals = []
    for i in range(len(vs)):
      flat_vals += lower_flat(cx, vs[i], ts[i])
  cx.inst.may_leave = True
  return flat_vals
The may_leave flag is guarded by canon_lower below to prevent a component
from calling out of the component while in the middle of lowering, ensuring
that the relative ordering of the side effects of lifting followed by lowering
cannot be observed and thus an implementation may reliably fuse lifting with
lowering when making a cross-component call to avoid the intermediate copy.
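The choice between passing values flat and spilling them to linear memory, together with the two trap checks on the spilled tuple, can be sketched as follows. The constants are assumed to match those given in the Flattening section, and passing_mode and tuple_range_trap are hypothetical helpers introduced only for this illustration.

```python
MAX_FLAT_PARAMS = 16   # assumed value from the Flattening section
MAX_FLAT_RESULTS = 1   # assumed value from the Flattening section

def passing_mode(num_flat_types, max_flat):
  # More flat values than the limit means the tuple goes through memory.
  return 'memory' if num_flat_types > max_flat else 'flat'

def align_to(ptr, alignment):
  # Round ptr up to the next multiple of alignment.
  return -(-ptr // alignment) * alignment

def tuple_range_trap(ptr, alignment, size, memory_len):
  # Returns why a trap would occur, or None if the range is acceptable.
  if ptr != align_to(ptr, alignment):
    return 'misaligned'
  if ptr + size > memory_len:
    return 'out of bounds'
  return None

print(passing_mode(2, MAX_FLAT_RESULTS))   # e.g. a string result: pointer and length
print(passing_mode(16, MAX_FLAT_PARAMS))
print(tuple_range_trap(6, 4, 8, 65536))
print(tuple_range_trap(65532, 4, 8, 65536))
print(tuple_range_trap(8, 4, 8, 65536))
```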
Canonical Definitions
Using the above supporting definitions, we can describe the static and dynamic
semantics of component-level canon definitions. The following subsections
cover each of these canon cases.
canonopt Validation
Canonical options, often referred to as $opts in the definitions below,
can be specified at most once in any particular list of options. For example
specifying string-encoding=utf8 twice is an error. Each individual option, if
present, is validated as follows:
- string-encoding=N - can be passed at most once, regardless of N.
- memory - this is a subtype of (memory 0)
  - ๐ memory may also be a subtype of (memory i64 0)
- realloc - the function has type (func (param addr addr addr addr) (result addr)) where addr is the address type coming from the memtype of the memory canonopt (restricted to i32, but with ๐ may also be i64).
  - if realloc is present then memory must be present
- post-return - only allowed on canon lift, which has rules for validation
- ๐ async - is only allowed when used with an async function type in canon lift or canon lower and cannot be present with post-return
  - ๐,not(๐) async - callback must also be present. Note that with the ๐ feature (the "stackful" ABI), this restriction is lifted.
- ๐ callback - the function has type (func (param i32 i32 i32) (result i32)) and cannot be present without async and is only allowed with canon lift
Additionally some options are required depending on lift/lower operations performed for a component. These are defined as:
- lower(T)
  - requires memory if T contains a list or string
- lift(T)
  - requires realloc if T contains a list or string
canon lift
For a canonical definition:
(canon lift $callee:<funcidx> $opts:<canonopt>* (func $f (type $ft)))
In addition to the general validation of $opts, the following additional
validation is performed:
- $callee must have type flatten_functype($opts, $ft, 'lift')
- $f is given type $ft
- if a post-return is present, it has type (func (param flatten_functype($opts, $ft, 'lift').results))
- requires options based on lift(param) for all parameters in ft
- requires options based on lower(result) if ft has a result
- if len(flatten_types(ft.param_types())) > MAX_FLAT_PARAMS, realloc is required
- if len(flatten_types(ft.result_type())) > max (where max = MAX_FLAT_RESULTS for sync lifts, and max = MAX_FLAT_PARAMS for async lifts), memory is required
Note that an async-lifted function whose result type requires a memory to lift
(either because it contains lists or strings or because the number of flattened
types exceeds MAX_FLAT_PARAMS) must include a memory option, and that option
must exactly match that of the task.return built-in called at runtime.
When instantiating a component instance, the runtime calls Store.lift (defined
above) to capture the $callee, $ft and $opts immediates of canon lift
along with the component instance being instantiated. These are then passed into
canon_lift every time the generated FuncInst is called, along with the
runtime on_start, on_resolve and caller arguments.
Based on this, canon_lift is defined in chunks as follows. The whole call
executes in a new implicit thread defined here by thread_func. The first
thing this implicit thread does is to wait for any backpressure, as defined by
Task.enter_implicit_thread above:
def canon_lift(callee, ft, opts, inst, on_start, on_resolve, caller) -> OnCancel:
  def thread_func():
    if not task.enter_implicit_thread():
      return
Once the backpressure gate is cleared, the arguments are lowered into core wasm
values and memory according to the canonopt immediates of canon lift (as
defined by lower_flat_values above). Note that if the caller cancels a task
while the task is still waiting on backpressure, the call will be aborted
before the arguments are lowered and thus owned handles are not transferred
to the callee. The CANCELLED_BEFORE_{STARTED,RETURNED} return codes from
subtask.cancel are thus necessary for the caller to distinguish whether
their arguments were lowered or not.
    cx = LiftLowerContext(opts, inst, task)
    args = task.start()
    flat_args = lower_flat_values(cx, MAX_FLAT_PARAMS, args, ft.param_types())
    flat_ft = flatten_functype(opts, ft, 'lift')
    assert(types_match_values(flat_ft.params, flat_args))
If the async canonopt is not specified, a lifted function then calls
the core wasm callee, passing the lowered arguments in core function parameters
and receiving the return value as core function results. Once the core results
are lifted according to lift_flat_values above, the optional post-return
function (specified as a canonopt immediate of canon lift) is called,
passing the same core wasm results as parameters so that the post-return
function can free any associated allocations.
    if not opts.async_:
      flat_results = call_and_trap_on_throw(callee, flat_args)
      assert(types_match_values(flat_ft.results, flat_results))
      result = lift_flat_values(cx, MAX_FLAT_RESULTS, CoreValueIter(flat_results), ft.result_type())
      task.return_(result)
      if opts.post_return is not None:
        inst.may_leave = False
        [] = call_and_trap_on_throw(opts.post_return, flat_results)
        inst.may_leave = True
      task.exit_implicit_thread()
      return
By clearing may_leave for the duration of the post-return call, the
Canonical ABI ensures that synchronously-lowered calls to synchronously-lifted
functions can always be implemented by a plain synchronous function call
without the need for fibers which would otherwise be necessary if the
post-return function performed a blocking operation.
In both of the async cases below (with or without callback), the
task.return built-in must be called, providing the return value as core wasm
parameters to the task.return built-in (rather than as core function
results as in the synchronous case). If task.return is not called by the
time the Task's last Thread exits, there is a trap (in Task.unregister_thread).
In the async non-callback ("stackful async") case, there is a single call
to the core wasm callee which must return empty core results. Waiting for async
I/O happens by the callee synchronously calling built-ins like
waitable-set.wait. Note that, since Task.enter_implicit_thread does not
acquire the exclusive_thread lock for stackful async functions, calls to
waitable-set.wait made by a stackful async function do not prevent any other
threads from starting or resuming in the same component instance.
    if not opts.callback:
      [] = call_and_trap_on_throw(callee, flat_args)
      assert(types_match_values(flat_ft.results, []))
      task.exit_implicit_thread()
      return
Lastly, in the async callback ("stackless async") case, waiting happens by
first calling the core wasm callee and then repeatedly calling the callback
function (specified as a funcidx immediate in canon lift) until the
EXIT code (0) is returned:
    [packed] = call_and_trap_on_throw(callee, flat_args)
    code,si = unpack_callback_result(packed)
    while code != CallbackCode.EXIT:
      assert(task.needs_exclusive() and inst.exclusive_thread is task.implicit_thread)
      inst.exclusive_thread = None
      match code:
        case CallbackCode.YIELD:
          cancelled = thread.wait_until(lambda: not inst.exclusive_thread, cancellable = True)
          if cancelled:
            event = (EventCode.TASK_CANCELLED, 0, 0)
          else:
            event = (EventCode.NONE, 0, 0)
        case CallbackCode.WAIT:
          wset = inst.handles.get(si)
          trap_if(not isinstance(wset, WaitableSet))
          event = wset.wait_for_event_and(lambda: not inst.exclusive_thread, cancellable = True)
        case _:
          trap()
      assert(inst.exclusive_thread is None)
      inst.exclusive_thread = task.implicit_thread
      event_code, p1, p2 = event
      [packed] = call_and_trap_on_throw(opts.callback, [event_code, p1, p2])
      code,si = unpack_callback_result(packed)
    task.exit_implicit_thread()
    return
The Thread.wait_until and WaitableSet.wait_for_event_and methods called by
the event loop are the same methods called by the thread.yield and
waitable-set.wait built-ins. Thus, the main difference between stackful and
stackless async is whether these suspending operations are performed from an
empty or non-empty core wasm callstack (with the former allowing additional
engine optimization).
The event loop releases ComponentInstance.exclusive_thread (which was acquired
by Task.enter_implicit_thread) before potentially blocking the thread to allow
other needs_exclusive tasks to execute in the interim. However, the
exclusive_thread lock is held throughout each core wasm invocation from the
event loop to maintain Component Invariant #3. Thus, async callback-lifted
tasks allow more concurrency than synchronously-lifted tasks (which only
release the exclusive_thread lock after they've returned) but less
concurrency than (stackful) non-callback async-lifted tasks, which entirely
ignore exclusive_thread.
The end of canon_lift creates a new task/thread pair for the call and then
calls Thread.resume on the new thread to synchronously transfer control flow
to it (jumping to the top of thread_func above). The new thread executes until
it either returns from thread_func or blocks by (transitively) calling
Thread.block_internal. If a non-async-typed call blocks before the implicit
thread has returned a value and there are no other ready threads in the same
component instance, canon_lift traps, since non-async-typed calls may not
block. Otherwise, canon_lift switches to a thread (nondeterministically, if
multiple are ready), as if the guest code had done so itself using a built-in
like thread.suspend-then-promote. This allows fully-synchronous components to
still use cooperative pthreads that interleave via threading built-ins (e.g.,
thread.yield) and even perform blocking I/O as long as the blocking I/O does
not transitively block returning a value to the caller (as would also be
expressible with a CPS transform like Asyncify). Lastly, canon_lift returns
Task.request_cancellation, bound to the call's new task, as the OnCancel
return value of FuncInst.
  task = Task(ft, opts, inst, on_start, on_resolve, caller)
  thread = Thread(task, thread_func)
  thread.resume()
  if not ft.async_:
    while task.state != Task.State.RESOLVED:
      candidates = { t for t in inst.threads if t.ready() and t is not inst.exclusive_thread }
      trap_if(not candidates)
      random.choice(list(candidates)).resume()
  return task.request_cancellation
The special case that excludes any thread (created by a previous blocked async
call) holding the instance's exclusive_thread lock is necessary to preserve
Component Invariant #3, which might otherwise be violated if the current
synchronous call is using the single global linear memory shadow stack.
Note that, because non-async-typed functions can't block, they do not actually
require a separate thread/fiber/stack to implement the above specified behavior
and a normal synchronous native function call can be used instead. Even if the
non-async-typed callee uses cooperative threads, since control flow must
inevitably make its way back to the implicit thread to return a value
(task.return is not allowed), the caller's stack can always be used for the
callee's implicit thread, knowing that control flow must switch back before
returning to the caller (or else trap and tear down the whole store).
The bit-packing scheme used for the i32 packed return value is defined as
follows:
class CallbackCode(IntEnum):
  EXIT = 0
  YIELD = 1
  WAIT = 2
  MAX = 2

def unpack_callback_result(packed):
  code = packed & 0xf
  trap_if(code > CallbackCode.MAX)
  assert(packed < 2**32)
  assert(Table.MAX_LENGTH < 2**28)
  waitable_set_index = packed >> 4
  return (CallbackCode(code), waitable_set_index)
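From the guest's point of view, the packed value is produced by placing the callback code in the low 4 bits and the waitable set index in the bits above. The sketch below pairs a hypothetical guest-side pack_callback_result helper with a simplified unpack function in which a trap is modeled as a Python exception named Trap, which is an assumption of this example.

```python
from enum import IntEnum

class Trap(Exception):
  pass

class CallbackCode(IntEnum):
  EXIT = 0
  YIELD = 1
  WAIT = 2

def pack_callback_result(code, waitable_set_index = 0):
  # Hypothetical guest-side helper: code in the low 4 bits, index above.
  return int(code) | (waitable_set_index << 4)

def unpack_callback_result(packed):
  code = packed & 0xf
  if code > CallbackCode.WAIT:
    raise Trap()
  return (CallbackCode(code), packed >> 4)

packed = pack_callback_result(CallbackCode.WAIT, 5)
print(hex(packed))
print(unpack_callback_result(packed))
```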
The ability to asynchronously wait, yield and exit is thus available to
both the callback and non-callback cases, making callback just an
optimization to avoid allocating stacks for async languages that have avoided
the need for stackful coroutines by design (e.g., async/await in JS,
Python, C# and Rust).
Uncaught Core WebAssembly exceptions or, in a future with stack-switching,
unhandled events, result in a trap at component boundaries. Thus, if a component
wishes to signal an error, it must use some sort of explicit type such as
result (whose error case particular language bindings may choose to map to
and from exceptions):
def call_and_trap_on_throw(callee, args):
  try:
    return callee(args)
  except CoreWebAssemblyException:
    trap()
canon lower
For a canonical definition:
(canon lower $callee:<funcidx> $opts:<canonopt>* (core func $f))
In addition to general validation of $opts, additional
validation is performed where $callee has type $ft:
- $f is given type flatten_functype($opts, $ft, 'lower')
- requires options based on lower(param) for all parameters in ft
- requires options based on lift(result) if ft has a result
- if len(flatten_types(ft.param_types())) > max_flat_params, memory is required
- if len(flatten_types(ft.result_type())) > max_flat_results, realloc is required
- ๐ if async is specified, memory must be present
When instantiating a component instance, the runtime calls Store.lower (defined
above) to capture the $callee, $ft and $opts immediates of canon lower
along with the component instance being instantiated. These are then passed into
canon_lower every time the generated CoreFuncInst is called, along with the
runtime Core WebAssembly arguments.
Based on this, canon_lower is defined in chunks as follows. First, each call
to canon_lower creates a new Subtask. However, this Subtask is only added
to the current component instance's handles table (below) if async is
specified and callee blocks. In any case, this Subtask is used as the
LiftLowerContext.borrow_scope for borrow arguments, ensuring that owned
handles are not dropped before Subtask.deliver_resolve is called (below).
def canon_lower(callee, ft, opts, flat_args: list[CoreValType]) -> list[CoreValType]:
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  subtask = Subtask()
  cx = LiftLowerContext(opts, thread.task.inst, subtask)
The next chunk makes the call to callee using the opts immediates of the
canon lower definition to configure lift_flat_values and lower_flat_values
(both defined above) and the current instance as the caller.
  flat_ft = flatten_functype(opts, ft, 'lower')
  assert(types_match_values(flat_ft.params, flat_args))
  flat_args = CoreValueIter(flat_args)

  if not opts.async_:
    max_flat_params = MAX_FLAT_PARAMS
    max_flat_results = MAX_FLAT_RESULTS
  else:
    max_flat_params = MAX_FLAT_ASYNC_PARAMS
    max_flat_results = 0

  on_progress = lambda:()
  flat_results = None
  def on_start():
    on_progress()
    assert(subtask.state == Subtask.State.STARTING)
    subtask.state = Subtask.State.STARTED
    return lift_flat_values(cx, max_flat_params, flat_args, ft.param_types())

  def on_resolve(result):
    on_progress()
    if result is None:
      assert(subtask.cancellation_requested)
      if subtask.state == Subtask.State.STARTING:
        subtask.state = Subtask.State.CANCELLED_BEFORE_STARTED
      else:
        assert(subtask.state == Subtask.State.STARTED)
        subtask.state = Subtask.State.CANCELLED_BEFORE_RETURNED
    else:
      assert(subtask.state == Subtask.State.STARTED)
      subtask.state = Subtask.State.RETURNED
      nonlocal flat_results
      flat_results = lower_flat_values(cx, max_flat_results, result, ft.result_type(), flat_args)

  subtask.on_cancel = callee(on_start, on_resolve, caller = thread.task.inst)
  assert(ft.async_ or subtask.state == Subtask.State.RETURNED)
The Subtask.state field is updated by the callbacks to keep track of the
call progress. The on_progress variable starts as a no-op, but is used by the
async case below to trigger event delivery.
According to the FuncInst calling contract, the call to callee should never
"block" (i.e., wait on I/O). If the callee would block, it will instead
return an OnCancel callback which is stored in the Subtask (so that it can
be used to request cancellation in the future). Furthermore, if the function
type does not have the async effect, the function must have returned a
value.
In the synchronous case (when the async canonopt is not set), if the
callee blocked before calling on_resolve, the synchronous caller's thread
is non-cancellably suspended until the callee calls on_resolve to return a
value. Note that just because the callee called on_resolve doesn't mean
that the callee has finished execution: async functions are allowed to keep
executing after returning their value. However, if the callee is also
synchronous then (since post-return is prevented from blocking via
may_leave) the callee cannot keep executing concurrently after returning a
value and thus the implementation can avoid the creation of any Thread and
use a plain synchronous function call instead, as expected.
  if not opts.async_:
    if not subtask.resolved():
      thread.wait_until(subtask.resolved)
    assert(types_match_values(flat_ft.results, flat_results))
    subtask.deliver_resolve()
    return flat_results
The call to Subtask.deliver_resolve decrements the counters on handles that
were lent for borrowed parameters during the call. These counters are
necessary even during a synchronous call to prevent a concurrent async task
from dropping lent handles while the synchronous call is blocked.
In the async case, if the callee already called on_resolve, then the
RETURNED code is eagerly returned to the core wasm caller without needing to
add a Subtask to the current component instance's handles table. Otherwise,
the index of a new Subtask is returned, bit-packed with the current state of
the Subtask (which will either be STARTING or STARTED). STARTING tells
the caller that they need to keep the memory for both the arguments and results
allocated; STARTED tells the caller that the arguments have been read and
thus any argument memory can be reused, but the result buffer has to be kept
reserved.
  else:
    if subtask.resolved():
      assert(flat_results == [])
      subtask.deliver_resolve()
      return [Subtask.State.RETURNED]
    else:
      subtaski = thread.task.inst.handles.add(subtask)
      def on_progress():
        def subtask_event():
          if subtask.resolved():
            subtask.deliver_resolve()
          return (EventCode.SUBTASK, subtaski, subtask.state)
        subtask.set_pending_event(subtask_event)
      assert(0 < subtaski <= Table.MAX_LENGTH < 2**28)
      assert(0 <= subtask.state < 2**4)
      return [subtask.state | (subtaski << 4)]
When on_start and on_resolve are called after this initial async-lowered
call returns, the on_progress callback (called by on_start and on_resolve)
will set a pending event on the Subtask (which derives Waitable) so that it
can be waited on via waitable-set.{wait,poll} or, if a callback is used, by
returning to the event loop. If on_start is called followed by on_resolve
before core wasm receives the first event, core wasm will only receive the
second event, not two events. Note Subtask.drop prevents (via trap) a
Subtask from being dropped before on_resolve is called and the event is
delivered to core wasm to ensure that Subtask.deliver_resolve always performs
its lend-count accounting.
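On the caller's side, the i32 returned by an async-lowered call can be split back into its two parts. The sketch below is a hypothetical guest-side helper; the numeric values given to the states are an assumption about the Subtask.State definition and should be checked against it.

```python
from enum import IntEnum

class SubtaskState(IntEnum):
  # Assumed numbering; consult the Subtask definition for the actual values.
  STARTING = 0
  STARTED = 1
  RETURNED = 2

def unpack_async_call_result(packed):
  # The state lives in the low 4 bits, the subtask index in the bits above.
  state = SubtaskState(packed & 0xf)
  subtask_index = packed >> 4
  return (state, subtask_index)

# A call that blocked after reading its arguments, stored at table index 3:
print(unpack_async_call_result(SubtaskState.STARTED | (3 << 4)))
# A call that resolved eagerly, so no subtask was added to the table:
print(unpack_async_call_result(int(SubtaskState.RETURNED)))
```

Under these assumptions, an index of 0 accompanies the eager RETURNED case, consistent with the assertion above that real subtask indices are greater than 0.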
canon resource.new
For a canonical definition:
(canon resource.new $rt (core func $f))
validation specifies:
- $rt must refer to a locally-defined (not imported) resource type
- $f is given type (func (param $rt.rep) (result i32)), where $rt.rep is fixed to be i32
  - ๐ - $rt.rep may be i32 or i64
Calling $f invokes the following function, which adds an owning handle
containing the given resource representation to the current component
instance's handles table:
def canon_resource_new(rt, rep):
  inst = current_instance()
  trap_if(not inst.may_leave)
  h = ResourceHandle(rt, rep, own = True)
  i = inst.handles.add(h)
  return [i]
canon resource.drop
For a canonical definition:
(canon resource.drop $rt $async? (core func $f))
validation specifies:
- $rt must refer to a resource type
- $f is given type (func (param i32))
Calling $f invokes the following function, which removes the handle from the
current component instance's handles table and, if the handle was owning,
calls the resource's destructor.
def canon_resource_drop(rt, i):
  inst = current_instance()
  trap_if(not inst.may_leave)
  h = inst.handles.remove(i)
  trap_if(not isinstance(h, ResourceHandle))
  trap_if(h.rt is not rt)
  trap_if(h.num_lends != 0)
  if h.own:
    assert(h.borrow_scope is None)
    opts = CanonicalOptions(async_ = False)
    ft = FuncType([U32Type()], [], async_ = False)
    dtor = rt.dtor or (lambda rep: [])
    callee = inst.store.lift(dtor, ft, opts, rt.impl)
    caller = inst.store.lower(callee, ft, opts, inst)
    caller([h.rep])
  else:
    h.borrow_scope.num_borrows -= 1
  return []
The call to a resource's destructor passes the i32 representation value that
was previously supplied to resource.new. The call works like a normal
non-async cross-component call, using the same canon_lift and canon_lower
rules to, for example, catch reentrance. Because the type, lifting and
lowering are all non-async, the destructor may not block. However, the
destructor may spawn a cooperative thread that does.
In particular, Store.lift may trap (if rt.impl.may_enter_from(inst) is
False) if the call to the destructor would reenter the destructor's instance
in a way that violates Component Invariant #2. In the special case where the
current_instance is the same as the destructor's instance, may_enter_from
will always return True (because the set of instances being freshly entered is
empty) and so, as one might expect, component instances can resource.drop the
owned handles of the resources they implement.
canon resource.rep
For a canonical definition:
(canon resource.rep $rt (core func $f))
validation specifies:
- $rt must refer to a locally-defined (not imported) resource type
- $f is given type (func (param i32) (result $rt.rep)), where $rt.rep is fixed to be i32
  - ๐ - $rt.rep may be i32 or i64
Calling $f invokes the following function, which extracts the resource
representation from the handle in the current component instance's handles
table:
def canon_resource_rep(rt, i):
  h = current_instance().handles.get(i)
  trap_if(not isinstance(h, ResourceHandle))
  trap_if(h.rt is not rt)
  return [h.rep]
Note that the "locally-defined" requirement above ensures that only the component instance defining a resource can access its representation.
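The interplay between resource.new and resource.rep can be sketched with a toy handle table. Everything below is a simplified stand-in: the Handles class, the Trap exception and the resource type objects are hypothetical and omit lend counts, borrow scopes, the may_leave check and free-slot reuse.

```python
class Trap(Exception):
  pass

class ResourceHandle:
  def __init__(self, rt, rep, own):
    self.rt, self.rep, self.own = rt, rep, own

class Handles:
  # Simplified stand-in for an instance's handles table; index 0 is unused.
  def __init__(self):
    self.array = [None]
  def add(self, h):
    self.array.append(h)
    return len(self.array) - 1
  def get(self, i):
    if i >= len(self.array) or self.array[i] is None:
      raise Trap()
    return self.array[i]

def resource_new(handles, rt, rep):
  return handles.add(ResourceHandle(rt, rep, own = True))

def resource_rep(handles, rt, i):
  h = handles.get(i)
  # The handle must be a resource handle of exactly the expected type.
  if not isinstance(h, ResourceHandle) or h.rt is not rt:
    raise Trap()
  return h.rep

handles = Handles()
file_rt, socket_rt = object(), object()  # two distinct hypothetical resource types
i = resource_new(handles, file_rt, 0x1000)
print(i, hex(resource_rep(handles, file_rt, i)))
```

Calling resource_rep with socket_rt and the same index raises Trap in this sketch, mirroring the h.rt is not rt check above.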
๐ canon context.get
For a canonical definition:
(canon context.get $t $i (core func $f))
validation specifies:
- $t must be i32 (see here).
  - ๐ - $t may also be i64. All context.get and context.set built-ins defined in a single component must specify the same $t.
- $i must be less than 2
- $f is given type (func (result $t))
Calling $f invokes the following function, which reads the thread-local
storage of the current thread.
def canon_context_get(t, i):
  thread = current_thread()
  assert(t == 'i32' or t == 'i64')
  assert(i < len(thread.storage))
  result = thread.storage[i]
  assert(result < (2 ** (ptr_size(t) * 8)))
  return [result]
๐ canon context.set
For a canonical definition:
(canon context.set $t $i (core func $f))
validation specifies:
- $t must be i32 (see here)
  - ๐ - $t may also be i64. All context.get and context.set built-ins defined in a single component must specify the same $t.
- $i must be less than 2
- $f is given type (func (param $v $t))
Calling $f invokes the following function, which writes to the thread-local
storage of the current thread:
def canon_context_set(t, i, v):
thread = current_thread()
assert(t == 'i32' or t == 'i64')
assert(i < len(thread.storage))
assert(v < (2 ** (ptr_size(t) * 8)))
thread.storage[i] = v
return []
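As a rough illustration of how context.get and context.set interact, the following sketch models a thread's storage as a two-slot list. ToyThread, the zero initial values, and this standalone ptr_size are assumptions made for the example, not the definitions used elsewhere in this document.

```python
# Illustrative sketch only: ToyThread and ptr_size are simplified stand-ins.

def ptr_size(t):
  # Assumed mapping from core address type to its width in bytes.
  return {'i32': 4, 'i64': 8}[t]

class ToyThread:
  def __init__(self):
    # Two slots, matching the "$i must be less than 2" validation rule.
    # Starting both slots at 0 is an assumption of this sketch.
    self.storage = [0, 0]

def context_set(thread, t, i, v):
  assert t in ('i32', 'i64')
  assert i < len(thread.storage)
  assert 0 <= v < 2 ** (ptr_size(t) * 8)
  thread.storage[i] = v
  return []

def context_get(thread, t, i):
  assert t in ('i32', 'i64')
  assert i < len(thread.storage)
  result = thread.storage[i]
  assert result < 2 ** (ptr_size(t) * 8)
  return [result]

thread = ToyThread()
context_set(thread, 'i32', 1, 0xdead_beef)
slot0 = context_get(thread, 'i32', 0)   # untouched slot
slot1 = context_get(thread, 'i32', 1)   # slot written above
```

Each slot is independent, so a toolchain could, for example, keep a stack pointer in one slot and a thread-local base address in the other.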
🔀✕ canon backpressure.set
This built-in is deprecated in favor of backpressure.{inc,dec} and will be removed once producer tools have transitioned. Producer tools should avoid emitting calls to both set and inc/dec since set will clobber the counter.
For a canonical definition:
```wat
(canon backpressure.set (core func $f))
```
validation specifies:
- $f is given type (func (param $enabled i32))
Calling $f invokes the following function, which sets the backpressure
counter to 1 or 0. Task.enter_implicit_thread waits for backpressure to
be 0 before allowing new async-typed tasks to start.
```python
def canon_backpressure_set(flat_args):
  assert(len(flat_args) == 1)
  current_instance().backpressure = int(bool(flat_args[0]))
  return []
```
🔀 canon backpressure.{inc,dec}
For a canonical definition:
```wat
(canon backpressure.inc (core func $inc))
(canon backpressure.dec (core func $dec))
```
validation specifies:
- $inc/$dec are given type (func)
Calling $inc or $dec invokes one of the following functions:
```python
def canon_backpressure_inc():
  inst = current_instance()
  assert(0 <= inst.backpressure < 2**16)
  inst.backpressure += 1
  trap_if(inst.backpressure == 2**16)
  return []

def canon_backpressure_dec():
  inst = current_instance()
  assert(0 <= inst.backpressure < 2**16)
  inst.backpressure -= 1
  trap_if(inst.backpressure < 0)
  return []
```
Task.enter_implicit_thread waits for backpressure to return to 0 before
allowing new async-typed tasks to start, implementing backpressure.
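The counter semantics can be exercised in isolation. The sketch below is a toy model: Trap, ToyInstance, and may_start_new_task are hypothetical names introduced for illustration, and may_start_new_task only approximates the check that Task.enter_implicit_thread is described as performing.

```python
# Toy model of the backpressure counter; not the document's real classes.

class Trap(Exception):
  pass

def trap_if(cond):
  if cond:
    raise Trap()

class ToyInstance:
  def __init__(self):
    self.backpressure = 0

def backpressure_inc(inst):
  assert 0 <= inst.backpressure < 2**16
  inst.backpressure += 1
  trap_if(inst.backpressure == 2**16)

def backpressure_dec(inst):
  assert 0 <= inst.backpressure < 2**16
  inst.backpressure -= 1
  trap_if(inst.backpressure < 0)

def may_start_new_task(inst):
  # Hypothetical helper: new async-typed tasks start only when the counter is 0.
  return inst.backpressure == 0

inst = ToyInstance()
backpressure_inc(inst)
backpressure_inc(inst)            # two independent requests for backpressure
blocked_while_raised = not may_start_new_task(inst)
backpressure_dec(inst)
still_blocked = not may_start_new_task(inst)
backpressure_dec(inst)
open_again = may_start_new_task(inst)

try:
  backpressure_dec(inst)          # unbalanced decrement
  underflow_trapped = False
except Trap:
  underflow_trapped = True
```

Because the counter nests, two independent pieces of guest code can each raise and lower backpressure without clobbering one another, which is the problem the deprecated backpressure.set has.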
🔀 canon task.return
For a canonical definition:
```wat
(canon task.return (result $t)? $opts (core func $f))
```
In addition to general validation of $opts, validation specifies:
- $f is given type flatten_functype($opts, (func (param $t)?), 'lower')
- $opts may only contain memory and string-encoding
- lift($f.result) above defines required options
Calling $f invokes the following function, which lifts the results from core
wasm state and passes them to the current task's caller via Task.return_:
```python
def canon_task_return(result_type, opts: LiftOptions, flat_args):
  task = current_task()
  trap_if(not task.inst.may_leave)
  trap_if(not task.opts.async_)
  trap_if(result_type != task.ft.result)
  trap_if(not LiftOptions.equal(opts, task.opts))
  cx = LiftLowerContext(opts, task.inst, task)
  result = lift_flat_values(cx, MAX_FLAT_PARAMS, CoreValueIter(flat_args), task.ft.result_type())
  task.return_(result)
  return []
```
The trap_if(not task.opts.async_) prevents task.return from being called by
synchronously-lifted functions (which return their value by returning from the
lifted core function).
The trap_if(result_type != task.ft.result) guard ensures that, in a
component with multiple exported functions of different types, task.return is
not called with a mismatched result type (which, due to indirect control flow,
can in general only be caught dynamically).
The trap_if(not LiftOptions.equal(opts, task.opts)) guard ensures that the
return value is lifted the same way as the canon lift from which this
task.return is returning. This ensures that AOT fusion of canon lift and
canon lower can generate a thunk that is indirectly called by task.return
after these guards. Inside LiftOptions.equal, opts.memory is compared with
task.opts.memory via object identity of the mutable memory instance. Since
memory refers to a mutable instance of memory, this comparison is not
concerned with the static memory indices (in canon lift and canon task.return), only with the identity of the memories created
at instantiation time or run time. In Core WebAssembly spec terms, the test is on the
equality of the memaddr values stored in the instance's memaddrs table,
which is indexed by the static memidx.
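The difference between comparing memories by identity and comparing them by contents can be shown with a small sketch. ToyMemory, ToyLiftOptions, and options_equal are hypothetical stand-ins; in particular, comparing string_encoding by value alongside the memory identity check is an assumption about what LiftOptions.equal does with the other option.

```python
# Sketch: identity comparison of memory instances, using hypothetical classes.

class ToyMemory:
  def __init__(self, size):
    self.bytes = bytearray(size)

class ToyLiftOptions:
  def __init__(self, memory, string_encoding='utf8'):
    self.memory = memory
    self.string_encoding = string_encoding

def options_equal(a, b):
  # 'is' tests that both options refer to the very same memory instance,
  # regardless of which static index was used to name it.
  return a.memory is b.memory and a.string_encoding == b.string_encoding

mem_a = ToyMemory(16)
mem_b = ToyMemory(16)   # identical contents, but a distinct instance

lift_opts = ToyLiftOptions(mem_a)
same_contents = mem_a.bytes == mem_b.bytes
same_instance = options_equal(lift_opts, ToyLiftOptions(mem_a))
different_instance = options_equal(lift_opts, ToyLiftOptions(mem_b))
```

Two memories that happen to hold the same bytes still fail the guard, since a thunk generated for one memory would read results from the wrong place if handed the other.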
🔀 canon task.cancel
For a canonical definition:
```wat
(canon task.cancel (core func $f))
```
validation specifies:
- $f is given type (func)
Calling $f cancels the current task, confirming a previous subtask.cancel
request made by a supertask and claiming that all borrow handles lent to the
current task have already been dropped (and trapping in Task.cancel if not).
```python
def canon_task_cancel():
  task = current_task()
  trap_if(not task.inst.may_leave)
  trap_if(not task.opts.async_)
  task.cancel()
  return []
```
The trap_if(not task.opts.async_) prevents task.cancel from being called by
synchronously-lifted functions (which must always return a value by returning
from the lifted core function).
Task.cancel also traps if the cancellation has not been delivered to the task
(in which case the callee should not yet know to cancel) or if the task has
already returned a value or already called task.cancel.
🔀 canon waitable-set.new
For a canonical definition:
```wat
(canon waitable-set.new (core func $f))
```
validation specifies:
- $f is given type (func (result i32))
Calling $f invokes the following function, which adds an empty waitable set
to the current component instance's handles table:
```python
def canon_waitable_set_new():
  inst = current_instance()
  trap_if(not inst.may_leave)
  i = inst.handles.add(WaitableSet())
  return [i]
```
🔀 canon waitable-set.wait
For a canonical definition:
```wat
(canon waitable-set.wait $cancellable? (memory $mem) (core func $f))
```
validation specifies:
- $f is given type (func (param $si i32) (param $ptr T) (result i32)) where T is i32
  - (feature-gated) T is i32 or i64 as determined by the address type of $mem
Calling $f invokes the following function, which waits for progress to be made
on a Waitable in the given waitable set (indicated by index $si), then returns
its EventCode and writes the payload values into linear memory:
```python
def canon_waitable_set_wait(cancellable, mem, si, ptr):
  task = current_task()
  trap_if(not task.inst.may_leave)
  wset = task.inst.handles.get(si)
  trap_if(not isinstance(wset, WaitableSet))
  event = wset.wait_for_event(cancellable)
  return unpack_event(mem, task.inst, ptr, event)

def unpack_event(mem, inst, ptr, e: EventTuple):
  event, p1, p2 = e
  cx = LiftLowerContext(LiftLowerOptions(memory = mem), inst)
  store(cx, p1, U32Type(), ptr)
  store(cx, p2, U32Type(), ptr + 4)
  return [event]
```
If cancellable is set, then waitable-set.wait will return whether the
supertask has already or concurrently requested cancellation.
waitable-set.wait (and other cancellable operations) will only indicate
cancellation once and thus, if a caller is not prepared to propagate
cancellation, they can omit cancellable so that cancellation is instead
delivered at a later cancellable call.
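The memory layout produced by unpack_event above can be illustrated with Python's struct module. This is a simplified sketch: toy_unpack_event writes directly into a bytearray rather than going through LiftLowerContext and store, and the event code 1 and payload values are arbitrary placeholders. The little-endian byte order matches Core WebAssembly linear memory.

```python
import struct

def toy_unpack_event(memory: bytearray, ptr: int, event):
  code, p1, p2 = event
  # The two u32 payload values land at ptr and ptr + 4, little-endian.
  struct.pack_into('<I', memory, ptr, p1)
  struct.pack_into('<I', memory, ptr + 4, p2)
  # Only the event code is returned as a core value.
  return [code]

memory = bytearray(16)
ret = toy_unpack_event(memory, 4, (1, 7, 0x0102_0304))

# What guest code would read back from the 8 bytes starting at ptr.
p1, p2 = struct.unpack_from('<II', memory, 4)
second_payload_bytes = bytes(memory[8:12])
```

The caller therefore needs to reserve 8 bytes at $ptr, regardless of which event is delivered.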
🔀 canon waitable-set.poll
For a canonical definition:
```wat
(canon waitable-set.poll $cancellable? (memory $mem) (core func $f))
```
validation specifies:
- $f is given type (func (param $si i32) (param $ptr T) (result i32)) where T is i32
  - (feature-gated) T is i32 or i64 as determined by the address type of $mem
Calling $f invokes the following function, which either returns an event that
was pending on one of the waitables in the given waitable set (the same way as
waitable-set.wait) or, if there is none, returns 0.
```python
def canon_waitable_set_poll(cancellable, mem, si, ptr):
  inst = current_instance()
  trap_if(not inst.may_leave)
  wset = inst.handles.get(si)
  trap_if(not isinstance(wset, WaitableSet))
  event = wset.poll(cancellable)
  return unpack_event(mem, inst, ptr, event)
```
If cancellable is set, then waitable-set.poll will return whether the
supertask has already or concurrently requested cancellation.
waitable-set.poll (and other cancellable operations) will only indicate
cancellation once and thus, if a caller is not prepared to propagate
cancellation, they can omit cancellable so that cancellation is instead
delivered at a later cancellable call.
🔀 canon waitable-set.drop
For a canonical definition:
```wat
(canon waitable-set.drop (core func $f))
```
validation specifies:
- $f is given type (func (param i32))
Calling $f invokes the following function, which removes the indicated
waitable set from the current component instance's handles table, performing
the guards defined by WaitableSet.drop above:
```python
def canon_waitable_set_drop(i):
  inst = current_instance()
  trap_if(not inst.may_leave)
  wset = inst.handles.remove(i)
  trap_if(not isinstance(wset, WaitableSet))
  wset.drop()
  return []
```
Note that WaitableSet.drop will trap if the set is non-empty or if there is a
concurrent waitable-set.wait or async callback currently using this
waitable set.
🔀 canon waitable.join
For a canonical definition:
```wat
(canon waitable.join (core func $f))
```
validation specifies:
- $f is given type (func (param $wi i32) (param $si i32))
Calling $f invokes the following function, which adds the waitable indicated
by the index wi to the waitable set indicated by the index si, removing the
waitable from any waitable set that it is currently a member of.
```python
def canon_waitable_join(wi, si):
  inst = current_instance()
  trap_if(not inst.may_leave)
  w = inst.handles.get(wi)
  trap_if(not isinstance(w, Waitable))
  trap_if(w.has_sync_waiter)
  if si == 0:
    w.join(None)
  else:
    wset = inst.handles.get(si)
    trap_if(not isinstance(wset, WaitableSet))
    w.join(wset)
  return []
```
As described with the definition of Waitable above, to prevent surprising
deadlocks, attempting to add a waitable that is currently being synchronously
waited on to a waitable set traps.
Note that tables do not allow elements at index 0, so 0 is a valid sentinel
that tells join to remove the given waitable from any set that it is
currently a part of. Waitables can be a member of at most one set, so if the
given waitable is already in one set, it will be transferred.
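The "at most one set" rule and the 0 sentinel can be seen in a toy model. ToyWaitable, ToyWaitableSet, and toy_join are hypothetical names for this sketch, the handles table is reduced to a plain dict, and all trap checks are omitted.

```python
# Toy model of waitable.join membership; traps and may_leave checks omitted.

class ToyWaitableSet:
  def __init__(self):
    self.elems = []

class ToyWaitable:
  def __init__(self):
    self.wset = None

  def join(self, wset):
    # Leave the current set (if any) before entering the new one, so a
    # waitable is never a member of two sets at once.
    if self.wset is not None:
      self.wset.elems.remove(self)
    self.wset = wset
    if wset is not None:
      wset.elems.append(self)

def toy_join(handles, wi, si):
  w = handles[wi]
  # Index 0 is never a valid table element, so it can mean "no set".
  w.join(None if si == 0 else handles[si])

handles = {1: ToyWaitable(), 2: ToyWaitableSet(), 3: ToyWaitableSet()}

toy_join(handles, 1, 2)
in_first = handles[1] in handles[2].elems

toy_join(handles, 1, 3)   # transferred, not duplicated
moved = handles[1] not in handles[2].elems and handles[1] in handles[3].elems

toy_join(handles, 1, 0)   # sentinel: remove from whatever set it is in
removed = handles[1].wset is None and not handles[3].elems
```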
🔀 canon subtask.cancel
For a canonical definition:
```wat
(canon subtask.cancel async? (core func $f))
```
validation specifies:
- $f is given type (func (param i32) (result i32))
- (feature-gated) async is allowed (otherwise it must be absent)
Calling $f sends a request to a nondeterministically-chosen thread of the
subtask at the given index to cancel the subtask ASAP. This request is
cooperative and the subtask may take arbitrarily long to receive and confirm
the request. If the subtask doesn't immediately confirm the cancellation
request, subtask.cancel returns BLOCKED and the caller must wait for a
SUBTASK progress update using waitable-set methods as usual.
When cancellation is confirmed, the supertask will receive the final state of the subtask, which is one of:
- RETURNED, if the subtask successfully returned a value via task.return;
- CANCELLED_BEFORE_STARTED, if the subtask was cancelled before receiving its arguments (and thus no own handles were transferred); or
- CANCELLED_BEFORE_RETURNED, if the subtask called task.cancel instead of task.return.
This state is either returned by subtask.cancel, if the subtask resolved
without blocking, or, if subtask.cancel returns BLOCKED, then as part of
the event payload of a future SUBTASK event.
```python
BLOCKED = 0xffff_ffff

def canon_subtask_cancel(async_, i):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  subtask = thread.task.inst.handles.get(i)
  trap_if(not isinstance(subtask, Subtask))
  trap_if(subtask.resolve_delivered())
  trap_if(subtask.cancellation_requested)
  trap_if(subtask.in_waitable_set() and not async_)
  if subtask.resolved():
    assert(subtask.has_pending_event())
  else:
    subtask.cancellation_requested = True
    subtask.on_cancel()
    if not subtask.resolved():
      if not async_:
        subtask.wait_for_pending_event()
      else:
        return [BLOCKED]
  code,index,payload = subtask.get_pending_event()
  assert(code == EventCode.SUBTASK and index == i and payload == subtask.state)
  assert(subtask.resolve_delivered())
  return [subtask.state]
```
subtask.cancel starts by trapping if called twice for the same subtask or if
the supertask has already been notified that the subtask has returned or if the
subtask is already being asynchronously waited on via waitable set.
A race condition handled by the above code is that it's possible for a subtask
to have already resolved (by calling task.return or task.cancel) and
updated the state stored in the Subtask (such that Subtask.resolved() is
True) but this fact has not yet been delivered to the supertask by the
supertask calling get_pending_event on the Subtask in its table. This
distinction is captured by Subtask.resolved vs. Subtask.resolve_delivered.
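The window between a subtask resolving and that resolution being delivered can be modeled in a few lines. ToySubtask below is a hypothetical simplification: states are plain strings, the event is a tuple, and the delivered flag is set when the pending event is consumed, which is an assumption about how the real Subtask tracks delivery.

```python
# Toy model of Subtask.resolved vs. Subtask.resolve_delivered.

FINAL_STATES = ('RETURNED', 'CANCELLED_BEFORE_STARTED', 'CANCELLED_BEFORE_RETURNED')

class ToySubtask:
  def __init__(self):
    self.state = 'STARTED'
    self.pending_event = None
    self.delivered = False

  def resolve(self, final_state):
    # Callee side: task.return or task.cancel has been called.
    assert final_state in FINAL_STATES
    self.state = final_state
    self.pending_event = ('SUBTASK', final_state)

  def resolved(self):
    return self.state in FINAL_STATES

  def resolve_delivered(self):
    return self.delivered

  def get_pending_event(self):
    # Supertask side: consuming the event is what marks delivery.
    event, self.pending_event = self.pending_event, None
    if self.resolved():
      self.delivered = True
    return event

sub = ToySubtask()
sub.resolve('RETURNED')
window = (sub.resolved(), sub.resolve_delivered())   # resolved, not yet delivered
event = sub.get_pending_event()
after = (sub.resolved(), sub.resolve_delivered())
```

A subtask.cancel call made inside that window skips the cancellation request and returns the already-final state directly.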
🔀 canon subtask.drop
For a canonical definition:
```wat
(canon subtask.drop (core func $f))
```
validation specifies:
- $f is given type (func (param i32))
Calling $f removes the subtask at the given index from the current component
instance's handles table, performing the guards and bookkeeping defined by
Subtask.drop().
```python
def canon_subtask_drop(i):
  inst = current_instance()
  trap_if(not inst.may_leave)
  s = inst.handles.remove(i)
  trap_if(not isinstance(s, Subtask))
  s.drop()
  return []
```
🔀 canon {stream,future}.new
For canonical definitions:
```wat
(canon stream.new $stream_t (core func $f))
(canon future.new $future_t (core func $f))
```
validation specifies:
- $f is given type (func (result i64))
- $stream_t/$future_t must be a type of the form (stream $t?)/(future $t?)
Calling $f calls canon_{stream,future}_new which adds two elements to the
current component instance's handles table and returns their indices packed
into a single i64. The first element (in the low 32 bits) is the readable end
(of the new {stream, future}) and the second element (in the high 32 bits) is
the writable end. The expectation is that, after calling {stream,future}.new,
the readable end is subsequently transferred to another component (or the host)
via stream or future parameter/result type (see lift_{stream,future}
above).
```python
def canon_stream_new(stream_t):
  inst = current_instance()
  trap_if(not inst.may_leave)
  shared = SharedStreamImpl(stream_t.t)
  ri = inst.handles.add(ReadableStreamEnd(shared))
  wi = inst.handles.add(WritableStreamEnd(shared))
  return [ ri | (wi << 32) ]

def canon_future_new(future_t):
  inst = current_instance()
  trap_if(not inst.may_leave)
  shared = SharedFutureImpl(future_t.t)
  ri = inst.handles.add(ReadableFutureEnd(shared))
  wi = inst.handles.add(WritableFutureEnd(shared))
  return [ ri | (wi << 32) ]
```
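On the receiving side, core wasm code has to split the returned i64 back into the two table indices. A minimal sketch of that arithmetic follows; pack_ends and unpack_ends are hypothetical helper names and the indices 5 and 6 are arbitrary.

```python
def pack_ends(ri, wi):
  # Readable end in the low 32 bits, writable end in the high 32 bits.
  assert 0 <= ri < 2**32 and 0 <= wi < 2**32
  return ri | (wi << 32)

def unpack_ends(packed):
  # Mirror of pack_ends: mask for the low half, shift for the high half.
  return packed & 0xffff_ffff, packed >> 32

packed = pack_ends(5, 6)
ri, wi = unpack_ends(packed)
```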
🔀 canon stream.{read,write}
For canonical definitions:
```wat
(canon stream.read $stream_t $opts (core func $f))
(canon stream.write $stream_t $opts (core func $f))
```
In addition to general validation of $opts, validation specifies:
- $f is given type (func (param i32 T T) (result T)) where T is i32
  - (feature-gated) T is i32 or i64 as determined by the address type of memory from $opts (or i32 by default if no memory is present)
- $stream_t must be a type of the form (stream $t?)
- If $t is present:
  - lower($t) above defines required options for stream.write
  - lift($t) above defines required options for stream.read
  - memory is required to be present
- (feature-gated) async is allowed to be omitted, otherwise it must be present
The implementation of these built-ins funnels down to a single stream_copy
function that is parameterized by the direction of the copy:
```python
def canon_stream_read(stream_t, opts, i, ptr, n):
  return stream_copy(ReadableStreamEnd, WritableBufferGuestImpl, EventCode.STREAM_READ,
                     stream_t, opts, i, ptr, n)

def canon_stream_write(stream_t, opts, i, ptr, n):
  return stream_copy(WritableStreamEnd, ReadableBufferGuestImpl, EventCode.STREAM_WRITE,
                     stream_t, opts, i, ptr, n)
```
Introducing the stream_copy function in chunks, first, the element at index
i is checked to be of the right type and allowed to start a new copy. (In the
future, the "trap if not IDLE" condition could be relaxed to allow multiple
pipelined reads or writes.) There is also a trap if attempting to synchronously
read or write from a stream that is already being asynchronously waited on via
waitable set.
```python
def stream_copy(EndT, BufferT, event_code, stream_t, opts, i, ptr, n):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  e = thread.task.inst.handles.get(i)
  trap_if(not isinstance(e, EndT))
  trap_if(e.shared.t != stream_t.t)
  trap_if(e.state != CopyState.IDLE)
  trap_if(e.in_waitable_set() and not opts.async_)
```
Then a readable or writable buffer is created which (in Buffer's constructor)
eagerly checks the alignment and bounds of (ptr, n). (In the future, the
restriction on futures/streams containing borrows could be relaxed by
maintaining sufficient bookkeeping state to ensure that borrowed handles or
streams/futures of borrowed handles could not outlive their originating call.
Additionally, stream<char> will be allowed and defined to encode and decode
according to the string-encoding.)
```python
  assert(not isinstance(stream_t, CharType))
  assert(not contains_borrow(stream_t))
  cx = LiftLowerContext(opts, thread.task.inst, borrow_scope = None)
  buffer = BufferT(stream_t.t, cx, ptr, n)
```
Next, the copy method of {Readable,Writable}{Stream,Future}End is called to
perform the actual read/write. The on_copy* callbacks passed to copy bind
and store a stream_event closure on the readable/writable end (via the
inherited Waitable.set_pending_event) which will be called right before the
event is delivered to core wasm. stream_event first calls reclaim_buffer to
regain ownership of buffer and prevent any further partial reads/writes.
Thus, up until event delivery, the other end of the stream is free to
repeatedly read/write from/to buffer, ideally filling it up and minimizing
context switches. Next, the stream's state is updated based on the result
being delivered to core wasm so that, once a stream end has been notified that
the other end dropped, calling anything other than stream.drop-* traps.
Lastly, stream_event packs the CopyResult and number of elements copied up
until this point into a single i32 or i64-sized payload for core wasm. The
size is determined by the addrtype coming from the memtype of the
memory immediate. Note that even though the number of elements copied is
packed into an addrtype, the maximum length of the buffer is fixed at 2^28 - 1
independently of the addrtype.
```python
  def stream_event(result, reclaim_buffer):
    reclaim_buffer()
    assert(e.copying())
    if result == CopyResult.DROPPED:
      e.state = CopyState.DONE
    else:
      e.state = CopyState.IDLE
    assert(0 <= result < 2**4)
    assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28)
    packed_result = result | (buffer.progress << 4)
    return (event_code, i, packed_result)

  def on_copy(reclaim_buffer):
    e.set_pending_event(partial(stream_event, CopyResult.COMPLETED, reclaim_buffer))

  def on_copy_done(result):
    e.set_pending_event(partial(stream_event, result, reclaim_buffer = lambda:()))

  e.state = CopyState.COPYING
  e.copy(thread.task.inst, buffer, on_copy, on_copy_done)
```
When this copy makes progress, a stream_event is set on the stream end's
Waitable base object. If stream.{read,write} is called synchronously, the
call suspends the current thread until an event is set, so that the event can
be returned. Otherwise, asynchronous calls deliver the event if it was produced
synchronously and return BLOCKED if not:
```python
  if not e.has_pending_event():
    if not opts.async_:
      e.wait_for_pending_event()
    else:
      return [BLOCKED]
  code,index,payload = e.get_pending_event()
  assert(code == event_code and index == i and payload != BLOCKED)
  return [payload]
```
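The packed payload built by stream_event can be decoded by the caller with a mask and a shift. The sketch below uses hypothetical helpers pack_copy_result and unpack_copy_result, and the small result codes 1 and 2 are placeholder values rather than the actual CopyResult numbering.

```python
BLOCKED = 0xffff_ffff
MAX_LENGTH = 2**28 - 1   # maximum buffer length stated in the text above

def pack_copy_result(result, progress):
  # Low 4 bits: result code. Upper bits: number of elements copied.
  assert 0 <= result < 2**4
  assert 0 <= progress <= MAX_LENGTH
  return result | (progress << 4)

def unpack_copy_result(payload):
  # BLOCKED is a separate sentinel and carries no result or progress.
  assert payload != BLOCKED
  return payload & 0xf, payload >> 4

payload = pack_copy_result(1, 300)
decoded = unpack_copy_result(payload)

# Even at the maximum length, a payload whose result code is below 0xf
# differs from BLOCKED in its low 4 bits.
largest = pack_copy_result(2, MAX_LENGTH)
```

A caller would typically compare the return value against BLOCKED first and only then split it into result and count.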
🔀 canon future.{read,write}
For canonical definitions:
```wat
(canon future.read $future_t $opts (core func $f))
(canon future.write $future_t $opts (core func $f))
```
In addition to general validation of $opts, validation specifies:
- $f is given type (func (param i32 T) (result i32)) where T is i32
  - (feature-gated) T is i32 or i64 as determined by the address type of memory from $opts (or i32 by default if no memory is present)
- $future_t must be a type of the form (future $t?)
- If $t is present:
  - lift($t) above defines required options for future.read
  - lower($t) above defines required options for future.write
  - memory is required to be present
- (feature-gated) async is allowed to be omitted, otherwise it must be present
The implementation of these built-ins funnels down to a single future_copy
function that is parameterized by the direction of the copy:
```python
def canon_future_read(future_t, opts, i, ptr):
  return future_copy(ReadableFutureEnd, WritableBufferGuestImpl, EventCode.FUTURE_READ,
                     future_t, opts, i, ptr)

def canon_future_write(future_t, opts, i, ptr):
  return future_copy(WritableFutureEnd, ReadableBufferGuestImpl, EventCode.FUTURE_WRITE,
                     future_t, opts, i, ptr)
```
Introducing the future_copy function in chunks, future_copy starts with the
same set of guards on the element i as stream_copy, except checking for a
future end instead of a stream end:
```python
def future_copy(EndT, BufferT, event_code, future_t, opts, i, ptr):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  e = thread.task.inst.handles.get(i)
  trap_if(not isinstance(e, EndT))
  trap_if(e.shared.t != future_t.t)
  trap_if(e.state != CopyState.IDLE)
  trap_if(e.in_waitable_set() and not opts.async_)
```
Next, a readable or writable buffer is created, as with streams, except that the
buffer length is fixed to 1 and there is no validation-time prohibition on
future<char>:
```python
  assert(not contains_borrow(future_t))
  cx = LiftLowerContext(opts, thread.task.inst, borrow_scope = None)
  buffer = BufferT(future_t.t, cx, ptr, 1)
```
Next, the copy method of {Readable,Writable}FutureEnd is called to
perform the actual read/write. Other than the simplifications allowed by the
absence of repeated partial copies, the main difference in the following code
from the stream code is that future_event transitions the end to the DONE
state (in which the only valid operation is to call future.drop-*) on
either a DROPPED or a COMPLETED result. This ensures that futures are
read/written at most once and that futures are only passed to other components in a
state where they are ready to be read/written. Another important difference is
that, since the buffer length is always implied by the CopyResult, the number
of elements copied is not packed in the high 28 bits; they're always zero.
```python
  def future_event(result):
    assert((buffer.remain() == 0) == (result == CopyResult.COMPLETED))
    assert(e.copying())
    if result == CopyResult.DROPPED or result == CopyResult.COMPLETED:
      e.state = CopyState.DONE
    else:
      e.state = CopyState.IDLE
    return (event_code, i, result)

  def on_copy_done(result):
    assert(result != CopyResult.DROPPED or event_code == EventCode.FUTURE_WRITE)
    e.set_pending_event(partial(future_event, result))

  e.state = CopyState.COPYING
  e.copy(thread.task.inst, buffer, on_copy_done)
```
The end of future_copy is the exact same as stream_copy: waiting if called
synchronously and returning either the progress made or BLOCKED.
```python
  if not e.has_pending_event():
    if not opts.async_:
      e.wait_for_pending_event()
    else:
      return [BLOCKED]
  code,index,payload = e.get_pending_event()
  assert(code == event_code and index == i)
  return [payload]
```
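The differing state transitions of stream_event and future_event can be compared side by side. In this sketch the enum numeric values are placeholders, and the CANCELLED member is an assumption standing in for any result other than COMPLETED or DROPPED; stream_state_after and future_state_after are hypothetical helper names.

```python
from enum import Enum

class CopyResult(Enum):   # numeric values are placeholders for this sketch
  COMPLETED = 0
  DROPPED = 1
  CANCELLED = 2           # assumed "neither completed nor dropped" result

class CopyState(Enum):
  IDLE = 1
  COPYING = 2
  DONE = 3

def stream_state_after(result):
  # A stream end stays usable unless the other end was dropped.
  return CopyState.DONE if result == CopyResult.DROPPED else CopyState.IDLE

def future_state_after(result):
  # A future end is finished after either a drop or a single completed copy.
  if result in (CopyResult.DROPPED, CopyResult.COMPLETED):
    return CopyState.DONE
  return CopyState.IDLE

transitions = {r.name: (stream_state_after(r).name, future_state_after(r).name)
               for r in CopyResult}
```

Only the COMPLETED row differs between the two: a stream returns to IDLE and may copy again, while a future is DONE after its single value has been transferred.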
🔀 canon {stream,future}.cancel-{read,write}
For canonical definitions:
```wat
(canon stream.cancel-read $stream_t $async? (core func $f))
(canon stream.cancel-write $stream_t $async? (core func $f))
(canon future.cancel-read $future_t $async? (core func $f))
(canon future.cancel-write $future_t $async? (core func $f))
```
validation specifies:
- $f is given type (func (param i32) (result i32))
- $stream_t/$future_t must be a type of the form (stream $t?)/(future $t?)
- (feature-gated) async is allowed (otherwise it must be absent)
The implementations of these four built-ins all funnel down to a single
parameterized cancel_copy function:
```python
def canon_stream_cancel_read(stream_t, async_, i):
  return cancel_copy(ReadableStreamEnd, EventCode.STREAM_READ, stream_t, async_, i)

def canon_stream_cancel_write(stream_t, async_, i):
  return cancel_copy(WritableStreamEnd, EventCode.STREAM_WRITE, stream_t, async_, i)

def canon_future_cancel_read(future_t, async_, i):
  return cancel_copy(ReadableFutureEnd, EventCode.FUTURE_READ, future_t, async_, i)

def canon_future_cancel_write(future_t, async_, i):
  return cancel_copy(WritableFutureEnd, EventCode.FUTURE_WRITE, future_t, async_, i)

def cancel_copy(EndT, event_code, stream_or_future_t, async_, i):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  e = thread.task.inst.handles.get(i)
  trap_if(not isinstance(e, EndT))
  trap_if(e.shared.t != stream_or_future_t.t)
  trap_if(e.state != CopyState.COPYING or e.has_sync_waiter)
  trap_if(e.in_waitable_set() and not async_)
  e.state = CopyState.CANCELLING_COPY
  if not e.has_pending_event():
    e.shared.cancel()
    if not e.has_pending_event():
      if not async_:
        e.wait_for_pending_event()
      else:
        return [BLOCKED]
  code,index,payload = e.get_pending_event()
  assert(not e.copying() and code == event_code and index == i)
  return [payload]
```
Cancellation traps if there is not currently an async copy in progress (sync copies do not expect or check for cancellation and thus cannot be cancelled, and repeatedly cancelling the same async copy after the first call blocked is not allowed). There is also a trap if attempting to synchronously cancel a stream operation when the stream end is already being asynchronously waited on by a waitable set.
The first check for e.has_pending_event() catches the case where the copy has
already racily finished, in which case we must not call cancel(). Calling
cancel() may, but is not required to, recursively call one of the on_*
callbacks (passed by canon_{stream,future}_{read,write} above) which will set
a pending event that is caught by the second check for
e.has_pending_event().
If the copy hasn't been cancelled, the synchronous case suspends the thread to
wait for one of the on_* callbacks to eventually be called (which will set
the pending event).
The asynchronous case simply returns BLOCKED and the client code must wait
as usual for a {STREAM,FUTURE}_{READ,WRITE} event. In this case, cancellation
has served only to asynchronously request that the host relinquish the buffer
ASAP without waiting for anything to be read or written.
If BLOCKED is not returned, the pending event (which is necessarily a
stream_event or future_event) is eagerly delivered to core wasm as the return value, thereby
saving an additional turn of the event loop. In this case, the core wasm
caller can assume that ownership of the buffer has been returned.
🔀 canon {stream,future}.drop-{readable,writable}
For canonical definitions:
```wat
(canon stream.drop-readable $stream_t (core func $f))
(canon stream.drop-writable $stream_t (core func $f))
(canon future.drop-readable $future_t (core func $f))
(canon future.drop-writable $future_t (core func $f))
```
validation specifies:
- $f is given type (func (param i32))
- $stream_t/$future_t must be a type of the form (stream $t?)/(future $t?)
Calling $f removes the readable or writable end of the stream or future at
the given index from the current component instance's handles table,
performing the guards and bookkeeping defined by
{Readable,Writable}{Stream,Future}End.drop() above.
```python
def canon_stream_drop_readable(stream_t, i):
  return drop(ReadableStreamEnd, stream_t, i)

def canon_stream_drop_writable(stream_t, hi):
  return drop(WritableStreamEnd, stream_t, hi)

def canon_future_drop_readable(future_t, i):
  return drop(ReadableFutureEnd, future_t, i)

def canon_future_drop_writable(future_t, hi):
  return drop(WritableFutureEnd, future_t, hi)

def drop(EndT, stream_or_future_t, hi):
  inst = current_instance()
  trap_if(not inst.may_leave)
  e = inst.handles.remove(hi)
  trap_if(not isinstance(e, EndT))
  trap_if(e.shared.t != stream_or_future_t.t)
  e.drop()
  return []
```
🧵 canon thread.index
For a canonical definition:
```wat
(canon thread.index (core func $index))
```
validation specifies:
- $index is given type (func (result i32))
Calling $index invokes the following function, which extracts the index
of the current thread:
```python
def canon_thread_index():
  thread = current_thread()
  assert(thread.index is not None)
  return [thread.index]
```
🧵 canon thread.new-indirect
For a canonical definition:
```wat
(canon thread.new-indirect $ft $ftbl (core func $new_indirect))
```
validation specifies:
- $ft must refer to the type (func (param $c T)) where T is i32
  - (feature-gated) T may be i32 or i64
- $ftbl must refer to a table whose element type matches funcref
- $new_indirect is given type (func (param $fi U) (param $c T) (result i32)) where T comes from $ft, as described above, and U is i32
  - (feature-gated) U is i32 or i64 as determined by $ftbl's address type
Calling $new_indirect invokes the following function which reads a funcref
from $ftbl (trapping if out-of-bounds, null or the wrong type), creates a new
suspended thread that will call the funcref passing the closure parameter $c
when resumed, and returns the index of the new thread in the current component
instance's threads table.
```python
@dataclass
class CoreFuncRef:
  t: CoreFuncType
  callee: Callable[[list[CoreValType]], list[CoreValType]]

def canon_thread_new_indirect(ft, ftbl: Table[CoreFuncRef], fi, c):
  task = current_task()
  trap_if(not task.inst.may_leave)
  f = ftbl.get(fi)
  assert(ft == CoreFuncType(['i32'], []) or ft == CoreFuncType(['i64'], []))
  trap_if(f.t != ft)
  def thread_func():
    [] = call_and_trap_on_throw(f.callee, [c])
    task.unregister_thread(new_thread)
  new_thread = Thread(task, thread_func)
  assert(new_thread.suspended())
  task.register_thread(new_thread)
  return [new_thread.index]
```
The newly-created thread starts out in a "suspended" state and so, to
actually start executing, Core WebAssembly code must call one of the other
thread.* built-ins defined below.
🧵 canon thread.resume-later
For a canonical definition:
```wat
(canon thread.resume-later (core func $resume-later))
```
validation specifies:
- $resume-later is given type (func (param $i i32))
Calling $resume-later invokes the following function, which loads a thread at
index $i from the current component instance's threads table, traps if it's
not suspended, and then marks that thread as ready to run at some
nondeterministic point in the future chosen by the embedder.
```python
def canon_thread_resume_later(i):
  inst = current_instance()
  trap_if(not inst.may_leave)
  other_thread = inst.threads.get(i)
  trap_if(not other_thread.suspended())
  other_thread.resume_later()
  return []
```
thread.resume-later never suspends the current thread and so there is no
possibility of cancellation and thus no cancellable immediate.
🧵 canon thread.suspend
For a canonical definition:
```wat
(canon thread.suspend $cancellable? (core func $suspend))
```
validation specifies:
- $suspend is given type (func (result i32))
Calling $suspend invokes the following function, which suspends the current
thread, immediately returning control flow to any transitive async-lowered
calling component.
```python
def canon_thread_suspend(cancellable):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  cancelled = thread.suspend(cancellable)
  return [cancelled]
```
If cancellable is set, then thread.suspend will return a Cancelled
value to indicate whether the supertask has already or concurrently requested
cancellation. thread.suspend (and other cancellable operations) will only
indicate cancellation once and thus, if a caller is not prepared to propagate
cancellation, they can omit cancellable so that cancellation is instead
delivered at a later cancellable call.
🧵 canon thread.yield
For a canonical definition:
```wat
(canon thread.yield $cancellable? (core func $yield))
```
validation specifies:
- $yield is given type (func (result i32))
Calling $yield invokes the following function, which yields execution so that
other threads can execute, leaving the current thread ready to run at some
nondeterministic point in the future chosen by the embedder. This allows a
long-running computation that is not otherwise performing I/O to avoid starving
other threads in a cooperative setting.
```python
def canon_thread_yield(cancellable):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  cancelled = thread.yield_(cancellable)
  return [cancelled]
```
If cancellable is set, then thread.yield will return a Cancelled
value indicating whether the supertask has already or concurrently requested
cancellation. thread.yield (and other cancellable operations) will only
indicate cancellation once and thus, if a caller is not prepared to propagate
cancellation, they can omit cancellable so that cancellation is instead
delivered at a later cancellable call.
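The starvation-avoidance role of thread.yield can be illustrated with Python generators standing in for cooperative threads. Everything here is an analogy under stated assumptions: worker and run_round_robin are hypothetical names, a bare yield stands in for calling thread.yield, and the round-robin order is just one policy an embedder might choose, since the actual resumption order is nondeterministic.

```python
from collections import deque

def worker(name, steps, log):
  # A long-running computation that yields between steps.
  for n in range(steps):
    log.append((name, n))
    yield   # stand-in for calling thread.yield

def run_round_robin(threads):
  # One possible embedder policy: resume ready threads in FIFO order.
  ready = deque(threads)
  while ready:
    t = ready.popleft()
    try:
      next(t)
      ready.append(t)   # still ready to run: back of the queue
    except StopIteration:
      pass              # thread finished

log = []
run_round_robin([worker('a', 2, log), worker('b', 2, log)])
```

Without the yield inside the loop, worker 'a' would run all of its steps before 'b' could start, which is the starvation scenario described above.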
๐งต canon thread.suspend-then-resume
For a canonical definition:
(canon thread.suspend-then-resume $cancellable? (core func $suspend-then-resume))
validation specifies:
$suspend-then-resumeis given type(func (param $i i32) (result i32))
Calling $suspend-then-resume invokes the following function which loads a
thread at index $i from the current component instance's threads table,
traps if it's not suspended, and then switches to that thread, leaving the
current thread suspended.
def canon_thread_suspend_then_resume(cancellable, i):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  other_thread = thread.task.inst.threads.get(i)
  trap_if(not other_thread.suspended())
  cancelled = thread.suspend_then_resume(cancellable, other_thread)
  return [cancelled]
If cancellable is set, then thread.suspend-then-resume will return a
Cancelled value to indicate whether the supertask has already or concurrently
requested cancellation. thread.suspend-then-resume (and other cancellable
operations) will only indicate cancellation once and thus, if a caller is not
prepared to propagate cancellation, they can omit cancellable so that
cancellation is instead delivered at a later cancellable call.
🧵 canon thread.yield-then-resume
For a canonical definition:
(canon thread.yield-then-resume $cancellable? (core func $yield-then-resume))
validation specifies:
- $yield-then-resume is given type (func (param $i i32) (result i32))
Calling $yield-then-resume invokes the following function which loads a thread
at index $i from the current component instance's threads table, traps if
it's not suspended, and then switches to that thread, leaving the current
thread ready to run at some nondeterministic point in the future chosen by the
embedder.
def canon_thread_yield_then_resume(cancellable, i):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  other_thread = thread.task.inst.threads.get(i)
  trap_if(not other_thread.suspended())
  cancelled = thread.yield_then_resume(cancellable, other_thread)
  return [cancelled]
If cancellable is set, then thread.yield-then-resume will return a
Cancelled value indicating whether the supertask has already or concurrently
requested cancellation. thread.yield-then-resume (and other cancellable
operations) will only indicate cancellation once and thus, if a caller is not
prepared to propagate cancellation, they can omit cancellable so that
cancellation is instead delivered at a later cancellable call.
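The difference between thread.suspend-then-resume and thread.yield-then-resume is only the state in which the current thread is left. The following sketch models that with a plain state map; State, Trap and the helper names are hypothetical and not part of the Canonical ABI, and cancellation is left out.

```python
from enum import Enum


class State(Enum):
    RUNNING = "running"
    READY = "ready"
    SUSPENDED = "suspended"


class Trap(Exception):
    """Hypothetical stand-in for a wasm trap."""


def switch_to(states, current, other, current_becomes):
    # Both built-ins trap unless the target thread is suspended.
    if states[other] is not State.SUSPENDED:
        raise Trap(f"thread {other} is not suspended")
    states[current] = current_becomes
    states[other] = State.RUNNING


def suspend_then_resume(states, current, other):
    # The caller stays suspended until something resumes it explicitly.
    switch_to(states, current, other, State.SUSPENDED)


def yield_then_resume(states, current, other):
    # The caller stays ready, so the embedder may run it again at any time.
    switch_to(states, current, other, State.READY)


after_suspend = {0: State.RUNNING, 1: State.SUSPENDED}
suspend_then_resume(after_suspend, 0, 1)

after_yield = {0: State.RUNNING, 1: State.SUSPENDED}
yield_then_resume(after_yield, 0, 1)
```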
🧵 canon thread.suspend-then-promote
For a canonical definition:
(canon thread.suspend-then-promote $cancellable? (core func $suspend-then-promote))
validation specifies:
- $suspend-then-promote is given type (func (param $i i32) (result i32))
Calling $suspend-then-promote invokes the following function which loads a
thread at index $i from the current component instance's threads table and
then calls Thread.suspend_then_promote, which resumes the other_thread if it's
ready and, in any case, leaves the current thread suspended.
def canon_thread_suspend_then_promote(cancellable, i):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  other_thread = thread.task.inst.threads.get(i)
  cancelled = thread.suspend_then_promote(cancellable, other_thread)
  return [cancelled]
If cancellable is set, then thread.suspend-then-promote will return a
Cancelled value indicating whether the supertask has already or concurrently
requested cancellation. thread.suspend-then-promote (and other cancellable
operations) will only indicate cancellation once and thus, if a caller is not
prepared to propagate cancellation, they can omit cancellable so that
cancellation is instead delivered at a later cancellable call.
🧵 canon thread.yield-then-promote
For a canonical definition:
(canon thread.yield-then-promote $cancellable? (core func $yield-then-promote))
validation specifies:
- $yield-then-promote is given type (func (param $i i32) (result i32))
Calling $yield-then-promote invokes the following function which loads a
thread at index $i from the current component instance's threads table and
then calls Thread.yield_then_promote, which resumes the other_thread if it's
ready and, in any case, leaves the current thread ready to run at some
nondeterministic point in the future chosen by the embedder.
def canon_thread_yield_then_promote(cancellable, i):
  thread = current_thread()
  trap_if(not thread.task.inst.may_leave)
  other_thread = thread.task.inst.threads.get(i)
  cancelled = thread.yield_then_promote(cancellable, other_thread)
  return [cancelled]
If cancellable is set, then thread.yield-then-promote will return a
Cancelled value indicating whether the supertask has already or concurrently
requested cancellation. thread.yield-then-promote (and other cancellable
operations) will only indicate cancellation once and thus, if a caller is not
prepared to propagate cancellation, they can omit cancellable so that
cancellation is instead delivered at a later cancellable call.
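Unlike the then-resume built-ins, the two then-promote built-ins do not trap when the target thread is in the wrong state. The sketch below models only that aspect. All names are hypothetical, the boolean result is a toy-only convenience (the real built-ins return a Cancelled value), and which thread runs next when nothing is promoted is left to the embedder.

```python
def switch_if_ready(states, current, other, current_becomes):
    # Decide before changing anything, so a thread naming itself is never "ready".
    promoted = states[other] == "ready"
    states[current] = current_becomes
    if promoted:
        states[other] = "running"
    # If nothing was promoted, the toy leaves the next thread to the scheduler.
    return promoted


def suspend_then_promote(states, current, other):
    return switch_if_ready(states, current, other, "suspended")


def yield_then_promote(states, current, other):
    return switch_if_ready(states, current, other, "ready")


ready_target = {0: "running", 1: "ready"}
promoted = suspend_then_promote(ready_target, 0, 1)

suspended_target = {0: "running", 1: "suspended"}
not_promoted = yield_then_promote(suspended_target, 0, 1)
```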
📝 canon error-context.new
For a canonical definition:
(canon error-context.new $opts (core func $f))
validation specifies:
- $f is given type (func (param $ptr T) (param $units T) (result i32)) where T is i32
  - (memory64) T is i32 or i64 as determined by the memory field of $opts
- async is not present
- memory must be present
Calling $f calls the following function which uses the $opts immediate to
(nondeterministically) lift the debug message, create a new ErrorContext
value, store it in the current component instance's handles table and return
its index.
@dataclass
class ErrorContext:
  debug_message: String

def canon_error_context_new(opts, ptr, tagged_code_units):
  inst = current_instance()
  trap_if(not inst.may_leave)
  if DETERMINISTIC_PROFILE or random.randint(0,1):
    s = String(('', 'utf8', 0))
  else:
    cx = LiftLowerContext(opts, inst)
    s = load_string_from_range(cx, ptr, tagged_code_units)
    s = host_defined_transformation(s)
  i = inst.handles.add(ErrorContext(s))
  return [i]
Supporting the requirement (introduced in the
explainer) that wasm code does not depend on
the contents of error-context values for behavioral correctness, the debug
message is completely discarded nondeterministically or, in the deterministic
profile, always. Importantly (for performance), when the debug message is
discarded, it is not even lifted and thus the O(N) well-formedness conditions
are not checked. (Note that host_defined_transformation is not defined by the
Canonical ABI and stands for an arbitrary host-defined function.)
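The point that a discarded message is never lifted can be shown with a small stand-alone sketch. Here new_debug_message is a hypothetical helper, raw bytes stand in for the string in linear memory, and UTF-8 decoding stands in for the O(N) well-formedness check.

```python
import random


def new_debug_message(raw: bytes, deterministic: bool, rng: random.Random) -> str:
    # Same branch order as above: decide to discard first, decode only if kept.
    if deterministic or rng.randint(0, 1):
        return ""  # raw is never inspected, so malformed bytes are not noticed
    # Only this path pays for validation; invalid bytes raise UnicodeDecodeError.
    return raw.decode("utf-8")


# In the deterministic case even ill-formed input is accepted, because it is
# dropped without being looked at.
discarded = new_debug_message(b"\xff\xfe", True, random.Random(0))
```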
📝 canon error-context.debug-message
For a canonical definition:
(canon error-context.debug-message $opts (core func $f))
validation specifies:
- $f is given type (func (param i32) (param $ptr T)) where T is i32
  - (memory64) T is i32 or i64 as determined by the address type of memory from $opts
- async is not present
- memory must be present
- realloc must be present
Calling $f calls the following function which uses the $opts immediate to
lower the ErrorContext's debug message. While producing an error-context
value may nondeterministically discard or transform the debug message, a
single error-context value must return the same debug message from
error-context.debug-message over time.
def canon_error_context_debug_message(opts, i, ptr):
  inst = current_instance()
  trap_if(not inst.may_leave)
  errctx = inst.handles.get(i)
  trap_if(not isinstance(errctx, ErrorContext))
  cx = LiftLowerContext(opts, inst)
  store_string(cx, errctx.debug_message, ptr)
  return []
Note that ptr points to a region of memory (8 bytes for memory32, 16 bytes
for memory64) into which will be stored the pointer and length of the debug
string (allocated via opts.realloc).
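The 8-byte and 16-byte sizes follow from storing two pointer-sized, little-endian integers back to back. A minimal sketch of that layout, assuming a hypothetical store_ptr_len helper and a bytearray standing in for linear memory:

```python
import struct


def store_ptr_len(memory: bytearray, ptr: int, str_ptr: int, str_len: int,
                  ptr_size: int) -> None:
    # ptr_size is a toy parameter: 4 for a 32-bit memory, 8 for a 64-bit memory.
    fmt = "<II" if ptr_size == 4 else "<QQ"
    # The pointer comes first, immediately followed by the length.
    struct.pack_into(fmt, memory, ptr, str_ptr, str_len)


mem32 = bytearray(16)
store_ptr_len(mem32, 4, 0x100, 5, ptr_size=4)

mem64 = bytearray(32)
store_ptr_len(mem64, 8, 0x100, 5, ptr_size=8)
```

With these inputs, bytes 4 to 11 of mem32 and bytes 8 to 23 of mem64 hold the pair, and the remaining bytes stay zero.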
📝 canon error-context.drop
For a canonical definition:
(canon error-context.drop (core func $f))
validation specifies:
- $f is given type (func (param i32))
Calling $f calls the following function, which drops the error context value
at the given index from the current component instance's handles table:
def canon_error_context_drop(i):
  inst = current_instance()
  trap_if(not inst.may_leave)
  errctx = inst.handles.remove(i)
  trap_if(not isinstance(errctx, ErrorContext))
  return []
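The remove-then-check pattern above can be tried out with a toy handle table. ToyTable, ToyErrorContext and Trap are hypothetical stand-ins; the real handles table is more elaborate, and a real trap ends execution, so the order of removal and type check is not observable.

```python
class Trap(Exception):
    """Hypothetical stand-in for a wasm trap."""


class ToyErrorContext:
    def __init__(self, debug_message):
        self.debug_message = debug_message


class ToyTable:
    def __init__(self):
        self.slots = []

    def add(self, value):
        self.slots.append(value)
        return len(self.slots) - 1

    def remove(self, i):
        # Out-of-range and already-freed indices both trap.
        if not 0 <= i < len(self.slots) or self.slots[i] is None:
            raise Trap(f"invalid handle {i}")
        value, self.slots[i] = self.slots[i], None
        return value


def drop_error_context(table, i):
    value = table.remove(i)
    # Dropping a handle of any other kind through this built-in is an error.
    if not isinstance(value, ToyErrorContext):
        raise Trap(f"handle {i} is not an error context")


table = ToyTable()
handle = table.add(ToyErrorContext("boom"))
other = table.add(object())
drop_error_context(table, handle)  # succeeds; a second drop would trap
```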
🧵② canon thread.spawn-ref
For a canonical definition:
(canon thread.spawn-ref shared? $ft (core func $spawn_ref))
validation specifies:
- $ft must refer to the type (shared? (func (param $c T))) where T is i32
  - (memory64) T may be i32 or i64
- $spawn_ref is given type (shared? (func (param $f (ref null $ft)) (param $c T) (result $e i32))) where T comes from $ft as defined above
When the shared immediate is not present, the spawned thread is
cooperative, only switching at specific program points. When the shared
immediate is present, the spawned thread is preemptive and able to execute in
parallel with all other threads.
Note: ideally, a thread could be spawned with arbitrary thread parameters. Currently, that would require additional work in the toolchain to support so, for simplicity, the current proposal simply fixes a single i32 or i64 parameter type. However, thread.spawn-ref could be extended to allow arbitrary thread parameters in the future, once it's concretely beneficial to the toolchain. The inclusion of $ft ensures backwards compatibility for when arbitrary parameters are allowed.
Calling $spawn_ref invokes the following function which simply fuses the
thread.new-ref and thread.resume-later built-ins, allowing
thread-creation to skip the intermediate "suspended" state transition.
def canon_thread_spawn_ref(shared, ft, f, c):
  trap_if(not current_instance().may_leave)
  if DETERMINISTIC_PROFILE:
    return [0]
  [new_thread_index] = canon_thread_new_ref(shared, ft, f, c)
  [] = canon_thread_resume_later(shared, new_thread_index)
  return [new_thread_index]
Note: canon_thread_new_ref has not yet been defined, but will be added as
part of adding a GC ABI option to the Canonical ABI and would work
like canon_thread_new_indirect minus the table access and type check.
🧵② canon thread.spawn-indirect
For a canonical definition:
(canon thread.spawn-indirect shared? $ft $tbl (core func $spawn_indirect))
validation specifies:
- $ft must refer to the type (shared? (func (param $c T))) where T is i32
  - (memory64) T may be i32 or i64
- $tbl must refer to a shared table whose element type matches (ref null (shared? func))
- $spawn_indirect is given type (shared? (func (param $i U) (param $c T) (result $e i32))) where T comes from $ft as defined above and U is i32
  - (memory64) U is i32 or i64 as determined by $tbl's address type
When the shared immediate is not present, the spawned thread is
cooperative, only switching at specific program points. When the shared
immediate is present, the spawned thread is preemptive and able to execute in
parallel with all other threads.
Calling $spawn_indirect invokes the following function which simply fuses
the thread.new-indirect and thread.resume-later built-ins, allowing
thread-creation to skip the intermediate "suspended" state transition.
def canon_thread_spawn_indirect(shared, ft, ftbl: Table[CoreFuncRef], fi, c):
  trap_if(not current_instance().may_leave)
  if DETERMINISTIC_PROFILE:
    return [0]
  [new_thread_index] = canon_thread_new_indirect(shared, ft, ftbl, fi, c)
  [] = canon_thread_resume_later(shared, new_thread_index)
  return [new_thread_index]
Note: canon_thread_new_indirect has not yet been extended to take a
shared parameter, but will be as shared-everything-threads progresses.
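The fusion performed by the two spawn built-ins can be pictured as composing two smaller steps. In this sketch ToyThreads and its methods are hypothetical; the check in resume_later is an assumption of the toy, and the deterministic-profile branch is omitted.

```python
class ToyThreads:
    """Hypothetical per-instance thread table that tracks only thread states."""

    def __init__(self):
        self.states = []

    def new(self):
        # A newly created thread starts out suspended.
        self.states.append("suspended")
        return len(self.states) - 1

    def resume_later(self, i):
        # Toy assumption: only a suspended thread can be made ready.
        if self.states[i] != "suspended":
            raise RuntimeError(f"thread {i} is not suspended")
        self.states[i] = "ready"

    def spawn(self):
        # Fused form: callers never observe the intermediate suspended state.
        i = self.new()
        self.resume_later(i)
        return i


threads = ToyThreads()
two_step = threads.new()
threads.resume_later(two_step)
fused = threads.spawn()
```

Both paths end with a ready thread; the fused path just needs one call instead of two.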
🧵② canon thread.available-parallelism
For a canonical definition:
(canon thread.available-parallelism shared? (core func $f))
validation specifies:
- $f is given type (func shared? (result i32)).
Calling $f returns the number of threads the underlying hardware can be
expected to execute in parallel. This value can be artificially limited by
engine configuration and is not allowed to change over the lifetime of a
component instance.
def canon_thread_available_parallelism():
  if DETERMINISTIC_PROFILE:
    return [1]
  else:
    return [NUM_ALLOWED_THREADS]
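How an engine arrives at NUM_ALLOWED_THREADS is not specified here. One possible approach, shown purely as an assumption, is to cap the hardware thread count with a configured limit and fix the result when the instance is created. ToyInstance and its parameters are hypothetical.

```python
import os


class ToyInstance:
    """Hypothetical component instance that fixes its parallelism at creation."""

    def __init__(self, engine_limit=None, deterministic=False, hardware=None):
        # `hardware` can be injected for testing; otherwise ask the OS.
        hw = hardware if hardware is not None else (os.cpu_count() or 1)
        limit = hw if engine_limit is None else min(hw, engine_limit)
        # Computed once, so the answer cannot change during the instance's lifetime.
        self._parallelism = 1 if deterministic else limit

    def available_parallelism(self):
        return self._parallelism


capped = ToyInstance(engine_limit=2, hardware=8).available_parallelism()
uncapped = ToyInstance(hardware=8).available_parallelism()
deterministic = ToyInstance(hardware=8, deterministic=True).available_parallelism()
```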