Lineage builder package
The lineage builder package allows you to create lineage between any source and any target asset.
Retrieve lineage file from object store
To retrieve the lineage file from cloud object storage:
- Java
- Python
- Kotlin
- Raw REST API
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.packages import LineageBuilder
from pyatlan.model.enums import AssetInputHandling
client = AtlanClient()
workflow = (
LineageBuilder() # (1)
.object_store( # (2)
prefix="text-prefix",
object_key="test-object-key"
)
.s3( # (3)
access_key="test-access-key",
secret_key="test-secret-key",
region="test-region",
bucket="test-bucket",
)
.options( # (4)
input_handling=AssetInputHandling.UPSERT,
fail_on_errors=True,
case_sensitive_match=False,
field_separator=",",
batch_size=20,
)
).to_workflow() # (5)
response = client.workflow.run(workflow) # (6)
-
Lineage builder package allows you to create lineage between any source and any target asset.
-
Set up the package to retrieve the lineage file from cloud object storage.
-
You can use different object store methods (e.g:
s3(),gcs(),adls()). In this example, we're building a workflow usings3()and for that, you’ll need to provide the following information:- AWS access key.
- AWS secret key.
- name of the bucket/storage that contains the metadata CSV files.
- name of the AWS region.
-
You can provide other following
options():input_handling: specifies whether to allow the creation of new assets from the input CSV (full (UPSERT) or partial (PARTIAL) assets) or only update existing (UPDATE) assets in Atlan.fail_on_errors: specifies whether an invalid value in a field should cause the import to fail (True) or log a warning, skip that value, and proceed (False).case_sensitive_match: indicates whether to use case-sensitive matching when running in update-only mode (True) or to try case-insensitive matching (False).field_separator: character used to separate fields in the input file (e.g:','or';').batch_size: maximum number of rows to process at a time (per API request).
-
Convert the package into a
Workflowobject. -
Run the workflow by invoking the
run()method on the workflow client, passing the created object.Workflows run asynchronously
Remember that workflows run asynchronously. See the packages and workflows introduction for details on how to check the status and wait until the workflow has been completed. :::
We recommend creating the workflow only via the UI. To rerun an existing workflow, see the steps below.
Re-run existing workflow
To re-run an existing lineage builder workflow:
- Java
- Python
- Kotlin
- Raw REST API
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.enums import WorkflowPackage
client = AtlanClient()
existing = client.workflow.find_by_type( # (1)
prefix=WorkflowPackage.LINEAGE_BUILDER, max_results=5
)
# Determine which lineage builder workflow (n)
# from the list of results you want to re-run.
response = client.workflow.rerun(existing[n]) # (2)
-
You can find workflows by their type using the workflow client
find_by_type()method and providing the prefix for one of the packages. In this example, we do so for theLineageBuilder. (You can also specify the maximum number of resulting workflows you want to retrieve as results.) -
Once you've found the workflow you want to re-run, you can simply call the workflow client
rerun()method.- Optionally, you can use
rerun(idempotent=True)to avoid re-running a workflow that's already in running or in a pending state. This will return details of the already running workflow if found, and by default, it's set toFalse.
Workflows run asynchronously - Optionally, you can use
Remember that workflows run asynchronously. See the packages and workflows introduction for details on how you can check the status and wait until the workflow has been completed. :::
- Find the existing workflow.
- Send through the resulting re-run request.
{
"from": 0,
"size": 5,
"query": {
"bool": {
"filter": [
{
"nested": {
"path": "metadata",
"query": {
"prefix": {
"metadata.name.keyword": {
"value": "csa-lineage-builder" // (1)
}
}
}
}
}
]
}
},
"sort": [
{
"metadata.creationTimestamp": {
"nested": {
"path": "metadata"
},
"order": "desc"
}
}
],
"track_total_hits": true
}
-
Searching by the
csa-lineage-builderprefix will make sure you only find existing asset import workflows.Name of the workflow
The name of the workflow will be nested within the _source.metadata.name property of the response object.
(Remember since this is a search, there could be multiple results, so you may want to use the other
details in each result to determine which workflow you really want.)
:::
{
"namespace": "default",
"resourceKind": "WorkflowTemplate",
"resourceName": "csa-lineage-builder-1684500411" // (1)
}
- Send the name of the workflow as the
resourceNameto rerun it.