PHP MapReduce
A high-performance, framework-agnostic MapReduce implementation for PHP that processes large datasets using true parallel processing with separate PHP worker processes and disk-based storage.
Why Use This?
- True CPU parallelism - spawns separate PHP processes for parallel execution, not async coroutines
- Process datasets larger than RAM using memory-efficient disk storage
- Stream iterators without loading into memory - process large files, database cursors, and API responses efficiently
- Utilize multiple CPU cores for CPU-intensive transformations
- Framework-agnostic - works with any PHP project
- Handle millions of records with predictable memory usage
Installation
```bash
composer require spiritinlife/php-mapreduce
```
Requirements: PHP 8.1+
Quick Start
Word count example:
```php
use Spiritinlife\MapReduce\MapReduceBuilder;

$documents = [
    'doc1' => 'the quick brown fox',
    'doc2' => 'the lazy dog',
    'doc3' => 'quick brown dogs',
];

$results = (new MapReduceBuilder())
    ->input($documents)
    ->map(function ($text, $context) {
        foreach (str_word_count(strtolower($text), 1) as $word) {
            yield [$word, 1];
        }
    })
    ->reduce(function ($word, $countsIterator, $context) {
        $sum = 0;
        foreach ($countsIterator as $count) {
            $sum += $count;
        }
        return $sum;
    })
    ->execute();

// Results are streamed as a generator - iterate to process results
foreach ($results as $word => $count) {
    // Output: 'quick' => 2, 'brown' => 2, 'the' => 2, ...
}
```
How It Works
MapReduce processes data in three parallel phases using separate PHP worker processes:
- Map Phase - Input is streamed in batches to parallel worker processes. Each worker emits key-value pairs to partitioned temporary files.
- Shuffle Phase - Partitions are sorted and grouped in parallel using memory-efficient external sorting with separate worker processes.
- Reduce Phase - Partitions are processed in parallel by worker processes, aggregating values by key.
Input → [Parallel Map Workers] → [Parallel Shuffle & Sort] → [Parallel Reduce Workers] → Results
Each phase uses spatie/async to spawn separate PHP processes, enabling true CPU parallelism for compute-intensive operations.
⚠️ Important: Because mapper and reducer functions run in separate processes, you cannot use closure use clauses to access external variables. Use the context() method instead. See Parallel Processing & Context for details.
Input Types
MapReduce accepts any iterable as input - arrays, generators, iterators, or custom iterables.
💡 True Streaming: Large files, database cursors, and API paginations are consumed as they're produced - never loading the entire dataset into memory.
Arrays and Basic Iterables
```php
// Simple array
->input(['a', 'b', 'c'])

// Associative array
->input(['doc1' => 'text', 'doc2' => 'more text'])

// ArrayIterator
->input(new ArrayIterator($data))
```
Files and Streams
```php
// Read file line by line (memory-efficient)
->input(new SplFileObject('large-file.txt'))

// Generator from file
->input((function() {
    $handle = fopen('data.csv', 'r');
    while (($line = fgets($handle)) !== false) {
        yield $line;
    }
    fclose($handle);
})())
```
CSV Files
```php
// Parse CSV rows
->input((function() {
    $file = fopen('data.csv', 'r');
    fgetcsv($file); // Skip header
    while (($row = fgetcsv($file)) !== false) {
        yield [
            'name' => $row[0],
            'value' => $row[1],
        ];
    }
    fclose($file);
})())
```
JSON Lines (JSONL)
```php
// Process newline-delimited JSON
->input((function() {
    $file = fopen('data.jsonl', 'r');
    while (($line = fgets($file)) !== false) {
        yield json_decode($line, true);
    }
    fclose($file);
})())
```
Database Results
```php
// PDO result set
$stmt = $pdo->query('SELECT * FROM large_table');
->input($stmt)

// Generator for memory efficiency
->input((function() use ($pdo) {
    $stmt = $pdo->prepare('SELECT * FROM users');
    $stmt->execute();
    while ($row = $stmt->fetch()) {
        yield $row;
    }
})())
```
Directory Scanning
```php
// Process all files in a directory
->input(new RecursiveIteratorIterator(
    new RecursiveDirectoryIterator('/path/to/logs')
))

// Custom file filter
->input((function() {
    $dir = new RecursiveDirectoryIterator('/path/to/data');
    $iterator = new RecursiveIteratorIterator($dir);
    foreach ($iterator as $file) {
        if ($file->isFile() && $file->getExtension() === 'log') {
            yield $file->getPathname() => file_get_contents($file);
        }
    }
})())
```
API Reference
Core Methods (Required)
input(iterable $input)
Set the input data - any array or iterable.
```php
->input($documents)
->input(new ArrayIterator($data))
```
map(callable $mapper)
Define the transformation function. Receives ($value, $context) and must yield [$key, $value] pairs.
```php
->map(function ($value, $context) {
    // Process and emit key-value pairs
    yield [$newKey, $newValue];
})
```
reduce(callable $reducer)
Define the aggregation function using an accumulator + iterator pattern.
The reducer receives ($key, $valuesIterator, $context) where:
- $key - The current key being reduced
- $valuesIterator - Iterator that streams values one at a time for memory efficiency
- $context - Optional shared context data (see context() method)
You manually accumulate results by iterating over the values. This approach never loads all values for a key into memory at once, which is critical for keys with many values (e.g., a global counter with millions of entries).
```php
// Accumulator pattern - iterate and accumulate manually
->reduce(function ($key, $valuesIterator, $context) {
    $sum = 0; // Accumulator
    foreach ($valuesIterator as $value) {
        $sum += $value; // Accumulate
    }
    return $sum;
})

// Using context data
->reduce(function ($key, $valuesIterator, $context) {
    $threshold = $context['threshold'];
    $sum = 0;
    foreach ($valuesIterator as $value) {
        if ($value > $threshold) {
            $sum += $value;
        }
    }
    return $sum;
})

// If you need the full array (uses memory):
->reduce(function ($key, $valuesIterator, $context) {
    $values = iterator_to_array($valuesIterator);
    return ['count' => count($values), 'unique' => array_unique($values)];
})
```
context(mixed $data)
Pass shared data to mapper and reducer functions executing in parallel processes.
Because mapper and reducer functions run in separate PHP processes, you cannot use closure use clauses to access external variables. Use context() instead to pass serializable data that will be available to all workers.
```php
// ❌ This doesn't work - variables won't be available in parallel processes
$threshold = 100;
->map(function ($value, $context) use ($threshold) {
    // Won't work!
    if ($value > $threshold) yield [$value, 1];
})

// ✅ Use context() instead
->context(['threshold' => 100])
->map(function ($value, $context) {
    if ($value > $context['threshold']) yield [$value, 1];
})
```
The context is passed as an additional parameter to both mapper and reducer:
- Mapper: function($value, $context)
- Reducer: function($key, $valuesIterator, $context)
See Parallel Processing & Context for details.
execute(): Generator
Run the job and return results as a generator. Results are streamed lazily, so you can iterate them without holding the full result set in memory.
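For instance, a minimal consumption pattern, assuming $documents, $mapper, and $reducer are defined as in the Quick Start example:

```php
$results = (new MapReduceBuilder())
    ->input($documents)
    ->map($mapper)
    ->reduce($reducer)
    ->execute();

// Each iteration pulls the next key-value pair from the reduce output;
// nothing is materialized into a single array unless you ask for it.
foreach ($results as $key => $value) {
    echo "{$key}: {$value}\n";
}

// Alternatively, collect everything at once (memory proportional to result size):
// $all = iterator_to_array($results);
```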
Configuration Methods (Optional)
concurrent(int $workers)
Number of parallel worker processes. Default: 4
```php
->concurrent(8) // Spawn 8 parallel PHP processes
```
Tip: Match your CPU core count. For I/O-bound tasks, you can use 2-3x cores.
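One way to follow that tip is to detect the core count at runtime. A rough sketch, where the detection commands are OS-specific assumptions (not part of the library) and $builder is an existing MapReduceBuilder instance:

```php
// Best-effort core detection: nproc on Linux, sysctl on macOS; fall back to the default of 4.
function detectCpuCores(): int
{
    $cores = (int) trim((string) @shell_exec('nproc 2>/dev/null'));
    if ($cores < 1) {
        $cores = (int) trim((string) @shell_exec('sysctl -n hw.ncpu 2>/dev/null'));
    }
    return $cores > 0 ? $cores : 4;
}

$builder->concurrent(detectCpuCores());        // CPU-bound work: one worker per core
// $builder->concurrent(detectCpuCores() * 2); // I/O-bound work, per the tip above
```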
partitions(int $count)
Number of reduce partitions for parallel processing. Default: same as concurrent()
```php
->partitions(16) // More parallelism in reduce phase
```
Tip: Use 1-2x concurrency level. More partitions = better parallelism but more overhead.
mapperBatchSize(int $items)
Controls: Items sent to each parallel worker process. Default: 500
Determines how many input items are batched together and sent to a worker process. This is critical for memory management because it affects how many tasks the parent process creates.
```php
->mapperBatchSize(2000) // Large datasets - reduce process overhead
->mapperBatchSize(100)  // Small datasets - better load balancing
```
Tradeoffs:
- Smaller batches = better load balancing, but more tasks and therefore MORE parent memory
- Larger batches = less per-task overhead, fewer tasks, and therefore LESS parent memory
Guidelines:
- Small datasets (<10K records): 100-500 (default)
- Medium datasets (10K-100K): 500-2,000
- Large datasets (100K-1M): 2,000-5,000
- Very large datasets (>1M): 5,000-10,000
⚠️ Critical: Batch Size vs Memory. Parent process memory is proportional to the number of tasks, not dataset size:
- 1M records ÷ 500 batch = 2,000 tasks (may use 100MB+ parent memory)
- 1M records ÷ 5,000 batch = 200 tasks (uses ~10MB parent memory)
For large datasets (>100K records), use larger batch sizes to reduce parent process memory overhead.
shuffleChunkSize(int $records)
Controls: Memory usage during shuffle phase external sorting. Default: 10000
Determines how many records accumulate in memory before being sorted and written as a chunk file during external sorting. Only affects the shuffle phase.
```php
->shuffleChunkSize(50000) // High-memory system - faster sorting
->shuffleChunkSize(2000)  // Memory-constrained
```
Memory impact: shuffleChunkSize × average_record_size bytes per sort operation
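For example, assuming records average roughly 200 bytes, the default chunk size of 10,000 holds about 2 MB per sort pass, while a chunk size of 50,000 holds about 10 MB.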
Guidelines:
- Small datasets (<100K): 5,000-10,000
- Medium datasets (100K-1M): 10,000-50,000 (default: 10,000)
- Large datasets (>1M): 50,000-100,000
bufferSize(int $records)
Controls: File I/O write buffering across all phases. Default: 1000
Determines how many records are buffered in memory before flushing to disk. Affects the frequency of fwrite() system calls. Used by map, shuffle, and reduce phases.
```php
->bufferSize(5000) // Large datasets, plenty of RAM
->bufferSize(500)  // Memory-constrained
```
Memory impact: bufferSize × average_record_size × num_partitions bytes in map phase (each partition has its own buffer)
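For example, assuming roughly 200-byte records and 16 partitions, the default bufferSize of 1,000 keeps about 3.2 MB of write buffers per map worker (1,000 × 200 bytes × 16 partitions).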
Guidelines:
- Small datasets (<10K): 100-500
- Medium datasets (10K-1M): 1000-5000 (default: 1000)
- Large datasets (>1M): 5000-10000
💡 Three Independent Parameters:
- mapperBatchSize - Controls worker granularity & parent memory (map phase only)
- shuffleChunkSize - Controls sorting memory (shuffle phase only)
- bufferSize - Controls write batching (all phases)

These handle different concerns:
- Large mapperBatchSize reduces process spawning overhead AND parent memory (critical for >100K records)
- Large shuffleChunkSize creates fewer chunk files to merge
- Large bufferSize reduces system call overhead

Most impactful for large datasets: Increase mapperBatchSize first!
workingDirectory(string $path)
Directory for temporary files. Default: system temp
```php
->workingDirectory('/mnt/fast-ssd/tmp')
```
Tip: Use SSD storage for better performance on large datasets.
autoload(string $path)
Configure autoloader path for parallel worker processes. Default: none
When using parallel processing, worker processes spawn in separate contexts and may need to load dependencies. This method specifies the path to your autoloader (typically vendor/autoload.php).
```php
->autoload(__DIR__ . '/vendor/autoload.php')
```
When to use:
- When mapper/reducer functions use classes from external libraries
- When using custom classes that need autoloading
- If you get "class not found" errors in worker processes
Note: Most of the time you won't need this - the library handles common cases automatically. Only use if you encounter class loading issues in parallel workers.
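For example, if a mapper relies on a project class, pointing the workers at the project autoloader makes it resolvable in each process. A sketch where App\Parsers\LogLineParser is a hypothetical project class, not part of this library:

```php
use App\Parsers\LogLineParser; // hypothetical project class used inside the mapper

$results = (new MapReduceBuilder())
    ->input(new SplFileObject('access.log'))
    ->autoload(__DIR__ . '/vendor/autoload.php') // lets worker processes resolve App\ classes
    ->map(function ($line, $context) {
        $entry = LogLineParser::parse($line); // loaded via the autoloader in each worker
        if ($entry !== null) {
            yield [$entry['status'], 1];
        }
    })
    ->reduce(function ($status, $countsIterator, $context) {
        $sum = 0;
        foreach ($countsIterator as $count) {
            $sum += $count;
        }
        return $sum;
    })
    ->execute();
```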
partitionBy(callable $partitioner)
Custom function to control which partition a key goes to. Receives ($key, $numPartitions, $context) and must return 0 to partitions-1.
```php
->partitionBy(function ($key, $numPartitions, $context) {
    return ord($key[0]) % $numPartitions; // Partition by first letter
})
```
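When keys have no natural grouping, a generic hash spreads them more evenly across partitions. A minimal sketch, with crc32 as just one reasonable hash choice rather than the library's built-in behavior:

```php
// Spread arbitrary string keys evenly across reduce partitions
->partitionBy(function ($key, $numPartitions, $context) {
    return crc32((string) $key) % $numPartitions;
})
```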
Examples
Aggregating Sales Data
```php
$sales = [
    ['product' => 'Widget', 'amount' => 100],
    ['product' => 'Gadget', 'amount' => 150],
    ['product' => 'Widget', 'amount' => 200],
];

$totals = (new MapReduceBuilder())
    ->input($sales)
    ->map(fn($sale, $context) => yield [$sale['product'], $sale['amount']])
    ->reduce(function($product, $amountsIterator, $context) {
        // Collect amounts to calculate statistics
        $amounts = iterator_to_array($amountsIterator);
        return [
            'total' => array_sum($amounts),
            'average' => array_sum($amounts) / count($amounts),
            'count' => count($amounts),
        ];
    })
    ->execute();

foreach ($totals as $product => $stats) {
    echo "{$product}: {$stats['total']}\n";
}
```
Building an Inverted Index
```php
$documents = [
    ['id' => 1, 'text' => 'the quick brown fox'],
    ['id' => 2, 'text' => 'the lazy dog'],
    ['id' => 3, 'text' => 'quick brown animals'],
];

$invertedIndex = (new MapReduceBuilder())
    ->input($documents)
    ->map(function ($doc, $context) {
        foreach (array_unique(str_word_count(strtolower($doc['text']), 1)) as $word) {
            yield [$word, $doc['id']];
        }
    })
    ->reduce(function($word, $docIdsIterator, $context) {
        // Collect document IDs to calculate frequency and uniqueness
        $docIds = iterator_to_array($docIdsIterator);
        return [
            'documents' => array_unique($docIds),
            'frequency' => count($docIds),
        ];
    })
    ->execute();

foreach ($invertedIndex as $word => $index) {
    echo "{$word}: appears in {$index['frequency']} documents\n";
}
```
Log Analysis
```php
$stats = (new MapReduceBuilder())
    ->input(file('access.log'))
    ->map(function ($line, $context) {
        if (preg_match('/^(\S+).*?"GET (\S+).*?" (\d+)/', $line, $m)) {
            yield ["ip:{$m[1]}", 1];
            yield ["status:{$m[3]}", 1];
        }
    })
    ->reduce(function($key, $countsIterator, $context) {
        $sum = 0;
        foreach ($countsIterator as $count) {
            $sum += $count;
        }
        return $sum;
    })
    ->concurrent(8)
    ->execute();

foreach ($stats as $key => $count) {
    echo "{$key}: {$count}\n";
}
```
Using Context for Shared Data
```php
// Pre-calculated lookup tables or configuration
$userTiers = ['alice' => 'gold', 'bob' => 'silver', 'charlie' => 'bronze'];
$tierMultipliers = ['gold' => 1.5, 'silver' => 1.2, 'bronze' => 1.0];

$purchases = [
    ['user' => 'alice', 'amount' => 100],
    ['user' => 'bob', 'amount' => 150],
    ['user' => 'charlie', 'amount' => 200],
];

$results = (new MapReduceBuilder())
    ->input($purchases)
    // Pass shared data via context (not via 'use' clause!)
    ->context([
        'userTiers' => $userTiers,
        'tierMultipliers' => $tierMultipliers,
    ])
    ->map(function ($purchase, $context) {
        // Access context data in mapper
        $tier = $context['userTiers'][$purchase['user']] ?? 'bronze';
        $multiplier = $context['tierMultipliers'][$tier];
        $bonusPoints = $purchase['amount'] * $multiplier;
        yield [$purchase['user'], $bonusPoints];
    })
    ->reduce(function ($user, $pointsIterator, $context) {
        // Context also available in reducer
        $tier = $context['userTiers'][$user];

        // Stream and sum points without loading all into memory
        $totalPoints = 0;
        foreach ($pointsIterator as $points) {
            $totalPoints += $points;
        }

        return [
            'total_points' => $totalPoints,
            'tier' => $tier,
        ];
    })
    ->execute();

foreach ($results as $user => $data) {
    echo "{$user} ({$data['tier']}): {$data['total_points']} points\n";
}
```
Parallel Processing & Context
Important: Closure use Clauses Don't Work
Because MapReduce uses true parallel processing with separate PHP processes (via spatie/async), variables captured with use clauses are NOT available in mapper and reducer functions.
```php
// ❌ BROKEN - Variables won't be available in parallel processes
$totalUsers = 1000;
$lookupTable = ['a' => 1, 'b' => 2];

(new MapReduceBuilder())
    ->input($data)
    ->map(function ($item, $context) use ($lookupTable) {
        // ❌ $lookupTable will be NULL or empty here!
        $value = $lookupTable[$item['key']]; // Won't work!
        yield [$item['key'], $value];
    })
    ->reduce(function ($key, $valuesIterator, $context) use ($totalUsers) {
        // ❌ $totalUsers will be NULL or 0 here!
        $sum = 0;
        foreach ($valuesIterator as $value) {
            $sum += $value;
        }
        $percentage = $sum / $totalUsers; // Won't work!
        return $percentage;
    })
    ->execute();
```
Solution: Use context() Method
Pass data via the context() method, which serializes and provides it to all workers:
```php
// ✅ CORRECT - Use context() to pass data
$totalUsers = 1000;
$lookupTable = ['a' => 1, 'b' => 2];

(new MapReduceBuilder())
    ->input($data)
    ->context([
        'totalUsers' => $totalUsers,
        'lookupTable' => $lookupTable,
        'config' => ['threshold' => 50]
    ])
    ->map(function ($item, $context) {
        // ✅ Access via $context parameter
        $value = $context['lookupTable'][$item['key']];
        yield [$item['key'], $value];
    })
    ->reduce(function ($key, $valuesIterator, $context) {
        // ✅ Context available in reducer too
        $sum = 0;
        foreach ($valuesIterator as $value) {
            $sum += $value;
        }
        $percentage = $sum / $context['totalUsers'];
        return $percentage;
    })
    ->execute();
```
Context Guidelines
What can be passed as context:
- Arrays, scalars (int, float, string, bool)
- Objects (as long as they're serializable - no resources, DB connections, or file handles)
- Nested data structures
- Class constants and configuration values
Best practices:
- Keep context reasonably sized (< 100MB recommended) - it's serialized to each worker
- Context is read-only - changes in one worker won't affect others
- All data must be serializable (no resources, closures, or database connections)
Real-world example (collaborative filtering):
```php
// Phase 1: Calculate counts (stored in memory)
$totalBuyers = 10000;
$eventCounts = [
    'follow' => ['seller1' => 500, 'seller2' => 300],
    'purchase' => ['seller1' => 200, 'seller2' => 150],
];

// Phase 2: Use counts to calculate scores with MapReduce
$results = (new MapReduceBuilder())
    ->input($cooccurrences)
    ->context([
        'totalBuyers' => $totalBuyers,
        'eventCounts' => $eventCounts,
        'minScore' => 5.0
    ])
    ->reduce(function ($key, $valuesIterator, $context) {
        // Extract shared data from context
        $totalBuyers = $context['totalBuyers'];
        $eventCounts = $context['eventCounts'];

        // Collect values for calculation
        $values = iterator_to_array($valuesIterator);

        // Use in calculations
        $score = calculateScore($values, $totalBuyers, $eventCounts);

        if ($score >= $context['minScore']) {
            return ['score' => $score, 'count' => count($values)];
        }
        return null;
    })
    ->execute();
```
Performance Tuning
Quick Reference
```php
(new MapReduceBuilder())
    ->concurrent(8)            // CPU cores (or 2-3x for I/O tasks)
    ->partitions(16)           // 1-2x concurrency for reduce parallelism
    ->mapperBatchSize(2000)    // Items per worker batch (larger = less overhead)
    ->shuffleChunkSize(50000)  // Shuffle sort memory (larger = fewer merges)
    ->bufferSize(5000)         // Write buffer (larger = fewer syscalls)
    ->workingDirectory('/ssd') // Use fast storage for large jobs
    ->autoload(__DIR__ . '/vendor/autoload.php') // Optional: for custom class loading
```
Memory vs Performance
Low Memory System (2GB RAM):
```php
->mapperBatchSize(200)   // Small batches to reduce serialization
->shuffleChunkSize(2000) // Small sort chunks
->bufferSize(500)        // Small write buffers
```
High Performance System (SSD, 32GB RAM):
```php
->mapperBatchSize(5000)    // Large batches to reduce process overhead
->shuffleChunkSize(100000) // Large sort chunks for fast sorting
->bufferSize(10000)        // Large buffers to minimize disk I/O
->workingDirectory('/mnt/fast-ssd/tmp')
```
Tuning independently:
- Large dataset (>100K records)? CRITICAL: Increase mapperBatchSize to drastically reduce parent process memory (can drop from 100MB+ to <10MB)
- Slow disk? Increase bufferSize to batch more writes
- Limited RAM during shuffle? Decrease shuffleChunkSize to reduce sort memory
- Many concurrent workers + partitions? Decrease bufferSize (memory = workers × partitions × bufferSize)
- Parent process using too much memory? Increase mapperBatchSize (fewer tasks = less overhead)
Benchmarks
Tested on Apple M2 Pro (10-core), 16GB RAM, SSD with spatie/async parallel processing:
| Records | Concurrency | Time | Parent Memory* | Batch Size |
|---|---|---|---|---|
| 10K | 4 | 0.3s | 6MB | 500 |
| 100K | 4 | 1.2s | 0MB | 2,000 |
| 1M | 8 | 7.9s | 8MB | 5,000 |
| 10M | 8 | 1m23s | 10MB | 5,000 |
*Parent process memory only - worker processes use separate memory space
Key Insight: Using larger mapperBatchSize for datasets >100K dramatically reduces parent memory (from 100MB+ to <10MB) by creating fewer tasks. See the critical callout in mapperBatchSize() documentation above.
Run your own benchmarks:
```bash
php bin/benchmark-readme.php
```
Testing
Run tests:
```bash
composer test          # Run test suite
composer test-coverage # With coverage report
composer analyse       # Static analysis (PHPStan)
composer cs-check      # Code style check (PSR-12)
```
Limitations
- Single machine only (not a distributed cluster)
- Requires disk space for intermediate files
- Process spawning overhead makes it inefficient for tiny datasets (<1000 records)
- Closure use clauses don't work in mapper/reducer functions due to parallel processing - use the context() method instead (see Parallel Processing & Context)
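Where inputs can occasionally be tiny, a simple size guard avoids that process-spawning overhead. A rough sketch, where the 1,000-record threshold and the wordCounts helper are illustrative assumptions rather than part of the library:

```php
use Spiritinlife\MapReduce\MapReduceBuilder;

// Hypothetical guard: process tiny inputs in-memory, larger ones via MapReduce.
function wordCounts(array $documents): iterable
{
    if (count($documents) < 1000) {
        // In-memory path: no worker processes, no temp files
        $counts = [];
        foreach ($documents as $text) {
            foreach (str_word_count(strtolower($text), 1) as $word) {
                $counts[$word] = ($counts[$word] ?? 0) + 1;
            }
        }
        return $counts;
    }

    // Large inputs: use the parallel MapReduce pipeline
    return (new MapReduceBuilder())
        ->input($documents)
        ->map(function ($text, $context) {
            foreach (str_word_count(strtolower($text), 1) as $word) {
                yield [$word, 1];
            }
        })
        ->reduce(function ($word, $countsIterator, $context) {
            $sum = 0;
            foreach ($countsIterator as $count) {
                $sum += $count;
            }
            return $sum;
        })
        ->execute();
}
```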
License
MIT License - see LICENSE file