Data lineage refers to the journey data takes — where it comes from, how it gets transformed, and where it ends up. This information provides valuable insight for understanding and debugging complex workflows. In this story, we explore how we can integrate Celery with Marquez, a data lineage visualisation tool, using the Python OpenLineage package.
Introduction
I first heard about data lineage, OpenLineage, and Marquez some time ago, maybe even a year before writing this post. I remember not knowing what the hell they were, but it seemed like some visualisation tool for data pipelines.
On our team, we think of Celery workflows as data pipelines, but we've been missing a nice tool to visualise the relationships between tasks and the data being passed between them. So a thought occurred: could we use Marquez to display the execution of Celery tasks, along with the relationships and data that we want to see?
This is the first story in a series where we explore sending Celery data to Marquez using the Python OpenLineage package.
Definitions
Before we begin, we need to relate entities in Celery to those in OpenLineage. On the OpenLineage side, we will stick to Events, Jobs, and Runs in this story, and also touch on Facets.
A Job represents a process that consumes, produces, or transforms datasets. More concretely, a Job can be a task in a workflow, a model, query, or checkpoint. A Job can represent an arbitrary amount of work. Jobs have a unique name within a namespace.
For our Celery implementation, a Job will be a Celery task, the name will be the name of the task, and the namespace will be something we define.
An event represents a discrete observation about the lifecycle of a job. It is usually emitted when something important happens. For example, when a job is started, completed, aborted, or failed. We can also emit RUNNING events to indicate that the job is still running. An event communicates that a job changed its running state.
We already decided that Celery tasks will be represented by Jobs. It makes sense to emit events using the lifecycle hooks that Celery provides. We will use the following:
- before_start: to set the START run state, which indicates that a task has started;
- on_success: to set the COMPLETE run state, indicating that the task has successfully finished;
- on_failure: to set the FAIL run state, indicating that the task failed.
We will also send the RUNNING run state when the __call__ function is invoked.
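To make the mapping concrete, here is a minimal sketch of how the hooks line up with run states. This simply mirrors the custom task class we build later in the story; RunState comes from the openlineage-python package:

from openlineage.client.run import RunState

# Celery lifecycle hook -> OpenLineage run state
HOOK_TO_RUN_STATE = {
    "before_start": RunState.START,    # the worker is about to execute the task
    "__call__": RunState.RUNNING,      # the task body is executing
    "on_success": RunState.COMPLETE,   # the task returned successfully
    "on_failure": RunState.FAIL,       # the task raised an exception
}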
A Run represents one instance of a job's execution in time [2]. For example, if we run a daily cron job, each day produces a new run instance. A run is identified by a unique ID, and it is recommended to use the UUIDv7 format for the run id.
It seems appropriate to use Celery’s task ids as run ids.
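As a side note, Celery generates task ids with UUID version 4 by default, not UUIDv7; as the demo later in this story shows, Marquez accepts these as run ids just fine. A trivial sketch:

import uuid

# Celery's default task id is a stringified uuid4, e.g.:
task_id = str(uuid.uuid4())
print(task_id)  # something like '6f1c2b1e-6a0e-4f5e-9b2a-0d9c1e2f3a4b'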
Facets are used to provide additional context and metadata to the OpenLineage events. A facet is a key-value object that contains specific metadata. The main types of facets include job, run and dataset facets.
We can also create custom facets for our runs. Next, let's get into the code.
Docker compose
The project is set up using docker compose. Here is the complete file that I use:
services:
  redis:
    image: 'redis:alpine'
    container_name: celery-lineage-redis
    healthcheck:
      test: [ "CMD", "redis-cli", "ping" ]
      interval: 10s
      retries: 5
      start_period: 5s
      timeout: 10s
    volumes:
      - ./redis_data:/data
    ports:
      - '6379:6379'

  worker:
    image: celery-lineage
    volumes:
      - .:/app
    depends_on:
      - redis
      - scheduler
      - marquez
    env_file:
      - .env
    entrypoint: [ './entrypoint.sh', 'worker', 'default' ]

  scheduler:
    build: .
    image: celery-lineage
    container_name: celery-lineage-scheduler
    volumes:
      - .:/app
    depends_on:
      - redis
    environment:
      - instance=scheduler
    env_file:
      - .env
    entrypoint: [ './entrypoint.sh', 'scheduler' ]

  marquez:
    image: marquezproject/marquez:latest
    container_name: marquez
    env_file:
      - .env
    environment:
      - PROXY_CHANGE_ORIGIN=true
    depends_on:
      - marquez-db
    links:
      - marquez-db:postgres
    ports:
      - "9000:9000"
      - "9001:9001"
    # volumes:
    #   - ./docker/wait-for-it.sh:/usr/src/app/wait-for-it.sh
    # entrypoint: [ "./wait-for-it.sh", "postgres:5432", "--", "./entrypoint.sh" ]

  marquezweb:
    image: marquezproject/marquez-web:latest
    container_name: marquezweb
    env_file:
      - .env
    environment:
      - REACT_APP_ADVANCED_SEARCH=false
    ports:
      - "3000:3000"
    stdin_open: true
    tty: true
    depends_on:
      - marquez

  marquez-db:
    image: postgres:14
    container_name: marquez-db
    environment:
      - POSTGRES_USER=marquez
      - POSTGRES_PASSWORD=marquez
      - MARQUEZ_USER=marquez
      - MARQUEZ_PASSWORD=marquez
      - MARQUEZ_DB=marquez
    volumes:
      - marquez-db-data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

volumes:
  marquez-db-data:
As you can see, some lines are commented out. The official Marquez GitHub repo [3] contains examples of running it with docker compose, and each example uses a wait-for-it.sh bash script, which I omitted in my repo.
The first important point here is in the marquez service:
links:
  - marquez-db:postgres

This allows the marquez service to communicate with the postgres database service (marquez-db) using postgres as the hostname.
In the env file, we need to define the host, port, and admin port for Marquez:

MARQUEZ_HOST=marquez
MARQUEZ_PORT=9000
MARQUEZ_ADMIN_PORT=9001
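For completeness, here is a rough sketch of how a config module could assemble the MARQUEZ_URL used below. The module and variable names are my assumption, not the exact code from the repo:

import os

# Hypothetical config module: build the Marquez URL from the env file above.
MARQUEZ_HOST = os.environ.get("MARQUEZ_HOST", "marquez")
MARQUEZ_PORT = os.environ.get("MARQUEZ_PORT", "9000")
MARQUEZ_URL = f"http://{MARQUEZ_HOST}:{MARQUEZ_PORT}"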
OpenLineage client
We define a separate client class that will create the OpenLineage client with HTTP transport.
We define the HTTP transport:

from openlineage.client.transport.http import (
    HttpCompression,
    HttpConfig,
    HttpTransport,
)

http_config = HttpConfig(
    url=config.MARQUEZ_URL,  # this will be http://marquez:9000
    endpoint="api/v1/lineage",
    timeout=120,
    verify=False,
    compression=HttpCompression.GZIP,
)
http_transport = HttpTransport(http_config)
And we define the client class with submit_event and get_job_name_from_task_name methods:
from openlineage.client import OpenLineageClient
from openlineage.client.run import RunState


class LineageClient:
    def __init__(self):
        self.lineage_client = OpenLineageClient(transport=http_transport)

    def submit_event(
        self,
        event_type: RunState,
        run_id: str,
        name: str,
        namespace="default",
    ):
        run_event = events.create_event(
            event_type=event_type,
            run_id=run_id,
            name=name,
            namespace=namespace,
        )
        self.lineage_client.emit(run_event)

    def get_job_name_from_task_name(self, task_name: str):
        # e.g. "simple_pipeline.forty_two" -> "forty_two"
        return task_name.split(".")[-1]
This is just a helper class to wrap the OpenLineage client, and provide a (hopefully) simpler way to create OpenLineage instances that we need.
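As a quick usage sketch (the run id below is a made-up example; in the task class it will come from Celery):

client = LineageClient()
client.submit_event(
    event_type=RunState.START,
    run_id="6f1c2b1e-6a0e-4f5e-9b2a-0d9c1e2f3a4b",  # made-up example id
    name="forty_two",
)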
In the submit_event method, we first create the event, and then simply emit it. I defined the create_event method in a separate module:
from datetime import UTC, datetime

from openlineage.client.run import Job, Run, RunEvent, RunState


def create_job(namespace: str, name: str) -> Job:
    return Job(namespace=namespace, name=name)


def create_run(run_id: str) -> Run:
    return Run(runId=run_id)


def create_event(
    event_type: RunState,
    run_id: str,
    name: str,
    namespace,
):
    return RunEvent(
        eventType=event_type,
        eventTime=datetime.now(UTC).isoformat(),
        run=create_run(run_id=run_id),
        job=create_job(namespace, name),
        producer=name,
    )
Nothing special is going on in the above code; we just create the Run and Job instances required for the RunEvent, and then the RunEvent itself.
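For intuition, a START event produced this way serialises to roughly the following JSON (values are illustrative, and the client adds a few extra fields such as schemaURL):

# Illustrative payload only, shown here as a Python dict:
{
    "eventType": "START",
    "eventTime": "2024-01-01T12:00:00+00:00",
    "run": {"runId": "6f1c2b1e-6a0e-4f5e-9b2a-0d9c1e2f3a4b"},
    "job": {"namespace": "default", "name": "forty_two"},
    "producer": "forty_two",
}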
Now for the fun part… we define a custom task class to use the client we implemented.
The custom task class
Considering the definitions we discussed previously, it makes sense to extend the base task class from Celery. This will allow us to send the appropriate run states based on the task’s lifecycle. It also gives us access to the task id that we will use for the run id.
In the constructor, we define the attributes that we will need:
from celery import Task


class LineageTask(Task):
    def __init__(self):
        self.client = None
        self.task_job_name = None

In this case, we want attributes for the client (a LineageClient instance) and the job name. Keep in mind that Celery instantiates the task class once per worker process, so these attributes live across runs; we assign them fresh in before_start.
In the before_start method, we create an instance of the LineageClient class we defined, assign the job_name attribute, and send the START run state:
def before_start(self, task_id, args, kwargs):
    self.client = client.LineageClient()
    self.task_job_name = self.client.get_job_name_from_task_name(self.name)
    self.client.submit_event(
        event_type=RunState.START,
        run_id=self.request.id,
        name=self.task_job_name,
    )
We can get the task id from self.request.id. In the __call__ method, we send the RUNNING state:
def __call__(self, *args, **kwargs):
    self.client.submit_event(
        event_type=RunState.RUNNING,
        run_id=self.request.id,
        name=self.task_job_name,
    )
    result = super().__call__(*args, **kwargs)
    return result
And in the on_success and on_failure callbacks we send COMPLETE and FAIL respectively:
def on_success(self, retval, task_id, args, kwargs):
    self.client.submit_event(
        event_type=RunState.COMPLETE,
        run_id=task_id,
        name=self.task_job_name,
    )

# pylint: disable=too-many-arguments
def on_failure(self, exc, task_id, args, kwargs, einfo):
    self.client.submit_event(
        event_type=RunState.FAIL,
        run_id=task_id,
        name=self.task_job_name,
    )
When instantiating the Celery app, I set LineageTask as the default task class:

Celery("lineage", task_cls="lineage.open_lineage.lineage_task:LineageTask")

If you don't want it to be the default task class, you can set it as the base class for individual tasks:
@app.task(base=LineageTask)
def pipeline():
    pass
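For reference, a minimal app module might look something like this. The broker URL matches the redis service from the compose file, but the exact settings are my assumption rather than the repo's code:

from celery import Celery

# Minimal sketch: point the broker and result backend at the compose redis service.
app = Celery(
    "lineage",
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/0",
    task_cls="lineage.open_lineage.lineage_task:LineageTask",
)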
Demonstration pipelines
In this section, I briefly give the implementation of the pipelines used for the demonstration.
The simple_pipeline module:
@app.task
def forty_two(*args, **kwargs):
    return 42


@app.task
def printer(*args, **kwargs):
    print("Hello from printer :-)")


@app.task
def lineage_pipeline(*args, **kwargs):
    return (forty_two.s() | printer.s(test="hello")).apply_async()
The slow_task:
import time


@app.task
def slow_task():
    iterations = 30
    sleep_seconds = 1
    for i in range(iterations):
        time.sleep(sleep_seconds)
        print(f"Iteration: {i}")
And the failing pipeline:
@app.task
def failure(*args, **kwargs):
    raise ValueError("I'm supposed to fail")


@app.task
def pipeline():
    return failure.s().apply_async()
Running the example
The pipelines are scheduled to run with a countdown of 35 seconds, which gives the Marquez service enough time to start up and run its initialisations.
simple_pipeline.lineage_pipeline.s().apply_async(countdown=35)
slow_task.s().apply_async(countdown=35)
failing_pipeline.pipeline.s().apply_async(countdown=35)

Commands are defined in the Makefile so that the entire project can be started simply with make run or make clean-run.
The Marquez UI
When we start up the project, we need to wait for the pipelines to execute. Once the pipelines are executed, we can see the jobs on the Marquez web UI:
[Image: jobs overview in the Marquez web UI]
We can see in the above image that 4 tasks (jobs) completed successfully, 1 failed, and 1 is still running. The name of each job is the last part of the default task name (since we didn't explicitly assign task names). At some point, the slow_task will also enter the COMPLETE state.
If we re-run the example using make run, and select a job, we can take a look at its run history:
[Image: run history for a selected job in Marquez]
We can see some useful information here, such as when the job was first created, the last time it started and finished, and the latest duration.
Adding the error facet
As the last part of this story, let’s add the ErrorMessageRunFacet. For this, I created a new Python module called facets with a single function:
from openlineage.client.generated.error_message_run import ErrorMessageRunFacet


def create_error_facet(message, stack_trace) -> ErrorMessageRunFacet:
    return ErrorMessageRunFacet(
        message=message, stackTrace=stack_trace, programmingLanguage="python"
    )
Then we update the create_event and create_run functions in the events module:
def create_run(run_id: str, run_facets: dict | None) -> Run:
    return Run(runId=run_id, facets=run_facets)


def create_event(
    event_type: RunState,
    run_id: str,
    run_facets: dict | None,
    name: str,
    namespace,
):
    return RunEvent(
        eventType=event_type,
        eventTime=datetime.now(UTC).isoformat(),
        run=create_run(run_id=run_id, run_facets=run_facets),
        job=create_job(namespace, name),
        producer=name,
    )
I also added a new method to the client:
def create_error_message_facet(self, error_message, stack_trace):
    return facets.create_error_facet(
        message=error_message,
        stack_trace=stack_trace,
    )

and updated the submit_event function:
def submit_event(
    self,
    event_type: RunState,
    run_id: str,
    name: str,
    run_facets: dict | None = None,
    namespace=DEFAULT_NAMESPACE,
):
    run_event = events.create_event(
        event_type=event_type,
        run_id=run_id,
        run_facets=run_facets,
        name=name,
        namespace=namespace,
    )
    self.lineage_client.emit(run_event)
In the LineageTask class, we make a small modification to the on_failure callback:
def on_failure(self, exc, task_id, args, kwargs, einfo):
    error_facet = self.client.create_error_message_facet(
        error_message=str(exc),
        stack_trace=einfo.traceback,
    )
    run_facets = {"errorMessage": error_facet}
    self.client.submit_event(
        event_type=RunState.FAIL,
        run_id=task_id,
        run_facets=run_facets,
        name=self.task_job_name,
    )
Now, let's get back to the Marquez web UI and select the failure job. Then we can go to the run history and select the latest run to take a look at the facet:
[Image: the errorMessage facet on the latest run in Marquez]
Conclusion
This was a fun little exploration into the OpenLineage Python package and how to use it to send Celery task data to Marquez. It is the first story in a series.
You can find the entire example on GitHub.