Transaction Processing in the Data Plane

May 2, 2026

We'll write transaction commit logic in a SQL view, which can enable higher throughput than performing transaction commits in the control plane. Incremental view maintenance (IVM) makes the critical path fast enough that determining which transactions to commit and which to roll back becomes tolerable (and beats collapsing under load). Not only tolerable, but possible at interactive timescales (~30ms).

Caveats: This post is largely for educational purposes; please do not actually implement transactions this way without having a hard think about what you need. Also, I'm sure I'm not the only one to think of things this way, and there may already be real products that do this for you by now. Ververica's Streaming Ledger may be an example that does something similar, but I don't understand it well enough to say. Let me know if you know better, or of other approaches!

Database Transactions

Databases are built on the back of "transactions": bundles of commands that need to appear to happen at a single moment in time, or not at all. Transactions can read some data, write some data, write, read, write, maybe do some other things, and then eventually end with either COMMIT or ROLLBACK. At that moment the database needs either to make it real, or to just walk away.

Transactions can be pretty gory to implement because they are generally "interactive": the user doesn't express their full intent ahead of time, as they often do in most programming languages, but instead reveals it to the database one command at a time. Until the database sees a COMMIT everything is, and fundamentally needs to be, entirely tentative. The next command could be a ROLLBACK instead, and then the whole bundle of commands is off.

Because things are so gory, transaction processing often lives in the "control plane" of the database. Some number of transactions are in flight at any moment, and some careful centralized logic needs to keep track of what is real and what is not. This often limits the effective throughput of transaction processing: the number of potentially conflicting transactions that can be resolved per second, say.

At the same time, some classes of transactions are simple enough that we will be able to migrate them from the control plane to the data plane. The control plane often coordinates with shared locks, single threading, and other forms of mutual exclusion. The data plane, by contrast, works by extracting independence from the supplied task (e.g. a SQL query, and the join keys therein). The data plane is generally much more scalable than the control plane, providing a higher throughput, but with an expressivity tax imposed by not using the control plane.

A Running Example: Banks and Stuff

A classic example of non-trivial transaction processing derives from accounts containing money.

Imagine a database of users with accounts, who would like to redistribute wealth among themselves. The participants are mercurial, and don't want to reveal their plans ahead of time. They will only indicate the intended transfers when presented with the current balances of various accounts. Likewise, the transfers should only occur if they can occur just so, exactly as articulated by the participant. A transaction identifies a few accounts and their observed balances, and indicates the intended new balances each account should have (perhaps subject to constraints like having the same total).

Imagine you get millions of these transactions a second, each reading from and writing to sets of accounts that are referenced in multiple transactions. As you might imagine, you have something of a mess in front of you. You could start pulling candidate transactions off a queue and see which should commit and which should not, but you'll quickly find what you have is a data processing problem. The volume of transactions, their potential conflicts, all need more computation than we hope to introduce to an otherwise nimble control plane.

At the expense of some flexibility, this task can be broken apart into two problems:

  1. resolve the order in which transactions would commit, and then
  2. determine which of these ordered transactions should in fact commit.

The first problem can be addressed somewhat locally (epochs plus a transaction uuid). The second problem can be addressed scalably in the data plane, and is what we'll look at next!

Read Sets and Write Sets

Many transactions can be summed up by their read sets and write sets. Read sets are the values that the transaction read (or failed to read, if absent). Write sets are the values that it then chose to write conditional on those reads. If the read values are as observed the transaction should commit. If the read values are no longer as observed the transaction must roll back.

Let's look at the problem of resolving a collection of pairs of read and write sets. "Resolving" means picking out a subset of pairs with the property that when put in some order, the read set of each matches the most recent prior values. This is a limited way to approach transaction processing: great for our account example, but less good when your transaction needs to read a hash of the whole database, or other horrible things.
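
To make read sets and write sets concrete, here is a minimal Python sketch of the commit rule for a single transaction against an in-memory key-value store. The names reads_hold and try_commit, and the dict standing in for committed state, are assumptions made for illustration; they are not part of the SQL that follows.

```python
from typing import Optional

# Hypothetical stand-in for committed state: key -> value.
Store = dict[str, str]

def reads_hold(store: Store, read_set: dict[str, Optional[str]]) -> bool:
    """True if every observed value still matches; None means 'observed absent'."""
    return all(store.get(key) == seen for key, seen in read_set.items())

def try_commit(store: Store,
               read_set: dict[str, Optional[str]],
               write_set: dict[str, str]) -> bool:
    """Apply the write set only if the read set is still as observed."""
    if not reads_hold(store, read_set):
        return False  # roll back: something we read has changed
    store.update(write_set)
    return True

store: Store = {"alice": "100", "bob": "50"}
# Both transactions observed alice=100 and bob=50; only the first can commit.
first = try_commit(store, {"alice": "100", "bob": "50"}, {"alice": "70", "bob": "80"})
second = try_commit(store, {"alice": "100", "bob": "50"}, {"alice": "60", "bob": "90"})
print(first, second, store)
```

This handles one transaction at a time, in order. The rest of the post is about resolving a whole collection of them at once.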

Let's model intended transactions, those that end with a COMMIT, with a table that records for any transaction identifier its read and write sets.

-- A transaction consists of values read and values to write.
CREATE TABLE intents (id INT, is_read BOOL, key TEXT, val TEXT);

We'll use the transaction id to order transactions, and we'll want this to generally increase. If it helps, think of it as a pair (epoch, UUID) where epoch continually increments as time passes. We'll see how this epoch requirement eventually informs the latency of transaction resolution.
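
As a small illustration of the (epoch, UUID) idea, the sketch below orders identifiers as Python tuples, which compare by epoch first and by UUID second. The helper make_txn_id and the closed_epoch cutoff are hypothetical names for this example only.

```python
import uuid

def make_txn_id(epoch: int) -> tuple[int, str]:
    """Hypothetical identifier: the epoch orders first, the UUID breaks ties."""
    return (epoch, uuid.uuid4().hex)

ids = [make_txn_id(2), make_txn_id(1), make_txn_id(2), make_txn_id(1)]
ordered = sorted(ids)  # tuples compare lexicographically

# Assumption: once an epoch is closed, no new transaction may use it,
# so every identifier at or below it has a fixed position in the order.
closed_epoch = 1
settled = [t for t in ordered if t[0] <= closed_epoch]
print([epoch for epoch, _ in ordered], len(settled))
```

How often epochs advance is the knob: a transaction's position in the order is only fixed once its epoch can no longer receive new arrivals.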

We'll use "keys" and "values" to describe where and what was read. You can imagine this as a key-value store, or relations where there exists a primary key. The ideas generalize to keyless collections, which are fundamentally key-value, where the key is the whole row and the value is its multiplicity.

A set of rows all with the same id indicates that you'd very much like to have your values committed, as long as the reads still hold.

Committed .. but to Where?

Our committed state will just be a view over intents.

For any collection of intended writes, we'll write some SQL that from intents picks winners and losers, and the winners are the transactions that "commit". We don't actually have to put those winners in a separate location, though it will make sense for us to build an index over them, as this is what most folks want to read.

Importantly, intents can change.

Changes to intents have the (intended) ability to also change our view of what has committed, and we'll need to take some care here. The read and write sets come from transactions that ended with a COMMIT, and ideally the sets for each transaction id should not themselves change once written. However, new transactions can still be added, and we need to be mindful of the identifiers they are added with. We will end up being certain about a transaction only once we are sure that all future transactions will use a higher identifier, which is something we can force with, for example, epochs.

We will be able to tidy intents, asynchronously, so that it doesn't grow without bound. We can remove failed transactions, remove the read sets of committed transactions, and remove writes that are themselves overwritten without being observed (e.g. after we remove all those read sets). Importantly, this work can be asynchronous, after the fact, rather than on the critical path of transaction resolution.

We'll end up with an indexed view over the most recently written values, to present as if we just had the data in an index. But, we'll have described the logic to derive it from intents declaratively (using SQL) rather than imperatively (using whatever your database is written in).

Resolving Transactions

Each transaction can commit if each of its read values matches the most recent previously committed write to that key. Let's just write that logic in SQL! How would we do that?

It's not easy in vanilla SQL, at least not for me. It is pretty easy when you use recursive SQL. Brief, at least, if "easy" doesn't sound likely to you.

Informally, we'll iteratively develop, from initially empty sets:

  1. tentative writes (as a function of tentative commits),
  2. tentative reads (as a function of those tentative writes), and
  3. tentative commits (as a function of the tentative reads).

From tentative writes, reads, and commits, we'll return to re-evaluate the writes, then reads, then commits. If they change we'll go around again, and again, and again until they stop changing. Wait for a moment for the argument that it always terminates.

It turns out we'll be better served by developing the set of transactions that do not commit, those that must be rolled back. This starts from the optimistic take that everything should commit until we have evidence that it shouldn't.

The following SQL does exactly that:

CREATE VIEW to_rollback AS
WITH MUTUALLY RECURSIVE

    -- Tentative writes as a function of `rollback`.
    writes(id INT, key TEXT, val TEXT) AS (
        SELECT intents.id, key, val
        FROM intents
        WHERE intents.id NOT IN (SELECT * FROM rollback)
          AND NOT intents.is_read
    ),

    -- Reads, and the corresponding read value.
    -- These may not match, which informs `rollback`.
    reads(id INT, val TEXT, red TEXT) AS (
        SELECT id, val, (
            SELECT DISTINCT ON (key) val
            FROM writes
            WHERE writes.id < intents.id
              AND writes.key = intents.key
            ORDER BY key, writes.id DESC, val
        )
        FROM intents
        WHERE intents.is_read
    ),

    -- Roll back transactions with a failed read.
    rollback(id INT) AS (
        SELECT DISTINCT reads.id
        FROM reads
        WHERE reads.val IS DISTINCT FROM reads.red
    )

SELECT * FROM rollback;

If WITH MUTUALLY RECURSIVE (WMR) is scary jargon, it may help to talk through how to determine what it will produce as output. Each WMR block starts with all terms in scope (writes, reads, and rollback), and initially empty. It then repeatedly updates the contents of these collections by applying the stated rules, in the order stated, until their contents stop changing. It then runs that final SELECT block at the bottom, because all SQL fragments must return one collection.

In our case we start with initially empty collections, and in particular an initially empty rollback. Based on this, we'll initially imagine all writes succeed, all reads read the previously written values, and transactions must roll back if those values don't match the reads. If any transactions must roll back, we'll need to repeat the process, as some writes may not happen now, which may change reads (positively or negatively), then rollback, etc.
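
If it helps to see the iteration spelled out, here is a minimal Python simulation of the same fixed point. It is a sketch that mimics the three rules over an in-memory list of rows, not how any database evaluates the view. The function name resolve and the small input are made up for the example.

```python
from typing import Optional

# Rows mirror the columns of `intents`: (id, is_read, key, val).
Row = tuple[int, bool, str, Optional[str]]

def resolve(intents: list[Row]) -> tuple[set[int], int]:
    """Return (ids to roll back, rounds run until nothing changed)."""
    rollback: set[int] = set()
    rounds = 0
    while True:
        rounds += 1
        # writes: intended writes of transactions not currently rolled back.
        writes = [(i, k, v) for (i, is_read, k, v) in intents
                  if not is_read and i not in rollback]
        # reads, then rollback: compare each read with the latest earlier write.
        failed: set[int] = set()
        for (i, is_read, k, v) in intents:
            if not is_read:
                continue
            prior = [(wi, wv) for (wi, wk, wv) in writes if wk == k and wi < i]
            seen = max(prior, key=lambda w: w[0])[1] if prior else None
            if seen != v:  # None plays the role of NULL / absent
                failed.add(i)
        if failed == rollback:
            return rollback, rounds
        rollback = failed

example: list[Row] = [
    (6, True, "x", None), (6, False, "x", "A"),  # initialize x
    (7, True, "x", "A"),  (7, False, "x", "B"),  # A -> B
    (8, True, "x", "A"),  (8, False, "x", "C"),  # stale read of A
    (9, True, "x", "C"),  (9, False, "x", "D"),  # depends on 8's write
]
rolled_back, rounds = resolve(example)
print(sorted(rolled_back), rounds)  # [8, 9] 3
```

In the first round only 8 fails, because 8's tentative write still makes 9 look fine. The second round drops 8's write and 9 fails too. The third round changes nothing, so the loop stops.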

Does this ever stop? Indeed it must, but let's explain that in a different subsection.

Convergence and Termination

The above logic always terminates, and the rate of convergence can be (but is not always) very fast.

Termination is probably easiest to grok, so let's start there.

The reads of a transaction, and the decision to roll it back, depend only on strictly prior transactions. If in some iteration the smallest identifier to change in rollback was i, then in the next iteration all identifiers up through and including i will not change in rollback. Each round of iteration makes permanent progress in stabilizing rollback, increasing the least identifier that can change by at least one.

It could take as many rounds of iteration as there are distinct identifiers in intents, and we can contrive inputs that will do this, but it will certainly terminate.
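
Here is one such contrived input, as a Python sketch that reuses the same in-memory simulation of the three rules. Transaction 1 reads a value nobody wrote, and every later transaction reads what its predecessor intended to write, so failures propagate one identifier per round. The helper names resolve and chain are assumptions for the example.

```python
from typing import Optional

Row = tuple[int, bool, str, Optional[str]]  # (id, is_read, key, val)

def resolve(intents: list[Row]) -> tuple[set[int], int]:
    """Iterate writes -> reads -> rollback until rollback stops changing."""
    rollback: set[int] = set()
    rounds = 0
    while True:
        rounds += 1
        writes = [(i, k, v) for (i, is_read, k, v) in intents
                  if not is_read and i not in rollback]
        failed: set[int] = set()
        for (i, is_read, k, v) in intents:
            if is_read:
                prior = [(wi, wv) for (wi, wk, wv) in writes if wk == k and wi < i]
                seen = max(prior, key=lambda w: w[0])[1] if prior else None
                if seen != v:
                    failed.add(i)
        if failed == rollback:
            return rollback, rounds
        rollback = failed

def chain(n: int) -> list[Row]:
    """Each transaction reads its predecessor's write; the first read is bogus."""
    rows: list[Row] = []
    for i in range(1, n + 1):
        expected = "missing" if i == 1 else f"v{i - 1}"
        rows.append((i, True, "x", expected))
        rows.append((i, False, "x", f"v{i}"))
    return rows

rolled_back, rounds = resolve(chain(5))
print(sorted(rolled_back), rounds)  # [1, 2, 3, 4, 5] 6
```

Five transactions take five rounds to unravel, plus one more round to confirm that nothing changes.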

Convergence is more nuanced, but follows similar principles.

A transaction can only be influenced by transactions with lower identifiers that intend writes for its read set. Imagine that transaction identifiers are randomly assigned (perhaps (epoch, uuid) rather than int). If you are a transaction, and there are k other transactions that mean to write to your read set, there is a one out of k + 1 chance that you have the lowest identifier. If you have the lowest identifier you can commit, immediately and permanently! Not just you, but all transactions that have the lowest identifier among their potential conflicts. Potentially a linear fraction of transactions in the first iteration alone.

One way to imagine this is as a directed graph on nodes that correspond to transactions, with directed edges from one to another if the one would write at the read set of the other. With random identifiers sprinkled atop the nodes, the number of rounds of iterations is bounded by the longest directed path with increasing node identifiers. Informally, it can be hard to find long increasing paths, because as identifiers increase it is less likely a neighbor will have a larger identifier. If the graph is dense (mostly conflicts) it will happen, but if it is sparse (limited conflicts) it is substantially less likely.

20-year-ago me, who then was facile with random graph theory, would be mortified to read the utter helplessness of the preceding paragraph. Let's leave it with the tl;dr that while there can be many iterations, there don't need to be many iterations.
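
For a feel of the path argument, the Python sketch below computes the longest increasing path in the conflict graph, given only each transaction's read keys and write keys. The function name and the two toy inputs are assumptions for illustration. It measures the path length and makes no claim about the exact number of rounds.

```python
def longest_increasing_path(txns: dict[int, tuple[set[str], set[str]]]) -> int:
    """txns maps id -> (read keys, write keys).

    There is an edge a -> b when a < b and a writes a key that b reads.
    Returns the number of nodes on the longest such path.
    """
    best: dict[int, int] = {}
    for b in sorted(txns):
        reads_b = txns[b][0]
        # Every id already in `best` is smaller than b, since we go in id order.
        preds = [best[a] for a in best if txns[a][1] & reads_b]
        best[b] = 1 + max(preds, default=0)
    return max(best.values(), default=0)

# Dense: every transaction reads and writes the same key.
dense = {i: ({"x"}, {"x"}) for i in range(1, 6)}
# Sparse: every transaction touches only its own key.
sparse = {i: ({f"k{i}"}, {f"k{i}"}) for i in range(1, 6)}
print(longest_increasing_path(dense), longest_increasing_path(sparse))  # 5 1
```

The dense input forces a path through every transaction, while the sparse one has no edges at all.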

Asynchronous Maintenance

Let's talk through some of the maintenance we can perform on intents as we go, to avoid unbounded data growth and unbounded work to evaluate the to_rollback view.

Before we go too far, recall that the view shows us what is true at a moment in time, but with intents continually changing we need a way to lock down some of the transaction identifiers. One way to do this is to insist that transaction identifiers only increase, and reject insertions into intents if they use an identifier not strictly greater than the maximum contained therein. Another option is to pair epochs and UUIDs as transaction identifiers, and to advance epochs whenever it feels good to resolve more transactions. We will just use a comment to remind you that it needs to be done, and won't otherwise discuss it.

One of the goals of the maintenance commands that follow is that they can be run (or not run) at any moment. We don't need to lock down the whole system to do this maintenance.

Removing failed transactions

Transactions that must be rolled back can simply be removed from intents. They should have no impact on the result of the to_rollback view.

-- Caveat: only safe for ids we are certain about (see the bounds above).
DELETE FROM intents
WHERE intents.id IN (SELECT * FROM to_rollback);

Removing read sets of committed transactions

A transaction is conditional on its read set lining up with the values present. Once we know a transaction will commit, we can remove its read set and commit it unconditionally.

-- Caveat: only safe for ids we are certain about (see the bounds above).
DELETE FROM intents
WHERE intents.id NOT IN (SELECT * FROM to_rollback)
  AND intents.is_read;

Remove overwritten writes

The two rules above remove the read sets for failed and committed transactions. It's now possible, likely even, that some writes are no longer observed: no read occurs before a subsequent write occurs. We can remove those dead writes.

-- Caveat: only safe for ids we are certain about (see the bounds above).
DELETE FROM intents
WHERE NOT intents.is_read
  AND EXISTS (SELECT FROM intents i2
              WHERE i2.key = intents.key
                AND i2.id > intents.id
                AND NOT i2.is_read
                AND NOT i2.id IN (SELECT id FROM to_rollback))
  AND NOT EXISTS (SELECT FROM intents i2
                  WHERE i2.key = intents.key
                    AND i2.id > intents.id
                    AND i2.is_read);

This is a simplification: remove any write followed by another write, if there are no later reads of that key at all. As we are also removing reads above, this should be able to remove all overwritten writes.
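
As a sanity check on the three rules, the Python sketch below applies them in order to a small in-memory copy of intents and confirms that the visible store is unchanged. It assumes every id in the input is already certain, and the helper names (to_rollback, store, maintain) only mirror the SQL for readability.

```python
from typing import Optional

Row = tuple[int, bool, str, Optional[str]]  # (id, is_read, key, val)

def to_rollback(intents: list[Row]) -> set[int]:
    """Same fixed point as the view: ids whose reads do not hold."""
    rollback: set[int] = set()
    while True:
        writes = [(i, k, v) for (i, r, k, v) in intents if not r and i not in rollback]
        failed: set[int] = set()
        for (i, r, k, v) in intents:
            if r:
                prior = [(wi, wv) for (wi, wk, wv) in writes if wk == k and wi < i]
                seen = max(prior, key=lambda w: w[0])[1] if prior else None
                if seen != v:
                    failed.add(i)
        if failed == rollback:
            return rollback
        rollback = failed

def store(intents: list[Row]) -> dict[str, Optional[str]]:
    """Latest (by id) write per key among transactions not rolled back."""
    rb = to_rollback(intents)
    out: dict[str, Optional[str]] = {}
    for (i, r, k, v) in sorted(intents, key=lambda row: row[0]):
        if not r and i not in rb:
            out[k] = v
    return out

def maintain(intents: list[Row]) -> list[Row]:
    # Task 1: drop every row of a failed transaction.
    rb = to_rollback(intents)
    rows = [x for x in intents if x[0] not in rb]
    # Task 2: drop the read sets of committed transactions.
    rb = to_rollback(rows)
    rows = [x for x in rows if not (x[1] and x[0] not in rb)]
    # Task 3: drop writes with a later surviving write and no later read.
    rb = to_rollback(rows)
    def dead(w: Row) -> bool:
        later = [o for o in rows if o[2] == w[2] and o[0] > w[0]]
        return (not w[1]
                and any(not o[1] and o[0] not in rb for o in later)
                and not any(o[1] for o in later))
    return [x for x in rows if not dead(x)]

intents: list[Row] = [
    (1, False, "alice", "100"),
    (2, True, "bob", None), (2, False, "bob", "50"),
    (3, True, "alice", "100"), (3, True, "bob", "50"),
    (3, False, "alice", "70"), (3, False, "bob", "80"),
    (4, True, "alice", "100"), (4, True, "bob", "50"),   # stale, rolls back
    (4, False, "alice", "60"), (4, False, "bob", "90"),
    (5, True, "alice", "70"), (5, True, "bob", "80"),
    (5, False, "alice", "50"), (5, False, "bob", "100"),
]
before = store(intents)
pruned = maintain(intents)
print(pruned, store(pruned) == before)
```

Fifteen rows shrink to the two writes of transaction 5, and the store reads the same before and after.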

Implementation and Evaluation

We have a table intents, a view to_rollback, and a few maintenance DELETE commands. Let's take them out for an introductory spin, and then see if we can make them perform.

I have some example transactions from Claude, which we'll insert to start.

INSERT INTO intents (id, is_read, key, val) VALUES
    -- Txn 1: blind initialization of alice. No read.
    (1, false, 'alice', '100'),

    -- Txn 2: initialize bob, asserting bob was previously absent.
    (2, true,  'bob',   NULL),
    (2, false, 'bob',   '50'),

    -- Txn 3: transfer 30 alice->bob, observing alice=100, bob=50. Commits.
    (3, true,  'alice', '100'),
    (3, true,  'bob',   '50'),
    (3, false, 'alice', '70'),
    (3, false, 'bob',   '80'),

    -- Txn 4: concurrent transfer using stale view (alice=100). Must rollback.
    (4, true,  'alice', '100'),
    (4, true,  'bob',   '50'),
    (4, false, 'alice', '60'),
    (4, false, 'bob',   '90'),

    -- Txn 5: transfer based on post-3 state. Commits.
    (5, true,  'alice', '70'),
    (5, true,  'bob',   '80'),
    (5, false, 'alice', '50'),
    (5, false, 'bob',   '100'),

    -- Txn 6: initialize x=A. Commits.
    (6, true,  'x', NULL),
    (6, false, 'x', 'A'),

    -- Txn 7: x A->B. Commits in iter 1.
    (7, true,  'x', 'A'),
    (7, false, 'x', 'B'),

    -- Txn 8: stale read of x=A. Must rollback (writes B already from 7).
    (8, true,  'x', 'A'),
    (8, false, 'x', 'C'),

    -- Txn 9: reads x=C. Only true if 8 commits — it doesn't, so 9 rolls back.
    -- Iter 1: 8's tentative write makes 9 look fine. Iter 2: 8 in rollback,
    -- 9's read no longer matches, 9 rolls back. Iter 3: stable.
    (9, true,  'x', 'C'),
    (9, false, 'x', 'D');

If we select from to_rollback we should see the following:

materialize=> SELECT * FROM to_rollback ORDER BY id;
 id
----
  4
  8
  9
(3 rows)

It would be great to see the current values for each key, so let's create a view that does this.

-- The most recent (by id) write that is not rolled back.
CREATE VIEW store AS
SELECT DISTINCT ON (key) key, val
FROM intents
WHERE NOT is_read
  AND id NOT IN (SELECT id FROM to_rollback)
ORDER BY key, id DESC;

If we read from store we should see

materialize=> SELECT * FROM store;
  key  | val
-------+-----
 x     | B
 bob   | 100
 alice | 50
(3 rows)

materialize=>

Maintenance

Let's run each of the three async maintenance tasks. We'll select from intents before and after each, to see what has changed. To start, we have

materialize=> SELECT * FROM intents ORDER BY id, is_read DESC, key;
 id | is_read |  key  | val
----+---------+-------+-----
  1 | f       | alice | 100
  2 | t       | bob   |
  2 | f       | bob   | 50
  3 | t       | alice | 100
  3 | t       | bob   | 50
  3 | f       | alice | 70
  3 | f       | bob   | 80
  4 | t       | alice | 100
  4 | t       | bob   | 50
  4 | f       | alice | 60
  4 | f       | bob   | 90
  5 | t       | alice | 70
  5 | t       | bob   | 80
  5 | f       | alice | 50
  5 | f       | bob   | 100
  6 | t       | x     |
  6 | f       | x     | A
  7 | t       | x     | A
  7 | f       | x     | B
  8 | t       | x     | A
  8 | f       | x     | C
  9 | t       | x     | C
  9 | f       | x     | D
(23 rows)

materialize=>

If we remove failed transactions, we prune the eight entries corresponding to ids 4, 8, and 9.

materialize=> DELETE FROM intents WHERE id IN (SELECT * FROM to_rollback);
DELETE 8
materialize=> SELECT * FROM intents ORDER BY id, is_read DESC, key;
 id | is_read |  key  | val
----+---------+-------+-----
  1 | f       | alice | 100
  2 | t       | bob   |
  2 | f       | bob   | 50
  3 | t       | alice | 100
  3 | t       | bob   | 50
  3 | f       | alice | 70
  3 | f       | bob   | 80
  5 | t       | alice | 70
  5 | t       | bob   | 80
  5 | f       | alice | 50
  5 | f       | bob   | 100
  6 | t       | x     |
  6 | f       | x     | A
  7 | t       | x     | A
  7 | f       | x     | B
(15 rows)

materialize=>

Next we'll remove the read sets of committed transactions.

materialize=> DELETE FROM intents
WHERE intents.id NOT IN (SELECT * FROM to_rollback)
  AND intents.is_read;
DELETE 7
materialize=> SELECT * FROM intents ORDER BY id, is_read DESC, key;
 id | is_read |  key  | val
----+---------+-------+-----
  1 | f       | alice | 100
  2 | f       | bob   | 50
  3 | f       | alice | 70
  3 | f       | bob   | 80
  5 | f       | alice | 50
  5 | f       | bob   | 100
  6 | f       | x     | A
  7 | f       | x     | B
(8 rows)

materialize=>

Finally, we'll remove writes that are not read.

materialize=> DELETE FROM intents
WHERE NOT intents.is_read
  AND EXISTS (SELECT FROM intents i2
              WHERE i2.key = intents.key
                AND i2.id > intents.id
                AND NOT i2.is_read
                AND NOT i2.id IN (SELECT id FROM to_rollback))
  AND NOT EXISTS (SELECT FROM intents i2
                  WHERE i2.key = intents.key
                    AND i2.id > intents.id
                    AND i2.is_read);
DELETE 5
materialize=> SELECT * FROM intents ORDER BY id, is_read DESC, key;
 id | is_read |  key  | val
----+---------+-------+-----
  5 | f       | alice | 50
  5 | f       | bob   | 100
  7 | f       | x     | B
(3 rows)

materialize=>

We can see that intents now contains the same rows as in store.

Scaling up

These views and DELETE statements are fine, but they are evaluated from scratch when invoked. There is nothing wrong with this, and the above should work on most bog-standard SQL systems. It may take some time to determine the current contents of store, for example, especially as intents grows in size.

Let's add quite a lot of data to make that point.

We'll load up 10,000 transactions, each with two reads and two writes, randomly picked from 10,000 locations. This should give us a solid rate of conflict, by the pigeonhole principle. With Claude's help, this was done via:

INSERT INTO intents (id, is_read, key, val)
  WITH ops AS (
    SELECT
      g AS id,
      'k' || (seahash((g::text || ':rk1')::bytea) % 10000) AS rk1,
      'k' || (seahash((g::text || ':rk2')::bytea) % 10000) AS rk2,
      'k' || (seahash((g::text || ':wk1')::bytea) % 10000) AS wk1,
      'k' || (seahash((g::text || ':wk2')::bytea) % 10000) AS wk2
    FROM generate_series(1, 10000) AS g
  )
  SELECT id, true,  rk1, (SELECT val FROM store WHERE key = ops.rk1) FROM ops
  UNION ALL
  SELECT id, true,  rk2, (SELECT val FROM store WHERE key = ops.rk2) FROM ops
  UNION ALL
  SELECT id, false, wk1, id::text || ':' || wk1 FROM ops
  UNION ALL
  SELECT id, false, wk2, id::text || ':' || wk2 FROM ops;
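
As a rough check on the "solid rate of conflict" claim, the arithmetic below estimates how many distinct keys receive at least one intended write. It assumes the hashed keys behave like independent uniform draws, which is an approximation of what seahash produces and not something verified here.

```python
keys = 10_000
intended_writes = 2 * 10_000  # two writes per transaction

# Chance that a given key is missed by every one of the intended writes.
p_untouched = (1 - 1 / keys) ** intended_writes

expected_written_keys = keys * (1 - p_untouched)
writes_per_written_key = intended_writes / expected_written_keys
print(round(expected_written_keys), round(writes_per_written_key, 2))
```

Under that assumption, most keys are targeted by some write, and a key that is written at all is written a bit more than twice on average, so a read of a random key is quite likely to meet an intended write.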

We can now check out the contents of intents, and also to_rollback and store.

materialize=> SELECT count(*) FROM intents;
 count
-------
 40000
(1 row)

Time: 60.834 ms
materialize=> SELECT count(*) FROM to_rollback;
 count
-------
  5988
(1 row)

Time: 27634.122 ms (00:27.634)
materialize=> SELECT count(*) FROM store;
 count
-------
  5591
(1 row)

Time: 42650.55 ms (00:42.650)

Sure takes a lot of time.

Selecting directly out of store with key or value filters doesn't go any faster. This will not be good enough.

Incremental View Maintenance

Materialize allows you to create indexes on arbitrary views, at which point it will compute and then continually maintain the results as the data change. We can build an index on store to provide continual interactive access to the currently present keys and their values.

materialize=> CREATE DEFAULT INDEX ON store;
CREATE INDEX
Time: 164.292 ms
materialize=> SELECT COUNT(*) FROM store;
 count
-------
  5591
(1 row)

Time: 39778.886 ms (00:39.779)
materialize=> SELECT COUNT(*) FROM store;
 count
-------
  5591
(1 row)

Time: 36.306 ms
materialize=> SELECT COUNT(*) FROM store;
 count
-------
  5591
(1 row)

Time: 32.297 ms
materialize=>

The index is "created" immediately, but only comes online after roughly the same time it takes to compute the result from scratch. Once computed, it stays up and running, and provides interactive access.

materialize=> SELECT val FROM store WHERE key = 'k1234';
    val
-----------
 943:k1234
(1 row)

Time: 41.390 ms
materialize=> SELECT val FROM store WHERE key = 'k5678';
    val
------------
 3841:k5678
(1 row)

Time: 20.515 ms
materialize=>

This is now much closer to "interactive" access than previously. The times drop to ~15ms with serializable isolation (Materialize defaults to strict serializability), which is about the time from NYC to us-east-1 and back again.

Optimization

We'll need both to_rollback and store, and the latter depends on the former. We'll actually want to clean up the index on store, and instead build an index on to_rollback first, so that our index on store can simply work off of its contents. Although there are two dataflows independently maintaining these indexes, Materialize's serializability means their contents will always appear in sync.

Having done that:

materialize=> SELECT COUNT(*) FROM intents;
 count
-------
 40000
(1 row)

Time: 98.058 ms
materialize=> SELECT COUNT(*) FROM to_rollback;
 count
-------
  5988
(1 row)

Time: 30.412 ms
materialize=> SELECT COUNT(*) FROM store;
 count
-------
  5591
(1 row)

Time: 29.841 ms
materialize=>

But we aren't done yet. We have these maintenance tasks as well, and they could use some help. Let's exercise each of them, counting their sets, rather than performing the DELETE yet.

materialize=>
    SELECT count(*)
    FROM intents
    WHERE id IN (SELECT * FROM to_rollback);
 count
-------
 23952
(1 row)

Time: 73.231 ms
materialize=>

That first one isn't so bad.

materialize=>
    SELECT count(*)
    FROM intents
    WHERE id NOT IN (SELECT * FROM to_rollback) AND is_read;
 count
-------
  8024
(1 row)

Time: 14183.270 ms (00:14.183)
materialize=>

The second one is much less interactive.

materialize=>
   SELECT count(*) FROM intents
   WHERE NOT is_read
     AND EXISTS (SELECT FROM intents i2
                 WHERE i2.key = intents.key AND i2.id > intents.id
                   AND NOT i2.is_read AND NOT i2.id IN (SELECT id FROM to_rollback))
     AND NOT EXISTS (SELECT FROM intents i2
                     WHERE i2.key = intents.key AND i2.id > intents.id
                       AND i2.is_read);
 count
-------
  1323
(1 row)

Time: 34746.069 ms (00:34.746)
materialize=>

That third one is pretty brutal.

Let's improve each of these.

Further Optimization; Task 1

The first query is already pretty fast, but if we look at the logic we run, we can see that it could be faster.

materialize=> explain SELECT count(*)
    FROM intents
    WHERE id IN (SELECT * FROM to_rollback);
Physical Plan
Explained Query:
  →With
    cte l0 =
      →Accumulable GroupAggregate
        Simple aggregates: count(*)
        →Differential Join %1 » %0
          Join stage 0 in %0 with lookup key #0
          →Arrange (#0)
            →Read materialize.transactions.intents
          →Arranged materialize.transactions.to_rollback
  →Return
    →Union
      →Unarranged Raw Stream
        →Arranged l0
      →Map/Filter/Project
        Project: #0
        Map: 0
          →Consolidating Union
            →Negate Diffs
              →Fused with Child Map/Filter/Project
                Project: ()
                  →Arranged l0
                    Key: ()
            →Constant (1 row)

Source materialize.transactions.intents
  project=(#0)
  filter=((#0{id}) IS NOT NULL)
  pushdown=((#0{id}) IS NOT NULL)

Used Indexes:
  - materialize.transactions.to_rollback_primary_idx (differential join)

Target cluster: default

(1 row)
Time: 36.076 ms
materialize=>

The tell here is

          →Arrange (#0)
            →Read materialize.transactions.intents

which says that we are reading intents and building an index over it. We could instead pre-form an index on intents, by id.

materialize=> CREATE INDEX intents_idx_id ON intents (id);
CREATE INDEX
Time: 99.994 ms
materialize=>

Re-running the EXPLAIN command reveals that it uses indexes for both inputs. The count now comes back in half the time, which wasn't that long to begin with.

Further Optimization; Task 2

The second maintenance task is slow because of a Materialize planning defect. The antijoin we've written, reads for transactions whose id is not in to_rollback, should be implemented with an inner join between intents and to_rollback. Because .. SQL .. NOT IN also has to treat NULL ids specially, and Materialize trips over the hidden OR this adds to the inner join's predicate.

We can fix this with a different antijoin idiom: NOT EXISTS.

materialize=>
    SELECT count(*)
    FROM intents i
    WHERE NOT EXISTS (
      SELECT FROM to_rollback tr
      WHERE i.id = tr.id
    ) AND is_read;
 count
-------
  8024
(1 row)

Time: 58.552 ms
materialize=>

An EXPLAIN on this query confirms that it uses the pre-existing indexes on intents and to_rollback.
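
The two idioms differ only in how they treat NULL, which is easy to lose track of. The Python sketch below models standard SQL three-valued logic for each, with None standing in for NULL. The function names are made up, and the sketch says nothing about how Materialize plans either form.

```python
from typing import Optional

def sql_not_in(x: Optional[int], values: list[Optional[int]]) -> Optional[bool]:
    """Three-valued result of `x NOT IN (values)`; None stands for NULL."""
    if not values:
        return True           # NOT IN over an empty set is TRUE, even for NULL x
    if x is None:
        return None           # NULL compared with anything is unknown
    if any(v is not None and v == x for v in values):
        return False          # a definite match
    return None if None in values else True

def sql_not_exists_eq(x: Optional[int], values: list[Optional[int]]) -> bool:
    """Result of `NOT EXISTS (SELECT ... WHERE x = v)`: always TRUE or FALSE."""
    return not any(x is not None and v is not None and x == v for v in values)

cases = [(1, [2, 3]), (2, [2, 3]), (1, [2, None]), (None, [2, 3])]
for x, values in cases:
    print(x, values, sql_not_in(x, values), sql_not_exists_eq(x, values))
```

A WHERE clause keeps a row only when the predicate is TRUE, so the two forms agree whenever no NULLs are involved and diverge only on the last two cases. With ids that are never NULL, as here, swapping NOT IN for NOT EXISTS does not change the answer.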

Further Optimization; Task 3

The third query is pretty complicated. I won't print the EXPLAIN output here, as it is two screenfuls long. Instead we'll do a clever trick that Materialize makes easy.

materialize=> CREATE VIEW dead_writes AS
   SELECT * FROM intents
   WHERE NOT is_read
     AND EXISTS (SELECT FROM intents i2
                 WHERE i2.key = intents.key AND i2.id > intents.id
                   AND NOT i2.is_read AND NOT i2.id IN (SELECT id FROM to_rollback))
     AND NOT EXISTS (SELECT FROM intents i2
                     WHERE i2.key = intents.key AND i2.id > intents.id
                       AND i2.is_read);
CREATE VIEW
Time: 143.153 ms
materialize=> CREATE DEFAULT INDEX ON dead_writes;
CREATE INDEX
Time: 106.631 ms
materialize=>

We just bind the logic to a named view, and create an index on it. Now the results are immediately available.

materialize=> SELECT COUNT(*) FROM dead_writes;
 count
-------
  1323
(1 row)
Time: 27.275 ms
materialize=>

In fact, we can do the same with the two other maintenance tasks, which is just good hygiene. Each of the maintenance tasks has a maintained index that always contains the rows of intents that we can discard.


One meaningful change is that we'll need to alter our DELETE statements. We'll need to refer to dead_writes, and the other named views, like so:

DELETE FROM intents
WHERE intents IN (SELECT dead_writes FROM dead_writes);

This plans the same way as

SELECT * FROM intents
WHERE intents IN (SELECT dead_writes FROM dead_writes);

and EXPLAIN shows us that we do not have the right indexes yet.

materialize=> EXPLAIN SELECT * FROM intents WHERE intents IN (SELECT dead_writes FROM dead_writes);
Physical Plan
Explained Query:
  →Differential Join %0 » %1
    Join stage 0 in %1 with lookup key #0..=#3
    →Arrange (#0..=#3)
      →Fused with Child Map/Filter/Project
        Filter: (#0) IS NOT NULL AND (#1) IS NOT NULL AND (#2) IS NOT NULL AND (#3) IS NOT NULL
          →Arranged materialize.transactions.intents
            Key: (#0{id})
    →Distinct GroupAggregate
      →Fused with Child Map/Filter/Project
        Filter: (#3{val}) IS NOT NULL
          →Arranged materialize.transactions.dead_writes
            Key: (#0{id}..=#3{val})

Used Indexes:
  - materialize.transactions.intents_idx_id (*** full scan ***)
  - materialize.transactions.dead_writes_primary_idx (*** full scan ***)

Target cluster: default

(1 row)
Time: 35.272 ms
materialize=>

Although we are using indexes on intents and dead_writes, we are scanning their contents. If intents is large and dead_writes is empty, we'll do a lot of work to determine this. We have indexes, but they are not the right indexes.

To get the right indexes, we'll want to make two changes:

  1. Add an index on intents by all columns, and
  2. Modify dead_writes to contain distinct records.

With these two changes, we'll use an index for intents, and be able to remove the Distinct around dead_writes.

The first step reveals progress:

materialize=> create default index on intents;
CREATE INDEX
Time: 81.806 ms
materialize=> EXPLAIN SELECT * FROM intents WHERE intents IN (SELECT dead_writes FROM dead_writes);
                              Physical Plan
--------------------------------------------------------------------------
 Explained Query:                                                        +
   →Differential Join %1 » %0                                            +
     Join stage 0 in %0 with lookup key #0{id}..=#3{val}                 +
     →Arranged materialize.transactions.intents                          +
     →Distinct GroupAggregate                                            +
       →Fused with Child Map/Filter/Project                              +
         Filter: (#3{val}) IS NOT NULL                                   +
           →Arranged materialize.transactions.dead_writes                +
             Key: (#0{id}..=#3{val})                                     +
                                                                         +
 Used Indexes:                                                           +
   - materialize.transactions.dead_writes_primary_idx (*** full scan ***)+
   - materialize.transactions.intents_primary_idx (differential join)    +
                                                                         +
 Target cluster: default                                                 +

(1 row)

Time: 32.926 ms
materialize=>

This already reduces the cost to being proportional to the size of dead_writes, independent of intents. The other improvement comes from

-- Redefines dead_writes; drop the earlier view and its index first.
CREATE VIEW dead_writes AS
   SELECT DISTINCT * FROM intents
   WHERE NOT is_read
     AND EXISTS (SELECT FROM intents i2
                 WHERE i2.key = intents.key AND i2.id > intents.id
                   AND NOT i2.is_read AND NOT i2.id IN (SELECT id FROM to_rollback))
     AND NOT EXISTS (SELECT FROM intents i2
                     WHERE i2.key = intents.key AND i2.id > intents.id
                       AND i2.is_read);

-- Explicitly name all columns, as DEFAULT index skips constant is_read.
CREATE INDEX dead_writes_idx_all ON dead_writes (id, is_read, key, val);

Notice that we need to explicitly name the index columns. Materialize's CREATE DEFAULT INDEX uses the narrowest primary keys it can find, and is_read is the constant FALSE for dead_writes.

The final plan ends up being:

materialize=> EXPLAIN SELECT * FROM intents WHERE intents IN (SELECT dead_writes FROM dead_writes);
                            Physical Plan
----------------------------------------------------------------------
 Explained Query:                                                    +
   →Differential Join %1 » %0                                        +
     Join stage 0 in %0 with lookup key #0{id}..=#3{val}             +
       filter=((#3) IS NOT NULL)                                     +
     →Arranged materialize.transactions.intents                      +
     →Arranged materialize.transactions.dead_writes                  +
                                                                     +
 Used Indexes:                                                       +
   - materialize.transactions.intents_primary_idx (differential join)+
   - materialize.transactions.dead_writes_idx_all (differential join)+
                                                                     +
 Target cluster: default                                             +

(1 row)

Time: 35.371 ms
materialize=>

Both inputs are now used in indexed form, and the query should immediately spill out the rows to delete from intents.

materialize=> SELECT COUNT(*) FROM intents WHERE intents IN (SELECT dead_writes FROM dead_writes);
 count
-------
  1323
(1 row)

Time: 38.828 ms
materialize=>

Conclusions