Manage workflow schedules
Schedule workflow run
Directly on run:
You can directly add a schedule to a workflow run. For example, with Snowflake Miner:
Python:
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.packages import SnowflakeMiner
from pyatlan.model.workflow import WorkflowSchedule

client = AtlanClient()

miner = (  # (1)
    SnowflakeMiner(
        connection_qualified_name="default/snowflake/1234567890"
    )
    .s3(
        s3_bucket="test-s3-bucket",
        s3_prefix="test-s3-prefix",
        s3_bucket_region="test-s3-bucket-region",
        sql_query_key="TEST_QUERY",
        default_database_key="TEST_SNOWFLAKE",
        default_schema_key="TEST_SCHEMA",
        session_id_key="TEST_SESSION_ID",
    )
    .popularity_window(days=15)
    .native_lineage(enabled=True)
    .custom_config(config={"test": True, "feature": 1234})
    .to_workflow()
)

schedule = WorkflowSchedule(
    cron_schedule="45 5 * * *",
    timezone="Europe/Paris",
)  # (2)

response = client.workflow.run(
    workflow=miner, workflow_schedule=schedule
)  # (3)
1. Begin by constructing the Snowflake miner workflow.
2. To create a new schedule for the workflow, specify:
   - a cron schedule expression, for example 45 5 * * * (runs every day at 05:45).
   - the time zone for the cron schedule, such as Europe/Paris.
3. Finally, use the client.workflow.run() method to add this new schedule. It both adds the schedule and runs the workflow in Atlan.
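To make the cron expression concrete: 45 5 * * * reads as minute 45, hour 5, every day. The following is a minimal standard-library sketch, not part of the SDK, that computes the next run for such a fixed daily schedule. The helper name next_daily_run is hypothetical, and a fixed +02:00 offset stands in for Europe/Paris summer time.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(cron_schedule: str, now: datetime) -> datetime:
    """Return the next run time of a 'minute hour * * *' cron expression.

    Hypothetical helper: only fixed daily schedules are handled. `now` should
    be timezone-aware; the result is expressed in the same time zone.
    """
    minute, hour, *rest = cron_schedule.split()
    if rest != ["*", "*", "*"]:
        raise ValueError("this sketch only handles 'minute hour * * *'")
    candidate = now.replace(
        hour=int(hour), minute=int(minute), second=0, microsecond=0
    )
    # If today's slot has already passed, the next run is tomorrow.
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


# Fixed +02:00 offset standing in for Europe/Paris during summer time.
paris_summer = timezone(timedelta(hours=2))
now = datetime(2024, 5, 2, 12, 0, tzinfo=paris_summer)
print(next_daily_run("45 5 * * *", now))  # 2024-05-03 05:45:00+02:00
```

For a real time zone with daylight saving rules, passing a datetime whose tzinfo is zoneinfo.ZoneInfo("Europe/Paris") should work the same way, provided time zone data is installed on the machine.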
Go:
miner := assets.NewSnowflakeMiner("default/snowflake/1234567890"). // (1)
    S3(
        "test-s3-bucket",
        "test-s3-prefix",
        "TEST_QUERY",
        "TEST_SNOWFLAKE",
        "TEST_SCHEMA",
        "TEST_SESSION_ID",
        structs.StringPtr("test-s3-bucket-region"),
    ).
    PopularityWindow(30).
    NativeLineage(true).
    CustomConfig(map[string]interface{}{
        "test":    true,
        "feature": 1234,
    }).
    ToWorkflow()

schedule := structs.WorkflowSchedule{CronSchedule: "45 5 * * *", Timezone: "Europe/Paris"} // (2)

response, atlanErr := ctx.WorkflowClient.Run(miner, &schedule) // (3)
if atlanErr != nil {
    logger.Log.Errorf("Error : %v", atlanErr)
}
1. Begin by constructing the Snowflake miner workflow.
2. To create a new schedule for the workflow, specify:
   - a cron schedule expression, for example 45 5 * * * (runs every day at 05:45).
   - the time zone for the cron schedule, such as Europe/Paris.
3. Finally, use the ctx.WorkflowClient.Run() method to add this new schedule. It both adds the schedule and runs the workflow in Atlan.
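Before submitting a schedule, it can help to sanity-check the cron expression locally. The sketch below is a hedged illustration, not SDK functionality. It assumes the standard five-field cron layout (minute, hour, day of month, month, day of week) and accepts only numeric values, ranges, lists, and steps. Named values such as MON or JAN are rejected, and the helper names are hypothetical.

```python
# Allowed numeric range for each of the five standard cron fields.
FIELD_RANGES = [
    ("minute", 0, 59),
    ("hour", 0, 23),
    ("day of month", 1, 31),
    ("month", 1, 12),
    ("day of week", 0, 7),
]


def _check_part(part: str, name: str, low: int, high: int) -> None:
    """Validate one comma-separated part: '*', 'n' or 'a-b', optionally with '/step'."""
    base, _, step = part.partition("/")
    if "/" in part and (not step.isdigit() or int(step) == 0):
        raise ValueError(f"invalid step in {name} field: {part!r}")
    if base == "*":
        return
    bounds = base.split("-")
    if len(bounds) > 2 or not all(b.isdigit() for b in bounds):
        raise ValueError(f"invalid value in {name} field: {part!r}")
    numbers = [int(b) for b in bounds]
    # Each bound must be within range, and a range must be ascending.
    if any(n < low or n > high for n in numbers) or numbers != sorted(numbers):
        raise ValueError(f"{name} field out of range {low}-{high}: {part!r}")


def validate_cron(expression: str) -> None:
    """Raise ValueError unless `expression` is a numeric five-field cron expression."""
    fields = expression.split()
    if len(fields) != len(FIELD_RANGES):
        raise ValueError(f"expected 5 fields, got {len(fields)}")
    for field, (name, low, high) in zip(fields, FIELD_RANGES):
        for part in field.split(","):
            _check_part(part, name, low, high)


validate_cron("45 5 * * *")  # passes silently
```

A check like this only catches malformed input early. Whether the server accepts additional syntax is not covered here.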
Existing workflow:
You can also add a schedule to an existing workflow run:
Python:
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.enums import WorkflowPackage
from pyatlan.model.workflow import WorkflowSchedule

client = AtlanClient()

existing_workflow = client.workflow.find_by_type(
    prefix=WorkflowPackage.SNOWFLAKE_MINER
)[0]  # (1)

schedule = WorkflowSchedule(
    cron_schedule="45 5 * * *",
    timezone="Europe/Paris",
)  # (2)

response = client.workflow.add_schedule(
    workflow=existing_workflow, workflow_schedule=schedule
)  # (3)
1. You can retrieve workflows based on their type (prefix).
   Note: Only workflows that have been run will be found.
2. To create a new schedule for an existing workflow, provide:
   - a cron schedule expression, for example 45 5 * * * (runs every day at 05:45).
   - the time zone for the cron schedule, for example Europe/Paris.
3. Finally, to apply this new schedule to the existing workflow, use the client.workflow.add_schedule() method.
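Because only workflows that have already been run are found, the search can come back empty, and indexing an empty result with [0] raises IndexError. The following is a minimal sketch of a guard, assuming the search returns a list-like result. The helper first_or_none and the sample values are hypothetical stand-ins, not SDK objects.

```python
from typing import Optional, Sequence, TypeVar

T = TypeVar("T")


def first_or_none(results: Optional[Sequence[T]]) -> Optional[T]:
    """Return the first item of a search result, or None when nothing was found."""
    return results[0] if results else None


# Hypothetical stand-ins for what a search by type might return.
never_run: list = []
already_run = ["atlan-snowflake-miner-1714638976"]

print(first_or_none(never_run))    # None
print(first_or_none(already_run))  # atlan-snowflake-miner-1714638976
```

Checking for None before adding or removing a schedule gives a clearer failure than an IndexError raised from the lookup itself.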
Go:
existingWorkflow, _ := ctx.WorkflowClient.FindByType(atlan.WorkflowPackageSnowflakeMiner, 1) // (1)
schedule := structs.WorkflowSchedule{CronSchedule: "45 5 * * *", Timezone: "Europe/Paris"} // (2)
response, atlanErr := ctx.WorkflowClient.AddSchedule(existingWorkflow[0], &schedule) // (3)
1. You can retrieve workflows based on their type (prefix). You can also specify the maximum number of workflows to retrieve.
   Note: Only workflows that have been run will be found.
2. To create a new schedule for an existing workflow, provide:
   - a cron schedule expression, for example 45 5 * * * (runs every day at 05:45).
   - the time zone for the cron schedule, for example Europe/Paris.
3. Finally, to apply this new schedule to the existing workflow, use the ctx.WorkflowClient.AddSchedule() method.
   Note: AddSchedule() returns an error on failure; handle it, for example by returning it to the caller.
Retrieve scheduled workflow run
Retrieve by name:
To retrieve an existing scheduled workflow run by its name:
Python:
from pyatlan.client.atlan import AtlanClient

client = AtlanClient()

response = client.workflow.get_scheduled_run(
    workflow_name="atlan-snowflake-miner-1714638976"
)  # (1)
1. To retrieve an existing scheduled workflow run, specify the name of the workflow as displayed in the UI (for example, atlan-snowflake-miner-1714638976).
Go:
response, atlanErr := ctx.WorkflowClient.GetScheduledRun("atlan-snowflake-miner-1714638976") // (1)
if atlanErr != nil {
    logger.Log.Errorf("Error : %v", atlanErr)
}
1. To retrieve an existing scheduled workflow run, specify the name of the workflow as displayed in the UI (for example, atlan-snowflake-miner-1714638976).
Retrieve all scheduled runs:
To retrieve all existing scheduled workflow runs:
Python:
from pyatlan.client.atlan import AtlanClient
client = AtlanClient()
response = client.workflow.get_all_scheduled_runs() # (1)
1. To retrieve all scheduled runs for workflows, use the client.workflow.get_all_scheduled_runs() method.
Go:
response, atlanErr := ctx.WorkflowClient.GetAllScheduledRuns() // (1)
1. To retrieve all scheduled runs for workflows, use the ctx.WorkflowClient.GetAllScheduledRuns() method.
Remove schedule from workflow run
To remove a schedule from an existing workflow run:
Python:
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.enums import WorkflowPackage

client = AtlanClient()

existing_workflow = client.workflow.find_by_type(
    prefix=WorkflowPackage.SNOWFLAKE_MINER
)[0]  # (1)

response = client.workflow.remove_schedule(
    workflow=existing_workflow
)  # (2)
1. You can retrieve workflows based on their type (prefix).
   Note: Only workflows that have been run will be found.
2. Finally, to remove a schedule from this workflow, use the client.workflow.remove_schedule() method.
Go:
existingWorkflow, _ := ctx.WorkflowClient.FindByType(atlan.WorkflowPackageSnowflakeMiner, 1) // (1)
response, atlanErr := ctx.WorkflowClient.RemoveSchedule(existingWorkflow[0]) // (2)
1. You can retrieve workflows based on their type (prefix). You can also specify the maximum number of workflows to retrieve.
   Note: Only workflows that have been run will be found.
2. Finally, to remove a schedule from this workflow, use the ctx.WorkflowClient.RemoveSchedule() method.