Implements a simple single-machine MapReduce word count in Java with no external dependencies.

2 min read Original article ↗
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Tiny Shakespeare: https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

/**
 * A single-machine word count structured as a map/reduce job.
 *
 * <p>The input lines are split into chunks, each chunk is counted by a {@link Mapper} running on
 * a fixed-size thread pool, and the per-chunk counts are merged into one map.
 */
public class SimpleMapReduce {

    /** Input file used when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH = "src/input.txt";

    /** Number of chunks and worker threads used by {@link #main(String[])}. */
    private static final int DEFAULT_NUM_THREADS = 10;

    /**
     * Map task: counts the words of one chunk of lines.
     *
     * <p>Lines are lower-cased and split on runs of non-word characters ({@code \W+}), so
     * punctuation such as an apostrophe separates words.
     */
    static class Mapper implements Callable<ConcurrentHashMap<String, Integer>> {

        /** Compiled once; the original recompiled the expression for every line. */
        private static final Pattern NON_WORD = Pattern.compile("\\W+");

        final List<String> input;

        /**
         * @param input the lines this task will count
         */
        Mapper(List<String> input) {
            this.input = input;
        }

        /**
         * Counts the occurrences of every word in this task's lines.
         *
         * @return a map from lower-cased word to its number of occurrences in the chunk
         */
        @Override
        public ConcurrentHashMap<String, Integer> call() {
            return input.stream()
                    // Locale.ROOT keeps lower-casing independent of the JVM's default locale.
                    .flatMap(line -> NON_WORD.splitAsStream(line.toLowerCase(Locale.ROOT)))
                    // Splitting yields an empty token for blank lines and leading separators.
                    .filter(word -> !word.isEmpty())
                    .collect(Collectors.toConcurrentMap(
                            word -> word,
                            word -> 1,
                            Integer::sum,
                            ConcurrentHashMap::new
                    ));
        }
    }

    /**
     * Reads the input file, counts its words in parallel and prints one {@code word: count}
     * line per distinct word to standard output.
     *
     * @param args optional; {@code args[0]} overrides the default input path
     */
    public static void main(String[] args) {
        String path = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        int numThreads = DEFAULT_NUM_THREADS;
        try {
            List<String> lines = Files.readAllLines(Paths.get(path));
            List<List<String>> chunks = createChunks(lines, numThreads);
            processChunks(chunks, numThreads)
                    .forEach((key, value) -> System.out.println(key + ": " + value));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Splits {@code lines} into exactly {@code numChunks} consecutive sublist views.
     *
     * <p>Every chunk except possibly the last non-empty one holds
     * {@code ceil(lines.size() / numChunks)} lines. When there are fewer lines than the chunk
     * layout needs, the trailing chunks are empty.
     *
     * @param lines     the lines to partition; the returned chunks are views backed by this list
     * @param numChunks the number of chunks to produce; must be positive
     * @return a list of {@code numChunks} chunks which, concatenated in order, equal {@code lines}
     * @throws IllegalArgumentException if {@code numChunks} is not positive
     */
    static List<List<String>> createChunks(List<String> lines, int numChunks) {
        if (numChunks <= 0) {
            throw new IllegalArgumentException("numChunks must be positive: " + numChunks);
        }
        int total = lines.size();
        int chunkSize = (int) Math.ceil((double) total / numChunks);
        return IntStream.range(0, numChunks)
                .mapToObj(i -> {
                    // Clamp both bounds: with rounding up, i * chunkSize can run past the end of
                    // the list, and subList rejects fromIndex > toIndex. The long arithmetic
                    // avoids int overflow for very large inputs.
                    int from = (int) Math.min((long) i * chunkSize, total);
                    int to = (int) Math.min((long) from + chunkSize, total);
                    return lines.subList(from, to);
                })
                .collect(Collectors.toList());
    }

    /**
     * Runs one {@link Mapper} per chunk on a fixed thread pool and merges the per-chunk counts.
     *
     * <p>A chunk whose task fails is reported on standard error and skipped; the counts of the
     * remaining chunks are still merged. If the calling thread is interrupted while waiting, the
     * interrupt flag is restored and the counts merged so far are returned.
     *
     * @param chunks     the chunks of lines to count
     * @param numThreads the size of the thread pool; must be positive
     * @return the merged map from word to total number of occurrences
     */
    static ConcurrentHashMap<String, Integer> processChunks(List<List<String>> chunks, int numThreads) {
        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        ConcurrentHashMap<String, Integer> result = new ConcurrentHashMap<>();
        try {
            List<Mapper> tasks = chunks.stream().map(Mapper::new).collect(Collectors.toList());
            // invokeAll blocks until every task has completed, so get() below does not wait.
            List<Future<ConcurrentHashMap<String, Integer>>> futures = executorService.invokeAll(tasks);
            for (Future<ConcurrentHashMap<String, Integer>> future : futures) {
                try {
                    ConcurrentHashMap<String, Integer> map = future.get();
                    map.forEach((key, value) -> result.merge(key, value, Integer::sum));
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            shutdownExecutor(executorService);
        }
        return result;
    }

    /**
     * Shuts the executor down, waiting up to ten seconds for running tasks to finish before
     * cancelling them.
     *
     * <p>If the calling thread is interrupted while waiting, the remaining tasks are cancelled
     * and the interrupt flag is restored.
     *
     * @param executorService the executor to shut down
     */
    static void shutdownExecutor(ExecutorService executorService) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}