Manage Kafka assets
In general, these should be:
- Created in top-down order (connection, then KafkaTopic, then KafkaConsumerGroup)
- Deleted in bottom-up order (consumer groups, then topics, then connections)
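The ordering rule above can be sketched in a few lines. This is an illustrative helper only, not part of any Atlan SDK: the names CREATION_ORDER, creation_order, and deletion_order are assumptions made for this example, and assets are represented as plain (type name, name) tuples.

```python
# Illustrative only: these helpers are hypothetical and not part of any Atlan SDK.
CREATION_ORDER = ["Connection", "KafkaTopic", "KafkaConsumerGroup"]


def creation_order(assets):
    """Sort (type_name, name) tuples so that parents come before children."""
    return sorted(assets, key=lambda asset: CREATION_ORDER.index(asset[0]))


def deletion_order(assets):
    """Sort (type_name, name) tuples so that children come before parents."""
    return sorted(
        assets, key=lambda asset: CREATION_ORDER.index(asset[0]), reverse=True
    )


assets = [
    ("KafkaConsumerGroup", "myKafkaConsumerGroup"),
    ("Connection", "kafka-connection"),
    ("KafkaTopic", "myKafkaTopic"),
]
to_create = creation_order(assets)  # connection first, consumer group last
to_delete = deletion_order(assets)  # consumer group first, connection last
```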
Asset structure
Connection
A Kafka connection requires a name and qualifiedName. For creation, specific settings are also required to distinguish it as a Kafka connection rather than another type of connection. In addition, at least one of adminRoles, adminGroups, or adminUsers must be provided.
- Java
- Python
- Kotlin
- Raw REST API
String adminRoleGuid = client.getRoleCache().getIdForName("$admin"); // (1)
Connection connection = Connection.creator( // (2)
"kafka-connection", // (3)
AtlanConnectorType.KAFKA, // (4)
List.of(adminRoleGuid), // (5)
List.of("group2"), // (6)
List.of("jsmith")) // (7)
.build();
AssetMutationResponse response = connection.save(client); // (8)
String connectionQualifiedName = response.getCreatedAssets().get(0).getQualifiedName(); // (9)
- Retrieve the GUID for the admin role, to use later for defining the roles that can administer the connection.
- Build up the minimum request to create a connection.
- Provide a human-readable name for your connection, such as production or development.
- Set the type of connection to KAFKA.
- List the workspace roles that should be able to administer the connection (or null if none). All users with that workspace role (current and future) will be administrators of the connection. Note that the values here need to be the GUIDs of the workspace roles. At least one of adminRoles, adminGroups, or adminUsers must be provided.
- List the group names that can administer this connection (or null if none). All users within that group (current and future) will be administrators of the connection. Note that the values here are the names of the groups. At least one of adminRoles, adminGroups, or adminUsers must be provided.
- List the user names that can administer this connection (or null if none). Note that the values here are the usernames of the users. At least one of adminRoles, adminGroups, or adminUsers must be provided.
- Actually call Atlan to create the connection. Because this operation will persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
- Retrieve the qualifiedName for use in subsequent creation calls. (You'd probably want to do some null checking first.)
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Connection, KafkaTopic, KafkaConsumerGroup
from pyatlan.model.enums import AtlanConnectorType
client = AtlanClient()
admin_role_guid = client.role_cache.get_id_for_name("$admin") # (1)
connection = Connection.creator( # (2)
client=client, # (3)
name="kafka-connection", # (4)
connector_type=AtlanConnectorType.KAFKA, # (5)
admin_roles=[admin_role_guid], # (6)
admin_groups=["group2"], # (7)
admin_users=["jsmith"] # (8)
)
response = client.asset.save(connection) # (9)
connection_qualified_name = response.assets_created(asset_type=Connection)[0].qualified_name # (10)
- Retrieve the GUID for the admin role, to use later for defining the roles that can administer the connection.
- Build up the minimum request to create a connection.
- You must provide a client instance.
- Provide a human-readable name for your connection, such as production or development.
- Set the type of connection to KAFKA.
- List the workspace roles that should be able to administer the connection (or None if none). All users with that workspace role (current and future) will be administrators of the connection. Note that the values here need to be the GUIDs of the workspace roles. At least one of admin_roles, admin_groups, or admin_users must be provided.
- List the group names that can administer this connection (or None if none). All users within that group (current and future) will be administrators of the connection. Note that the values here are the names of the groups. At least one of admin_roles, admin_groups, or admin_users must be provided.
- List the user names that can administer this connection (or None if none). Note that the values here are the usernames of the users. At least one of admin_roles, admin_groups, or admin_users must be provided.
- Actually call Atlan to create the connection.
- Retrieve the qualifiedName for use in subsequent creation calls. (You'd probably want to do some None checking first.)
val adminRoleGuid = client.roleCache.getIdForName("\$admin") // (1)
val connection = Connection.creator( // (2)
"kafka-connection", // (3)
AtlanConnectorType.KAFKA, // (4)
listOf(adminRoleGuid), // (5)
listOf("group2"), // (6)
listOf("jsmith")) // (7)
.build()
var response = connection.save(client) // (8)
val connectionQualifiedName = response.createdAssets[0].qualifiedName // (9)
- Retrieve the GUID for the admin role, to use later for defining the roles that can administer the connection.
- Build up the minimum request to create a connection.
- Provide a human-readable name for your connection, such as production or development.
- Set the type of connection to KAFKA.
- List the workspace roles that should be able to administer the connection (or null if none). All users with that workspace role (current and future) will be administrators of the connection. Note that the values here need to be the GUIDs of the workspace roles. At least one of adminRoles, adminGroups, or adminUsers must be provided.
- List the group names that can administer this connection (or null if none). All users within that group (current and future) will be administrators of the connection. Note that the values here are the names of the groups. At least one of adminRoles, adminGroups, or adminUsers must be provided.
- List the user names that can administer this connection (or null if none). Note that the values here are the usernames of the users. At least one of adminRoles, adminGroups, or adminUsers must be provided.
- Actually call Atlan to create the connection. Because this operation will persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
- Retrieve the qualifiedName for use in subsequent creation calls. (You'd probably want to do some null checking first.)
{
"entities": [
{
"typeName": "Connection", // (1)
"attributes": {
"name": "kafka-connection", // (2)
"connectorName": "kafka", // (3)
"qualifiedName": "default/kafka/123456789", // (4)
"category": "eventbus", // (5)
"adminRoles": [ // (6)
"e7ae0295-c60a-469a-bd2c-fb903943aa02"
],
"adminGroups": [ // (7)
"group2"
],
"adminUsers": [ // (8)
"jsmith"
]
}
}
]
}
- The typeName must be exactly Connection.
- Human-readable name for your connection, such as production or development.
- The connectorName must be exactly kafka.
- The qualifiedName should follow the pattern: default/kafka/<epoch>, where <epoch> is the time in milliseconds at which the connection is being created.
- The category must be eventbus.
- List any workspace roles that can administer this connection. All users with that workspace role (current and future) will be administrators of the connection. Note that the values here need to be the GUIDs of the workspace roles. At least one of adminRoles, adminGroups, or adminUsers must be provided.
- List any groups that can administer this connection. All users within that group (current and future) will be administrators of the connection. Note that the values here are the names of the groups. At least one of adminRoles, adminGroups, or adminUsers must be provided.
- List any users that can administer this connection. Note that the values here are the usernames of the users. At least one of adminRoles, adminGroups, or adminUsers must be provided.
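For the raw REST route, the rules in the list above can be collected into a small payload builder. This is a minimal sketch rather than an official client: build_connection_payload and its epoch_ms parameter are hypothetical names introduced for illustration, and how the payload is then sent to Atlan (endpoint, authentication) is outside what this section describes.

```python
import json
import time


def build_connection_payload(
    name, admin_roles=None, admin_groups=None, admin_users=None, epoch_ms=None
):
    """Build the request body for a Kafka connection (hypothetical helper)."""
    # At least one kind of administrator must be supplied.
    if not (admin_roles or admin_groups or admin_users):
        raise ValueError(
            "At least one of adminRoles, adminGroups, or adminUsers must be provided"
        )
    # The qualifiedName embeds the creation time in milliseconds.
    if epoch_ms is None:
        epoch_ms = int(time.time() * 1000)
    attributes = {
        "name": name,
        "connectorName": "kafka",
        "qualifiedName": f"default/kafka/{epoch_ms}",
        "category": "eventbus",
    }
    if admin_roles:
        attributes["adminRoles"] = list(admin_roles)  # GUIDs of workspace roles
    if admin_groups:
        attributes["adminGroups"] = list(admin_groups)  # group names
    if admin_users:
        attributes["adminUsers"] = list(admin_users)  # usernames
    return {"entities": [{"typeName": "Connection", "attributes": attributes}]}


payload = build_connection_payload(
    "kafka-connection",
    admin_roles=["e7ae0295-c60a-469a-bd2c-fb903943aa02"],
    admin_groups=["group2"],
    admin_users=["jsmith"],
    epoch_ms=123456789,  # fixed here only to match the example above
)
body = json.dumps(payload, indent=2)
```

Passing epoch_ms explicitly keeps the example reproducible; in practice you would usually omit it so the current time is used.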
Atlan creates the policies that grant access to a connection, including the ability to retrieve the connection and to create assets within it, asynchronously. It can take several seconds (even up to approximately 30 seconds) before these are in place after creating the connection.
You may therefore need to wait before you'll be able to create the assets below within the connection.
To confirm access, retrieve the connection after it has been created. The SDKs' retry loops will automatically retry until the connection can be successfully retrieved. At that point, your API token has permission to create the other assets.
Note: if you are reusing an existing connection rather than creating one via your API token, you must give your API token a persona that has access to that connection. Otherwise all attempts to create, read, update, or delete assets within that connection will fail due to a lack of permissions.
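If you are not relying on an SDK's built-in retries (for example, when calling the REST API directly), the wait described above amounts to a simple polling loop. The sketch below is an assumption-laden illustration: wait_until_retrievable is a hypothetical helper, fetch stands in for whatever call you use to retrieve the connection, and PermissionError is only a placeholder for the error your client raises while the policies are still being created.

```python
import time


def wait_until_retrievable(
    fetch, timeout_seconds=30.0, interval_seconds=2.0, retry_on=(PermissionError,)
):
    """Call fetch() until it succeeds or the timeout elapses (hypothetical helper)."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        try:
            return fetch()
        except retry_on as error:
            # Give up once the deadline has passed, otherwise pause and retry.
            if time.monotonic() >= deadline:
                raise TimeoutError("connection was not retrievable in time") from error
            time.sleep(interval_seconds)


# Stand-in for a real retrieval call: fails twice, then succeeds.
attempts = {"count": 0}


def fake_fetch():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise PermissionError("policies not in place yet")
    return {"qualifiedName": "default/kafka/123456789"}


connection = wait_until_retrievable(fake_fetch, interval_seconds=0.0)
```

The default timeout of 30 seconds mirrors the upper bound mentioned above; adjust it to whatever your tenant needs.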
KafkaTopic
A KafkaTopic requires a name and a qualifiedName.
For creation, you also need to specify the connectionQualifiedName of the connection for the topic.
- Java
- Python
- Kotlin
- Raw REST API
KafkaTopic kafkaTopic = KafkaTopic.creator( // (1)
"myKafkaTopic", // (2)
connectionQualifiedName) // (3)
.build();
response = kafkaTopic.save(client); // (4)
kafkaTopic = response.getResult(kafkaTopic); // (5)
- Build up the minimum request to create a topic.
- Provide a human-readable name for your topic.
- Provide the qualifiedName of the Kafka connection.
- Actually call Atlan to create the topic. Because this operation will persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
- Retrieve the created topic for use in subsequent creation calls.
kafka_topic = KafkaTopic.creator( # (1)
name="myKafkaTopic", # (2)
connection_qualified_name=connection_qualified_name # (3)
)
response = client.asset.save(kafka_topic) # (4)
kafka_topic_qualified_name = response.assets_created(asset_type=KafkaTopic)[0].qualified_name # (5)
- Build up the minimum request to create a topic.
- Provide a human-readable name for your topic.
- Provide the qualifiedName of the Kafka connection.
- Actually call Atlan to create the topic.
- Retrieve the created topic for use in subsequent creation calls. (You'd probably want to do some None checking first.)
var kafkaTopic = KafkaTopic.creator( // (1)
"myKafkaTopic", // (2)
connectionQualifiedName) // (3)
.build()
response = kafkaTopic.save(client) // (4)
kafkaTopic = response.getResult(kafkaTopic) // (5)
- Build up the minimum request to create a topic.
- Provide a human-readable name for your topic.
- Provide the qualifiedName of the Kafka connection.
- Actually call Atlan to create the topic. Because this operation will persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
- Retrieve the created topic for use in subsequent creation calls.
{
"entities": [
{
"typeName": "KafkaTopic", // (1)
"attributes": {
"name": "myKafkaTopic", // (2)
"qualifiedName": "default/kafka/123456789/topic/myKafkaTopic", // (3)
"connectionQualifiedName": "default/kafka/123456789", // (4)
"connectorName": "kafka" // (5)
}
}
]
}
- The typeName must be exactly KafkaTopic.
- Human-readable name for your topic.
- The qualifiedName should follow the pattern: default/kafka/<epoch>/topic/<name>, where default/kafka/<epoch> is the qualifiedName of the connection for this topic and <name> is the unique name for this topic.
- The connectionQualifiedName must be the exact qualifiedName of the connection for this topic.
- The connectorName must be exactly kafka.
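The qualifiedName pattern for a topic can be derived mechanically from the connection's qualifiedName. The following sketch shows one way to do that for the REST payload; build_topic_payload is a hypothetical helper written for this example, not an Atlan API.

```python
def build_topic_payload(name, connection_qualified_name):
    """Build the request body for a KafkaTopic (hypothetical helper)."""
    attributes = {
        "name": name,
        # Pattern: <connection qualifiedName>/topic/<name>
        "qualifiedName": f"{connection_qualified_name}/topic/{name}",
        "connectionQualifiedName": connection_qualified_name,
        "connectorName": "kafka",
    }
    return {"entities": [{"typeName": "KafkaTopic", "attributes": attributes}]}


topic_payload = build_topic_payload("myKafkaTopic", "default/kafka/123456789")
topic_qualified_name = topic_payload["entities"][0]["attributes"]["qualifiedName"]
```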
KafkaConsumerGroup
A KafkaConsumerGroup requires a name and a qualifiedName.
For creation, you also need to specify the list of kafkaTopicQualifiedNames of the topics that will contain the consumer group.
- Java
- Python
- Kotlin
- Raw REST API
KafkaConsumerGroup consumerGroup = KafkaConsumerGroup.creatorObj( // (1)
"myKafkaConsumerGroup", // (2)
List.of(kafkaTopic)) // (3)
.build();
response = consumerGroup.save(client); // (4)
- Build up the minimum request to create a consumer group.
- Provide a human-readable name for your consumer group.
- Provide the list of topics for this consumer group.
- Actually call Atlan to create the consumer group. Because this operation will persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
consumer_group = KafkaConsumerGroup.creator( # (1)
name="myKafkaConsumerGroup", # (2)
kafka_topic_qualified_names=[kafka_topic_qualified_name], # (3)
)
response = client.asset.save(consumer_group) # (4)
- Build up the minimum request to create a consumer group.
- Provide a human-readable name for your consumer group.
- Provide the list of qualified_names of the topics for this consumer group.
- Actually call Atlan to create the consumer group.
val consumerGroup = KafkaConsumerGroup.creatorObj( // (1)
"myKafkaConsumerGroup", // (2)
listOf(kafkaTopic)) // (3)
.build()
response = consumerGroup.save(client) // (4)
- Build up the minimum request to create a consumer group.
- Provide a human-readable name for your consumer group.
- Provide the list of topics for this consumer group.
- Actually call Atlan to create the consumer group. Because this operation will persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
{
"entities": [
{
"typeName": "KafkaConsumerGroup", // (1)
"attributes": {
"name": "myKafkaConsumerGroup", // (2)
"qualifiedName": "default/kafka/123456789/consumer-group/myKafkaConsumerGroup", // (3)
"connectionQualifiedName": "default/kafka/123456789", // (4)
"connectorName": "kafka", // (5)
"kafkaTopics": [{ // (6)
"typeName": "KafkaTopic", // (7)
"uniqueAttributes": { // (8)
"qualifiedName": "default/kafka/123456789/topic/myKafkaTopic"
}
}],
"kafkaTopicQualifiedNames": ["default/kafka/123456789/topic/myKafkaTopic"] // (9)
}
}
]
}
- The typeName must be exactly KafkaConsumerGroup.
- Human-readable name for your consumer group.
- The qualifiedName should follow the pattern: default/kafka/<epoch>/consumer-group/<name>, where default/kafka/<epoch> is the qualifiedName of the connection that contains this consumer group and <name> is the unique name for this consumer group.
- The connectionQualifiedName must be the exact qualifiedName of the connection for this consumer group.
- The connectorName must be exactly kafka.
- The list of topics in which this consumer group exists is embedded in the kafkaTopics attribute.
- The typeName for this embedded reference must be KafkaTopic.
- To complete the reference, you must include a uniqueAttributes with the qualifiedName of the topic. Note: the topic must already exist in Atlan before creating the consumer group.
- The list of topic qualified names.
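A consumer group payload repeats each topic twice: once as an embedded reference in kafkaTopics and once as a plain string in kafkaTopicQualifiedNames. The sketch below builds both from a single list so that they cannot drift apart. build_consumer_group_payload is a hypothetical helper introduced for illustration only.

```python
def build_consumer_group_payload(name, connection_qualified_name, topic_qualified_names):
    """Build the request body for a KafkaConsumerGroup (hypothetical helper)."""
    topic_qualified_names = list(topic_qualified_names)
    # Each topic is referenced by type and qualifiedName; it must already exist in Atlan.
    topic_references = [
        {"typeName": "KafkaTopic", "uniqueAttributes": {"qualifiedName": qualified_name}}
        for qualified_name in topic_qualified_names
    ]
    attributes = {
        "name": name,
        # Pattern: <connection qualifiedName>/consumer-group/<name>
        "qualifiedName": f"{connection_qualified_name}/consumer-group/{name}",
        "connectionQualifiedName": connection_qualified_name,
        "connectorName": "kafka",
        "kafkaTopics": topic_references,
        "kafkaTopicQualifiedNames": topic_qualified_names,
    }
    return {"entities": [{"typeName": "KafkaConsumerGroup", "attributes": attributes}]}


group_payload = build_consumer_group_payload(
    "myKafkaConsumerGroup",
    "default/kafka/123456789",
    ["default/kafka/123456789/topic/myKafkaTopic"],
)
```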
Available relationships
Every level of the Kafka structure is an Asset,
and can therefore be related to the following other assets.