Logging, Events, and Publishers
This page documents what the BFF logs and where it sends events, based on the current code.
Kafka producers (two layers)
- Platform producer wrapper (ms_bff_spike/ms_bff/src/core/kafka_producer.py)
  - Publishes structured events via a shared platform utility (publish_structured)
  - Topic names used:
    - empowernow.bff.audit
    - empowernow.bff.auth
    - empowernow.bff.sessions
  - Helper functions (async):
    - publish_audit_event(event_type, payload)
    - publish_auth_event(event_type, request_context)
    - publish_session_event(event_type, session_context)
  - Expected context keys are visible in the helpers (e.g., user_id, session_id, correlation_id).
- Application producer (ms_bff_spike/ms_bff/src/services/kafka_producer.py)
  - Uses aiokafka.AIOKafkaProducer
  - Standard topics (prefixed):
    - events → <prefix>.events
    - requests → <prefix>.requests
    - errors → <prefix>.errors
    - metrics → <prefix>.metrics
  - Event envelope (from publish_event):
    - event_id (uuid)
    - event_type (string)
    - timestamp (ISO-8601, UTC)
    - correlation_id (uuid)
    - service = ms-bff
    - data (payload)
  - API events (publish_api_event): method, path, status_code, elapsed_ms, optional user_id; sent to requests topic; partition key defaults to path.
  - Error events (publish_error): type, message, source, optional details; sent to errors topic; partition key <source>:<error_type>.
  - Metrics (publish_metrics): metric_type, values; sent to metrics topic; partition key metric_type.
  - JSON serializer: handles datetime (UTC ISO), Enum, UUID, sets, pydantic models, and circular references.
  - Lifecycle: init_kafka_producer(...) creates a singleton; start() must be called to connect; stop() closes the producer.
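The envelope and serializer described for the application producer can be sketched as follows. This is an illustration only, not the code in services/kafka_producer.py: the names json_default, build_envelope, and serialize are hypothetical, naive datetimes are assumed to be UTC, and pydantic models and circular references are left out.

```python
import json
import uuid
from datetime import datetime, timezone
from enum import Enum


def json_default(obj):
    """Fallback for json.dumps covering a subset of the documented types."""
    if isinstance(obj, datetime):
        # Assumption: naive datetimes are treated as UTC.
        if obj.tzinfo is None:
            obj = obj.replace(tzinfo=timezone.utc)
        return obj.astimezone(timezone.utc).isoformat()
    if isinstance(obj, Enum):
        return obj.value
    if isinstance(obj, uuid.UUID):
        return str(obj)
    if isinstance(obj, (set, frozenset)):
        # Sort for a stable output order; sets have no JSON equivalent.
        return sorted(obj, key=str)
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


def build_envelope(event_type, data, correlation_id=None):
    """Assemble the envelope fields documented for publish_event."""
    return {
        "event_id": str(uuid.uuid4()),
        "event_type": event_type,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "correlation_id": correlation_id or str(uuid.uuid4()),
        "service": "ms-bff",
        "data": data,
    }


def serialize(envelope):
    """Encode an envelope as UTF-8 JSON bytes for a Kafka producer."""
    return json.dumps(envelope, default=json_default).encode("utf-8")
```

Calling serialize(build_envelope("session_created", {"user_id": "user@example.com"})) yields bytes that could be passed as the value of a Kafka message.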
CAEP publisher (Redis queue)
- Module: ms_bff_spike/ms_bff/src/shared_signals/publisher.py
- Queue: Redis list caep:outbound
- Functions enqueue CAEP events (async):
  - send_session_created(user_id, session_id) → event type .../session-created
  - send_session_terminated(user_id, session_id, everywhere=False) → .../session-terminated
  - send_token_rotated(user_id, session_id) → .../token-claims-change
- Payload format:
  - If CAEP_SIGNING_KEY is set: a signed JWT is produced with claims { iss: BFF_BASE_URL, iat, jti, events: { <type>: payload } } (alg RS256, header kid from CAEP_KID, default bff-caep).
  - Otherwise: JSON object { event_type, payload, ts }.
- Metrics: Prometheus counter caep_enqueued_total{event_type=...} increments per enqueued event.
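To make the two payload shapes concrete, here is a minimal sketch that builds the unsigned JSON message and the claim set that would be signed in JWT mode. The function names are hypothetical, the event-type URI prefix and the jti layout simply mirror the example payloads on this page, and the RS256 signing step and the Redis enqueue are deliberately omitted.

```python
import json
import time

# Prefix taken from the example payloads on this page.
CAEP_EVENT_BASE = "https://schemas.openid.net/secevent/caep/event-type/"


def build_unsigned_message(event_name, payload, now=None):
    """Unsigned mode: the { event_type, payload, ts } JSON object."""
    ts = int(time.time() if now is None else now)
    return json.dumps(
        {"event_type": CAEP_EVENT_BASE + event_name, "payload": payload, "ts": ts}
    )


def build_signed_claims(event_name, payload, issuer, now=None):
    """Signed mode: the claims that would be signed with RS256.

    Assumption: jti follows the "<event>:<sid>:<iat>" layout seen in the
    example; the real publisher may generate it differently.
    """
    iat = int(time.time() if now is None else now)
    return {
        "iss": issuer,  # BFF_BASE_URL in the real configuration
        "iat": iat,
        "jti": f"{event_name}:{payload.get('sid')}:{iat}",
        "events": {CAEP_EVENT_BASE + event_name: payload},
    }
```

For example, build_unsigned_message("session-created", {"sub": "user@example.com", "sid": "abc123def"}) returns a JSON string in the shape of the unsigned queue entry.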
Configuration (relevant keys)
- Kafka (see settings.yaml and environment): kafka.bootstrap_servers, kafka.client_id, kafka.topic_prefix, kafka.acks, kafka.compression_type, kafka.enabled
  - Environment examples: KAFKA_BOOTSTRAP_SERVERS, KAFKA_CLIENT_ID, KAFKA_TOPIC_PREFIX, KAFKA_ACKS, KAFKA_COMPRESSION_TYPE
- CAEP: CAEP_SIGNING_KEY (PEM/JWK), CAEP_KID (default bff-caep), BFF_BASE_URL for iss
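As an illustration of how the Kafka environment variables might be collected, the sketch below reads them into a small dataclass. KafkaSettings and from_env are hypothetical names, and every default value shown is an assumption for local use rather than the value in settings.yaml.

```python
import os
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class KafkaSettings:
    bootstrap_servers: str
    client_id: str
    topic_prefix: str
    acks: str
    compression_type: Optional[str]

    @classmethod
    def from_env(cls, env=None):
        """Read the documented variables; defaults here are assumptions."""
        env = os.environ if env is None else env
        return cls(
            bootstrap_servers=env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
            client_id=env.get("KAFKA_CLIENT_ID", "ms-bff"),
            topic_prefix=env.get("KAFKA_TOPIC_PREFIX", "local.bff"),
            acks=env.get("KAFKA_ACKS", "all"),
            # An unset or empty value means no compression in this sketch.
            compression_type=env.get("KAFKA_COMPRESSION_TYPE") or None,
        )
```

Passing a plain dict instead of os.environ, as in KafkaSettings.from_env({"KAFKA_TOPIC_PREFIX": "dev.bff"}), makes the loader easy to exercise in tests.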
Operational notes
- Start/stop: application startup should call init_kafka_producer(...) and then start(); the shutdown path calls stop().
- Partitioning: API events use the path as key; errors use <source>:<type>; metrics use the metric type. Events that share a key land on the same partition, which keeps related data co-located.
- Backpressure: the producer uses send_and_wait, which awaits broker ACKs according to the acks setting.
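The lifecycle and partition-key rules above can be sketched without a broker. RecordingProducer below is a hypothetical stand-in that only mimics the start(), stop(), and send_and_wait(topic, value=..., key=...) shape of aiokafka.AIOKafkaProducer; the helper names are likewise illustrative and not taken from the BFF code.

```python
import asyncio


def topic_for(prefix, name):
    """Map a logical topic name to its prefixed form, e.g. <prefix>.requests."""
    return f"{prefix}.{name}"


def api_event_key(path):
    return path


def error_event_key(source, error_type):
    return f"{source}:{error_type}"


def metrics_key(metric_type):
    return metric_type


class RecordingProducer:
    """In-memory stand-in for a Kafka producer (not the real client)."""

    def __init__(self):
        self.started = False
        self.sent = []

    async def start(self):
        self.started = True

    async def stop(self):
        self.started = False

    async def send_and_wait(self, topic, value=None, key=None):
        if not self.started:
            raise RuntimeError("producer not started")
        self.sent.append((topic, key, value))


async def publish_samples(producer, prefix):
    """Start the producer, publish one keyed event per topic, always stop."""
    await producer.start()
    try:
        await producer.send_and_wait(
            topic_for(prefix, "requests"), value=b"{}",
            key=api_event_key("/api/crud/workflows").encode("utf-8"))
        await producer.send_and_wait(
            topic_for(prefix, "errors"), value=b"{}",
            key=error_event_key("proxy", "UpstreamTimeout").encode("utf-8"))
        await producer.send_and_wait(
            topic_for(prefix, "metrics"), value=b"{}",
            key=metrics_key("auth_flow").encode("utf-8"))
    finally:
        await producer.stop()
```

Running asyncio.run(publish_samples(RecordingProducer(), "dev.bff")) exercises the same start, send, stop order the application is expected to follow; the try/finally ensures stop() runs even if a send fails.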
Example payloads
- Generic event envelope (Kafka <prefix>.events):
{
  "event_id": "5a3b9e2e-2d7e-4d7f-9a6a-0a1e9e5b2d17",
  "event_type": "session_created",
  "timestamp": "2025-08-10T22:01:02.345Z",
  "correlation_id": "c46e5bb1-9c3a-4a3a-9c8a-0e5b8c2f1a77",
  "service": "ms-bff",
  "data": {
    "user_id": "user@example.com",
    "session_id": "abc123def",
    "ip": "203.0.113.10"
  }
}
- API request event (<prefix>.requests):
{
  "event_id": "e2f0b4c3-2fd2-4c9e-9a9d-1a2b3c4d5e6f",
  "event_type": "api_request",
  "timestamp": "2025-08-10T22:02:10.100Z",
  "correlation_id": "a2d4e6f8-10ab-42cd-9ef0-112233445566",
  "service": "ms-bff",
  "data": {
    "method": "GET",
    "path": "/api/crud/workflows",
    "status_code": 200,
    "elapsed_ms": 42.7,
    "user_id": "user@example.com"
  }
}
- Error event (<prefix>.errors):
{
  "event_id": "1f2e3d4c-5b6a-7081-92a3-b4c5d6e7f809",
  "event_type": "error",
  "timestamp": "2025-08-10T22:03:55.000Z",
  "correlation_id": "c0ffee00-1234-4abc-9def-777777777777",
  "service": "ms-bff",
  "data": {
    "error_type": "UpstreamTimeout",
    "message": "crud_service: /workflows timed out",
    "source": "proxy",
    "timestamp": "2025-08-10T22:03:55.000Z",
    "details": {
      "target_service": "crud_service",
      "path": "/workflows",
      "timeout_ms": 5000
    }
  }
}
- Metrics event (<prefix>.metrics):
{
  "event_id": "0b1c2d3e-4f50-6172-8394-a5b6c7d8e9f0",
  "event_type": "metrics",
  "timestamp": "2025-08-10T22:04:20.000Z",
  "correlation_id": "f00ba400-55aa-46d2-b9f1-1122aa33bb44",
  "service": "ms-bff",
  "data": {
    "metric_type": "auth_flow",
    "timestamp": "2025-08-10T22:04:20.000Z",
    "values": {
      "oauth_flow_duration_ms": 820,
      "token_refresh_success": 1
    }
  }
}
- Platform audit/auth/session events (topic empowernow.bff.*): payloads are app-defined JSON; typical fields include user_id, session_id, correlation_id, and event-specific data. Example audit:
{
  "event": "bff_startup",
  "version": "1.0.0",
  "node": "bff-0",
  "correlation_id": "b0a1a2a3-a4a5-46b7-88c9-d0e1f2a3b4c5"
}
- CAEP message enqueued to Redis (caep:outbound) when unsigned mode is used:
{
  "event_type": "https://schemas.openid.net/secevent/caep/event-type/session-created",
  "payload": {"sub": "user@example.com", "sid": "abc123def"},
  "ts": 1723332000
}
- CAEP signed JWT (header + claims structure):
{
  "header": {"alg": "RS256", "kid": "bff-caep"},
  "claims": {
    "iss": "https://api.ocg.labs.empowernow.ai",
    "iat": 1723332000,
    "jti": "session-created:abc123def:1723332000",
    "events": {
      "https://schemas.openid.net/secevent/caep/event-type/session-created": {
        "sub": "user@example.com",
        "sid": "abc123def"
      }
    }
  }
}
How to test locally
- Ensure Kafka is up (see compose stack) and set KAFKA_BOOTSTRAP_SERVERS and KAFKA_TOPIC_PREFIX.
- On app startup, initialize and start the producer; call an API route to generate a requests event and inspect the topic with your Kafka UI.
- For CAEP, set CAEP_SIGNING_KEY (PEM) to emit signed JWTs, or leave it unset to emit plain JSON; in both cases the message is enqueued on the Redis list caep:outbound.
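When checking caep:outbound by hand, a small helper can tell the two payload modes apart. The sketch below is hypothetical and assumes each queue entry is either a raw JSON object or a compact three-segment JWT; it decodes the JWT header and claims for inspection only and does not verify the signature.

```python
import base64
import json


def _b64url_decode(segment):
    """Decode a base64url segment, restoring any stripped padding."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def describe_caep_message(raw):
    """Classify a queue entry as ("json", obj) or ("jwt", parts).

    Inspection only: the JWT signature is NOT verified here.
    """
    text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
    text = text.strip()
    if text.startswith("{"):
        return "json", json.loads(text)
    header_b64, claims_b64, _signature = text.split(".")
    return "jwt", {
        "header": json.loads(_b64url_decode(header_b64)),
        "claims": json.loads(_b64url_decode(claims_b64)),
    }
```

Feeding it an entry read from the Redis list returns a pair such as ("json", {...}) in unsigned mode, which makes it quick to confirm which mode the BFF is running in.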