Relational assets builder package
The relational assets builder package allows you to create (and update) net-new relational assets: connections, databases, schemas, tables, views, materialized views and columns.
Import relational assets from object store
To import relational assets directly from the object store:
- Java
- Python
- Kotlin
- Raw REST API
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.packages import RelationalAssetsBuilder
from pyatlan.model.assets import Asset
from pyatlan.model.enums import AssetInputHandling, AssetDeltaHandling, AssetRemovalType

client = AtlanClient()

workflow = (
    RelationalAssetsBuilder()  # (1)
    .object_store(  # (2)
        prefix="/test/prefix",
        object_key="assets-test.csv",
    )
    .s3(  # (3)
        access_key="test-access-key",
        secret_key="test-secret-key",
        bucket="my-bucket",
        region="us-west-1",
    )
    .assets_semantics(  # (4)
        input_handling=AssetInputHandling.UPSERT,
        delta_handling=AssetDeltaHandling.INCREMENTAL,
        removal_type=AssetRemovalType.ARCHIVE,
    )
    .options(  # (5)
        remove_attributes=[Asset.CERTIFICATE_STATUS, Asset.ANNOUNCEMENT_TYPE],
        fail_on_errors=True,
        field_separator=",",
        batch_size=20,
    )
).to_workflow()  # (6)

response = client.workflow.run(workflow)  # (7)
1. The RelationalAssetsBuilder allows you to create (and update) net-new relational assets.
2. To set up the package for importing metadata directly from the object store, provide the following information:
   - prefix: directory (path) within the bucket/container from which to retrieve the objects.
   - object_key: object key (filename), including its extension, within the bucket/container and prefix.
3. You can use different object store methods (e.g. s3(), gcs(), adls()). In this example, we're building a workflow using s3(), for which you’ll need to provide the following information:
   - AWS access key.
   - AWS secret key.
   - name of the bucket/storage that contains the metadata CSV files.
   - name of the AWS region.
4. To set up the package to import metadata with semantics, you need to provide:
   - input_handling: whether to allow the creation of new full (AssetInputHandling.UPSERT) or partial (AssetInputHandling.PARTIAL) assets from the input CSV, or make sure assets are only updated (AssetInputHandling.UPDATE) if they already exist in Atlan.
   - delta_handling: whether to treat the input file as an initial load or full replacement (AssetDeltaHandling.FULL_REPLACEMENT), deleting any existing assets not in the file, or as incremental only (AssetDeltaHandling.INCREMENTAL), with no deletion of existing assets.
   - removal_type: if delta_handling is set to FULL_REPLACEMENT, this parameter specifies whether to delete any assets not found in the latest file by archive (recoverable, AssetRemovalType.ARCHIVE) or purge (non-recoverable, AssetRemovalType.PURGE). If delta_handling is set to INCREMENTAL, this parameter is ignored and assets are archived.
5. (Optional) To set up the package for importing relational assets with advanced configuration, provide the following information:
   - remove_attributes: list of attributes to clear (remove) from assets if their value is blank in the provided file.
   - fail_on_errors: specifies whether an invalid value in a field should cause the import to fail (True) or log a warning, skip that value, and proceed (False).
   - field_separator: character used to separate fields in the input file (e.g. ',' or ';').
   - batch_size: maximum number of rows to process at a time (per API request).
6. Convert the package into a Workflow object.
7. Run the workflow by invoking the run() method on the workflow client, passing the created object.

Workflows run asynchronously
Remember that workflows run asynchronously. See the packages and workflows introduction for details on how to check the status and wait until the workflow has been completed.
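To make the semantics options from step (4) more concrete, here is a purely conceptual sketch of how the three settings interact. It is not the package's implementation: the function plan_import, its string labels, and the asset names are hypothetical stand-ins chosen for illustration, and it models only the behavior described above.

```python
def plan_import(existing, in_file, input_handling="upsert",
                delta_handling="incremental", removal_type="archive"):
    """Conceptual model only: decide what an import would do to each asset.

    The string labels are hypothetical stand-ins for the AssetInputHandling,
    AssetDeltaHandling and AssetRemovalType options described above.
    """
    existing_set, file_set = set(existing), set(in_file)

    # Assets present both in Atlan and in the file are always updated.
    to_update = sorted(existing_set & file_set)

    # Assets found only in the file are created (upsert / partial)
    # or skipped when only updates are allowed (update).
    only_in_file = sorted(file_set - existing_set)
    if input_handling == "update":
        to_create, to_skip = [], only_in_file
    else:
        to_create, to_skip = only_in_file, []

    # Only a full replacement removes assets that are missing from the file.
    if delta_handling == "full_replacement":
        to_remove = sorted(existing_set - file_set)
    else:
        to_remove = []

    return {
        "create": to_create,
        "update": to_update,
        "skip": to_skip,
        "remove": to_remove,
        # The removal type only matters when something is actually removed.
        "removal": removal_type if to_remove else None,
    }


# Hypothetical asset names, used only to exercise the model.
plan = plan_import(
    existing={"db.a", "db.b"},
    in_file={"db.b", "db.c"},
    delta_handling="full_replacement",
    removal_type="purge",
)
print(plan)
```

In this model, switching delta_handling back to "incremental" leaves "remove" empty no matter which removal_type is passed, which mirrors the note that removal_type is ignored for incremental loads.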
When using the raw REST API, we recommend creating the workflow only via the UI. To re-run an existing workflow, see the steps below.
Re-run existing workflow
To re-run an existing relational assets builder workflow:
- Java
- Python
- Kotlin
- Raw REST API
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.enums import WorkflowPackage

client = AtlanClient()

existing = client.workflow.find_by_type(  # (1)
    prefix=WorkflowPackage.RELATIONAL_ASSETS_BUILDER, max_results=5
)

# Determine which relational assets builder workflow (n)
# from the list of results you want to re-run.
response = client.workflow.rerun(existing[n])  # (2)
1. You can find workflows by their type using the workflow client find_by_type() method, providing the prefix for one of the packages. In this example, we do so for the RelationalAssetsBuilder. (You can also specify the maximum number of workflows to retrieve.)
2. Once you've found the workflow you want to re-run, call the workflow client rerun() method.
   - Optionally, you can use rerun(idempotent=True) to avoid re-running a workflow that is already running or pending. If such a workflow is found, the call returns its details instead. By default, idempotent is set to False.

Workflows run asynchronously
Remember that workflows run asynchronously. See the packages and workflows introduction for details on how you can check the status and wait until the workflow has been completed.
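Because a run or re-run returns before the workflow finishes, a caller typically has to poll for completion. The following is a minimal, library-agnostic sketch of such a loop. The get_status callable and the status names are assumptions made for illustration; they are not taken from the SDK, so consult the packages and workflows introduction for the actual status-checking calls.

```python
import time

# Hypothetical terminal state names; the real values may differ.
TERMINAL_STATES = {"SUCCESS", "FAILED", "ERROR", "STOPPED"}


def wait_for_completion(get_status, poll_seconds=5.0, timeout_seconds=600.0,
                        sleep=time.sleep):
    """Poll get_status() until it reports a terminal state or time runs out.

    get_status is any zero-argument callable returning a status string
    (an assumption: wrap whatever status lookup you actually use).
    """
    waited = 0.0
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"workflow still '{status}' after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds


# Usage with a fake status source standing in for a real lookup.
fake_statuses = iter(["PENDING", "RUNNING", "SUCCESS"])
final = wait_for_completion(lambda: next(fake_statuses), sleep=lambda s: None)
print(final)
```

Passing the sleep function as a parameter keeps the loop easy to test without real delays.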
- Find the existing workflow.
- Send through the resulting re-run request.
{
  "from": 0,
  "size": 5,
  "query": {
    "bool": {
      "filter": [
        {
          "nested": {
            "path": "metadata",
            "query": {
              "prefix": {
                "metadata.name.keyword": {
                  "value": "csa-relational-assets-builder" // (1)
                }
              }
            }
          }
        }
      ]
    }
  },
  "sort": [
    {
      "metadata.creationTimestamp": {
        "nested": {
          "path": "metadata"
        },
        "order": "desc"
      }
    }
  ],
  "track_total_hits": true
}
1. Searching by the csa-relational-assets-builder prefix ensures you only find existing relational assets builder workflows.

Name of the workflow
The name of the workflow is nested within the _source.metadata.name property of the response object. (Remember that since this is a search, there could be multiple results, so you may want to use the other details in each result to determine which workflow you really want.)
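As a rough illustration of pulling workflow names out of the search results, the sketch below walks a response and collects each _source.metadata.name value. The outer hits.hits envelope is an assumption based on typical search-style responses, and the sample data is invented; only the _source.metadata.name path comes from the note above.

```python
def workflow_names(search_response):
    """Collect workflow names from a search response, in result order.

    Assumes (hypothetically) that results are listed under hits.hits and
    that each result carries its name at _source.metadata.name.
    """
    names = []
    for hit in search_response.get("hits", {}).get("hits", []):
        name = hit.get("_source", {}).get("metadata", {}).get("name")
        if name:
            names.append(name)
    return names


# Invented sample response, shaped according to the assumptions above.
sample = {
    "hits": {
        "hits": [
            {"_source": {"metadata": {"name": "csa-relational-assets-builder-1684500411"}}},
            {"_source": {"metadata": {"name": "csa-relational-assets-builder-1684400000"}}},
            {"_source": {"metadata": {}}},  # results without a name are skipped
        ]
    }
}
print(workflow_names(sample))
```

Since the search request sorts by creation timestamp in descending order, the first name returned would be the most recently created workflow under these assumptions.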
{
  "namespace": "default",
  "resourceKind": "WorkflowTemplate",
  "resourceName": "csa-relational-assets-builder-1684500411" // (1)
}
1. Send the name of the workflow as the resourceName to re-run it.
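The two request bodies above can also be assembled programmatically. This sketch builds both payloads as plain dictionaries that mirror the JSON shown in this section; the helper names build_search_request and build_rerun_request are hypothetical, and sending the requests (endpoints, authentication) is deliberately left out.

```python
import json


def build_search_request(prefix="csa-relational-assets-builder", size=5):
    """Build the search body that finds workflows by name prefix."""
    return {
        "from": 0,
        "size": size,
        "query": {
            "bool": {
                "filter": [
                    {
                        "nested": {
                            "path": "metadata",
                            "query": {
                                "prefix": {"metadata.name.keyword": {"value": prefix}}
                            },
                        }
                    }
                ]
            }
        },
        # Newest workflows first, as in the request shown above.
        "sort": [
            {
                "metadata.creationTimestamp": {
                    "nested": {"path": "metadata"},
                    "order": "desc",
                }
            }
        ],
        "track_total_hits": True,
    }


def build_rerun_request(resource_name):
    """Build the re-run body for a workflow name found via the search."""
    return {
        "namespace": "default",
        "resourceKind": "WorkflowTemplate",
        "resourceName": resource_name,
    }


# Serialize to JSON text, ready to use as a request body.
print(json.dumps(build_rerun_request("csa-relational-assets-builder-1684500411"), indent=2))
```

Building the bodies from dictionaries avoids the annotation comments used in the JSON listings, which are not valid in strict JSON.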