Competition
Dagster is an alternative to Apache Airflow.
Apache Airflow:
- is the status quo: it came first and became the de facto standard
- is very popular
- has a large ecosystem of providers
Dagster:
- is friendlier and prettier
- integrates with ML/DL frameworks
So, Dagster is the shiny new tool that doesn’t carry the baggage of an aging Airflow. Airflow’s strength lies in its community and popularity: the vast amount of available resources makes it easier to learn.
Setup
We’ll use Poetry for the initial setup.
Install Dagster
```bash
poetry add dagster dagster-webserver
```
Create a project
```bash
poetry run dagster project scaffold --name project_name
```
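The scaffold generates roughly this layout (the exact files vary by Dagster version, and project_name is whatever you passed to --name):
```
project_name/
├── project_name/
│   ├── __init__.py
│   └── assets.py
├── project_name_tests/
├── pyproject.toml
├── setup.cfg
└── setup.py
```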
Install dependencies
This step is slightly different: I couldn’t find a way to install local packages with Poetry, so we’ll use pip instead.
This assumes that a .venv is present within your project directory.
```bash
source .venv/bin/activate  # or: poetry shell
# cd project_name
pip install -e ".[dev]"
```
Run Dagster
Now that you’re inside the environment with your local package installed, you can run Dagster:
```bash
dagster dev
```
Navigate to http://127.0.0.1:3000 and you’ll see a GUI.
Working with Dagster
We will work with DAGs (Directed Acyclic Graphs: graphs where edges have directions and there are no cycles).
Dagster is built around assets; an asset is an entity that a data pipeline produces.
In project_name/project_name you will find a file called assets.py; that’s where you create your assets:
```python
import json
import os

from dagster import AssetExecutionContext, MaterializeResult, asset

from .pipeline import Model, Parser

model = Model()
parser = Parser()
md = "some data you should retrieve from somewhere"


@asset
def transform_content(context: AssetExecutionContext) -> None:
    # adding `context` to the function signature enables you to log stuff!
    context.log.info("Content transformed")


@asset(deps=["transform_content"])  # `deps` is what turns these assets into a DAG
def segment_data(context: AssetExecutionContext) -> None:
    segments = parser.parse(md)
    context.log.info("Data segmented")
    os.makedirs("output", exist_ok=True)
    with open("output/segments.json", "w") as f:
        f.write(json.dumps(segments))


@asset(deps=["segment_data"])
def integrate_data_to_model(context: AssetExecutionContext) -> MaterializeResult:
    with open("output/segments.json", "r") as f:
        data = json.loads(f.read())
    integrated = [model.map(item) for item in data]
    context.log.info("Data integrated into model")
    # MaterializeResult returns metadata to the Dagster GUI; you can also return
    # plots and markdown:
    # https://docs.dagster.io/tutorial/building-an-asset-graph#metadata-and-markdown
    return MaterializeResult(metadata={"integrated": integrated})
```
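To illustrate that last point, here’s a minimal, hypothetical sketch (not part of the pipeline above) of an asset that attaches markdown metadata, which the Dagster UI renders as a table:
```python
from dagster import MaterializeResult, MetadataValue, asset


@asset
def report() -> MaterializeResult:
    # MetadataValue.md wraps a string so the Dagster UI renders it as markdown
    table_md = "| id | value |\n|----|-------|\n| 1 | foo |"
    return MaterializeResult(metadata={"preview": MetadataValue.md(table_md)})
```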
This is a basic DAG. From here you can get into more advanced stuff like:
- scheduling (see the sketch after this list)
- automating
- connecting to external resources
- analytics
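As a taste of scheduling, here’s a minimal sketch that materializes the assets above on a cron schedule. The job name, the 06:00 cron, and the import path are my own illustration choices, not something the scaffold generates:
```python
from dagster import AssetSelection, Definitions, ScheduleDefinition, define_asset_job

from .assets import integrate_data_to_model, segment_data, transform_content

# a job that materializes every asset in this code location
daily_job = define_asset_job(name="daily_refresh", selection=AssetSelection.all())

# run the job every day at 06:00
daily_schedule = ScheduleDefinition(job=daily_job, cron_schedule="0 6 * * *")

defs = Definitions(
    assets=[transform_content, segment_data, integrate_data_to_model],
    jobs=[daily_job],
    schedules=[daily_schedule],
)
```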
Ops
One more thing worth talking about is the op unit.
Ops are in reality just Python functions; the difference between assets and ops is that ops don’t know about their dependencies.
The idea of an op is that it should perform a simple task like:
- Executing a query
- Initiating a job
- Sending an email/message
Ops can:
- return values
- create on-success and on-failure events
- have a retry policy
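To make that concrete, here’s a minimal sketch of two ops wired into a job, with a retry policy on the first. The op and job names are made up for illustration:
```python
from dagster import OpExecutionContext, RetryPolicy, job, op


@op(retry_policy=RetryPolicy(max_retries=3, delay=10))
def fetch_rows(context: OpExecutionContext) -> list:
    # e.g. execute a query; retried up to 3 times, 10 seconds apart, on failure
    context.log.info("Executing query")
    return [{"id": 1}, {"id": 2}]


@op
def send_report(context: OpExecutionContext, rows: list) -> None:
    # e.g. send an email/message about the result
    context.log.info(f"Sending message about {len(rows)} rows")


@job
def simple_job():
    # the dependency lives in the job graph, not in the ops themselves
    send_report(fetch_rows())
```
Note how the dependency between the two ops is expressed in the job, not on the ops themselves: the opposite of assets and their deps.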
If you’d like to go more in depth, this post from Georg Heiler is a great starting point, and his GitHub repository outlines what a Dagster repository could look like.
Here’s a simpler example from Anton Friberg’s GitHub.
Summary
Why would you use Dagster?
For simple use cases it’s probably not necessary, but when you need to scale and combine multiple complex operations, it will definitely come in handy. The main goal of using tools like these is observability, which is crucial when data transformations are involved.