ADR-OS01: Object-Service Design Architecture V3¶
Practical, opinionated architecture for design, implementation, and evolution of complex objects and API-backed services across the TNH Scholar suite.
- Filename:
adr-os01-design-v3.md - Heading:
# ADR-OS01: Object-Service Design Architecture V3 - Status: Accepted
- Date: 2025-10-24
- Authors: Aaron Solomon, OpenAI GPT-5, Anthropic Claude Sonnet 4.5
- Version: 3.0 (Integrated)
0. Big Picture (At a Glance)¶
Goal: Ship reliable, composable features fast by separating concerns and keeping boundaries explicit.
Architectural Overview¶
Application Layer (CLI, notebooks, web, Streamlit)
└─ Orchestrators (thin): <Feature>Processor, ResultWriter
▲
│ domain objects / protocols
│
Domain Service Layer
└─ <Feature>Service (Protocol-based orchestrator)
└─ ProviderClient (impl: OpenAIClient, PyannoteClient, etc.)
└─ RequestMapper (bi-directional: domain ↔ transport)
└─ TransportClient (HTTP, polling, streaming)
▲
│ transport models (anti-corruption boundary)
│
Transport Layer
└─ VendorClient (upload/start/status/poll, retries, rate-limit)
JobPoller (backoff, jitter, deadline)
Unified Layer Model¶
graph TD
A[App / CLI] --> B[Service Layer / Orchestrator]
B --> C[Port / Protocol]
C --> D[Adapter + Mapper]
D --> E[Transport Client]
E --> F[External Provider / SDK]
subgraph Domain Layer
B
C
end
subgraph Infrastructure Layer
D
E
end
Core Contracts¶
- Config at init, Params per call, Response envelope always
- Service protocol is minimal:
start(),get_response(),generate()for async jobs;generate()orrun()for sync - Adapter maps API shapes → canonical domain shapes (bi-directional)
- All payloads use strong typing (Pydantic or dataclass models)
- No literals or untyped dicts in application logic
1. Design Goals & Principles¶
1.1 Core Design Goals¶
- Strong Typing: All payloads, parameters, and responses use Pydantic or dataclass models
- No Literals: No string or numeric literals in app logic; defaults come from Settings or Policies
- Bi-directional Adapters: Each adapter maps domain → SDK (request) and SDK → domain (response)
- Protocol-based Ports: All boundaries declared as
Protocolinterfaces (structural typing) - Replaceable Providers: Any external dependency isolated behind an adapter implementing common protocol
- Declarative Configuration: Every component derives runtime config from typed sources
- Transport / Domain Separation: Domain logic independent of serialization or transmission
- No Side-Effects in Domain: All side-effects (I/O, network) live exclusively in adapters or clients
- Explicit Boundaries: Clear separation between config, params, policy, and context
1.2 Rule of Thumb¶
- Wire behavior → Client
- Canonical data shape → Adapter/Service
- Usage intent → Params/Policy
- Long-lived configuration → Config/Settings
- Defaults → Settings < Pattern < Policy < Request (precedence order)
2. Core Concepts & Naming Taxonomy¶
2.1 Configuration & Runtime Concepts¶
| Concept | Type | Scope | Purpose | Example |
|---|---|---|---|---|
| Settings | BaseSettings |
Application | Environment/deployment config | API keys, base URLs, default models |
| Config | <Thing>Config |
Service/Client | Construction-time settings; immutable | TransportConfig, ProviderConfig |
| Params | <Thing>Params |
Per-call | Request-specific inputs; vary run-to-run | DiarizationParams, RenderRequest |
| Policy | DomainPolicy, ServicePolicy |
Service | Opinionated behavior toggles | ParamsPolicy, completion_mode, overlap handling |
| Pattern | Pattern |
Domain | Prompt templates with metadata | System/user message templates |
| Context | ExecutionContext |
Pipeline | Ephemeral runtime state (optional) | Current stage, accumulated metadata |
2.2 Response & Result Concepts¶
| Concept | Type | Purpose | Presence |
|---|---|---|---|
| Envelope | Envelope |
Universal response wrapper | Always |
| Result | <Feature>Result |
Domain success payload | Only when status == "succeeded" |
| Response | ProviderResponse |
Transport-level response | At adapter boundary |
| Provenance | Provenance |
Metadata about execution | Always (in Envelope) |
2.3 Architectural Concepts¶
| Concept | Purpose | Location | Implements |
|---|---|---|---|
| Client | Pure transport/HTTP operations | transport/ |
Protocol (e.g., ProviderClient) |
| Mapper | Bi-directional type translation | Inside Adapter | Pure functions or classes |
| Adapter | Transport ↔ domain mapping | adapters/ or providers/ |
Protocol + uses Mapper |
| Service | Orchestration, composes ports | service/ |
Protocol |
| Processor | Application façade | app/ |
Thin; wires persistence |
| Port | Abstract interface | Domain layer | Protocol definition |
3. Layer Structure & Responsibilities¶
3.1 Conceptual Layers¶
| Layer | Responsibility | Example Classes | Allowed Dependencies |
|---|---|---|---|
| Application | Entry points, user interaction | CLI, notebooks, Streamlit | Service protocols only |
| Service (Orchestrator) | Coordinates flow, composes ports | GenAIService, DiarizationService |
Ports, Settings, Policy |
| Ports (Protocols) | Abstract interfaces | ProviderClient, AudioDiarizer |
Domain models only |
| Adapters | Implement protocols, SDK mapping | OpenAIClient, PyannoteClient |
Mappers, Transport models |
| Mappers | Bi-directional type translation | OpenAIRequestMapper |
Transport + SDK types |
| Transport Clients | HTTP, polling, streaming | VendorClient, JobPoller |
Config only |
| Domain Models | Typed business objects | CompletionResult, SpeakerBlock |
Nothing (pure) |
| Transport Models | I/O-level contracts | ProviderRequest, JobStatusResponse |
Nothing (pure) |
3.2 Dependency Direction¶
Dependencies flow inward: Application → Service → Port → (Adapter → Mapper → Transport Client) → External.
Forbidden dependencies:
- Domain → Infrastructure
- Models → Services
- Ports → Implementations
4. Configuration Taxonomy¶
4.1 Settings (Application-wide)¶
# config/settings.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Application-wide environment configuration."""
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
extra="ignore"
)
# Provider credentials
openai_api_key: str | None = None
openai_org: str | None = None
assemblyai_api_key: str | None = None
# Defaults
default_provider: str = "openai"
default_model: str = "gpt-4o-mini"
default_temperature: float = 0.2
default_max_output_tokens: int = 512
# System configuration
log_level: str = "INFO"
enable_tracing: bool = False
@classmethod
def from_env(cls) -> "Settings":
return cls()
4.2 Transport Configuration¶
# config/transport.py
from pydantic import BaseModel, Field
class TransportConfig(BaseModel):
"""Pure transport/HTTP configuration; only the client sees this."""
base_url: str
api_key: str = Field(repr=False)
organization: str | None = None
connect_timeout_s: float = 10.0
read_timeout_s: float = 30.0
max_retries: int = 5
backoff_base_s: float = 0.5
backoff_max_s: float = 8.0
rate_limit_rps: float = 5.0
@classmethod
def from_env(cls, prefix: str) -> "TransportConfig":
import os
return cls(
base_url=os.environ.get(f"{prefix}_BASE_URL", ""),
api_key=os.environ.get(f"{prefix}_API_KEY", ""),
)
4.3 Policy Structures¶
# config/policy.py
from typing import Literal
from pydantic import BaseModel
class DomainPolicy(BaseModel):
"""Domain concerns about how the app wants to use the service."""
completion_mode: Literal["snapshot", "wait"] = "wait"
deadline_s: float | None = None
allow_partials: bool = True
overlap_mode: Literal["ignore", "merge", "separate"] = "merge"
class ServicePolicy(BaseModel):
"""Adapter/service-side normalization and mapping decisions."""
default_label: str = "DEFAULT"
normalize_case: bool = True
min_unit_size: int = 0
preserve_formatting: bool = True
class ParamsPolicy(BaseModel):
"""GenAI-specific parameter precedence policy."""
provider: str
model: str
temperature: float
max_output_tokens: int
seed: int | None = None
top_p: float | None = None
4.4 Policy Precedence¶
Order of precedence (highest to lowest):
- Explicit request parameters (RenderRequest, FeatureParams)
- Pattern defaults (from Pattern catalog)
- Config policy (YAML or database)
- Settings defaults (environment/deployment)
5. Canonical Domain Envelope¶
5.1 Envelope Structure¶
# domain/models.py
from typing import Any, Literal
from pydantic import BaseModel
class Provenance(BaseModel):
"""Metadata about how/when/with what the result was produced."""
backend: str # e.g., "openai", "pyannote", "assemblyai"
model: str | None = None
job_id: str | None = None
started_at: str | None = None
completed_at: str | None = None
schema_version: str = "1.0"
policy_version: str = "1.0"
params: dict = {} # Effective parameters used
system_version: str | None = None # Git SHA/tag for reproducibility
class Envelope(BaseModel):
"""Universal response wrapper for all services."""
status: Literal["pending", "running", "succeeded", "failed", "timeout"]
result: Any | None = None # Feature-specific; present only on success
error: str | None = None
diagnostics: dict = {} # Curated metrics, not raw dump
provenance: Provenance
def succeeded(self) -> bool:
return self.status == "succeeded"
def failed(self) -> bool:
return self.status == "failed"
5.2 Envelope Invariants¶
resultonly present whenstatus == "succeeded"erroronly present forstatus == "failed"provenance.job_idset when available (embed at transport boundary)diagnosticscontains curated, actionable metrics (not raw dumps)provenance.paramsrecords effective policy/parameters used- Always include
schema_versionandpolicy_versionin provenance
6. Service Protocols¶
6.1 Protocol Design Rules¶
- Use
Protocolfor interfaces (structural typing) - Use
ABConly when enforcing init-time invariants or providing mixins - Every service has exactly one orchestrator class
- Every adapter implements exactly one protocol
- Keep protocol interfaces minimal (3-5 methods maximum)
6.2 Async Job Service Protocol¶
# domain/service.py
from pathlib import Path
from typing import Protocol
from .models import Envelope
class FeatureParams(BaseModel):
"""API-facing parameters (safe subset)."""
pass
class AsyncFeatureService(Protocol):
"""Protocol for services that support async job patterns."""
def start(
self,
input_path: Path,
params: FeatureParams | None = None
) -> str:
"""Start async job, return job_id."""
...
def get_response(
self,
job_id: str,
*,
wait_until_complete: bool = False
) -> Envelope:
"""Get job status/result. If wait=True, poll until complete."""
...
def generate(
self,
input_path: Path,
params: FeatureParams | None = None,
*,
wait_until_complete: bool = True
) -> Envelope:
"""Convenience: start + wait for result."""
...
6.3 Sync Request/Response Protocol¶
# domain/service.py
class SyncFeatureService(Protocol):
"""Protocol for synchronous request/response services."""
def generate(self, req: RequestParams) -> ResultModel:
"""Synchronous generation; returns domain result directly."""
...
6.4 GenAI Service Protocol (Example)¶
class GenAIServicePort(Protocol):
"""Protocol for GenAI text generation services."""
def generate(self, req: RenderRequest) -> CompletionResult:
"""Generate completion from rendered prompt."""
...
class GenAIService:
"""Orchestrator for GenAI generation."""
def __init__(
self,
settings: Settings,
catalog: PatternCatalog | None = None,
provider: ProviderClient | None = None,
):
self.settings = settings
self.catalog = catalog or LegacyPatternCatalog()
self.provider = provider or OpenAIClient(
settings.openai_api_key,
settings.openai_org
)
def generate(self, req: RenderRequest) -> CompletionResult:
# 1. Get pattern from catalog
pattern = self.catalog.get(req.pattern_key)
# 2. Render prompt with context
rendered = self.catalog.render(pattern, req)
# 3. Apply policy precedence
params = apply_policy(
req.intent,
pattern.default_params,
req.model_hint
)
# 4. Build transport request
provider_req = ProviderRequest(
provider=params.provider,
model=params.model,
messages=rendered.messages,
system=rendered.system,
temperature=params.temperature,
max_output_tokens=params.max_output_tokens,
seed=params.seed,
)
# 5. Call provider through port
resp = self.provider.generate(provider_req)
# 6. Return domain result
return CompletionResult(
text=resp.text,
usage=resp.usage,
model=resp.model,
provider=resp.provider,
prompt_fingerprint=rendered.fingerprint,
)
7. Transport Models & Domain Models¶
7.1 Transport Models (at boundaries)¶
# models/transport.py
from pydantic import BaseModel
class Message(BaseModel):
"""Universal message format for chat-based APIs."""
role: Literal["system", "user", "assistant"]
content: str
class ProviderRequest(BaseModel):
"""Transport-level request for any GenAI provider."""
provider: str
model: str
messages: list[Message]
system: str | None = None
temperature: float
max_output_tokens: int
seed: int | None = None
top_p: float | None = None
class ProviderResponse(BaseModel):
"""Transport-level response from GenAI provider."""
text: str
usage: Usage
model: str
provider: str
finish_reason: str | None = None
class JobStatusResponse(BaseModel):
"""Transport-level job status for async services."""
status: str # "pending" | "running" | "succeeded" | "failed"
job_id: str # Embed job_id for propagation
elapsed_s: float = 0.0
payload: dict = {} # Raw vendor payload (optional)
error: str | None = None
7.2 Domain Models¶
# models/domain.py
from pydantic import BaseModel
class CompletionResult(BaseModel):
"""Domain result for GenAI completions."""
text: str
usage: Usage
model: str
provider: str
prompt_fingerprint: str
class DiarizationResult(BaseModel):
"""Domain result for speaker diarization."""
segments: list[SpeakerBlock]
num_speakers: int
duration_s: float
class SpeakerBlock(BaseModel):
"""Individual speaker segment."""
speaker_id: str
start_s: float
end_s: float
confidence: float | None = None
8. Adapter Architecture¶
8.1 Adapter Role & Responsibilities¶
Adapters encapsulate all external I/O logic. Each adapter:
- Implements exactly one
Protocol(port) - Acts as bi-directional translator: domain ↔ transport ↔ SDK
- Isolates SDK-specific code from domain logic
- Handles provider-specific error mapping
8.2 Bi-Directional Mapping Contract¶
Every adapter implements request and response mapping:
class ProviderRequestMapper:
"""Maps between domain and SDK request formats."""
def to_sdk_request(self, req: ProviderRequest) -> SDKRequest:
"""Domain → SDK request transformation."""
...
def from_sdk_response(self, resp: SDKResponse) -> ProviderResponse:
"""SDK → domain response transformation."""
...
8.3 Adapter Boundary Diagram¶
graph LR
A[Domain Model] --> B[ProviderRequest]
B --> C[Mapper.to_sdk_request]
C --> D[SDK Request]
D --> E[External Provider]
E --> F[SDK Response]
F --> G[Mapper.from_sdk_response]
G --> H[ProviderResponse]
H --> I[Domain Model]
8.4 Sync Adapter Pattern (GenAI)¶
# providers/openai_adapter.py
from openai import OpenAI
class OpenAIClient:
"""Adapter for OpenAI chat completions API."""
PROVIDER = "openai"
def __init__(
self,
api_key: str,
organization: str,
mapper: OpenAIRequestMapper | None = None
):
self._client = OpenAI(api_key=api_key, organization=organization)
self._mapper = mapper or OpenAIRequestMapper()
def generate(self, req: ProviderRequest) -> ProviderResponse:
"""Implement ProviderClient protocol."""
# 1. Map domain → SDK
mapped = self._mapper.to_sdk_request(req)
# 2. Call SDK
try:
sdk_resp = self._client.chat.completions.create(
**mapped.model_dump()
)
except Exception as e:
raise ProviderError(f"OpenAI API error: {e}") from e
# 3. Map SDK → domain
return self._mapper.from_sdk_response(sdk_resp)
class OpenAIRequestMapper:
"""Bi-directional mapper for OpenAI API."""
def to_sdk_request(self, req: ProviderRequest) -> dict:
"""Convert ProviderRequest to OpenAI API format."""
messages = [{"role": m.role, "content": m.content} for m in req.messages]
if req.system:
messages.insert(0, {"role": "system", "content": req.system})
return {
"model": req.model,
"messages": messages,
"temperature": req.temperature,
"max_tokens": req.max_output_tokens,
"seed": req.seed,
"top_p": req.top_p,
}
def from_sdk_response(self, resp) -> ProviderResponse:
"""Convert OpenAI response to ProviderResponse."""
return ProviderResponse(
text=resp.choices[0].message.content,
usage=Usage(
input_tokens=resp.usage.prompt_tokens,
output_tokens=resp.usage.completion_tokens,
),
model=resp.model,
provider="openai",
finish_reason=resp.choices[0].finish_reason,
)
8.5 Async Adapter Pattern (Diarization)¶
# adapters/assemblyai_adapter.py
from pathlib import Path
class AssemblyAIClient:
"""Adapter for AssemblyAI async transcription API."""
def __init__(self, cfg: TransportConfig, mapper: AssemblyAIMapper):
self.client = VendorClient(cfg) # Transport layer
self.mapper = mapper
self.backend = "assemblyai"
def start(self, input_path: Path, params: FeatureParams | None = None) -> str:
"""Start transcription job."""
media_id = self.client.upload(input_path)
job_id = self.client.start(media_id, params.model_dump() if params else None)
return job_id
def get_response(
self,
job_id: str,
*,
wait_until_complete: bool = False
) -> Envelope:
"""Get job status/result."""
if wait_until_complete:
jsr = self.client.poll_until_done(job_id)
else:
jsr = self.client.job_status(job_id)
return self.mapper.to_envelope(jsr)
def generate(
self,
input_path: Path,
params: FeatureParams | None = None,
*,
wait_until_complete: bool = True
) -> Envelope:
"""Convenience: start + wait."""
job_id = self.start(input_path, params)
return self.get_response(job_id, wait_until_complete=wait_until_complete)
class AssemblyAIMapper:
"""Maps AssemblyAI responses to domain envelopes."""
def __init__(self, backend: str = "assemblyai", model: str | None = None):
self.backend = backend
self.model = model
def to_envelope(self, jsr: JobStatusResponse) -> Envelope:
"""Convert JobStatusResponse to Envelope."""
prov = Provenance(
backend=self.backend,
model=self.model,
job_id=jsr.job_id,
params={},
)
if jsr.status == "succeeded":
result = self._map_result(jsr)
return Envelope(status="succeeded", result=result, provenance=prov)
elif jsr.status == "failed":
return Envelope(status="failed", error=jsr.error, provenance=prov)
elif jsr.status == "timeout":
return Envelope(status="timeout", error="Job exceeded deadline", provenance=prov)
else:
return Envelope(status=jsr.status, provenance=prov)
def _map_result(self, jsr: JobStatusResponse) -> DiarizationResult:
"""Extract domain result from vendor payload."""
# Parse vendor-specific payload → domain model
segments = [
SpeakerBlock(
speaker_id=seg["speaker"],
start_s=seg["start"],
end_s=seg["end"]
)
for seg in jsr.payload.get("utterances", [])
]
return DiarizationResult(
segments=segments,
num_speakers=len(set(s.speaker_id for s in segments)),
duration_s=jsr.elapsed_s
)
8.6 Adapter Mapping Rules¶
- No literals or untyped dicts in mapping code; use TypedDicts or SDK models
- All defaults come from
SettingsorPolicy - Mapper code is pure (no I/O, no side-effects)
- Client code handles exceptions and wraps them as typed errors
- Map vendor errors →
ProviderErrorsubclasses for uniform handling
8.7 Error Handling¶
# models/errors.py
class ProviderError(Exception):
"""Base error for all provider failures."""
pass
class RateLimitError(ProviderError):
"""Rate limit exceeded."""
pass
class AuthenticationError(ProviderError):
"""Invalid credentials."""
pass
class TimeoutError(ProviderError):
"""Request exceeded deadline."""
pass
All adapters must raise typed errors for uniform error handling by orchestration layers.
8.8 Internal Layer Adapters (Boundary Mapping Inside Our Own System)¶
Adapters and mappers are not just for translating vendor payloads—they are equally critical for translating between our own internal abstraction layers. This principle applies whenever there is a semantic boundary or abstraction shift, even within our own codebase.
Rationale for Internal Mapping¶
- Semantic Shifts: As data moves from transport to domain to orchestration, the meaning, required fields, or invariants of objects often change. Mapping makes these changes explicit.
- Attaching Policy, Safety, Provenance: Internal mappers are the right place to inject policy decisions, provenance metadata, or safety checks as data crosses boundaries.
- Orchestration Cleanliness: Keeping orchestration code free of manual stitching, dict access, or ad hoc conversions avoids “god services” and keeps logic testable.
- Future-Proofing: By isolating mapping, we can later support multiple result types, partial results, or envelope variants without rewriting orchestration.
Example: Mapping Between Internal Layers¶
-
ProviderResponse → CompletionResult (GenAI)
-
JobStatusResponse → Envelope (Async Jobs)
# mappers/job_status_mapper.py def to_envelope(jsr: JobStatusResponse, backend: str, model: str | None = None) -> Envelope: prov = Provenance( backend=backend, model=model, job_id=jsr.job_id, params={}, ) if jsr.status == "succeeded": result = ... # map jsr.payload to the domain result return Envelope(status="succeeded", result=result, provenance=prov) elif jsr.status == "failed": return Envelope(status="failed", error=jsr.error, provenance=prov) elif jsr.status == "timeout": return Envelope(status="timeout", error="Job exceeded deadline", provenance=prov) else: return Envelope(status=jsr.status, provenance=prov)
Explicit Rules for Internal Mapping¶
- All cross-layer translations (even internal) must happen in mappers.
- Mappers are pure: No I/O, no side effects, no logging, no exception handling.
- Orchestrators never manually stitch domain objects or reach into dicts.
- Naming convention: Use
to_<DomainType>()or<DomainType>Mapperfor class mappers. - Class vs Function: Use a class if mapping depends on injected config or policy; use a pure function if stateless.
Benefits¶
- Consistency: All cross-boundary data flows are explicit and uniform.
- Testability: Mappers can be tested in isolation, including edge cases and invariants.
- Evolution: Adding new result types, partials, or shape changes is localized to mappers.
- Prevents “God Services”: Orchestration remains free of ad hoc data stitching, keeping logic readable and maintainable.
9. Transport Layer: Clients & Polling¶
9.1 Transport Client (HTTP operations)¶
# transport/client.py
import time
from pathlib import Path
from .models import JobStatusResponse
class VendorClient:
"""Pure transport/HTTP client for async job APIs."""
def __init__(self, cfg: TransportConfig):
self.cfg = cfg
self._session = self._create_session()
def _create_session(self):
"""Create HTTP session with retries and rate limiting."""
# Implementation details...
pass
def upload(self, path: Path) -> str:
"""Upload file to vendor, return media_id."""
# POST file → return ID
return "media_123"
def start(self, media_id: str, params: dict | None = None) -> str:
"""Start processing job, return job_id."""
# POST /jobs → return job_id
return "job_abc"
def job_status(self, job_id: str) -> JobStatusResponse:
"""Get current job status."""
# GET /jobs/{job_id} → JobStatusResponse
return JobStatusResponse(status="running", job_id=job_id)
def poll_until_done(
self,
job_id: str,
deadline_s: float | None = None
) -> JobStatusResponse:
"""Poll job until completion with exponential backoff."""
backoff = self.cfg.backoff_base_s
start = time.time()
while True:
jsr = self.job_status(job_id)
# Terminal states
if jsr.status in {"succeeded", "failed"}:
return jsr
# Timeout check
if deadline_s and (time.time() - start) > deadline_s:
return JobStatusResponse(
status="timeout",
job_id=job_id,
elapsed_s=time.time() - start
)
# Exponential backoff with jitter
time.sleep(backoff + random.uniform(0, backoff * 0.1))
backoff = min(backoff * 1.6, self.cfg.backoff_max_s)
9.2 Job Poller (Reusable)¶
# transport/poller.py
from typing import Callable, TypeVar
import time
import random
T = TypeVar('T')
class JobPoller:
"""Reusable polling logic with backoff and jitter."""
def __init__(
self,
check_fn: Callable[[str], T],
is_terminal: Callable[[T], bool],
backoff_base: float = 0.5,
backoff_max: float = 8.0,
deadline_s: float | None = None
):
self.check_fn = check_fn
self.is_terminal = is_terminal
self.backoff_base = backoff_base
self.backoff_max = backoff_max
self.deadline_s = deadline_s
def poll(self, job_id: str) -> T:
"""Poll until terminal state or timeout."""
backoff = self.backoff_base
start = time.time()
while True:
result = self.check_fn(job_id)
if self.is_terminal(result):
return result
if self.deadline_s and (time.time() - start) > self.deadline_s:
raise TimeoutError(f"Job {job_id} exceeded deadline")
jitter = random.uniform(0, backoff * 0.1)
time.sleep(backoff + jitter)
backoff = min(backoff * 1.6, self.backoff_max)
10. Testing Strategy¶
10.1 Unit Testing (Mock Protocols)¶
# tests/test_service.py
class FakeProviderClient:
"""Mock implementation of ProviderClient protocol."""
def generate(self, req: ProviderRequest) -> ProviderResponse:
return ProviderResponse(
text="mocked response",
usage=Usage(input_tokens=10, output_tokens=20),
model=req.model,
provider="fake"
)
def test_generate():
"""Test service orchestration with mocked provider."""
fake_provider = FakeProviderClient()
settings = Settings(default_provider="fake")
service = GenAIService(settings, provider=fake_provider)
req = RenderRequest(pattern_key="test", context={})
result = service.generate(req)
assert result.text == "mocked response"
assert result.provider == "fake"
10.2 Integration Testing¶
# tests/integration/test_openai.py
def test_openai_integration():
"""End-to-end test with real OpenAI API."""
settings = Settings.from_env()
service = GenAIService(settings)
req = RenderRequest(
pattern_key="summarize",
context={"text": "Long document..."}
)
result = service.generate(req)
assert result.succeeded()
assert len(result.text) > 0
10.3 Contract Testing (Envelope Shape)¶
def test_envelope_contract():
"""Verify envelope always contains required fields."""
env = service.get_response(job_id)
assert hasattr(env, "status")
assert hasattr(env, "provenance")
assert env.provenance.backend is not None
assert env.provenance.schema_version is not None
if env.succeeded():
assert env.result is not None
assert env.error is None
10.4 Golden Testing (Policy Variants)¶
For each policy profile, snapshot the envelope shape to detect regressions.
11. Interface Modalities & Integration Patterns¶
Design for a range of service shapes. Choose the smallest that fits; keep the envelope consistent.
11.1 Modality Decision Tree¶
- Is the task fast (<2s)? → Synchronous request/response
- Long-running and vendor can't call back? → Async + polling
- Vendor supports webhooks and you can host an endpoint? → Async + webhook
- You need incremental UX? → Streaming
- You need decoupling/scale? → Message queue / Pub-Sub
- It's a local library? → Synchronous or streaming
11.2 Synchronous Request/Response¶
When to use: Small/fast tasks; local libraries; remote endpoints with low latency.
- Contract:
run(params) -> Envelope(nostart()/get_response()) - Failure model: Exceptions for programmer errors;
failedenvelope for domain/remote errors - Examples: Local Whisper inference; lightweight text cleaners; GenAI completions
11.3 Asynchronous Jobs with Polling¶
When to use: Long-running tasks without push callbacks.
- Contract:
start() -> job_id,get_response(job_id, wait|snapshot) -> Envelope - Failure model: Timeouts →
timeoutenvelope (optionally partials); network errors retried - Examples: Diarization, transcription, video processing
11.4 Asynchronous Jobs with Webhooks¶
When to use: Vendor can push results; you control an HTTP endpoint.
- Contract:
start(callback_url) -> job_id; callback sends transport payload; adapter maps to envelope - Failure model: At-least-once delivery; verify signatures; be idempotent
# app/webhooks.py
from fastapi import FastAPI, Request, HTTPException
app = FastAPI()
@app.post("/callbacks/vendor")
async def vendor_callback(req: Request):
sig = req.headers.get("X-Vendor-Signature")
body = await req.json()
if not verify_signature(sig, body):
raise HTTPException(status_code=401)
jsr = JobStatusResponse.model_validate({
**body,
"job_id": body.get("job_id")
})
env = adapter.to_envelope(jsr)
save_result(env) # Persist; notify UI
return {"ok": True}
11.5 Streaming (SSE / WebSocket / gRPC)¶
When to use: Incremental results (tokens, segments, progress).
- Contract: Open stream → receive
Envelopedeltas orProgressevents; finalEnvelopecloses stream - Failure model: Reconnect with resume tokens; dedupe by
event_id - Notes: Wrap stream events into mini-envelopes for consistent shape
# Example: async streaming client
async def consume_stream(uri: str):
async with websockets.connect(uri) as ws:
await ws.send(json.dumps({"auth": token, "params": {...}}))
async for msg in ws:
evt = json.loads(msg)
env = to_envelope(evt) # Mini-envelopes or progress events
handle_event(env)
11.6 Message Queue / Pub-Sub Integration¶
When to use: Decouple producers/consumers; batch pipelines; fan-out/fan-in.
- Contract: Publish
Params+ correlation IDs; consumers emitEnvelopes on result topic - Failure model: Retries with DLQ; ensure idempotency via keys
@mq.consume("transcribe.jobs")
def worker(msg):
params = FeatureParams.model_validate(msg["params"])
env = service.generate(Path(msg["input"]), params)
mq.publish("transcribe.results", env.model_dump())
11.7 Batch Pipelines (Orchestration)¶
When to use: Multi-stage processing: diarize → chunk → transcribe → summarize.
- Contract: Each stage returns an
Envelope; orchestrator composes; pass alongprovenanceandpolicy - Failure model: Saga-style compensation/skip policies; partial results acceptable per policy
12. Tracing, Metrics & Observability¶
12.1 Instrumentation Boundaries¶
| Layer | Responsibility | Tools |
|---|---|---|
| Domain | Trace spans, usage accounting | Tracer, UsageAccounting |
| Adapter | Metrics (latency, cost, throughput) | Prometheus, DataDog |
| Transport | HTTP metrics, retry counts | Built-in client metrics |
12.2 Provenance Integration¶
Always record in provenance.params:
- Policy and policy version used
- System version (git SHA/tag) for reproducibility
- Upstream IDs (job_id/request_id) and correlation IDs
- When combining stages, aggregate provenance: keep list of stage-level blocks
13. Policy Evolution Strategy¶
13.1 Principles¶
- Policy ≠ Config: Policies affect semantics; configs describe environment
- Additive first: Prefer adding new policy fields with sensible defaults over changing semantics
- Version the policy surface: Include
policy_versionin provenance - Document intent: Every policy field gets one-line rationale in docstrings
- Record used policy: Echo effective policy into
provenance.policyon every response
13.2 Versioning¶
- Add
policy_version: str = "1.0"to policy model - Bump minor (
1.1) for additive fields; bump major (2.0) for behavior changes - Store version in
provenance.params["policy_version"]
13.3 Migration Patterns¶
Expand → Contract:
- Ship new policy fields with defaults that preserve old behavior
- Gather metrics; then flip defaults in major version
Feature flags:
- Use boolean or enum policy switches for new behaviors (e.g.,
overlap_mode = "ignore|merge|separate")
Policy profiles:
- Provide named presets (e.g.,
"dharma_talks","interviews") that resolve to concrete policy objects
Deprecation path:
- Mark fields as deprecated in docstrings
- Log deprecation warnings when used
- Remove only after major version bump
13.4 Testing & Governance¶
- Golden tests for policy variants: snapshot envelope shape for each profile
- Contract tests to verify provenance always includes policy version and effective values
- Review checklist for policy PRs: rationale, defaults, migration notes, docstrings, provenance echo
14. Key Implementation Rules¶
- Use
Protocolfor interfaces; useABConly when enforcing init-time invariants or providing mixins - Every service has exactly one orchestrator class
- Every adapter implements exactly one protocol
- Mappers are pure and side-effect-free
- No dicts except at SDK boundaries (use TypedDict or Pydantic models)
- All literals replaced by settings/policy parameters
- Exceptions wrapped in typed error classes
- Config at init, params per call, response envelope always
- Domain logic never depends on infrastructure
- All side-effects isolated to adapters and transport clients
15. Reference Implementation Index¶
| Concept | GenAI Service | Diarization Service |
|---|---|---|
| Orchestrator | service.py (GenAIService) |
diarization_service.py |
| Provider Client | providers/openai_adapter.py |
adapters/pyannote_client.py |
| Mapper | OpenAIRequestMapper |
PyannoteMapper |
| Domain Models | models/domain.py |
models/audio_domain.py |
| Transport Models | models/transport.py |
models/audio_transport.py |
| Errors | models/errors.py |
models/errors.py |
| Transport Client | (direct SDK call) | transport/vendor_client.py |
| Job Poller | (N/A - sync) | transport/poller.py |
16. File/Module Scaffold (Cookie-Ready)¶
<feature>/
config/
settings.py # Application-wide settings
transport.py # Transport configuration
policy.py # Domain/service policies
domain/
models.py # Domain models (Result, Envelope)
service.py # Service protocol definition
transport/
client.py # HTTP client (upload, start, poll)
poller.py # Reusable polling logic
models.py # Transport models (JobStatusResponse)
adapters/ (or providers/)
<vendor>_client.py # Adapter implementing protocol
<vendor>_mapper.py # Bi-directional mapper
service/
<vendor>_service.py # Service orchestrator (composes ports)
app/
processor.py # Application façade
cli.py # CLI entry point
models/
errors.py # Typed error hierarchy
17. Design Review Questions¶
- Error Granularity: Should
ProviderErrorbe subclassed per provider or remain generic? - Tracing Interface: Centralized or service-local?
- Policy Loading: Should YAML policy files be lazy-loaded or pre-loaded at startup?
- Composite PatternCatalog: Should multiple pattern sources (file, legacy, DB) be supported concurrently?
- RetryPolicy Abstraction: Should retries be unified across audio + text services or service-specific?
- Mappers as Classes vs Functions: Currently class-based; function-based mappers could simplify small providers
- Streaming: Should streaming adapters implement an async interface (
ProviderClientAsync)? - Webhook Security: Standardize signature verification across all webhook-based integrations?
- Rate Limiting: Client-side vs server-enforced; per-provider or global?
- Partial Results: How to handle and represent partial results in envelopes for long-running jobs?
18. Glossary¶
| Term | Definition |
|---|---|
| Adapter | Component that implements a protocol; maps between domain and transport |
| Anti-corruption layer | Boundary that converts vendor shapes to domain shapes |
| Envelope | Universal response wrapper containing status, diagnostics, provenance, and optional result |
| Mapper | Pure component performing bi-directional type translation |
| Policy | Opinionated settings that change behavior but not capability |
| Port | Abstract interface defined as Protocol |
| Provenance | Metadata about how/when/with what the result was produced |
| Protocol | Structural type interface (no inheritance required) |
| Transport | Layer handling HTTP, polling, streaming, and retries |
19. Evolution & Versioning¶
- Version the envelope (
schema_version) and keep changes additive where possible - Be adapter-tolerant to unknown transport fields
- For breaking API shifts, add new adapter/service and migrate gradually
- Include
policy_versionandsystem_versionin provenance for reproducibility - Aggregate provenance when combining stages in pipelines
20. Closing Notes¶
This blueprint is modality-agnostic, policy-forward, and strongly-typed. It unifies patterns for:
- Synchronous request/response (GenAI)
- Asynchronous job polling (diarization, transcription)
- Webhooks, streaming, message queues
Core principles:
- Use the smallest viable integration pattern
- Keep envelopes consistent across modalities
- Let policy express domain decisions
- Maintain strong typing with no literals
- Persist provenance for debugging and reproducibility
Key success metrics:
- Can add new provider in <1 day
- Can swap providers without changing application code
- Can trace any result back to exact parameters/version used
- Can test services in isolation with mocked protocols
Copy the scaffolds and stubs from this blueprint to start new features quickly while maintaining architectural consistency.