Coding your logic
What events you want to process and how you want to process them is up to you.
We've tried to make writing this logic as simple as possible via the SDKs:
- Java
- Python
In Java, create a new class that does the following using the Java SDK:
- Extends the com.atlan.events.AbstractLambdaHandler class.
- Implements the com.atlan.events.AtlanEventHandler interface.
The AbstractLambdaHandler class handles receiving and parsing the event through the AWS Lambda function, and passing it along to your logic for processing. The AtlanEventHandler interface defines 5 methods that are executed in logical sequence to carry out your event-handling:
1. validatePrerequisites() validates that the event contains the information you expect and intend to process.
2. getCurrentState() retrieves the current state of the asset in the event from Atlan. This limits the possibility that you are working against stale data.
3. calculateChanges() is where you will implement the majority of your logic. This is where you look up any additional information, make any changes to assets, create notifications in external systems, and so on.

   Ensure idempotency
   You should only return assets from this method that have actually changed. If you blindly return every asset of interest every time, you may end up in an infinite loop: the handler picks up an event and sends a change back to Atlan, which generates another event, for which the handler sends back another change, and so on ad infinitum.
   The hasChanges() method below can help you think through how to detect what has actually changed and thus needs to be sent back to Atlan.
4. hasChanges() is a convenience method you can implement to determine whether an asset has changed or not. You would typically call it from within your calculateChanges() implementation above.
5. upsertChanges() takes any of the changed assets from the calculateChanges() method and actually sends these back to Atlan for persistence. (Each such change may then generate subsequent events that either this or other handlers can then pick up.)
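import com.atlan.AtlanClient;
import com.atlan.events.AbstractLambdaHandler;
import com.atlan.events.AtlanEventHandler;
import com.atlan.exception.AtlanException;
import com.atlan.exception.ErrorCode;
import com.atlan.exception.NotFoundException;
import com.atlan.model.assets.Asset;
import com.atlan.model.enums.CertificateStatus;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;

public class VerificationEnforcer // (1)
        extends AbstractLambdaHandler // (2)
        implements AtlanEventHandler { // (3)

    private static final List<String> REQUIRED_ATTRS = List.of(
            "description",
            "userDescription",
            "ownerUsers",
            "ownerGroups",
            "certificateStatus"); // (4)

    // Same message as in the Python example.
    private static final String ENFORCEMENT_MESSAGE =
            "To be verified, an asset must have a description, at least one owner, and lineage.";

    @Override // (5)
    public Asset getCurrentState(AtlanClient client, Asset fromEvent, Logger log) throws AtlanException {
        Asset asset = AtlanEventHandler.getCurrentViewOfAsset( // (6)
                fromEvent, REQUIRED_ATTRS, false, false);
        if (asset == null) {
            // Assumed reconstruction: fail if the asset can no longer be found in Atlan.
            throw new NotFoundException(
                    ErrorCode.ASSET_NOT_FOUND_BY_QN, fromEvent.getQualifiedName(), fromEvent.getTypeName());
        }
        return asset;
    }

    @Override // (7)
    public Collection<Asset> calculateChanges(Asset asset, Logger log) throws AtlanException {
        if (asset.getCertificateStatus() == CertificateStatus.VERIFIED) {
            // Assumed reconstruction, mirroring the checks in the Python example.
            if (!AtlanEventHandler.hasDescription(asset)
                    || !AtlanEventHandler.hasOwner(asset)
                    || !AtlanEventHandler.hasLineage(asset)) {
                return Set.of(asset.trimToRequired() // (8)
                        .certificateStatus(CertificateStatus.DRAFT)
                        .certificateStatusMessage(ENFORCEMENT_MESSAGE)
                        .build());
            } else {
                log.info(
                        "Asset has all required information present to be verified, no enforcement required: {}",
                        asset.getQualifiedName());
            }
        } else {
            log.info("Asset is no longer verified, no enforcement action to consider: {}", asset.getQualifiedName());
        }
        return Collections.emptySet();
    }

    // (9)
    private static final VerificationEnforcer INSTANCE = createInstance();

    private static VerificationEnforcer createInstance() {
        return new VerificationEnforcer();
    }

    public static VerificationEnforcer getInstance() {
        return INSTANCE;
    }

    public VerificationEnforcer() { // (10)
        // Assumed reconstruction: pass the handler up to the superclass, as note 10 describes.
        super(getInstance());
    }
}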
1. You can package your event handler with whatever name you want.
2. Make sure that it extends the com.atlan.events.AbstractLambdaHandler class.
3. Make sure that it also implements the com.atlan.events.AtlanEventHandler interface.
4. You can of course set up any of your own variables. In this example, since we want to validate that certain information is present on the asset, we're defining a list of the attributes we want to check on the asset.
5. Most methods in AtlanEventHandler provide a default implementation that you may be able to reuse (like validatePrerequisites() in this example), but you can always override the default implementation as well.
6. AtlanEventHandler also provides a number of static methods for common actions, such as an efficient way of retrieving the current view of an asset with a minimal set of information.
7. By breaking down the logic into these 5 different methods, you can more precisely think through the logic of how to handle your events. In this example, we only need to consider changing an asset if:
   - it's currently verified, and
   - it's missing a description, an owner, or lineage
8. During event-handling you may want to change several assets. For example, you may want to look at lineage and change multiple upstream or downstream assets. Hence the method allows returning any collection of assets.
   The functionality in this example only needs to potentially modify the single asset in the event: specifically, changing its certificate back to DRAFT and leaving an enforcement message, if it shouldn't have been verified in the first place.
9. For efficiency, you may want to create a singleton for this class for reuse in the next step.
10. All you then need to do to "register" this class to handle events through the Lambda function is define a default constructor that sends your class up to the AbstractLambdaHandler superclass. The superclass will then take care of everything else.
In Python, create a module (file) that does the following using the Python SDK:
- Defines a new class that extends the pyatlan.events.atlan_event_handler.AtlanEventHandler class.
- Defines a new function lambda_handler(event, context) that calls the pyatlan.events.atlan_lambda_handler.process_event function.
The lambda_handler function is necessary to receive the event from the AWS Lambda function, but then only needs to pass it off to the process_event function for parsing and processing using your logic. The AtlanEventHandler superclass defines 5 methods that are executed in logical sequence (by process_event) to carry out your event-handling:
1. validate_prerequisites() validates that the event contains the information you expect and intend to process.
2. get_current_state() retrieves the current state of the asset in the event from Atlan. This limits the possibility that you are working against stale data.
3. calculate_changes() is where you will implement the majority of your logic. This is where you look up any additional information, make any changes to assets, create notifications in external systems, and so on.

   Ensure idempotency
   You should only return assets from this method that have actually changed. If you blindly return every asset of interest every time, you may end up in an infinite loop: the handler picks up an event and sends a change back to Atlan, which generates another event, for which the handler sends back another change, and so on ad infinitum.
   The has_changes() method below can help you think through how to detect what has actually changed and thus needs to be sent back to Atlan.
4. has_changes() is a convenience method you can implement to determine whether an asset has changed or not. You would typically call it from within your calculate_changes() implementation above.
5. upsert_changes() takes any of the changed assets from the calculate_changes() method and actually sends these back to Atlan for persistence. (Each such change may then generate subsequent events that either this or other handlers can then pick up.)
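The idempotency warning in the list above can be illustrated with a small simulation. This is a minimal sketch that does not use pyatlan at all: the asset is a plain dict, and naive_changes, guarded_changes, and count_events are hypothetical names invented for this illustration. It assumes that every saved change produces exactly one follow-up event.

```python
from typing import Callable, Dict, List

SketchAsset = Dict[str, str]  # hypothetical stand-in: a plain dict, not a pyatlan Asset


def naive_changes(asset: SketchAsset) -> List[SketchAsset]:
    """Always returns the asset, even when nothing would actually change."""
    return [dict(asset, certificate_status="DRAFT")]


def guarded_changes(asset: SketchAsset) -> List[SketchAsset]:
    """Returns the asset only when the proposed state differs from the current one."""
    proposed = dict(asset, certificate_status="DRAFT")
    return [proposed] if proposed != asset else []


def count_events(
    calculate: Callable[[SketchAsset], List[SketchAsset]],
    asset: SketchAsset,
    cap: int = 10,
) -> int:
    """Simulate the feedback loop: every saved change triggers a new event.

    Returns how many events were handled before the loop went quiet,
    or `cap` if it never did.
    """
    handled = 0
    pending = [asset]  # the initial event
    while pending and handled < cap:
        current = pending.pop()
        handled += 1
        # Each returned asset is "saved", which emits a follow-up event.
        pending.extend(calculate(current))
    return handled


start = {"qualified_name": "db/table", "certificate_status": "VERIFIED"}
print(count_events(naive_changes, start))    # 10: only the cap stops the loop
print(count_events(guarded_changes, start))  # 2: the second event yields no change
```

The guarded version handles the original event plus the one event caused by its own change, then stops, which is the behavior the warning asks for.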
from typing import List, Optional

from pyatlan.client.atlan import AtlanClient
from pyatlan.events.atlan_lambda_handler import process_event
from pyatlan.events.atlan_event_handler import (
    AtlanEventHandler,
    get_current_view_of_asset,
    has_description,
    has_owner,
    has_lineage,
)
from pyatlan.model.assets import Asset
from pyatlan.model.enums import CertificateStatus

REQUIRED_ATTRS = [  # (1)
    "description",
    "userDescription",
    "ownerUsers",
    "ownerGroups",
    "certificateStatus",
]
ENFORCEMENT_MESSAGE = (
    "To be verified, an asset must have a description, at least one owner, and lineage."
)
client = AtlanClient()  # (2)


class VerificationEnforcer(AtlanEventHandler):  # (3)
    # (4)
    def get_current_state(self, from_event: Asset) -> Optional[Asset]:
        return get_current_view_of_asset(self.client, from_event, REQUIRED_ATTRS)  # (5)

    # (6)
    def calculate_changes(self, asset: Asset) -> List[Asset]:
        if asset.certificate_status == CertificateStatus.VERIFIED:
            if (
                not has_description(asset)
                or not has_owner(asset)
                or not has_lineage(asset)
            ):
                # Send back only the minimal asset, with its certificate reset.
                trimmed = asset.trim_to_required()
                trimmed.certificate_status = CertificateStatus.DRAFT
                trimmed.certificate_status_message = ENFORCEMENT_MESSAGE
                return [trimmed]  # (7)
            else:
                print(
                    f"Asset has all required information present to be verified, no enforcement required: {asset.qualified_name}"
                )
        else:
            print(
                f"Asset is no longer verified, no enforcement action to consider: {asset.qualified_name}"
            )
        return []


def lambda_handler(event, context):  # (8)
    # Pass the handler defined above (not an undefined class) to process_event.
    process_event(VerificationEnforcer(client), event, context)
1. You can of course set up any of your own variables. In this example, since we want to validate that certain information is present on the asset, we're defining a list of the attributes we want to check on the asset.
2. You will need to initialize an AtlanClient to interact with Atlan. (This will auto-configure based on the environment variables you set up earlier.)
3. You can package your event handler with whatever name you want. Just make sure it extends the pyatlan.events.atlan_event_handler.AtlanEventHandler class.
4. Most methods in AtlanEventHandler provide a default implementation that you may be able to reuse (like validate_prerequisites() in this example), but you can always override the default implementation as well.
5. The atlan_event_handler module also provides a number of utility functions for common actions, such as an efficient way of retrieving the current view of an asset with a minimal set of information.
6. By breaking down the logic into these 5 different methods, you can more precisely think through the logic of how to handle your events. In this example, we only need to consider changing an asset if:
   - it's currently verified, and
   - it's missing a description, an owner, or lineage
7. During event-handling you may want to change several assets. For example, you may want to look at lineage and change multiple upstream or downstream assets. Hence the method allows returning any collection of assets.
   The functionality in this example only needs to potentially modify the single asset in the event: specifically, changing its certificate back to DRAFT and leaving an enforcement message, if it shouldn't have been verified in the first place.
8. All you then need to do to "register" this class to handle events through the Lambda function is define a lambda_handler(event, context) function that sends your class and client to the process_event function. It will then take care of everything else.
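To make "everything else" more concrete, the following sketch walks the five methods in the order described earlier. It is not pyatlan's implementation: SketchAsset, SketchHandler, and run_sketch are hypothetical stand-ins written with only the standard library, the in-memory store plays the role of Atlan, and skipping the upsert when nothing changed is an assumption of this sketch.

```python
from dataclasses import dataclass, replace
from typing import Dict, List, Optional


@dataclass(frozen=True)
class SketchAsset:
    """Hypothetical stand-in for an Atlan asset (not a pyatlan class)."""
    qualified_name: str
    certificate_status: Optional[str] = None
    description: Optional[str] = None


class SketchHandler:
    """Hypothetical handler using the same five method names as AtlanEventHandler."""

    def __init__(self, store: Dict[str, SketchAsset]):
        self.store = store  # plays the role of Atlan: qualified_name -> asset
        self.calls: List[str] = []  # records the order in which methods run

    def validate_prerequisites(self, event: dict) -> bool:
        self.calls.append("validate_prerequisites")
        return "asset" in event

    def get_current_state(self, from_event: SketchAsset) -> Optional[SketchAsset]:
        self.calls.append("get_current_state")
        return self.store.get(from_event.qualified_name)

    def has_changes(self, current: SketchAsset, modified: SketchAsset) -> bool:
        self.calls.append("has_changes")
        return current != modified

    def calculate_changes(self, asset: SketchAsset) -> List[SketchAsset]:
        self.calls.append("calculate_changes")
        if asset.certificate_status == "VERIFIED" and not asset.description:
            modified = replace(asset, certificate_status="DRAFT")
            if self.has_changes(asset, modified):
                return [modified]
        return []

    def upsert_changes(self, changed: List[SketchAsset]) -> None:
        self.calls.append("upsert_changes")
        for asset in changed:
            self.store[asset.qualified_name] = asset


def run_sketch(handler: SketchHandler, event: dict) -> List[SketchAsset]:
    """Stand-in for process_event: runs the steps in the documented order."""
    if not handler.validate_prerequisites(event):
        return []
    current = handler.get_current_state(event["asset"])
    if current is None:
        return []
    changed = handler.calculate_changes(current)
    if changed:  # assumption: nothing is sent back when nothing changed
        handler.upsert_changes(changed)
    return changed


store = {"db/table": SketchAsset("db/table", certificate_status="VERIFIED")}
handler = SketchHandler(store)
event = {"asset": SketchAsset("db/table")}
print(run_sketch(handler, event))  # one asset, reset to DRAFT
print(handler.calls)               # the order in which the methods ran
print(run_sketch(handler, event))  # [] because the stored asset is already DRAFT
```

Note that the event carries only a reference to the asset; the decision is made against the state fetched in get_current_state(), which is why the second run finds nothing left to change.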