Quickstart

Build your first transformation in 5 minutes.

1. Define your output schema

Use a standard Pydantic BaseModel to define the shape of each output row:

from pydantic import BaseModel, Field

class Classification(BaseModel):
    sector: str = Field(description="Primary industry sector")
    sub_sector: str = Field(description="More specific sub-sector")
    is_public: bool = Field(description="Whether the company is publicly traded")

Use Field descriptions

Adding Field(description=...) helps the LLM understand what each field means. Smelt includes these descriptions in the system prompt automatically. Better descriptions = better output quality.
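
For example, a description can spell out the expected granularity or list a few representative values. This richer schema is purely illustrative — the exact taxonomy is up to you:

class Classification(BaseModel):
    sector: str = Field(
        description="Primary industry sector, e.g. 'Technology', 'Healthcare', 'Financial Services'"
    )
    sub_sector: str = Field(
        description="More specific sub-sector within the primary sector, e.g. 'Payments' under 'Financial Services'"
    )
    is_public: bool = Field(description="Whether the company is publicly traded on a stock exchange")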

2. Configure a model

from smelt import Model

model = Model(
    provider="openai",
    name="gpt-4.1-mini",
    # api_key="sk-..."  # Optional — falls back to OPENAI_API_KEY env var
)

You can pass additional parameters to the underlying LangChain model:

model = Model(
    provider="openai",
    name="gpt-4.1-mini",
    params={"temperature": 0},  # Deterministic output
)

3. Create a job

from smelt import Job

job = Job(
    prompt="Classify each company by its primary industry sector and sub-sector. "
           "Determine if the company is publicly traded.",
    output_model=Classification,
    batch_size=10,    # 10 rows per LLM call
    concurrency=3,    # Up to 3 concurrent calls
)
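
To see how these two knobs interact, here is a rough back-of-the-envelope, assuming batches are formed by splitting the input evenly:

import math

rows = 100
batch_size = 10
concurrency = 3

num_batches = math.ceil(rows / batch_size)  # 10 batches of up to 10 rows each
# At most `concurrency` batches are in flight at once, so the run proceeds
# in roughly ceil(10 / 3) = 4 waves of LLM calls.
print(num_batches, math.ceil(num_batches / concurrency))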

4. Test with a single row

Before running the full dataset, validate that your prompt and schema work together:

data = [
    {"name": "Apple", "description": "Consumer electronics and software"},
    {"name": "Stripe", "description": "Payment processing platform"},
    {"name": "Mayo Clinic", "description": "Nonprofit medical center"},
]

# Quick test — sends only the first row
result = job.test(model, data=data)
print(result.data[0])
# Classification(sector='Technology', sub_sector='Consumer Electronics', is_public=True)
print(result.metrics)
# SmeltMetrics(total_rows=1, successful_rows=1, ...)

Why test first?

test() / atest() sends only the first row with batch_size=1, concurrency=1, and shuffle=False. This catches prompt issues, schema mismatches, and auth errors before you burn through tokens on a full run.
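
A common pattern is to gate the full run on the test result:

test_result = job.test(model, data=data)
if not test_result.success:
    raise SystemExit("Fix the prompt or schema before running the full dataset")

result = job.run(model, data=data)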

5. Run the full transformation

result = job.run(model, data=data)

# Check results
print(f"Success: {result.success}")
print(f"Rows: {result.metrics.successful_rows}/{result.metrics.total_rows}")
print(f"Tokens: {result.metrics.input_tokens} in / {result.metrics.output_tokens} out")
print(f"Time: {result.metrics.wall_time_seconds:.2f}s")

for row in result.data:
    print(row)
# Classification(sector='Technology', sub_sector='Consumer Electronics', is_public=True)
# Classification(sector='Financial Technology', sub_sector='Payments', is_public=False)
# Classification(sector='Healthcare', sub_sector='Hospitals', is_public=False)

Key properties of the result:

Property        Type                   Description
result.data     list[Classification]   Transformed rows, in original input order
result.errors   list[BatchError]       Failed batches (empty if all succeeded)
result.metrics  SmeltMetrics           Token usage, timing, retry counts
result.success  bool                   True if zero errors
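
Because result.data preserves the original input order, you can pair outputs back with their input rows directly:

for row, classification in zip(data, result.data):
    print(f"{row['name']}: {classification.sector} / {classification.sub_sector}")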

6. Handle errors

By default, smelt raises SmeltExhaustionError when a batch fails after all retries:

from smelt.errors import SmeltExhaustionError

try:
    result = job.run(model, data=data)
except SmeltExhaustionError as e:
    print(f"Some batches failed after retries")
    print(f"Partial results: {len(e.partial_result.data)} rows")
    print(f"Errors: {len(e.partial_result.errors)} batches")

Or collect errors without raising:

job = Job(
    prompt="...",
    output_model=Classification,
    stop_on_exhaustion=False,  # Don't raise, collect errors instead
)

result = job.run(model, data=data)
if not result.success:
    for err in result.errors:
        print(f"Batch {err.batch_index} failed: {err.message}")

Async usage

In Jupyter notebooks or async applications, use the async methods:

# Async test
result = await job.atest(model, data=data)

# Async run
result = await job.arun(model, data=data)

Sync vs async

job.run() and job.test() cannot be called from within an async event loop (e.g. Jupyter notebooks). Use await job.arun() and await job.atest() in those contexts. Smelt will raise a RuntimeError with a helpful message if you mix them up.
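
In a plain script (outside any event loop), you can still drive the async methods with asyncio — a minimal sketch:

import asyncio

async def main():
    result = await job.arun(model, data=data)
    print(result.metrics)

asyncio.run(main())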

Complete example

Here's a full working script you can copy-paste:

"""Classify companies by industry sector using smelt."""
import os
from pydantic import BaseModel, Field
from smelt import Model, Job

# 1. Define output schema
class Classification(BaseModel):
    sector: str = Field(description="Primary industry sector")
    sub_sector: str = Field(description="More specific sub-sector")
    is_public: bool = Field(description="Whether the company is publicly traded")

# 2. Configure model
model = Model(provider="openai", name="gpt-4.1-mini")

# 3. Prepare data
companies = [
    {"name": "Apple Inc.", "description": "Consumer electronics, software, and services"},
    {"name": "JPMorgan Chase", "description": "Global financial services and investment banking"},
    {"name": "Pfizer", "description": "Pharmaceutical company developing medicines and vaccines"},
    {"name": "Tesla", "description": "Electric vehicles and clean energy products"},
    {"name": "Spotify", "description": "Digital music and podcast streaming platform"},
]

# 4. Create and test job
job = Job(
    prompt="Classify each company by its primary industry sector and sub-sector. "
           "Determine if the company is publicly traded.",
    output_model=Classification,
    batch_size=5,
    concurrency=1,
)

# Quick single-row test
test_result = job.test(model, data=companies)
print(f"Test: {test_result.data[0]}")
print(f"Tokens used: {test_result.metrics.input_tokens + test_result.metrics.output_tokens}")
print()

# 5. Full run
result = job.run(model, data=companies)

# 6. Print results
print(f"Success: {result.success}")
print(f"Rows: {result.metrics.successful_rows}/{result.metrics.total_rows}")
print(f"Time: {result.metrics.wall_time_seconds:.2f}s")
print()

for company, classification in zip(companies, result.data):
    print(f"{company['name']:20s}{classification.sector} / {classification.sub_sector} (public: {classification.is_public})")

Next steps