datafusion-bio-formats
March 15, 2026 · View on GitHub
Apache DataFusion table providers for bioinformatics file formats, enabling SQL queries on genomic data.
Overview
This workspace provides a collection of Rust crates that implement DataFusion TableProvider interfaces for various bioinformatics file formats. Query your genomic data using SQL through Apache DataFusion's powerful query engine.
Crates
| Crate | Description | Predicate Pushdown | Projection Pushdown | Multi-threaded | Write Support | Status |
|---|---|---|---|---|---|---|
| datafusion-bio-format-core | Core utilities and object storage support | N/A | N/A | N/A | N/A | ✅ |
| datafusion-bio-format-fastq | FASTQ sequencing reads | ❌ | ✅ | ✅ (BGZF + uncompressed) | ✅ | ✅ |
| datafusion-bio-format-vcf | VCF genetic variants | ✅ (TBI/CSI) | ✅ | ✅ (indexed) | ✅ | ✅ |
| datafusion-bio-format-bam | BAM/SAM sequence alignments | ✅ (BAI/CSI) | ✅ | ✅ (indexed) | ✅ | ✅ |
| datafusion-bio-format-bed | BED genomic intervals | ❌ | ❌ | ❌ | ❌ | ✅ |
| datafusion-bio-format-gff | GFF genome annotations | ✅ | ✅ | ✅ (BGZF) | ❌ | ✅ |
| datafusion-bio-format-fasta | FASTA biological sequences | ❌ | ❌ | ❌ | ❌ | ✅ |
| datafusion-bio-format-cram | CRAM compressed alignments | ✅ (CRAI) | ✅ | ✅ (indexed) | ✅ | ✅ |
| datafusion-bio-format-pairs | Pairs (Hi-C) chromatin contacts | ✅ (TBI/px2) | ✅ | ✅ (indexed) | ❌ | ✅ |
| datafusion-bio-format-ensembl-cache | Raw Ensembl VEP cache entities | ✅ (basic chr/start/end) | ✅ | ✅ (streaming) | ❌ | 🚧 |
Features
- High Performance: Index-based random access with balanced partitioning across genomic regions
- Predicate Pushdown: SQL
WHEREclauses on genomic coordinates use BAI/CRAI/TBI indexes to skip irrelevant data - Projection Pushdown:
SELECTcolumn lists are pushed down to the parser — only requested fields are extracted from each record, skipping expensive parsing of unreferenced columns - Cloud Native: Built-in support for GCS, S3, and Azure Blob Storage
- SQL Interface: Query genomic data using familiar SQL syntax
- Constant Memory: Streaming I/O with backpressure keeps memory usage constant regardless of file size
- DataFusion Integration: Seamless integration with Apache DataFusion ecosystem
- Write Support: Export query results to BAM/SAM, CRAM, FASTQ, and VCF files with compression
Installation
Add the crates you need to your Cargo.toml:
[dependencies]
datafusion = "50.3.0"
# Choose the formats you need:
datafusion-bio-format-fastq = { git = "https://github.com/biodatageeks/datafusion-bio-formats" }
datafusion-bio-format-vcf = { git = "https://github.com/biodatageeks/datafusion-bio-formats" }
datafusion-bio-format-bam = { git = "https://github.com/biodatageeks/datafusion-bio-formats" }
# ... etc
Quick Start
Query a FASTQ file
use datafusion::prelude::*;
use datafusion_bio_format_fastq::FastqTableProvider;
use std::sync::Arc;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let ctx = SessionContext::new();
// Register a FASTQ file as a table
// Parallelism is automatic: BGZF with GZI index and uncompressed files
// are read in parallel partitions; GZIP files are read sequentially.
let table = FastqTableProvider::new("data/sample.fastq.bgz".to_string(), None)?;
ctx.register_table("sequences", Arc::new(table))?;
// Query with SQL
let df = ctx.sql("
SELECT name, sequence, quality_scores
FROM sequences
WHERE LENGTH(sequence) > 100
LIMIT 10
").await?;
df.show().await?;
Ok(())
}
Query a VCF file
use datafusion::prelude::*;
use datafusion_bio_format_vcf::table_provider::VcfTableProvider;
use std::sync::Arc;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let ctx = SessionContext::new();
// Register a VCF file
let table = VcfTableProvider::new(
"data/variants.vcf.gz".to_string(),
Some(vec!["AF".to_string()]), // INFO fields to include
None, // FORMAT fields
None, // object_storage_options
true, // coordinate_system_zero_based
)?;
ctx.register_table("variants", Arc::new(table))?;
// Query variants
let df = ctx.sql("
SELECT chrom, pos, ref, alt, info_af
FROM variants
WHERE chrom = 'chr1' AND info_af > 0.01
").await?;
df.show().await?;
Ok(())
}
Multi-Sample VCF Analytical Queries
For multi-sample VCFs, FORMAT fields are stored in a columnar genotypes: Struct<GT: List<Utf8>, DP: List<Int32>, ...> layout. Register the built-in UDFs to run bcftools-style analytical pipelines in SQL:
use datafusion_bio_format_vcf::register_vcf_udfs;
// Register analytical UDFs: list_avg, list_gte, list_lte, list_and, vcf_set_gts
register_vcf_udfs(&ctx);
-- Filter variants by average genotype quality across all samples
SELECT chrom, start, qual, list_avg(genotypes."GQ") AS avg_gq
FROM variants
WHERE qual >= 20
AND list_avg(genotypes."GQ") >= 15
AND list_avg(genotypes."DP") BETWEEN 15 AND 150;
-- Set genotypes to missing where per-sample DP or GQ are too low
SELECT chrom, start, "ref", alt,
vcf_set_gts(genotypes."GT",
list_and(list_gte(genotypes."GQ", 10),
list_and(list_gte(genotypes."DP", 10), list_lte(genotypes."DP", 200)))
) AS masked_gt
FROM variants;
A companion {table}_long view can be auto-registered for per-sample lookups:
use datafusion_bio_format_vcf::auto_register_vcf_long_view;
auto_register_vcf_long_view(&ctx, "variants").await?;
SELECT sample_id, "GT", "DP" FROM variants_long WHERE sample_id = 'NA12878';
See the VCF crate README for full documentation on the columnar schema, UDF reference, and query patterns.
Query a Pairs (Hi-C) file
use datafusion::prelude::*;
use datafusion_bio_format_pairs::table_provider::PairsTableProvider;
use std::sync::Arc;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let ctx = SessionContext::new();
// Register a Pairs file (auto-discovers .tbi/.px2 index)
let table = PairsTableProvider::new(
"data/contacts.pairs.gz".to_string(),
None, // object_storage_options
false, // coordinate_system_zero_based (pairs uses 1-based)
)?;
ctx.register_table("contacts", Arc::new(table))?;
// Query Hi-C contacts — chr1 filter uses tabix index,
// chr2 filter is applied as a residual post-read filter
let df = ctx.sql("
SELECT chr1, pos1, chr2, pos2, strand1, strand2
FROM contacts
WHERE chr1 = 'chr1' AND chr2 = 'chr2'
").await?;
df.show().await?;
Ok(())
}
Query from cloud storage
use datafusion_bio_format_core::object_storage::ObjectStorageOptions;
// Works with GCS, S3, Azure
let opts = ObjectStorageOptions { ... };
let table = FastqTableProvider::new(
"gs://my-bucket/sample.fastq.gz".to_string(),
Some(opts),
)?;
Write Support
BAM/SAM, CRAM, FASTQ, and VCF formats support writing DataFusion query results back to files using the INSERT OVERWRITE SQL syntax or the insert_into() API.
Design
The write implementation follows DataFusion's standard patterns:
TableProvider.insert_into()
└── WriteExec (ExecutionPlan consuming RecordBatches)
├── Serializer (Arrow → format conversion)
└── LocalWriter (compression handling)
Key features:
- Compression: Auto-detected from file extension (
.bgz/.bgzf→ BGZF,.gz→ GZIP, else plain) - BGZF default: Block-gzipped format recommended for bioinformatics (allows random access)
- Coordinate conversion: Automatic 0-based ↔ 1-based conversion for VCF files
- Mode: OVERWRITE only (creates/replaces file)
Write a FASTQ file
use datafusion::prelude::*;
use datafusion_bio_format_fastq::FastqTableProvider;
use std::sync::Arc;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let ctx = SessionContext::new();
// Register source FASTQ
let source = FastqTableProvider::new("input.fastq.gz".to_string(), None)?;
ctx.register_table("source", Arc::new(source))?;
// Register destination FASTQ (compression auto-detected from extension)
let dest = FastqTableProvider::new("output.fastq.bgz".to_string(), None)?;
ctx.register_table("dest", Arc::new(dest))?;
// Filter and write to new file
ctx.sql("
INSERT OVERWRITE dest
SELECT name, description, sequence, quality_scores
FROM source
WHERE LENGTH(sequence) >= 100
").await?.collect().await?;
Ok(())
}
Write a VCF file
use datafusion::prelude::*;
use datafusion_bio_format_vcf::VcfTableProvider;
use std::sync::Arc;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let ctx = SessionContext::new();
// Register source VCF with INFO fields
let source = VcfTableProvider::try_new(
"input.vcf.gz",
Some(vec!["AF".to_string(), "DP".to_string()]), // INFO fields
None, // FORMAT fields
true, // coordinate_system_zero_based
).await?;
ctx.register_table("variants", Arc::new(source))?;
// Register destination VCF
let dest = VcfTableProvider::try_new(
"filtered.vcf.bgz",
Some(vec!["AF".to_string(), "DP".to_string()]),
None,
true,
).await?;
ctx.register_table("output", Arc::new(dest))?;
// Filter variants and write
ctx.sql("
INSERT OVERWRITE output
SELECT chrom, start, end, id, ref, alt, qual, filter, AF, DP
FROM variants
WHERE AF > 0.01 AND DP >= 10
").await?.collect().await?;
Ok(())
}
Coordinate System Handling
VCF files use 1-based coordinates, but the table provider can expose them as 0-based half-open intervals (common in bioinformatics tools like BED):
| Setting | Read Behavior | Write Behavior |
|---|---|---|
coordinate_system_zero_based=true | VCF POS 100 → DataFrame start=99 | DataFrame start=99 → VCF POS 100 |
coordinate_system_zero_based=false | VCF POS 100 → DataFrame start=100 | DataFrame start=100 → VCF POS 100 |
The same setting controls both reading and writing, ensuring round-trip consistency.
Compression Options
| Extension | Compression | Notes |
|---|---|---|
.vcf, .fastq | Plain | Uncompressed |
.vcf.gz, .fastq.gz | GZIP | Standard compression |
.vcf.bgz, .fastq.bgz | BGZF | Block-gzipped (recommended) |
.vcf.bgzf, .fastq.bgzf | BGZF | Block-gzipped (recommended) |
VCF Header Metadata Preservation
When reading VCF files, header metadata (field descriptions, types, and numbers) is stored in Arrow field metadata. This enables round-trip read/write operations to preserve the original VCF header information:
// Reading preserves metadata in schema
let provider = VcfTableProvider::new("input.vcf", ...)?;
let schema = provider.schema();
// Field metadata contains original VCF definitions
let dp_field = schema.field_with_name("DP")?;
let metadata = dp_field.metadata();
// metadata["vcf_description"] = "Total read depth"
// metadata["vcf_type"] = "Integer"
// metadata["vcf_number"] = "1"
The writer uses this metadata to reconstruct proper VCF header lines:
##INFO=<ID=DP,Number=1,Type=Integer,Description="Total read depth">
For write-only operations (new output files), use VcfTableProvider::new_for_write() which accepts the schema directly without reading from file.
Index-Based Range Queries
BAM, CRAM, VCF, GFF, and Pairs table providers support index-based predicate pushdown for efficient genomic region queries. When an index file is present alongside the data file, SQL filters on genomic coordinate columns are translated into indexed random access, skipping irrelevant data entirely.
Supported Index Formats
| Data Format | Index Formats | Naming Convention |
|---|---|---|
| BAM | BAI, CSI | sample.bam.bai or sample.bai, sample.bam.csi |
| CRAM | CRAI | sample.cram.crai |
| VCF (bgzf) | TBI, CSI | sample.vcf.gz.tbi, sample.vcf.gz.csi |
| GFF (bgzf) | TBI, CSI | sample.gff.gz.tbi, sample.gff.gz.csi |
| Pairs (bgzf) | TBI, CSI, px2 | sample.pairs.gz.tbi, sample.pairs.gz.px2 |
Index files are auto-discovered — place them alongside the data file and the table provider will find them automatically. No configuration needed.
SQL Query Patterns
All standard genomic filter patterns are supported:
-- Single region query
SELECT * FROM alignments
WHERE chrom = 'chr1' AND start >= 1000000 AND end <= 2000000;
-- Multi-chromosome query (parallel across chromosomes)
SELECT * FROM alignments
WHERE chrom IN ('chr1', 'chr2', 'chr3');
-- Range with BETWEEN
SELECT * FROM alignments
WHERE chrom = 'chr1' AND start BETWEEN 1000000 AND 2000000;
-- Combine genomic region with record-level filters
SELECT * FROM alignments
WHERE chrom = 'chr1' AND start >= 1000000 AND mapping_quality >= 30;
How It Works
┌────────────────────────────────────────┐
│ SQL: WHERE chrom = 'chr1' │
│ AND start >= 1000000 │
│ AND mapping_quality >= 30 │
└──────────┬─────────────────────────────┘
│
┌──────▼──────────────────────┐
│ 1. Extract genomic regions │ chrom/start/end → index query regions
│ 2. Separate residual │ mapping_quality → post-read filter
└──────┬──────────────────────┘
│
┌──────▼──────────────────────┐
│ 3. Estimate region sizes │ Read index metadata (BAI/CRAI/TBI)
│ from index │ to estimate compressed bytes per region
└──────┬──────────────────────┘
│
┌──────▼──────────────────────┐
│ 4. Balance partitions │ Greedy bin-packing distributes regions
│ (target_partitions) │ across N balanced partitions
└──────┬──────────────────────┘
│
┌──────▼──────────────────────┐
│ 5. Per-partition (streaming)│
│ For each assigned region:│ Sequential within partition
│ IndexedReader.query() │ Seek directly via BAI/CRAI/TBI
│ → apply residual filter│ mapping_quality >= 30
│ → stream RecordBatches │ via channel(2) backpressure
└─────────────────────────────┘
Partitioning behavior (with contig lengths known):*
| Index Available? | SQL Filters | Partitions |
|---|---|---|
| Yes | chrom = 'chr1' AND start >= 1000 | up to target_partitions (region split into sub-regions) |
| Yes | chrom IN ('chr1', 'chr2') | up to target_partitions (both regions split to fill bins) |
| Yes | mapping_quality >= 30 (no genomic filter) | up to target_partitions (all chroms balanced + split) |
| Yes | None (full scan) | up to target_partitions (all chroms balanced + split) |
| No | Any | 1 (sequential full scan) |
Partitioning behavior (without contig lengths):*
| Index Available? | SQL Filters | Partitions |
|---|---|---|
| Yes | chrom = 'chr1' AND start >= 1000 | 1 (cannot split without contig length) |
| Yes | chrom IN ('chr1', 'chr2') | min(2, target_partitions) |
| Yes | mapping_quality >= 30 (no genomic filter) | min(N chroms, target_partitions) |
| Yes | None (full scan) | min(N chroms, target_partitions) |
| No | Any | 1 (sequential full scan) |
*When contig lengths are known, the balancer splits large regions into sub-regions to fill all target_partitions bins, achieving full parallelism even with few contigs. Without contig lengths, parallelism is capped at the number of queried contigs.
| Format | Contig lengths available? | Single-contig parallelism? |
|---|---|---|
| BAM | Always (from header) | Yes |
| CRAM | Always (from header) | Yes |
| VCF | When header has ##contig=<...,length=N> | Depends on file |
| GFF | Typically not available | No |
| Pairs | When header has #chromsize: lines | Depends on file |
When an index exists, regions are distributed across target_partitions using a greedy bin-packing algorithm that estimates data volume from the index. This ensures balanced work distribution even when chromosomes have vastly different sizes (e.g., chr1 at ~249Mb vs chrY at ~57Mb).
Record-Level Filter Pushdown
Beyond index-based region queries, all formats support record-level predicate evaluation. Filters on columns like mapping_quality, flag, score, or strand are evaluated as each record is read, filtering early before Arrow RecordBatch construction.
This works with or without an index file:
-- No index needed — filters applied per-record during sequential scan
SELECT * FROM alignments WHERE mapping_quality >= 30 AND flag & 4 = 0;
Balanced Partitioning and Streaming I/O
When an index is available, the query engine uses a three-stage execution model:
1. Size Estimation — Each format reads its index to estimate compressed data volume per region:
| Format | Index | Estimation Method |
|---|---|---|
| BAM | BAI | Min/max compressed byte offsets from bin chunks |
| CRAM | CRAI | Sum of slice_length per reference sequence (exact) |
| VCF | TBI | Min/max compressed byte offsets from bin chunks |
| GFF | TBI/CSI | Min/max compressed byte offsets from bin chunks |
| Pairs | TBI | Min/max compressed byte offsets from bin chunks |
2. Balanced Partitioning — A greedy bin-packing algorithm distributes regions across target_partitions bins:
- Regions sorted by estimated size (descending)
- Each region assigned to the bin with the smallest current total
- Large regions (>1.5x ideal share) are split into sub-regions when contig length is known
- Result:
min(target_partitions, num_regions)partitions with roughly equal work
3. Streaming I/O — Each partition processes its assigned regions using constant-memory streaming:
- Dedicated OS thread per partition (not tokio's blocking pool)
mpsc::channel(2)provides backpressure between producer and consumer- At most ~3 RecordBatches in flight per partition (~3-8 MB)
- Total memory: ~3 batches x
target_partitions(constant regardless of file size)
target_partitions = 4
┌──────────────────┐
Partition 0 (thread) │ chr1:1-125M │ ─── sequential within partition
│ chr1:125M-249M │ streaming via channel(2)
├──────────────────┤
Partition 1 (thread) │ chr2 │ ─── concurrent across partitions
│ chr3 │ max 4 threads active
├──────────────────┤
Partition 2 (thread) │ chr4 │
│ chr5, chr6 │
├──────────────────┤
Partition 3 (thread) │ chrX │
│ chrY, chrM │
└──────────────────┘
Configuration:
Control the number of concurrent partitions via DataFusion's SessionConfig:
use datafusion::prelude::*;
let config = SessionConfig::new().with_target_partitions(8); // default varies by system
let ctx = SessionContext::new_with_config(config);
Index File Generation
Create index files using standard bioinformatics tools:
# BAM: sort and index
samtools sort input.bam -o sorted.bam
samtools index sorted.bam # creates sorted.bam.bai
# CRAM: sort and index
samtools sort input.cram -o sorted.cram --reference ref.fa
samtools index sorted.cram # creates sorted.cram.crai
# VCF: sort, compress, and index
bcftools sort input.vcf -Oz -o sorted.vcf.gz
bcftools index -t sorted.vcf.gz # creates sorted.vcf.gz.tbi
# GFF: sort, compress, and index
(grep "^#" input.gff; grep -v "^#" input.gff | sort -k1,1 -k4,4n) | bgzip > sorted.gff.gz
tabix -p gff sorted.gff.gz # creates sorted.gff.gz.tbi
# Pairs: sort, compress, and index (chr1=col2, pos1=col3)
(grep "^#" input.pairs; grep -v "^#" input.pairs | sort -k2,2 -k3,3n) | bgzip > sorted.pairs.gz
tabix -s 2 -b 3 -e 3 sorted.pairs.gz # creates sorted.pairs.gz.tbi
Projection Pushdown
When a query selects only a subset of columns, the projection is pushed down to the record parser so that only the requested fields are extracted from each record. This avoids the cost of parsing, allocating, and converting fields that won't appear in the query output.
How It Works
SQL: SELECT chrom, start, mapping_quality FROM alignments
┌──────────────────────────────────┐
│ DataFusion optimizer │
│ projection = [1, 2, 6] │ Only 3 of 11 columns needed
└──────────┬───────────────────────┘
│
┌──────────▼───────────────────────┐
│ Physical exec (e.g. BamExec) │
│ needs_chrom = true │ Computed from projection indices
│ needs_start = true │
│ needs_mapq = true │
│ needs_name = false ← skipped │ No allocation, no parsing
│ needs_cigar = false ← skipped │
│ needs_seq = false ← skipped │ Expensive fields avoided
│ ... │
└──────────┬───────────────────────┘
│
┌──────────▼───────────────────────┐
│ RecordBatch with 3 columns │ Only projected arrays built
└──────────────────────────────────┘
Per-Format Support
| Format | Level | Skipped When Not Projected |
|---|---|---|
| BAM/SAM | Field-level | All 11 core fields + auxiliary tags |
| CRAM | Field-level | All 11 core fields + auxiliary tags |
| VCF | Field-level | 8 core fields + INFO fields + FORMAT fields |
| GFF | Field-level | 8 core fields + attribute parsing |
| FASTQ | Field-level | All 4 fields (name, description, sequence, quality) |
| BED | Batch-level | Projection applied after parsing |
| Pairs | Batch-level | Projection applied after parsing |
| FASTA | Batch-level | Projection applied after parsing |
Field-level means the parser conditionally skips extraction per record — no allocation, no string conversion, no array construction for unrequested columns. This is especially beneficial for expensive fields like sequence, quality_scores, and cigar.
Batch-level means all fields are still parsed but only the projected columns are included in the output RecordBatch.
Aggregate Query Support
Empty projections (e.g., SELECT COUNT(*) FROM table) are handled correctly — zero-column RecordBatches with accurate row counts are returned, allowing DataFusion to compute aggregates without reading any field data.
Performance Benchmarks
This project includes a comprehensive benchmark framework to track performance across releases and validate optimizations.
Run Benchmarks Locally
# Build the benchmark runner
cargo build --release --package datafusion-bio-benchmarks-runner
# Run GFF benchmarks
./target/release/benchmark-runner benchmarks/configs/gff.yml
See benchmarks/README.md for detailed documentation on running benchmarks and adding new formats.
Development
Build
cargo build --all
Test
cargo test --all
Format
cargo fmt --all
Lint
cargo clippy --all-targets --all-features -- -D warnings
Documentation
cargo doc --no-deps --all-features --open
Requirements
- Rust 1.86.0 or later (specified in
rust-toolchain.toml) - Git (for building crates with git dependencies)
Dependencies
This project uses a forked version of noodles from biodatageeks/noodles for enhanced support of certain file formats (CRAM, FASTA, VCF, GFF).
Related Projects
- polars-bio - Polars extensions for bioinformatics
License
Licensed under Apache-2.0. See LICENSE for details.
Contributing
Contributions are welcome! Please feel free to submit issues or pull requests.
Acknowledgments
This project builds upon:
- Apache DataFusion - Fast, extensible query engine
- noodles - Bioinformatics I/O library
- OpenDAL - Unified data access layer