Librarian
Librarian is a cloud-native database replicator designed as a modern replacement for Kafka Connect. It uses native database replication technologies like MongoDB Change Streams and PostgreSQL logical replication to efficiently stream data changes to message brokers and storage systems.
Why Librarian?
- Single Binary: No JVM, no external dependencies, no connector management
- Cloud Native: Built-in metrics, tracing, and observability
- Lightweight: Runs on modest hardware with minimal resource overhead
- Simple Configuration: URL-based source and target configuration
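For example, the quickstarts below wire up an entire pipeline with nothing but URLs (all taken from the examples later in this README):

mongodb://localhost:27017/test?authSource=admin&collection=users
postgres://test:test@localhost:5432/test?slot=librarian_users&sslmode=disable
kafka://localhost:9092/order-events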
Supported Sources
- MongoDB (Change Streams)
- PostgreSQL (Logical Replication)
Supported Targets
- Kafka
- S3 (Parquet)
- Local filesystem
Quickstart
Stream MongoDB changes to Kafka in minutes.
1. Start Backing Services
Start MongoDB (with replica set) and Kafka:
make start-backing-services
2. Start the Replicator
Run the replicator to stream changes from MongoDB to Kafka:
LIBRARIAN_LOG_LEVEL=DEBUG go run cmd/librarian/main.go archiver replicate \
  --source "mongodb://localhost:27017/test?authSource=admin&collection=users" \
  --id=mongodb.test.users \
  -t "kafka://localhost:9092/order-events"
You should see output indicating the replicator has started:
2025-11-19T08:22:38.855-0500 INFO librarian.replicator archiver/replicate.go:69 starting replicator!
2025-11-19T08:22:38.856-0500 INFO librarian.replicator archiver/replicate.go:71 replicating data...
2025-11-19T08:22:38.856-0500 INFO librarian.replicator archiver/replicate.go:86 initializing MongoDB source {"url": "mongodb://localhost:27017/test?authSource=admin&collection=users"}
2025-11-19T08:22:38.856-0500 INFO librarian.replicator archiver/replicate.go:112 initializing kafka target {"url": "kafka://localhost:9092/order-events"}
2025-11-19T08:22:38.856-0500 INFO librarian.replicator replicator/replicator.go:142 Replicator created {"state": "created"}
2025-11-19T08:22:38.856-0500 INFO librarian.replicator replicator/server.go:40 replicator registered {"replicator_id": "mongodb.test.users", "state": "created"}
2025-11-19T08:22:38.856-0500 INFO librarian.replicator replicator/server.go:156 starting replicator server {"addr": ":8080"}
2025-11-19T08:22:38.856-0500 INFO librarian.replicator.fsm replicator/fsm.go:124 State transitioned {"state": "connecting", "from": "created"}
2025-11-19T08:22:38.856-0500 INFO librarian.replicator replicator/replicator.go:159 Starting replicator {"state": "connecting", "source-options": "{CheckpointBatchSize:0 EmptyPollInterval:500ms}", "target-options": "{FlushTimeout:0s}"}
2025-11-19T08:22:38.856-0500 INFO librarian.replicator replicator/checkpoint.go:65 No checkpoint found {"replicator_id": "mongodb.test.users"}
2025-11-19T08:22:38.856-0500 INFO librarian.replicator replicator/replicator.go:184 No checkpoint found, starting fresh {"replicator_id": "mongodb.test.users"}
3. Insert a Test Record
In another terminal, connect to MongoDB and insert a record:
docker exec -it dev_mongodb_1 mongosh

rs0 [direct: primary] test> db.users.insert({"name":"test"});
{
  acknowledged: true,
  insertedIds: { '0': ObjectId('691dc55aa7af9969a1b1ddf4') }
}
4. Verify Replication
Back in the replicator terminal, you'll see the change event processed:
2025-11-19T08:22:39.356-0500 DEBUG librarian.replicator replicator/replicator.go:220 Change event received {"operation": "insert", "collection": "users"}
2025-11-19T08:22:39.357-0500 DEBUG librarian.replicator replicator/replicator.go:245 Message sent to Kafka {"topic": "order-events", "partition": 0}
The document is now available in your Kafka topic.
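To inspect the topic programmatically, here is a minimal consumer sketch. It uses the third-party segmentio/kafka-go client, which is an assumption of this example and not part of Librarian; the broker address and topic name match the quickstart above.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Reader pointed at the broker and topic used in the quickstart.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "order-events",
	})
	defer r.Close()

	for {
		// Blocks until the next message arrives on the topic.
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("offset %d: %s\n", m.Offset, string(m.Value))
	}
}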
PostgreSQL to Kafka Example
Stream PostgreSQL changes to Kafka using logical replication.
1. Start the Replicator
Run the replicator to stream changes from PostgreSQL to Kafka:
LIBRARIAN_LOG_LEVEL=DEBUG go run cmd/librarian/main.go archiver replicate \
  --source "postgres://test:test@localhost:5432/test?slot=librarian_users&sslmode=disable" \
  --target "kafka://localhost:9092/postgres-changes" \
  --id=postgres-to-kafka
You should see output indicating the replicator has started:
2025-11-19T15:07:17.718-0500 INFO librarian.replicator archiver/replicate.go:69 starting replicator!
2025-11-19T15:07:17.718-0500 INFO librarian.replicator archiver/replicate.go:71 replicating data...
2025-11-19T15:07:17.718-0500 INFO librarian.replicator archiver/replicate.go:97 initializing Postgres source {"url": "postgres://test:test@localhost:5432/test?slot=librarian_users&sslmode=disable"}
2025-11-19T15:07:17.718-0500 INFO librarian.replicator archiver/replicate.go:112 initializing kafka target {"url": "kafka://localhost:9092/postgres-changes"}
2025-11-19T15:07:17.718-0500 INFO librarian.replicator replicator/replicator.go:142 Replicator created {"state": "created"}
2025-11-19T15:07:17.719-0500 INFO librarian.replicator replicator/server.go:40 replicator registered {"replicator_id": "postgres-to-kafka", "state": "created"}
2025-11-19T15:07:17.719-0500 INFO librarian.replicator replicator/server.go:156 starting replicator server {"addr": ":8080"}
2025-11-19T15:07:17.719-0500 INFO librarian.replicator.fsm replicator/fsm.go:124 State transitioned {"state": "connecting", "from": "created"}
2025-11-19T15:07:17.719-0500 INFO librarian.replicator replicator/replicator.go:159 Starting replicator {"state": "connecting", "source-options": "{CheckpointBatchSize:0 EmptyPollInterval:500ms}", "target-options": "{FlushTimeout:0s}"}
2025-11-19T15:07:17.719-0500 INFO librarian.replicator replicator/checkpoint.go:65 No checkpoint found {"replicator_id": "postgres-to-kafka"}
2025-11-19T15:07:17.719-0500 INFO librarian.replicator replicator/replicator.go:184 No checkpoint found, starting fresh {"replicator_id": "postgres-to-kafka"}
2025-11-19T15:07:17.734-0500 INFO librarian.replicator kafka/repository.go:121 Kafka target connected {"topic": "postgres-changes", "brokers": "localhost:9092"}
2025-11-19T15:07:17.776-0500 INFO librarian.replicator postgres/source.go:582 Starting from current LSN {"lsn": "0/1995338"}
2025-11-19T15:07:17.777-0500 INFO librarian.replicator postgres/source.go:483 PostgreSQL replication started {"database": "test", "slot": "librarian_users", "publication": "librarian_pub_test", "start_lsn": "0/1995338"}
2025-11-19T15:07:17.777-0500 INFO librarian.replicator.fsm replicator/fsm.go:124 State transitioned {"state": "streaming", "from": "connecting"}
2025-11-19T15:07:17.777-0500 INFO librarian.replicator replicator/replicator.go:204 Replicator started {"state": "streaming"}
2. Insert a Test Record
In another terminal, connect to PostgreSQL and insert a record:
docker exec -it dev_postgres_1 psql -U test
test=# INSERT INTO users (account, signup_time) VALUES ('alice19@example.com', NOW());
INSERT 0 1
3. Verify Replication
Back in the replicator terminal, you'll see the change event processed:
2025-11-19T15:07:21.786-0500 DEBUG librarian.replicator postgres/source.go:215 Transaction begin {"xid": 830}
2025-11-19T15:07:22.288-0500 DEBUG librarian.replicator postgres/source.go:197 Stored relation info {"relation": "users", "relation_id": 16400}
2025-11-19T15:07:22.790-0500 DEBUG librarian.replicator postgres/source.go:253 PostgreSQL INSERT event {"table": "users", "lsn": "0/1995338", "data": {"account":"alice19@example.com","signup_time":"2025-11-19 20:07:20.830027"}}
2025-11-19T15:07:22.824-0500 DEBUG librarian.replicator kafka/repository.go:110 Message delivered {"topic": "postgres-changes", "partition": 0, "offset": 54}
The row is now available in your Kafka topic.
Features
- Real-time change data capture (CDC)
- Automatic checkpointing and resume
- HTTP health check endpoint (:8080)
- Configurable batch sizes and flush intervals
Stats Server
Librarian includes a built-in HTTP stats server that provides real-time visibility into your replication pipelines. Unlike traditional JVM-based connectors that expose generic JVM metrics, Librarian's stats are data-pipeline centered, giving you direct insight into what matters for debugging and monitoring your data flows.
Value
- Pipeline-First Metrics: See exactly how many events have been processed, bytes transferred, and error counts—not garbage collection stats
- Direct Debugging: Quickly identify connection issues, event errors, or stalled replicators without parsing logs
- Zero Configuration: Stats server starts automatically on port 8080 with every replicator
- Lightweight: JSON API with sub-millisecond response times
API Endpoints
GET /api/v1/replicators
Returns detailed statistics for all active replicators:
curl -s localhost:8080/api/v1/replicators | jq .
{
"count": 1,
"replicators": [
{
"id": "mongodb.test.users",
"state": "streaming",
"stats": {
"source": {
"total_events": 0,
"total_bytes": 0,
"last_event_at": "0001-01-01T00:00:00Z",
"last_connect_at": "2025-11-20T08:32:03.197972-05:00",
"connection_healthy": true,
"connection_retries": 1,
"event_error_count": 0,
"source_specific": {
"collection": "users",
"database": "test"
}
},
"target": {
"total_events": 0,
"connection_healthy": true,
"connection_retries": 0,
"event_error_count": 0,
"last_write_at": "0001-01-01T00:00:00Z",
"write_error_count": 0,
"target_specific": {
"brokers": "localhost:9092",
"topic": "order-events"
}
},
"replicator": {
"started_at": "2025-11-20T08:32:03.142297-05:00",
"uptime_seconds": 3,
"state": "streaming",
"checkpoint_count": 0,
"last_checkpoint_at": "0001-01-01T00:00:00Z",
"signals_received": 0
}
}
}
]
}

Understanding the Stats
The stats API provides three levels of detail:
- Source Stats: Connection health, event counts, bytes transferred, and source-specific metadata (database, collection, table)
- Target Stats: Write performance, connection status, error counts, and target-specific metadata (brokers, topics, buckets)
- Replicator Stats: Overall state, uptime, checkpoint frequency, and signal handling
Use these stats to:
- Monitor replication lag by checking last_event_at and last_write_at (see the sketch after this list)
- Detect connection issues via connection_healthy and connection_retries
- Track error rates with event_error_count and write_error_count
- Verify replicator state transitions and uptime
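As a concrete example, the sketch below polls the stats endpoint and reports source-to-target lag per replicator. It uses only the Go standard library; the struct mirrors just the fields it needs from the JSON shown above.

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"
)

// statsResponse models the subset of /api/v1/replicators used below.
type statsResponse struct {
	Replicators []struct {
		ID    string `json:"id"`
		State string `json:"state"`
		Stats struct {
			Source struct {
				LastEventAt time.Time `json:"last_event_at"`
			} `json:"source"`
			Target struct {
				LastWriteAt time.Time `json:"last_write_at"`
			} `json:"target"`
		} `json:"stats"`
	} `json:"replicators"`
}

func main() {
	resp, err := http.Get("http://localhost:8080/api/v1/replicators")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var stats statsResponse
	if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
		log.Fatal(err)
	}

	for _, r := range stats.Replicators {
		src := r.Stats.Source.LastEventAt
		tgt := r.Stats.Target.LastWriteAt
		// Zero timestamps (0001-01-01) mean no events have flowed yet.
		if src.IsZero() || tgt.IsZero() {
			fmt.Printf("%s: no events yet (state=%s)\n", r.ID, r.State)
			continue
		}
		fmt.Printf("%s: source->target lag %s\n", r.ID, tgt.Sub(src))
	}
}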
Debezium Message Compatibility
Librarian produces change events in a Debezium-compatible message format, allowing you to use existing Debezium consumers and downstream tools without modification.
Message Structure
Each change event follows the standard Debezium envelope structure:
{
"payload": {
"before": {...},
"after": {...},
"source": {
"version": "1.0.0",
"connector": "librarian",
"name": "replicator-id",
"ts_ms": 1234567890,
"snapshot": "false",
"db": "database-name",
"schema": "schema-name",
"table": "table-name",
"lsn": 12345,
"txId": 678
},
"op": "c",
"ts_ms": 1234567890,
"transaction": {
"id": "tx-id",
"total_order": 1,
"data_collection_order": 1
}
}
}

Operation Codes
Librarian uses standard Debezium operation codes:
- c - Create/Insert
- u - Update
- d - Delete
- r - Read (snapshot)
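To make the codes concrete, here is a minimal consumer-side sketch that decodes the envelope shown above and dispatches on op. The Envelope struct and the sample message are illustrative, not types or data exported by Librarian.

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Envelope mirrors the Debezium-style message structure documented above.
type Envelope struct {
	Payload struct {
		Before map[string]any `json:"before"`
		After  map[string]any `json:"after"`
		Op     string         `json:"op"`
		Source struct {
			Db    string `json:"db"`
			Table string `json:"table"`
		} `json:"source"`
	} `json:"payload"`
}

func main() {
	// A trimmed-down create event following the structure above.
	raw := []byte(`{"payload":{"after":{"name":"test"},"op":"c","source":{"db":"test","table":"users"}}}`)

	var e Envelope
	if err := json.Unmarshal(raw, &e); err != nil {
		log.Fatal(err)
	}

	target := e.Payload.Source.Db + "." + e.Payload.Source.Table
	switch e.Payload.Op {
	case "c":
		fmt.Printf("insert into %s: %v\n", target, e.Payload.After)
	case "u":
		fmt.Printf("update %s: %v -> %v\n", target, e.Payload.Before, e.Payload.After)
	case "d":
		fmt.Printf("delete from %s: %v\n", target, e.Payload.Before)
	case "r":
		fmt.Printf("snapshot read %s: %v\n", target, e.Payload.After)
	}
}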
Source Metadata
The source field contains metadata about the origin of the change event:
- MongoDB: Includes resume token information, collection name, and timestamp
- PostgreSQL: Includes LSN (Log Sequence Number), transaction ID, schema, and table name
Compatibility
Because Librarian produces Debezium-compatible messages, you can:
- Use existing Debezium consumers without modification
- Leverage Debezium-aware tools and frameworks (e.g., Kafka Connect transformations)
- Mix Librarian and Debezium connectors in the same pipeline
- Apply Debezium-specific filtering and routing logic
Direct Source Consumption
For advanced use cases, you can consume change events directly from the Postgres source without using the full replicator. This gives you fine-grained control over event processing while still leveraging Librarian's PostgreSQL logical replication handling.
Example: Consuming PostgreSQL Events
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/url"
	"time"

	"github.com/turbolytics/librarian/pkg/postgres"
	"github.com/turbolytics/librarian/pkg/replicator"
	"go.uber.org/zap"
)

func main() {
	ctx := context.Background()

	// Initialize logger
	logger, _ := zap.NewDevelopment()

	// Parse connection URL
	// Note: You must create the publication manually first:
	//   CREATE PUBLICATION librarian_pub_test FOR ALL TABLES;
	sourceURL, err := url.Parse("postgres://test:test@localhost:5432/test?slot=librarian_slot&sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	// Initialize the source
	source, err := postgres.NewSource(sourceURL, logger)
	if err != nil {
		log.Fatal(err)
	}

	// Connect to PostgreSQL (establishes replication connection, creates slot, etc.)
	if err := source.Connect(ctx, nil); err != nil {
		log.Fatal(err)
	}
	defer source.Disconnect(ctx)

	logger.Info("Connected to PostgreSQL, streaming changes...")

	// Read events from the stream
	for {
		event, err := source.Next(ctx)

		// ErrNoEventsFound is returned when the receive timeout expires.
		// This is normal and just means no changes happened in the last second.
		if errors.Is(err, replicator.ErrNoEventsFound) {
			continue
		}
		if err != nil {
			logger.Error("Error reading event", zap.Error(err))
			time.Sleep(1 * time.Second)
			continue
		}

		// Process the event
		fmt.Printf("Received %s event on %s.%s\n",
			event.Payload.Op,
			event.Payload.Source.Schema,
			event.Payload.Source.Table)
		fmt.Printf("  Before: %+v\n", event.Payload.Before)
		fmt.Printf("  After: %+v\n", event.Payload.After)
		fmt.Printf("  LSN: %d\n", event.Payload.Source.Lsn)
	}
}
Key Considerations
- Publication Setup: You must manually create a PostgreSQL publication before connecting. The source does not auto-create publications.
- Replication Slot: The source automatically creates a replication slot if one doesn't exist for the given slot name.
- No Events != Error: The Next() method returns ErrNoEventsFound when no events are available within the timeout (1 second). This is normal behavior in streaming scenarios.
- Heartbeats: The source automatically handles PostgreSQL keepalive messages and sends standby status updates.
- Connection Management: Use defer source.Disconnect(ctx) to ensure proper cleanup of replication connections.
- Event Filtering: At this level, you receive all change events from tables in the publication. Apply your own filtering logic as needed (sketched below).
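As a sketch of that filtering, the read loop from the example above can skip tables outside an allow-list. The allowed map and the process handler are illustrative names introduced here, not Librarian APIs.

	// Illustrative allow-list filter inside the Next() loop shown earlier.
	allowed := map[string]bool{"users": true}

	for {
		event, err := source.Next(ctx)
		if errors.Is(err, replicator.ErrNoEventsFound) {
			continue
		}
		if err != nil {
			logger.Error("Error reading event", zap.Error(err))
			continue
		}
		// Drop change events from tables we don't care about.
		if !allowed[event.Payload.Source.Table] {
			continue
		}
		process(event) // process: hypothetical application handler
	}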
When to Use Direct Consumption
Direct source consumption is useful when:
- Building custom event processing pipelines
- Implementing specialized transformation logic
- Integrating with non-standard targets
- Requiring fine-grained control over checkpointing and recovery
- Prototyping or debugging replication behavior
For most use cases, the full replicate command provides better ergonomics and built-in checkpointing.
License
MIT