A data processing framework for Rust inspired by Apache Beam and Google Cloud Dataflow. Ironbeam provides a declarative API for building batch data pipelines with support for transformations, aggregations, joins, and I/O operations.
Features
- Declarative pipeline API with fluent interface
- Stateless operations:
map,filter,flat_map,map_batches,filter_map - Stateful operations:
group_by_key,combine_values, keyed aggregations - Built-in aggregations:
count_globally,count_per_key,count_per_element,top_k_per_key,to_list_per_key,to_set_per_key,latest_globally,latest_per_key,sample_reservoir,sample_values_reservoir,distinct,distinct_per_key,approx_distinct_count; lower-level combiners (Sum,Min,Max,AverageF64,ApproxMedian,ApproxQuantiles,TDigest) available viacombine_values/combine_globally - Flatten: merge multiple
PCollections of the same type into one - Multi-output / side outputs: enum-based routing with the
partition!macro, compile-time type-safe - N-way CoGroupByKey:
cogroup_by_key!macro for 2–10 inputs - Enhanced filters:
filter_eq,filter_ne,filter_lt,filter_le,filter_gt,filter_ge,filter_range,filter_range_inclusive,filter_by - Reservoir sampling:
sample_reservoir(global) andsample_values_reservoir(per-key) - Join support: inner, left, right, and full outer joins
- Side inputs for enriching streams with auxiliary data (Vec and HashMap views)
- Sequential and parallel execution modes
- Type-safe with compile-time correctness
- Optional I/O backends: JSON Lines, CSV, Parquet
- Optional compression: gzip, zstd, bzip2, xz
- Metrics collection and checkpointing for fault tolerance
- Automatic memory spilling to disk for memory-constrained environments
- Cloud I/O abstractions for provider-agnostic cloud integrations
- Data validation utilities for production-grade error handling
- Comprehensive testing framework with fixtures and assertions
Installation
Add to your Cargo.toml:
[dependencies] ironbeam = "3"
By default, all features are enabled. To use a minimal configuration:
[dependencies] ironbeam = { version = "3", default-features = false }
Available feature flags:
io-jsonl- JSON Lines supportio-csv- CSV supportio-parquet- Parquet supportcompression-gzip- gzip compressioncompression-zstd- zstd compressioncompression-bzip2- bzip2 compressioncompression-xz- xz compressionparallel-io- parallel I/O operationsmetrics- pipeline metrics collectioncheckpointing- checkpoint and recovery supportspilling- automatic memory spilling to disk
Quick Start
use ironbeam::*; fn main() -> anyhow::Result<()> { // Create a pipeline let p = Pipeline::default(); // Build a word count pipeline let lines = from_vec(&p, vec![ "hello world".to_string(), "hello rust".to_string(), ]); let counts = lines .flat_map(|line: &String| { line.split_whitespace() .map(|w| w.to_string()) .collect::<Vec<_>>() }) .key_by(|word: &String| word.clone()) .map_values(|_word: &String| 1u64) .combine_values(Count); // Execute and collect results let results = counts.collect_seq()?; for (word, count) in results { println!("{}: {}", word, count); } Ok(()) }
Core Concepts
Pipeline
A Pipeline is the container for your computation graph. Create one with Pipeline::default(), then attach data sources and transformations.
PCollection
A PCollection<T> represents a distributed collection of elements. Collections are immutable, lazy, and type-safe. Transformations create new collections, and computation happens when you call an execution method like collect_seq() or collect_par().
Transformations
Stateless operations work on individual elements:
map- transform each elementfilter- keep elements matching a predicateflat_map- transform each element into zero or more outputsmap_batches- process elements in batches
Stateful operations work on keyed data:
key_by/with_keys/with_constant_key- convert to a keyed collectionmap_values/filter_values- transform or filter values while preserving keysgroup_by_key- group values by keycombine_values/combine_values_lifted- aggregate values per keytop_k_per_key- select top K values per keydistinct/distinct_per_key- remove duplicate elements or values
Combiners
Ironbeam provides convenience methods for common aggregations, plus lower-level combiners for
use with combine_values / combine_globally.
Convenience methods (call directly on a collection):
count_globally()— total element count →PCollection<u64>count_per_key()— element count per key →PCollection<(K, u64)>count_per_element()— frequency of each distinct element →PCollection<(T, u64)>top_k_per_key(k)— K largest values per key →PCollection<(K, Vec<V>)>to_list_per_key()— all values per key →PCollection<(K, Vec<V>)>to_set_per_key()— unique values per key →PCollection<(K, HashSet<V>)>latest_globally()— value with the latest timestamp →PCollection<T>latest_per_key()— latest value per key →PCollection<(K, T)>sample_reservoir(k, seed)— global reservoir sample →PCollection<T>sample_reservoir_vec(k, seed)— global sample as a single Vec →PCollection<Vec<T>>sample_values_reservoir(k, seed)— per-key reservoir sample →PCollection<(K, V)>distinct()— remove global duplicates →PCollection<T>distinct_per_key()— remove duplicate values per key →PCollection<(K, V)>approx_distinct_count(k)— estimated cardinality (KMV) →PCollection<f64>approx_distinct_count_per_key(k)— estimated cardinality per key →PCollection<(K, f64)>
Lower-level combiners for use with combine_values / combine_globally:
Sum/Min/Max— basic numeric aggregationAverageF64— arithmetic meanApproxMedian/ApproxQuantiles/TDigest— statistical estimators
Custom combiners can be implemented via the CombineFn trait.
Flatten
Merge multiple PCollections of the same type into one:
let merged = flatten(&[&pc1, &pc2, &pc3]);
Multi-Output / Side Outputs
Split a collection into multiple typed output streams using enums and the partition! macro.
All output types are checked at compile time:
#[derive(Clone)] enum ParseResult { Integer(i64), Float(f64), Invalid(String), } let classified = lines.flat_map(|s: &&str| { if let Ok(i) = s.parse::<i64>() { vec![ParseResult::Integer(i)] } else if let Ok(f) = s.parse::<f64>() { vec![ParseResult::Float(f)] } else { vec![ParseResult::Invalid(s.to_string())] } }); partition!(classified, ParseResult, { Integer => integers, Float => floats, Invalid => errors }); // integers, floats, and errors are now independent PCollections
N-Way CoGroupByKey
Full outer join of N-keyed collections on a shared key type:
// cogroup_by_key! supports 2–10 inputs let grouped = cogroup_by_key!(users, orders, addresses); let results = grouped.flat_map(|(user_id, (users, orders, addrs))| { // users: Vec<User>, orders: Vec<Order>, addrs: Vec<Address> vec![...] });
Joins
Two-collection keyed joins:
let joined = left.join_inner(&right);
Available: join_inner, join_left, join_right, join_full.
Side Inputs
Enrich elements with small, broadcast auxiliary data:
// Vec side input let primes = side_vec(vec![2u32, 3, 5, 7]); let flagged = nums.map_with_side(&primes, |n, ps| ps.contains(n)); // HashMap side input (O(1) lookup) let lookup = side_hashmap(vec![("a".to_string(), 1u32), ("b".to_string(), 2)]); let enriched = data.map_with_side_map(&lookup, |(k, v), m| { (k.clone(), v + m.get(k).copied().unwrap_or(0)) });
Sampling
Uniform reservoir sampling, deterministic across sequential and parallel execution:
// Sample 1000 elements globally (returns PCollection<Vec<T>>) let sample = data.sample_reservoir_vec(1000, /*seed=*/ 42); // Sample 10 values per key let per_key = keyed.sample_values_reservoir(10, 42);
Execution
Execute pipelines in sequential or parallel mode:
let results = collection.collect_seq()?; // Single-threaded let results = collection.collect_par()?; // Multithreading with Rayon
I/O Examples
JSON Lines
use ironbeam::*; use serde::{Deserialize, Serialize}; #[derive(Clone, Serialize, Deserialize)] struct Record { id: u32, name: String, } fn main() -> anyhow::Result<()> { let p = Pipeline::default(); // Read file let data = read_jsonl::<Record>(&p, "data.jsonl")?; // Process let filtered = data.filter(|r: &Record| r.id > 100); // Write results filtered.write_jsonl("output.jsonl")?; Ok(()) }
CSV
let data = read_csv::<Record>(&p, "data.csv")?; data.write_csv("output.csv")?;
Parquet
let data = read_parquet::<Record>(&p, "data.parquet")?; data.write_parquet("output.parquet")?;
Compression
Compression is automatically detected by file extension:
// Read compressed file let data = read_jsonl::<Record>(&p, "data.jsonl.gz")?; // Write compressed file data.write_jsonl("output.jsonl.zst")?;
Supported extensions: .gz, .zst, .bz2, .xz
Advanced Features
Windowing
Group time-series data into fixed or sliding windows:
let windowed = data .window_fixed(Duration::from_secs(60)) .group_by_key() .combine_values(Sum);
Checkpointing
Save and restore the pipeline's state for fault tolerance:
data.checkpoint("checkpoints/step1")?; // Later, recover from checkpoint let recovered = recover_checkpoint::<MyType>(&p, "checkpoints/step1")?;
Metrics
Collect pipeline execution metrics:
let result = collection.collect_seq()?; let metrics = p.get_metrics(); println!("Elements processed: {}", metrics.elements_processed);
Automatic Memory Spilling
For memory-constrained environments, Ironbeam can automatically spill data to persistent storage when memory limits are exceeded:
use ironbeam::spill::SpillConfig; use ironbeam::spill_integration::init_spilling; // Configure automatic spilling with a 100MB memory limit init_spilling(SpillConfig::new() .with_memory_limit(100 * 1024 * 1024) .with_spill_directory("/tmp/ironbeam-spill") .with_compression(true)); // Now run your pipeline - spilling happens automatically let results = large_dataset .map(|x| heavy_computation(x)) .collect_seq()?;
Data is transparently spilled to persistent storage when memory pressure is detected and automatically restored when needed. This allows processing datasets larger than available RAM without manual intervention.
Cloud I/O Abstractions
Write provider-agnostic code that works with any cloud service:
use ironbeam::io::cloud::*; // Use fake implementations for testing let storage = FakeObjectIO::new(); storage.put_object("bucket", "key", b"data")?; // In production, swap in real implementations (AWS S3, GCS, Azure Blob, etc.) // without changing your business logic
Supported cloud service categories:
- Storage: Object storage (S3/GCS/Azure), warehouses (BigQuery/Redshift), databases
- Messaging: Pub/sub (Kafka/Kinesis), queues (SQS), notifications (SNS)
- Compute: Serverless functions (Lambda/Cloud Functions)
- ML: Model inference (SageMaker/Vertex AI)
- Observability: Metrics, logging, configuration, caching
All traits are synchronous by design and include fake implementations for unit testing.
Data Validation
Handle bad data gracefully in production pipelines:
use ironbeam::validation::*; impl Validate for MyRecord { fn validate(&self) -> ValidationResult { let mut errors = Vec::new(); if self.email.is_empty() { errors.push(ValidationError::field("email", "Email required")); } if self.age < 0 || self.age > 150 { errors.push(ValidationError::field("age", "Invalid age")); } if errors.is_empty() { Ok(()) } else { Err(errors) } } } // Apply validation with configurable error handling let validated = data .try_map(|record: &MyRecord| record.validate()) .collect_fail_fast()?; // Or use .collect_skip_errors()
Examples
The examples/ directory contains complete demonstrations:
etl_pipeline.rs- Extract, transform, load workflowadvanced_joins.rs- Join operationswindowing_aggregations.rs- Time-based windowingcombiners_showcase.rs- Using built-in combinerscompressed_io.rs- Working with compressed filescheckpointing_demo.rs- Checkpoint and recoverymetrics_example.rs- Collecting metricscloud_io_demo.rs- Cloud service integrationsdata_quality_validation.rs- Production data validationtesting_pipeline.rs- Testing patterns
Run examples with:
cargo run --example etl_pipeline --features io-jsonl,io-csv cargo run --example cloud_io_demo
Testing Your Pipelines
Ironbeam includes a comprehensive testing framework with utilities specifically designed for pipeline testing:
Test Utilities
use ironbeam::testing::*; #[test] fn test_my_pipeline() -> anyhow::Result<()> { let p = TestPipeline::new(); let result = from_vec(&p, vec![1, 2, 3]) .map(|x: &i32| x * 2) .collect_seq()?; // Specialized assertions for collections assert_collections_equal(result, vec![2, 4, 6]); assert_all(&result, |x| x % 2 == 0); Ok(()) }
Pre-built Test Fixtures
use ironbeam::testing::fixtures::*; // Ready-to-use test datasets let logs = sample_log_entries(); // Web server logs let users = user_product_interactions(); // Relational data let series = time_series_data(); // Sensor readings
Debug Utilities
// Inspect data during test execution let result = data .debug_inspect("after filter") .filter(|x: &i32| x > 10) .debug_count("filtered count") .collect_seq()?;
Running Tests
Run the full test suite:
Run tests with specific features:
cargo test --features spillingFor coverage:
cargo tarpaulin --out Html
License
This project uses the Rust 2024 edition and requires a recent Rust toolchain.