Job¶
The Job class defines a transformation — what prompt to use, what output schema to expect, and how to configure batching and retries.
Quick reference¶
from smelt import Job
job = Job(
    prompt="Classify each company by industry sector",
    output_model=Classification,
    batch_size=10,
    concurrency=3,
    max_retries=3,
    shuffle=False,
    stop_on_exhaustion=True,
)
# Sync
result = job.run(model, data=rows)
# Async
result = await job.arun(model, data=rows)
# Single-row test
result = job.test(model, data=rows)
result = await job.atest(model, data=rows)
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| `prompt` | `str` | required | Transformation instructions sent to the LLM |
| `output_model` | `Type[BaseModel]` | required | Pydantic model defining the output schema per row |
| `batch_size` | `int` | `10` | Number of input rows per LLM request |
| `concurrency` | `int` | `3` | Maximum concurrent batch requests |
| `max_retries` | `int` | `3` | Retry attempts per failed batch |
| `shuffle` | `bool` | `False` | Randomize row order before batching (results are always returned in original order) |
| `stop_on_exhaustion` | `bool` | `True` | Raise `SmeltExhaustionError` on batch failure instead of collecting errors |
Validation¶
Job configuration is validated on creation. Invalid values raise SmeltConfigError:
# These all raise SmeltConfigError:
Job(prompt="", output_model=MyModel) # Empty prompt
Job(prompt="ok", output_model=dict) # Not a BaseModel
Job(prompt="ok", output_model=MyModel, batch_size=0) # batch_size < 1
Job(prompt="ok", output_model=MyModel, concurrency=0) # concurrency < 1
Job(prompt="ok", output_model=MyModel, max_retries=-1) # max_retries < 0
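The checks above can be mirrored with a small `__post_init__`-style sketch. This is illustrative only; `JobSketch` and the stand-in `SmeltConfigError` below are hypothetical, not smelt's code:

```python
from dataclasses import dataclass


class SmeltConfigError(ValueError):
    """Stand-in for smelt's configuration error (illustrative only)."""


@dataclass
class JobSketch:
    prompt: str
    batch_size: int = 10
    concurrency: int = 3
    max_retries: int = 3

    def __post_init__(self):
        # Mirrors the validation rules listed above.
        if not self.prompt.strip():
            raise SmeltConfigError("prompt must be non-empty")
        if self.batch_size < 1:
            raise SmeltConfigError("batch_size must be >= 1")
        if self.concurrency < 1:
            raise SmeltConfigError("concurrency must be >= 1")
        if self.max_retries < 0:
            raise SmeltConfigError("max_retries must be >= 0")
```

Validating eagerly in `__post_init__` means a misconfigured job fails at construction time, before any LLM requests are made.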
Methods¶
run(model, *, data)¶
Run the transformation synchronously. Creates an event loop internally.
result = job.run(model, data=[
    {"name": "Apple", "desc": "Tech company"},
    {"name": "Stripe", "desc": "Payments"},
])
print(result.data) # [Classification(...), Classification(...)]
print(result.success) # True
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `model` | `Model` | LLM provider configuration |
| `data` | `list[dict[str, Any]]` | Input rows as dictionaries |
Returns: SmeltResult[T] where T is your output_model
Raises:
- `RuntimeError` if called from an async context (use `arun` instead)
- `SmeltConfigError` if the model can't be initialized
- `SmeltExhaustionError` if `stop_on_exhaustion=True` and a batch fails
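The retry-then-exhaust semantics behind `max_retries` can be sketched generically. The names below (`ExhaustionError`, `run_batch_with_retries`) are hypothetical stand-ins, not smelt's internals:

```python
class ExhaustionError(Exception):
    """Stand-in for an error raised when a batch fails after all retries."""


def run_batch_with_retries(process, batch, max_retries=3):
    """Attempt a batch up to 1 + max_retries times, then give up."""
    last_error = None
    for _attempt in range(1 + max_retries):
        try:
            return process(batch)
        except Exception as exc:
            last_error = exc
    raise ExhaustionError(
        f"batch failed after {max_retries} retries"
    ) from last_error
```

With `max_retries=3`, a batch is attempted at most four times; only after the final attempt fails does the exhaustion error propagate.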
arun(model, *, data)¶
Run the transformation asynchronously.
Same parameters and return type as run(). Use this in Jupyter notebooks or async applications.
test(model, *, data)¶
Run a single-row test synchronously. Useful for validating your setup before a full run.
Behavior:
- Only processes the first row of `data`
- Ignores `batch_size`, `concurrency`, and `shuffle` settings
- Uses `batch_size=1`, `concurrency=1`, `shuffle=False` internally
- Respects `max_retries` and `stop_on_exhaustion`
Raises: SmeltConfigError if data is empty
atest(model, *, data)¶
Run a single-row test asynchronously.
Same behavior as test(). Use this in Jupyter notebooks or async applications.
Usage patterns¶
Test → Run workflow¶
# 1. Test with one row to verify setup
test_result = job.test(model, data=data)
print(f"Test output: {test_result.data[0]}")
print(f"Test tokens: {test_result.metrics.input_tokens + test_result.metrics.output_tokens}")
# 2. If test looks good, run the full dataset
result = job.run(model, data=data)
Async Jupyter notebook¶
# In Jupyter, use await with async methods
result = await job.atest(model, data=data) # Test
result = await job.arun(model, data=data) # Full run
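The reason the sync methods refuse to run inside an event loop can be seen with a minimal sync-over-async wrapper. This is a generic asyncio pattern, not smelt's actual code; `run_sync` and `do_work` are hypothetical:

```python
import asyncio


async def do_work():
    """A trivial coroutine standing in for an async job run."""
    await asyncio.sleep(0)
    return "done"


def run_sync(coro):
    """Sketch of a run()-style wrapper: refuse to nest event loops."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running, so it is safe to create one.
        return asyncio.run(coro)
    raise RuntimeError("already inside an event loop; await the coroutine instead")
```

Inside Jupyter a loop is already running, so `asyncio.run()` cannot be used; that is why the async variants must be awaited there.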
Error collection mode¶
job = Job(
    prompt="...",
    output_model=MyModel,
    stop_on_exhaustion=False,
)
result = job.run(model, data=data)
print(f"Succeeded: {len(result.data)}, Failed: {len(result.errors)}")
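Collect-errors mode amounts to partitioning rows into successes and failures instead of aborting on the first exhausted batch. A plain-Python sketch of the idea (hypothetical names, not smelt's API):

```python
def run_collecting_errors(rows, transform):
    """Process every row; keep going past failures and report both lists."""
    data, errors = [], []
    for index, row in enumerate(rows):
        try:
            data.append(transform(row))
        except Exception as exc:
            errors.append((index, str(exc)))
    return data, errors
```

This trades fail-fast behavior for completeness: one bad row no longer discards the work done on the rest of the dataset.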
Source¶
Job(prompt, output_model, batch_size=10, concurrency=3, max_retries=3, shuffle=False, stop_on_exhaustion=True)
dataclass
¶
A smelt transformation job.
Defines what transformation to apply, how to validate output, and how to manage batching and retries.
Attributes:
| Name | Type | Description |
|---|---|---|
| `prompt` | `str` | The transformation instruction sent to the LLM. |
| `output_model` | `Type[BaseModel]` | A Pydantic model defining the output schema for each row. |
| `batch_size` | `int` | Number of input rows per LLM request. Defaults to 10. |
| `concurrency` | `int` | Maximum number of concurrent batch requests. Defaults to 3. |
| `max_retries` | `int` | Maximum retry attempts per failed batch. Defaults to 3. |
| `shuffle` | `bool` | If `True`, randomize row order before batching. Defaults to `False`. |
| `stop_on_exhaustion` | `bool` | If `True`, raise `SmeltExhaustionError` when a batch fails after all retries. Defaults to `True`. |
Examples:
>>> from pydantic import BaseModel
>>> class Classification(BaseModel):
... category: str
... confidence: float
>>> job = Job(
... prompt="Classify each company by industry sector",
... output_model=Classification,
... batch_size=20,
... concurrency=3,
... )
__post_init__()
¶
Validate job configuration on initialization.
Raises:
| Type | Description |
|---|---|
| `SmeltConfigError` | If any configuration value is invalid. |
Source code in src/smelt/job.py
atest(model, *, data)
async
¶
Run a single-row test to validate setup before a full run.
Takes only the first row from data, ignores shuffle, and uses
batch_size=1, concurrency=1. Useful for quickly verifying
that the prompt, model, and output schema work together.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `model` | `Model` | The :class:`Model` provider configuration to use. | required |
| `data` | `list[dict[str, Any]]` | Input rows as a list of dictionaries. Only the first row is used. | required |
Returns:
| Type | Description |
|---|---|
| `SmeltResult[Any]` | A :class:`SmeltResult` containing the validated output row, any errors, and run metrics. |
Raises:
| Type | Description |
|---|---|
| `SmeltConfigError` | If data is empty or the model cannot be initialized. |
| `SmeltExhaustionError` | If `stop_on_exhaustion=True` and the test batch fails. |
Source code in src/smelt/job.py
test(model, *, data)
¶
Run a single-row test synchronously.
Convenience wrapper around :meth:`atest`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `model` | `Model` | The :class:`Model` provider configuration to use. | required |
| `data` | `list[dict[str, Any]]` | Input rows as a list of dictionaries. Only the first row is used. | required |
Returns:
| Type | Description |
|---|---|
| `SmeltResult[Any]` | A :class:`SmeltResult` containing the validated output row, any errors, and run metrics. |
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If called from within an already-running event loop. Use :meth:`atest` instead. |
Source code in src/smelt/job.py
arun(model, *, data)
async
¶
Run the transformation job asynchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `model` | `Model` | The :class:`Model` provider configuration to use. | required |
| `data` | `list[dict[str, Any]]` | Input rows as a list of dictionaries. | required |
Returns:
| Type | Description |
|---|---|
| `SmeltResult[Any]` | A :class:`SmeltResult` containing validated output rows, any errors, and run metrics. |
Raises:
| Type | Description |
|---|---|
| `SmeltConfigError` | If the model cannot be initialized. |
| `SmeltExhaustionError` | If `stop_on_exhaustion=True` and a batch fails after all retries. |
Source code in src/smelt/job.py
run(model, *, data)
¶
Run the transformation job synchronously.
Convenience wrapper around :meth:`arun` that creates an event loop
if one is not already running.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `model` | `Model` | The :class:`Model` provider configuration to use. | required |
| `data` | `list[dict[str, Any]]` | Input rows as a list of dictionaries. | required |
Returns:
| Type | Description |
|---|---|
| `SmeltResult[Any]` | A :class:`SmeltResult` containing validated output rows, any errors, and run metrics. |
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If called from within an already-running event loop. Use :meth:`arun` instead. |
| `SmeltConfigError` | If the model cannot be initialized. |
| `SmeltExhaustionError` | If `stop_on_exhaustion=True` and a batch fails after all retries. |