Event System & Webhooks Guide¶
ALMA's event system allows external systems to react to memory changes through in-process callbacks or HTTP webhooks.
Overview¶
The event system supports two delivery mechanisms:
- In-Process Callbacks - For same-process reactions
- Webhooks - For external service notifications
Event Types¶
| Event | Trigger | Payload |
|---|---|---|
CREATED |
New memory saved | Memory details |
UPDATED |
Memory modified | Old + new values |
DELETED |
Memory removed | Memory ID |
ACCESSED |
Memory retrieved | Query details |
CONSOLIDATED |
Memories merged | Merge details |
In-Process Callbacks¶
Basic Usage¶
from alma.events import get_emitter, MemoryEventType
# Get the global event emitter
emitter = get_emitter()
# Define callback
def on_memory_created(event):
print(f"Memory created: {event.memory_id}")
print(f"Agent: {event.agent}")
print(f"Type: {event.memory_type}")
# Subscribe to events
emitter.subscribe(MemoryEventType.CREATED, on_memory_created)
Event Object¶
from alma.events import MemoryEvent, MemoryEventType
def handler(event: MemoryEvent):
event.event_type # MemoryEventType enum
event.memory_id # UUID of the memory
event.memory_type # 'heuristic', 'outcome', etc.
event.agent # Agent name
event.project_id # Project ID
event.timestamp # When event occurred
event.data # Additional event data
Multiple Subscriptions¶
def on_created(event):
log.info(f"Created: {event.memory_id}")
def on_updated(event):
log.info(f"Updated: {event.memory_id}")
def on_any(event):
# Runs for all events
metrics.increment(f"alma.events.{event.event_type.value}")
emitter.subscribe(MemoryEventType.CREATED, on_created)
emitter.subscribe(MemoryEventType.UPDATED, on_updated)
# Subscribe to multiple types
for event_type in MemoryEventType:
emitter.subscribe(event_type, on_any)
Unsubscribing¶
# Returns subscription ID
sub_id = emitter.subscribe(MemoryEventType.CREATED, handler)
# Later, unsubscribe
emitter.unsubscribe(sub_id)
Async Handlers¶
import asyncio
async def async_handler(event):
await asyncio.sleep(1) # Some async operation
await notify_external_service(event)
# Async handlers are supported
emitter.subscribe(MemoryEventType.CREATED, async_handler)
Webhooks¶
Basic Configuration¶
from alma.events import WebhookConfig, WebhookManager, get_emitter
manager = WebhookManager()
# Add webhook endpoint
manager.add_webhook(WebhookConfig(
url="https://your-app.com/alma-webhook",
events=[MemoryEventType.CREATED, MemoryEventType.UPDATED],
secret="your-secret-key" # For signature verification
))
# Start webhook delivery
manager.start(get_emitter())
WebhookConfig Options¶
WebhookConfig(
# Required
url="https://your-app.com/webhook",
events=[MemoryEventType.CREATED],
# Authentication
secret="hmac-secret", # For X-ALMA-Signature header
headers={ # Custom headers
"Authorization": "Bearer token"
},
# Retry configuration
retry_count=3, # Max retries (default: 3)
retry_delay=5.0, # Initial delay in seconds
retry_backoff=2.0, # Backoff multiplier
# Filtering
agent_filter=["agent1"], # Only these agents
project_filter=["proj1"], # Only these projects
# Delivery
timeout=30.0, # Request timeout in seconds
batch_size=10, # Max events per request
batch_delay=1.0 # Wait time to batch events
)
Webhook Payload¶
{
"event_type": "CREATED",
"memory_id": "550e8400-e29b-41d4-a716-446655440000",
"memory_type": "heuristic",
"agent": "qa_tester",
"project_id": "my-project",
"timestamp": "2026-01-28T12:00:00Z",
"data": {
"strategy": "Use incremental validation for large forms",
"category": "testing_strategies",
"confidence": 0.85
}
}
Signature Verification¶
ALMA signs webhook payloads using HMAC-SHA256:
Verify in your webhook handler:
import hmac
import hashlib
def verify_signature(payload: bytes, signature: str, secret: str) -> bool:
expected = 'sha256=' + hmac.new(
secret.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected)
# In your webhook endpoint
@app.post("/alma-webhook")
async def webhook(request: Request):
payload = await request.body()
signature = request.headers.get("X-ALMA-Signature", "")
if not verify_signature(payload, signature, WEBHOOK_SECRET):
return Response(status_code=401)
event = json.loads(payload)
# Process event...
Multiple Webhooks¶
manager = WebhookManager()
# Different endpoints for different events
manager.add_webhook(WebhookConfig(
url="https://analytics.example.com/events",
events=[MemoryEventType.CREATED, MemoryEventType.UPDATED],
secret="analytics-secret"
))
manager.add_webhook(WebhookConfig(
url="https://alerts.example.com/alma",
events=[MemoryEventType.DELETED], # Alert on deletions
secret="alerts-secret"
))
manager.add_webhook(WebhookConfig(
url="https://backup.example.com/sync",
events=list(MemoryEventType), # All events
secret="backup-secret"
))
manager.start(get_emitter())
Webhook Delivery Status¶
from alma.events import WebhookDeliveryStatus
# After processing
for delivery in manager.get_recent_deliveries():
print(f"URL: {delivery.url}")
print(f"Status: {delivery.status}") # SUCCESS, FAILED, PENDING
print(f"Attempts: {delivery.attempts}")
print(f"Last error: {delivery.last_error}")
Retry Logic¶
Failed deliveries are retried with exponential backoff:
Attempt 1: Immediate
Attempt 2: retry_delay seconds
Attempt 3: retry_delay * retry_backoff seconds
Attempt 4: retry_delay * retry_backoff^2 seconds
...
Default: 3 retries with 5s initial delay and 2x backoff.
Event-Aware Storage¶
Automatically emit events when using storage backends:
from alma.events import EventAwareStorageMixin, get_emitter
from alma.storage import SQLiteStorage
# Storage that emits events
class EventAwareSQLite(EventAwareStorageMixin, SQLiteStorage):
pass
storage = EventAwareSQLite(config)
storage.set_emitter(get_emitter())
# Now all save operations emit events
storage.save_heuristic(heuristic) # Emits CREATED event
Using the Decorator¶
from alma.events import emit_on_save
class MyStorage(StorageBackend):
@emit_on_save(MemoryEventType.CREATED, memory_type='heuristic')
def save_heuristic(self, heuristic):
# Your implementation
return heuristic.id
Common Patterns¶
Analytics Tracking¶
def track_metrics(event):
metrics.increment('alma.memory.created', tags={
'agent': event.agent,
'type': event.memory_type
})
emitter.subscribe(MemoryEventType.CREATED, track_metrics)
Cache Invalidation¶
def invalidate_cache(event):
cache_key = f"memories:{event.agent}:{event.project_id}"
cache.delete(cache_key)
emitter.subscribe(MemoryEventType.CREATED, invalidate_cache)
emitter.subscribe(MemoryEventType.UPDATED, invalidate_cache)
emitter.subscribe(MemoryEventType.DELETED, invalidate_cache)
Audit Logging¶
def audit_log(event):
audit_db.insert({
'event_type': event.event_type.value,
'memory_id': event.memory_id,
'agent': event.agent,
'timestamp': event.timestamp,
'data': event.data
})
for event_type in MemoryEventType:
emitter.subscribe(event_type, audit_log)
Real-time Dashboard¶
import socketio
sio = socketio.AsyncServer()
async def broadcast_event(event):
await sio.emit('alma_event', {
'type': event.event_type.value,
'agent': event.agent,
'memory': event.data
})
emitter.subscribe(MemoryEventType.CREATED, broadcast_event)
Testing¶
Resetting the Emitter¶
from alma.events import reset_emitter
def test_my_handler():
# Fresh emitter for each test
reset_emitter()
emitter = get_emitter()
received = []
emitter.subscribe(MemoryEventType.CREATED, lambda e: received.append(e))
# Trigger your code...
assert len(received) == 1
Mocking Webhooks¶
from unittest.mock import patch
def test_webhook_delivery():
manager = WebhookManager()
manager.add_webhook(WebhookConfig(
url="https://test.example.com",
events=[MemoryEventType.CREATED]
))
with patch('aiohttp.ClientSession.post') as mock_post:
mock_post.return_value.__aenter__.return_value.status = 200
# Trigger event...
assert mock_post.called
Best Practices¶
- Idempotent handlers - Events may be delivered more than once
- Fast callbacks - Keep in-process handlers quick; offload heavy work
- Verify signatures - Always verify webhook signatures in production
- Monitor delivery - Track webhook success/failure rates
- Set reasonable timeouts - Prevent hanging requests
- Use filtering - Only subscribe to events you need