Architecture¶
This page explains how smelt works under the hood — from receiving your data to returning typed results.
Pipeline overview¶
flowchart LR
A["list[dict]"] --> B["Tag rows\nwith row_id"]
B --> S{"shuffle?"}
S -->|Yes| Sh["Shuffle\ntagged rows"]
S -->|No| C
Sh --> C["Split into\nbatches"]
C --> D["Concurrent\nLLM calls"]
D --> E["Validate\nschema + IDs"]
E --> F["Reorder by\nrow_id"]
F --> G["SmeltResult[T]"]
style A fill:#f9f,stroke:#333
style G fill:#9f9,stroke:#333
Every job.run() call goes through these steps:
- Tag — Each input dict gets a row_id (0, 1, 2, ...) for tracking
- Shuffle — If shuffle=True, randomize row order before batching
- Split — Divide tagged rows into chunks of batch_size
- Execute — Send batches concurrently to the LLM via asyncio.Semaphore
- Validate — Check each batch response: correct row count, matching IDs, valid schema
- Reorder — Sort all successful rows back into original input order by row_id
- Return — Package into SmeltResult[T] with data, errors, and metrics
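Stripped of error handling and retries, the flow can be sketched in plain Python. This is illustrative only, not smelt's internals; call_llm is a placeholder for the real structured-output call:

```python
# Illustrative sketch of the pipeline steps above; not smelt's actual code.
import asyncio
import random

async def call_llm(batch: list[dict]) -> list[dict]:
    ...  # placeholder: one structured-output call returning one row per input row

async def run_pipeline(data, batch_size, concurrency, shuffle=False):
    tagged = [{"row_id": i, **row} for i, row in enumerate(data)]  # Tag
    if shuffle:
        random.shuffle(tagged)                                     # Shuffle
    batches = [tagged[i:i + batch_size]                            # Split
               for i in range(0, len(tagged), batch_size)]
    sem = asyncio.Semaphore(concurrency)                           # Bound concurrency

    async def call(batch):
        async with sem:                                            # Execute
            return await call_llm(batch)

    responses = await asyncio.gather(*(call(b) for b in batches))
    rows = [row for resp in responses for row in resp]             # (validation omitted)
    rows.sort(key=lambda r: r["row_id"])                           # Reorder
    return rows                                                    # Return
```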
Step-by-step walkthrough¶
Step 1: Row tagging¶
Your input list[dict] is converted to internal _TaggedRow objects:
# Your input:
data = [
    {"name": "Apple", "desc": "Tech company"},
    {"name": "Stripe", "desc": "Payments"},
]

# Internally becomes:
tagged = [
    _TaggedRow(row_id=0, data={"name": "Apple", "desc": "Tech company"}),
    _TaggedRow(row_id=1, data={"name": "Stripe", "desc": "Payments"}),
]
The row_id is the row's position in your original list. This is how smelt guarantees results come back in the same order you sent them, even when batches complete out of order.
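Conceptually, tagging is just an enumerate over your input. A minimal sketch (smelt's real _TaggedRow is an internal type defined in types.py):

```python
# Sketch only: the real _TaggedRow lives in smelt's internal types module.
from dataclasses import dataclass

@dataclass
class _TaggedRow:
    row_id: int
    data: dict

def tag_rows(data: list[dict]) -> list[_TaggedRow]:
    # row_id is simply the row's index in the original input list
    return [_TaggedRow(row_id=i, data=row) for i, row in enumerate(data)]
```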
Step 2: Dynamic model creation¶
Smelt extends your Pydantic model to add row_id, then wraps it for structured output:
# Your model:
class Classification(BaseModel):
    sector: str
    sub_sector: str
    is_public: bool

# Smelt creates internally:
class _SmeltClassification(Classification):
    row_id: int  # Injected for tracking

class _SmeltBatch(BaseModel):
    rows: list[_SmeltClassification]  # Wrapper for structured output
This is done dynamically via Pydantic's create_model(), so it works with any user model including those with custom validators.
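A sketch of what that construction looks like with create_model (the real code lives in validation.py; the model mirrors the example above):

```python
# Sketch of the dynamic model construction described above.
from pydantic import BaseModel, create_model

class Classification(BaseModel):
    sector: str
    sub_sector: str
    is_public: bool

# Extend the user's model: __base__ preserves all fields and custom validators.
_SmeltClassification = create_model(
    "_SmeltClassification",
    __base__=Classification,
    row_id=(int, ...),
)

# Single top-level wrapper model holding the list of rows.
_SmeltBatch = create_model(
    "_SmeltBatch",
    rows=(list[_SmeltClassification], ...),
)
```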
classDiagram
class YourModel {
+str sector
+str sub_sector
+bool is_public
}
class _SmeltYourModel {
+int row_id
+str sector
+str sub_sector
+bool is_public
}
class _SmeltBatch {
+list~_SmeltYourModel~ rows
}
YourModel <|-- _SmeltYourModel : extends via create_model()
_SmeltYourModel --* _SmeltBatch : rows
note for _SmeltYourModel "Injected row_id for tracking.\nStripped before returning to user."
note for _SmeltBatch "Wrapper required by LangChain's\nwith_structured_output()."
Why a batch wrapper?
LangChain's with_structured_output() requires a single Pydantic model, not a list[Model]. The _SmeltBatch wrapper with a rows field provides this. The LLM returns a JSON object with {"rows": [...]}, which LangChain parses into the wrapper model.
Reserved field name
Your output model cannot have a field named row_id — smelt reserves it for internal tracking. If your model has a row_id field, smelt will raise SmeltConfigError at job creation time.
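The guard amounts to a check against the model's field registry. A sketch, assuming SmeltConfigError is importable from the package root (smelt's exact check may differ):

```python
# Sketch of the reserved-name guard described above.
from pydantic import BaseModel
from smelt import SmeltConfigError

def ensure_no_row_id(model_cls: type[BaseModel]) -> None:
    if "row_id" in model_cls.model_fields:  # Pydantic v2 field registry
        raise SmeltConfigError(
            f"{model_cls.__name__} defines 'row_id', which smelt reserves for row tracking"
        )
```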
Step 3: Prompt construction¶
Smelt builds two messages for each batch:
System message (shared across all batches):
You are a structured data transformation assistant.
## Task
{your prompt here}
## Rules
- You will receive a JSON array of objects. Each object has a "row_id" field.
- For EACH input object, produce exactly one output object.
- Every output object MUST include the same "row_id" as the corresponding input object.
- Do NOT skip, merge, duplicate, or reorder rows.
- Return ALL rows — the count of output rows must equal the count of input rows.
## Output Schema
Each output row must conform to this schema:
- row_id (int, required)
- sector (str, required) — Primary industry sector
- sub_sector (str, required) — More specific sub-sector
- is_public (bool, required) — Whether the company is publicly traded
Return your response as a JSON object with a single "rows" key containing the array of output objects.
Human message (unique per batch):
[
{"row_id": 0, "name": "Apple", "desc": "Tech company"},
{"row_id": 1, "name": "Stripe", "desc": "Payments"}
]
Better prompts = better results
The system prompt includes your prompt string verbatim. Be specific about what you want. For example, instead of "classify companies", say "classify each company by its primary GICS industry sector and sub-sector. Determine if the company is publicly traded on a major stock exchange."
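To make the template above concrete, here is a rough sketch of how the schema lines and the human message could be rendered from the model and the tagged rows. It is illustrative only; the real logic lives in prompt.py:

```python
# Sketch: rendering the "## Output Schema" section and the human message.
import json
from pydantic import BaseModel

def schema_lines(model_cls: type[BaseModel]) -> list[str]:
    lines = []
    for name, field in model_cls.model_fields.items():
        type_name = getattr(field.annotation, "__name__", str(field.annotation))
        required = "required" if field.is_required() else "optional"
        desc = f" — {field.description}" if field.description else ""
        lines.append(f"- {name} ({type_name}, {required}){desc}")
    return lines

def human_message(batch: list[dict]) -> str:
    # The tagged rows already carry their injected row_id
    return json.dumps(batch, indent=2)
```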
Step 4: Concurrent execution¶
Batches run concurrently using asyncio.Semaphore(concurrency):
sequenceDiagram
participant User
participant Job
participant BatchEngine
participant LLM
User->>Job: job.run(model, data)
Job->>BatchEngine: execute_batches()
Note over BatchEngine: Tag rows with row_id<br/>Shuffle if enabled<br/>Create internal Pydantic model<br/>Build system prompt<br/>Split into batches
par Batch 0
BatchEngine->>LLM: [system_msg, human_msg]
LLM-->>BatchEngine: structured response
and Batch 1
BatchEngine->>LLM: [system_msg, human_msg]
LLM-->>BatchEngine: structured response
and Batch N
BatchEngine->>LLM: [system_msg, human_msg]
LLM-->>BatchEngine: structured response
end
Note over BatchEngine: Validate row IDs per batch<br/>Sort by row_id<br/>Strip row_id field<br/>Aggregate metrics
BatchEngine-->>Job: SmeltResult[T]
Job-->>User: result
Each batch independently:
- Acquires a semaphore slot
- Calls structured_model.ainvoke([system_msg, human_msg])
- Gets back {raw: AIMessage, parsed: _SmeltBatch, parsing_error: None}
- Extracts token usage from raw.usage_metadata
- Validates the parsed response (row IDs, count)
- On failure: retries with exponential backoff
- Releases the semaphore slot
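Condensed into code, the per-batch loop looks roughly like this. validate_batch is a placeholder and the backoff policy is illustrative; the response shape matches with_structured_output(include_raw=True):

```python
# Condensed sketch of the per-batch loop described above.
import asyncio

def validate_batch(parsed) -> None:
    ...  # placeholder: row count + row-ID checks (see Step 5 below)

async def run_batch(structured_model, system_msg, human_msg, sem, max_retries=3):
    async with sem:                                    # Acquire a semaphore slot
        for attempt in range(max_retries + 1):
            response = await structured_model.ainvoke([system_msg, human_msg])
            parsed = response["parsed"]                # _SmeltBatch (or None on parse failure)
            usage = response["raw"].usage_metadata     # Token counts from the raw AIMessage
            try:
                validate_batch(parsed)
                return parsed, usage
            except Exception:
                if attempt == max_retries:
                    raise
                await asyncio.sleep(2 ** attempt)      # Exponential backoff before retrying
    # The semaphore slot is released when the async with block exits
```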
Step 5: Response validation¶
For each batch response, smelt validates four invariants:
| Check | What it catches |
|---|---|
| Row count | LLM returned 3 rows but batch had 5 — it skipped rows |
| No duplicates | LLM returned row_id: 2 twice — it duplicated a row |
| No missing IDs | LLM returned row_id: 0, 2 but not 1 — it skipped a row |
| No unexpected IDs | LLM returned row_id: 99 which wasn't in the batch — it hallucinated |
If any check fails, smelt raises SmeltValidationError internally and retries the entire batch.
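All four checks reduce to count and set comparisons on the returned row IDs. A sketch (the real implementation lives in validation.py):

```python
# Sketch of the four invariant checks described above.
from smelt import SmeltValidationError

def validate_row_ids(expected_ids: set[int], returned_ids: list[int]) -> None:
    if len(returned_ids) != len(expected_ids):
        raise SmeltValidationError("row count mismatch")               # Skipped or extra rows
    if len(set(returned_ids)) != len(returned_ids):
        raise SmeltValidationError("duplicate row_id in response")     # Same row twice
    if missing := expected_ids - set(returned_ids):
        raise SmeltValidationError(f"missing row_ids: {missing}")      # Skipped rows
    if unexpected := set(returned_ids) - expected_ids:
        raise SmeltValidationError(f"unexpected row_ids: {unexpected}")  # Hallucinated IDs
```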
Step 6: Row ID tracking and reordering¶
flowchart LR
subgraph Input
direction TB
R0["row 0: {name: Apple}"]
R1["row 1: {name: Stripe}"]
R2["row 2: {name: Mayo}"]
end
subgraph Tagged
direction TB
T0["{row_id: 0, name: Apple}"]
T1["{row_id: 1, name: Stripe}"]
T2["{row_id: 2, name: Mayo}"]
end
subgraph "LLM Output (may be unordered)"
direction TB
L1["{row_id: 1, sector: Fintech}"]
L0["{row_id: 0, sector: Tech}"]
L2["{row_id: 2, sector: Health}"]
end
subgraph "Final (reordered)"
direction TB
F0["Classification(sector=Tech)"]
F1["Classification(sector=Fintech)"]
F2["Classification(sector=Health)"]
end
Input --> Tagged --> L1
L1 ~~~ L0
L0 ~~~ L2
L2 --> F0
style F0 fill:#9f9,stroke:#333
style F1 fill:#9f9,stroke:#333
style F2 fill:#9f9,stroke:#333
After all batches complete:
- Collect all validated rows from all successful batches
- Sort by row_id (ascending)
- Call strip_row_id() on each — does model_dump(exclude={"row_id"}) then model_validate() back to your original type
- The result is list[YourModel] in exact original input order
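A compact sketch of that reorder-and-strip step (the function name here is illustrative; it mirrors the strip_row_id() behaviour described above):

```python
# Sketch of the reorder-and-strip step.
from pydantic import BaseModel

def finalize(rows: list[BaseModel], user_model: type[BaseModel]) -> list[BaseModel]:
    ordered = sorted(rows, key=lambda r: r.row_id)  # Back into original input order
    return [
        user_model.model_validate(r.model_dump(exclude={"row_id"}))  # Drop row_id, revalidate
        for r in ordered
    ]
```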
Step 7: Result assembly¶
All metrics are aggregated across batches:
SmeltResult(
    data=[...],    # Ordered list of your Pydantic models
    errors=[...],  # BatchError for each failed batch
    metrics=SmeltMetrics(
        total_rows=100,
        successful_rows=95,
        failed_rows=5,
        total_batches=10,
        successful_batches=9,
        failed_batches=1,
        total_retries=3,
        input_tokens=15000,
        output_tokens=8000,
        wall_time_seconds=4.521,
    ),
)
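Consuming the result is then straightforward. A sketch that continues the running example, using only the fields shown above:

```python
# Typical consumption of the result object (continues the Classification example).
result = job.run(model, data)

for item in result.data:       # list[Classification], in original input order
    print(item.sector, item.is_public)

for err in result.errors:      # One BatchError per failed batch
    print("failed batch:", err)

m = result.metrics
print(f"{m.successful_rows}/{m.total_rows} rows in {m.wall_time_seconds:.1f}s")
```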
Module structure¶
src/smelt/
├── __init__.py # Public API: Model, Job, SmeltResult, errors
├── model.py # Model — wraps LangChain's init_chat_model
├── job.py # Job — defines transformation + run/arun/test/atest
├── batch.py # Async batch engine: concurrency, retry, orchestration
├── prompt.py # System/human message construction
├── validation.py # Dynamic Pydantic model creation, row ID validation
├── types.py # SmeltResult, SmeltMetrics, BatchError, internal types
└── errors.py # SmeltError hierarchy
| Module | Responsibility |
|---|---|
| model.py | Thin wrapper around init_chat_model. Lazy initialization + caching. |
| job.py | User-facing entry point. Validates config, delegates to batch.py. |
| batch.py | Core engine. Orchestrates the full pipeline: tag → split → concurrent execute → validate → reorder → assemble. |
| prompt.py | Builds the system message (prompt + rules + schema) and human message (JSON data). |
| validation.py | Creates internal Pydantic models with row_id, validates batch responses, strips row_id from results. |
| types.py | Data containers: SmeltResult, SmeltMetrics, BatchError, and internal _TaggedRow / _BatchResult. |
| errors.py | Exception hierarchy: SmeltError → SmeltConfigError, SmeltValidationError, SmeltAPIError, SmeltExhaustionError. |
Key design decisions¶
| Decision | Choice | Rationale |
|---|---|---|
| Structured output | LangChain with_structured_output(include_raw=True) | Gets parsed model + raw message for tokens + non-throwing parse errors |
| Row tracking | row_id field injected via create_model(__base__=user_model) | Inherits all user validators, clean separation |
| Batch wrapper | Dynamic _SmeltBatch model with rows: list[InternalModel] | Required because with_structured_output needs a single model, not a list |
| Concurrency | asyncio.Semaphore | Standard, predictable, no worker pool overhead |
| Cancellation | Cooperative via asyncio.Event | In-flight batches finish, pending ones skip. Safer than task cancellation |
| Sync wrapper | asyncio.run() with running-loop detection | Clear error if called from async context |
| Validation | Pydantic v2 native | Standalone, no framework coupling |
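For instance, the sync-wrapper row above implies roughly this pattern (a sketch, not smelt's exact code):

```python
# Sketch of a sync wrapper with running-loop detection.
import asyncio

def run_sync(job, model, data):
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(job.arun(model, data))  # No loop running: safe to start one
    raise RuntimeError(
        "run() was called from a running event loop; await job.arun(...) instead"
    )
```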