# Dagster & dbt Cloud
Dagster allows you to run dbt Cloud jobs alongside other technologies. You can schedule them as steps in a larger pipeline and manage them as data assets.
Our updated dbt Cloud integration offers two capabilities:
- Observability - You can view your dbt Cloud assets in the Dagster Asset Graph and drill down into run/materialization history.
- Orchestration - You can use Dagster to schedule runs/materializations of your dbt Cloud assets, either on a cron schedule, or based on upstream dependencies.
## Installation

Install the integration with `uv`:

```shell
uv add dagster-dbt
```

Or with `pip`:

```shell
pip install dagster-dbt
```
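The examples below read credentials and workspace identifiers from environment variables. A sketch of setting them — all values here are placeholders to substitute with your own account details (`https://cloud.getdbt.com` is the default access URL for US multi-tenant accounts):

```shell
# Placeholder values -- substitute your own dbt Cloud account details
export DBT_CLOUD_ACCOUNT_ID="123456"
export DBT_CLOUD_ACCESS_URL="https://cloud.getdbt.com"
export DBT_CLOUD_TOKEN="your-dbt-cloud-api-token"
export DBT_CLOUD_PROJECT_ID="11111"
export DBT_CLOUD_ENVIRONMENT_ID="22222"
```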
## Observability example
To make use of the observability capability, you will need to add code to your Dagster project that does the following:
- Defines your dbt Cloud credentials and workspace.
- Uses the integration to create asset specs for models in the workspace.
- Builds a sensor which will poll dbt Cloud for updates on runs/materialization history and dbt Cloud Assets.
```python
import os

import dagster as dg
from dagster_dbt.cloud_v2.resources import (
    DbtCloudCredentials,
    DbtCloudWorkspace,
    load_dbt_cloud_asset_specs,
)
from dagster_dbt.cloud_v2.sensor_builder import build_dbt_cloud_polling_sensor

# Define credentials
creds = DbtCloudCredentials(
    account_id=os.getenv("DBT_CLOUD_ACCOUNT_ID"),
    access_url=os.getenv("DBT_CLOUD_ACCESS_URL"),
    token=os.getenv("DBT_CLOUD_TOKEN"),
)

# Define the workspace
workspace = DbtCloudWorkspace(
    credentials=creds,
    project_id=os.getenv("DBT_CLOUD_PROJECT_ID"),
    environment_id=os.getenv("DBT_CLOUD_ENVIRONMENT_ID"),
)

# Use the integration to create asset specs for models in the workspace
dbt_cloud_asset_specs = load_dbt_cloud_asset_specs(workspace=workspace)

# Build a sensor which polls dbt Cloud for updates on
# runs/materialization history and dbt Cloud assets
dbt_cloud_polling_sensor = build_dbt_cloud_polling_sensor(workspace=workspace)
```
## Orchestration example
To make use of the orchestration capability, you will need to add code to your Dagster project that does the following:
- Defines your dbt Cloud credentials and workspace.
- Builds your asset graph in a materializable way.
- Adds these assets to the Declarative Automation Sensor.
- Builds a sensor to poll dbt Cloud for updates on runs/materialization history and dbt Cloud Assets.
```python
import os

import dagster as dg
from dagster_dbt.cloud_v2.asset_decorator import dbt_cloud_assets
from dagster_dbt.cloud_v2.resources import DbtCloudCredentials, DbtCloudWorkspace
from dagster_dbt.cloud_v2.sensor_builder import build_dbt_cloud_polling_sensor

# Define credentials
creds = DbtCloudCredentials(
    account_id=os.getenv("DBT_CLOUD_ACCOUNT_ID"),
    access_url=os.getenv("DBT_CLOUD_ACCESS_URL"),
    token=os.getenv("DBT_CLOUD_TOKEN"),
)

# Define the workspace
workspace = DbtCloudWorkspace(
    credentials=creds,
    project_id=os.getenv("DBT_CLOUD_PROJECT_ID"),
    environment_id=os.getenv("DBT_CLOUD_ENVIRONMENT_ID"),
)

# Build your asset graph in a materializable way
@dbt_cloud_assets(workspace=workspace)
def my_dbt_cloud_assets(
    context: dg.AssetExecutionContext, dbt_cloud: DbtCloudWorkspace
):
    yield from dbt_cloud.cli(args=["build"], context=context).wait()

# Automate your assets using Declarative Automation
# https://docs.dagster.io/guides/automate/declarative-automation
my_dbt_cloud_assets = my_dbt_cloud_assets.map_asset_specs(
    lambda spec: spec.replace_attributes(
        automation_condition=dg.AutomationCondition.eager()
    )
)

# Add these assets to the Declarative Automation Sensor
automation_sensor = dg.AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=dg.DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)

# Build a sensor which polls dbt Cloud for updates on
# runs/materialization history and dbt Cloud assets
dbt_cloud_polling_sensor = build_dbt_cloud_polling_sensor(workspace=workspace)
```
## Partitioned assets example
You can define partitions alongside your dbt Cloud assets to build incremental models. Pass a `partitions_def` to the `dbt_cloud_assets` decorator, then use `context.partition_time_window` or `context.partition_key` to pass variables to dbt via `--vars`.

When using a `TimeWindowPartitionsDefinition`, a `BackfillPolicy.single_run()` is applied by default.
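For intuition, here is a stdlib-only sketch of how a partition's time window ends up serialized into the `--vars` argument used in the examples below. The helper is hypothetical, not part of `dagster-dbt`; in a real asset the window comes from `context.partition_time_window`:

```python
import json
from datetime import datetime


def build_cli_args(start: datetime, end: datetime) -> list[str]:
    """Hypothetical helper: turn a partition time window into dbt CLI args."""
    dbt_vars = {"min_date": start.isoformat(), "max_date": end.isoformat()}
    return ["build", "--vars", json.dumps(dbt_vars)]


# For the 2024-01-01 daily partition, the window is [2024-01-01, 2024-01-02)
args = build_cli_args(datetime(2024, 1, 1), datetime(2024, 1, 2))
print(args[2])  # {"min_date": "2024-01-01T00:00:00", "max_date": "2024-01-02T00:00:00"}
```

Note the half-open window: each partition's `end` is the next partition's `start`, which is why the SQL below filters with `>=` on `min_date` and `<` on `max_date`.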
### Daily partitioned (time window)
Use time window partitions to pass date ranges to your dbt models as variables. This is useful for incremental models that filter by date.
```python
import json

import dagster as dg
from dagster_dbt.cloud_v2.asset_decorator import dbt_cloud_assets
from dagster_dbt.cloud_v2.resources import DbtCloudWorkspace


# Reuses the `workspace` defined in the examples above
@dbt_cloud_assets(
    workspace=workspace,
    partitions_def=dg.DailyPartitionsDefinition(start_date="2024-01-01"),
)
def my_partitioned_dbt_cloud_assets(
    context: dg.AssetExecutionContext, dbt_cloud: DbtCloudWorkspace
):
    time_window = context.partition_time_window
    dbt_vars = {
        "min_date": time_window.start.isoformat(),
        "max_date": time_window.end.isoformat(),
    }
    yield from dbt_cloud.cli(
        args=["build", "--vars", json.dumps(dbt_vars)],
        context=context,
    ).wait()
```
In your dbt model, use the passed variables to filter rows:
```sql
{{ config(materialized='incremental') }}

select * from {{ ref('my_model') }}
{% if is_incremental() %}
where order_date >= '{{ var('min_date') }}' and order_date < '{{ var('max_date') }}'
{% endif %}
```
### Static partitioned (e.g. by region)
Use static partitions to run dbt models for different segments like regions or tenants.
```python
import json

import dagster as dg
from dagster_dbt.cloud_v2.asset_decorator import dbt_cloud_assets
from dagster_dbt.cloud_v2.resources import DbtCloudWorkspace


# Reuses the `workspace` defined in the examples above
@dbt_cloud_assets(
    workspace=workspace,
    partitions_def=dg.StaticPartitionsDefinition(["us", "eu", "apac"]),
)
def my_region_dbt_cloud_assets(
    context: dg.AssetExecutionContext, dbt_cloud: DbtCloudWorkspace
):
    region = context.partition_key
    dbt_vars = {"target_region": region}
    yield from dbt_cloud.cli(
        args=["build", "--vars", json.dumps(dbt_vars)],
        context=context,
    ).wait()
```
## About dbt Cloud
dbt Cloud is a hosted service for running dbt jobs. It helps data analysts and engineers productionize dbt deployments. Beyond dbt open source, dbt Cloud provides scheduling, CI/CD, hosted documentation, and monitoring & alerting.
If you're currently using dbt Cloud™, you can also use Dagster to run dbt-core in its place. You can read more about how to do that here.
## Using dbt Cloud with Dagster Components
The DbtCloudComponent allows you to load a dbt Cloud project as a set of Dagster assets using the Components API. It automatically synchronizes with your dbt Cloud manifest and triggers jobs remotely.
### Prerequisites
To use this component, you need:
- A dbt Cloud account.
- An API token and Account ID.
- A Project ID and Environment ID for the dbt Cloud project you wish to orchestrate.
### Configuration
The component requires a workspace resource and optional selection arguments:
| Argument | Type | Description |
|---|---|---|
| `workspace` | `DbtCloudWorkspace` | Required. Resource containing your dbt Cloud credentials (`token`, `account_id`) and project details. |
| `select` | `str` | A dbt selection string to filter assets (e.g. `tag:staging`). Defaults to `fqn:*`. |
| `exclude` | `str` | A dbt selection string to exclude assets. |
| `defs_state` | `DefsStateConfig` | Configuration for persisting state (the manifest) locally. Defaults to the local filesystem. |
### Usage Example
```python
from dagster import Definitions
from dagster_dbt import DbtCloudComponent, DbtCloudWorkspace
from dagster.components.utils.defs_state import DefsStateConfigArgs

# Define your dbt Cloud workspace
dbt_cloud_workspace = DbtCloudWorkspace(
    account_id=123456,
    project_id=11111,
    token="your-dbt-cloud-api-token",
)

# Create the component
dbt_cloud = DbtCloudComponent(
    workspace=dbt_cloud_workspace,
    select="fqn:*",
    defs_state=DefsStateConfigArgs.local_filesystem(),
)
```
## Limitations
**Code references:** Unlike local dbt projects, the dbt Cloud component does not support linking Dagster assets to local SQL source files, as execution occurs remotely.