GitHub - logannye/emsqrt: Process any data size with a fixed, small memory footprint. EM-√ is an external-memory ETL/log processing engine with hard peak-RAM guarantees. Unlike traditional systems that "try" to stay within memory limits, EM-√ enforces a strict memory cap, enabling you to process arbitrarily large datasets using small memory footprints.


EM-√ (EM-Sqrt): External-Memory ETL Engine

Process any dataset size with a fixed, small memory footprint.

License: Apache-2.0 Rust

EM-√ is an external-memory ETL/log processing engine with hard peak-RAM guarantees. Unlike traditional systems that "try" to stay within memory limits, EM-√ enforces a strict memory cap, enabling you to process arbitrarily large datasets using small memory footprints.

Key Features

  • Hard Memory Guarantees: Never exceeds the configured memory cap (default 512MB). All allocations are tracked via RAII guards.
  • External-Memory Operators: Sort, join, and aggregate operations automatically spill to disk when memory limits are hit.
  • Tree Evaluation (TE) Scheduling: Principled execution schedule that decomposes plans into blocks with bounded fan-in to control peak memory.
  • Cloud-Ready: Spill segments support local filesystem with checksums and compression. S3 and GCS adapters are planned.
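  • Pluggable Spill Storage: Spill targets are configured by local path or by object-store URI (S3, GCS, Azure) with retry/backoff controls. The cloud adapters are currently placeholders; only the local filesystem backend is implemented.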
  • Parquet Support: Native columnar Parquet I/O with Arrow integration (optional --features parquet).
  • Grace Hash Join: Automatic partition-based hash join for datasets exceeding memory limits.
  • Deterministic Execution: Stable plan hashing for reproducibility and auditability.
  • Memory-Constrained Environments: Designed for edge computing, serverless, embedded systems, and containerized deployments.

Quick Start

Installation

# Clone the repository
git clone https://github.com/logannye/emsqrt.git
cd emsqrt

# Build the project
cargo build --release

# Run tests
cargo test

Basic Usage

Programmatic API

use emsqrt_core::schema::{Field, DataType, Schema};
use emsqrt_core::dag::LogicalPlan as L;
use emsqrt_core::config::EngineConfig;
use emsqrt_planner::{rules, lower_to_physical, estimate_work};
use emsqrt_te::plan_te;
use emsqrt_exec::Engine;

// Define your schema
let schema = Schema {
    fields: vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("age", DataType::Int64, false),
    ],
};

// Build a logical plan: scan → filter → project → sink
let scan = L::Scan {
    source: "file:///path/to/input.csv".to_string(),
    schema: schema.clone(),
};

let filter = L::Filter {
    input: Box::new(scan),
    expr: "age > 25".to_string(),
};

let project = L::Project {
    input: Box::new(filter),
    columns: vec!["name".to_string(), "age".to_string()],
};

let sink = L::Sink {
    input: Box::new(project),
    destination: "file:///path/to/output.csv".to_string(),
    format: "csv".to_string(),
};

// Optimize and execute
let optimized = rules::optimize(sink);
let phys_prog = lower_to_physical(&optimized);
let work = estimate_work(&optimized, None);
let te = plan_te(&phys_prog.plan, &work, 512 * 1024 * 1024)?; // 512MB memory cap

// Configure and run
let mut config = EngineConfig::default();
config.mem_cap_bytes = 512 * 1024 * 1024; // 512MB
config.spill_dir = "/tmp/emsqrt-spill".to_string();

let mut engine = Engine::new(config).expect("engine initialization");
let manifest = engine.run(&phys_prog, &te)?;

println!("Execution completed in {}ms", manifest.finished_ms - manifest.started_ms);

YAML DSL

The YAML DSL supports linear pipelines with the following operators:

steps:
  - op: scan
    source: "data/logs.csv"
    schema:
      - { name: "ts", type: "Utf8", nullable: false }
      - { name: "uid", type: "Utf8", nullable: false }
      - { name: "amount", type: "Float64", nullable: true }
  
  - op: filter
    expr: "amount > 1000"
  
  - op: project
    columns: ["ts", "uid", "amount"]
  
  - op: sink
    destination: "results/filtered.csv"
    format: "csv"  # or "parquet" (requires --features parquet)

Note: The YAML DSL currently supports scan, filter, project, map, and sink. Aggregate and join operators are not yet supported in YAML; use the programmatic API for those operations.

Add an optional config block to describe spill targets without touching CLI flags:

config:
  spill_uri: "s3://my-bucket/spill"
  spill_aws_region: "us-east-1"
  spill_gcs_service_account: "/path/to/service-account.json"
steps:
  - op: scan
    source: "data/logs.csv"
    schema: []
  - op: sink
    destination: "stdout"
    format: "csv"

Values from config merge with environment variables and command-line overrides.
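The README does not spell out the merge order, so the following is only a minimal sketch of a layered lookup. It assumes the common precedence of CLI flag over environment variable over YAML `config` value over built-in default; the `resolve` helper is hypothetical and not part of emsqrt.

```rust
// Hypothetical layered lookup; NOT emsqrt's actual merge code.
// Assumed precedence (highest first): CLI flag, env var, YAML config, default.
fn resolve<T>(cli: Option<T>, env: Option<T>, yaml: Option<T>, default: T) -> T {
    // `Option::or` keeps the first `Some` it sees, scanning left to right.
    cli.or(env).or(yaml).unwrap_or(default)
}

fn main() {
    let default_cap: u64 = 512 * 1024 * 1024;
    // YAML sets 256 MiB, the environment sets 1 GiB, and no CLI flag is given.
    let cap = resolve(None, Some(1_073_741_824), Some(268_435_456), default_cap);
    println!("effective memory cap: {} bytes", cap);
}
```

Under the assumed order, the environment value wins here because no CLI flag is present.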

Parquet Support: Scan and Sink operators support Parquet format when built with --features parquet. Files are automatically detected by extension (.parquet, .parq) or can be explicitly specified with format: "parquet".
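The detection rule described above can be sketched as a small function. This is an illustration only: `Format` and `detect_format` are hypothetical names, and both the case-insensitive extension match and the fallback to CSV are assumptions rather than documented emsqrt behavior.

```rust
use std::path::Path;

#[derive(Debug, PartialEq)]
enum Format {
    Csv,
    Parquet,
}

// Hypothetical helper: an explicit `format` value wins; otherwise the file
// extension decides. Falling back to CSV is an assumption for this sketch.
fn detect_format(path: &str, explicit: Option<&str>) -> Format {
    match explicit {
        Some(f) if f.eq_ignore_ascii_case("parquet") => return Format::Parquet,
        Some(_) => return Format::Csv,
        None => {}
    }
    let ext = Path::new(path)
        .extension()
        .and_then(|e| e.to_str())
        .map(|e| e.to_ascii_lowercase());
    match ext.as_deref() {
        Some("parquet") | Some("parq") => Format::Parquet,
        _ => Format::Csv,
    }
}

fn main() {
    println!("{:?}", detect_format("results/out.parq", None));
    println!("{:?}", detect_format("results/out.csv", None));
    println!("{:?}", detect_format("results/out.bin", Some("parquet")));
}
```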

CLI Usage

The EM-√ CLI provides a convenient way to run pipelines from YAML files:

# Validate a pipeline YAML file
emsqrt validate --pipeline examples/simple_pipeline.yaml

# Show execution plan (EXPLAIN)
emsqrt explain --pipeline examples/simple_pipeline.yaml --memory-cap 536870912

# Execute a pipeline
emsqrt run --pipeline examples/simple_pipeline.yaml

# Override configuration via command-line flags
emsqrt run \
  --pipeline examples/simple_pipeline.yaml \
  --memory-cap 1073741824 \
  --spill-uri s3://my-bucket/spill \
  --spill-aws-region us-east-1 \
  --spill-aws-access-key-id AKIA... \
  --spill-aws-secret-access-key SECRET... \
  --spill-gcs-service-account /path/to/sa.json \
  --spill-azure-access-key azureKey \
  --spill-retry-max 5 \
  --spill-dir /tmp/emsqrt-spill \
  --max-parallel 4

See examples/README.md for more details on YAML pipeline syntax.

Cloud Spill Authentication

When using S3/GCS/Azure spill URIs, provide credentials via CLI flags, environment variables, or the platform's SDK defaults:

  • S3: export AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY or use aws configure; optionally include --spill-aws-region.
  • GCS: set GOOGLE_SERVICE_ACCOUNT/GOOGLE_SERVICE_ACCOUNT_PATH or run gcloud auth application-default login.
  • Azure: use az login and AZURE_STORAGE_CONNECTION_STRING or pass --spill-azure-access-key.

The config block in examples/cloud_spill/pipeline.yaml illustrates a spill URI plus retry tuning so you can avoid repeating CLI flags per run.
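To make the retry tuning concrete, here is a minimal sketch of a capped exponential backoff schedule. The doubling policy is an assumption (the README names only a retry count, an initial delay, and a maximum delay), and `backoff_schedule` is a hypothetical helper, not an emsqrt function.

```rust
// Hypothetical schedule: the delay doubles after each attempt and is capped
// at `max_ms`. Doubling is an assumed policy, not confirmed by the README.
fn backoff_schedule(max_retries: u32, initial_ms: u64, max_ms: u64) -> Vec<u64> {
    let mut delays = Vec::with_capacity(max_retries as usize);
    let mut delay = initial_ms.min(max_ms);
    for _ in 0..max_retries {
        delays.push(delay);
        // saturating_mul avoids overflow for very large delays.
        delay = delay.saturating_mul(2).min(max_ms);
    }
    delays
}

fn main() {
    // Illustrative values: 5 retries, 250 ms initial delay, 5000 ms cap.
    println!("{:?}", backoff_schedule(5, 250, 5000));
}
```

With these values the delays are 250, 500, 1000, 2000, and 4000 ms; a sixth retry would be clamped to 5000 ms.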

Examples of Practical Use Cases

1. Serverless Data Pipelines

Process large datasets in AWS Lambda, Google Cloud Functions, or Azure Functions with strict memory limits:

// Process 100GB dataset in a 512MB Lambda
// Note: S3 spill support is planned; currently use local filesystem
let config = EngineConfig {
    mem_cap_bytes: 512 * 1024 * 1024, // 512MB
    spill_dir: "/tmp/lambda-spill".to_string(),
    ..Default::default()
};

Value: 10-100x cost reduction vs. large EC2 instances or EMR clusters.

2. Edge Data Processing

Aggregate sensor data on IoT gateways or embedded devices with limited RAM:

// Process 1M sensor readings on a Raspberry Pi with 256MB RAM
let config = EngineConfig {
    mem_cap_bytes: 128 * 1024 * 1024, // Use only 128MB
    spill_dir: "/tmp/sensor-spill".to_string(),
    ..Default::default()
};

Value: Enable edge analytics without hardware upgrades.

3. Multi-Tenant Data Platforms

Run customer queries with isolated memory budgets:

// Each customer gets a memory budget
// Note: S3 spill support is planned; currently use local filesystem
let config = EngineConfig {
    mem_cap_bytes: customer_memory_budget,
    spill_dir: format!("/tmp/platform-spill/customer-{}", customer_id),
    ..Default::default()
};

Value: Predictable performance, resource isolation, accurate cost attribution.

4. Cost-Optimized Analytics

Use smaller, cheaper instances by trading I/O for memory:

// Process 500GB dataset on a 4GB RAM instance instead of 64GB
let config = EngineConfig {
    mem_cap_bytes: 4 * 1024 * 1024 * 1024, // 4GB
    spill_dir: "/fast-nvme/spill".to_string(),
    ..Default::default()
};

Value: 10x cost reduction for memory-bound workloads.

Architecture

EM-√ is built as a modular Rust workspace with the following crates:

emsqrt-core/      - Core types, schemas, DAGs, memory budget traits
emsqrt-te/        - Tree Evaluation planner (bounded fan-in decomposition)
emsqrt-mem/       - Memory budget implementation, spill manager, buffer pool
emsqrt-io/        - I/O adapters (CSV, JSONL, Parquet, storage backends)
emsqrt-operators/ - Query operators (filter, project, sort, join, aggregate)
emsqrt-planner/   - Logical/physical planning, optimization, YAML DSL
emsqrt-exec/      - Execution runtime, scheduler, engine
emsqrt-cli/       - Command-line interface for running pipelines

Execution Flow

  1. Planning: YAML/Logical plan → Optimized logical plan → Physical plan with operator bindings
  2. TE Scheduling: Physical plan → Tree Evaluation blocks with bounded fan-in
  3. Execution: Blocks executed in dependency order, respecting memory budget
  4. Spilling: Operators automatically spill to disk when memory limits are hit
  5. Manifest: Deterministic execution manifest with plan hashes for reproducibility
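As an illustration of step 5, the sketch below hashes a canonical text form of a plan with 64-bit FNV-1a, so the same plan always yields the same hash regardless of process or platform. The README does not say which hash function or canonical form emsqrt uses; FNV-1a, the newline-joined step list, and the names `fnv1a64` and `plan_hash` are all assumptions made for this example.

```rust
// 64-bit FNV-1a. Chosen here only because it is simple and fully specified;
// the hash emsqrt actually uses is not stated in the README.
fn fnv1a64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
    for &b in bytes {
        hash ^= b as u64;
        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
    }
    hash
}

// Hypothetical canonical form: one step description per line, in plan order.
fn plan_hash(steps: &[&str]) -> u64 {
    fnv1a64(steps.join("\n").as_bytes())
}

fn main() {
    let plan = ["scan data/logs.csv", "filter amount > 1000", "sink results/filtered.csv"];
    println!("plan hash: {:016x}", plan_hash(&plan));
}
```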

Memory Management

  • MemoryBudget: RAII guards track all allocations
  • SpillManager: Checksummed, compressed segments for external-memory operations
  • TE Frontier: Bounded live blocks guarantee peak memory ≤ cap

Configuration

EngineConfig

pub struct EngineConfig {
    /// Hard memory cap (bytes). The engine must NEVER exceed this.
    pub mem_cap_bytes: usize,
    
    /// Optional block-size hint (TE planner may override)
    pub block_size_hint: Option<usize>,
    
    /// Max on-disk spill concurrency
    pub max_spill_concurrency: usize,
    
    /// Optional seed for deterministic shuffles
    pub seed: Option<u64>,
    
    /// Execution parallelism
    pub max_parallel_tasks: usize,
    
    /// Directory for spill files
    pub spill_dir: String,
}

Environment Variables

export EMSQRT_MEM_CAP_BYTES=536870912  # 512MB
export EMSQRT_SPILL_DIR=/tmp/emsqrt-spill
export EMSQRT_MAX_PARALLEL_TASKS=4
export EMSQRT_SPILL_URI=s3://my-bucket/emsqrt
export EMSQRT_SPILL_AWS_REGION=us-east-1
export EMSQRT_SPILL_AWS_ACCESS_KEY_ID=AKIA...
export EMSQRT_SPILL_AWS_SECRET_ACCESS_KEY=SECRET...
export EMSQRT_SPILL_AWS_SESSION_TOKEN=optionalSession
export EMSQRT_SPILL_GCS_SA_PATH=/path/to/service-account.json
export EMSQRT_SPILL_AZURE_ACCESS_KEY=azureKey
export EMSQRT_SPILL_RETRY_MAX_RETRIES=5
export EMSQRT_SPILL_RETRY_INITIAL_MS=250
export EMSQRT_SPILL_RETRY_MAX_MS=5000
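For illustration, a byte-valued variable such as EMSQRT_MEM_CAP_BYTES could be read along these lines. This is a sketch, not emsqrt's loader: `parse_bytes` is a hypothetical helper, and silently falling back to the default on a missing or malformed value is an assumption (a real engine might prefer to report an error).

```rust
use std::env;

// Hypothetical helper: parse a byte count, falling back to `default` when the
// value is missing or not a valid unsigned integer (assumed behavior).
fn parse_bytes(raw: Option<&str>, default: usize) -> usize {
    raw.and_then(|s| s.trim().parse::<usize>().ok())
        .unwrap_or(default)
}

fn main() {
    let raw = env::var("EMSQRT_MEM_CAP_BYTES").ok();
    let cap = parse_bytes(raw.as_deref(), 512 * 1024 * 1024);
    println!("memory cap: {} bytes ({} MiB)", cap, cap / (1024 * 1024));
}
```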

Default Configuration

EngineConfig::default()
// mem_cap_bytes: 512 MiB
// max_spill_concurrency: 4
// max_parallel_tasks: 4
// spill_dir: "/tmp/emsqrt-spill"

StorageConfig

pub struct StorageConfig {
    pub uri: Option<String>,        // e.g. s3://bucket/prefix
    pub root: String,               // normalized spill root
    pub aws_region: Option<String>,
    pub aws_access_key_id: Option<String>,
    pub aws_secret_access_key: Option<String>,
    pub aws_session_token: Option<String>,
    pub gcs_service_account_path: Option<String>,
    pub azure_access_key: Option<String>,
    pub retry_max_retries: usize,
    pub retry_initial_backoff_ms: u64,
    pub retry_max_backoff_ms: u64,
}

EngineConfig::storage_config() produces this snapshot and emsqrt-io uses it to choose between filesystem and cloud adapters.
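The adapter choice can be pictured as a dispatch on the URI scheme. The sketch below is hypothetical: only the `s3://` form appears in this README, so the `gs://` and `az://` schemes, the `Backend` enum, and the `backend_for` function are assumptions for illustration and do not reflect emsqrt-io's real code.

```rust
#[derive(Debug, PartialEq)]
enum Backend {
    Filesystem,
    S3,
    Gcs,
    Azure,
}

// Hypothetical dispatch on the spill URI. A missing URI, or one without a
// recognized scheme, is treated as a local filesystem path.
fn backend_for(uri: Option<&str>) -> Backend {
    match uri {
        Some(u) if u.starts_with("s3://") => Backend::S3,
        Some(u) if u.starts_with("gs://") => Backend::Gcs, // assumed scheme
        Some(u) if u.starts_with("az://") => Backend::Azure, // assumed scheme
        _ => Backend::Filesystem,
    }
}

fn main() {
    println!("{:?}", backend_for(Some("s3://my-bucket/spill")));
    println!("{:?}", backend_for(None));
}
```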

Building & Testing

Build

# Debug build
cargo build

# Release build (optimized)
cargo build --release

# Build specific crate
cargo build -p emsqrt-exec

Run Tests

# All tests (unit tests in crates)
cargo test --all --lib

# Specific test suite (in workspace root tests/ directory)
cargo test --test integration_tests
cargo test --test expression_tests
cargo test --test cost_estimation_tests

# Run comprehensive test suite (10 phases)
./scripts/run_all_tests.sh

Test Coverage

The comprehensive test suite (scripts/run_all_tests.sh) includes 10 phases:

  1. Unit Tests: SpillManager, RowBatch helpers, Memory budget
  2. Integration Tests: Full pipeline tests (scan, filter, project, sort, aggregate, sink, join)
  3. E2E Tests: End-to-end smoke tests
  4. Crate-Level Tests: All library unit tests across crates
  5. Expression Engine Tests: Expression parsing and evaluation
  6. Column Statistics Tests: Statistics collection and cost estimation
  7. Error Handling Tests: Error context and recovery
  8. Operator Tests: Merge join, filter with expressions
  9. Feature-Specific Tests: Parquet, Arrow (when features enabled)
  10. CLI Tests: YAML parsing and validation

Supported Operations

Currently Implemented

  • Scan: Read CSV and Parquet files with schema inference
  • Filter: Predicate filtering (e.g., age > 25, name == "Alice")
  • Project: Column selection and renaming
  • Map: Column renaming (e.g., old_name AS new_name)
  • Sort: External sort with k-way merge
  • Aggregate: Group-by with COUNT, SUM, AVG, MIN, MAX
  • Join: Hash join (with Grace hash join for large datasets), merge join (sorted merge join for pre-sorted inputs)
  • Sink: Write CSV and Parquet files
  • Expression Engine: Full SQL-like expressions with operator precedence, cross-type arithmetic, and logical operations
  • Statistics: Column statistics (min/max/distinct_count/null_count) for cost estimation and selectivity modeling
  • Parquet I/O: Native columnar read/write with Arrow integration (requires --features parquet)
  • Arrow Integration: Columnar processing with RecordBatch ↔ RowBatch conversion utilities
  • Grace Hash Join: Partition-based hash join for very large datasets with automatic spilling
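To show the idea behind the Grace hash join listed above, here is a toy version that partitions both inputs by key hash and then joins matching partitions one at a time. It is a sketch under simplifying assumptions: partitions are kept in `Vec`s rather than spilled to disk, rows are `(u64, String)` pairs, and none of the names come from emsqrt.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

type Row = (u64, String); // (join key, payload)

fn partition_of(key: u64, parts: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % parts as u64) as usize
}

// Phase 1: route every row to a partition by key hash.
// A real engine would write each partition to a spill file instead.
fn partition(rows: Vec<Row>, parts: usize) -> Vec<Vec<Row>> {
    let mut out: Vec<Vec<Row>> = (0..parts).map(|_| Vec::new()).collect();
    for row in rows {
        let p = partition_of(row.0, parts);
        out[p].push(row);
    }
    out
}

// Phase 2: join partition i of the left side with partition i of the right.
// Only one pair of partitions needs to be in memory at a time.
fn grace_hash_join(left: Vec<Row>, right: Vec<Row>, parts: usize) -> Vec<(u64, String, String)> {
    assert!(parts > 0, "need at least one partition");
    let mut result = Vec::new();
    for (lp, rp) in partition(left, parts).into_iter().zip(partition(right, parts)) {
        // Build a hash table on the left partition.
        let mut table: HashMap<u64, Vec<String>> = HashMap::new();
        for (k, v) in lp {
            table.entry(k).or_default().push(v);
        }
        // Probe it with the right partition.
        for (k, rv) in rp {
            if let Some(lvs) = table.get(&k) {
                for lv in lvs {
                    result.push((k, lv.clone(), rv.clone()));
                }
            }
        }
    }
    result
}

fn main() {
    let left = vec![(1, "a".to_string()), (2, "b".to_string())];
    let right = vec![(2, "x".to_string()), (3, "y".to_string())];
    println!("{:?}", grace_hash_join(left, right, 4));
}
```

Because equal keys always hash to the same partition index, no match can span two different partitions, which is what makes the per-partition join correct.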

Planned Features

  • 🔄 Cloud Storage: S3, GCS adapters for spill segments (currently filesystem only)

How It Works

Tree Evaluation (TE)

Tree Evaluation is a principled execution scheduling approach that:

  1. Decomposes plans into blocks with bounded fan-in (e.g., each join block depends on at most K input blocks)
  2. Controls the live frontier (the set of materialized blocks at any time)
  3. Guarantees peak memory ≤ K * block_size + overhead
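A short worked example of this bound may help. The sketch below picks the largest block size that keeps K * block_size + overhead under a given cap. The fan-in of 4 and the 32 MiB overhead are illustrative numbers chosen for the example, and the function names are hypothetical; the actual TE planner may size blocks differently.

```rust
const MIB: u64 = 1024 * 1024;

// Peak memory bound from the text: K live blocks plus fixed overhead.
fn peak_bound(k: u64, block_size: u64, overhead: u64) -> u64 {
    k * block_size + overhead
}

// Largest block size that keeps the bound at or below `cap`.
// Returns None when no positive budget is left for blocks.
fn max_block_size(cap: u64, k: u64, overhead: u64) -> Option<u64> {
    if k == 0 || overhead >= cap {
        return None;
    }
    Some((cap - overhead) / k)
}

fn main() {
    // Illustrative numbers: 512 MiB cap, fan-in K = 4, 32 MiB overhead.
    let (cap, k, overhead) = (512 * MIB, 4, 32 * MIB);
    let block = max_block_size(cap, k, overhead).expect("cap too small");
    println!("block size: {} MiB", block / MIB);
    println!("peak bound: {} MiB", peak_bound(k, block, overhead) / MIB);
}
```

With these numbers each block may use up to 120 MiB, and the resulting bound is exactly 512 MiB.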

External-Memory Operators

When memory limits are hit, operators automatically:

  1. Spill to disk: Write intermediate results to checksummed, compressed segments
  2. Partition: Divide work into smaller chunks that fit in memory
  3. Merge: Combine results from multiple partitions/runs

Example: External sort generates sorted runs, then performs k-way merge.
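The two phases of an external sort can be sketched in a few lines. This is a simplified illustration, not emsqrt's operator: runs are held in memory as `Vec`s where a real engine would write them to spill segments, and the function names are hypothetical.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Phase 1: cut the input into runs of at most `run_len` items and sort each.
// In a real external sort each sorted run would be written to disk.
fn make_runs(input: &[i64], run_len: usize) -> Vec<Vec<i64>> {
    assert!(run_len > 0, "run length must be positive");
    input
        .chunks(run_len)
        .map(|chunk| {
            let mut run = chunk.to_vec();
            run.sort_unstable();
            run
        })
        .collect()
}

// Phase 2: k-way merge. The min-heap holds one (value, run, position) entry
// per run, so memory use grows with the number of runs, not the data size.
fn k_way_merge(runs: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum::<usize>());
    while let Some(Reverse((v, run, pos))) = heap.pop() {
        out.push(v);
        // Refill the heap from the run that just yielded its head.
        if let Some(&next) = runs[run].get(pos + 1) {
            heap.push(Reverse((next, run, pos + 1)));
        }
    }
    out
}

fn main() {
    let runs = make_runs(&[5, 1, 4, 2, 9, 3, 8], 3);
    println!("runs:   {:?}", runs);
    println!("merged: {:?}", k_way_merge(&runs));
}
```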

Memory Budget Enforcement

Every allocation requires a BudgetGuard:

let guard = budget.try_acquire(bytes, "my-buffer")?;
// Allocate memory...
// Guard automatically releases bytes on drop (RAII)

If try_acquire returns None, the operator must spill or partition.
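The RAII pattern behind this can be sketched with an atomic counter and a `Drop` implementation. `ToyBudget` and `ToyGuard` are hypothetical stand-ins written for this example; they are not emsqrt's `MemoryBudget` or `BudgetGuard`, whose real signatures (for instance the tag argument shown above) may differ.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Toy budget: a hard cap plus a counter of bytes currently reserved.
struct ToyBudget {
    cap: usize,
    used: AtomicUsize,
}

// Toy guard: holds a reservation and gives it back when dropped.
struct ToyGuard {
    budget: Arc<ToyBudget>,
    bytes: usize,
}

impl ToyBudget {
    fn new(cap: usize) -> Arc<Self> {
        Arc::new(ToyBudget { cap, used: AtomicUsize::new(0) })
    }

    fn used(&self) -> usize {
        self.used.load(Ordering::Acquire)
    }

    // Reserve `bytes` if it fits under the cap; otherwise return None so the
    // caller can spill or partition instead of allocating.
    fn try_acquire(self: &Arc<Self>, bytes: usize) -> Option<ToyGuard> {
        let mut cur = self.used.load(Ordering::Relaxed);
        loop {
            let new = cur.checked_add(bytes)?;
            if new > self.cap {
                return None;
            }
            match self.used.compare_exchange_weak(cur, new, Ordering::AcqRel, Ordering::Relaxed) {
                Ok(_) => return Some(ToyGuard { budget: Arc::clone(self), bytes }),
                Err(actual) => cur = actual, // another thread raced us; retry
            }
        }
    }
}

impl Drop for ToyGuard {
    fn drop(&mut self) {
        // Release the reservation automatically (RAII).
        self.budget.used.fetch_sub(self.bytes, Ordering::AcqRel);
    }
}

fn main() {
    let budget = ToyBudget::new(100);
    let guard = budget.try_acquire(60).expect("fits under the cap");
    println!("second request fits: {}", budget.try_acquire(50).is_some());
    drop(guard);
    println!("bytes in use after drop: {}", budget.used());
}
```

The compare-and-swap loop ensures the counter never passes the cap even when several threads reserve at once, which is the property a hard cap depends on.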

Performance

Benchmarks (Planned)

  • Sort 10GB with 512MB memory
  • Join 1GB × 1GB with 50MB memory
  • Aggregate 1M groups with 20MB memory
  • TPC-H queries (Q1, Q3, Q6)

See docs/benchmarks.md for the current Criterion harness and the scripts/benchmarks/run_benchmarks.sh helper.

Expected Characteristics

  • Throughput: 10-100x slower than in-memory systems (by design)
  • Memory: Guaranteed to never exceed cap (unlike other systems)
  • Scalability: Can process datasets 100-1000x larger than available RAM

Development

Project Structure

emsqrt/
├── Cargo.toml              # Workspace configuration
├── crates/
│   ├── emsqrt-core/        # Core types and traits
│   ├── emsqrt-te/          # Tree Evaluation planner
│   ├── emsqrt-mem/         # Memory budget and spill manager
│   ├── emsqrt-io/          # I/O adapters
│   ├── emsqrt-operators/   # Query operators
│   ├── emsqrt-planner/     # Planning and optimization
│   ├── emsqrt-exec/        # Execution runtime
│   └── emsqrt-cli/         # Command-line interface
├── tests/                  # Integration and unit tests (workspace root)
├── scripts/                # Utility scripts (run_all_tests.sh)
├── examples/               # YAML pipeline examples
└── README.md               # This file

Adding a New Operator

  1. Implement the Operator trait in emsqrt-operators/src/
  2. Register in emsqrt-operators/src/registry.rs
  3. Add to planner lowering in emsqrt-planner/src/lower.rs
  4. Add tests in tests/

Code Style

  • Follow Rust standard formatting: cargo fmt
  • All code must compile with #![forbid(unsafe_code)]
  • Use thiserror for error types
  • Use serde for serialization

Contributing

Contributions are welcome! Areas of particular interest:

  • Cloud storage adapters (S3, GCS, Azure) - placeholders exist, need implementation
  • Additional operators (window functions, lateral joins)
  • YAML DSL support for aggregate and join operators
  • Performance optimizations (SIMD in Arrow operations, parallel processing)
  • Documentation improvements

Acknowledgments

This project implements Tree Evaluation (TE) scheduling for external-memory query processing, enabling predictable memory usage in constrained environments.


Take-home: EM-√ trades throughput for guaranteed memory bounds. Use it when memory constraints are more important than raw speed.

This repository is a work in progress and will continue to evolve over time.