Introduction

Dagster

Before reading this post, you can clone my source code on GitHub for reference.

Hi guys,

In the previous post, I gave an overview of data pipelines and the tech stack a data engineer works with. You can review that post here. Now I will guide you through setting up a standard data pipeline. This will be a series of posts, “How To Build A Data Pipeline”. Please note that the code provided is for illustration and simulation purposes. Without wasting time, here is the main tech stack we will use:

  • Dagster: The core data orchestration framework
  • Docker: Containerization and deployment
  • PostgreSQL: Used as Dagster’s metadata store
  • DBT: Data transformations
  • Snowflake: Data warehouse

Dagster is a data orchestration platform that helps data teams build, test, and monitor data pipelines. Dagster’s flexibility allows it to integrate seamlessly with a wide range of data tools and services. This extensibility makes Dagster a powerful choice for orchestrating complex data ecosystems.

In this post, we’ll focus on Dagster. I’ll walk you through setting up Dagster with Docker and PostgreSQL. Before you begin, make sure you have the following installed:

  • Docker
  • Docker Compose
  • Python 3.10 or later
  • PostgreSQL
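
If you want a quick way to confirm the list above, here is a minimal sketch that looks for each tool on your PATH. The executable names (docker, docker-compose, psql) are my assumptions about how these tools are installed on your machine, so adjust them if yours differ.

```python
import shutil
import sys

# Executable names are assumptions: "docker-compose" is the standalone Compose
# binary and "psql" is the PostgreSQL command-line client.
REQUIRED_TOOLS = ("docker", "docker-compose", "psql")


def check_prerequisites(min_python=(3, 10)):
    """Return a dict mapping each prerequisite to True (found) or False (missing)."""
    report = {tool: shutil.which(tool) is not None for tool in REQUIRED_TOOLS}
    report["python"] = sys.version_info[:2] >= min_python
    return report


if __name__ == "__main__":
    for name, ok in check_prerequisites().items():
        print(f"{name}: {'found' if ok else 'missing'}")
```

A missing entry only means the executable was not found under that name; it may still be installed under a different one.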

Project Structure

dagster/
├── compute_logs           # Directory for storing computation logs
├── dbt_project            # Contains dbt project files and models
├── example_postgres_data  # PostgreSQL data directory (for persistence)
├── local_artifact_storage # Local storage for Dagster artifacts
├── orchestration/         # Main directory for Dagster orchestration code
│   ├── assets/            # Definitions for Dagster assets
│   ├── jobs/              # Dagster job definitions
│   ├── resources/         # Custom resource definitions
│   ├── schedules/         # Schedule and sensor definitions
│   └── utils/             # Utility functions and helpers
├── orchestration-dbt      # Integration code for Dagster and dbt
├── .env                   # Environment variables for the project
├── dagster.yaml           # Dagster instance configuration
├── Dockerfile             # Defines the Docker image for the project
├── docker-compose.yml     # Defines and configures the project's services
├── requirements.txt       # Python package dependencies
└── workspace.yaml         # Dagster workspace configuration

Setup Dagster

Dockerfile

This Dockerfile creates a custom image for Dagster, integrating the project’s source code.

# Dagster
FROM python:3.8-slim as dagster

# Update and upgrade the system packages
RUN apt-get update && apt-get upgrade -yqq

# Set an environment variable for Dagster's home directory
ENV DAGSTER_HOME=/opt/dagster/dagster_home/

# Create the Dagster home directory
RUN mkdir -p $DAGSTER_HOME

# Set the working directory to Dagster's home
WORKDIR $DAGSTER_HOME

# Copy necessary files into the working directory
COPY requirements.txt dagster.yaml workspace.yaml .env $DAGSTER_HOME

# Install the dependencies from the requirements file
RUN pip install -r requirements.txt

# Copy the project directories into the container
COPY /orchestration $DAGSTER_HOME/orchestration
# COPY /orchestration-dbt $DAGSTER_HOME/orchestration-dbt
# COPY /dbt_project $DAGSTER_HOME/dbt_project

# Expose port 3000 for web access
EXPOSE 3000

# Define the command to run the Dagster webserver (in the background) and the Dagster daemon
CMD ["sh", "-c", "dagster-webserver -h 0.0.0.0 -p 3000 & dagster-daemon run"]

Docker Compose

This Docker Compose configuration defines two services: a PostgreSQL database (postgres) and the Dagster application (dagster).

version: '3.8'

services:
  postgres:
    image: postgres
    container_name: example_postgres
    hostname: example_postgres
    env_file:
      - .env
    environment:
      POSTGRES_USER: ${POSTGRES_DAGSTER_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DB}
    restart: on-failure
    networks:
      - dagster_network
    ports:
      - 5438:5432
    volumes:
      - ./example_postgres_data:/var/lib/postgresql/data

  dagster:
    build: .
    container_name: example_dagster
    ports:
      - 3008:3000
    volumes:
      - ./orchestration:/opt/dagster/dagster_home/orchestration 
      - ./compute_logs:/opt/dagster/dagster_home/compute_logs
      - ./local_artifact_storage:/opt/dagster/dagster_home/local_artifact_storage
    env_file:
      - .env
    restart: on-failure
    networks:
      - dagster_network
    depends_on:
      - postgres

networks:
  dagster_network:
    driver: bridge

Postgres Service

  • Image: Uses the official postgres image from Docker Hub. No tag is specified, so it resolves to latest; pin a version tag if you need a fixed PostgreSQL release.
  • Container name: example_postgres
  • Environment variables:
    • POSTGRES_USER: Username for PostgreSQL.
    • POSTGRES_PASSWORD: Password for PostgreSQL.
    • POSTGRES_DB: Database name for PostgreSQL.
  • Restart Policy: Configured to restart on failure for reliability.
  • Networks: Connects to dagster_network for communication with the Dagster service.
  • Volumes: Maps the local ./example_postgres_data directory to /var/lib/postgresql/data in the container, ensuring data persistence.
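
The .env file itself is not shown in this post, so here is a small sketch for checking that it defines the three variables the compose file reads. The parser is deliberately simplified (plain KEY=VALUE lines only) and the sample values are made up for illustration.

```python
# Variable names come from the docker-compose.yml above; the sample values are made up.
REQUIRED_VARS = ("POSTGRES_DAGSTER_USER", "POSTGRES_PASSWORD", "POSTGRES_DB")


def parse_env(text):
    """Parse simple KEY=VALUE lines, skipping blank lines and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def missing_vars(text, required=REQUIRED_VARS):
    """Return the required variables that are absent or empty."""
    values = parse_env(text)
    return [name for name in required if not values.get(name)]


sample = """
# hypothetical values for illustration only
POSTGRES_DAGSTER_USER=dagster
POSTGRES_PASSWORD=change-me
"""
print(missing_vars(sample))  # ['POSTGRES_DB']
```

In practice you would pass the contents of your real .env file instead of the sample string.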

Dagster Service

  • Image: Built from the Dockerfile in the current directory.
  • Port: Maps host port 3008 to the container’s port 3000, where the Dagster web interface listens.
  • Volumes:
    • Maps ./orchestration to /opt/dagster/dagster_home/orchestration for orchestration code.
    • Maps ./compute_logs to /opt/dagster/dagster_home/compute_logs for logs.
    • Maps ./local_artifact_storage to /opt/dagster/dagster_home/local_artifact_storage for artifact storage.
  • Restart Policy: Set to restart on failure for service continuity.
  • Networks: Connects to dagster_network for communication with PostgreSQL.
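  • Dependency: Depends on the postgres service, so Compose starts the postgres container first. Note that depends_on on its own only orders container startup; it does not wait until PostgreSQL is ready to accept connections.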
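
A readiness check can cover the gap between the postgres container starting and PostgreSQL actually accepting connections. The sketch below is one simple way to do it with the standard library; it is not part of the project code, and an open TCP port is only a rough signal that the database is up.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or host not resolvable yet: wait and retry.
            time.sleep(interval)
    return False
```

Inside the Compose network you would call it as wait_for_port("example_postgres", 5432), using the hostname from docker-compose.yml. From your own machine the equivalent is wait_for_port("localhost", 5438), because of the 5438:5432 port mapping.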

Network Configuration

  • Dagster Network: Creates a network named dagster_network using the bridge driver to facilitate communication between postgres and dagster services.

Configuration Files

dagster.yaml

compute_logs:
  module: dagster.core.storage.local_compute_log_manager
  class: LocalComputeLogManager
  config:
    base_dir: ./compute_logs

local_artifact_storage:
  module: dagster.core.storage.root
  class: LocalArtifactStorage
  config:
    base_dir: ./local_artifact_storage

storage:
  postgres:
    postgres_db:
      username: ******
      password: ******
      hostname: ******
      db_name: ******
      port: 5432

telemetry:
  enabled: False

scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

Compute Logs

  • Purpose: Manages the capture and storage of stdout and stderr logs.
  • Configuration:
    • module: Specifies the module dagster.core.storage.local_compute_log_manager.
    • class: Uses the LocalComputeLogManager class to manage local compute logs.
    • config:
      • base_dir: Directory for storing compute logs (./compute_logs).

Local Artifact Storage

  • Purpose: Manages storage for local artifacts.
  • Configuration:
    • module: Refers to the dagster.core.storage.root module.
    • class: Uses the LocalArtifactStorage class, which provides mechanisms for storing artifacts locally.
    • config:
      • base_dir: Directory for storing artifacts (./local_artifact_storage).

Dagster Storage

  • Purpose: Manages the persistence of run history, event logs (including asset materializations), and schedule state.
  • Configuration:
    • postgres: Specifies the use of PostgreSQL as the storage backend.
      • postgres_db: Database connection details.
        • username: Database username.
        • password: Database password.
        • hostname: Database server hostname.
        • db_name: Database name.
        • port: Database port (default is 5432).
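
The five fields above are the same pieces that make up a PostgreSQL connection URL. As a sketch, assuming the masked values correspond to the variables from docker-compose.yml and that the hostname is the compose hostname example_postgres (both are my assumptions, since the real values are hidden), you could assemble one like this to test the credentials from a script:

```python
import os
from urllib.parse import quote


def postgres_url(env=None, hostname="example_postgres", port=5432):
    """Build a postgresql:// URL from the variables used in docker-compose.yml."""
    env = os.environ if env is None else env
    # Percent-encode each part so characters like "@" or "/" do not break the URL.
    user = quote(env["POSTGRES_DAGSTER_USER"], safe="")
    password = quote(env["POSTGRES_PASSWORD"], safe="")
    db_name = quote(env["POSTGRES_DB"], safe="")
    return f"postgresql://{user}:{password}@{hostname}:{port}/{db_name}"


# Hypothetical credentials, for illustration only.
demo_env = {
    "POSTGRES_DAGSTER_USER": "dagster",
    "POSTGRES_PASSWORD": "p@ss/word",
    "POSTGRES_DB": "dagster",
}
print(postgres_url(demo_env))
# postgresql://dagster:p%40ss%2Fword@example_postgres:5432/dagster
```

Port 5432 is the port inside the Docker network. From the host machine the same database is reachable on port 5438.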

Telemetry

  • Purpose: Configures the collection of anonymized usage statistics.
  • Configuration:
    • enabled: Set to False to disable telemetry.

Scheduler

  • Purpose: Manages pipeline schedules.
  • Configuration:
    • module: Specifies the module dagster.core.scheduler.
    • class: Uses the DagsterDaemonScheduler class. This scheduler is a part of the Dagster daemon process and is responsible for scheduling and launching pipeline runs based on predefined schedules.

Run Coordinator

  • Purpose: Manages run prioritization and concurrency.
  • Configuration:
    • module: Refers to dagster.core.run_coordinator.
    • class: The QueuedRunCoordinator class is used here. It provides functionality for queuing run requests and determining the order of execution. This is especially useful in scenarios where multiple pipeline runs are triggered, and there is a need to manage their execution order efficiently.
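
To make the idea of queuing and ordering runs more concrete, here is a toy model in plain Python. It is not Dagster’s implementation: the class name, the priority argument, and the run ids are all made up, and it only illustrates "highest priority first, limited number of runs at once".

```python
import heapq
import itertools


class ToyRunQueue:
    """Toy model of a run queue: higher priority first, FIFO within a priority."""

    def __init__(self, max_concurrent_runs):
        self.max_concurrent_runs = max_concurrent_runs
        self._heap = []
        self._counter = itertools.count()  # tie-breaker that preserves submission order
        self.in_progress = set()

    def submit(self, run_id, priority=0):
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), run_id))

    def launch_ready(self):
        """Dequeue runs until the concurrency limit is reached; return the launched ids."""
        launched = []
        while self._heap and len(self.in_progress) < self.max_concurrent_runs:
            _, _, run_id = heapq.heappop(self._heap)
            self.in_progress.add(run_id)
            launched.append(run_id)
        return launched

    def finish(self, run_id):
        """Mark a run as done so its slot becomes available."""
        self.in_progress.discard(run_id)


queue = ToyRunQueue(max_concurrent_runs=2)
queue.submit("nightly_a")
queue.submit("nightly_b")
queue.submit("urgent_fix", priority=10)
print(queue.launch_ready())  # ['urgent_fix', 'nightly_a']
queue.finish("urgent_fix")
print(queue.launch_ready())  # ['nightly_b']
```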

workspace.yaml

The workspace.yaml file configures the code location server for your project. It should be in the same directory as your Dockerfile.

load_from:
  - python_module: orchestration
#   - python_module: orchestration-dbt

Sample Code

Resources

Resources describe connections to external services such as databases, APIs, or custom tools.

# orchestration/resources/example_resource.py
from dagster import resource

class ExampleResource:
    def get_message(self):
        return "Hello from ExampleResource!"

@resource
def example_resource():
    return ExampleResource()

Assets

Assets represent data objects and their production methods.

# orchestration/assets/example_asset.py
from dagster import asset

@asset
def example_asset():
    return "This is an example asset"

@asset
def dependent_asset(example_asset):
    return f"This asset depends on: {example_asset}"
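
Notice that dependent_asset takes a parameter named example_asset: Dagster uses the parameter name to find the upstream asset. The sketch below mimics that idea with plain functions and no Dagster at all. The materialize_all helper is hypothetical and only shows the name-based wiring, not how Dagster works internally.

```python
import inspect
from graphlib import TopologicalSorter


def example_asset():
    return "This is an example asset"


def dependent_asset(example_asset):
    return f"This asset depends on: {example_asset}"


def materialize_all(functions):
    """Run plain functions in dependency order, wiring inputs by parameter name."""
    by_name = {fn.__name__: fn for fn in functions}
    # Each function depends on the functions named by its parameters.
    graph = {name: set(inspect.signature(fn).parameters) for name, fn in by_name.items()}
    results = {}
    for name in TopologicalSorter(graph).static_order():
        fn = by_name[name]
        kwargs = {param: results[param] for param in inspect.signature(fn).parameters}
        results[name] = fn(**kwargs)
    return results


results = materialize_all([dependent_asset, example_asset])
print(results["dependent_asset"])
# This asset depends on: This is an example asset
```

The order in which the functions are passed does not matter, because the topological sort runs example_asset first.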

Jobs

Jobs are the main units for executing and monitoring specific sections of an asset graph.

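# orchestration/jobs/example_job.py
from dagster import define_asset_job

# Build the job from an asset selection. The job materializes the listed
# assets in dependency order, so example_asset runs once, before dependent_asset.
example_job = define_asset_job(
    name="example_job",
    selection=["example_asset", "dependent_asset"],
)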

Schedules

Schedules automate job execution at predetermined intervals.

# orchestration/schedules/example_schedule.py
from dagster import schedule
from orchestration.jobs.example_job import example_job

@schedule(
    cron_schedule="0 0 * * *",  # This is a daily schedule at midnight
    job=example_job,
    execution_timezone="UTC",
)
def example_schedule():
    return {}
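
If cron strings are new to you, "0 0 * * *" means minute 0 of hour 0, every day. Here is a small sketch that computes the next few ticks for that specific schedule in UTC. It hard-codes the daily-midnight case rather than parsing cron in general, and the function names are mine.

```python
from datetime import datetime, timedelta, timezone


def next_midnight_utc(now):
    """Return the first 00:00 UTC strictly after `now` (a timezone-aware datetime)."""
    now_utc = now.astimezone(timezone.utc)
    today_midnight = now_utc.replace(hour=0, minute=0, second=0, microsecond=0)
    return today_midnight + timedelta(days=1)


def upcoming_ticks(now, count=3):
    """List the next `count` daily ticks."""
    first = next_midnight_utc(now)
    return [first + timedelta(days=i) for i in range(count)]


start = datetime(2024, 1, 1, 13, 30, tzinfo=timezone.utc)
for tick in upcoming_ticks(start):
    print(tick.isoformat())
# 2024-01-02T00:00:00+00:00
# 2024-01-03T00:00:00+00:00
# 2024-01-04T00:00:00+00:00
```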

Package Initialization

This file serves as the entry point for the Dagster project, bringing together all the components we defined earlier (assets, jobs, schedules, and resources) into a cohesive structure.

# orchestration/__init__.py
from dagster import Definitions, load_assets_from_modules

from orchestration.assets import example_asset
from orchestration.jobs.example_job import example_job
from orchestration.schedules.example_schedule import example_schedule
from orchestration.resources.example_resource import example_resource

all_assets = load_assets_from_modules([example_asset])

defs = Definitions(
    assets=[*all_assets],
    jobs=[example_job],
    schedules=[example_schedule],
    resources={
        "example_resource": example_resource,
    }
)

Deployment

To deploy your Dagster project, follow these steps:

  1. Navigate to your Dagster project directory:
cd dagster
  2. Build and start your Docker containers:
docker-compose build
docker-compose up
  3. Once the containers are running, open a web browser and navigate to http://localhost:3008 (the host port that docker-compose.yml maps to the container’s port 3000) to access the Dagster web interface.
  4. To stop the deployment, run:
docker-compose down

You can refer to my code on GitHub.

Results

After deploying Dagster on Docker, you can access the Dagster web interface at http://localhost:3008, the host port that docker-compose.yml maps to the container’s port 3000. In the example code provided, I have written simple assets in Dagster to help you visualize the workflow. The web interface lets us monitor and manage data pipelines, and the same webserver exposes Dagster’s GraphQL API.

Key features of the Dagster web interface include:

  • Jobs: Execute and monitor job runs
  • Runs: View and filter all pipeline runs
  • Assets: Explore and manage data assets
  • Schedules: Set up and monitor automated job executions
  • Sensors: Configure and track event-driven pipeline triggers
  • Deployment: Access information about code locations, definitions, daemons, and configurations

The example code provided demonstrates a simple asset structure in Dagster. In real-world applications, you would typically write assets and strategy code to ingest data from third-party API services into databases or data warehouses.

[Figure: Example Assets]

[Figure: Example Runs]

Conclusion

Yayy, I have introduced and guided you through setting up Dagster on Docker step by step. Dagster is a robust tool for orchestrating data, easy to set up, and user-friendly. The most important aspect is building a strategy to ingest data from APIs. We need to understand:

  • Source API (Endpoint, Response format, Authentication)
  • Design Data Ingestion Flow
  • Custom API Resource (if any)
  • Strategy for each endpoint (Full Refresh Data, Incremental Data, Backfill)
  • Error handling
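
To give a first taste of these points before the dedicated post, here is a minimal sketch of incremental ingestion with a cursor and simple retries. Everything in it is hypothetical: the fake in-memory "API", the updated_at field, and the function names are stand-ins, not part of the project code.

```python
import time

# Fake in-memory "API": records with an increasing updated_at value (hypothetical data).
FAKE_RECORDS = [{"id": i, "updated_at": i * 10} for i in range(1, 8)]


def fetch_page(since, limit=3):
    """Stand-in for an HTTP call: return up to `limit` records newer than `since`."""
    newer = [r for r in FAKE_RECORDS if r["updated_at"] > since]
    return newer[:limit]


def with_retries(func, attempts=3, delay=0.0):
    """Call func(), retrying on any exception up to `attempts` times."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(delay)


def incremental_sync(cursor, fetch=fetch_page):
    """Pull every record newer than `cursor`; return (records, new_cursor)."""
    collected = []
    while True:
        page = with_retries(lambda: fetch(cursor))
        if not page:
            return collected, cursor
        collected.extend(page)
        cursor = max(r["updated_at"] for r in page)


records, cursor = incremental_sync(cursor=0)       # first run behaves like a full refresh
print(len(records), cursor)                        # 7 70
records, cursor = incremental_sync(cursor=cursor)  # nothing new since the last cursor
print(len(records), cursor)                        # 0 70
```

Starting from cursor 0 gives a full refresh, and re-running with an older cursor is the simplest form of backfill. In real code you would catch narrower exceptions than Exception and store the cursor somewhere durable between runs.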

I will write a separate post about Data Ingestion Strategy so we can explore it together. Returning to the main topic of this post, after successfully setting up Dagster and completing data ingestion, we will integrate DBT with Dagster to transform and clean data. This integration will be covered in part 2. See yah!

Read more in Part 2: Integrating DBT with Dagster

Reference

Dagster documentation