Connect PySpark to Lakehouse
This guide explains how to connect PySpark to Lakehouse through the Iceberg REST catalog and Polaris credential vending, so you can query Atlan metadata from a Spark environment.
Polaris issues short-lived cloud storage credentials for each request through credential vending. You don't configure storage access keys in Spark.
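Under the hood, Spark's Iceberg REST client obtains its access token through a standard OAuth2 client-credentials exchange against the token endpoint configured later in this guide. As a rough sketch (field names follow the Iceberg REST catalog OAuth convention; the angle-bracket values are the placeholders from this guide, not real credentials), the form-encoded request body looks like:

```python
from urllib.parse import urlencode

# Sketch only: the form body a REST catalog client POSTs to the
# .../v1/oauth/tokens endpoint. <client_id>, <client_secret>, and
# <reader_role> are the placeholders from this guide.
token_request = urlencode({
    "grant_type": "client_credentials",
    "client_id": "<client_id>",
    "client_secret": "<client_secret>",
    "scope": "PRINCIPAL_ROLE:<reader_role>",
})
print(token_request)
```

Spark performs this exchange for you; you never send this request yourself. It's shown only to clarify what the `credential` and `scope` settings below feed into.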
Prerequisites
Before you begin, make sure that:
- You have enabled Lakehouse for your Atlan tenant. See Enable Lakehouse.
- Your Spark environment can reach your Atlan tenant over HTTPS. If your tenant uses private networking (IP allowlists), see Private networking to allowlist your egress IPs first.
- You have Python 3.9 or later and Java 11 or later (17 and 21 are also supported) installed.
- You have the following values from the Atlan Lakehouse setup flow. In the Marketplace, open the Atlan Lakehouse tile and select View connection details (see Enable Lakehouse):
- Catalog URI
- Catalog name (warehouse)
- OAuth Client ID and Client Secret
- Reader role name (for example, `lake_readers`)
- AWS-hosted tenants only: You know the AWS Region where Lakehouse is deployed (for example, `us-east-1`). AWS credential vending requires this value. Contact your Atlan Customer Success team if you need confirmation. GCP and Azure tenants don't use a Region setting in Spark.
Install PySpark
Install PySpark 3.5 with pip:
pip install 'pyspark==3.5.*'
This guide targets PySpark 3.5 and Iceberg 1.9.2. For other Spark versions, use a matching Iceberg runtime JAR. See the Iceberg compatibility matrix for supported combinations.
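The runtime artifact name encodes both the Spark and Scala versions. A small, hypothetical helper (the function name is ours, not part of any API) illustrating how the Maven coordinate used later in this guide is assembled:

```python
def iceberg_runtime_coordinate(spark_minor: str = "3.5",
                               scala: str = "2.12",
                               iceberg: str = "1.9.2") -> str:
    """Build the Maven coordinate for the Iceberg Spark runtime JAR."""
    return f"org.apache.iceberg:iceberg-spark-runtime-{spark_minor}_{scala}:{iceberg}"

print(iceberg_runtime_coordinate())
# org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2
```

If you run a different Spark version, change `spark_minor` (and `scala` if your build differs) and verify the combination against the Iceberg compatibility matrix.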
Connect to Lakehouse
- Create a Python file and configure a `SparkSession` that registers Lakehouse as an Iceberg REST catalog. Select the tab for your Atlan tenant's cloud provider (AWS, GCP, or Azure) and use the configuration shown.

  The catalog type is REST: Spark connects to Polaris over HTTPS. OAuth authenticates the client. Vended credentials instruct Polaris to return short-lived storage credentials when you read table data, so you don't store cloud storage keys in Spark.
- AWS
- GCP
- Azure
from pyspark.sql import SparkSession
CATALOG_NAME = "<catalog_name>"
spark = (
SparkSession.builder
.appName("LakehouseQuickStart")
.master("local[*]")
.config("spark.jars.packages",
"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2,"
"org.apache.iceberg:iceberg-aws-bundle:1.9.2")
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
# Iceberg REST catalog settings
.config(f"spark.sql.catalog.{CATALOG_NAME}",
"org.apache.iceberg.spark.SparkCatalog")
.config(f"spark.sql.catalog.{CATALOG_NAME}.type", "rest")
.config(f"spark.sql.catalog.{CATALOG_NAME}.uri",
"https://<tenant>.atlan.com/api/polaris/api/catalog")
.config(f"spark.sql.catalog.{CATALOG_NAME}.oauth2-server-uri",
"https://<tenant>.atlan.com/api/polaris/api/catalog/v1/oauth/tokens")
.config(f"spark.sql.catalog.{CATALOG_NAME}.credential",
"<client_id>:<client_secret>")
.config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", CATALOG_NAME)
.config(f"spark.sql.catalog.{CATALOG_NAME}.token-refresh-enabled", "true")
.config(f"spark.sql.catalog.{CATALOG_NAME}.header.X-Iceberg-Access-Delegation",
"vended-credentials")
.config(f"spark.sql.catalog.{CATALOG_NAME}.scope",
"PRINCIPAL_ROLE:<reader_role>")
.config(f"spark.sql.catalog.{CATALOG_NAME}.client.region", "<aws_region>")
.getOrCreate()
)

`client.region` is required on AWS

For Polaris credential vending, `client.region` sets the STS client AWS Region when resolving vended session credentials. Without it, PySpark returns a `DefaultAwsRegionProviderChain` error on S3 reads. `client.region` is separate from `s3.region` (S3 client only). You need `client.region` for the STS credential exchange.

from pyspark.sql import SparkSession
CATALOG_NAME = "<catalog_name>"
spark = (
SparkSession.builder
.appName("LakehouseQuickStart")
.master("local[*]")
.config("spark.jars.packages",
"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2,"
"org.apache.iceberg:iceberg-gcp-bundle:1.9.2")
.config("spark.jars",
"https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/"
"gcs-connector/hadoop3-2.2.16/gcs-connector-hadoop3-2.2.16-shaded.jar")
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
# Iceberg REST catalog settings
.config(f"spark.sql.catalog.{CATALOG_NAME}",
"org.apache.iceberg.spark.SparkCatalog")
.config(f"spark.sql.catalog.{CATALOG_NAME}.type", "rest")
.config(f"spark.sql.catalog.{CATALOG_NAME}.uri",
"https://<tenant>.atlan.com/api/polaris/api/catalog")
.config(f"spark.sql.catalog.{CATALOG_NAME}.oauth2-server-uri",
"https://<tenant>.atlan.com/api/polaris/api/catalog/v1/oauth/tokens")
.config(f"spark.sql.catalog.{CATALOG_NAME}.credential",
"<client_id>:<client_secret>")
.config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", CATALOG_NAME)
.config(f"spark.sql.catalog.{CATALOG_NAME}.token-refresh-enabled", "true")
.config(f"spark.sql.catalog.{CATALOG_NAME}.header.X-Iceberg-Access-Delegation",
"vended-credentials")
.config(f"spark.sql.catalog.{CATALOG_NAME}.scope",
"PRINCIPAL_ROLE:<reader_role>")
.getOrCreate()
)

GCS connector JAR

Load the GCS connector with `spark.jars`, not `spark.jars.packages`, to avoid Guava dependency conflicts.

from pyspark.sql import SparkSession
CATALOG_NAME = "<catalog_name>"
spark = (
SparkSession.builder
.appName("LakehouseQuickStart")
.master("local[*]")
.config("spark.jars.packages",
"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2,"
"org.apache.hadoop:hadoop-common:3.4.0,"
"org.apache.hadoop:hadoop-azure:3.4.0,"
"org.apache.iceberg:iceberg-azure-bundle:1.9.2")
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
# Iceberg REST catalog settings
.config(f"spark.sql.catalog.{CATALOG_NAME}",
"org.apache.iceberg.spark.SparkCatalog")
.config(f"spark.sql.catalog.{CATALOG_NAME}.type", "rest")
.config(f"spark.sql.catalog.{CATALOG_NAME}.uri",
"https://<tenant>.atlan.com/api/polaris/api/catalog")
.config(f"spark.sql.catalog.{CATALOG_NAME}.oauth2-server-uri",
"https://<tenant>.atlan.com/api/polaris/api/catalog/v1/oauth/tokens")
.config(f"spark.sql.catalog.{CATALOG_NAME}.credential",
"<client_id>:<client_secret>")
.config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", CATALOG_NAME)
.config(f"spark.sql.catalog.{CATALOG_NAME}.token-refresh-enabled", "true")
.config(f"spark.sql.catalog.{CATALOG_NAME}.header.X-Iceberg-Access-Delegation",
"vended-credentials")
.config(f"spark.sql.catalog.{CATALOG_NAME}.scope",
"PRINCIPAL_ROLE:<reader_role>")
.getOrCreate()
)

Azure-specific JARs

Include `hadoop-common` and `hadoop-azure` with `iceberg-azure-bundle` so ADLS credential vending works.

- Replace the placeholders in your code with the values from View connection details.
  - Replace `<catalog_name>` with the catalog name from Lakehouse (for example, `context_store`).
  - Replace `<tenant>` with your Atlan tenant subdomain in the catalog and OAuth URLs (for example, use `mycompany` for `https://mycompany.atlan.com`).
  - Replace `<client_id>:<client_secret>` with the OAuth Client ID and Client Secret in `id:secret` format.
  - Replace `<reader_role>` with the reader role name from Lakehouse (for example, `lake_readers`).
  - AWS only: Replace `<aws_region>` with the AWS Region for your Lakehouse deployment (for example, `us-east-1`).
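An unreplaced placeholder fails only at runtime, often with an opaque HTTP or authentication error. As a small hypothetical check (the function is ours, not part of any API), you could scan your values for leftover `<...>` tokens before building the session:

```python
import re

def find_unreplaced(*values: str) -> list[str]:
    """Return any values that still contain an unreplaced <placeholder> token."""
    return [v for v in values if re.search(r"<[a-z_]+>", v)]

# Example: the credential pair was never filled in.
print(find_unreplaced("context_store", "<client_id>:<client_secret>"))
# ['<client_id>:<client_secret>']
```

Run it over every value you pasted from View connection details; an empty list means all placeholders were replaced.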
- Run the Python file or start a Python REPL. The first run downloads Iceberg JAR dependencies and creates the `SparkSession`. If the session starts without errors, PySpark is connected to Lakehouse.
- Verify the connection.
Example 1: List namespaces in the Lakehouse:
spark.sql("SHOW NAMESPACES IN <catalog_name>").show()

Replace `<catalog_name>` with the catalog name from your configuration. Namespace names can vary by tenant, so use this listing to confirm names such as `entity_metadata` before you run Example 2.

Example 2: Return metadata for tables registered in Atlan:
spark.sql("SHOW TABLES IN <catalog_name>.entity_metadata").show()
spark.sql("""
SELECT qualifiedName, name, typeName
FROM <catalog_name>.entity_metadata.`Table`
LIMIT 10
""").show(truncate=False)
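Note the backticks around `Table` in Example 2: the table shares its name with a SQL keyword, so it must be quoted in Spark SQL. A small hypothetical helper (the function is ours, not a PySpark API) that backtick-quotes each part of a fully qualified name:

```python
def qualified_name(catalog: str, namespace: str, table: str) -> str:
    """Backtick-quote every identifier so names that clash with SQL keywords parse."""
    return ".".join(f"`{part}`" for part in (catalog, namespace, table))

print(qualified_name("context_store", "entity_metadata", "Table"))
# `context_store`.`entity_metadata`.`Table`
```

You can interpolate the result into `spark.sql(...)` queries instead of hand-quoting each identifier.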
Next steps
After you connect PySpark to Lakehouse:
- Query Atlan metadata from PySpark: See Entity metadata reference.
- Use cases: See Use cases for patterns such as metadata enrichment tracking, lineage impact analysis, and glossary alignment.