Asset import package
The asset import package loads metadata from a CSV file that matches the format of one extracted using either of the asset export packages (basic or advanced).
Import assets from object store
To import assets directly from the object store:
- Java
- Python
- Kotlin
- Raw REST API
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.packages import AssetImport
from pyatlan.model.assets import Asset
from pyatlan.model.enums import AssetInputHandling
client = AtlanClient()
workflow = (
    AssetImport()  # (1)
    .object_store()  # (2)
    .s3(  # (3)
        access_key="test-access-key",
        secret_key="test-secret-key",
        bucket="my-bucket",
        region="us-west-1",
    )
    .assets(  # (4)
        prefix="/test/prefix",
        object_key="assets-test.csv",
        input_handling=AssetInputHandling.UPSERT,
    )
    .assets_advanced(  # (5)
        remove_attributes=[Asset.CERTIFICATE_STATUS, Asset.ANNOUNCEMENT_TYPE],
        fail_on_errors=True,
        case_sensitive_match=False,
        field_separator=",",
        batch_size=20,
    )
    .glossaries(  # (6)
        prefix="/test/prefix",
        object_key="glossaries-test.csv",
        input_handling=AssetInputHandling.UPDATE,
    )
    .glossaries_advanced(  # (7)
        remove_attributes=[Asset.CERTIFICATE_STATUS, Asset.ANNOUNCEMENT_TYPE],
        fail_on_errors=True,
        field_separator=",",
        batch_size=20,
    )
    .data_products(  # (8)
        prefix="/test/prefix",
        object_key="data-products-test.csv",
        input_handling=AssetInputHandling.UPDATE,
    )
    .data_product_advanced(  # (9)
        remove_attributes=[Asset.CERTIFICATE_STATUS, Asset.ANNOUNCEMENT_TYPE],
        fail_on_errors=True,
        field_separator=",",
        batch_size=20,
    )
).to_workflow()  # (10)
response = client.workflow.run(workflow) # (11)
1. The AssetImport package loads metadata from a CSV file.
2. Set up the package to import metadata directly from the object store.
3. You can use different object store methods (e.g. s3(), gcs(), adls()). This example builds a workflow using s3(), for which you'll need to provide the following information:
    - AWS access key.
    - AWS secret key.
    - Name of the bucket (storage) that contains the metadata CSV files.
    - Name of the AWS region.
4. (Optional) To set up the package for importing assets, provide the following information:
    - prefix: directory (path) within the object store from which to retrieve the file containing asset metadata.
    - object_key: object key (filename), including its extension, within the object store and prefix.
    - input_handling: specifies whether to allow the creation of new assets from the input CSV, as full assets (AssetInputHandling.UPSERT) or partial assets (AssetInputHandling.PARTIAL), or to only update existing assets in Atlan (AssetInputHandling.UPDATE).
5. (Optional) To set up the package for importing assets with advanced configuration, provide the following information:
    - remove_attributes: list of attributes to clear (remove) from assets if their value is blank in the provided file.
    - fail_on_errors: specifies whether an invalid value in a field should cause the import to fail (True) or log a warning, skip that value, and proceed (False).
    - case_sensitive_match: indicates whether to use case-sensitive matching when running in update-only mode (True) or to try case-insensitive matching (False).
    - is_table_view_agnostic: specifies whether to treat tables, views, and materialized views as interchangeable (True) or to strictly adhere to the types specified in the input (False).
    - field_separator: character used to separate fields in the input file (e.g. ',' or ';').
    - batch_size: maximum number of rows to process at a time (per API request).
6. (Optional) To set up the package for importing glossaries, provide the following information:
    - prefix: directory (path) within the object store from which to retrieve the file containing glossaries, categories, and terms.
    - object_key: object key (filename), including its extension, within the object store and prefix.
    - input_handling: specifies whether to allow the creation of new glossaries, categories, and terms from the input CSV (AssetInputHandling.UPSERT) or to make sure these are only updated if they already exist in Atlan (AssetInputHandling.UPDATE).
7. (Optional) To set up the package for importing glossaries with advanced configuration, provide the following information:
    - remove_attributes: list of attributes to clear (remove) from assets if their value is blank in the provided file.
    - fail_on_errors: specifies whether an invalid value in a field should cause the import to fail (True) or log a warning, skip that value, and proceed (False).
    - field_separator: character used to separate fields in the input file (e.g. ',' or ';').
    - batch_size: maximum number of rows to process at a time (per API request).
8. (Optional) To set up the package for importing data products, provide the following information:
    - prefix: directory (path) within the object store from which to retrieve the file containing data domains and data products.
    - object_key: object key (filename), including its extension, within the object store and prefix.
    - input_handling: specifies whether to allow the creation of new data domains and data products from the input CSV (AssetInputHandling.UPSERT) or to make sure these are only updated if they already exist in Atlan (AssetInputHandling.UPDATE).
9. (Optional) To set up the package for importing data domains and data products with advanced configuration, provide the following information:
    - remove_attributes: list of attributes to clear (remove) from assets if their value is blank in the provided file.
    - fail_on_errors: specifies whether an invalid value in a field should cause the import to fail (True) or log a warning, skip that value, and proceed (False).
    - field_separator: character used to separate fields in the input file (e.g. ',' or ';').
    - batch_size: maximum number of rows to process at a time (per API request).
10. Convert the package into a Workflow object.
11. Run the workflow by invoking the run() method on the workflow client, passing the created object.

Workflows run asynchronously
Remember that workflows run asynchronously. See the packages and workflows introduction for details on how to check the status and wait until the workflow has been completed.
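To make the field_separator and batch_size options described above more concrete, here is a minimal standard-library sketch of what "separate fields by a character" and "process at most N rows at a time" mean. It is only an illustration and not the package's actual implementation; the read_batches helper and the column names in the sample are hypothetical.

```python
import csv
import io
from typing import Dict, Iterator, List


def read_batches(
    text: str, field_separator: str = ",", batch_size: int = 20
) -> Iterator[List[Dict[str, str]]]:
    """Yield lists of at most batch_size rows parsed from delimited text."""
    reader = csv.DictReader(io.StringIO(text), delimiter=field_separator)
    batch: List[Dict[str, str]] = []
    for row in reader:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly smaller, batch
        yield batch


# Hypothetical column names, chosen only for this illustration.
sample = "name;description\norders;Order facts\ncustomers;\nproducts;Catalog\n"
batches = list(read_batches(sample, field_separator=";", batch_size=2))
batch_sizes = [len(b) for b in batches]
```

With a batch size of 2, the three sample rows are split into one batch of two rows and one batch of one row. The blank description on the second row is the kind of value that remove_attributes refers to when it mentions values that are blank in the provided file.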
When using the raw REST API, we recommend creating the workflow only via the UI. To re-run an existing workflow, see the steps below.
Re-run existing workflow
To re-run an existing asset import workflow:
- Java
- Python
- Kotlin
- Raw REST API
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.enums import WorkflowPackage
client = AtlanClient()
existing = client.workflow.find_by_type(  # (1)
    prefix=WorkflowPackage.ASSET_IMPORT, max_results=5
)
# Determine which asset import workflow (n)
# from the list of results you want to re-run.
response = client.workflow.rerun(existing[n]) # (2)
1. You can find workflows by their type using the workflow client find_by_type() method, providing the prefix for one of the packages. In this example, we do so for AssetImport. (You can also specify the maximum number of workflows you want to retrieve as results.)
2. Once you've found the workflow you want to re-run, call the workflow client rerun() method.
    - Optionally, you can use rerun(idempotent=True) to avoid re-running a workflow that is already running or pending. If such a workflow is found, its details are returned instead. By default, idempotent is False.

Workflows run asynchronously
Remember that workflows run asynchronously. See the packages and workflows introduction for details on how you can check the status and wait until the workflow has been completed.
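Because a run or re-run returns before the workflow finishes, client code usually needs some form of polling. The sketch below shows the general pattern only: get_status is a hypothetical callable standing in for whatever status lookup you use, and the status strings are assumptions rather than values confirmed by this page. The packages and workflows introduction describes the supported way to check status.

```python
import time
from typing import Callable

# Assumed terminal status names; check the values your SDK version reports.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Error"}


def wait_until_done(
    get_status: Callable[[], str],
    interval_seconds: float = 5.0,
    max_attempts: int = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status until it reports a terminal status or attempts run out."""
    for _ in range(max_attempts):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        sleep(interval_seconds)  # wait before asking again
    raise TimeoutError(f"No terminal status after {max_attempts} attempts")


# Simulated status sequence so the sketch runs without a tenant.
_fake_statuses = iter(["Pending", "Running", "Succeeded"])
final_status = wait_until_done(
    lambda: next(_fake_statuses),
    sleep=lambda _seconds: None,  # skip real waiting in this demo
)
```

Passing sleep as a parameter keeps the helper easy to exercise in tests; in real use you would leave the default and supply a get_status that queries your tenant.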
- Find the existing workflow.
- Send through the resulting re-run request.
{
  "from": 0,
  "size": 5,
  "query": {
    "bool": {
      "filter": [
        {
          "nested": {
            "path": "metadata",
            "query": {
              "prefix": {
                "metadata.name.keyword": {
                  "value": "csa-asset-import" // (1)
                }
              }
            }
          }
        }
      ]
    }
  },
  "sort": [
    {
      "metadata.creationTimestamp": {
        "nested": {
          "path": "metadata"
        },
        "order": "desc"
      }
    }
  ],
  "track_total_hits": true
}
1. Searching by the csa-asset-import prefix ensures you only find existing asset import workflows.

Name of the workflow
The name of the workflow will be nested within the _source.metadata.name property of the response object. (Remember that since this is a search, there could be multiple results, so you may want to use the other details in each result to determine which workflow you really want.)
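As a rough illustration of working with this search, the sketch below builds the same request body in Python and pulls workflow names out of a response. The helper names are hypothetical, and the assumption that results arrive under hits.hits (Elasticsearch-style) is not confirmed by this page; only the _source.metadata.name path comes from the text above. The second name in the sample response is made up.

```python
import json
from typing import Any, Dict, List


def build_search_body(prefix: str = "csa-asset-import", size: int = 5) -> Dict[str, Any]:
    """Build the workflow search body shown above for a given name prefix."""
    return {
        "from": 0,
        "size": size,
        "query": {
            "bool": {
                "filter": [
                    {
                        "nested": {
                            "path": "metadata",
                            "query": {
                                "prefix": {"metadata.name.keyword": {"value": prefix}}
                            },
                        }
                    }
                ]
            }
        },
        "sort": [
            {
                "metadata.creationTimestamp": {
                    "nested": {"path": "metadata"},
                    "order": "desc",
                }
            }
        ],
        "track_total_hits": True,
    }


def workflow_names(response: Dict[str, Any]) -> List[str]:
    """Collect _source.metadata.name from each result."""
    # Assumption: results are listed under hits.hits in the response.
    hits = response.get("hits", {}).get("hits", [])
    return [hit["_source"]["metadata"]["name"] for hit in hits]


body_json = json.dumps(build_search_body())

# Hand-written sample response, not real output from a tenant.
sample_response = {
    "hits": {
        "hits": [
            {"_source": {"metadata": {"name": "csa-asset-import-1684500411"}}},
            {"_source": {"metadata": {"name": "csa-asset-import-1684400000"}}},
        ]
    }
}
names = workflow_names(sample_response)
```

Since the request sorts by creation timestamp in descending order, the first name in the list would be the most recently created workflow under these assumptions.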
{
  "namespace": "default",
  "resourceKind": "WorkflowTemplate",
  "resourceName": "csa-asset-import-1684500411" // (1)
}
1. Send the name of the workflow as the resourceName to re-run it.
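If you assemble the re-run request programmatically, the body can be generated from the workflow name found by the search. This is a minimal sketch: build_rerun_body is a hypothetical helper, and it only mirrors the three fields shown above without sending anything.

```python
import json


def build_rerun_body(workflow_name: str) -> str:
    """Serialize the re-run request body for the given workflow name."""
    return json.dumps(
        {
            "namespace": "default",
            "resourceKind": "WorkflowTemplate",
            "resourceName": workflow_name,  # name found via the search step
        }
    )


rerun_body = build_rerun_body("csa-asset-import-1684500411")
```

Generating the body with json.dumps also avoids carrying the // (1) annotation comments into the request, since comments are not valid JSON.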