EM-√ (EM-Sqrt): External-Memory ETL Engine
Process any dataset size with a fixed, small memory footprint.
EM-√ is an external-memory ETL/log processing engine with hard peak-RAM guarantees. Unlike traditional systems that merely try to stay within memory limits, EM-√ enforces a strict memory cap, letting you process arbitrarily large datasets with a small, fixed memory footprint.
Key Features
- Hard Memory Guarantees: Never exceeds the configured memory cap (default 512MB). All allocations are tracked via RAII guards.
- External-Memory Operators: Sort, join, and aggregate operations automatically spill to disk when memory limits are hit.
- Tree Evaluation (TE) Scheduling: Principled execution schedule that decomposes plans into blocks with bounded fan-in to control peak memory.
- Cloud-Ready: Spill segments support local filesystem with checksums and compression. S3 and GCS adapters are planned.
- Pluggable Spill Storage: Point spills at local paths, with configuration hooks for cloud object stores (S3, GCS, Azure) and retry/backoff controls (cloud adapters are still in progress).
- Parquet Support: Native columnar Parquet I/O with Arrow integration (optional --features parquet).
- Grace Hash Join: Automatic partition-based hash join for datasets exceeding memory limits.
- Deterministic Execution: Stable plan hashing for reproducibility and auditability.
- Memory-Constrained Environments: Designed for edge computing, serverless, embedded systems, and containerized deployments.
Quick Start
Installation
```bash
# Clone the repository
git clone https://github.com/logannye/emsqrt.git
cd emsqrt

# Build the project
cargo build --release

# Run tests
cargo test
```
Basic Usage
Programmatic API
```rust
use emsqrt_core::schema::{Field, DataType, Schema};
use emsqrt_core::dag::LogicalPlan as L;
use emsqrt_core::config::EngineConfig;
use emsqrt_planner::{rules, lower_to_physical, estimate_work};
use emsqrt_te::plan_te;
use emsqrt_exec::Engine;

// Define your schema
let schema = Schema {
    fields: vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("age", DataType::Int64, false),
    ],
};

// Build a logical plan: scan → filter → project → sink
let scan = L::Scan {
    source: "file:///path/to/input.csv".to_string(),
    schema: schema.clone(),
};
let filter = L::Filter {
    input: Box::new(scan),
    expr: "age > 25".to_string(),
};
let project = L::Project {
    input: Box::new(filter),
    columns: vec!["name".to_string(), "age".to_string()],
};
let sink = L::Sink {
    input: Box::new(project),
    destination: "file:///path/to/output.csv".to_string(),
    format: "csv".to_string(),
};

// Optimize and execute
let optimized = rules::optimize(sink);
let phys_prog = lower_to_physical(&optimized);
let work = estimate_work(&optimized, None);
let te = plan_te(&phys_prog.plan, &work, 512 * 1024 * 1024)?; // 512MB memory cap

// Configure and run
let mut config = EngineConfig::default();
config.mem_cap_bytes = 512 * 1024 * 1024; // 512MB
config.spill_dir = "/tmp/emsqrt-spill".to_string();

let mut engine = Engine::new(config).expect("engine initialization");
let manifest = engine.run(&phys_prog, &te)?;

println!("Execution completed in {}ms", manifest.finished_ms - manifest.started_ms);
```
YAML DSL
The YAML DSL supports linear pipelines with the following operators:
```yaml
steps:
  - op: scan
    source: "data/logs.csv"
    schema:
      - { name: "ts", type: "Utf8", nullable: false }
      - { name: "uid", type: "Utf8", nullable: false }
      - { name: "amount", type: "Float64", nullable: true }
  - op: filter
    expr: "amount > 1000"
  - op: project
    columns: ["ts", "uid", "amount"]
  - op: sink
    destination: "results/filtered.csv"
    format: "csv"  # or "parquet" (requires --features parquet)
```
Note: Currently supports scan, filter, project, map, and sink. Aggregate and join operators are not yet supported in YAML (use the programmatic API for these operations).
Add an optional config block to describe spill targets without touching CLI flags:
```yaml
config:
  spill_uri: "s3://my-bucket/spill"
  spill_aws_region: "us-east-1"
  spill_gcs_service_account: "/path/to/service-account.json"
steps:
  - op: scan
    source: "data/logs.csv"
    schema: []
  - op: sink
    destination: "stdout"
    format: "csv"
```
Values from config merge with environment variables and command-line overrides.
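The exact merge order lives in the CLI and planner. As an illustration, the hypothetical helper below assumes the layering implied above (command-line flags override environment variables, which override the YAML config block); the helper name and the env-vs-YAML ordering are assumptions, not documented behavior:

```rust
// Hypothetical illustration of the assumed precedence chain:
// CLI flag > environment variable > YAML `config` block.
fn resolve_spill_uri(
    cli_flag: Option<String>,   // e.g. --spill-uri s3://my-bucket/spill
    env_var: Option<String>,    // e.g. EMSQRT_SPILL_URI
    yaml_value: Option<String>, // e.g. config.spill_uri from the pipeline file
) -> Option<String> {
    cli_flag.or(env_var).or(yaml_value)
}

fn main() {
    let uri = resolve_spill_uri(
        None,                                     // no CLI override given
        std::env::var("EMSQRT_SPILL_URI").ok(),   // env var, if set
        Some("s3://my-bucket/spill".to_string()), // value from the YAML config block
    );
    println!("effective spill target: {:?}", uri);
}
```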
Parquet Support: Scan and Sink operators support Parquet format when built with --features parquet. Files are automatically detected by extension (.parquet, .parq) or can be explicitly specified with format: "parquet".
CLI Usage
The EM-√ CLI provides a convenient way to run pipelines from YAML files:
```bash
# Validate a pipeline YAML file
emsqrt validate --pipeline examples/simple_pipeline.yaml

# Show execution plan (EXPLAIN)
emsqrt explain --pipeline examples/simple_pipeline.yaml --memory-cap 536870912

# Execute a pipeline
emsqrt run --pipeline examples/simple_pipeline.yaml

# Override configuration via command-line flags
emsqrt run \
  --pipeline examples/simple_pipeline.yaml \
  --memory-cap 1073741824 \
  --spill-uri s3://my-bucket/spill \
  --spill-aws-region us-east-1 \
  --spill-aws-access-key-id AKIA... \
  --spill-aws-secret-access-key SECRET... \
  --spill-gcs-service-account /path/to/sa.json \
  --spill-azure-access-key azureKey \
  --spill-retry-max 5 \
  --spill-dir /tmp/emsqrt-spill \
  --max-parallel 4
```
See examples/README.md for more details on YAML pipeline syntax.
Cloud Spill Authentication
When using S3/GCS/Azure spill URIs, provide credentials via CLI flags, environment variables, or the platform's SDK defaults:
- S3: export AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY or use aws configure; optionally include --spill-aws-region.
- GCS: set GOOGLE_SERVICE_ACCOUNT/GOOGLE_SERVICE_ACCOUNT_PATH or run gcloud auth application-default login.
- Azure: use az login and AZURE_STORAGE_CONNECTION_STRING or pass --spill-azure-access-key.
The config block in examples/cloud_spill/pipeline.yaml illustrates a spill URI plus retry tuning so you can avoid repeating CLI flags per run.
Examples of Practical Use Cases
1. Serverless Data Pipelines
Process large datasets in AWS Lambda, Google Cloud Functions, or Azure Functions with strict memory limits:
```rust
// Process 100GB dataset in a 512MB Lambda
// Note: S3 spill support is planned; currently use local filesystem
let config = EngineConfig {
    mem_cap_bytes: 512 * 1024 * 1024, // 512MB
    spill_dir: "/tmp/lambda-spill".to_string(),
    ..Default::default()
};
```
Value: 10-100x cost reduction vs. large EC2 instances or EMR clusters.
2. Edge Data Processing
Aggregate sensor data on IoT gateways or embedded devices with limited RAM:
```rust
// Process 1M sensor readings on a Raspberry Pi with 256MB RAM
let config = EngineConfig {
    mem_cap_bytes: 128 * 1024 * 1024, // Use only 128MB
    spill_dir: "/tmp/sensor-spill".to_string(),
    ..Default::default()
};
```
Value: Enable edge analytics without hardware upgrades.
3. Multi-Tenant Data Platforms
Run customer queries with isolated memory budgets:
```rust
// Each customer gets a memory budget
// Note: S3 spill support is planned; currently use local filesystem
let config = EngineConfig {
    mem_cap_bytes: customer_memory_budget,
    spill_dir: format!("/tmp/platform-spill/customer-{}", customer_id),
    ..Default::default()
};
```
Value: Predictable performance, resource isolation, accurate cost attribution.
4. Cost-Optimized Analytics
Use smaller, cheaper instances by trading I/O for memory:
```rust
// Process 500GB dataset on a 4GB RAM instance instead of 64GB
let config = EngineConfig {
    mem_cap_bytes: 4 * 1024 * 1024 * 1024, // 4GB
    spill_dir: "/fast-nvme/spill".to_string(),
    ..Default::default()
};
```
Value: 10x cost reduction for memory-bound workloads.
Architecture
EM-√ is built as a modular Rust workspace with the following crates:
emsqrt-core/ - Core types, schemas, DAGs, memory budget traits
emsqrt-te/ - Tree Evaluation planner (bounded fan-in decomposition)
emsqrt-mem/ - Memory budget implementation, spill manager, buffer pool
emsqrt-io/ - I/O adapters (CSV, JSONL, Parquet, storage backends)
emsqrt-operators/ - Query operators (filter, project, sort, join, aggregate)
emsqrt-planner/ - Logical/physical planning, optimization, YAML DSL
emsqrt-exec/ - Execution runtime, scheduler, engine
emsqrt-cli/ - Command-line interface for running pipelines
Execution Flow
- Planning: YAML/Logical plan → Optimized logical plan → Physical plan with operator bindings
- TE Scheduling: Physical plan → Tree Evaluation blocks with bounded fan-in
- Execution: Blocks executed in dependency order, respecting memory budget
- Spilling: Operators automatically spill to disk when memory limits are hit
- Manifest: Deterministic execution manifest with plan hashes for reproducibility
Memory Management
- MemoryBudget: RAII guards track all allocations
- SpillManager: Checksummed, compressed segments for external-memory operations (see the sketch after this list)
- TE Frontier: Bounded live blocks guarantee peak memory ≤ cap
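The segment format itself is internal to emsqrt-mem, but the idea of a checksummed spill segment can be sketched with a length-prefixed payload plus a checksum footer. The std-only sketch below is illustrative: it uses a trivial additive checksum and omits the compression the real SpillManager applies.

```rust
use std::io::{self, Read, Write};

// Minimal illustration of a checksummed spill segment: payload length, payload
// bytes, then a simple additive checksum footer. Not the engine's format.
fn write_segment<W: Write>(mut out: W, payload: &[u8]) -> io::Result<()> {
    let checksum: u64 = payload.iter().map(|&b| b as u64).sum();
    out.write_all(&(payload.len() as u64).to_le_bytes())?;
    out.write_all(payload)?;
    out.write_all(&checksum.to_le_bytes())?;
    Ok(())
}

fn read_segment<R: Read>(mut input: R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 8];
    input.read_exact(&mut len_buf)?;
    let len = u64::from_le_bytes(len_buf) as usize;

    let mut payload = vec![0u8; len];
    input.read_exact(&mut payload)?;

    let mut sum_buf = [0u8; 8];
    input.read_exact(&mut sum_buf)?;
    let expected = u64::from_le_bytes(sum_buf);
    let actual: u64 = payload.iter().map(|&b| b as u64).sum();
    if actual != expected {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "segment checksum mismatch"));
    }
    Ok(payload)
}

fn main() -> io::Result<()> {
    let mut buf = Vec::new();
    write_segment(&mut buf, b"spilled rows go here")?;
    assert_eq!(read_segment(&buf[..])?, b"spilled rows go here".to_vec());
    Ok(())
}
```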
Configuration
EngineConfig
```rust
pub struct EngineConfig {
    /// Hard memory cap (bytes). The engine must NEVER exceed this.
    pub mem_cap_bytes: usize,
    /// Optional block-size hint (TE planner may override)
    pub block_size_hint: Option<usize>,
    /// Max on-disk spill concurrency
    pub max_spill_concurrency: usize,
    /// Optional seed for deterministic shuffles
    pub seed: Option<u64>,
    /// Execution parallelism
    pub max_parallel_tasks: usize,
    /// Directory for spill files
    pub spill_dir: String,
}
```
Environment Variables
```bash
export EMSQRT_MEM_CAP_BYTES=536870912   # 512MB
export EMSQRT_SPILL_DIR=/tmp/emsqrt-spill
export EMSQRT_MAX_PARALLEL_TASKS=4
export EMSQRT_SPILL_URI=s3://my-bucket/emsqrt
export EMSQRT_SPILL_AWS_REGION=us-east-1
export EMSQRT_SPILL_AWS_ACCESS_KEY_ID=AKIA...
export EMSQRT_SPILL_AWS_SECRET_ACCESS_KEY=SECRET...
export EMSQRT_SPILL_AWS_SESSION_TOKEN=optionalSession
export EMSQRT_SPILL_GCS_SA_PATH=/path/to/service-account.json
export EMSQRT_SPILL_AZURE_ACCESS_KEY=azureKey
export EMSQRT_SPILL_RETRY_MAX_RETRIES=5
export EMSQRT_SPILL_RETRY_INITIAL_MS=250
export EMSQRT_SPILL_RETRY_MAX_MS=5000
```
Default Configuration
```rust
EngineConfig::default()
// mem_cap_bytes: 512 MiB
// max_spill_concurrency: 4
// max_parallel_tasks: 4
// spill_dir: "/tmp/emsqrt-spill"
```
StorageConfig
```rust
pub struct StorageConfig {
    pub uri: Option<String>,  // e.g. s3://bucket/prefix
    pub root: String,         // normalized spill root
    pub aws_region: Option<String>,
    pub aws_access_key_id: Option<String>,
    pub aws_secret_access_key: Option<String>,
    pub aws_session_token: Option<String>,
    pub gcs_service_account_path: Option<String>,
    pub azure_access_key: Option<String>,
    pub retry_max_retries: usize,
    pub retry_initial_backoff_ms: u64,
    pub retry_max_backoff_ms: u64,
}
```
EngineConfig::storage_config() produces this snapshot and emsqrt-io uses it to choose between filesystem and cloud adapters.
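As an illustration, the snippet below builds a StorageConfig for an S3 spill target using the fields shown above (it assumes that definition is in scope). The values are examples, and the scheme-based backend selection in main is a sketch of the idea rather than emsqrt-io's actual dispatch logic.

```rust
// Illustrative only: an S3-flavored StorageConfig snapshot. Field values are
// examples, not required settings.
fn example_storage() -> StorageConfig {
    StorageConfig {
        uri: Some("s3://my-bucket/spill".to_string()),
        root: "/tmp/emsqrt-spill".to_string(), // normalized spill root
        aws_region: Some("us-east-1".to_string()),
        aws_access_key_id: None,               // fall back to SDK default credentials
        aws_secret_access_key: None,
        aws_session_token: None,
        gcs_service_account_path: None,
        azure_access_key: None,
        retry_max_retries: 5,
        retry_initial_backoff_ms: 250,
        retry_max_backoff_ms: 5000,
    }
}

fn main() {
    let storage = example_storage();
    // A backend would typically be chosen from the URI scheme (sketch, not the
    // actual emsqrt-io selection code; "gs://" is an assumed GCS scheme here).
    let backend = match storage.uri.as_deref() {
        Some(uri) if uri.starts_with("s3://") => "s3",
        Some(uri) if uri.starts_with("gs://") => "gcs",
        _ => "filesystem",
    };
    println!("spill backend: {backend}");
}
```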
Building & Testing
Build
```bash
# Debug build
cargo build

# Release build (optimized)
cargo build --release

# Build specific crate
cargo build -p emsqrt-exec
```
Run Tests
```bash
# All tests (unit tests in crates)
cargo test --all --lib

# Specific test suite (in workspace root tests/ directory)
cargo test --test integration_tests
cargo test --test expression_tests
cargo test --test cost_estimation_tests

# Run comprehensive test suite (10 phases)
./scripts/run_all_tests.sh
```
Test Coverage
The comprehensive test suite (scripts/run_all_tests.sh) includes 10 phases:
- Unit Tests: SpillManager, RowBatch helpers, Memory budget
- Integration Tests: Full pipeline tests (scan, filter, project, sort, aggregate, sink, join)
- E2E Tests: End-to-end smoke tests
- Crate-Level Tests: All library unit tests across crates
- Expression Engine Tests: Expression parsing and evaluation
- Column Statistics Tests: Statistics collection and cost estimation
- Error Handling Tests: Error context and recovery
- Operator Tests: Merge join, filter with expressions
- Feature-Specific Tests: Parquet, Arrow (when features enabled)
- CLI Tests: YAML parsing and validation
Supported Operations
Currently Implemented
- ✅ Scan: Read CSV and Parquet files with schema inference
- ✅ Filter: Predicate filtering (e.g., age > 25, name == "Alice")
- ✅ Project: Column selection and renaming
- ✅ Map: Column renaming (e.g., old_name AS new_name)
- ✅ Sort: External sort with k-way merge
- ✅ Aggregate: Group-by with COUNT, SUM, AVG, MIN, MAX
- ✅ Join: Hash join (with Grace hash join for large datasets), merge join (sorted merge join for pre-sorted inputs)
- ✅ Sink: Write CSV and Parquet files
- ✅ Expression Engine: Full SQL-like expressions with operator precedence, cross-type arithmetic, and logical operations
- ✅ Statistics: Column statistics (min/max/distinct_count/null_count) for cost estimation and selectivity modeling
- ✅ Parquet I/O: Native columnar read/write with Arrow integration (requires --features parquet)
- ✅ Arrow Integration: Columnar processing with RecordBatch ↔ RowBatch conversion utilities
- ✅ Grace Hash Join: Partition-based hash join for very large datasets with automatic spilling (see the sketch after this list)
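The Grace strategy hash-partitions both inputs so that each partition pair fits under the memory budget, then builds and probes one pair at a time. The sketch below is a generic, self-contained illustration of that idea; it keeps partitions in Vecs instead of spill segments and is not the engine's join operator.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Conceptual Grace hash join: partition both sides by hash(key), then
// build/probe each partition pair independently. Real implementations write
// partitions to disk; here Vec<_> stands in for those spill files.
fn grace_hash_join(
    left: Vec<(u64, String)>,  // (key, payload)
    right: Vec<(u64, String)>,
    num_partitions: usize,
) -> Vec<(u64, String, String)> {
    let part = |key: u64| {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        (h.finish() as usize) % num_partitions
    };

    // Phase 1: partition both inputs (would be spilled to disk per partition).
    let mut left_parts = vec![Vec::new(); num_partitions];
    let mut right_parts = vec![Vec::new(); num_partitions];
    for row in left {
        left_parts[part(row.0)].push(row);
    }
    for row in right {
        right_parts[part(row.0)].push(row);
    }

    // Phase 2: join each partition pair; only one pair is live at a time.
    let mut out = Vec::new();
    for (lp, rp) in left_parts.into_iter().zip(right_parts) {
        let mut table: HashMap<u64, Vec<String>> = HashMap::new();
        for (k, v) in lp {
            table.entry(k).or_default().push(v);
        }
        for (k, rv) in rp {
            if let Some(lvs) = table.get(&k) {
                for lv in lvs {
                    out.push((k, lv.clone(), rv.clone()));
                }
            }
        }
    }
    out
}

fn main() {
    let left = vec![(1, "alice".to_string()), (2, "bob".to_string())];
    let right = vec![(1, "order-17".to_string()), (3, "order-42".to_string())];
    println!("{:?}", grace_hash_join(left, right, 4));
}
```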
Planned Features
- 🔄 Cloud Storage: S3, GCS adapters for spill segments (currently filesystem only)
How It Works
Tree Evaluation (TE)
Tree Evaluation is a principled execution scheduling approach that:
- Decomposes plans into blocks with bounded fan-in (e.g., each join block depends on at most K input blocks)
- Controls the live frontier (the set of materialized blocks at any time)
- Guarantees peak memory ≤ K * block_size + overhead (see the worked example below)
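As a back-of-the-envelope check of that bound, the snippet below plugs in example numbers; the fan-in, block size, and overhead are illustrative values, not engine defaults or measurements.

```rust
// Illustrative arithmetic for the TE peak-memory bound; values are examples.
fn main() {
    let k: usize = 4;                          // bounded fan-in per block
    let block_size: usize = 100 * 1024 * 1024; // 100 MiB per live block
    let overhead: usize = 32 * 1024 * 1024;    // assumed fixed engine overhead
    let peak = k * block_size + overhead;      // worst-case live frontier
    assert!(peak <= 512 * 1024 * 1024);        // fits the default 512 MiB cap
    println!("worst-case peak ≈ {} MiB", peak / (1024 * 1024));
}
```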
External-Memory Operators
When memory limits are hit, operators automatically:
- Spill to disk: Write intermediate results to checksummed, compressed segments
- Partition: Divide work into smaller chunks that fit in memory
- Merge: Combine results from multiple partitions/runs
Example: External sort generates sorted runs, then performs k-way merge.
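The merge phase can be expressed as a k-way merge over sorted runs driven by a min-heap. The sketch below illustrates the technique over in-memory runs (a real run would be a spilled, sorted segment streamed from disk); it is not the engine's sort code.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Generic k-way merge over already-sorted runs. Vec<i64> stands in for a
// spilled, sorted run streamed back from disk.
fn kway_merge(runs: Vec<Vec<i64>>) -> Vec<i64> {
    // Heap entries: (next value, run index, position within run), min-ordered.
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }

    let mut out = Vec::new();
    while let Some(Reverse((value, run_idx, pos))) = heap.pop() {
        out.push(value);
        // Advance the run that produced the smallest value.
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}

fn main() {
    let runs = vec![vec![1, 4, 9], vec![2, 3, 8], vec![5, 6, 7]];
    assert_eq!(kway_merge(runs), (1..=9).collect::<Vec<i64>>());
}
```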
Memory Budget Enforcement
Every allocation requires a BudgetGuard:
```rust
let guard = budget.try_acquire(bytes, "my-buffer")?;
// Allocate memory...
// Guard automatically releases bytes on drop (RAII)
```
If try_acquire returns None, the operator must spill or partition.
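As a self-contained illustration of that contract, the toy ToyBudget/ToyGuard types below (stand-ins, not the engine's MemoryBudget API) show how a guard charges the budget when acquired, releases it on drop, and how a caller falls back to spilling when the reservation fails.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Toy stand-ins for the budget/guard pattern; not EM-√'s actual types.
struct ToyBudget {
    cap: usize,
    used: AtomicUsize,
}

struct ToyGuard<'a> {
    budget: &'a ToyBudget,
    bytes: usize,
}

impl ToyBudget {
    fn try_acquire(&self, bytes: usize) -> Option<ToyGuard<'_>> {
        let mut current = self.used.load(Ordering::Relaxed);
        loop {
            if current + bytes > self.cap {
                return None; // over the cap: caller must spill or partition
            }
            match self.used.compare_exchange(
                current, current + bytes, Ordering::SeqCst, Ordering::Relaxed,
            ) {
                Ok(_) => return Some(ToyGuard { budget: self, bytes }),
                Err(actual) => current = actual,
            }
        }
    }
}

impl Drop for ToyGuard<'_> {
    fn drop(&mut self) {
        // RAII: bytes are credited back as soon as the guard goes out of scope.
        self.budget.used.fetch_sub(self.bytes, Ordering::SeqCst);
    }
}

fn main() {
    let budget = ToyBudget { cap: 512 * 1024 * 1024, used: AtomicUsize::new(0) };
    match budget.try_acquire(600 * 1024 * 1024) {
        Some(_guard) => println!("reservation held; process batch in memory"),
        None => println!("cap would be exceeded; spill or partition instead"),
    } // any guard is dropped here, releasing its bytes
}
```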
Performance
Benchmarks (Planned)
- Sort 10GB with 512MB memory
- Join 1GB × 1GB with 50MB memory
- Aggregate 1M groups with 20MB memory
- TPC-H queries (Q1, Q3, Q6)
See docs/benchmarks.md for the current Criterion harness and the scripts/benchmarks/run_benchmarks.sh helper.
Expected Characteristics
- Throughput: 10-100x slower than in-memory systems (by design)
- Memory: Guaranteed to never exceed cap (unlike other systems)
- Scalability: Can process datasets 100-1000x larger than available RAM
Development
Project Structure
```
emsqrt/
├── Cargo.toml              # Workspace configuration
├── crates/
│   ├── emsqrt-core/        # Core types and traits
│   ├── emsqrt-te/          # Tree Evaluation planner
│   ├── emsqrt-mem/         # Memory budget and spill manager
│   ├── emsqrt-io/          # I/O adapters
│   ├── emsqrt-operators/   # Query operators
│   ├── emsqrt-planner/     # Planning and optimization
│   ├── emsqrt-exec/        # Execution runtime
│   └── emsqrt-cli/         # Command-line interface
├── tests/                  # Integration and unit tests (workspace root)
├── scripts/                # Utility scripts (run_all_tests.sh)
├── examples/               # YAML pipeline examples
└── README.md               # This file
```
Adding a New Operator
- Implement the Operator trait in emsqrt-operators/src/
- Register it in emsqrt-operators/src/registry.rs
- Add it to planner lowering in emsqrt-planner/src/lower.rs
- Add tests in tests/
Code Style
- Follow Rust standard formatting: cargo fmt
- All code must compile with #![forbid(unsafe_code)]
- Use thiserror for error types
- Use serde for serialization
Contributing
Contributions are welcome! Areas of particular interest:
- Cloud storage adapters (S3, GCS, Azure) - placeholders exist, need implementation
- Additional operators (window functions, lateral joins)
- YAML DSL support for aggregate and join operators
- Performance optimizations (SIMD in Arrow operations, parallel processing)
- Documentation improvements
Acknowledgments
This project implements Tree Evaluation (TE) scheduling for external-memory query processing, enabling predictable memory usage in constrained environments.
Take-home: EM-√ trades throughput for guaranteed memory bounds. Use it when memory constraints are more important than raw speed.
This repository is under active development and will continue to evolve over time.