Missing Runs in LangSmith Experiment View with Concurrent Evaluations

Last updated: December 8, 2025

Issue Description

When running LangSmith evaluations with concurrent execution (e.g., using pytest with xdist, multiprocessing, or threading), the experiment list page shows the correct total number of runs, but the experiment details view displays only a subset of runs. The missing run data is still available programmatically and can be viewed from the Examples → Linked Runs page, but traces are not accessible from the experiment view.

Root Cause

When using multiprocessing or concurrent execution patterns with aevaluate, two issues can cause incomplete data in the experiment view:

  1. Async generator not consumed: aevaluate returns an async generator that must be fully consumed (e.g., async for _ in results) for evaluations to actually execute

  2. Background tracing queue not flushed: Worker processes may terminate before the background tracing thread finishes uploading trace data, resulting in lost traces
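The first issue can be illustrated without LangSmith at all. The sketch below uses a hypothetical process_items async generator as a stand-in for a lazily evaluated result stream (it is not LangSmith code) and shows that no work happens until the generator is iterated:

```python
import asyncio

executed = []  # records which items were actually processed


async def process_items(items):
    """Hypothetical stand-in for a lazy result stream: work runs only as items are pulled."""
    for item in items:
        await asyncio.sleep(0)  # simulate a unit of async work
        executed.append(item)
        yield item


async def demo():
    results = process_items(["a", "b", "c"])  # creating the generator runs nothing
    before = list(executed)

    async for _ in results:  # iterating is what drives the work
        pass
    after = list(executed)
    return before, after


before, after = asyncio.run(demo())
print("processed before consuming:", before)
print("processed after consuming:", after)
```

If the async for loop is skipped, executed stays empty, which is the same failure mode as an evaluation whose results are never consumed.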

Recommended Solution

Use aevaluate's built-in concurrency via the max_concurrency parameter instead of external multiprocessing:

import asyncio
from langsmith import aevaluate

async def my_task(inputs: dict) -> dict:
    # Your task logic
    return {"result": inputs["name"]}

def my_evaluator(run, example) -> dict:
    return {"key": "correctness", "score": 1.0}

async def main():
    # Collect all examples upfront
    examples = [...]  # Your list of examples
    
    # Single aevaluate call with built-in concurrency
    results = await aevaluate(
        my_task,
        data=examples,
        evaluators=[my_evaluator],
        experiment_prefix="my-experiment",
        max_concurrency=5,  # Run 5 examples concurrently
    )
    
    # Consume the async generator
    async for _ in results:
        pass

asyncio.run(main())
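For intuition about what a concurrency cap does, here is a minimal sketch of bounded concurrency built on asyncio.Semaphore. It is an illustration under assumptions, not LangSmith's actual implementation: run_bounded and echo_task are hypothetical names, and the peak counter exists only to show that the cap holds.

```python
import asyncio


async def run_bounded(task, inputs, max_concurrency):
    """Run task over inputs with at most max_concurrency calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0  # highest number of simultaneous calls observed

    async def guarded(item):
        nonlocal in_flight, peak
        async with semaphore:  # waits here once the cap is reached
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await task(item)
            finally:
                in_flight -= 1

    outputs = await asyncio.gather(*(guarded(item) for item in inputs))
    return outputs, peak


async def echo_task(inputs: dict) -> dict:
    await asyncio.sleep(0.01)  # simulate I/O-bound work
    return {"result": inputs["name"]}


examples = [{"name": f"example-{i}"} for i in range(12)]
outputs, peak = asyncio.run(run_bounded(echo_task, examples, max_concurrency=5))
print(len(outputs), "outputs, peak concurrency", peak)
```

Because everything runs in one process and one event loop, there is no worker process that can exit before its traces are uploaded, which is why the single-call approach avoids the second root cause.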

Using Multiprocessing

If you must use multiprocessing, make sure each worker consumes its results and flushes pending traces before the process exits:

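import asyncio
from langsmith import Client, aevaluate

async def run_single_example(example):
    # Create the client first and pass it to aevaluate, so the queue flushed
    # below is the one this evaluation writes its traces to.
    client = Client()

    # project_id (the existing experiment to attach runs to) is assumed to be
    # defined elsewhere; my_task and my_evaluator are as defined above.
    results = await aevaluate(
        my_task,
        data=[example],
        evaluators=[my_evaluator],
        experiment=project_id,
        client=client,
    )

    # 1. Consume the async generator
    async for _ in results:
        pass

    # 2. Flush the tracing queue before the process terminates
    if client.tracing_queue:
        client.tracing_queue.join()  # Block until queue is empty

    # 3. Add a buffer for in-flight network requests to complete
    await asyncio.sleep(2)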
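Step 2 matters because a background uploader thread is typically a daemon thread, which is killed when the process exits. The following is a simplified model using only the standard library (the uploader function and trace_queue are hypothetical and do not reflect LangSmith internals) showing how queue.join() blocks until every queued item has been handled:

```python
import queue
import threading
import time

uploaded = []  # stands in for trace data that reached the server


def uploader(q):
    """Background worker: drains the queue and 'uploads' each item."""
    while True:
        item = q.get()
        time.sleep(0.01)  # simulate network latency
        uploaded.append(item)
        q.task_done()  # signals that this item is fully handled


trace_queue = queue.Queue()
# daemon=True means the thread is killed abruptly when the process exits
threading.Thread(target=uploader, args=(trace_queue,), daemon=True).start()

for i in range(5):
    trace_queue.put(f"trace-{i}")

# Without this join, the process could exit while items are still queued,
# and those items would never be uploaded.
trace_queue.join()
print("uploaded:", uploaded)
```

In this model, join() returns only after task_done() has been called once per queued item, so all five items are guaranteed to be recorded before the script ends.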