Job

The Job class defines a transformation — what prompt to use, what output schema to expect, and how to configure batching and retries.

Quick reference

from smelt import Job

job = Job(
    prompt="Classify each company by industry sector",
    output_model=Classification,
    batch_size=10,
    concurrency=3,
    max_retries=3,
    shuffle=False,
    stop_on_exhaustion=True,
)

# Sync
result = job.run(model, data=rows)

# Async
result = await job.arun(model, data=rows)

# Single-row test
result = job.test(model, data=rows)
result = await job.atest(model, data=rows)
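
Classification here is an ordinary Pydantic model describing one output row, for example (matching the model used in the examples further down this page):

from pydantic import BaseModel

class Classification(BaseModel):
    category: str
    confidence: float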

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| prompt | str | required | Transformation instructions sent to the LLM |
| output_model | Type[BaseModel] | required | Pydantic model defining the output schema per row |
| batch_size | int | 10 | Number of input rows per LLM request |
| concurrency | int | 3 | Maximum concurrent batch requests |
| max_retries | int | 3 | Retry attempts per failed batch |
| shuffle | bool | False | Randomize row order before batching (results are always returned in original input order) |
| stop_on_exhaustion | bool | True | Raise SmeltExhaustionError on failure instead of collecting errors |
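
For larger datasets you typically raise batch_size and concurrency together. A sketch of such a configuration; the values below are illustrative, not recommendations:

job = Job(
    prompt="Classify each company by industry sector",
    output_model=Classification,
    batch_size=25,             # more rows per LLM request
    concurrency=5,             # more batch requests in flight
    shuffle=True,              # randomize batching order; results still return in input order
    stop_on_exhaustion=False,  # collect failures instead of aborting the run
)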

Validation

Job configuration is validated on creation. Invalid values raise SmeltConfigError:

# These all raise SmeltConfigError:
Job(prompt="", output_model=MyModel)          # Empty prompt
Job(prompt="ok", output_model=dict)           # Not a BaseModel
Job(prompt="ok", output_model=MyModel, batch_size=0)    # batch_size < 1
Job(prompt="ok", output_model=MyModel, concurrency=0)   # concurrency < 1
Job(prompt="ok", output_model=MyModel, max_retries=-1)  # max_retries < 0

Methods

run(model, *, data)

Run the transformation synchronously. Creates an event loop internally.

result = job.run(model, data=[
    {"name": "Apple", "desc": "Tech company"},
    {"name": "Stripe", "desc": "Payments"},
])
print(result.data)      # [Classification(...), Classification(...)]
print(result.success)   # True

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| model | Model | LLM provider configuration |
| data | list[dict[str, Any]] | Input rows as dictionaries |

Returns: SmeltResult[T] where T is your output_model

Raises:

  • RuntimeError if called from an async context (use arun instead)
  • SmeltConfigError if the model can't be initialized
  • SmeltExhaustionError if stop_on_exhaustion=True and a batch fails
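
To handle retry exhaustion yourself instead of letting it propagate, wrap the call. A minimal sketch, assuming SmeltExhaustionError is also exported from the top-level smelt package:

from smelt import SmeltExhaustionError  # assumed: exception exported at top level

try:
    result = job.run(model, data=rows)
except SmeltExhaustionError as exc:
    # A batch exhausted its retries while stop_on_exhaustion=True
    print(f"Run aborted: {exc}")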

arun(model, *, data)

Run the transformation asynchronously.

result = await job.arun(model, data=rows)

Same parameters and return type as run(). Use this in Jupyter notebooks or async applications.
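
In an async application (outside a notebook), arun() is awaited from inside your own coroutine, for example:

import asyncio

async def main() -> None:
    result = await job.arun(model, data=rows)
    print(result.success)

asyncio.run(main())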

test(model, *, data)

Run a single-row test synchronously. Useful for validating your setup before a full run.

result = job.test(model, data=rows)
print(result.data[0])  # Single output row

Behavior:

  • Only processes the first row of data
  • Ignores your batch_size, concurrency, and shuffle settings; internally uses batch_size=1, concurrency=1, shuffle=False
  • Respects max_retries and stop_on_exhaustion

Raises: SmeltConfigError if data is empty

atest(model, *, data)

Run a single-row test asynchronously.

result = await job.atest(model, data=rows)
print(result.data[0])

Same behavior as test(). Use this in Jupyter notebooks or async applications.

Usage patterns

Test → Run workflow

# 1. Test with one row to verify setup
test_result = job.test(model, data=data)
print(f"Test output: {test_result.data[0]}")
print(f"Test tokens: {test_result.metrics.input_tokens + test_result.metrics.output_tokens}")

# 2. If test looks good, run the full dataset
result = job.run(model, data=data)
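
The manual check in step 2 can also be automated with the success flag shown earlier:

# Automated variant of step 2
if test_result.success:
    result = job.run(model, data=data)
else:
    print("Test failed; adjust the prompt or output_model before a full run")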

Async Jupyter notebook

# In Jupyter, use await with async methods
result = await job.atest(model, data=data)   # Test
result = await job.arun(model, data=data)    # Full run

Error collection mode

job = Job(
    prompt="...",
    output_model=MyModel,
    stop_on_exhaustion=False,
)
result = job.run(model, data=data)
print(f"Succeeded: {len(result.data)}, Failed: {len(result.errors)}")

Source

Job(prompt, output_model, batch_size=10, concurrency=3, max_retries=3, shuffle=False, stop_on_exhaustion=True) dataclass

A smelt transformation job.

Defines what transformation to apply, how to validate output, and how to manage batching and retries.

Attributes:

prompt (str): The transformation instruction sent to the LLM.

output_model (Type[BaseModel]): A Pydantic BaseModel subclass defining the expected output schema for each row.

batch_size (int): Number of input rows per LLM request. Defaults to 10.

concurrency (int): Maximum number of concurrent batch requests. Defaults to 3.

max_retries (int): Maximum retry attempts per failed batch. Defaults to 3.

shuffle (bool): If True, shuffles the input rows before splitting into batches. Row ordering in the final result is unaffected; results are always returned in original input order. Defaults to False.

stop_on_exhaustion (bool): If True (default), raises SmeltExhaustionError when any batch exhausts its retries. If False, continues processing remaining batches and collects errors.

Examples:

>>> from pydantic import BaseModel
>>> class Classification(BaseModel):
...     category: str
...     confidence: float
>>> job = Job(
...     prompt="Classify each company by industry sector",
...     output_model=Classification,
...     batch_size=20,
...     concurrency=3,
... )

__post_init__()

Validate job configuration on initialization.

Raises:

SmeltConfigError: If any configuration value is invalid.

Source code in src/smelt/job.py
def __post_init__(self) -> None:
    """Validate job configuration on initialization.

    Raises:
        SmeltConfigError: If any configuration value is invalid.
    """
    if not self.prompt or not self.prompt.strip():
        raise SmeltConfigError("Job prompt must be a non-empty string.")

    if not isinstance(self.output_model, type) or not issubclass(
        self.output_model, BaseModel
    ):
        raise SmeltConfigError(
            f"output_model must be a Pydantic BaseModel subclass, "
            f"got {type(self.output_model)!r}."
        )

    if self.batch_size < 1:
        raise SmeltConfigError(
            f"batch_size must be >= 1, got {self.batch_size}."
        )

    if self.concurrency < 1:
        raise SmeltConfigError(
            f"concurrency must be >= 1, got {self.concurrency}."
        )

    if self.max_retries < 0:
        raise SmeltConfigError(
            f"max_retries must be >= 0, got {self.max_retries}."
        )

    self._validated = True

atest(model, *, data) async

Run a single-row test to validate setup before a full run.

Takes only the first row from data, ignores shuffle, and uses batch_size=1, concurrency=1. Useful for quickly verifying that the prompt, model, and output schema work together.

Parameters:

model (Model, required): The Model configuration for the LLM.

data (list[dict[str, Any]], required): Input rows as a list of dictionaries. Only the first row is used.

Returns:

SmeltResult[Any]: A SmeltResult containing the single transformed row.

Raises:

SmeltConfigError: If data is empty or the model cannot be initialized.

SmeltExhaustionError: If stop_on_exhaustion is True and the batch exhausts all retries.

Source code in src/smelt/job.py
async def atest(self, model: Model, *, data: list[dict[str, Any]]) -> SmeltResult[Any]:
    """Run a single-row test to validate setup before a full run.

    Takes only the first row from ``data``, ignores shuffle, and uses
    ``batch_size=1``, ``concurrency=1``. Useful for quickly verifying
    that the prompt, model, and output schema work together.

    Args:
        model: The :class:`~smelt.model.Model` configuration for the LLM.
        data: Input rows as a list of dictionaries. Only the first row is used.

    Returns:
        A :class:`~smelt.types.SmeltResult` containing the single transformed row.

    Raises:
        SmeltConfigError: If data is empty or the model cannot be initialized.
        SmeltExhaustionError: If ``stop_on_exhaustion`` is ``True`` and
            the batch exhausts all retries.
    """
    if not data:
        raise SmeltConfigError("data must contain at least one row.")

    chat_model = model.get_chat_model()

    return await execute_batches(
        chat_model=chat_model,
        user_prompt=self.prompt,
        output_model=self.output_model,
        data=data[:1],
        batch_size=1,
        concurrency=1,
        max_retries=self.max_retries,
        shuffle=False,
        stop_on_exhaustion=self.stop_on_exhaustion,
    )

test(model, *, data)

Run a single-row test synchronously.

Convenience wrapper around atest().

Parameters:

model (Model, required): The Model configuration for the LLM.

data (list[dict[str, Any]], required): Input rows as a list of dictionaries. Only the first row is used.

Returns:

SmeltResult[Any]: A SmeltResult containing the single transformed row.

Raises:

RuntimeError: If called from within an already-running event loop. Use atest() instead in async contexts.

Source code in src/smelt/job.py
def test(self, model: Model, *, data: list[dict[str, Any]]) -> SmeltResult[Any]:
    """Run a single-row test synchronously.

    Convenience wrapper around :meth:`atest`.

    Args:
        model: The :class:`~smelt.model.Model` configuration for the LLM.
        data: Input rows as a list of dictionaries. Only the first row is used.

    Returns:
        A :class:`~smelt.types.SmeltResult` containing the single transformed row.

    Raises:
        RuntimeError: If called from within an already-running event loop.
            Use :meth:`atest` instead in async contexts.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop is not None:
        raise RuntimeError(
            "job.test() cannot be called from an async context. "
            "Use 'await job.atest(...)' instead."
        )

    return asyncio.run(self.atest(model, data=data))

arun(model, *, data) async

Run the transformation job asynchronously.

Parameters:

model (Model, required): The Model configuration for the LLM.

data (list[dict[str, Any]], required): Input rows as a list of dictionaries.

Returns:

SmeltResult[Any]: A SmeltResult containing transformed data, any errors, and run metrics.

Raises:

SmeltConfigError: If the model cannot be initialized.

SmeltExhaustionError: If stop_on_exhaustion is True and a batch exhausts all retries.

Source code in src/smelt/job.py
async def arun(self, model: Model, *, data: list[dict[str, Any]]) -> SmeltResult[Any]:
    """Run the transformation job asynchronously.

    Args:
        model: The :class:`~smelt.model.Model` configuration for the LLM.
        data: Input rows as a list of dictionaries.

    Returns:
        A :class:`~smelt.types.SmeltResult` containing transformed data,
        any errors, and run metrics.

    Raises:
        SmeltConfigError: If the model cannot be initialized.
        SmeltExhaustionError: If ``stop_on_exhaustion`` is ``True`` and
            a batch exhausts all retries.
    """
    chat_model = model.get_chat_model()

    return await execute_batches(
        chat_model=chat_model,
        user_prompt=self.prompt,
        output_model=self.output_model,
        data=data,
        batch_size=self.batch_size,
        concurrency=self.concurrency,
        max_retries=self.max_retries,
        shuffle=self.shuffle,
        stop_on_exhaustion=self.stop_on_exhaustion,
    )

run(model, *, data)

Run the transformation job synchronously.

Convenience wrapper around arun() that creates an event loop if one is not already running.

Parameters:

model (Model, required): The Model configuration for the LLM.

data (list[dict[str, Any]], required): Input rows as a list of dictionaries.

Returns:

SmeltResult[Any]: A SmeltResult containing transformed data, any errors, and run metrics.

Raises:

RuntimeError: If called from within an already-running event loop. Use arun() instead in async contexts.

SmeltConfigError: If the model cannot be initialized.

SmeltExhaustionError: If stop_on_exhaustion is True and a batch exhausts all retries.

Source code in src/smelt/job.py
def run(self, model: Model, *, data: list[dict[str, Any]]) -> SmeltResult[Any]:
    """Run the transformation job synchronously.

    Convenience wrapper around :meth:`arun` that creates an event loop
    if one is not already running.

    Args:
        model: The :class:`~smelt.model.Model` configuration for the LLM.
        data: Input rows as a list of dictionaries.

    Returns:
        A :class:`~smelt.types.SmeltResult` containing transformed data,
        any errors, and run metrics.

    Raises:
        RuntimeError: If called from within an already-running event loop.
            Use :meth:`arun` instead in async contexts.
        SmeltConfigError: If the model cannot be initialized.
        SmeltExhaustionError: If ``stop_on_exhaustion`` is ``True`` and
            a batch exhausts all retries.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop is not None:
        raise RuntimeError(
            "job.run() cannot be called from an async context. "
            "Use 'await job.arun(...)' instead."
        )

    return asyncio.run(self.arun(model, data=data))