Actor events & state persistence
During its runtime, the Actor receives Actor events sent by the Apify platform or generated by the Apify SDK itself.
Event types
| Event | Data | Description |
|---|---|---|
SYSTEM_INFO |
{
"created_at": datetime,
"cpu_current_usage": float,
"mem_current_bytes": int,
"is_cpu_overloaded": bool
} | This event is emitted regularly and it indicates the current resource usage of the Actor. Theis_cpu_overloaded argument indicates whether the current CPU usage is higher than Config.max_used_cpu_ratio |
MIGRATING | None | Emitted when the Actor running on the Apify platform is going to be migrated to another worker server soon. You can use it to persist the state of the Actor so that once it is executed again on the new server, it doesn't have to start over from the beginning. Once you have persisted the state of your Actor, you can callActor.reboot
to reboot the Actor and trigger the migration immediately, to speed up the process. |
ABORTING | None | When a user aborts an Actor run on the Apify platform,
they can choose to abort gracefully to allow the Actor some time before getting killed.
This graceful abort emits the |
PERSIST_STATE | { "is_migrating": bool } | Emitted in regular intervals (by default 60 seconds) to notify the Actor that it should persist its state, in order to avoid repeating all work when the Actor restarts. This event is also emitted automatically when the PERSIST_STATE event is provided merely for user convenience,
you can achieve the same effect by persisting the state regularly in an interval and listening for the migrating event. |
EXIT | None | Emitted by the SDK (not the platform) when the Actor is about to exit. You can use this event to perform final cleanup tasks, such as closing external connections or sending notifications, before the Actor shuts down. |
Adding handlers to events
To add handlers to these events, you use the Actor.on method,
and to remove them, you use the Actor.off method.
import asyncio
from typing import Any
from apify import Actor, Event
async def main() -> None:
async with Actor:
total_items = 1000
# Load the state if it's saved from some previous execution
processed_items = 0
actor_state = await Actor.get_value('STATE')
if actor_state is not None:
processed_items = actor_state
# Save the state when the `PERSIST_STATE` event happens
async def save_state(event_data: Any) -> None:
nonlocal processed_items
Actor.log.info('Saving Actor state', extra=event_data)
await Actor.set_value('STATE', processed_items)
Actor.on(Event.PERSIST_STATE, save_state)
# Do some fake work
for i in range(processed_items, total_items):
Actor.log.info(f'Processing item {i}...')
processed_items = i
await asyncio.sleep(0.1)
# Suppose we can stop saving the state now
Actor.off(Event.PERSIST_STATE, save_state)
# Do some more fake work, this time something that can't be restarted,
# so no point persisting the state
for j in range(10):
Actor.log.info(f'Processing item {j} of another kind...')
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
Automatic state persistence
The example above shows how to manually persist state using the PERSIST_STATE event. For most use cases, you can use the Actor.use_state method instead, which handles state persistence automatically.
Actor.use_state returns a dictionary that is automatically saved to the default key-value store at regular intervals and whenever a migration or shutdown occurs. You can modify the dictionary in place, and changes are persisted without any manual set_value calls.
You can optionally specify a key (the key-value store key under which the state is stored) and a kvs_name (the name of the key-value store to use). By default, the state is stored in the default key-value store under a default key.
import asyncio
from apify import Actor
async def main() -> None:
async with Actor:
# Get or create an auto-persisted state dict.
# On restart or migration, the state is loaded from the KVS.
state = await Actor.use_state(default_value={'processed_items': 0})
# Resume from previous state
start_index = state['processed_items']
Actor.log.info(f'Resuming from item {start_index}')
# Do some work and update the state — it is persisted automatically
for i in range(start_index, 100): # type: ignore[arg-type]
Actor.log.info(f'Processing item {i}...')
state['processed_items'] = i + 1
await asyncio.sleep(0.1)
if __name__ == '__main__':
asyncio.run(main())
For more details on platform events and state persistence, see the system events and state persistence documentation on the Apify platform.