Skip to content

Cookbook: Advanced Patterns

Advanced usage patterns and techniques.

Chaining transformations

Run multiple jobs in sequence, feeding the output of one into the next:

from pydantic import BaseModel, Field
from smelt import Model, Job

# Step 1: Extract entities
class ExtractedEntity(BaseModel):
    company_name: str
    industry: str
    sentiment: str

# Step 2: Enrich with additional data
class EnrichedEntity(BaseModel):
    market_position: str = Field(description="Market position: leader, challenger, niche, emerging")
    growth_potential: str = Field(description="Growth potential: high, medium, low")
    recommendation: str = Field(description="Brief investment recommendation")

model = Model(provider="openai", name="gpt-4.1-mini")

# First pass: extract
extract_job = Job(
    prompt="Extract the company name, industry, and overall sentiment from each news article.",
    output_model=ExtractedEntity,
    batch_size=10,
)

articles = [
    {"headline": "Apple reports record Q4 revenue driven by iPhone sales", "body": "..."},
    {"headline": "Stripe expands to 10 new markets in Southeast Asia", "body": "..."},
]

extract_result = extract_job.run(model, data=articles)

# Convert Pydantic models back to dicts for the second pass
enrichment_data = [row.model_dump() for row in extract_result.data]

# Second pass: enrich
enrich_job = Job(
    prompt="Based on the extracted entity information, assess the company's market position, "
           "growth potential, and provide a brief recommendation.",
    output_model=EnrichedEntity,
    batch_size=10,
)

enrich_result = enrich_job.run(model, data=enrichment_data)

# Combine results
for extracted, enriched in zip(extract_result.data, enrich_result.data):
    print(f"{extracted.company_name}: {enriched.market_position}{enriched.recommendation}")

Comparing providers

Run the same job across multiple providers to compare quality:

from smelt import Model, Job

models = {
    "gpt-4.1-mini": Model(provider="openai", name="gpt-4.1-mini"),
    "claude-sonnet": Model(provider="anthropic", name="claude-sonnet-4-6"),
    "gemini-flash": Model(provider="google_genai", name="gemini-3-flash-preview"),
}

job = Job(
    prompt="Classify each company by industry sector",
    output_model=Classification,
    batch_size=10,
)

for name, model in models.items():
    result = job.run(model, data=companies)
    print(f"\n{name}: {result.metrics.wall_time_seconds:.2f}s, "
          f"{result.metrics.input_tokens + result.metrics.output_tokens} tokens")
    for row in result.data:
        print(f"  {row}")

Temperature comparison

Compare deterministic vs creative outputs:

for temp in [0, 0.5, 1.0]:
    model = Model(
        provider="openai",
        name="gpt-4.1-mini",
        params={"temperature": temp},
    )
    result = job.run(model, data=data[:3])
    print(f"\ntemperature={temp}:")
    for row in result.data:
        print(f"  {row}")

Retrying failed rows

When some rows fail, retry just those:

job = Job(
    prompt="...",
    output_model=MyModel,
    batch_size=10,
    stop_on_exhaustion=False,  # Collect errors, don't raise
)

result = job.run(model, data=data)

if not result.success:
    # Get indices of failed rows
    failed_ids = set()
    for err in result.errors:
        failed_ids.update(err.row_ids)

    print(f"{len(failed_ids)} rows failed, retrying with smaller batches...")

    # Retry with smaller batch_size and more retries
    retry_job = Job(
        prompt="...",
        output_model=MyModel,
        batch_size=1,       # One row at a time
        max_retries=5,      # More retries
        stop_on_exhaustion=False,
    )

    failed_data = [data[i] for i in sorted(failed_ids)]
    retry_result = retry_job.run(model, data=failed_data)

    # Merge results
    all_data = list(result.data)  # Successful rows from first run
    # Note: you'd need to map retry results back to original indices
    print(f"Recovered {len(retry_result.data)} of {len(failed_ids)} failed rows")

Processing large datasets

For very large datasets, process in chunks to manage memory and provide progress:

import time

CHUNK_SIZE = 500  # Process 500 rows at a time

job = Job(
    prompt="...",
    output_model=MyModel,
    batch_size=20,
    concurrency=5,
    stop_on_exhaustion=False,
)

all_results = []
all_errors = []
total_tokens = 0

for i in range(0, len(data), CHUNK_SIZE):
    chunk = data[i:i + CHUNK_SIZE]
    print(f"Processing rows {i}{i + len(chunk) - 1}...")

    result = job.run(model, data=chunk)

    all_results.extend(result.data)
    all_errors.extend(result.errors)
    total_tokens += result.metrics.input_tokens + result.metrics.output_tokens

    print(f"  Done: {result.metrics.successful_rows}/{result.metrics.total_rows} rows, "
          f"{result.metrics.wall_time_seconds:.1f}s")

print(f"\nTotal: {len(all_results)} rows, {total_tokens:,} tokens, {len(all_errors)} errors")

Using with pandas

import pandas as pd
from smelt import Model, Job

# Load data
df = pd.read_csv("companies.csv")

# Convert DataFrame to list of dicts
data = df.to_dict(orient="records")

# Run transformation
job = Job(prompt="...", output_model=Classification, batch_size=10)
result = job.run(model, data=data)

# Convert results back to DataFrame
result_df = pd.DataFrame([row.model_dump() for row in result.data])

# Combine with original data
combined = pd.concat([df, result_df], axis=1)
print(combined.head())

Using with CSV files

import csv

def load_csv(path: str) -> list[dict[str, str]]:
    with open(path, newline="", encoding="utf-8") as f:
        return list(csv.DictReader(f))

def save_results(path: str, data: list, result_data: list):
    """Save original data + results to CSV."""
    rows = []
    for original, result in zip(data, result_data):
        row = {**original, **result.model_dump()}
        rows.append(row)

    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=rows[0].keys())
        writer.writeheader()
        writer.writerows(rows)

# Usage
data = load_csv("input.csv")
result = job.run(model, data=data)
save_results("output.csv", data, result.data)

Custom Pydantic validators

Your output model's validators run on every row. Use them for business logic:

from pydantic import BaseModel, Field, field_validator, model_validator

class PricingAnalysis(BaseModel):
    suggested_price: float = Field(description="Suggested retail price in USD")
    margin_percent: float = Field(description="Expected profit margin as a percentage (0-100)")
    price_tier: str = Field(description="Price tier: budget, mid-range, premium, luxury")

    @field_validator("margin_percent")
    @classmethod
    def margin_in_range(cls, v: float) -> float:
        if not (0 <= v <= 100):
            raise ValueError(f"margin_percent must be 0-100, got {v}")
        return v

    @field_validator("suggested_price")
    @classmethod
    def price_positive(cls, v: float) -> float:
        if v <= 0:
            raise ValueError(f"suggested_price must be positive, got {v}")
        return v

    @model_validator(mode="after")
    def tier_matches_price(self) -> "PricingAnalysis":
        if self.price_tier == "budget" and self.suggested_price > 50:
            raise ValueError(f"Budget tier but price is ${self.suggested_price}")
        return self

Validators trigger retries

If a Pydantic validator raises ValueError, smelt treats it as a validation error and retries the entire batch. This means your custom validators act as quality gates — the LLM keeps trying until it produces output that passes all your validators.

Async with asyncio.gather

Run multiple independent jobs concurrently:

import asyncio

async def main():
    model = Model(provider="openai", name="gpt-4.1-mini")

    classify_job = Job(prompt="Classify by industry", output_model=Classification, batch_size=10)
    sentiment_job = Job(prompt="Analyze sentiment", output_model=Sentiment, batch_size=10)
    summary_job = Job(prompt="Summarize each company", output_model=Summary, batch_size=5)

    # Run all three jobs concurrently
    classify_result, sentiment_result, summary_result = await asyncio.gather(
        classify_job.arun(model, data=companies),
        sentiment_job.arun(model, data=reviews),
        summary_job.arun(model, data=companies),
    )

    print(f"Classification: {len(classify_result.data)} rows")
    print(f"Sentiment: {len(sentiment_result.data)} rows")
    print(f"Summaries: {len(summary_result.data)} rows")

asyncio.run(main())