|
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
|
|
|
// Sample input (the Tiny Shakespeare corpus); download it to src/input.txt:
// https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
|
/**
 * A minimal in-process MapReduce word counter.
 *
 * <p>The input file is read into memory and split into contiguous chunks of lines. Each chunk is
 * counted by a {@link Mapper} running on a fixed-size thread pool. The per-chunk counts are then
 * merged (the "reduce" step) into a single map.
 */
public class SimpleMapReduce {

    /**
     * Counts word occurrences in one chunk of lines.
     *
     * <p>Each line is lower-cased with {@link Locale#ROOT}, so the result does not depend on the
     * JVM's default locale. It is then split on runs of {@code \W}. Without the
     * UNICODE_CHARACTER_CLASS flag, {@code \W} is any character other than {@code [a-zA-Z_0-9]}.
     */
    static class Mapper implements Callable<ConcurrentHashMap<String, Integer>> {

        /** Compiled once; {@code String.split("\\W+")} would recompile it for every line. */
        private static final Pattern NON_WORD = Pattern.compile("\\W+");

        final List<String> input;

        Mapper(List<String> input) {
            this.input = input;
        }

        /**
         * Counts the words in {@code input}.
         *
         * @return a map from each lower-cased, non-empty word in {@code input} to its count
         */
        @Override
        public ConcurrentHashMap<String, Integer> call() {
            return input.stream()
                    .flatMap(line -> Arrays.stream(NON_WORD.split(line.toLowerCase(Locale.ROOT))))
                    // A leading separator or an empty line yields an empty token; drop it.
                    .filter(word -> !word.isEmpty())
                    .collect(Collectors.toConcurrentMap(
                            word -> word,
                            word -> 1,
                            Integer::sum,
                            ConcurrentHashMap::new));
        }
    }

    /**
     * Counts the words in a text file and prints one {@code word: count} line per distinct word, in
     * no particular order.
     *
     * @param args optional arguments: {@code args[0]} is the input path (default
     *             {@code src/input.txt}); {@code args[1]} is the number of chunks and worker threads
     *             (default 10)
     */
    public static void main(String[] args) {
        String path = args.length > 0 ? args[0] : "src/input.txt";
        int numThreads = args.length > 1 ? Integer.parseInt(args[1]) : 10;

        try {
            List<String> lines = Files.readAllLines(Paths.get(path));
            List<List<String>> chunks = createChunks(lines, numThreads);
            processChunks(chunks, numThreads)
                    .forEach((key, value) -> System.out.println(key + ": " + value));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Splits {@code lines} into exactly {@code numChunks} contiguous sub-list views.
     *
     * <p>Each chunk holds at most {@code ceil(lines.size() / numChunks)} lines. When the lines do not
     * divide evenly, the trailing chunks may be shorter or empty.
     *
     * @param lines     the lines to split; the returned chunks are views backed by this list
     * @param numChunks the number of chunks to produce; must be at least 1
     * @return {@code numChunks} chunks which, concatenated in order, equal {@code lines}
     * @throws IllegalArgumentException if {@code numChunks < 1}
     */
    static List<List<String>> createChunks(List<String> lines, int numChunks) {
        if (numChunks < 1) {
            // Otherwise no chunks would be produced and every line would be silently dropped.
            throw new IllegalArgumentException("numChunks must be >= 1, got " + numChunks);
        }
        int size = lines.size();
        int chunkSize = (int) Math.ceil((double) size / numChunks);
        return IntStream.range(0, numChunks)
                .mapToObj(i -> {
                    // Clamp both ends to the list size. With ceiling-sized chunks, the last few
                    // chunks can start past the end (e.g. 3 lines in 10 chunks), which would
                    // otherwise make subList throw. Use long arithmetic to rule out overflow.
                    int from = (int) Math.min((long) i * chunkSize, size);
                    int to = (int) Math.min((long) (i + 1) * chunkSize, size);
                    return lines.subList(from, to);
                })
                .collect(Collectors.toList());
    }

    /**
     * Runs one {@link Mapper} per chunk on a fixed pool of {@code numThreads} threads and merges the
     * per-chunk counts into a single map (the reduce step).
     *
     * <p>This is best-effort. A chunk whose mapper fails is reported on stderr and left out of the
     * result. If the calling thread is interrupted, merging stops and the thread's interrupt status
     * is restored; the counts merged so far are returned. In either case the result is partial.
     *
     * @param chunks     the line chunks to count
     * @param numThreads the size of the worker pool; must be positive
     * @return a map from each word to its total count across all successfully processed chunks
     * @throws IllegalArgumentException if {@code numThreads <= 0} (thrown by the pool factory)
     */
    static ConcurrentHashMap<String, Integer> processChunks(List<List<String>> chunks, int numThreads) {
        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        ConcurrentHashMap<String, Integer> result = new ConcurrentHashMap<>();

        try {
            // invokeAll blocks until every task has completed, so the get() calls below do not wait.
            List<Future<ConcurrentHashMap<String, Integer>>> futures = executorService.invokeAll(
                    chunks.stream().map(Mapper::new).collect(Collectors.toList()));

            for (Future<ConcurrentHashMap<String, Integer>> future : futures) {
                try {
                    ConcurrentHashMap<String, Integer> map = future.get();
                    map.forEach((key, value) -> result.merge(key, value, Integer::sum));
                } catch (ExecutionException e) {
                    // Best-effort: report the failed chunk and keep merging the others.
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            // Do not swallow the interrupt; restore it so callers (and shutdownExecutor) see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            shutdownExecutor(executorService);
        }

        return result;
    }

    /**
     * Shuts down {@code executorService}.
     *
     * <p>Running tasks get up to 10 seconds to finish before they are cancelled with
     * {@link ExecutorService#shutdownNow()}. If the calling thread is interrupted while waiting, the
     * pool is cancelled immediately and the interrupt status is restored rather than swallowed.
     *
     * @param executorService the pool to shut down
     */
    static void shutdownExecutor(ExecutorService executorService) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}