Watermill SQLite3 Driver Pack

September 22, 2025 ยท View on GitHub

Golang SQLite3 driver pack for https://watermill.io event dispatch framework. Drivers satisfy the following interfaces:

SQLite3 does not support querying FOR UPDATE, which is used for row locking when subscribers in the same consumer group read an event batch in official Watermill SQL PubSub implementations. Current architectural decision is to lock a consumer group offset using unixepoch()+lockTimeout time stamp. While one consumed message is processing per group, the offset lock time is extended by lockTimeout periodically by time.Ticker. If the subscriber is unable to finish the consumer group batch, other subscribers will take over the lock as soon as the grace period runs out. A time lock fulfills the role of a traditional database network timeout that terminates transactions when its client disconnects.

  • Implement SQLite connection back off manager

    A friend recommended implementing a back off manager. I think the SQLite busy_timeout produces a linear back off timeout. When attemping to write a row lock, SQLite will freeze the transaction until the previous one is complete up to the busy_timeout duration. This should prevent unneccessary waits due to polling. Perhaps this does not work like I imagine. Also, the ZombieZen variant uses immediate transactions, which may ignore the busy_timeout. This requires additional investigation before implementing.

    Here is an example attempt: https://github.com/sandpapersoftware/watermillsqlite

    The busy waiting loop, that polls the next batches causes a modification to the database (sets the lock) this causes tools like litestream to write wal files to their replicas every poll interval this makes a restore incredibly slow and causes a constant drain on the cpu.

    I wonder if a rollback in a batch == 0 case, wouldn't be enough to release the lock or you only set the lock, when a batch is > 0? Lock + read operation are in the same transaction; I think I can just cancel the transaction if the batch size is 0 to prevent the write.

    Implementation examples in other libraries to consider:

  • Add clean up routines for removing old messages from topics.

    • wmsqlitemodernc.CleanUpTopics
    • wmsqlitezombiezen.CleanUpTopics
  • ExpiringKeyRepository needs clean up sync test

  • Replace SQL queries with an abstraction adaptor

    Currently, SQL queries are hard-coded into into Publisher and Subscriber. Other implementations provide query adaptors that permit overriding the queries. The author is hesitant to make this change, because it is hard to imagine a use case where this kind of adjustment would be useful. Supporting it seems like over-engineering.
    
    It is possible to override the table structure by manually creating the table and never setting the InitializeSchema constructor option. For rare specialty use cases, it seems cleaner to create a fork and re-run all the tests to make sure that all the SQL changes are viable and add additional tests. It seems that a query adaptor would just get in the way.
    
    The issue is created so that arguments can be made in favor of adding a query adaptor.
    
  • Subscriber with poll interval lower than 10ms locks up; see BenchmarkAll; increasing the batch size also can cause a lock up

  • Three-Dots Labs acceptance requests:

  • Finish time-based lock extension when:

    • sending a message to output channel
    • waiting for message acknowledgement
  • Pass official implementation acceptance tests:

    • ModernC
      • tests.TestPublishSubscribe
      • tests.TestConcurrentSubscribe
      • tests.TestConcurrentSubscribeMultipleTopics
      • tests.TestResendOnError
      • tests.TestNoAck
      • tests.TestContinueAfterSubscribeClose
      • tests.TestConcurrentClose
      • tests.TestContinueAfterErrors
      • tests.TestPublishSubscribeInOrder
      • tests.TestPublisherClose
      • tests.TestTopic
      • tests.TestMessageCtx
      • tests.TestSubscribeCtx
      • tests.TestConsumerGroups
    • ZombieZen (passes simple tests)
      • tests.TestPublishSubscribe
      • tests.TestConcurrentSubscribe
      • tests.TestConcurrentSubscribeMultipleTopics
      • tests.TestResendOnError
      • tests.TestNoAck
      • tests.TestContinueAfterSubscribeClose
      • tests.TestConcurrentClose
      • tests.TestContinueAfterErrors
      • tests.TestPublishSubscribeInOrder
      • tests.TestPublisherClose
      • tests.TestTopic
      • tests.TestMessageCtx
      • tests.TestSubscribeCtx
      • tests.TestConsumerGroups

Vanilla ModernC Driver

Go Reference Go Report Card codecov

go get -u github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitemodernc

The ModernC driver is compatible with the Golang standard library SQL package. It works without CGO. Has fewer dependencies than the ZombieZen variant.

import (
	"database/sql"
	"github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitemodernc"
	_ "modernc.org/sqlite"
)

db, err := sql.Open("sqlite", ":memory:")
if err != nil {
	panic(err)
}
// limit the number of concurrent connections to one
// this is a limitation of `modernc.org/sqlite` driver
db.SetMaxOpenConns(1)
defer db.Close()

pub, err := wmsqlitemodernc.NewPublisher(db, wmsqlitemodernc.PublisherOptions{
	InitializeSchema: true, // create tables for used topics
})
if err != nil {
	panic(err)
}
sub, err := wmsqlitemodernc.NewSubscriber(db, wmsqlitemodernc.SubscriberOptions{
	InitializeSchema: true, // create tables for used topics
})
if err != nil {
	panic(err)
}
// ... follow guides on <https://watermill.io>

Advanced ZombieZen Driver

Go Reference Go Report Card codecov

go get -u github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitezombiezen

The ZombieZen driver abandons the standard Golang library SQL conventions in favor of the more orthogonal API and higher performance potential. Under the hood, it uses ModernC SQLite3 implementation and does not need CGO. Advanced SQLite users might prefer this driver.

It is about 9 times faster than the ModernC variant when publishing messages. It is currently more stable due to lower level control. It is faster than even the CGO SQLite variants on standard library interfaces, and with some tuning should become the absolute speed champion of persistent message brokers over time. Tuned SQLite is ~35% faster than the Linux file system.

import "github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitezombiezen"

// &cache=shared is critical, see: https://github.com/zombiezen/go-sqlite/issues/92#issuecomment-2052330643
connectionDSN := ":memory:")
conn, err := sqlite.OpenConn(connectionDSN)
if err != nil {
	panic(err)
}
defer conn.Close()

pub, err := wmsqlitezombiezen.NewPublisher(conn, wmsqlitezombiezen.PublisherOptions{
	InitializeSchema: true, // create tables for used topics
})
if err != nil {
	panic(err)
}
sub, err := wmsqlitezombiezen.NewSubscriber(connectionDSN, wmsqlitezombiezen.SubscriberOptions{
	InitializeSchema: true, // create tables for used topics
})
if err != nil {
	panic(err)
}
// ... follow guides on <https://watermill.io>

Similar Projects