dbt assets package
The dbt assets package crawls dbt assets and publishes them to Atlan for discovery.
Cloud
Use this method only to create the workflow the first time. Each time you run it, it creates a new connection and new assets within that connection, which can lead to duplicate assets if you run the workflow this way multiple times with the same settings.
Instead, when you want to re-crawl assets, re-run the existing workflow (see Re-run existing workflow below).
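The create-once, re-run-afterwards rule can be sketched as a small guard. This is only an illustration under stated assumptions: `create_or_rerun` and its three callables are hypothetical stand-ins for whichever search, create, and re-run calls you use, and are not part of any Atlan SDK.

```python
from typing import Callable, Sequence, TypeVar

W = TypeVar("W")  # an existing workflow (hypothetical type)
R = TypeVar("R")  # whatever the create / re-run call returns


def create_or_rerun(
    find_existing: Callable[[], Sequence[W]],
    create: Callable[[], R],
    rerun: Callable[[W], R],
) -> R:
    """Create the workflow only when none exists; otherwise re-run the first match."""
    existing = find_existing()
    if existing:
        # A workflow already exists: re-running avoids a second connection
        # and the duplicate assets that would come with it.
        return rerun(existing[0])
    return create()


# Stand-in callables, purely for demonstration.
first = create_or_rerun(lambda: [], lambda: "created", lambda w: f"re-ran {w}")
second = create_or_rerun(
    lambda: ["atlan-dbt-1684500411"], lambda: "created", lambda w: f"re-ran {w}"
)
print(first)   # created
print(second)  # re-ran atlan-dbt-1684500411
```

In practice you would still inspect the search results before picking one, since more than one dbt workflow may exist.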
To create a new crawl of dbt assets from a multi-tenant dbt Cloud account:
- Java
- Python
- Kotlin
- Raw REST API
Workflow crawler = DbtCrawler.creator( // (1)
        client, // (2)
        "dbt-snowflake", // (3)
        List.of(client.getRoleCache().getIdForName("$admin")), // (4)
        null,
        null
    )
    .cloud( // (5)
        "https://cloud.getdbt.com",
        "example-token",
        true
    )
    .limitToConnection( // (6)
        "default/snowflake/1234567890"
    )
    .include("{\"24670\":{\"211208\":{\"163013\":{\"207502\":{}}}}}") // (7)
    .exclude("{\"24670\":{}}") // (8)
    .tags(true) // (9)
    .enrichMaterializedAssets(true) // (10)
    .build() // (11)
    .toWorkflow(); // (12)
WorkflowResponse response = crawler.run(client); // (13)
1. The `DbtCrawler` package will create a workflow to crawl assets from dbt Cloud.
2. You must provide an Atlan client.
3. You must provide a name for the connection that the dbt assets will exist within.
4. You must specify at least one connection admin, either:
    - everyone in a role (in this example, all `$admin` users).
    - a list of groups (names) that will be connection admins.
    - a list of users (names) that will be connection admins.
5. To configure the crawler to extract dbt assets directly from a dbt Cloud account, you must provide the following information:
    - hostname of your dbt Cloud instance.
    - token to use to authenticate against the dbt Cloud instance.
    - whether to use a multi-tenant cloud config (`true`) or a single-tenant cloud config (`false`).
6. You can also optionally specify the `qualifiedName` of a connection to a source (such as Snowflake), to limit the crawling of dbt assets to that existing connection in Atlan.
7. You can also optionally specify the list of assets to include in crawling. This is a highly-nested map structure of numeric IDs for the account, project, etc. You will almost certainly need to set up the filter you want through the UI first, and look at the developer console of your browser to see the structure.
8. You can also optionally specify the list of assets to exclude from crawling. This follows the same structure as the inclusion filter.
9. You can also optionally specify whether to enable dbt tag syncing as part of crawling dbt.
10. You can also optionally set whether to enable the enrichment of materialized SQL assets as part of crawling dbt.
11. Build the minimal package object.
12. Now, you can convert the package into a `Workflow` object.
13. You can then run the workflow using the `run()` method on the object you've created. Because this operation will execute work in Atlan, you must provide it an `AtlanClient` through which to connect to the tenant.

Workflows run asynchronously: see the packages and workflows introduction for details on how you can check the status and wait until the workflow has been completed.
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.packages import DbtCrawler

client = AtlanClient()

crawler = (
    DbtCrawler(  # (1)
        client=client,  # (2)
        connection_name="dbt-snowflake",  # (3)
        admin_roles=[client.role_cache.get_id_for_name("$admin")],  # (4)
        admin_groups=None,
        admin_users=None,
    )
    .cloud(  # (5)
        hostname="https://cloud.getdbt.com",
        service_token="example-token",
        multi_tenant=True,
    )
    .limit_to_connection(  # (6)
        connection_qualified_name="default/snowflake/1234567890"
    )
    .include(filter='{"24690":{"327645":{}}}')  # (7)
    .exclude(filter='')  # (8)
    .tags(True)  # (9)
    .enrich_materialized_assets(True)  # (10)
    .to_workflow()  # (11)
)

response = client.workflow.run(crawler)  # (12)
1. Base configuration for a new dbt crawler.
2. You must provide a client instance.
3. You must provide a name for the connection that the dbt assets will exist within.
4. You must specify at least one connection admin, either:
    - everyone in a role (in this example, all `$admin` users).
    - a list of groups (names) that will be connection admins.
    - a list of users (names) that will be connection admins.
5. To configure the crawler to extract dbt assets directly from a dbt Cloud account, you must provide the following information:
    - hostname of your dbt Cloud instance.
    - token to use to authenticate against the dbt Cloud instance.
    - whether to use a multi-tenant cloud config (`True`) or a single-tenant cloud config (`False`).
6. You can also optionally specify the `qualifiedName` of a connection to a source (such as Snowflake), to limit the crawling of dbt assets to that existing connection in Atlan.
7. You can also optionally specify the list of assets to include in crawling. This is a highly-nested map structure of numeric IDs for the account, project, etc. You will almost certainly need to set up the filter you want through the UI first, and look at the developer console of your browser to see the structure. (If set to `None`, all assets will be crawled.)
8. You can also optionally specify the list of assets to exclude from crawling. This follows the same structure as the inclusion filter. (If set to `None`, no assets will be excluded.)
9. You can also optionally specify whether to enable dbt tag syncing as part of crawling dbt.
10. You can also optionally set whether to enable the enrichment of materialized SQL assets as part of crawling dbt.
11. Now, you can convert the package into a `Workflow` object.
12. Run the workflow by invoking the `run()` method on the workflow client, passing the created object.

Workflows run asynchronously: see the packages and workflows introduction for details on how you can check the status and wait until the workflow has been completed.
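The include and exclude filters are JSON strings in which each numeric ID nests inside its parent. As a rough sketch (the helper name `build_filter` is hypothetical, and the meaning of each nesting level should still be confirmed through the UI as described above), such a string could be assembled from ID paths like this:

```python
import json
from typing import Dict, Iterable, Sequence


def build_filter(paths: Iterable[Sequence[int]]) -> str:
    """Turn ID paths (outermost ID first) into a nested JSON filter string."""
    root: Dict[str, dict] = {}
    for path in paths:
        node = root
        for numeric_id in path:
            # Each ID becomes a string key whose value holds the next level down.
            node = node.setdefault(str(numeric_id), {})
    # Compact separators reproduce the whitespace-free form used in the examples.
    return json.dumps(root, separators=(",", ":"))


include_filter = build_filter([[24670, 211208, 163013, 207502]])
exclude_filter = build_filter([[24670]])
print(include_filter)  # {"24670":{"211208":{"163013":{"207502":{}}}}}
print(exclude_filter)  # {"24670":{}}
```

Paths that share a prefix are merged under the same parent key, so several entries from one account end up in a single nested map.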
val crawler = DbtCrawler.creator( // (1)
        client, // (2)
        "dbt-snowflake", // (3)
        listOf(client.getRoleCache().getIdForName("\$admin")), // (4)
        null,
        null
    )
    .cloud( // (5)
        "https://cloud.getdbt.com",
        "example-token",
        true
    )
    .limitToConnection( // (6)
        "default/snowflake/1234567890"
    )
    .include("{\"24670\":{\"211208\":{\"163013\":{\"207502\":{}}}}}") // (7)
    .exclude("{\"24670\":{}}") // (8)
    .tags(true) // (9)
    .enrichMaterializedAssets(true) // (10)
    .build() // (11)
    .toWorkflow() // (12)
val response = crawler.run(client) // (13)
1. The `DbtCrawler` package will create a workflow to crawl assets from dbt Cloud.
2. You must provide an Atlan client.
3. You must provide a name for the connection that the dbt assets will exist within.
4. You must specify at least one connection admin, either:
    - everyone in a role (in this example, all `$admin` users).
    - a list of groups (names) that will be connection admins.
    - a list of users (names) that will be connection admins.
5. To configure the crawler to extract dbt assets directly from a dbt Cloud account, you must provide the following information:
    - hostname of your dbt Cloud instance.
    - token to use to authenticate against the dbt Cloud instance.
    - whether to use a multi-tenant cloud config (`true`) or a single-tenant cloud config (`false`).
6. You can also optionally specify the `qualifiedName` of a connection to a source (such as Snowflake), to limit the crawling of dbt assets to that existing connection in Atlan.
7. You can also optionally specify the list of assets to include in crawling. This is a highly-nested map structure of numeric IDs for the account, project, etc. You will almost certainly need to set up the filter you want through the UI first, and look at the developer console of your browser to see the structure.
8. You can also optionally specify the list of assets to exclude from crawling. This follows the same structure as the inclusion filter.
9. You can also optionally specify whether to enable dbt tag syncing as part of crawling dbt.
10. You can also optionally set whether to enable the enrichment of materialized SQL assets as part of crawling dbt.
11. Build the minimal package object.
12. Now, you can convert the package into a `Workflow` object.
13. You can then run the workflow using the `run()` method on the object you've created. Because this operation will execute work in Atlan, you must provide it an `AtlanClient` through which to connect to the tenant.

Workflows run asynchronously: see the packages and workflows introduction for details on how you can check the status and wait until the workflow has been completed.
For the raw REST API, we recommend creating the workflow only via the UI. To re-run an existing workflow, see the steps below.
Core
Use this method only to create the workflow the first time. Each time you run it, it creates a new connection and new assets within that connection, which can lead to duplicate assets if you run the workflow this way multiple times with the same settings.
Instead, when you want to re-crawl assets, re-run the existing workflow (see Re-run existing workflow below).
To create a new crawl of dbt assets from a dbt files location:
- Java
- Python
- Kotlin
- Raw REST API
Workflow crawler = DbtCrawler.creator( // (1)
        client, // (2)
        "dbt-snowflake", // (3)
        List.of(client.getRoleCache().getIdForName("$admin")), // (4)
        null,
        null
    )
    .core( // (5)
        "dbt-bucket",
        "dbt-data",
        "ap-south-1"
    )
    .limitToConnection( // (6)
        "default/snowflake/1234567890"
    )
    .tags(true) // (7)
    .enrichMaterializedAssets(true) // (8)
    .build() // (9)
    .toWorkflow(); // (10)
WorkflowResponse response = crawler.run(client); // (11)
1. The `DbtCrawler` package will create a workflow to crawl assets from a dbt files location.
2. You must provide an Atlan client.
3. You must provide a name for the connection that the dbt assets will exist within.
4. You must specify at least one connection admin, either:
    - everyone in a role (in this example, all `$admin` users).
    - a list of groups (names) that will be connection admins.
    - a list of users (names) that will be connection admins.
5. To configure the crawler to extract dbt assets directly from the dbt files location, you must provide the following information:
    - S3 bucket containing the dbt Core files.
    - prefix within the S3 bucket where the dbt Core files are located.
    - S3 region where the bucket is located.
6. You can also optionally specify the `qualifiedName` of a connection to a source (such as Snowflake), to limit the crawling of dbt assets to that existing connection in Atlan.
7. You can also optionally specify whether to enable dbt tag syncing as part of crawling dbt.
8. You can also optionally set whether to enable the enrichment of materialized SQL assets as part of crawling dbt.
9. Build the minimal package object.
10. Now, you can convert the package into a `Workflow` object.
11. You can then run the workflow using the `run()` method on the object you've created. Because this operation will execute work in Atlan, you must provide it an `AtlanClient` through which to connect to the tenant.

Workflows run asynchronously: see the packages and workflows introduction for details on how you can check the status and wait until the workflow has been completed.
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.packages import DbtCrawler

client = AtlanClient()

crawler = (
    DbtCrawler(  # (1)
        client=client,  # (2)
        connection_name="dbt-snowflake",  # (3)
        admin_roles=[client.role_cache.get_id_for_name("$admin")],  # (4)
        admin_groups=None,
        admin_users=None,
    )
    .core(  # (5)
        s3_bucket="dbt-bucket",
        s3_prefix="dbt-data",
        s3_region="ap-south-1",
    )
    .limit_to_connection(  # (6)
        connection_qualified_name="default/snowflake/1234567890"
    )
    .tags(True)  # (7)
    .enrich_materialized_assets(True)  # (8)
    .to_workflow()  # (9)
)

response = client.workflow.run(crawler)  # (10)
1. Base configuration for a new dbt crawler.
2. You must provide a client instance.
3. You must provide a name for the connection that the dbt assets will exist within.
4. You must specify at least one connection admin, either:
    - everyone in a role (in this example, all `$admin` users).
    - a list of groups (names) that will be connection admins.
    - a list of users (names) that will be connection admins.
5. To configure the crawler to extract dbt assets directly from the dbt files location, you must provide the following information:
    - S3 bucket containing the dbt Core files.
    - prefix within the S3 bucket where the dbt Core files are located.
    - S3 region where the bucket is located.
6. You can also optionally specify the `qualifiedName` of a connection to a source (such as Snowflake), to limit the crawling of dbt assets to that existing connection in Atlan.
7. You can also optionally specify whether to enable dbt tag syncing as part of crawling dbt.
8. You can also optionally set whether to enable the enrichment of materialized SQL assets as part of crawling dbt.
9. Now, you can convert the package into a `Workflow` object.
10. Run the workflow by invoking the `run()` method on the workflow client, passing the created object.

Workflows run asynchronously: see the packages and workflows introduction for details on how you can check the status and wait until the workflow has been completed.
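Because the run call returns before the crawl finishes, a caller usually polls for a terminal state. The following is a generic polling sketch, not the SDK's own monitoring helper: `wait_for_completion`, the `get_phase` callable, and the phase names in `TERMINAL_PHASES` are all assumptions made for illustration.

```python
import time
from typing import Callable

# Assumed phase names; check what your tenant actually reports.
TERMINAL_PHASES = {"Succeeded", "Failed", "Error"}


def wait_for_completion(
    get_phase: Callable[[], str],
    poll_seconds: float = 5.0,
    timeout_seconds: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_phase() until it reports a terminal phase or the timeout elapses."""
    waited = 0.0
    while True:
        phase = get_phase()
        if phase in TERMINAL_PHASES:
            return phase
        if waited >= timeout_seconds:
            raise TimeoutError(f"workflow still in phase {phase!r} after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds


# Demonstration with a canned sequence of phases and a no-op sleep.
phases = iter(["Pending", "Running", "Running", "Succeeded"])
final_phase = wait_for_completion(
    lambda: next(phases), poll_seconds=1.0, sleep=lambda _seconds: None
)
print(final_phase)  # Succeeded
```

Injecting `sleep` keeps the helper testable without real delays; in real use you would leave the default and pass a `get_phase` that queries the workflow run.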
val crawler = DbtCrawler.creator( // (1)
        client, // (2)
        "dbt-snowflake", // (3)
        listOf(client.getRoleCache().getIdForName("\$admin")), // (4)
        null,
        null
    )
    .core( // (5)
        "dbt-bucket",
        "dbt-data",
        "ap-south-1"
    )
    .limitToConnection( // (6)
        "default/snowflake/1234567890"
    )
    .tags(true) // (7)
    .enrichMaterializedAssets(true) // (8)
    .build() // (9)
    .toWorkflow() // (10)
val response = crawler.run(client) // (11)
1. The `DbtCrawler` package will create a workflow to crawl assets from a dbt files location.
2. You must provide an Atlan client.
3. You must provide a name for the connection that the dbt assets will exist within.
4. You must specify at least one connection admin, either:
    - everyone in a role (in this example, all `$admin` users).
    - a list of groups (names) that will be connection admins.
    - a list of users (names) that will be connection admins.
5. To configure the crawler to extract dbt assets directly from the dbt files location, you must provide the following information:
    - S3 bucket containing the dbt Core files.
    - prefix within the S3 bucket where the dbt Core files are located.
    - S3 region where the bucket is located.
6. You can also optionally specify the `qualifiedName` of a connection to a source (such as Snowflake), to limit the crawling of dbt assets to that existing connection in Atlan.
7. You can also optionally specify whether to enable dbt tag syncing as part of crawling dbt.
8. You can also optionally set whether to enable the enrichment of materialized SQL assets as part of crawling dbt.
9. Build the minimal package object.
10. Now, you can convert the package into a `Workflow` object.
11. You can then run the workflow using the `run()` method on the object you've created. Because this operation will execute work in Atlan, you must provide it an `AtlanClient` through which to connect to the tenant.

Workflows run asynchronously: see the packages and workflows introduction for details on how you can check the status and wait until the workflow has been completed.
For the raw REST API, we recommend creating the workflow only via the UI. To re-run an existing workflow, see the steps below.
Re-run existing workflow
To re-run an existing workflow for dbt assets:
- Java
- Python
- Kotlin
- Raw REST API
List<WorkflowSearchResult> existing = WorkflowSearchRequest // (1)
    .findByType(client, DbtCrawler.PREFIX, 5); // (2)
// Determine which of the results is the dbt workflow you want to re-run...
WorkflowRunResponse response = existing.get(n).rerun(client); // (3)
1. You can search for existing workflows through the `WorkflowSearchRequest` class.
2. You can find workflows by their type using the `findByType()` helper method and providing the prefix for one of the packages. In this example, we do so for the `DbtCrawler`. (You can also specify the maximum number of resulting workflows you want to retrieve as results.)
3. Once you've found the workflow you want to re-run, you can simply call the `rerun()` helper method on the workflow search result. The `WorkflowRunResponse` is just a subtype of `WorkflowResponse`, so it has the same helper method to monitor progress of the workflow run. Because this operation will execute work in Atlan, you must provide it an `AtlanClient` through which to connect to the tenant.
    - Optionally, you can use the `rerun(client, true)` method with idempotency to avoid re-running a workflow that is already running or in a pending state. This will return details of the already running workflow if found. By default, it is set to `false`.

Workflows run asynchronously: see the packages and workflows introduction for details on how you can check the status and wait until the workflow has been completed.
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.enums import WorkflowPackage

client = AtlanClient()

existing = client.workflow.find_by_type(  # (1)
    prefix=WorkflowPackage.DBT, max_results=5
)

# Determine which dbt workflow (n)
# from the list of results you want to re-run.
response = client.workflow.rerun(existing[n])  # (2)
1. You can find workflows by their type using the workflow client `find_by_type()` method and providing the prefix for one of the packages. In this example, we do so for the `DbtCrawler`. (You can also specify the maximum number of resulting workflows you want to retrieve as results.)
2. Once you've found the workflow you want to re-run, you can simply call the workflow client `rerun()` method.
    - Optionally, you can use `rerun(idempotent=True)` to avoid re-running a workflow that is already running or in a pending state. This will return details of the already running workflow if found. By default, it is set to `False`.

Workflows run asynchronously: see the packages and workflows introduction for details on how you can check the status and wait until the workflow has been completed.
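The idempotent option described above can be pictured with a small in-memory model. This is a sketch of the behaviour only, not the SDK implementation: the `Run` class, the local `rerun` function, and the phase names are hypothetical.

```python
from dataclasses import dataclass
from typing import List

# Assumed names for the "running" and "pending" states.
ACTIVE_PHASES = {"Running", "Pending"}


@dataclass
class Run:
    run_id: str
    phase: str


def rerun(runs: List[Run], idempotent: bool = False) -> Run:
    """Start a new run, unless idempotent is set and a run is already active."""
    if idempotent:
        for run in runs:
            if run.phase in ACTIVE_PHASES:
                # Hand back the run that is already in flight instead of starting another.
                return run
    new_run = Run(run_id=f"run-{len(runs) + 1}", phase="Pending")
    runs.append(new_run)
    return new_run


history = [Run("run-1", "Succeeded"), Run("run-2", "Running")]
reused = rerun(history, idempotent=True)
started = rerun(history)  # default: always starts a new run
print(reused.run_id)   # run-2
print(started.run_id)  # run-3
```

With the default of `False`, every call starts another run even while one is in flight, which is why the idempotent form is the safer choice for scheduled or retried jobs.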
val existing = WorkflowSearchRequest // (1)
    .findByType(client, DbtCrawler.PREFIX, 5) // (2)
// Determine which of the results is the
// dbt workflow you want to re-run...
val response = existing.get(n).rerun(client) // (3)
1. You can search for existing workflows through the `WorkflowSearchRequest` class.
2. You can find workflows by their type using the `findByType()` helper method and providing the prefix for one of the packages. In this example, we do so for the `DbtCrawler`. (You can also specify the maximum number of resulting workflows you want to retrieve as results.)
3. Once you've found the workflow you want to re-run, you can simply call the `rerun()` helper method on the workflow search result. The `WorkflowRunResponse` is just a subtype of `WorkflowResponse`, so it has the same helper method to monitor progress of the workflow run. Because this operation will execute work in Atlan, you must provide it an `AtlanClient` through which to connect to the tenant.
    - Optionally, you can use the `rerun(client, true)` method with idempotency to avoid re-running a workflow that is already running or in a pending state. This will return details of the already running workflow if found. By default, it is set to `false`.

Workflows run asynchronously: see the packages and workflows introduction for details on how you can check the status and wait until the workflow has been completed.
- Find the existing workflow.
- Send through the resulting re-run request.
{
  "from": 0,
  "size": 5,
  "query": {
    "bool": {
      "filter": [
        {
          "nested": {
            "path": "metadata",
            "query": {
              "prefix": {
                "metadata.name.keyword": {
                  "value": "atlan-dbt" // (1)
                }
              }
            }
          }
        }
      ]
    }
  },
  "sort": [
    {
      "metadata.creationTimestamp": {
        "nested": {
          "path": "metadata"
        },
        "order": "desc"
      }
    }
  ],
  "track_total_hits": true
}
1. Searching by the `atlan-dbt` prefix will ensure you only find existing dbt assets workflows.

Name of the workflow: the name of the workflow will be nested within the `_source.metadata.name` property of the response object. (Remember, since this is a search, there could be multiple results, so you may want to use the other details in each result to determine which workflow you really want.)
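The search body above varies only in the prefix and the page size, so it can be generated rather than hand-written. A minimal sketch follows; the helper name `build_workflow_search` is hypothetical, and the structure simply mirrors the request shown above.

```python
import json


def build_workflow_search(prefix: str, size: int = 5) -> dict:
    """Build the search body that finds workflows whose name starts with prefix."""
    return {
        "from": 0,
        "size": size,
        "query": {
            "bool": {
                "filter": [
                    {
                        "nested": {
                            "path": "metadata",
                            "query": {
                                "prefix": {"metadata.name.keyword": {"value": prefix}}
                            },
                        }
                    }
                ]
            }
        },
        # Newest workflows first, matching the sort in the request above.
        "sort": [
            {
                "metadata.creationTimestamp": {
                    "nested": {"path": "metadata"},
                    "order": "desc",
                }
            }
        ],
        "track_total_hits": True,
    }


search_body = build_workflow_search("atlan-dbt")
print(json.dumps(search_body, indent=2))
```

Serialising with `json.dumps` yields valid JSON, without the `// (1)` annotation marker that appears in the documentation listing.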
{
  "namespace": "default",
  "resourceKind": "WorkflowTemplate",
  "resourceName": "atlan-dbt-1684500411" // (1)
}
1. Send the name of the workflow as the `resourceName` to re-run it.
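Connecting the two REST steps means reading each workflow name out of the search response and placing the chosen one into the re-run payload. The sketch below assumes the response wraps its results in an Elasticsearch-style `hits.hits` array; only the `_source.metadata.name` path is stated in this document, so treat the outer wrapper and both helper names as assumptions.

```python
from typing import Any, Dict, List


def workflow_names(search_response: Dict[str, Any]) -> List[str]:
    """List workflow names from a search response (assumed hits.hits wrapper)."""
    hits = search_response.get("hits", {}).get("hits", [])
    return [hit["_source"]["metadata"]["name"] for hit in hits]


def build_rerun_request(workflow_name: str) -> Dict[str, str]:
    """Build the re-run payload for the chosen workflow name."""
    return {
        "namespace": "default",
        "resourceKind": "WorkflowTemplate",
        "resourceName": workflow_name,
    }


# Hand-written sample response, for illustration only.
sample_response = {
    "hits": {
        "hits": [
            {"_source": {"metadata": {"name": "atlan-dbt-1684500411"}}},
            {"_source": {"metadata": {"name": "atlan-dbt-1680000000"}}},
        ]
    }
}
names = workflow_names(sample_response)
rerun_request = build_rerun_request(names[0])
print(names)
print(rerun_request["resourceName"])  # atlan-dbt-1684500411
```

Picking `names[0]` is only a placeholder for the manual selection step; with several dbt workflows present you would compare the other details in each result first.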