CRUD Service Kafka events (business and error)
Topics used
Note: Actual topic names are prefixed by KAFKA_TOPIC_PREFIX (default crud). Examples below assume the default prefix.
crud.operations: per‑command success/failurecrud.errors: validation errors from/execute(and general error events when targeted)crud.workflows: workflow started/completed/failedcrud.secrets,crud.secrets.audit: vault/secret retrieval successes, cache hits, errorscrud.metrics: operational metrics (not errors; useful for monitoring)
Event shapes and when they are emitted
CRUD operation events (topic crud.operations)
- Emitted on both success and failure of a command.
- Payload (data):
operation: action name (for example,create_user)system,object_type,object_idparams: masked parametersresult: present on successerror: present on failure (string)success: booleantimestamp
- Context added by producer (envelope):
event_id,event_type(create/read/update/delete/list)correlation_id,service,trace_id,span_id,user_context
- Where emitted (examples):
# execute_routes.py
await producer.publish_crud_operation(
operation=execute_request.action,
system=execute_request.system,
object_type=execute_request.object_type,
object_id=object_id,
params=masked_params,
error=str(http_exc.detail), # on failure
correlation_id=correlation_id,
execution_context=ExecutionContext(request),
)
# command_executor.py
await kafka_producer.publish_crud_operation(
operation=action,
system=system_name,
object_type=object_type,
object_id=object_id,
params=params,
error=str(e), # on failure
correlation_id=correlation_id,
execution_context=execution_context,
)
Validation error events (topic crud.errors)
- Emitted by the global handler when FastAPI raises
RequestValidationErrorfor/execute. - Payload (data):
path,errors(FastAPI validation details)system,object_type,action,object_id
- Context:
correlation_idadded by producer wrapper - Where emitted (example):
# main.py
await producer.publish_event(
event_type="validation_error",
data=data,
correlation_id=str(getattr(request.state, "correlation_id", "")) or None,
topic=producer.topics.get("errors"),
execution_context=None,
)
Workflow events (topic crud.workflows)
- Emitted when a workflow starts and when it completes/fails/waits.
- Payload (data):
workflow_name,step_name(optional),status,error(on failure),metadata,started_by,timestamp
- Where emitted (example):
# final_executor.py
await producer.publish_workflow_event(
event_type=evt,
workflow_name=self.workflow_name,
status=self.context.status.value,
correlation_id=self.context.id,
started_by=self.context.started_by_arn,
)
Secret/vault events (topics crud.secrets and crud.secrets.audit)
- Emitted on cache hits, successful retrievals, and errors when fetching secrets from vault providers.
- Payload (data):
vault_type,credential_id,success,error(on failure),cached,timestamp
- Emitted to both functional and audit topics.
- Where emitted (examples):
# vault_service.py (error path)
await producer.publish_secret_event(
event_type=EventType.SECRET_ERROR,
vault_type=vault_type,
credential_id=pointer.credential_id,
success=False,
error="Vault strategy not found",
)
# crud_producer.py (audit fan‑out)
await self.publish_event(
event_type=event_type,
data=data,
topic=self.topics["secrets_audit"],
# ...
)
Masking and PII
- Parameters are masked before being passed to the producer; sinks also mask sensitive fields.
- Secrets and headers are sanitized by callers.
credential_idis included as‑is; secret values are never included.
Correlation and tracing
- Each event includes a
correlation_id(request or workflow id). - If OpenTelemetry is active,
trace_idandspan_idare attached. user_contextcarries subject/issuer when available. The subject/unique_id uses the canonical provider alias from IdP config for ARNs (provider falls back to IdP entryname).
Example event (failure on crud.operations)
{
"event_id": "...",
"event_type": "update",
"timestamp": "...",
"correlation_id": "b1e-...",
"service": "crud-service",
"trace_id": "...",
"span_id": "...",
"user_context": {"unique_id": "...", "subject": "auth:account:...", "issuer": "..."},
"data": {
"operation": "update",
"system": "entraid_contractors",
"object_type": "user",
"object_id": "E123456",
"params": {"Password": "****MASKED****", "Email": "john@example.com"},
"error": "Validation failed: missing field 'LastName'",
"success": false,
"timestamp": "..."
}
}
Consuming for troubleshooting
- CRUD failures: read
crud.operationsfiltering wheresuccess=falseorerrorpresent; group bycorrelation_idorobject_id. - Validation problems: read last N from
crud.errors. - Workflow issues: read
crud.workflowswherestatus=FAILEDor anerroris present. - Vault issues: read
crud.secretsandcrud.secrets.auditwheresuccess=false.
Examples (host with kcat):
kcat -b <broker> -t crud.errors -C -o -10
kcat -b <broker> -t crud.operations -C -o -10 | jq 'select(.data.success==false)'