Introduction
Hi guys,
Welcome to part 3 of our “How To Build A Data Pipeline” series. In the previous two parts, we successfully set up a data pipeline using a combination of technologies including Dagster, Docker, PostgreSQL, DBT, and Snowflake. As a Data Engineer, your responsibilities extend beyond merely constructing data pipelines; you’re also tasked with finding ways to reduce costs and optimize code.
While Dagster and DBT are open-source and free to use, Snowflake operates on a pay-as-you-go model. In this post, we’ll dive deep into various strategies and techniques to enhance your Snowflake operations, ensuring you get the most value out of your data warehouse while keeping expenses in check. Okay, let’s go.
Understanding Snowflake’s Cost Model
Snowflake’s pricing model is based on a pay-as-you-go approach, which offers flexibility but also requires careful management to optimize costs.
- Compute Costs
- Charged per second for the time your virtual warehouses are running.
- Costs vary based on the size of the warehouse (XS, S, M, L, XL, etc.).
- Suspended warehouses do not incur compute costs.
- Storage Costs
- Charged monthly based on the average amount of data stored.
- Includes data stored in tables, internal stages, and fail-safe storage.
- Compressed data storage is used for billing, which is typically much smaller than raw data size.
- Cloud Services
- Cover metadata management, query optimization, access control, and other background operations.
- Usually a small portion of overall costs for most customers.
When used together for ETL, Dagster and dbt play key roles in optimizing Snowflake costs through:
- Efficient Transformations: Optimized SQL queries in dbt, orchestrated by Dagster, reduce compute resources and processing time.
- Incremental Loading: Both tools support processing only new or changed data, minimizing resource usage.
- Intelligent Scheduling: Dagster’s advanced scheduling capabilities allow running jobs during off-peak hours, leveraging lower compute costs.
- Version Control and Testing: Both tools offer robust testing and version control, reducing errors and preventing costly data reprocessing.
- Resource Management: Tagging in dbt and Dagster’s run monitoring help identify cost-saving opportunities.
- Performance Insights: Logs from both tools provide valuable information on query efficiency and resource use.
- Flexible Materialization: dbt’s materialization strategies (table, view, incremental, ephemeral), when orchestrated by Dagster, balance performance and cost effectively
Warehouse Sizing Strategies
- Start Small, Scale Up
- Begin with smaller warehouse sizes (XS, S)
- Gradually increase based on performance needs
- Monitor and Adjust
- Use Snowflake’s query history and monitoring features
- Watch for query execution time, queuing, and credit consumption
- Differentiate by Workload
- Separate warehouses for development, reporting, ad-hoc queries, etc.
- Tailor sizes to specific needs
- Use Multi-Cluster Warehouses
- For handling concurrent queries during peak times
- Set maximum cluster count based on needs and budget
- Implement Auto-Suspend and Auto-Resume
- Minimize idle warehouse costs
- Ensure seamless user experience
Macro below will check the environment name (target.name). If it’s ‘prod’ or ‘default’, it returns the specified warehouse name (warehouse_name). Otherwise, it returns the warehouse associated with the current target (target.warehouse).
dbt_project/macros/get_warehouse.sql
{% macro get_warehouse(warehouse_name) %}
{% if target.name in ['prod', 'default'] %}
{{ return(warehouse_name) }}
{% else %}
{{ return(target.warehouse) }}
{% endif %}
{% endmacro %}
You also set warehouse default in profiles.yml
profiles.yml
default:
outputs:
dev:
account: your_account
database: your_database
password: your_password
role: your_role
schema: your_schema
threads: 1
type: snowflake
user: your_username
warehouse: transforming_warehouse_xs # set warehouse here
target: dev
Incremental Materialization
Incremental materialization is a technique for updating data tables by processing only new or changed data since the last update, rather than rebuilding the entire table. This method offers several benefits:
- Efficiency: Reduces data processing volume and time.
- Cost-Effectiveness: Lowers compute resource consumption.
- Data Timeliness: Allows for more frequent updates.
Key Strategies:
- Time-based Incremental Materialization: Processes new data based on a timestamp column.
- Change Data Capture (CDC): Identifies and applies specific changes (inserts, updates, deletes) to the target dataset.
{{
config(
materialized='incremental'
)
}}
-- write your query here
When setting up incremental models in dbt, you can define several important parameters:
| Parameter | Default Value | Options | Purpose |
|---|---|---|---|
| incremental_strategy | 'append' |
'append', 'merge', 'delete+insert' |
Defines how new data is integrated with existing data. |
| unique_key | null |
A list of columns, e.g., ['col1', 'col2', ...] |
Identifies unique records for merge operations. |
| on_schema_change | 'ignore' |
'ignore', 'append_new_columns', 'sync_all_columns', 'fail' |
Determines how to handle schema changes in the source data. |
Example for incremental materialization using the merge strategy on the user_id column:
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key=['user_id'],
on_schema_change='ignore'
) }}
select
user_id,
file_last_modified,
-- other columns...
from {{ source('shopify_raw', 'users') }}
{% if is_incremental() %}
-- This filter is applied only during incremental runs
where file_last_modified > (select max(file_last_modified) from {{ this }})
{% endif %}
While dbt’s recommended approach for incremental materialization uses a CTE within an is_incremental() macro to get the maximum event timestamp, this method can be inefficient. It may slow down source table scans and reduce readability.
To address these issues, we’ve created a custom macro called get_incremental_start_time_exact. This macro efficiently retrieves the required incremental timestamp and stores it in a variable.
{% macro get_incremental_start_time_exact(target_table, time_column, lag_time=var('event_lag_time'), lag_part=var('event_lag_part'), table_start_time=var('start_time'), timestamp_format='timestamp_ntz') -%}
{% if is_incremental() -%}
{% set sql -%}
select max({{ time_column }}) - interval '{{ lag_time }} {{ lag_part }}' from {{ target_table }}
{% endset -%}
'{{ run_query(sql).columns[0].values()[0] }}'::{{ timestamp_format }}
{%- else -%}
'{{ table_start_time }}'::{{ timestamp_format }}
{%- endif %}
{%- endmacro %}
This macro can be called in a model file to determine the starting point for incremental processing, like this:
{{
config(
materialized = 'incremental'
,unique_key = ['user_id']
,incremental_strategy = 'merge'
,tags = ['shopify']
)
}}
{% set incremental_cutoff = get_incremental_start_time_exact(this, 'file_last_modified', 0, 'hour', "2020-01-01", 'timestamp_ntz(0)') %}
with source as (
select * from {{ source('shopify_raw', 'users') }}
where file_last_modified > {{ incremental_cutoff }}
), final as (
select
--primary key
raw:id ::varchar as user_id
-- write your query here
--metadata
,file_last_modified ::timestamp_ntz(0) as file_last_modified
from source
qualify row_number() over(partition by user_id order by file_last_modified desc) = 1
)
select * from final
order by user_id
This approach provides a clean and efficient way to manage incremental loads, especially useful for large datasets or frequent updates.
Conclusion
In Part 3 of our “How To Build A Data Pipeline” series, we concentrated on fine-tuning Snowflake for both cost efficiency and performance enhancement. As Data Engineers, implementing these tactics ensures that our pipelines are optimal in terms of both economy and operation. Utilizing open-source tools like Dagster and DBT in conjunction with Snowflake presents an ideal solution for constructing data pipelines. Thank you for following this series, and look forward to more insights into effective data engineering practices. See yah!
To read more about Part 4: From Raw Data to Visual Storytelling