Confluent Kafka assets app
The Confluent Kafka assets app crawls Kafka topics (and, optionally, schema-registry
subjects) from Confluent Cloud and publishes them to Atlan. Build it with the
KafkaConfluent builder.
Creating an app creates a new connection
Each create mints a new connection and new assets. To re-crawl, re-run the existing workflow (see Re-run an existing app).
API key / secret
To crawl Confluent Kafka using an API key and secret:
- Python
Confluent Kafka crawling with API key/secret
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.apps import KafkaConfluent

client = AtlanClient()

response = (
    KafkaConfluent(client)
    .basic(  # (1)
        host="pkc-abc12.us-east-2.aws.confluent.cloud:9092",  # (2)
        username="ABCD1234EFGH5678",  # (3)
        password="cflt...",  # (4)
        security_protocol="SASL_SSL",  # (5)
        include_cloud_metrics="false",  # (6)
        include_schema_registry="false",  # (7)
    )
    .connection(
        name="production-kafka",
        admin_roles=[client.role_cache.get_id_for_name("$admin")],
    )
    .skip_internal_topics(True)  # (8)
    .run(name="kafka-prod")
)
print(response.slug, response.run_id)
- Step 1: Credential. API key/secret auth; the secret is vaulted.
- The bootstrap server (host:port).
- The cluster API key.
- The cluster API secret.
- The security protocol (for example SASL_SSL).
- Whether to include Confluent Cloud metrics. When "true", also provide cloud_api_key=, cloud_api_secret=, and cluster_id=.
- Whether to include the schema registry. When "true", also provide schema_registry_host=, schema_registry_username=, and schema_registry_password=.
- Step 3: Metadata. Skip Kafka's internal topics (for example __consumer_offsets). This takes priority over the include/exclude filters.
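The conditional requirements in the notes above can be checked before the builder is called. The following is a minimal sketch, assuming the two flags are passed as the strings "true" and "false" as in the example. REQUIRED_WHEN_ENABLED and missing_basic_fields are hypothetical names used for illustration only; they are not part of pyatlan.

```python
# Hypothetical pre-flight check; not part of pyatlan.
REQUIRED_WHEN_ENABLED = {
    "include_cloud_metrics": ("cloud_api_key", "cloud_api_secret", "cluster_id"),
    "include_schema_registry": (
        "schema_registry_host",
        "schema_registry_username",
        "schema_registry_password",
    ),
}


def missing_basic_fields(kwargs: dict) -> list:
    """Return the dependent fields that are required but absent or empty."""
    missing = []
    for flag, dependents in REQUIRED_WHEN_ENABLED.items():
        # Assumption: a flag is enabled only when it is the string "true".
        if kwargs.get(flag) == "true":
            missing.extend(name for name in dependents if not kwargs.get(name))
    return missing


# Cloud metrics enabled, but only the API key supplied.
print(missing_basic_fields({"include_cloud_metrics": "true", "cloud_api_key": "..."}))
# → ['cloud_api_secret', 'cluster_id']
```

An empty result means the keyword arguments are at least complete; it says nothing about whether the credentials themselves are valid.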
With Cloud metrics and Schema Registry
- Python
Include Cloud metrics + schema registry
(
    KafkaConfluent(client)
    .basic(
        host="pkc-abc12.us-east-2.aws.confluent.cloud:9092",
        username="ABCD1234EFGH5678",
        password="cflt...",
        security_protocol="SASL_SSL",
        include_cloud_metrics="true",  # (1)
        cloud_api_key="...",
        cloud_api_secret="...",
        cluster_id="lkc-12345",
        include_schema_registry="true",  # (2)
        schema_registry_host="https://psrc-abc12.us-east-2.aws.confluent.cloud",
        schema_registry_username="SR-KEY",
        schema_registry_password="SR-SECRET",
    )
    .connection(name="production-kafka", admin_roles=[...])
    .run(name="kafka-prod")
)
- Enable Cloud metrics. Requires the cloud_api_key / cloud_api_secret / cluster_id fields.
- Enable schema registry. Requires the schema_registry_* fields.
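To keep the optional secrets out of source code, the extra fields can be assembled from environment variables and each feature switched on only when its whole group is present. This is a sketch under stated assumptions: the environment variable names, GROUPS, and optional_fields_from_env are all hypothetical and not part of pyatlan.

```python
import os

# Hypothetical environment variable names, chosen for this sketch only.
GROUPS = {
    "include_cloud_metrics": {
        "cloud_api_key": "CONFLUENT_CLOUD_API_KEY",
        "cloud_api_secret": "CONFLUENT_CLOUD_API_SECRET",
        "cluster_id": "CONFLUENT_CLUSTER_ID",
    },
    "include_schema_registry": {
        "schema_registry_host": "CONFLUENT_SR_HOST",
        "schema_registry_username": "CONFLUENT_SR_USERNAME",
        "schema_registry_password": "CONFLUENT_SR_PASSWORD",
    },
}


def optional_fields_from_env(env=None):
    """Build the optional keyword arguments from a mapping of env variables."""
    env = os.environ if env is None else env
    kwargs = {}
    for flag, fields in GROUPS.items():
        values = {field: env.get(var, "") for field, var in fields.items()}
        if all(values.values()):
            # Every field of the group is set: enable the feature and pass them.
            kwargs[flag] = "true"
            kwargs.update(values)
        else:
            # Incomplete group: disable the feature and drop partial values.
            kwargs[flag] = "false"
    return kwargs


# With nothing set, both features stay disabled.
print(optional_fields_from_env({}))
# → {'include_cloud_metrics': 'false', 'include_schema_registry': 'false'}
```

The resulting dictionary could then be passed alongside the mandatory arguments, for example as `**optional_fields_from_env()` in the call to the credential step.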
Topic filters
- Python
Include / exclude topics by regex
(
    KafkaConfluent(client)
    .basic(
        host="...",
        username="...",
        password="...",
        security_protocol="SASL_SSL",
        include_cloud_metrics="false",
        include_schema_registry="false",
    )
    .connection(name="production-kafka", admin_roles=[...])
    .include_topic_regex("^prod\\..*")  # (1)
    .exclude_topic_regex(".*\\.internal$")  # (2)
    .run(name="kafka-prod")
)
- Regex of topics to include (default: everything).
- Regex of topics to exclude—takes priority over include.
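The precedence rules (internal-topic skipping first, then exclude, then include) can be previewed locally against a list of topic names before running the crawl. This is an illustrative sketch only, not the crawler's implementation: select_topics is a hypothetical helper, it assumes the patterns are applied with search semantics, and it assumes "internal" means a name starting with a double underscore.

```python
import re


def select_topics(topics, include=None, exclude=None, skip_internal=False):
    """Return the topics that survive the filters, in input order."""
    include_re = re.compile(include) if include else None
    exclude_re = re.compile(exclude) if exclude else None
    selected = []
    for topic in topics:
        # Assumption: internal topics are those prefixed with "__".
        if skip_internal and topic.startswith("__"):
            continue
        # Exclude takes priority over include.
        if exclude_re and exclude_re.search(topic):
            continue
        # No include pattern means everything is included.
        if include_re and not include_re.search(topic):
            continue
        selected.append(topic)
    return selected


topics = ["prod.orders", "prod.orders.internal", "dev.orders", "__consumer_offsets"]
print(select_topics(topics, include=r"^prod\..*", exclude=r".*\.internal$", skip_internal=True))
# → ['prod.orders']
```

Here prod.orders.internal matches the include pattern but is dropped because the exclude pattern also matches it.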