
OpenLineage configuration and facets

OpenLineage is a lineage metadata extraction library that you can install in a target application such as Apache Airflow or Apache Spark. Once installed, you can configure the target application to integrate with Atlan, allowing Atlan to receive OpenLineage events and catalog your assets from supported sources. You do not need to clone a GitHub repository or make any code changes to your DAGs.
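As an illustration of what that configuration typically involves, the sketch below sets OpenLineage's standard client environment variables before the target application starts. The URL, API key, and namespace values here are placeholders, not real Atlan endpoints; use the values provided by your Atlan workspace.

```python
import os

# Placeholder values: substitute the OpenLineage endpoint, API key, and
# namespace provided by your Atlan workspace.
os.environ["OPENLINEAGE_URL"] = "https://example.atlan.com"
os.environ["OPENLINEAGE_API_KEY"] = "replace-with-your-api-key"
os.environ["OPENLINEAGE_NAMESPACE"] = "my-airflow-connection"
```

The same three settings can usually also be supplied through an `openlineage.yml` configuration file instead of environment variables.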

To install OpenLineage, refer to the documentation for supported sources:

Did you know?

To add lineage support to sources other than the ones listed above, you can use OpenLineage's extensible specification. Refer to our developer documentation to learn more.
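As a hedged sketch of what the extensible specification allows, a custom facet is just a JSON object that carries the `_producer` and `_schemaURL` fields the OpenLineage spec expects of every facet, alongside your own keys. The facet name, schema URL, and payload below are hypothetical:

```python
import json

# Hypothetical custom run facet; "_producer" and "_schemaURL" are the two
# fields the OpenLineage spec expects every facet to carry.
my_team_facet = {
    "myTeamInfo": {
        "_producer": "https://example.com/my-pipeline/1.0.0",
        "_schemaURL": "https://example.com/schemas/MyTeamInfoRunFacet.json",
        "team": "data-platform",
        "costCenter": "cc-1234",
    }
}

# Facets are attached under the run, job, or dataset entity of an event.
print(json.dumps(my_team_facet, indent=2))
```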

Example

Apache Airflow

Once you have configured a supported Apache Airflow distribution, you can run a sample DAG to confirm that your assets are being crawled in Atlan. Although Atlan strongly recommends running the preflight check DAG to test your Apache Airflow connection, you can also use the example DAG below to verify your setup.

For example:

```python
import json

from pendulum import datetime

from airflow.decorators import (
    dag,
    task,
)


@dag(
    dag_id="example_dag_basic",
    schedule="@once",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["example"],
)
def example_dag_basic():
    @task()
    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


example_dag_basic()
```

Apache Spark

Once you have configured Apache Spark, you can run a sample Spark job to confirm that your assets are being crawled in Atlan.

For example:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session and configure the Spark properties
spark = (
    SparkSession.builder.master("local")
    .appName("data_pipeline_sample")
    .getOrCreate()
)

snowflake_options = {
    "sfURL": ".snowflakecomputing.com",
    "sfUser": "",
    "sfPassword": "",
    "sfDatabase": "",
    "sfWarehouse": "",
    "sfSchema": "",
    "sfRole": "",
}

instacart_df = spark.read \
    .format("snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "table1") \
    .load()

filtered_df = instacart_df.filter(col('"order_id"') == "123456")

filtered_df.write \
    .format("snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "table2") \
    .mode("append") \
    .save()

spark.stop()
```
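Note that the session above does not by itself emit OpenLineage events; the OpenLineage Spark integration is enabled through Spark properties that register its listener. The sketch below collects those properties in a plain dictionary so their shape is easy to see; the transport URL and namespace are placeholders, and in practice each pair would be passed via `.config(key, value)` on `SparkSession.builder` or via `--conf` on `spark-submit`.

```python
# Hypothetical OpenLineage settings for a Spark session. The listener class
# and property names come from the OpenLineage Spark integration; the URL
# and namespace values are placeholders for your Atlan configuration.
openlineage_conf = {
    "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
    "spark.openlineage.transport.type": "http",
    "spark.openlineage.transport.url": "https://example.atlan.com",
    "spark.openlineage.namespace": "my-spark-connection",
}

# Render the properties as spark-submit flags for inspection.
for key, value in openlineage_conf.items():
    print(f"--conf {key}={value}")
```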

Supported facets

An OpenLineage event contains the following object model: dataset, job, and run entities. In addition, OpenLineage supports facets that provide contextual metadata for events.
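To make the object model concrete, here is a hedged sketch of a minimal event: a run (identified by its run ID), a job (namespace and name), and input and output datasets, with room for facets on each entity. All identifiers, names, and timestamps below are illustrative:

```python
import json

# Illustrative OpenLineage event; all identifiers are made up.
event = {
    "eventType": "COMPLETE",
    "eventTime": "2023-01-01T00:00:00.000Z",
    "producer": "https://example.com/my-producer/1.0.0",
    "run": {"runId": "d46e465b-d358-4d32-83d4-df660ff614dd", "facets": {}},
    "job": {"namespace": "my-namespace", "name": "example_dag_basic.extract"},
    "inputs": [],
    "outputs": [
        {"namespace": "snowflake://example-account", "name": "DB.SCHEMA.TABLE2"}
    ],
}

# Events are serialized as JSON and sent to the configured endpoint.
payload = json.dumps(event)
```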

Atlan currently processes only the following facets for OpenLineage events:

Apache Airflow

| OpenLineage facet | Description | Where in Atlan |
| --- | --- | --- |
| job.facets.jobType | Apache Airflow asset type (task or DAG) | asset profile, preview, and sidebar |
| run.facets.airflow | DAG details, including runs, tasks, owner, and task group | asset profile, overview sidebar, and pipeline graph |
| run.facets.airflow_version | Apache Airflow version and DAG metadata | API only |
| run.facets.parentRun | parent DAG for tasks | API only |
| run.facets.processing_engine | Apache Airflow and OpenLineage versions | API only |
| outputs.facets.columnLineage | fetches column lineage | lineage graph |
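As a hedged illustration of the first facet above, a `jobType` payload distinguishing a DAG from a task might look like the following; the field values are representative, not taken from a live event:

```python
# Representative job.facets.jobType payload; values are illustrative.
job_type_facet = {
    "jobType": {
        "processingType": "BATCH",
        "integration": "AIRFLOW",
        "jobType": "DAG",  # "TASK" for an individual task
    }
}
```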

Apache Spark

| OpenLineage facet | Description | Where in Atlan |
| --- | --- | --- |
| eventType | job run status | overview sidebar |
| eventTime | job start and end time | asset profile |
| job.namespace | connection name | asset profile and overview sidebar |
| job.name | Spark job name | asset name |
| run.runId | Spark job run ID | API only |
| run.facets.spark_version | Spark version | overview sidebar |
| run.facets.spark_properties | OpenLineage package version | API only |
| run.facets.processing_engine | Spark cluster details | API only |
| inputs.facets.name | links input facets | related assets and pipeline graph |
| outputs.facets.name | links output facets | related assets and pipeline graph |
| inputs.facets.namespace | input type | related assets and pipeline graph |
| outputs.facets.namespace | output type | related assets and pipeline graph |
| inputs.facets.symlinks | retrieves logical entity | API only |
| outputs.facets.symlinks | retrieves logical entity | API only |
| outputs.facets.columnLineage | fetches column lineage | lineage graph |
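As a hedged sketch of the `columnLineage` facet in the last row, each output column maps to the input fields it was derived from. The dataset and column names below are illustrative:

```python
# Illustrative outputs.facets.columnLineage payload: the output column
# "total" is derived from the "order_value" column of an upstream table.
column_lineage_facet = {
    "columnLineage": {
        "fields": {
            "total": {
                "inputFields": [
                    {
                        "namespace": "snowflake://example-account",
                        "name": "DB.SCHEMA.TABLE1",
                        "field": "order_value",
                    }
                ]
            }
        }
    }
}
```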