
Manage lineage

Create lineage between assets

Directly

To create lineage between assets, you need to create a Process entity.

Input and output assets must already exist

Note that the assets you reference as the inputs and outputs of the process must already exist in Atlan before you create the process.
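
If you want to confirm an asset exists before referencing it, a minimal sketch along these lines may help. It assumes the SDK exposes a typed Table.get() lookup by qualifiedName and NotFoundException / AtlanException exception types; adapt it to however you normally retrieve assets:

Check an input table exists (sketch)
try {
    // Same qualifiedName as one of the inputs used in the example below
    Table table = Table.get(client, "default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS");
    // The table exists and can safely be referenced as an input or output
} catch (NotFoundException e) {
    // The table does not exist yet: create or crawl it before creating the process
} catch (AtlanException e) {
    // Some other problem communicating with the tenant
}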

Create lineage between assets
LineageProcess process = LineageProcess.creator( // (1)
        "Source 1, Source 2, Source 3 -> Target 1, Target 2", // (2)
        "default/snowflake/1657025257", // (3)
        "dag_123", // (4)
        List.of( // (5)
            Table.refByGuid("495b1516-aaaf-4390-8cfd-b11ade7a7799"),
            Table.refByGuid("d002dead-1655-4d75-abd6-ad889fa04bd4"),
            Table.refByQualifiedName("default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS")),
        List.of( // (6)
            Table.refByGuid("86d9a061-7753-4884-b988-a02d3954bc24"),
            Table.refByQualifiedName("default/snowflake/1657025257/OPS/DEFAULT/FULL_STATS")),
        null) // (7)
    .sql("select * from somewhere;") // (8)
    .sourceURL("https://your.orchestrator/unique/id/123") // (9)
    .build();
AssetMutationResponse response = process.save(client); // (10)
assert response.getCreatedAssets().size() == 1; // (11)
assert response.getUpdatedAssets().size() == 5; // (12)
  1. Use the creator() method to initialize the object with all necessary attributes for creating it.

  2. Provide a name for how the process will be shown in the UI.

  3. Provide the qualifiedName of the connection that ran the process.

    Tips for the connection

The process itself must be created within a connection for both access control and icon labelling. Use a connection qualifiedName that indicates the system that ran the process:

  • You could use the same connection qualifiedName as the source system, if it was the source system "pushing" data to the targets.
  • You could use the same connection qualifiedName as the target system, if it was the target system "pulling" data from the sources.
  • You could use a different connection qualifiedName from either source or target, if there is a system in-between doing the processing (for example an ETL engine or orchestrator).
  4. (Optional) Provide the unique ID of the process within that connection. This could be the unique DAG ID for an orchestrator, for example. Since it's optional, you can also send null and the SDK will generate a unique ID for you based on the unique combination of inputs and outputs for the process.

    Use your own ID if you can

While the SDK can generate this ID for you, it is based on the unique combination of inputs and outputs, so the ID can change if those inputs or outputs change. This could result in extra processes appearing in lineage as the process itself changes over time.

By using your own ID for the process, any changes that occur to that process over time (even if its inputs or outputs change) will be applied to the same single process in Atlan.

  5. Provide the list of inputs to the process. Note that each of these is only a Reference to an asset, not a full asset object. For a reference you only need (in addition to the type of asset) either:

  • its GUID (for the static <Type>.refByGuid() method)
  • its qualifiedName (for the static <Type>.refByQualifiedName() method)
  6. Provide the list of outputs to the process. Note that each of these is again only a Reference to an asset.
  7. (Optional) Provide the parent LineageProcess in which this process ran (for example, if this process is a subprocess of some higher-level process). If this is a top-level process, you can also send null for this parameter (as in this example).
  8. (Optional) You can also add other properties to the lineage process, such as SQL code that runs within the process.
  9. (Optional) You can also provide a link to the process, which will provide a button to click to go to that link from the Atlan UI when viewing the process in Atlan.
  10. Call the save() method to actually create the process. Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
  11. The response will include that single lineage process asset that was created.
  12. The response will also include the 5 data assets (3 inputs, 2 outputs) that were updated.
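
If you need the GUID of the newly created process later (for example, to remove the lineage again as shown at the end of this page), you can read it back from the same response. A minimal sketch, assuming only the getCreatedAssets() and getGuid() accessors used elsewhere on this page:

Read back the created process (sketch)
Asset created = response.getCreatedAssets().get(0); // the single process created above
if (created instanceof LineageProcess) {
    String processGuid = created.getGuid(); // keep this if you later want to purge the process
}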

Using OpenLineage

Creating connection for OpenLineage

You must configure OpenLineage before creating lineage between assets. You can either configure a Spark Assets connection in Atlan before sending any OpenLineage events (you can skip the Configure the integration in Apache Spark section), or follow the steps below to create the Spark connection via the SDKs.

Coming soon

Creating lineage between assets using OpenLineage

To create lineage between assets through OpenLineage, you need to send at least two events: one indicating the start of a job run, and another indicating that the job run has finished.

Start lineage between assets via OpenLineage
String snowflake = "snowflake://abc123.snowflakecomputing.com"; // (1)
OpenLineageJob olj = OpenLineageJob.creator( // (2)
        "ol-spark",
        "dag_123",
        "https://your.orchestrator/unique/id/123"
    ).build();
OpenLineageRun olr = OpenLineageRun.creator(olj).build(); // (3)
OpenLineageInputDataset inputDataset = olj.createInput(snowflake, "OPS.DEFAULT.RUN_STATS")
    .build(); // (4)
OpenLineageOutputDataset outputDataset = olj.createOutput(snowflake, "OPS.DEFAULT.FULL_STATS")
    .build(); // (5)
OpenLineageEvent start = OpenLineageEvent.creator( // (6)
        olr,
        OpenLineage.RunEvent.EventType.START
    )
    .input(inputDataset) // (7)
    .input(olj.createInput(snowflake, "SOME.OTHER.TBL").build())
    .input(olj.createInput(snowflake, "AN.OTHER.TBL").build())
    .output(outputDataset) // (8)
    .output(olj.createOutput(snowflake, "AN.OTHER.VIEW").build())
    .build();
start.emit(client); // (9)
  1. Datasets used in data lineage need a namespace that follows the source-specific naming standards of OpenLineage.
  2. Lineage is tracked through jobs. Each job must have:
    • the name of a connection (that already exists in Atlan),
    • a unique job name (used to idempotently update the same job with multiple runs), and
    • a unique URI indicating the code or system responsible for producing this lineage.
  3. A job must be run at least once for any lineage to exist, and these separate runs of the same job are tracked through OpenLineageRun objects.
  4. You can define any number of inputs (sources) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
  5. You can define any number of outputs (targets) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.
  6. Each run of a job must consist of at least two events: a START event indicating when the job run began, and some terminal state indicating when the job run finished.
  7. You can chain any number of inputs to the event to indicate the source datasets for the lineage.
  8. You can chain any number of outputs to the event to indicate the target datasets for the lineage.
  9. Use the emit() method to actually send the event to Atlan to be processed. The processing itself occurs asynchronously, so a successful emit() will only indicate that the event has been successfully sent to Atlan, not that it has (yet) been processed. Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
Complete lineage between assets via OpenLineage
OpenLineageEvent complete = OpenLineageEvent.creator( // (1)
        olr,
        OpenLineage.RunEvent.EventType.COMPLETE
    ).build();
complete.emit(client); // (2)
  1. Since each run of a job must consist of at least two events, don't forget to send the terminal state indicating when the job has finished (and whether it succeeded, with a COMPLETE, or hit an error, with a FAIL). A sketch of the failure case follows this list.
  2. Once again, use the emit() method to actually send the event to Atlan to be processed (asynchronously). Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
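
If the job run ends in an error rather than success, the terminal event follows the same pattern with a FAIL event type. A minimal sketch, reusing the same run object (olr) from above:

Fail lineage between assets via OpenLineage (sketch)
OpenLineageEvent fail = OpenLineageEvent.creator(
        olr,
        OpenLineage.RunEvent.EventType.FAIL
    ).build();
fail.emit(client); // emitted (and processed asynchronously) just like the COMPLETE event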

Create lineage between columns

Directly

To create lineage between relational asset columns, it's necessary to create a ColumnProcess entity.

Lineage with relational columns

Before creating the ColumnProcess, verify lineage already exists between the associated relational assets, and make sure that the columns referenced as inputs and outputs already exist.

Create lineage between columns
ColumnProcess columnProcess = ColumnProcess.creator( // (1)
        "Source 1, Source 2, Source 3 -> Target 1, Target 2", // (2)
        "default/snowflake/1657025257", // (3)
        "dag_123", // (4)
        List.of( // (5)
            Column.refByGuid("495b1516-aaaf-4390-8cfd-b11ade7a7799"),
            Column.refByGuid("d002dead-1655-4d75-abd6-ad889fa04bd4"),
            Column.refByQualifiedName("default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS/COLUMN")),
        List.of( // (6)
            Column.refByGuid("86d9a061-7753-4884-b988-a02d3954bc24"),
            Column.refByQualifiedName("default/snowflake/1657025257/OPS/DEFAULT/FULL_STATS/COLUMN")),
        Process.refByGuid("76d9a061-7753-9884-b988-a02d3954bc25")) // (7)
    .sql("select * from somewhere;") // (8)
    .sourceURL("https://your.orchestrator/unique/id/123") // (9)
    .build();
AssetMutationResponse response = columnProcess.save(client); // (10)
assert response.getCreatedAssets().size() == 1; // (11)
assert response.getUpdatedAssets().size() == 5; // (12)
  1. Use the creator() method to initialize the object with all necessary attributes for creating it.

  2. Provide a name for how the column process will be shown in the UI.

  3. Provide the qualifiedName of the connection that ran the column process.

    Tips for the connection

The column process itself must be created within a connection for both access control and icon labelling. Use a connection qualifiedName that indicates the system that ran the column process:

  • You could use the same connection qualifiedName as the source system, if it was the source system "pushing" data to the targets.
  • You could use the same connection qualifiedName as the target system, if it was the target system "pulling" data from the sources.
  • You could use a different connection qualifiedName from either source or target, if there is a system in-between doing the processing (for example an ETL engine or orchestrator).
  4. (Optional) Provide the unique ID of the column process within that connection. This could be the unique DAG ID for an orchestrator, for example. Since it's optional, you can also send null and the SDK will generate a unique ID for you based on the unique combination of inputs and outputs for the column process.

    Use your own ID if you can

While the SDK can generate this ID for you, it is based on the unique combination of inputs and outputs, so the ID can change if those inputs or outputs change. This could result in extra column processes appearing in lineage as the process itself changes over time.

By using your own ID for the column process, any changes that occur to that process over time (even if its inputs or outputs change) will be applied to the same single process in Atlan.

  5. Provide the list of inputs to the column process. Note that each of these is only a Reference to an asset, not a full asset object. For a reference you only need (in addition to the type of asset) either:

  • its GUID (for the static <Type>.refByGuid() method)
  • its qualifiedName (for the static <Type>.refByQualifiedName() method)
  6. Provide the list of outputs to the column process. Note that each of these is again only a Reference to an asset.
  7. Provide the parent LineageProcess in which this column process ran, since a column process is a subprocess of some higher-level process.
  8. (Optional) You can also add other properties to the column process, such as SQL code that runs within the column process.
  9. (Optional) You can also provide a link to the column process, which will provide a button to click to go to that link from the Atlan UI when viewing the column process in Atlan.
  10. Call the save() method to actually create the column process. Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
  11. The response will include that single column process asset that was created.
  12. The response will also include the 5 column assets (3 inputs, 2 outputs) that were updated.

Using OpenLineage

To create column-lineage between assets through OpenLineage, you need only extend the details of the outputs you send in your OpenLineage events.

You must first configure OpenLineage

You must first configure a Spark Assets connection in Atlan before sending any OpenLineage events. (You can skip the Configure the integration in Apache Spark section.)

Start column-level lineage between assets via OpenLineage
String snowflake = "snowflake://abc123.snowflakecomputing.com"; // (1)
OpenLineageJob olj = OpenLineageJob.creator( // (2)
        "ol-spark",
        "dag_123",
        "https://your.orchestrator/unique/id/123"
    ).build();
OpenLineageRun olr = OpenLineageRun.creator(olj).build(); // (3)
OpenLineageInputDataset inputDataset = olj.createInput(snowflake, "OPS.DEFAULT.RUN_STATS")
    .build(); // (4)
OpenLineageOutputDataset outputDataset = olj.createOutput(snowflake, "OPS.DEFAULT.FULL_STATS") // (5)
    .toField( // (6)
        "COLUMN", // (7)
        List.of( // (8)
            inputDataset.fromField("COLUMN").build(),
            inputDataset.fromField("ONE").build(),
            inputDataset.fromField("TWO").build()))
    .toField(
        "ANOTHER",
        List.of(
            inputDataset.fromField("THREE").build()))
    .build();
OpenLineageEvent start = OpenLineageEvent.creator( // (9)
        olr,
        OpenLineage.RunEvent.EventType.START
    )
    .input(inputDataset) // (10)
    .input(olj.createInput(snowflake, "SOME.OTHER.TBL").build())
    .input(olj.createInput(snowflake, "AN.OTHER.TBL").build())
    .output(outputDataset) // (11)
    .output(olj.createOutput(snowflake, "AN.OTHER.VIEW").build())
    .build();
start.emit(client); // (12)
  1. Datasets used in data lineage need a namespace that follows the source-specific naming standards of OpenLineage.

  2. Lineage is tracked through jobs. Each job must have:

    • the name of a connection (that already exists in Atlan),
    • a unique job name (used to idempotently update the same job with multiple runs), and
    • a unique URI indicating the code or system responsible for producing this lineage.
  3. A job must be run at least once for any lineage to exist, and these separate runs of the same job are tracked through OpenLineageRun objects.

  4. You can define any number of inputs (sources) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.

  5. You can define any number of outputs (targets) for lineage. The name of a dataset should use a .-qualified form. For example, a table should be DATABASE_NAME.SCHEMA_NAME.TABLE_NAME.

  6. For column-level lineage, you specify the mapping only on the target (outputs) end of the lineage, by chaining a toField for each output column.

  7. Each key for such a toField() chain is the name of a field (column) in the output dataset.

  8. You can then provide a list that defines all input (source) fields that map to this output field in column-level lineage.

    Create input fields from input datasets

You can quickly create such an input (source) field from an input dataset using the fromField() method and the name of the column in that input dataset.

  9. Each run of a job must consist of at least two events: a START event indicating when the job run began, and some terminal state indicating when the job run finished.

  10. You can chain any number of inputs to the event to indicate the source datasets for the lineage.

  11. You can chain any number of outputs to the event to indicate the target datasets for the lineage.

  12. Use the emit() method to actually send the event to Atlan to be processed. The processing itself occurs asynchronously, so a successful emit() will only indicate that the event has been successfully sent to Atlan, not that it has (yet) been processed. Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.

Complete lineage between assets via OpenLineage
OpenLineageEvent complete = OpenLineageEvent.creator( // (1)
        olr,
        OpenLineage.RunEvent.EventType.COMPLETE
    ).build();
complete.emit(client); // (2)
  1. Since each run of a job must consist of at least two events, don't forget to send the terminal state indicating when the job has finished (and whether it succeeded, with a COMPLETE, or hit an error, with a FAIL).
  2. Once again, use the emit() method to actually send the event to Atlan to be processed (asynchronously). Because this operation will directly persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.

(Optional) Send raw OpenLineage events

If you have raw OpenLineage run events that you want to send directly to Atlan, use the OpenLineageEvent.emit_raw() method:

Raw events bypass model validation

Unlike events constructed through the SDK, raw events are passed without strict validation. Only basic type validation occurs (string, JSON string, list[dict], or dict). This gives you flexibility but requires ensuring your event structure is correct.

Use with caution: malformed events may cause processing issues.

Python

Send raw OL events

from pyatlan.client.atlan import AtlanClient
from pyatlan.model.open_lineage import OpenLineageEvent
from pyatlan.model.enums import AtlanConnectorType

# Example raw OpenLineage events
events = [
    {
        "eventTime": "2025-01-04T21:35:06.116Z",
        "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0/integration/spark",
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
        "eventType": "START",  # Job start event
        "run": {
            "runId": "event-1-run-id",  # Unique run identifier
            "facets": {}
        },
        "job": {
            "namespace": "default",  # Connection name in Atlan
            "name": "job_1"  # Unique job name
        }
    },
    {
        "eventTime": "2025-01-04T21:36:06.116Z",
        "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0/integration/spark",
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
        "eventType": "COMPLETE",  # Successful job completion
        "run": {
            "runId": "event-2-run-id",
            "facets": {}
        },
        "job": {
            "namespace": "default",
            "name": "job_2"
        }
    },
    {
        "eventTime": "2025-01-04T21:37:06.116Z",
        "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0/integration/spark",
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
        "eventType": "FAIL",  # Job failure event
        "run": {
            "runId": "event-3-run-id",
            "facets": {}
        },
        "job": {
            "namespace": "default",
            "name": "job_3"
        }
    }
]

client = AtlanClient()

# Send each raw event to Atlan
for event in events:
    OpenLineageEvent.emit_raw(  # (1)
        client=client,
        event=event,  # Raw event dictionary
        connector_type=AtlanConnectorType.SPARK,  # Specify connector type
    )
    print(f"Sent {event['eventType']} event for job {event['job']['name']}")
  1. To send raw OpenLineage event data to Atlan, you need to provide:
    • client: connectivity to an Atlan tenant
    • event: raw events as JSON string, dict, list of dicts, or OpenLineageRawEvent
    • connector_type: connector type for the open lineage event, defaults to AtlanConnectorType.SPARK

Remove lineage between assets

To remove lineage between assets, you need to delete the Process entity that links them:

Only deletes the process indicated, no more

Be aware that this will only delete the process with the GUID specified. It will not remove any column processes that may also exist. To remove those column processes as well, you must identify the GUID of each column-level process and call the same purge method against each of those GUIDs (a sketch of this follows the example below).

Remove lineage between assets
AssetMutationResponse response =
    Asset.purge(client, "b4113341-251b-4adc-81fb-2420501c30e6"); // (1)
Asset deleted = response.getDeletedAssets().get(0); // (2)
LineageProcess process;
if (deleted instanceof LineageProcess) {
    process = (LineageProcess) deleted; // (3)
}
  1. Provide the GUID for the process to the static Asset.purge() method. Because this operation will directly remove the asset from Atlan, you must provide it an AtlanClient through which to connect to the tenant.
  2. The response will include that single process that was purged.
  3. If you want to confirm the details, you'll need to type-check and then cast the generic Asset returned into a LineageProcess.
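
To also remove the column-level lineage mentioned in the note above, call the same purge method once per column process. A minimal sketch, using hypothetical placeholder GUIDs for column processes you have already identified:

Remove column-level lineage (sketch)
// Hypothetical GUIDs of the column processes to remove
List<String> columnProcessGuids = List.of(
    "11111111-1111-1111-1111-111111111111",
    "22222222-2222-2222-2222-222222222222");
for (String guid : columnProcessGuids) {
    Asset.purge(client, guid); // same Asset.purge() call as shown above
}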