Manage lineage
Create lineage between assets
Directly
To create lineage between assets, you need to create a Process entity.
Note that the assets you reference as the inputs and outputs of the process must already exist before you create the process.
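If you want to guard against missing references, here is a minimal Python sketch (using pyatlan, with the example GUID and qualifiedName from the snippets below) that confirms the referenced assets resolve before you build the process:
from pyatlan.client.atlan import AtlanClient
from pyatlan.errors import NotFoundError
from pyatlan.model.assets import Table

client = AtlanClient()
try:
    # Resolve one input by GUID and another by qualifiedName (example values)
    client.asset.get_by_guid(guid="495b1516-aaaf-4390-8cfd-b11ade7a7799", asset_type=Table)
    client.asset.get_by_qualified_name(
        qualified_name="default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS",
        asset_type=Table,
    )
except NotFoundError:
    # The referenced assets must be created (or crawled) before the process
    raise SystemExit("Create the referenced assets before creating the process.")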
- Java
- Python
- Kotlin
- Raw REST API
LineageProcess process = LineageProcess.creator( // (1)
"Source 1, Source 2, Source 3 -> Target 1, Target 2", // (2)
"default/snowflake/1657025257", // (3)
"dag_123", // (4)
List.of( // (5)
Table.refByGuid("495b1516-aaaf-4390-8cfd-b11ade7a7799"),
Table.refByGuid("d002dead-1655-4d75-abd6-ad889fa04bd4"),
Table.refByQualifiedName("default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS")),
List.of( // (6)
Table.refByGuid("86d9a061-7753-4884-b988-a02d3954bc24"),
Table.refByQualifiedName("default/snowflake/1657025257/OPS/DEFAULT/FULL_STATS")),
null) // (7)
.sql("select * from somewhere;") // (8)
.sourceURL("https://your.orchestrator/unique/id/123") // (9)
.build();
AssetMutationResponse response = process.save(client); // (10)
assert response.getCreatedAssets().size() == 1; // (11)
assert response.getUpdatedAssets().size() == 5; // (12)
1. Use the creator() method to initialize the object with all necessary attributes for creating it.
2. Provide a name for how the process will be shown in the UI.
3. Provide the qualifiedName of the connection that ran the process.
Tips for the connection
The process itself must be created within a connection for both access control and icon labelling. Use a connection qualifiedName that indicates the system that ran the process:
- You could use the same connection qualifiedName as the source system, if it was the source system "pushing" data to the targets.
- You could use the same connection qualifiedName as the target system, if it was the target system "pulling" data from the sources.
- You could use a different connection qualifiedName from either source or target, if there is a system in-between doing the processing (for example, an ETL engine or orchestrator).
:::
4. (Optional) Provide the unique ID of the process within that connection. This could be the unique DAG ID for an orchestrator, for example. Since it's optional, you can also send null and the SDK will generate a unique ID for you based on the unique combination of inputs and outputs for the process.
Use your own ID if you can
While the SDK can generate this ID for you, it's based on the unique combination of inputs and outputs, so the ID can change if those inputs or outputs change. This could result in extra processes in lineage as this process itself changes over time.
By using your own ID for the process, the same single process in Atlan will be updated whenever that process changes over time (even if its inputs or outputs change).
:::
5. Provide the list of inputs to the process. Note that each of these is only a reference to an asset, not a full asset object. For a reference you only need (in addition to the type of asset) either:
- its GUID (for the static <Type>.refByGuid() method)
- its qualifiedName (for the static <Type>.refByQualifiedName() method)
6. Provide the list of outputs to the process. Note that each of these is again only a reference to an asset.
7. (Optional) Provide the parent LineageProcess in which this process ran (for example, if this process is a subprocess of some higher-level process). If this is a top-level process, you can also send null for this parameter (as in this example).
8. (Optional) You can also add other properties to the lineage process, such as SQL code that runs within the process.
9. (Optional) You can also provide a link to the process, which will appear in the Atlan UI as a button you can click to go to that link when viewing the process.
10. Call the save() method to actually create the process. Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
11. The response will include the single lineage process asset that was created.
12. The response will also include the 5 data assets (3 inputs, 2 outputs) that were updated.
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Process, Table
client = AtlanClient()
process = Process.creator( # (1)
name="Source 1, Source 2, Source 3 -> Target 1, Target 2", # (2)
connection_qualified_name="default/snowflake/1657025257", # (3)
process_id="dag_123", # (4)
inputs=[ # (5)
Table.ref_by_guid(guid="495b1516-aaaf-4390-8cfd-b11ade7a7799"),
Table.ref_by_guid(guid="d002dead-1655-4d75-abd6-ad889fa04bd4"),
Table.ref_by_qualified_name(qualified_name="default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS"),
],
outputs=[ # (6)
Table.ref_by_guid(guid="86d9a061-7753-4884-b988-a02d3954bc24"),
Table.ref_by_qualified_name(qualified_name="default/snowflake/1657025257/OPS/DEFAULT/FULL_STATS"),
],
) # (7)
process.sql = "select * from somewhere;" # (8)
process.source_url = "https://your.orchestrator/unique/id/123" # (9)
response = client.asset.save(process) # (10)
assert (processes := response.assets_created(Process)) # (11)
assert len(processes) == 1 # (12)
assert (tables := response.assets_updated(Table)) # (13)
assert len(tables) == 5 # (14)
1. Use the creator() method to initialize the object with all necessary attributes for creating it.
2. Provide a name for how the process will be shown in the UI.
3. Provide the qualified_name of the connection that ran the process.
Tips for the connection
The process itself must be created within a connection for both access control and icon labelling. Use a connection qualified_name that indicates the system that ran the process:
- You could use the same connection qualified_name as the source system, if it was the source system "pushing" data to the targets.
- You could use the same connection qualified_name as the target system, if it was the target system "pulling" data from the sources.
- You could use a different connection qualified_name from either source or target, if there is a system in-between doing the processing (for example, an ETL engine or orchestrator).
:::
4. (Optional) Provide the unique ID of the process within that connection. This could be the unique DAG ID for an orchestrator, for example. Since it's optional, you can also leave it out and the SDK will generate a unique ID for you based on the unique combination of inputs and outputs for the process.
Use your own ID if you can
While the SDK can generate this ID for you, it's based on the unique combination of inputs and outputs, so the ID can change if those inputs or outputs change. This could result in extra processes in lineage as this process itself changes over time.
By using your own ID for the process, the same single process in Atlan will be updated whenever that process changes over time (even if its inputs or outputs change).
:::
5. Provide the list of inputs to the process. Note that each of these is only a reference to an asset, not a full asset object. For a reference you only need (in addition to the type of asset) either:
- its GUID (for the ref_by_guid() method)
- its qualified_name (for the ref_by_qualified_name() method)
6. Provide the list of outputs to the process. Note that each of these is again only a reference to an asset.
7. (Optional) Provide the parent Process in which this process ran (for example, if this process is a subprocess of some higher-level process). If this is a top-level process, you can also send None for this parameter (as in this example).
8. (Optional) You can also add other properties to the lineage process, such as SQL code that runs within the process.
9. (Optional) You can also provide a link to the process, which will appear in the Atlan UI as a button you can click to go to that link when viewing the process.
10. Call the save() method to actually create the process.
11. Check that a Process was created.
12. Check that only 1 Process was created.
13. Check that tables were updated.
14. Check that 5 tables (3 inputs, 2 outputs) were updated.
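As a quick illustration of tip (4), here is a minimal sketch of the same call without process_id, in which case the SDK derives a deterministic ID from the combination of inputs and outputs (example values only):
process = Process.creator(
name="Source 1, Source 2, Source 3 -> Target 1, Target 2",
connection_qualified_name="default/snowflake/1657025257",
inputs=[Table.ref_by_guid(guid="495b1516-aaaf-4390-8cfd-b11ade7a7799")],
outputs=[Table.ref_by_guid(guid="86d9a061-7753-4884-b988-a02d3954bc24")],
) # no process_id: the generated ID will change if these inputs or outputs change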
val process = LineageProcess.creator( // (1)
"Source 1, Source 2, Source 3 -> Target 1, Target 2", // (2)
"default/snowflake/1657025257", // (3)
"dag_123", // (4)
listOf<ICatalog>( // (5)
Table.refByGuid("495b1516-aaaf-4390-8cfd-b11ade7a7799"),
Table.refByGuid("d002dead-1655-4d75-abd6-ad889fa04bd4"),
Table.refByQualifiedName("default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS")),
listOf<ICatalog>( // (6)
Table.refByGuid("86d9a061-7753-4884-b988-a02d3954bc24"),
Table.refByQualifiedName("default/snowflake/1657025257/OPS/DEFAULT/FULL_STATS")),
null) // (7)
.sql("select * from somewhere;") // (8)
.sourceURL("https://your.orchestrator/unique/id/123") // (9)
.build()
val response = process.save(client) // (10)
assert(response.createdAssets.size == 1) // (11)
assert(response.updatedAssets.size == 5) // (12)
1. Use the creator() method to initialize the object with all necessary attributes for creating it.
2. Provide a name for how the process will be shown in the UI.
3. Provide the qualifiedName of the connection that ran the process.
Tips for the connection
The process itself must be created within a connection for both access control and icon labelling. Use a connection qualifiedName that indicates the system that ran the process:
- You could use the same connection qualifiedName as the source system, if it was the source system "pushing" data to the targets.
- You could use the same connection qualifiedName as the target system, if it was the target system "pulling" data from the sources.
- You could use a different connection qualifiedName from either source or target, if there is a system in-between doing the processing (for example, an ETL engine or orchestrator).
:::
4. (Optional) Provide the unique ID of the process within that connection. This could be the unique DAG ID for an orchestrator, for example. Since it's optional, you can also send null and the SDK will generate a unique ID for you based on the unique combination of inputs and outputs for the process.
Use your own ID if you can
While the SDK can generate this ID for you, it's based on the unique combination of inputs and outputs, so the ID can change if those inputs or outputs change. This could result in extra processes in lineage as this process itself changes over time.
By using your own ID for the process, the same single process in Atlan will be updated whenever that process changes over time (even if its inputs or outputs change).
:::
5. Provide the list of inputs to the process. Note that each of these is only a reference to an asset, not a full asset object. For a reference you only need (in addition to the type of asset) either:
- its GUID (for the static <Type>.refByGuid() method)
- its qualifiedName (for the static <Type>.refByQualifiedName() method)
6. Provide the list of outputs to the process. Note that each of these is again only a reference to an asset.
7. (Optional) Provide the parent LineageProcess in which this process ran (for example, if this process is a subprocess of some higher-level process). If this is a top-level process, you can also send null for this parameter (as in this example).
8. (Optional) You can also add other properties to the lineage process, such as SQL code that runs within the process.
9. (Optional) You can also provide a link to the process, which will appear in the Atlan UI as a button you can click to go to that link when viewing the process.
10. Call the save() method to actually create the process. Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
11. The response will include the single lineage process asset that was created.
12. The response will also include the 5 data assets (3 inputs, 2 outputs) that were updated.
{
"entities": [ // (1)
{
"typeName": "Process", // (2)
"attributes": {
"name": "Source 1, Source 2, Source 3 -> Target 1, Target 2", // (3)
"qualifiedName": "default/snowflake/1657025257/dag_123", // (4)
"inputs": [ // (5)
{
"typeName": "Table",
"guid": "495b1516-aaaf-4390-8cfd-b11ade7a7799"
},
{
"typeName": "Table",
"guid": "d002dead-1655-4d75-abd6-ad889fa04bd4"
},
{
"typeName": "Table",
"uniqueAttributes": {
"qualifiedName": "default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS"
}
}
],
"outputs": [ // (6)
{
"typeName": "Table",
"guid": "86d9a061-7753-4884-b988-a02d3954bc24"
},
{
"typeName": "Table",
"uniqueAttributes": {
"qualifiedName": "default/snowflake/1657025257/OPS/DEFAULT/FULL_STATS"
}
}
]
}
}
]
}
1. All assets must be wrapped in an entities array.
2. You must provide the exact type name for a Process asset (case-sensitive).
3. You must provide a name for the integration process.
4. You must provide a unique qualifiedName for the integration process (case-sensitive).
5. You must list all of the input assets to the process. These can be referenced by GUID or by qualifiedName.
6. You must list all of the output assets from the process. These can also be referenced by either GUID or qualifiedName.
Using OpenLineage
Creating a connection for OpenLineage
You must configure OpenLineage before creating lineage between assets. You can either configure a Spark Assets connection in Atlan before sending any OpenLineage events (in which case you can skip the Configure the integration in Apache Spark section), or follow the steps below to create the Spark connection via the SDKs.
- Java
- Python
- Kotlin
- Raw REST API
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.enums import AtlanConnectorType
client = AtlanClient()
admin_role_guid = client.role_cache.get_id_for_name("$admin") #(1)
spark_connection = client.open_lineage.create_connection( #(2)
name="open_lineage_connection",
connector_type=AtlanConnectorType.SPARK,
admin_roles=[admin_role_guid],
admin_users=["jsmith"],
admin_groups=["group2"],
)
1. Retrieve the GUID for the admin role, to use later for defining the roles that can administer the connection.
2. Create the OpenLineage connection using the open_lineage.create_connection() method. The following parameters are available:
- name: provide a human-readable name for your connection.
- connector_type: set the type of connection. Defaults to AtlanConnectorType.SPARK.
- (Optional) admin_roles: list the workspace roles that should be able to administer the connection (if any, defaults to None). All users with that workspace role (current and future) will be administrators of the connection. Note that the values here need to be the GUIDs of the workspace roles.
- (Optional) admin_users: list the usernames that can administer this connection (if any, defaults to None).
- (Optional) admin_groups: list the group names that can administer this connection (if any, defaults to None). All users within that group (current and future) will be administrators of the connection.
warning
At least one of admin_roles, admin_users, or admin_groups must be provided to successfully create the connection.
:::
{
"authType": "atlan_api_key",// (1)
"name": "default-spark-1716979138-0", //(2)
"connector": "spark", // (3)
"connectorConfigName": "atlan-connectors-spark", // (4)
"connectorType": "event", // (5)
"extra": {
"events.enable-partial-assets": true,
"events.enabled": true,
"events.topic": "openlineage_spark", // (6)
"events.urlPath": "/events/openlineage/spark/api/v1/lineage"// (7)
}
}
1. The authType must be exactly atlan_api_key.
2. Human-readable name for your credential, which should follow the pattern default-spark-<epoch>-0, where <epoch> is the time in milliseconds at which the credential is being created.
3. The connector must be exactly spark.
4. The connectorConfigName must be exactly atlan-connectors-spark.
5. The connectorType must be exactly event.
6. The events.topic must be exactly openlineage_spark.
7. The events.urlPath must be exactly /events/openlineage/spark/api/v1/lineage.
{
"entities": [
{
"typeName": "Connection", // (1)
"attributes": {
"name": "open_lineage_connection", // (2)
"connectorName": "spark", // (3)
"qualifiedName": "default/spark/123456789", // (4)
"category": "connector", // (5)
"defaultCredentialGuid": "8b579147-6054-4a4c-8137-463cd349b393", // (6)
"adminRoles": [ // (7)
"e7ae0295-c60a-469a-bd2c-fb903943aa02"
],
"adminGroups": [ // (8)
"group2"
],
"adminUsers": [ // (9)
"jsmith"
]
}
}
]
}
1. The typeName must be exactly Connection.
2. Human-readable name for your connection, such as production or development.
3. The connectorName must be exactly spark.
Determines the icon
This determines the icon that Atlan will use for all the assets in the connection. If you use a value that's not a known value, you will get a default gear icon instead.
:::
4. The qualifiedName should follow the pattern default/spark/<epoch>, where <epoch> is the time in milliseconds at which the connection is being created.
5. The category must be exactly connector.
6. The defaultCredentialGuid should be obtained from the id in the response of the previous request.
7. List any workspace roles that can administer this connection. All users with that workspace role (current and future) will be administrators of the connection. Note that the values here need to be the GUIDs of the workspace roles. At least one of adminRoles, adminGroups, or adminUsers must be provided.
8. List any groups that can administer this connection. All users within that group (current and future) will be administrators of the connection. Note that the values here are the names of the groups. At least one of adminRoles, adminGroups, or adminUsers must be provided.
9. List any users that can administer this connection. Note that the values here are the usernames of the users. At least one of adminRoles, adminGroups, or adminUsers must be provided.
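Both the credential name (default-spark-<epoch>-0) and the connection qualifiedName (default/spark/<epoch>) embed the creation time in milliseconds. A minimal Python sketch for generating them:
import time

epoch_ms = int(time.time() * 1000)  # current time in milliseconds
credential_name = f"default-spark-{epoch_ms}-0"
connection_qualified_name = f"default/spark/{epoch_ms}"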
Creating lineage between assets using OpenLineage
To create lineage between assets through OpenLineage, you need to send at least two events: one indicating the start of a job run, and another indicating that the job run finished.
- Java
- Python
- Kotlin
- Raw REST API
String snowflake = "snowflake://abc123.snowflakecomputing.com"; // (1)
OpenLineageJob olj = OpenLineageJob.creator( // (2)
"ol-spark",
"dag_123",
"https://your.orchestrator/unique/id/123"
).build();
OpenLineageRun olr = OpenLineageRun.creator(olj).build(); // (3)
OpenLineageInputDataset inputDataset = olj.createInput(snowflake, "OPS.DEFAULT.RUN_STATS")
.build(); // (4)
OpenLineageOutputDataset outputDataset = olj.createOutput(snowflake, "OPS.DEFAULT.FULL_STATS")
.build(); // (5)
OpenLineageEvent start = OpenLineageEvent.creator( // (6)
olr,
OpenLineage.RunEvent.EventType.START
)
.input(inputDataset) // (7)
.input(olj.createInput(snowflake, "SOME.OTHER.TBL").build())
.input(olj.createInput(snowflake, "AN.OTHER.TBL").build())
.output(outputDataset) // (8)
.output(olj.createOutput(snowflake, "AN.OTHER.VIEW").build())
.build();
start.emit(client); // (9)
1. Datasets used in data lineage need a namespace that follows the source-specific naming standards of OpenLineage.
2. Lineage is tracked through jobs. Each job must have:
- the name of a connection (that already exists in Atlan),
- a unique job name (used to idempotently update the same job with multiple runs), and
- a unique URI indicating the code or system responsible for producing this lineage.
3. A job must be run at least once for any lineage to exist, and these separate runs of the same job are tracked through OpenLineageRun objects.
4. You can define any number of inputs (sources) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
5. You can define any number of outputs (targets) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
6. Each run of a job must consist of at least two events: a START event indicating when the job run began, and some terminal state indicating when the job run finished.
7. You can chain any number of inputs to the event to indicate the source datasets for the lineage.
8. You can chain any number of outputs to the event to indicate the target datasets for the lineage.
9. Use the emit() method to actually send the event to Atlan to be processed. The processing itself occurs asynchronously, so a successful emit() only indicates that the event has been successfully sent to Atlan, not that it has (yet) been processed. Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
OpenLineageEvent complete = OpenLineageEvent.creator( // (1)
olr,
OpenLineage.RunEvent.EventType.COMPLETE
).build();
complete.emit(client); // (2)
1. Since each run of a job must consist of at least two events, don't forget to send the terminal state indicating when the job has finished (and whether it was successful, with a COMPLETE, or had some error, with a FAIL).
2. Once again, use the emit() method to actually send the event to Atlan to be processed (asynchronously). Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.enums import OpenLineageEventType
from pyatlan.model.open_lineage import OpenLineageEvent, OpenLineageJob, OpenLineageRun
client = AtlanClient()
snowflake = "snowflake://abc123.snowflakecomputing.com" # (1)
job = OpenLineageJob.creator( # (2)
connection_name="ol-spark",
job_name="dag_123",
producer="https://your.orchestrator/unique/id/123"
)
run = OpenLineageRun.creator(job=job) # (3)
input_dataset = job.create_input(
namespace=snowflake, asset_name="OPS.DEFAULT.RUN_STATS"
) # (4)
output_dataset = job.create_output(
namespace=snowflake, asset_name="OPS.DEFAULT.FULL_STATS"
) # (5)
start = OpenLineageEvent.creator(
run=run, event_type=OpenLineageEventType.START
) # (6)
start.inputs = [
input_dataset,
job.create_input(namespace=snowflake, asset_name="SOME.OTHER.TBL"),
job.create_input(namespace=snowflake, asset_name="AN.OTHER.TBL"),
] # (7)
start.outputs = [
output_dataset,
job.create_output(namespace=snowflake, asset_name="AN.OTHER.VIEW")
] # (8)
start.emit(client=client) # (9)
1. Datasets used in data lineage need a namespace that follows the source-specific naming standards of OpenLineage.
2. Lineage is tracked through jobs. Each job must have:
- the name of a connection (that already exists in Atlan),
- a unique job name (used to idempotently update the same job with multiple runs), and
- a unique URI indicating the code or system responsible for producing this lineage.
3. A job must be run at least once for any lineage to exist, and these separate runs of the same job are tracked through OpenLineageRun objects.
4. You can define any number of inputs (sources) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
5. You can define any number of outputs (targets) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
6. Each run of a job must consist of at least two events: a START event indicating when the job run began, and some terminal state indicating when the job run finished.
7. You can provide any number of inputs on the event to indicate the source datasets for the lineage.
8. You can provide any number of outputs on the event to indicate the target datasets for the lineage.
9. Use the emit() method to actually send the event to Atlan to be processed. The processing itself occurs asynchronously, so a successful emit() only indicates that the event has been successfully sent to Atlan, not that it has (yet) been processed.
complete = OpenLineageEvent.creator(
run=run, event_type=OpenLineageEventType.COMPLETE
) # (1)
complete.emit(client=client) # (2)
1. Since each run of a job must consist of at least two events, don't forget to send the terminal state indicating when the job has finished (and whether it was successful, with a COMPLETE, or had some error, with a FAIL).
2. Once again, use the emit() method to actually send the event to Atlan to be processed (asynchronously).
val snowflake = "snowflake://abc123.snowflakecomputing.com" // (1)
val olj = OpenLineageJob.creator( // (2)
"ol-spark",
"dag_123",
"https://your.orchestrator/unique/id/123"
).build()
val olr = OpenLineageRun.creator(olj).build() // (3)
val inputDataset = olj.createInput(snowflake, "OPS.DEFAULT.RUN_STATS")
.build() // (4)
val outputDataset = olj.createOutput(snowflake, "OPS.DEFAULT.FULL_STATS")
.build() // (5)
val start = OpenLineageEvent.creator( // (6)
olr,
OpenLineage.RunEvent.EventType.START
)
.input(inputDataset) // (7)
.input(olj.createInput(snowflake, "SOME.OTHER.TBL").build())
.input(olj.createInput(snowflake, "AN.OTHER.TBL").build())
.output(outputDataset) // (8)
.output(olj.createOutput(snowflake, "AN.OTHER.VIEW").build())
.build()
start.emit(client) // (9)
1. Datasets used in data lineage need a namespace that follows the source-specific naming standards of OpenLineage.
2. Lineage is tracked through jobs. Each job must have:
- the name of a connection (that already exists in Atlan),
- a unique job name (used to idempotently update the same job with multiple runs), and
- a unique URI indicating the code or system responsible for producing this lineage.
3. A job must be run at least once for any lineage to exist, and these separate runs of the same job are tracked through OpenLineageRun objects.
4. You can define any number of inputs (sources) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
5. You can define any number of outputs (targets) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
6. Each run of a job must consist of at least two events: a START event indicating when the job run began, and some terminal state indicating when the job run finished.
7. You can chain any number of inputs to the event to indicate the source datasets for the lineage.
8. You can chain any number of outputs to the event to indicate the target datasets for the lineage.
9. Use the emit() method to actually send the event to Atlan to be processed. The processing itself occurs asynchronously, so a successful emit() only indicates that the event has been successfully sent to Atlan, not that it has (yet) been processed. Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
val complete = OpenLineageEvent.creator( // (1)
olr,
OpenLineage.RunEvent.EventType.COMPLETE
).build()
complete.emit(client) // (2)
1. Since each run of a job must consist of at least two events, don't forget to send the terminal state indicating when the job has finished (and whether it was successful, with a COMPLETE, or had some error, with a FAIL).
2. Once again, use the emit() method to actually send the event to Atlan to be processed (asynchronously). Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
{
"eventTime": "2024-07-01T08:23:37.491542Z", // (1)
"producer": "https://your.orchestrator/unique/id/123", // (2)
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"eventType": "START", // (3)
"job": { // (4)
"namespace": "ol-spark",
"name": "dag_123",
"facets": {}
},
"run": { // (5)
"runId": "eefd52c3-5871-4f0e-8ff5-237e9a6efb53",
"facets": {}
},
"inputs": [ // (6)
},
{
"namespace": "snowflake://abc123.snowflakecomputing.com",
"name": "SOME.OTHER.TBL",
"facets": {}
},
{
"namespace": "snowflake://abc123.snowflakecomputing.com",
"name": "AN.OTHER.TBL",
"facets": {}
}
],
"outputs": [ // (7)
},
{
"namespace": "snowflake://abc123.snowflakecomputing.com",
"name": "AN.OTHER.VIEW",
"facets": {}
}
]
}
1. Each event for a job run must have a time at which the event occurred.
2. Each event must have a URI indicating the code or system responsible for producing this lineage.
3. Each run of a job must consist of at least two events: a START event indicating when the job run began, and some terminal state indicating when the job run finished.
4. Lineage is tracked through jobs. Each job must have:
- the name of a connection (that already exists in Atlan) as its namespace, and
- a unique job name (used to idempotently update the same job with multiple runs).
5. A job must be run at least once for any lineage to exist, and each event for the same run of a job must be associated with the same runId.
6. You can define any number of inputs (sources) for lineage.
- Datasets used in data lineage need a namespace that follows the source-specific naming standards of OpenLineage.
- The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
7. You can define any number of outputs (targets) for lineage.
- Datasets used in data lineage need a namespace that follows the source-specific naming standards of OpenLineage.
- The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
{
"eventTime": "2024-07-01T08:23:38.360567Z",
"producer": "https://your.orchestrator/unique/id/123",
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"eventType": "COMPLETE", // (1)
"run": {
"runId": "eefd52c3-5871-4f0e-8ff5-237e9a6efb53",
"facets": {}
},
"job": {
"namespace": "ol-spark",
"name": "dag_123",
"facets": {}
}
}
1. Since each run of a job must consist of at least two events, don't forget to send the terminal state indicating when the job has finished (and whether it was successful, with a COMPLETE, or had some error, with a FAIL).
Create lineage between columns
Directly
To create lineage between the columns of relational assets, you need to create a ColumnProcess entity.
Before creating the ColumnProcess, verify that lineage already exists between the associated relational assets, and make sure that the columns referenced as inputs and outputs already exist.
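For the first check, here is a minimal Python sketch (using pyatlan's lineage list API with one of the example input table GUIDs) to confirm table-level lineage exists before adding column-level lineage:
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.enums import LineageDirection
from pyatlan.model.lineage import LineageListRequest

client = AtlanClient()
request = LineageListRequest.create(guid="495b1516-aaaf-4390-8cfd-b11ade7a7799")  # example input table
request.direction = LineageDirection.DOWNSTREAM
downstream = list(client.asset.get_lineage_list(request))
assert downstream, "No downstream lineage found - create table-level lineage first."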
- Java
- Python
- Kotlin
- Raw REST API
ColumnProcess columnProcess = ColumnProcess.creator( // (1)
"Source 1, Source 2, Source 3 -> Target 1, Target 2", // (2)
"default/snowflake/1657025257", // (3)
"dag_123", // (4)
List.of( // (5)
Column.refByGuid("495b1516-aaaf-4390-8cfd-b11ade7a7799"),
Column.refByGuid("d002dead-1655-4d75-abd6-ad889fa04bd4"),
Column.refByQualifiedName("default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS/COLUMN")),
List.of( // (6)
Column.refByGuid("86d9a061-7753-4884-b988-a02d3954bc24"),
Column.refByQualifiedName("default/snowflake/1657025257/OPS/DEFAULT/FULL_STATS/COLUMN")),
Process.refByGuid("76d9a061-7753-9884-b988-a02d3954bc25")) // (7)
.sql("select * from somewhere;") // (8)
.sourceURL("https://your.orchestrator/unique/id/123") // (9)
.build();
AssetMutationResponse response = columnProcess.save(client); // (10)
assert response.getCreatedAssets().size() == 1; // (11)
assert response.getUpdatedAssets().size() == 5; // (12)
1. Use the creator() method to initialize the object with all necessary attributes for creating it.
2. Provide a name for how the column process will be shown in the UI.
3. Provide the qualifiedName of the connection that ran the column process.
Tips for the connection
The column process itself must be created within a connection for both access control and icon labelling. Use a connection qualifiedName that indicates the system that ran the column process:
- You could use the same connection qualifiedName as the source system, if it was the source system "pushing" data to the targets.
- You could use the same connection qualifiedName as the target system, if it was the target system "pulling" data from the sources.
- You could use a different connection qualifiedName from either source or target, if there is a system in-between doing the processing (for example, an ETL engine or orchestrator).
:::
4. (Optional) Provide the unique ID of the column process within that connection. This could be the unique DAG ID for an orchestrator, for example. Since it's optional, you can also send null and the SDK will generate a unique ID for you based on the unique combination of inputs and outputs for the column process.
Use your own ID if you can
While the SDK can generate this ID for you, it's based on the unique combination of inputs and outputs, so the ID can change if those inputs or outputs change. This could result in extra column processes in lineage as this process itself changes over time.
By using your own ID for the column process, the same single process in Atlan will be updated whenever that process changes over time (even if its inputs or outputs change).
:::
5. Provide the list of inputs to the column process. Note that each of these is only a reference to an asset, not a full asset object. For a reference you only need (in addition to the type of asset) either:
- its GUID (for the static <Type>.refByGuid() method)
- its qualifiedName (for the static <Type>.refByQualifiedName() method)
6. Provide the list of outputs to the column process. Note that each of these is again only a reference to an asset.
7. Provide the parent LineageProcess in which this process ran, since this process is a subprocess of some higher-level process.
8. (Optional) You can also add other properties to the column process, such as SQL code that runs within the column process.
9. (Optional) You can also provide a link to the column process, which will appear in the Atlan UI as a button you can click to go to that link when viewing the column process.
10. Call the save() method to actually create the column process. Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
11. The response will include the single column process asset that was created.
12. The response will also include the 5 column assets (3 inputs, 2 outputs) that were updated.
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Process, ColumnProcess, Column
client = AtlanClient()
column_process = ColumnProcess.creator( # (1)
name="Source 1, Source 2, Source 3 -> Target 1, Target 2", # (2)
connection_qualified_name="default/snowflake/1657025257", # (3)
process_id="dag_123", # (4)
inputs=[ # (5)
Column.ref_by_guid(guid="495b1516-aaaf-4390-8cfd-b11ade7a7799"),
Column.ref_by_guid(guid="d002dead-1655-4d75-abd6-ad889fa04bd4"),
Column.ref_by_qualified_name(qualified_name="default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS/COLUMN"),
],
outputs=[ # (6)
Column.ref_by_guid(guid="86d9a061-7753-4884-b988-a02d3954bc24"),
Column.ref_by_qualified_name(qualified_name="default/snowflake/1657025257/OPS/DEFAULT/FULL_STATS/COLUMN"),
],
parent=Process.ref_by_guid("76d9a061-7753-9884-b988-a02d3954bc25"),
) # (7)
column_process.sql = "select * from somewhere;" # (8)
column_process.source_url = "https://your.orchestrator/unique/id/123" # (9)
response = client.asset.save(column_process) # (10)
assert (column_processes := response.assets_created(ColumnProcess)) # (11)
assert len(column_processes) == 1 # (12)
assert (columns := response.assets_updated(Column)) # (13)
assert len(columns) == 5 # (14)
1. Use the creator() method to initialize the object with all necessary attributes for creating it.
2. Provide a name for how the column process will be shown in the UI.
3. Provide the qualified_name of the connection that ran the column process.
Tips for the connection
The column process itself must be created within a connection for both access control and icon labelling. Use a connection qualified_name that indicates the system that ran the column process:
- You could use the same connection qualified_name as the source system, if it was the source system "pushing" data to the targets.
- You could use the same connection qualified_name as the target system, if it was the target system "pulling" data from the sources.
- You could use a different connection qualified_name from either source or target, if there is a system in-between doing the processing (for example, an ETL engine or orchestrator).
:::
4. (Optional) Provide the unique ID of the column process within that connection. This could be the unique DAG ID for an orchestrator, for example. Since it's optional, you can also leave it out and the SDK will generate a unique ID for you based on the unique combination of inputs and outputs for the column process.
Use your own ID if you can
While the SDK can generate this ID for you, it's based on the unique combination of inputs and outputs, so the ID can change if those inputs or outputs change. This could result in extra column processes in lineage as this column process itself changes over time.
By using your own ID for the column process, the same single column process in Atlan will be updated whenever that column process changes over time (even if its inputs or outputs change).
:::
5. Provide the list of inputs to the column process. Note that each of these is only a reference to an asset, not a full asset object. For a reference you only need (in addition to the type of asset) either:
- its GUID (for the ref_by_guid() method)
- its qualified_name (for the ref_by_qualified_name() method)
6. Provide the list of outputs to the column process. Note that each of these is again only a reference to an asset.
7. Provide the parent Process in which this process ran, since this process is a subprocess of some higher-level process.
8. (Optional) You can also add other properties to the column process, such as SQL code that runs within the column process.
9. (Optional) You can also provide a link to the column process, which will appear in the Atlan UI as a button you can click to go to that link when viewing the column process.
10. Call the save() method to actually create the column process.
11. Check that a ColumnProcess was created.
12. Check that only 1 ColumnProcess was created.
13. Check that columns were updated.
14. Check that 5 columns (3 inputs, 2 outputs) were updated.
val columnProcess = ColumnProcess.creator( // (1)
"Source 1, Source 2, Source 3 -> Target 1, Target 2", // (2)
"default/snowflake/1657025257", // (3)
"dag_123", // (4)
listOf<ICatalog>( // (5)
Column.refByGuid("495b1516-aaaf-4390-8cfd-b11ade7a7799"),
Column.refByGuid("d002dead-1655-4d75-abd6-ad889fa04bd4"),
Column.refByQualifiedName("default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS/COLUMN")),
listOf<ICatalog>( // (6)
Column.refByGuid("86d9a061-7753-4884-b988-a02d3954bc24"),
Column.refByQualifiedName("default/snowflake/1657025257/OPS/DEFAULT/FULL_STATS/COLUMN")),
Process.refByGuid("76d9a061-7753-9884-b988-a02d3954bc25")) // (7)
.sql("select * from somewhere;") // (8)
.sourceURL("https://your.orchestrator/unique/id/123") // (9)
.build()
val response = columnProcess.save(client) // (10)
assert(response.createdAssets.size == 1) // (11)
assert(response.updatedAssets.size == 5) // (12)
1. Use the creator() method to initialize the object with all necessary attributes for creating it.
2. Provide a name for how the column process will be shown in the UI.
3. Provide the qualifiedName of the connection that ran the column process.
Tips for the connection
The column process itself must be created within a connection for both access control and icon labelling. Use a connection qualifiedName that indicates the system that ran the column process:
- You could use the same connection qualifiedName as the source system, if it was the source system "pushing" data to the targets.
- You could use the same connection qualifiedName as the target system, if it was the target system "pulling" data from the sources.
- You could use a different connection qualifiedName from either source or target, if there is a system in-between doing the processing (for example, an ETL engine or orchestrator).
:::
4. (Optional) Provide the unique ID of the column process within that connection. This could be the unique DAG ID for an orchestrator, for example. Since it's optional, you can also send null and the SDK will generate a unique ID for you based on the unique combination of inputs and outputs for the column process.
Use your own ID if you can
While the SDK can generate this ID for you, it's based on the unique combination of inputs and outputs, so the ID can change if those inputs or outputs change. This could result in extra column processes in lineage as this process itself changes over time.
By using your own ID for the column process, the same single process in Atlan will be updated whenever that process changes over time (even if its inputs or outputs change).
:::
5. Provide the list of inputs to the column process. Note that each of these is only a reference to an asset, not a full asset object. For a reference you only need (in addition to the type of asset) either:
- its GUID (for the static <Type>.refByGuid() method)
- its qualifiedName (for the static <Type>.refByQualifiedName() method)
6. Provide the list of outputs to the column process. Note that each of these is again only a reference to an asset.
7. Provide the parent LineageProcess in which this process ran, since this process is a subprocess of some higher-level process.
8. (Optional) You can also add other properties to the column process, such as SQL code that runs within the column process.
9. (Optional) You can also provide a link to the column process, which will appear in the Atlan UI as a button you can click to go to that link when viewing the column process.
10. Call the save() method to actually create the column process. Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
11. The response will include the single column process asset that was created.
12. The response will also include the 5 column assets (3 inputs, 2 outputs) that were updated.
{
"entities": [ // (1)
{
"typeName": "ColumnProcess", // (2)
"attributes": {
"name": "Source 1, Source 2, Source 3 -> Target 1, Target 2", // (3)
"qualifiedName": "default/snowflake/1657025257/dag_123/column_process", // (4)
"inputs": [ // (5)
{
"typeName": "Column",
"guid": "495b1516-aaaf-4390-8cfd-b11ade7a7799"
},
{
"typeName": "Column",
"guid": "d002dead-1655-4d75-abd6-ad889fa04bd4"
},
{
"typeName": "Column",
"uniqueAttributes": {
"qualifiedName": "default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS/COLUMN"
}
}
],
"outputs": [ // (6)
{
"typeName": "Column",
"guid": "86d9a061-7753-4884-b988-a02d3954bc24"
},
{
"typeName": "Column",
"uniqueAttributes": {
"qualifiedName": "default/snowflake/1657025257/OPS/DEFAULT/FULL_STATS/COLUMN"
}
}
],
"process": { // (7)
"guid": "76d9a061-7753-9884-b988-a02d3954bc25",
"typeName": "Process",
"uniqueAttributes": {
"qualifiedName": "default/snowflake/1657025257/parent_123"
}
}
}
}
]
}
1. All assets must be wrapped in an entities array.
2. You must provide the exact type name for a ColumnProcess asset (case-sensitive).
3. You must provide a name for the integration column process.
4. You must provide a unique qualifiedName for the integration column process (case-sensitive).
5. You must list all of the input assets to the column process. These can be referenced by GUID or by qualifiedName.
6. You must list all of the output assets from the column process. These can also be referenced by either GUID or qualifiedName.
7. You must provide the parent LineageProcess in which this process ran, since this process is a subprocess of some higher-level process.
Using OpenLineage
To create column-lineage between assets through OpenLineage, you need only extend the details of the outputs you send in your OpenLineage events.
You must first configure a Spark Assets connection in Atlan before sending any OpenLineage events. (You can skip the Configure the integration in Apache Spark section.)
- Java
- Python
- Kotlin
- Raw REST API
String snowflake = "snowflake://abc123.snowflakecomputing.com"; // (1)
OpenLineageJob olj = OpenLineageJob.creator( // (2)
"ol-spark",
"dag_123",
"https://your.orchestrator/unique/id/123"
).build();
OpenLineageRun olr = OpenLineageRun.creator(olj).build(); // (3)
OpenLineageInputDataset inputDataset = olj.createInput(snowflake, "OPS.DEFAULT.RUN_STATS")
.build(); // (4)
OpenLineageOutputDataset outputDataset = olj.createOutput(snowflake, "OPS.DEFAULT.FULL_STATS") // (5)
.toField( // (6)
"COLUMN", // (7)
List.of( // (8)
inputDataset.fromField("COLUMN").build(),
inputDataset.fromField("ONE").build(),
inputDataset.fromField("TWO").build()))
.toField(
"ANOTHER",
List.of(
inputDataset.fromField("THREE").build()))
.build();
OpenLineageEvent start = OpenLineageEvent.creator( // (9)
olr,
OpenLineage.RunEvent.EventType.START
)
.input(inputDataset) // (10)
.input(olj.createInput(snowflake, "SOME.OTHER.TBL").build())
.input(olj.createInput(snowflake, "AN.OTHER.TBL").build())
.output(outputDataset) // (11)
.output(olj.createOutput(snowflake, "AN.OTHER.VIEW").build())
.build();
start.emit(client); // (12)
1. Datasets used in data lineage need a namespace that follows the source-specific naming standards of OpenLineage.
2. Lineage is tracked through jobs. Each job must have:
- the name of a connection (that already exists in Atlan),
- a unique job name (used to idempotently update the same job with multiple runs), and
- a unique URI indicating the code or system responsible for producing this lineage.
3. A job must be run at least once for any lineage to exist, and these separate runs of the same job are tracked through OpenLineageRun objects.
4. You can define any number of inputs (sources) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
5. You can define any number of outputs (targets) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
6. For column-level lineage, you specify the mapping only on the target (outputs) end of the lineage, by chaining a toField for each output column.
7. Each key for such a toField() chain is the name of a field (column) in the output dataset.
8. You can then provide a list that defines all input (source) fields that map to this output field in column-level lineage.
Create input fields from input datasets
You can quickly create such an input (source) field from an input dataset using the fromField() method and the name of the column in that input dataset.
:::
9. Each run of a job must consist of at least two events: a START event indicating when the job run began, and some terminal state indicating when the job run finished.
10. You can chain any number of inputs to the event to indicate the source datasets for the lineage.
11. You can chain any number of outputs to the event to indicate the target datasets for the lineage.
12. Use the emit() method to actually send the event to Atlan to be processed. The processing itself occurs asynchronously, so a successful emit() only indicates that the event has been successfully sent to Atlan, not that it has (yet) been processed. Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
OpenLineageEvent complete = OpenLineageEvent.creator( // (1)
olr,
OpenLineage.RunEvent.EventType.COMPLETE
).build();
complete.emit(client); // (2)
1. Since each run of a job must consist of at least two events, don't forget to send the terminal state indicating when the job has finished (and whether it was successful, with a COMPLETE, or had some error, with a FAIL).
2. Once again, use the emit() method to actually send the event to Atlan to be processed (asynchronously). Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.enums import OpenLineageEventType
from pyatlan.model.open_lineage import OpenLineageEvent, OpenLineageJob, OpenLineageRun
client = AtlanClient()
snowflake = "snowflake://abc123.snowflakecomputing.com" # (1)
job = OpenLineageJob.creator( # (2)
connection_name="ol-spark",
job_name="dag_123",
producer="https://your.orchestrator/unique/id/123"
)
run = OpenLineageRun.creator(job=job) # (3)
input_dataset = job.create_input(
namespace=snowflake, asset_name="OPS.DEFAULT.RUN_STATS"
) # (4)
output_dataset = job.create_output(
namespace=snowflake, asset_name="OPS.DEFAULT.FULL_STATS"
) # (5)
output_dataset.to_fields = [ # (6)
{
"COLUMN": [ # (7)
input_dataset.from_field(field_name="COLUMN"), # (8)
input_dataset.from_field(field_name="ONE"),
input_dataset.from_field(field_name="TWO"),
]
},
{
"ANOTHER": [
input_dataset.from_field(field_name="THREE"),
]
},
]
start = OpenLineageEvent.creator(
run=run, event_type=OpenLineageEventType.START
) # (9)
start.inputs = [
input_dataset,
job.create_input(namespace=snowflake, asset_name="SOME.OTHER.TBL"),
job.create_input(namespace=snowflake, asset_name="AN.OTHER.TBL"),
] # (10)
start.outputs = [
output_dataset,
job.create_output(namespace=snowflake, asset_name="AN.OTHER.VIEW")
] # (11)
start.emit(client=client) # (12)
1. Datasets used in data lineage need a namespace that follows the source-specific naming standards of OpenLineage.
2. Lineage is tracked through jobs. Each job must have:
- the name of a connection (that already exists in Atlan),
- a unique job name (used to idempotently update the same job with multiple runs), and
- a unique URI indicating the code or system responsible for producing this lineage.
3. A job must be run at least once for any lineage to exist, and these separate runs of the same job are tracked through OpenLineageRun objects.
4. You can define any number of inputs (sources) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
5. You can define any number of outputs (targets) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
6. For column-level lineage, you specify the mapping only on the target (outputs) end of the lineage, via the to_fields attribute.
7. Each key is the name of a field (column) in the output dataset.
8. You can then provide a list that defines all input (source) fields that map to this output field in column-level lineage.
Create input fields from input datasets
You can quickly create such an input (source) field from an input dataset using the from_field() method and the name of the column in that input dataset.
:::
9. Each run of a job must consist of at least two events: a START event indicating when the job run began, and some terminal state indicating when the job run finished.
10. You can provide any number of inputs on the event to indicate the source datasets for the lineage.
11. You can provide any number of outputs on the event to indicate the target datasets for the lineage.
12. Use the emit() method to actually send the event to Atlan to be processed. The processing itself occurs asynchronously, so a successful emit() only indicates that the event has been successfully sent to Atlan, not that it has (yet) been processed.
complete = OpenLineageEvent.creator(
run=run, event_type=OpenLineageEventType.COMPLETE
) # (1)
complete.emit(client=client) # (2)
1. Since each run of a job must consist of at least two events, don't forget to send the terminal state indicating when the job has finished (and whether it was successful, with a COMPLETE, or had some error, with a FAIL).
2. Once again, use the emit() method to actually send the event to Atlan to be processed (asynchronously).
val snowflake = "snowflake://abc123.snowflakecomputing.com" // (1)
val olj = OpenLineageJob.creator( // (2)
"ol-spark",
"dag_123",
"https://your.orchestrator/unique/id/123"
).build()
val olr = OpenLineageRun.creator(olj).build() // (3)
val inputDataset = olj.createInput(snowflake, "OPS.DEFAULT.RUN_STATS")
.build() // (4)
val outputDataset = olj.createOutput(snowflake, "OPS.DEFAULT.FULL_STATS") // (5)
.toField( // (6)
"COLUMN", // (7)
listOf( // (8)
inputDataset.fromField("COLUMN").build(),
inputDataset.fromField("ONE").build(),
inputDataset.fromField("TWO").build(),
),
)
.toField(
"ANOTHER",
listOf(
inputDataset.fromField("THREE").build(),
),
)
.build()
val start = OpenLineageEvent.creator( // (9)
olr,
OpenLineage.RunEvent.EventType.START
)
.input(inputDataset) // (10)
.input(olj.createInput(snowflake, "SOME.OTHER.TBL").build())
.input(olj.createInput(snowflake, "AN.OTHER.TBL").build())
.output(outputDataset) // (11)
.output(olj.createOutput(snowflake, "AN.OTHER.VIEW").build())
.build()
start.emit(client) // (12)
1. Datasets used in data lineage need a namespace that follows the source-specific naming standards of OpenLineage.
2. Lineage is tracked through jobs. Each job must have:
- the name of a connection (that already exists in Atlan),
- a unique job name (used to idempotently update the same job with multiple runs), and
- a unique URI indicating the code or system responsible for producing this lineage.
3. A job must be run at least once for any lineage to exist, and these separate runs of the same job are tracked through OpenLineageRun objects.
4. You can define any number of inputs (sources) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
5. You can define any number of outputs (targets) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
6. For column-level lineage, you specify the mapping only on the target (outputs) end of the lineage, by chaining a toField for each output column.
7. Each key for such a toField() chain is the name of a field (column) in the output dataset.
8. You can then provide a list that defines all input (source) fields that map to this output field in column-level lineage.
Create input fields from input datasets
You can quickly create such an input (source) field from an input dataset using the fromField() method and the name of the column in that input dataset.
:::
9. Each run of a job must consist of at least two events: a START event indicating when the job run began, and some terminal state indicating when the job run finished.
10. You can chain any number of inputs to the event to indicate the source datasets for the lineage.
11. You can chain any number of outputs to the event to indicate the target datasets for the lineage.
12. Use the emit() method to actually send the event to Atlan to be processed. The processing itself occurs asynchronously, so a successful emit() only indicates that the event has been successfully sent to Atlan, not that it has (yet) been processed. Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
val complete = OpenLineageEvent.creator( // (1)
olr,
OpenLineage.RunEvent.EventType.COMPLETE
).build()
complete.emit(client) // (2)
1. Since each run of a job must consist of at least two events, don't forget to send the terminal state indicating when the job has finished (and whether it was successful, with a COMPLETE, or had some error, with a FAIL).
2. Once again, use the emit() method to actually send the event to Atlan to be processed (asynchronously). Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
{
"eventTime": "2024-07-01T08:23:37.491542Z", // (1)
"producer": "https://your.orchestrator/unique/id/123", // (2)
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"eventType": "START", // (3)
"job": { // (4)
"namespace": "ol-spark",
"name": "dag_123",
"facets": {}
},
"run": { // (5)
"runId": "eefd52c3-5871-4f0e-8ff5-237e9a6efb53",
"facets": {}
},
"inputs": [ // (6)
},
{
"namespace": "snowflake://abc123.snowflakecomputing.com",
"name": "SOME.OTHER.TBL",
"facets": {}
},
{
"namespace": "snowflake://abc123.snowflakecomputing.com",
"name": "AN.OTHER.TBL",
"facets": {}
}
],
"outputs": [ // (7),
{
"namespace": "snowflake://abc123.snowflakecomputing.com",
"name": "OPS.DEFAULT.RUN_STATS",
"field": "ONE"
},
{
"namespace": "snowflake://abc123.snowflakecomputing.com",
"name": "OPS.DEFAULT.RUN_STATS",
"field": "TWO"
}
]
},
"ANOTHER": {
"inputFields": [
{
"namespace": "snowflake://abc123.snowflakecomputing.com",
"name": "OPS.DEFAULT.RUN_STATS",
"field": "THREE"
}
]
}
}
}
}
},
{
"namespace": "snowflake://abc123.snowflakecomputing.com",
"name": "AN.OTHER.VIEW",
"facets": {}
}
]
}
1. Each event for a job run must have a time at which the event occurred.
2. Each event must have a URI indicating the code or system responsible for producing this lineage.
3. Each run of a job must consist of at least two events: a START event indicating when the job run began, and some terminal state indicating when the job run finished.
4. Lineage is tracked through jobs. Each job must have:
- the name of a connection (that already exists in Atlan) as its namespace, and
- a unique job name (used to idempotently update the same job with multiple runs).
5. A job must be run at least once for any lineage to exist, and each event for the same run of a job must be associated with the same runId.
6. You can define any number of inputs (sources) for lineage.
- Datasets used in data lineage need a namespace that follows the source-specific naming standards of OpenLineage.
- The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
7. You can define any number of outputs (targets) for lineage.
- Datasets used in data lineage need a namespace that follows the source-specific naming standards of OpenLineage.
- The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
8. For column-level lineage, you specify the mapping only on the target (outputs) end of the lineage, by including a columnLineage facet with an embedded fields object.
9. Each key in the fields object is the name of a field (column) in the output dataset. You can then provide a list of inputFields that defines all input (source) fields that map to this output field in column-level lineage.
{
"eventTime": "2024-07-01T08:23:38.360567Z",
"producer": "https://your.orchestrator/unique/id/123",
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"eventType": "COMPLETE", // (1)
"run": {
"runId": "eefd52c3-5871-4f0e-8ff5-237e9a6efb53",
"facets": {}
},
"job": {
"namespace": "ol-spark",
"name": "dag_123",
"facets": {}
}
}
1. Since each run of a job must consist of at least two events, don't forget to send the terminal state indicating when the job has finished (and whether it was successful, with a COMPLETE, or had some error, with a FAIL).
(Optional) Send raw OpenLineage events
If you have raw OpenLineage run events that you want to send directly to Atlan, use the OpenLineageEvent.emit_raw() method:
Raw events bypass model validation
Unlike events constructed through the SDK, raw events are passed without strict validation. Only basic type validation occurs (string, JSON string, list[dict], or dict). This gives you flexibility but requires ensuring your event structure is correct.
Use with caution: malformed events may cause processing issues.
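As one hedged example of such a check (the required keys follow the RunEvent examples shown earlier on this page; this helper is illustrative and not part of pyatlan):
REQUIRED_KEYS = {"eventTime", "producer", "schemaURL", "eventType", "run", "job"}

def check_raw_event(event: dict) -> None:
    """Basic structural check before emitting a raw OpenLineage event."""
    missing = REQUIRED_KEYS - event.keys()
    if missing:
        raise ValueError(f"Raw event is missing keys: {sorted(missing)}")
    if "runId" not in event["run"]:
        raise ValueError("Raw event run must include a runId")
    if not {"namespace", "name"} <= event["job"].keys():
        raise ValueError("Raw event job must include namespace and name")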
Python
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.open_lineage import OpenLineageEvent
from pyatlan.model.enums import AtlanConnectorType
# Example raw OpenLineage events
events = [
{
"eventTime": "2025-01-04T21:35:06.116Z",
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0/integration/spark",
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"eventType": "START", # Job start event
"run": {
"runId": "event-1-run-id", # Unique run identifier
"facets": {}
},
"job": {
"namespace": "default", # Connection name in Atlan
"name": "job_1" # Unique job name
}
},
{
"eventTime": "2025-01-04T21:36:06.116Z",
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0/integration/spark",
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"eventType": "COMPLETE", # Successful job completion
"run": {
"runId": "event-2-run-id",
"facets": {}
},
"job": {
"namespace": "default",
"name": "job_2"
}
},
{
"eventTime": "2025-01-04T21:37:06.116Z",
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0/integration/spark",
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"eventType": "FAIL", # Job failure event
"run": {
"runId": "event-3-run-id",
"facets": {}
},
"job": {
"namespace": "default",
"name": "job_3"
}
}
]
client = AtlanClient()
# Send each raw event to Atlan
for event in events:
OpenLineageEvent.emit_raw( # (1)
client=client,
event=event, # Raw event dictionary
connector_type=AtlanConnectorType.SPARK, # Specify connector type
)
print(f"Sent {event['eventType']} event for job {event['job']['name']}")
1. To send raw OpenLineage event data to Atlan, you need to provide:
- client: connectivity to an Atlan tenant
- event: the raw events, as a JSON string, dict, list of dicts, or OpenLineageRawEvent
- connector_type: the connector type for the OpenLineage event; defaults to AtlanConnectorType.SPARK
Remove lineage between assets
To remove lineage between assets, you need to delete the Process entity that links them:
Also be aware that this will only delete the process with the GUID specified. It will not remove any column processes that may also exist. To remove those column processes as well, you must identify the GUID of each column-level process and call the same purge method against each of those GUIDs.
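One way to find those column-level processes is sketched below in Python, using pyatlan's FluentSearch (assuming the column processes live under the same example connection used throughout this page):
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import ColumnProcess
from pyatlan.model.fluent_search import CompoundQuery, FluentSearch

client = AtlanClient()
request = (
    FluentSearch()
    .where(CompoundQuery.active_assets())
    .where(CompoundQuery.asset_type(ColumnProcess))
    .where(ColumnProcess.QUALIFIED_NAME.startswith("default/snowflake/1657025257/"))
).to_request()
for result in client.asset.search(request):
    client.asset.purge_by_guid(guid=result.guid)  # purge each column process by GUID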
- Java
- Python
- Kotlin
- Raw REST API
AssetMutationResponse response =
Asset.purge(client, "b4113341-251b-4adc-81fb-2420501c30e6"); // (1)
Asset deleted = response.getDeletedAssets().get(0); // (2)
LineageProcess process = null;
if (deleted instanceof LineageProcess) {
process = (LineageProcess) deleted; // (3)
}
1. Provide the GUID for the process to the static Asset.purge() method. Because this operation will directly remove the asset from Atlan, you must provide it an AtlanClient through which to connect to the tenant.
2. The response will include the single process that was purged.
3. If you want to confirm the details, you'll need to type-check and then cast the generic Asset returned into a LineageProcess.
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Process
client = AtlanClient()
response = client.asset.purge_by_guid( # (1)
guid="b4113341-251b-4adc-81fb-2420501c30e6") # (2)
assert (processes := response.assets_deleted(Process)) # (3)
assert len(processes) == 1 # (4)
1. Invoke the asset.purge_by_guid() method to delete the Process.
2. Provide the GUID of the process to be purged.
3. Check that a Process was purged.
4. Check that only 1 Process was purged.
val response: AssetMutationResponse =
Asset.purge(client, "b4113341-251b-4adc-81fb-2420501c30e6") // (1)
val deleted = response.deletedAssets[0] // (2)
val process = if (deleted is LineageProcess) deleted else null // (3)
1. Provide the GUID for the process to the static Asset.purge() method. Because this operation will directly remove the asset from Atlan, you must provide it an AtlanClient through which to connect to the tenant.
2. The response will include the single process that was purged.
3. If you want to confirm the details, you'll need to type-check and then cast the generic Asset returned into a LineageProcess.
// (1)
1. All of the details are in the request URL; there is no payload for a deletion. The GUID in the URL is for the process itself (not any of its inputs or outputs).
More information
This will irreversibly delete the process, and therefore the lineage it represented. The input and output assets themselves will also be updated, to no longer be linked to the (now non-existent) process. However, the input and output assets themselves will continue to exist in Atlan.