Runlet is a small JVM library for embeddable, batch-oriented stream processing pipelines.
It is for jobs that need more structure than hand-written loops or Kotlin `Flow`, but
do not justify operating Flink, Kafka Streams, or Spark Streaming. Runlet runs
inside your process: no broker, no cluster, no daemon.

## Status

Runlet is pre-release. APIs, module names, and behavior may change before a stable release.
Current v0 scope:
- single JVM process
- one source, one linear pipeline, one sink
- chunked execution with `Chunk<T>`, plus `map`, `filter`, and `evalMap` stages
- source factories for common chunked and cursor-paged sources
- bounded channels for uncheckpointed pipelines
- serial checkpointed execution for ordered, resumable sources
- file line source, file checkpoint store, and chunk-file sink
- blocking adapters for Java and other blocking JVM integrations
- Spring `SmartLifecycle` adapter
- Spring Boot starter and autoconfiguration
- optional Micrometer metrics integration for Spring Boot apps
Not implemented yet:
- windowing or `groupBy`
- event-time semantics or watermarks
- exactly-once semantics
- distributed execution

## Modules

| Module | Purpose |
|---|---|
| `runlet-core` | Core API, DSL, runtime, and blocking adapters. |
| `runlet-connector-file` | File source, file checkpoint store, and chunk-file sink. |
| `runlet-connector-jackson` | Jackson-backed JSON Lines source and sink helpers. |
| `runlet-adapter-spring` | Spring Framework `SmartLifecycle` integration. |
| `runlet-spring-boot-autoconfigure` | Spring Boot autoconfiguration. |
| `runlet-spring-boot-starter` | Convenience dependency for Spring Boot applications. |
| `runlet-sample-spring-boot` | Runnable Spring Boot sample application. |

## Install

Runlet is not published to a remote Maven repository yet. For local use, publish the artifacts to your local Maven repository:

```shell
./gradlew check
./gradlew publishToMavenLocal
```

Then add `mavenLocal()` and the modules you need:

```kotlin
repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {
    implementation("org.aetherlink:runlet-core:1.0-SNAPSHOT")
    implementation("org.aetherlink:runlet-connector-file:1.0-SNAPSHOT")
    implementation("org.aetherlink:runlet-connector-jackson:1.0-SNAPSHOT")
}
```

For Spring Boot applications, prefer the starter:

```kotlin
dependencies {
    implementation("org.aetherlink:runlet-spring-boot-starter:1.0-SNAPSHOT")
    implementation("org.aetherlink:runlet-connector-file:1.0-SNAPSHOT")
    implementation("org.aetherlink:runlet-connector-jackson:1.0-SNAPSHOT")
}
```

A runnable Spring Boot sample lives in `runlet-sample-spring-boot`.

## Quick Start

This checkpointed file pipeline reads lines from a file, keeps completed records, transforms them, and writes replay-safe chunk files.

```kotlin
import kotlinx.coroutines.runBlocking
import org.aetherlink.runlet.connector.file.ChunkFileSink
import org.aetherlink.runlet.connector.file.FileCheckpointStore
import org.aetherlink.runlet.connector.file.FileSource
import org.aetherlink.runlet.dsl.Runlet

fun main() = runBlocking {
    Runlet("orders") {
        source(FileSource.lines("orders.txt", chunkSize = 1024))
            .checkpoint(FileCheckpointStore("orders.ckpt"))
            .filter { line -> line.contains("completed") }
            .map { line -> line.uppercase() }
            .sink(ChunkFileSink.lines("summaries"))
    }.run()
}
```

Checkpointed pipelines run one chunk at a time:

```text
read -> transform -> write -> commit -> persist cursor
```

The checkpoint cursor advances only after the sink's `commit()` returns. If `write()`
or `commit()` fails, the checkpoint does not advance.
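
The ordering above can be modeled in plain Kotlin. The following sketch is hypothetical: `InMemoryCheckpoint`, `RecordingSink`, and `runCheckpointed` are illustrative names, not Runlet's API. It shows that when a commit fails, the cursor stays where it was, so a rerun resumes at the failed chunk.

```kotlin
// Hypothetical model of the serial checkpoint loop; these classes are not Runlet's API.
// The cursor is the number of input lines whose chunk has been fully committed.
class InMemoryCheckpoint(var cursor: Long = 0L)

// Test double: keeps committed chunks and can be told to fail on the Nth commit.
class RecordingSink(private val failOnCommitNumber: Int? = null) {
    val committed = mutableListOf<List<String>>()
    private var pending: List<String> = emptyList()
    private var commitCount = 0

    fun write(chunk: List<String>) {
        pending = chunk
    }

    fun commit() {
        commitCount++
        if (commitCount == failOnCommitNumber) error("commit $commitCount failed")
        committed.add(pending)
    }
}

fun runCheckpointed(
    lines: List<String>,
    chunkSize: Int,
    checkpoint: InMemoryCheckpoint,
    sink: RecordingSink,
) {
    while (checkpoint.cursor < lines.size) {
        val start = checkpoint.cursor.toInt()
        val end = minOf(start + chunkSize, lines.size)
        val chunk = lines.subList(start, end) // read
        val output = chunk
            .filter { "completed" in it } // transform
            .map { it.uppercase() }
        sink.write(output) // write
        sink.commit() // commit: if this throws, the cursor update below never runs
        checkpoint.cursor = end.toLong() // persist cursor last
    }
}
```

Because the cursor moves only after `commit()` returns, a rerun from the stored cursor reprocesses exactly the chunk that failed, which is why checkpointed sinks need to tolerate replays.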

For checkpointable sources, `.sink(...)` is only available after
`.checkpoint(...)` has been called. The DSL enforces this with types rather than
a runtime capability check.
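
One way to get that compile-time guarantee is a type-state builder, where `.checkpoint(...)` returns a different type and only that type exposes `sink`. This is a hypothetical sketch of the pattern; `UncheckpointedStage`, `CheckpointedStage`, `CheckpointStore`, and `checkpointableSource` are illustrative names, and Runlet's actual DSL types may be structured differently.

```kotlin
// Hypothetical type-state sketch; Runlet's real DSL types and names may differ.
fun interface CheckpointStore {
    fun save(cursor: Long)
}

// Returned for a checkpointable source. It deliberately has no sink() method.
class UncheckpointedStage<T>(private val records: List<T>) {
    fun <R> map(f: (T) -> R): UncheckpointedStage<R> = UncheckpointedStage(records.map(f))

    fun checkpoint(store: CheckpointStore): CheckpointedStage<T> = CheckpointedStage(records, store)
}

// Only this type exposes sink(), so a missing .checkpoint(...) is a compile error.
class CheckpointedStage<T>(private val records: List<T>, private val store: CheckpointStore) {
    fun <R> map(f: (T) -> R): CheckpointedStage<R> = CheckpointedStage(records.map(f), store)

    fun sink(write: (List<T>) -> Unit) {
        write(records)
        store.save(records.size.toLong()) // cursor saved only after the write returns
    }
}

fun <T> checkpointableSource(records: List<T>): UncheckpointedStage<T> = UncheckpointedStage(records)
```

With these types, `checkpointableSource(listOf("a")).sink { }` does not compile, because `UncheckpointedStage` has no `sink` member.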

## Real-World Usage

Runlet is useful for local or embedded jobs where the input has an ordered cursor and replay is acceptable. A common shape is:

```text
source file/API export -> validate/filter -> transform -> durable sink
```
For example, a service can process a partner-provided JSON Lines export at startup or on a schedule:

```kotlin
import kotlinx.coroutines.runBlocking
import org.aetherlink.runlet.connector.file.FileCheckpointStore
import org.aetherlink.runlet.connector.jackson.JacksonChunkFileSink
import org.aetherlink.runlet.connector.jackson.JacksonFileSource
import org.aetherlink.runlet.dsl.Runlet

data class PartnerOrder(
    val id: String,
    val status: String,
    val totalCents: Long,
)

data class OrderSummary(
    val id: String,
    val totalCents: Long,
)

fun main() = runBlocking {
    Runlet("partner-orders") {
        source(JacksonFileSource.jsonLines<PartnerOrder>("imports/orders.jsonl"))
            .checkpoint(FileCheckpointStore("state/partner-orders.ckpt"))
            .filter { order -> order.status == "completed" }
            .map { order -> OrderSummary(order.id, order.totalCents) }
            .sink(JacksonChunkFileSink.jsonLines("exports/order-summaries"))
    }.run()
}
```

Operationally, treat Runlet like an embedded job runner:
- Put checkpoint files on durable storage if resumability matters.
- Make checkpointed sinks replay-safe. A failed chunk may be written again
  because Runlet advances the checkpoint only after `commit()` succeeds.
- Keep `chunkSize` large enough to amortize per-chunk overhead, but small enough that replaying a chunk is acceptable.
- Use Spring Boot lifecycle integration for long-running application pipelines.
- Watch the `runletHealthIndicator` in Spring Boot apps; a failed pipeline is reported as `DOWN`.
- Keep distributed coordination outside Runlet. If several app instances run the same pipeline against the same input/checkpoint, use an external lock or run only one active instance.
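
A common way to make a file sink replay-safe is to derive each output file name from the chunk's position in the input, so a replayed chunk overwrites its earlier output instead of duplicating it. The sketch below assumes a start cursor is known for each chunk; `IdempotentChunkFileWriter` is a hypothetical class, not Runlet's `ChunkFileSink`.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption

// Hypothetical replay-safe sink sketch, not Runlet's ChunkFileSink.
// Each chunk goes to a file named after the chunk's first cursor, so writing the
// same chunk again after a failed commit replaces that file instead of adding a duplicate.
class IdempotentChunkFileWriter(private val directory: Path) {
    init {
        Files.createDirectories(directory)
    }

    fun fileFor(startCursor: Long): Path = directory.resolve("chunk-%020d.txt".format(startCursor))

    fun writeChunk(startCursor: Long, lines: List<String>): Path {
        val target = fileFor(startCursor)
        // Write to a temp file in the same directory, then rename it into place,
        // so readers never see a half-written chunk file. With ATOMIC_MOVE, replacing
        // an existing target is platform-dependent; POSIX rename(2) replaces it.
        val tmp = Files.createTempFile(directory, "chunk-", ".tmp")
        Files.write(tmp, lines)
        return Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE)
    }
}
```

Zero-padding the cursor keeps file names in input order when listed lexicographically.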

Production workloads that fit Runlet well:

| Shape | Source | Sink | Checkpoint |
|---|---|---|---|
| Database backfill | Ordered primary-key scan | Table/index upsert | Last processed primary key |
| Object storage JSONL import | Object manifest or JSONL object | Database or clean output prefix | Object key plus byte offset |
| API cursor poller | Paginated API cursor | Durable storage | API cursor or page token |
| Search index backfill | Database or object storage | Elasticsearch/OpenSearch bulk index | Last primary key or object offset after successful bulk index |
These shapes describe where the model fits. Today, only file and Jackson JSONL connectors are implemented; database, object storage, API, and search connectors would live in separate optional modules.
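
For the object storage shape, the checkpoint has to carry two fields. A minimal sketch of such a composite cursor, assuming it is persisted as a single string; `ObjectOffsetCursor` and its `offset|key` encoding are hypothetical, not a Runlet type or format.

```kotlin
// Hypothetical composite cursor for an object-storage JSONL import: the checkpoint
// must remember which object was being read and how far into it the job got.
data class ObjectOffsetCursor(val objectKey: String, val byteOffset: Long) : Comparable<ObjectOffsetCursor> {
    // Order by key first, then offset, matching a lexicographically sorted object manifest.
    override fun compareTo(other: ObjectOffsetCursor): Int =
        compareValuesBy(this, other, { it.objectKey }, { it.byteOffset })

    // Offset first, so the first '|' is always the separator even if the key contains '|'.
    fun encode(): String = "$byteOffset|$objectKey"

    companion object {
        fun decode(text: String): ObjectOffsetCursor {
            val sep = text.indexOf('|')
            require(sep > 0) { "malformed cursor: $text" }
            return ObjectOffsetCursor(text.substring(sep + 1), text.substring(0, sep).toLong())
        }
    }
}
```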

## Custom Sources

Most application code should not implement `SourceReader` directly. Use the
highest-level API that fits:

| Need | Use |
|---|---|
| Built-in file or JSONL processing | FileSource, JacksonFileSource, and the matching sinks |
| App-specific non-checkpointed reads | Sources.records(...) or Sources.chunks(...) |
| App-specific resumable reads | CheckpointableSources.byLongCursor(...) or CheckpointableSources.chunks(...) |
| Reusable connector modules | Implement Source, CheckpointableSource, Sink, or CheckpointStore |
For example, a database backfill can be expressed as "read the next page after this cursor" without writing a custom reader class:

```kotlin
import org.aetherlink.runlet.api.CheckpointableSources
import org.aetherlink.runlet.connector.file.FileCheckpointStore
import org.aetherlink.runlet.dsl.Runlet

// orderDao, toSearchDocument, and searchIndexSink are application code.
val source = CheckpointableSources.byLongCursor(
    chunkSize = 500,
    // Expected to return up to `limit` orders with id > afterId, in id order.
    read = { afterId, limit -> orderDao.fetchOrdersAfter(id = afterId, limit = limit) },
    // Extracts the cursor value from a record.
    cursorOf = { order -> order.id },
)

Runlet("orders-search-backfill") {
    source(source)
        .checkpoint(FileCheckpointStore("state/orders-search-backfill.ckpt"))
        .map(::toSearchDocument)
        .sink(searchIndexSink)
}
```

The low-level reader interfaces are still public because connector modules need them, but they are not the ergonomic starting point for application pipelines.
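
A long-cursor source is essentially keyset pagination: fetch rows whose id is greater than the last one seen, in id order, until a page comes back short or empty. This stdlib-only sketch shows that loop against an in-memory table; `pagesByLongCursor`, `fetchAfter`, and `Order` are hypothetical and do not describe how `CheckpointableSources.byLongCursor` is implemented.

```kotlin
// Example record type for the sketch.
data class Order(val id: Long, val status: String)

// Stdlib-only keyset pagination. `fetchAfter` stands in for a hypothetical DAO call
// such as `SELECT ... WHERE id > ? ORDER BY id LIMIT ?`.
fun <T> pagesByLongCursor(
    startAfter: Long,
    limit: Int,
    fetchAfter: (afterId: Long, limit: Int) -> List<T>,
    cursorOf: (T) -> Long,
): Sequence<Pair<List<T>, Long>> = sequence {
    var cursor = startAfter
    while (true) {
        val page = fetchAfter(cursor, limit)
        if (page.isEmpty()) break
        cursor = cursorOf(page.last()) // the next page starts after the last id seen
        yield(page to cursor) // the cursor to checkpoint once this page is committed
        if (page.size < limit) break // a short page means nothing is left
    }
}
```

Restarting with `startAfter` set to a stored cursor resumes from the next unseen id, which is what makes this shape resumable.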

## Spring Boot

Spring Boot applications can register Runlet pipelines as beans. The starter
creates a shared coroutine scope, wraps each registration in a `SmartLifecycle`,
and starts and stops pipelines with the application context.

```kotlin
import org.aetherlink.runlet.adapter.spring.boot.RunletPipelineRegistration
import org.aetherlink.runlet.api.RunletRuntimeConfig
import org.aetherlink.runlet.connector.file.FileCheckpointStore
import org.aetherlink.runlet.dsl.Runlet
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class PipelineConfiguration {
    @Bean
    fun orderCheckpointStore(): FileCheckpointStore = FileCheckpointStore("state/orders.ckpt")

    @Bean
    fun ordersPipeline(
        runletRuntimeConfig: RunletRuntimeConfig,
        orderCheckpointStore: FileCheckpointStore,
    ): RunletPipelineRegistration =
        RunletPipelineRegistration("orders") {
            // orderSource, summarize, and orderSink are application-provided.
            Runlet("orders", config = runletRuntimeConfig) {
                source(orderSource)
                    .checkpoint(orderCheckpointStore)
                    .map(::summarize)
                    .sink(orderSink)
            }
        }
}
```

`RunletRuntimeConfig` is auto-configured by the starter from
`runlet.runtime.*`. Today it controls the bounded channel capacity used by
uncheckpointed pipelines. You can inject it into `Runlet(...)` as shown above or
construct your own config manually outside Spring Boot.

`orderCheckpointStore` is application-owned checkpoint storage. The example uses
`FileCheckpointStore`, which persists the last completed cursor to
`state/orders.ckpt`. In production, put that file on durable storage or provide
your own `CheckpointStore` backed by a database or object storage.
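
If you write your own file-backed store, the usual crash-safety pattern is write to a temp file, fsync it, then atomically rename it over the old checkpoint. The sketch below shows that pattern against a hypothetical `CursorStore` interface; Runlet's real `CheckpointStore` contract may differ, and this is not necessarily how `FileCheckpointStore` is implemented.

```kotlin
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.nio.file.StandardOpenOption

// Hypothetical contract standing in for Runlet's CheckpointStore; the real
// interface may use different method names and cursor types.
interface CursorStore {
    fun load(): String?
    fun save(cursor: String)
}

// Crash-safe file store sketch: write a temp file, fsync it, then rename it
// over the previous checkpoint so a crash leaves either the old or the new cursor.
class AtomicFileCursorStore(private val path: Path) : CursorStore {
    override fun load(): String? =
        if (Files.exists(path)) String(Files.readAllBytes(path), Charsets.UTF_8) else null

    override fun save(cursor: String) {
        val dir = path.toAbsolutePath().parent
        Files.createDirectories(dir)
        val tmp = Files.createTempFile(dir, path.fileName.toString(), ".tmp")
        FileChannel.open(tmp, StandardOpenOption.WRITE).use { channel ->
            val buffer = ByteBuffer.wrap(cursor.toByteArray(Charsets.UTF_8))
            while (buffer.hasRemaining()) channel.write(buffer)
            channel.force(true) // flush file contents to the storage device
        }
        // POSIX rename(2) replaces the old file atomically; full durability would
        // also fsync the parent directory, which this sketch skips.
        Files.move(tmp, path, StandardCopyOption.ATOMIC_MOVE)
    }
}
```

A database-backed store follows the same rule in a different form: update the cursor row in a transaction only after the sink has committed.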

`application.yml`:

```yaml
runlet:
  enabled: true
  threads: 4
  shutdown-timeout: 30s
  health:
    enabled: true
  metrics:
    enabled: true
  runtime:
    channel-capacity: 4
```

Connector-specific settings, such as `FileSource.lines(..., chunkSize = 1024)`,
are still chosen in code when constructing that source.

When Spring Boot's health module is on the classpath, Runlet contributes a
`runletHealthIndicator`. It reports `UP` when registered pipelines have no
recorded failure and `DOWN` when one or more pipelines fail.
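
The health rule reduces to a small aggregation over the registered pipelines. A sketch of that rule in plain Kotlin; `HealthStatus`, `HealthReport`, and `aggregateHealth` are hypothetical names and do not use Spring Boot's `HealthIndicator` API.

```kotlin
// Hypothetical types; not Runlet's or Spring Boot's health API.
enum class HealthStatus { UP, DOWN }

data class HealthReport(val status: HealthStatus, val failedPipelines: List<String>)

// UP when no registered pipeline has a recorded failure, DOWN otherwise.
// Failed pipeline names are listed to make the DOWN state actionable.
fun aggregateHealth(registered: List<String>, recordedFailures: Map<String, Throwable>): HealthReport {
    val failed = registered.filter { it in recordedFailures }
    val status = if (failed.isEmpty()) HealthStatus.UP else HealthStatus.DOWN
    return HealthReport(status, failed)
}
```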

When Micrometer is on the classpath and a `MeterRegistry` bean exists, Runlet
also contributes a pipeline metrics observer. Actuator-enabled Spring Boot apps
typically provide that registry. Metrics can be disabled with
`runlet.metrics.enabled=false`.

Published meters include:

| Meter | Type | Tags | Meaning |
|---|---|---|---|
| `runlet.pipeline.starts` | counter | `pipeline` | Pipeline run starts. |
| `runlet.pipeline.completions` | counter | `pipeline` | Pipeline run completions. |
| `runlet.pipeline.failures` | counter | `pipeline`, `exception` | Pipeline run failures. |
| `runlet.pipeline.chunks` | counter | `pipeline` | Chunks committed after successful sink commit. |
| `runlet.pipeline.records` | counter | `pipeline` | Records committed after successful sink commit. |
| `runlet.pipeline.running` | gauge | `pipeline` | `1` while a pipeline is running, otherwise `0`. |
| `runlet.pipeline.last.success.epoch.seconds` | gauge | `pipeline` | Last successful completion time, in epoch seconds. |
| `runlet.pipeline.last.failure.epoch.seconds` | gauge | `pipeline` | Last failure time, in epoch seconds. |
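
To make the meter semantics concrete, here is an in-memory stand-in that updates the same quantities at the points the table describes. It is a hypothetical sketch with illustrative names: it does not use Micrometer, omits the `exception` tag, and is not Runlet's observer.

```kotlin
import java.time.Instant
import java.util.concurrent.atomic.AtomicLong

// In-memory stand-in for the meters above; names are illustrative only.
class PipelineMeters(private val clock: () -> Instant = Instant::now) {
    val starts = AtomicLong()
    val completions = AtomicLong()
    val failures = AtomicLong()
    val chunks = AtomicLong()
    val records = AtomicLong()
    val running = AtomicLong() // gauge: 1 while running, otherwise 0
    val lastSuccessEpochSeconds = AtomicLong()
    val lastFailureEpochSeconds = AtomicLong()

    fun onStart() {
        starts.incrementAndGet()
        running.set(1)
    }

    // Called only after the sink's commit() returned successfully.
    fun onChunkCommitted(recordCount: Int) {
        chunks.incrementAndGet()
        records.addAndGet(recordCount.toLong())
    }

    fun onComplete() {
        completions.incrementAndGet()
        running.set(0)
        lastSuccessEpochSeconds.set(clock().epochSecond)
    }

    fun onFailure() {
        failures.incrementAndGet()
        running.set(0)
        lastFailureEpochSeconds.set(clock().epochSecond)
    }
}
```

Injecting the clock keeps the timestamp gauges testable.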

## Runtime Model

Runlet moves records through a pipeline as chunks, not one record at a time. Stages still use ordinary per-record functions, but the runtime batches the plumbing around them.
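
The split between per-record functions and per-chunk plumbing can be pictured as lifting a record function over a chunk. This sketch uses a hypothetical `Chunk` with a `records` list (the blocking sink example reads `chunk.records`); `mapRecords`, `filterRecords`, and `chunked` are illustrative names, not Runlet's API.

```kotlin
// Hypothetical chunk type for the sketch.
data class Chunk<T>(val records: List<T>)

// A stage is written per record, but it is applied once per chunk, so hand-offs,
// commits, and metrics can happen per chunk rather than per record.
fun <A, B> Chunk<A>.mapRecords(f: (A) -> B): Chunk<B> = Chunk(records.map(f))

fun <A> Chunk<A>.filterRecords(p: (A) -> Boolean): Chunk<A> = Chunk(records.filter(p))

// Groups a record stream into fixed-size chunks (the last one may be smaller).
fun <T> chunked(records: Sequence<T>, size: Int): Sequence<Chunk<T>> =
    records.chunked(size).map { Chunk(it) }
```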
Uncheckpointed pipelines run stages concurrently with bounded channels between the source, stages, and sink:
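
```kotlin
import org.aetherlink.runlet.api.RunletRuntimeConfig
import org.aetherlink.runlet.dsl.Runlet

// mySource, normalize, enrich, and mySink are application code.
// Call run() from a coroutine, as in the Quick Start.
Runlet(
    name = "fast-path",
    config = RunletRuntimeConfig(channelCapacity = 4),
) {
    source(mySource)
        .map(::normalize)
        .evalMap(::enrich)
        .sink(mySink)
}.run()
```
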
Checkpointed pipelines intentionally stay serial in v0 because cursor advancement depends on sink durability.
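
For the uncheckpointed path, the bounded-channel arrangement can be approximated with JDK threads and `ArrayBlockingQueue`, swapped in here for coroutine channels so the sketch needs no extra dependencies. `runBounded` and `Msg` are hypothetical; the sketch models one map stage between a source and a sink.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// JDK stand-in for bounded channels. put() blocks while a queue is full, so a slow
// downstream stage throttles everything upstream of it (backpressure).
private sealed class Msg {
    class Data(val chunk: List<Int>) : Msg()
    object Done : Msg()
}

// Source thread -> map-stage thread -> sink (calling thread), each hop a bounded queue.
// Error propagation is omitted: if `transform` throws, the sink would wait forever.
fun runBounded(input: List<List<Int>>, capacity: Int, transform: (Int) -> Int): List<List<Int>> {
    val toStage = ArrayBlockingQueue<Msg>(capacity)
    val toSink = ArrayBlockingQueue<Msg>(capacity)

    val source = thread {
        input.forEach { toStage.put(Msg.Data(it)) }
        toStage.put(Msg.Done)
    }
    val stage = thread {
        stageLoop@ while (true) {
            when (val msg = toStage.take()) {
                is Msg.Data -> toSink.put(Msg.Data(msg.chunk.map(transform)))
                Msg.Done -> {
                    toSink.put(Msg.Done)
                    break@stageLoop
                }
            }
        }
    }

    val output = mutableListOf<List<Int>>()
    sinkLoop@ while (true) {
        when (val msg = toSink.take()) {
            is Msg.Data -> output.add(msg.chunk)
            Msg.Done -> break@sinkLoop
        }
    }
    source.join()
    stage.join()
    return output
}
```

Each queue holds at most `capacity` chunks, so memory stays bounded no matter how far the source could run ahead.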
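
## JSON Lines

The file connector supports serializer-agnostic JSON Lines by accepting decode/encode functions:

```kotlin
import org.aetherlink.runlet.connector.file.ChunkFileSink
import org.aetherlink.runlet.connector.file.FileSource

// decodeOrder parses one line into a record; encodeSummary renders one record as a line.
// Both are application functions built on whatever JSON library you use.
val source = FileSource.jsonLines(
    path = "orders.jsonl",
    decode = ::decodeOrder,
)
val sink = ChunkFileSink.jsonLines(
    directory = "summaries",
    encode = ::encodeSummary,
)
```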
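
Keeping the connector serializer-agnostic means the reader handles only lines and chunking while the caller supplies parsing. A stdlib-only sketch of that split; `readJsonLines`, `Order`, and the regex-based `decodeOrder` are hypothetical, and the toy decoder is no substitute for a real JSON library.

```kotlin
import java.io.BufferedReader

// Sketch of a serializer-agnostic JSON Lines reader, not Runlet's FileSource.jsonLines:
// it splits input into lines, skips blank ones, and groups decoded records into chunks.
// Parsing is delegated entirely to the caller's `decode` function.
fun <T> readJsonLines(reader: BufferedReader, chunkSize: Int, decode: (String) -> T): List<List<T>> =
    reader.lineSequence()
        .filter { it.isNotBlank() }
        .map(decode)
        .chunked(chunkSize)
        .toList()

data class Order(val id: String)

// Toy decoder for illustration only; a real application would call Jackson,
// kotlinx.serialization, or another JSON library here.
private val idField = Regex("\"id\"\\s*:\\s*\"([^\"]*)\"")

fun decodeOrder(line: String): Order =
    Order(requireNotNull(idField.find(line)) { "no id field in: $line" }.groupValues[1])
```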

For Jackson, add `runlet-connector-jackson` and use the Jackson factories:

```kotlin
import org.aetherlink.runlet.connector.jackson.JacksonChunkFileSink
import org.aetherlink.runlet.connector.jackson.JacksonFileSource

val source = JacksonFileSource.jsonLines<Order>("orders.jsonl")
val sink = JacksonChunkFileSink.jsonLines<OrderSummary>("summaries")
```

## Blocking Adapters

Java and blocking JVM integrations can implement blocking interfaces and adapt them into Runlet's coroutine contracts:

```kotlin
import org.aetherlink.runlet.adapter.blocking.BlockingSink
import org.aetherlink.runlet.adapter.blocking.asSink
import org.aetherlink.runlet.api.Chunk

class ConsoleBlockingSink : BlockingSink<String> {
    override fun write(chunk: Chunk<String>) {
        chunk.records.forEach(::println)
    }
}

val sink = ConsoleBlockingSink().asSink()
```

Blocking adapter calls run on `Dispatchers.IO`.

## Spring Framework

Applications that use Spring Framework without Spring Boot can wrap a pipeline
as a `SmartLifecycle` bean:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import org.aetherlink.runlet.adapter.spring.SpringPipelineLifecycle
import java.util.concurrent.Executors

// `pipeline` and `logger` are application-provided.
val dispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
// SupervisorJob keeps one failed child from cancelling the whole scope.
val scope = CoroutineScope(SupervisorJob() + dispatcher)

val lifecycle = SpringPipelineLifecycle(
    pipeline = pipeline,
    scope = scope,
    onFailure = { failure -> logger.error("Runlet pipeline failed", failure) },
)
// Call dispatcher.close() on shutdown so the thread pool's threads exit.
```

## Development

Run the full verification suite with `./gradlew check`. This runs compilation, tests, and ktlint.

Useful tasks:

```shell
./gradlew test
./gradlew ktlintCheck
./gradlew ktlintFormat
./gradlew publishToMavenLocal
```

## Design Notes

### Non-Goals

If you need event-time correctness, exactly-once distributed processing, or horizontal scale, use Flink, Kafka Streams, or Spark Streaming. Runlet is for small, local, embeddable JVM pipelines.