Handle events
Handlers provide the crucial link between the generic components of the Application SDK (like Application
and Activities
) and the specific logic required to interact with a particular target system. Handlers encapsulate this system-specific interaction logic and serve as the core implementation layer for a connector.
Core capabilities
System abstraction
Handlers provide a consistent interface for interacting with different types of data sources, from REST APIs to SQL databases.
- Standardized method signatures across all handler types
- Unified error handling and logging
- Consistent authentication and credential management
Async operations
All handler operations are designed to work with async/await patterns, ensuring high-performance concurrent operations.
- Native async method support
- Non-blocking I/O operations
- Optimized for high-throughput scenarios
Extensible architecture
Multiple handler base classes support different data source types while maintaining consistent interfaces.
- Base classes for SQL and non-SQL sources
- Easy subclassing for custom implementations
- Built-in validation and type checking
Purpose and role
Handlers serve as the core implementation layer for a connector with four primary responsibilities:
- System interaction logic: Containing the methods that perform actions specific to the target system, such as validating credentials, performing health checks, fetching metadata lists, or executing specific queries/API calls needed by activities.
- Client management: Often managing the lifecycle (loading, closing) of the specific
ClientInterface
implementation needed to communicate with the target system. - API implementation: Providing the concrete implementation for the standard API endpoints exposed by
application_sdk.server.fastapi.Application
(like/workflows/v1/auth
,/workflows/v1/metadata
,/workflows/v1/check
). - Activity logic delegation: Providing methods that activities can call (via the shared activity state) to perform tasks related to the target system.
Essentially, while Clients
provide the connection abstraction, Handlers
provide the operational abstraction for a specific data source type.
How handlers are used
By the application
When you create an instance of application_sdk.server.fastapi.APIServer
, you pass your custom handler instance to its constructor:
app = APIServer(handler=MyConnectorHandler())
The APIServer
's default API endpoints (/workflows/v1/auth
, /workflows/v1/metadata
, /workflows/v1/check
) directly call the corresponding methods (test_auth
, fetch_metadata
, preflight_check
) on the provided handler instance to perform the actual work.
By activities
Standard activity base classes (like BaseSQLMetadataExtractionActivities
) typically expect a specific handler type (for example, BaseSQLHandler
). During activity initialization (usually within the overridden _set_state
method of the activity class), the appropriate handler is instantiated (often passing the corresponding client).
This handler instance is stored in the ActivitiesState
object associated with the workflow run. Activity methods then access this handler via state.handler
:
state = await self._get_state(...)
await state.handler.test_auth(**credentials)
metadata = await state.handler.fetch_metadata(**filters)
Handler types
All handler types are built by extending the abstract HandlerInterface, which defines the required methods and behaviors for all handlers.
BaseHandler
Non-SQLConcrete implementation for non-SQL data sources such as REST APIs, NoSQL databases, message queues, and file systems.
BaseSQLHandler
SQLSpecialized implementation for SQL-based data sources, supporting databases like PostgreSQL, MySQL, SQL Server, and Oracle.
Extending handlers
Creating a new connector requires creating a custom handler. You have two main approaches depending on your target system:
For non-SQL sources
If your target system isn't SQL-based (for example, a REST API, NoSQL database), inherit directly from HandlerInterface
and implement all required abstract methods:
Example: REST API handler implementation
# Example for a hypothetical REST API source
# In my_api_connector/handlers.py
from typing import Any, Dict, List
from application_sdk.handlers import HandlerInterface
from .clients import MyApiClient
class MyApiHandler(HandlerInterface):
api_client: MyApiClient
async def load(self, **kwargs: Any) -> None:
print("Loading MyApiHandler")
self.api_client = MyApiClient()
# Pass credentials from kwargs to the client's load/connect method
await self.api_client.connect(credentials=kwargs.get("credentials", {}))
async def test_auth(self, **kwargs: Any) -> bool:
print("Testing API Authentication")
# Use the client to validate credentials against the API
return await self.api_client.check_token()
async def preflight_check(self, **kwargs: Any) -> Any:
print("Performing API Preflight Checks")
# Use the client to check connectivity, permissions etc.
ping_ok = await self.api_client.ping()
read_perms_ok = await self.api_client.can_read_data()
return {
"success": ping_ok and read_perms_ok,
"data": {
"connectivityCheck": {"success": ping_ok, "message": "API reachable"},
"permissionCheck": {"success": read_perms_ok, "message": "Read access verified"}
}
}
async def fetch_metadata(self, **kwargs: Any) -> List[Dict[str, str]]:
print("Fetching API Metadata")
# Use the client to fetch metadata (e.g., list available datasets)
if kwargs.get("metadata_type") == "datasets":
return await self.api_client.list_datasets()
else:
return []
For SQL sources
If your target is a SQL database, inherit from BaseSQLHandler
. You'll typically need to:
- Provide the appropriate
SQLClient
subclass during initialization - Override specific methods if the default SQL-based logic needs modification
- Override SQL query class attributes if the default queries are incorrect for your SQL dialect
Example: PostgreSQL handler implementation
# Example for a specific SQL database (e.g., PostgreSQL)
# In my_postgres_connector/handlers.py
from typing import Dict, Any
from application_sdk.handlers.sql import BaseSQLHandler
from application_sdk.observability.logger_adaptor import get_logger
from .clients import PostgreSQLClient
logger = get_logger(__name__)
class MyPostgresHandler(BaseSQLHandler):
# Override specific SQL queries if needed
client_version_sql = "SELECT version();"
def __init__(self, sql_client: PostgreSQLClient):
super().__init__(sql_client)
async def test_auth(self, **kwargs: Any) -> bool:
logger.info("Running PostgreSQL specific test_auth")
try:
# Reuse the base class logic or add specific checks
return await super().test_auth(**kwargs)
except Exception as e:
logger.error(f"PostgreSQL auth failed: {e}")
return False
async def preflight_check(self, payload: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
logger.info("Running PostgreSQL specific preflight_check")
# Call base checks and add custom ones
base_checks = await super().preflight_check(payload, **kwargs)
# Add custom check for PostgreSQL extension
extension_check = await self.check_pg_extension("uuid-ossp")
base_checks["extensionCheck"] = extension_check
base_checks["success"] = base_checks["success"] and extension_check["success"]
return base_checks
async def check_pg_extension(self, extension_name: str) -> Dict[str, Any]:
# Custom method using self.sql_client
query = f"SELECT 1 FROM pg_extension WHERE extname = '{extension_name}';"
try:
async for batch in self.sql_client.run_query(query):
if batch:
return {"success": True, "message": f"Extension '{extension_name}' found."}
return {"success": False, "message": f"Required extension '{extension_name}' not found."}
except Exception as e:
return {"success": False, "message": f"Error checking extension: {e}"}
See also
- HandlerInterface reference: Complete API documentation for the base handler interface
- BaseHandler reference: Implementation details for non-SQL data sources
- BaseSQLHandler reference: SQL-specific handler implementation
- Application architecture: Technical deep dive into application architecture