
Setting Run-time Configurations in Dagster

Last updated: Sep 14, 2023
Tags: Dagster

Setting run-time configuration using Dagster UI

We can specify run-time configuration directly on the Dagster UI. Suppose we have the following main.py file:

from dagster import Definitions, asset
import pandas as pd

@asset(name="iris_data")
def get_iris_data(context):
    context.log.info(context.op_config["greet"])
    df = pd.read_csv("https://raw.githubusercontent.com/SkyTowner/sample_data/main/iris_data.csv")
    return df

defs = Definitions(assets=[get_iris_data])

Let's launch the Dagster UI using the following command:

dagster dev -f main.py

On the Dagster UI, click on the following dropdown icon and then click on Open launchpad:

The Launchpad is where we can specify our run-time configuration:

For those who want to copy and paste this config, here's the code:

{
  ops: {
    iris_data: {
      config: {
        greet: "meow"
      }
    }
  }
}

Once we click on Materialize, we should see meow in the logs:

To see the run configuration for this particular run, click on the View tags and config button on the top right corner:

We should then see the following:

This is quite useful when looking back at past runs!

Passing in run-time configuration programmatically

We can programmatically set run-time configuration that assets can access during their materialization.

Using a Python dictionary

To demonstrate, consider the following main.py file:

from dagster import asset, materialize

@asset(name="my_data")
def get_my_data(config: dict):
    print(config) # {'cat': 'meow', 'dog': 'ruff'}
    return "My data"

if __name__ == "__main__":
    asset_result = materialize(
        [get_my_data],
        run_config={
            "ops": {
                "my_data": {
                    "config": {
                        "cat": "meow",
                        "dog": "ruff"
                    }
                }
            }
        }
    )

Here, note the following:

  • the "ops" key is where per-asset configuration goes; each asset is backed by an op, so assets are addressed by name under "ops".

  • to pass run-time configuration to our my_data asset, we must specify "my_data" under "ops", and then "config" under "my_data".

  • to access the run-time configuration in our asset, we must supply a parameter named exactly config (not, say, configgg), otherwise an error is thrown.

  • we must also annotate config with the dict type hint, otherwise an error will be thrown.
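The nesting described in the notes above can be captured in a small helper. This is a minimal sketch in plain Python: build_asset_run_config is a hypothetical name (not part of Dagster's API), and it only assembles the dictionary that would then be passed as run_config.

```python
from typing import Any, Dict


def build_asset_run_config(configs: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Wrap per-asset settings in the {"ops": {name: {"config": ...}}} layout.

    Hypothetical helper for illustration; it does not call Dagster.
    """
    return {
        "ops": {name: {"config": dict(values)} for name, values in configs.items()}
    }


run_config = build_asset_run_config({"my_data": {"cat": "meow", "dog": "ruff"}})
print(run_config)
# {'ops': {'my_data': {'config': {'cat': 'meow', 'dog': 'ruff'}}}}
```

The resulting dictionary has the same shape as the one written out by hand in main.py, so it could be passed to materialize via the run_config parameter.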

Let's now run our main.py file:

python main.py
2023-09-14 21:21:01 +0800 - dagster - DEBUG - __ephemeral_asset_job__ - e9fd75c0-a86c-411f-a6ae-ce7b7d08831e - 8583 - RUN_START - Started execution of run for "__ephemeral_asset_job__".
...
{'cat': 'meow', 'dog': 'ruff'}
...

Instead of passing in the config parameter, we can also access the run-time configuration via the context object like so:

@asset(name="my_data")
def get_my_data(context):
    print(context.op_config) # {'cat': 'meow', 'dog': 'ruff'}
    return "My data"

Instead of specifying which asset to pass the configuration to, we can set the run-time configurations under "execution" so that all assets can gain access to them:

from dagster import asset, materialize

@asset(name="my_data")
def get_my_data(context):
    print(context.run_config["execution"]["config"]) # {'cat': 'meow', 'dog': 'ruff'}
    return "My data"

if __name__ == "__main__":
    asset_result = materialize(
        [get_my_data],
        run_config={
            "execution": {
                "config": {
                    "cat": "meow",
                    "dog": "ruff"
                }
            }
        }
    )

Using a Dagster Config

Instead of passing a Python dictionary, which can hold any arbitrary key-value pairs, we can be more specific by passing a Dagster config. To demonstrate, consider the following main.py file:

from dagster import RunConfig, Config, asset, materialize

class MyAssetConfig(Config):
    my_string: str = "my_default_string"
    my_numbers: list

@asset(name="my_data")
def get_my_data(config: MyAssetConfig):
    print(config.my_string)
    print(config.my_numbers)
    return "My data"

if __name__ == "__main__":
    asset_result = materialize(
        [get_my_data],
        run_config=RunConfig({"my_data": MyAssetConfig(my_numbers=[1,2])}),
    )

Note the following:

  • the MyAssetConfig class, which inherits from Dagster's Config class, provides the template of the configuration.

  • we pass the run-time configuration (MyAssetConfig) to the asset (my_data) by declaring a parameter named config.

  • we have to annotate the config parameter with the MyAssetConfig type, otherwise Dagster will throw an error. Also, the parameter must be named exactly config; if we use configg instead, an error will be thrown.

  • in the materialize(...) call, we pass an instance of RunConfig using the run_config parameter.

  • RunConfig takes as input a dictionary where the key ("my_data" in this case) is the name of the asset to pass the configuration to, while the value is an instance of the run-time config.

Let's now run our main.py file:

python my_dagster_code_location/main.py
...
2023-07-16 23:24:55 +0800 - dagster - DEBUG - __ephemeral_asset_job__ - 22815ef5-8ec0-4759-9c2f-334fc2511a6f - 25168 - my_data - STEP_START - Started execution of step "my_data".
my_default_string
[1, 2]
...

Note that if we pass a field that is not defined in our configuration class, an error will be thrown when the configuration is validated, before the asset runs:

if __name__ == "__main__":
    asset_result = materialize(
        [get_my_data],
        # some_other_field is not defined in MyAssetConfig
        run_config=RunConfig({"my_data": MyAssetConfig(my_numbers=[1,2], some_other_field=3)}),
    )

Accessing run-time configuration using the context object

Instead of accessing the run-time config object directly, we can also access it via the context object like so:

from dagster import RunConfig, Config, asset, materialize, OpExecutionContext

class MyAssetConfig(Config):
    my_string: str = "my_default_string"
    my_numbers: list

@asset(name="my_data")
def get_my_data(context: OpExecutionContext):
    print(context.op_config["my_string"]) # my_default_string
    print(context.op_config["my_numbers"]) # [1, 2]
    return "My data"

if __name__ == "__main__":
    asset_result = materialize(
        [get_my_data],
        run_config=RunConfig({"my_data": MyAssetConfig(my_numbers=[1,2])}),
    )

Note that an asset can accept both the context and config objects at the same time like so:

@asset(name="my_data")
def get_my_data(context: OpExecutionContext, config: MyAssetConfig):
    assert context.op_config["my_numbers"] == config.my_numbers
    return "My data"

Here, the ordering of the parameters context and config does not matter - only their names do!

Setting tags to runs when materializing

We can attach key-value tags to runs, which we can then use for filtering in the Dagster UI. In this section, we will demonstrate how to do so using the Dagster UI and Dagster's Python API.

Using Dagster UI

Suppose we have a Python file called my_code_location.py with the following content:

from dagster import Definitions, asset

@asset(name="my_asset")
def get_my_asset():
    return 3

defs = Definitions(assets=[get_my_asset])

Launch the Dagster UI like so:

dagster-webserver -f my_code_location.py
2023-08-30 21:31:57 +0800 - dagster.code_server - INFO - Started Dagster code server for file my_code_location.py in process 32254
2023-08-30 21:31:57 +0800 - dagster-webserver - INFO - Serving dagster-webserver on http://127.0.0.1:3000 in process 32251

Click on the dropdown icon on the right of the Materialize button and click on Open launchpad:

Next, click on the Edit tags button:

Let's add the following two tags to our run:

Click Apply and then click on the Materialize button at the bottom:

Now, under the Runs tab, we should see our executed run with the tags that we set:

Tags are useful because we can filter by tag like so:

Using Dagster's Python API

Let's now demonstrate how to set tags to runs when using Dagster's Python API. Consider the following files:

dagster_home/
    main.py
    my_asset.py
    my_code_location.py

Here, my_asset.py holds our asset:

from dagster import asset

@asset(name="my_asset")
def get_my_asset():
    return 3

Our main.py materializes this asset via the Dagster Python API:

from my_asset import get_my_asset
from dagster import materialize, DagsterInstance
import os

if __name__ == "__main__":
    os.environ["DAGSTER_HOME"] = "/Users/isshininada/dagster_tutorial/dagster_home"
    with DagsterInstance.get() as instance:
        materialize(
            assets=[get_my_asset],
            instance=instance,
            tags={
                "my_tag_one": "B",
                "my_tag_two": 5,
            }
        )

Here, we've attached two key-value tags to our run.

Finally, my_code_location.py holds the Dagster Definitions object to initialize the UI:

from my_asset import get_my_asset
from dagster import Definitions
import os

os.environ["DAGSTER_HOME"] = "/Users/isshininada/dagster_tutorial/dagster_home"
defs = Definitions(assets=[get_my_asset])

Now, let's run our main.py to materialize our assets:

python main.py
2023-08-30 21:36:27 +0800 - dagster - DEBUG - __ephemeral_asset_job__ - 683e7474-306e-4c89-82fe-343bbf08b6b9 - 32646 - RUN_START - Started execution of run for "__ephemeral_asset_job__".
2023-08-30 21:36:27 +0800 - dagster - DEBUG - __ephemeral_asset_job__ - 683e7474-306e-4c89-82fe-343bbf08b6b9 - 32646 - ENGINE_EVENT - Executing steps in process (pid: 32646)
2023-08-30 21:36:27 +0800 - dagster - DEBUG - __ephemeral_asset_job__ - 683e7474-306e-4c89-82fe-343bbf08b6b9 - 32646 - my_asset - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
...

Spin up the Dagster UI server like so:

dagster-webserver -f my_code_location.py
2023-08-30 21:31:57 +0800 - dagster.code_server - INFO - Started Dagster code server for file my_code_location.py in process 32254
2023-08-30 21:31:57 +0800 - dagster-webserver - INFO - Serving dagster-webserver on http://127.0.0.1:3000 in process 32251

When we head over to the Runs tab, we can see our run recorded with the two tags that we've set:

Published by Isshin Inada