lubu labs
LangChain

Building a Real-World Pre-Production LangGraph Agent with LangChain Agent Skills

Building a pre-production research agent in 6 days using langchain-agent-skills—production-ready patterns for state, errors, deployment.

Simon Budziak
CTO

We needed to build a pre-production research agent for a client - one that could search the web, synthesize findings across multiple sources, fact-check claims, and route to humans for approval before publishing reports.

Traditional approach (based on our internal experience on similar projects): 2-3 weeks of architecture debates, custom state management, retry logic from scratch, and a lot of "let me check the LangGraph docs".

With langchain-agent-skills: 6 days from requirements to pre-production deployment in this specific client project, with Claude Code and Codex automatically loading the right patterns at each step.

This isn't theory. This post walks through exactly how we built this agent in one pre-production engagement - which skills triggered when, what code they provided, and how they turned "what should I do?" moments into "here's the production-ready pattern" guidance.


The Problem

Our client needed an automated research assistant that could:

Functional requirements:

  • Search the web for relevant sources (Tavily API integration)
  • Synthesize findings from 10+ sources into coherent summaries
  • Fact-check claims against gathered evidence
  • Route to human reviewers for approval before publishing
  • Generate final reports with citations

Architecture requirements:

  • Parallel execution for worker nodes (search, summarize, fact-check must run simultaneously)
  • Central orchestration to manage workflow state
  • State aggregation across distributed workers
  • Clean separation of concerns (no monolithic node functions)

Reliability requirements:

  • Retry transient failures (search API timeouts, rate limits)
  • Never retry side effects (publishing the same report twice = bad)
  • Human-in-the-loop approval (interrupt execution, wait for decision, resume)
  • Graceful degradation (partial results if one worker fails)

Observability requirements:

  • Trace every step through LangSmith
  • Measure quality metrics (source count, fact-check accuracy)
  • Pre-production debugging capability (trace analysis, error pattern detection)
  • Performance monitoring (latency, retry counts)

Why "obvious" approaches don't work:

  • Sequential processing is too slow - Processing workers one after another adds 20-30 seconds of latency. Parallel execution is mandatory.
  • Custom state management is fragile - Merging results from 3 concurrent workers without battle-tested reducer patterns leads to race conditions and lost data.
  • Retry everything = dangerous - Retrying the search API timeout? Safe. Retrying the "publish report" action? Disaster (duplicate publications, user confusion).
  • No visibility = can't debug - Pre-production failures without traces mean guessing what went wrong based on user reports.
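To make the latency point concrete, here is a minimal stdlib sketch (the worker functions and sleep durations are illustrative stand-ins, not the real nodes): three 0.1s workers finish in roughly 0.1s when run concurrently, versus roughly 0.3s sequentially.

```python
import asyncio
import time

# Hypothetical stand-ins for the three workers; sleep() simulates network latency.
async def search_demo() -> str:
    await asyncio.sleep(0.1)
    return "search"

async def summarize_demo() -> str:
    await asyncio.sleep(0.1)
    return "summarize"

async def fact_check_demo() -> str:
    await asyncio.sleep(0.1)
    return "fact_check"

async def run_all() -> list[str]:
    # gather() runs all three concurrently: total latency tracks the slowest
    # worker (~0.1s) instead of the sum (~0.3s)
    return await asyncio.gather(search_demo(), summarize_demo(), fact_check_demo())

start = time.perf_counter()
results = asyncio.run(run_all())
elapsed = time.perf_counter() - start
```

LangGraph's parallel fan-out (shown in Step 2) gives the same concurrency at the graph level, without hand-writing the asyncio plumbing.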

This is where langchain-agent-skills came in. Each problem domain (project structure, architecture patterns, state reducers, error handling, testing, debugging) maps to a specific skill that provides production-ready patterns.


Step 1: Project Initialization (langgraph-project-setup)

The challenge: LangGraph projects have a specific structure that enables deployment to LangSmith Cloud. Get the structure wrong (missing langgraph.json, incorrect graph path, invalid dependencies) and you'll waste hours debugging deployment failures.

What the skill provided:

When we told Claude Code "initialize a LangGraph project for a research agent," it automatically triggered the langgraph-project-setup skill. The skill uses progressive disclosure - it loads metadata first to understand the intent, then provides the full initialization script only when needed.

bash
# Skill-provided initialization command
uv run scripts/init_langgraph_project.py research-agent --pattern multiagent

Project structure created:

The skill scaffolded a production-ready structure with proper separation: src/research_agent/ for implementation (graph, state, nodes), tests/ for unit and integration tests, and critically - langgraph.json pointing to the compiled graph for deployment.
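For reference, the scaffold looks roughly like this (file names beyond the ones described above, such as nodes.py, are illustrative, not an exact listing):

```
research-agent/
├── langgraph.json
├── pyproject.toml
├── src/
│   └── research_agent/
│       ├── graph.py
│       ├── state.py
│       └── nodes.py
└── tests/
    ├── test_nodes.py
    └── test_evaluation.py
```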

Key config file generated:

json
{
  "graphs": {"research_agent": "./src/research_agent/graph.py:app"},
  "env": {"OPENAI_API_KEY": "", "TAVILY_API_KEY": "", "LANGSMITH_API_KEY": ""}
}

Why this matters:

The skill eliminated "what's the right structure?" research. No reading deployment docs, no trial-and-error with graph paths, no missing config files. Production-ready scaffolding in 5 minutes.


Step 2: Architecture Selection (langgraph-agent-patterns)

The challenge: LangGraph supports multiple agent patterns (supervisor, router, orchestrator-worker, handoffs). Choosing the wrong pattern means refactoring the entire graph structure later. We needed parallel worker execution with central coordination.

What the skill provided:

The langgraph-agent-patterns skill provided a decision tree based on our requirements:

  • Requirements: 3 workers (search, summarize, fact-check) must run in parallel
  • Coordinator needed: Must manage flow (start → parallel workers → aggregate → human approval)
  • State aggregation: Need to merge results from multiple workers into single state

The skill recommended orchestrator-worker pattern for this use case because we needed explicit parallel fan-out/fan-in with central coordination.

Architectural implementation:

The skill provided the graph structure with the orchestrator-worker pattern. Key elements:

  • Workers run in parallel via Command(goto=[...]) with multiple targets
  • Workers return to orchestrator for state aggregation before proceeding
  • Orchestrator routing lives in one place (via Command) to avoid mixed control-flow paths
python
# src/research_agent/graph.py
from typing import Literal
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.types import Command

from .state import ResearchState  # defined in Step 3
from .nodes import search_worker, summarize_worker, fact_check_worker, approval_node

DB_URL = "postgres connection string here"  # in practice, read from an environment variable

def orchestrator_node(
    state: ResearchState,
) -> Command[Literal["search_worker", "summarize_worker", "fact_check_worker", "approval"]]:
    # First pass: dispatch to all workers in parallel
    if not state.get("workers_started"):
        return Command(
            goto=["search_worker", "summarize_worker", "fact_check_worker"],
            update={"workers_started": True}
        )
    # Workers done, proceed to approval
    return Command(goto="approval")
 
def build_graph() -> StateGraph:
    builder = StateGraph(ResearchState)
 
    builder.add_node("orchestrator", orchestrator_node)
    builder.add_node("search_worker", search_worker)
    builder.add_node("summarize_worker", summarize_worker)
    builder.add_node("fact_check_worker", fact_check_worker)
    builder.add_node("approval", approval_node)
 
    builder.add_edge(START, "orchestrator")
    builder.add_edge(["search_worker", "summarize_worker", "fact_check_worker"], "orchestrator")
    builder.add_edge("approval", END)
 
    return builder.compile(checkpointer=PostgresSaver.from_conn_string(DB_URL))


Key architectural insight:

The skill provided both the pattern decision tree AND complete code structure. No guessing about conditional edges vs static edges. No trial-and-error with Command syntax for parallel dispatch.

The orchestrator pattern enables:

  • Parallel execution via Command(goto=[...]) with multiple targets
  • State aggregation via automatic reducer merging before approval
  • Clean separation between coordination logic (orchestrator) and business logic (workers)

Step 3: State Schema Design (langgraph-state-management)

The challenge: LangGraph state schemas aren't just type definitions - they include reducer functions that control how concurrent updates merge. Wrong reducer = race conditions, lost data, or duplicate entries.

What the skill provided:

The langgraph-state-management skill provided pre-built schemas for common use cases (chat, research, workflow, RAG). For research workflows, it recommended:

  • Base class: MessagesState (handles conversation history)
  • Aggregation lists: operator.add reducer for worker results
  • Metadata tracking: Simple fields for workflow state

State implementation with reducers:

python
# src/research_agent/state.py
from typing import Annotated, Literal
from typing_extensions import TypedDict
import operator
from langgraph.graph import MessagesState
 
class SearchResult(TypedDict):
    url: str
    title: str
    snippet: str
    relevance_score: float
 
class FactCheck(TypedDict):
    claim: str
    verdict: Literal["supported", "refuted", "unverified"]
    sources: list[str]
    confidence: float
 
class ResearchState(MessagesState):
    query: str
    max_sources: int
 
    # Worker outputs aggregated via operator.add
    search_results: Annotated[list[SearchResult], operator.add]
    summaries: Annotated[list[str], operator.add]
    fact_checks: Annotated[list[FactCheck], operator.add]
 
    # Workflow state (last-write-wins)
    workers_started: bool
    final_report: str | None
    human_approved: bool | None
    revision_notes: str | None
    published: bool
    idempotent_skip: bool
 
    # Tracing
    thread_id: str
    turn_id: int

Why operator.add matters - demonstration:

python
# Scenario: three parallel branches all writing to search_results

# Branch 1 returns:
{"search_results": [result1, result2, result3]}

# Branch 2 returns:
{"search_results": [result4, result5]}

# Branch 3 returns:
{"search_results": [result6]}
 
# LangGraph automatically merges via operator.add reducer:
# Final state.search_results = [result1, result2, result3, result4, result5, result6]
 
# No manual merging code needed
# Note: ordering from parallel branches may vary by superstep

Key insight from the skill:

The skill explained exactly when to use each reducer pattern:

  • operator.add for lists where duplicates are acceptable (summaries, messages)
  • Custom reducers for deduplication (unique URLs, unique entity names)
  • No reducer (default last-write-wins) for simple fields (booleans, strings, counters)
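The middle case is worth a sketch, since it's the one we reached for with unique URLs. A reducer is just a two-argument merge function; merge_unique_urls below is our own illustration, not code from the skill:

```python
from typing import Annotated, TypedDict

def merge_unique_urls(existing: list[dict], new: list[dict]) -> list[dict]:
    """Reducer: append only results whose URL hasn't been seen yet."""
    seen = {r["url"] for r in existing}
    merged = list(existing)
    for r in new:
        if r["url"] not in seen:
            merged.append(r)
            seen.add(r["url"])
    return merged

class DedupedState(TypedDict):
    # LangGraph calls merge_unique_urls(current, update) for each concurrent write
    search_results: Annotated[list[dict], merge_unique_urls]
```

Two branches returning the same URL then merge to a single entry instead of a duplicate.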

This saved hours of debugging race conditions and understanding LangGraph's reducer semantics.


Step 4: Error Handling (langgraph-error-handling)

The challenge: Not all errors should be retried. Retrying a search API timeout is safe. Retrying "publish report" after success creates duplicate publications. We needed error classification and selective retry policies.

What the skill provided:

The langgraph-error-handling skill categorizes errors into three types:

  1. Transient errors (429, timeouts, 5xx) → RetryPolicy with exponential backoff
  2. Recoverable errors (bad tool args, formatting issues) → LLM recovery loop with Command
  3. User-fixable errors (missing info, approval needed) → interrupt() + resume pattern
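The taxonomy can be expressed as a small classifier. The function and exception names below are our own illustration of the decision, not skill code:

```python
# Hypothetical domain exceptions for illustration
class RateLimitError(Exception): ...
class MissingInfoError(Exception): ...

def classify_error(exc: Exception) -> str:
    """Map an exception to one of the three handling strategies above."""
    if isinstance(exc, (TimeoutError, ConnectionError, RateLimitError)):
        return "retry"        # transient: RetryPolicy with exponential backoff
    if isinstance(exc, (ValueError, TypeError)):
        return "llm_recover"  # recoverable: feed the error back to the model
    if isinstance(exc, MissingInfoError):
        return "interrupt"    # user-fixable: pause and ask a human
    return "fail"             # unknown: surface immediately, never retry blindly
```

The "fail" default matters: an unclassified error should surface rather than be retried by default.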

Retry policy for transient failures:

The skill provided the RetryPolicy pattern with exponential backoff for the search worker. Timeouts and network errors retry automatically, but auth failures surface immediately.

python
from typing import Any, Dict

import aiohttp
from langgraph.types import RetryPolicy

# tavily_api: app-specific async Tavily client (construction not shown)

search_retry_policy = RetryPolicy(
    max_attempts=3,
    initial_interval=1.0,
    backoff_factor=2.0,  # 1s, 2s, 4s
    retry_on=[TimeoutError, aiohttp.ClientError],
)

async def search_worker(state: ResearchState) -> Dict[str, Any]:
    results = await tavily_api.search(state["query"], max_results=state["max_sources"])
    return {
        "search_results": [
            {
                "url": r["url"],
                "title": r["title"],
                "snippet": r.get("content", ""),
                "relevance_score": r.get("score", 0.0),
            }
            for r in results
        ]
    }
 
# Attach retry policy when adding node
# builder.add_node("search_worker", search_worker, retry_policy=search_retry_policy)

Human-in-the-loop with interrupt():

For approval workflows, the skill provided the interrupt() pattern. This pauses execution until a human resumes the run with Command(resume=...) using the same thread_id.

python
from typing import Any, Dict, Literal
from langgraph.types import interrupt, Command

def approval_node(state: ResearchState) -> Dict[str, Any] | Command[Literal["revise"]]:
    human_decision = interrupt({
        "type": "approval_request",
        "report": state["final_report"],
        "metadata": {"query": state["query"], "thread_id": state["thread_id"]}
    })
 
    if human_decision.get("action") == "approved":
        return {"human_approved": True}
    elif human_decision.get("action") == "revise":
        return Command(goto="revise", update={"revision_notes": human_decision["notes"]})
    return {"human_approved": False}

python
# Resume later with the same thread_id
config = {"configurable": {"thread_id": thread_id}}
graph.invoke(Command(resume={"action": "approved"}), config=config)

Idempotency for side effects (no retries):

Publishing is a side effect - retrying creates duplicates. The skill showed how to use idempotency keys to prevent this.

python
from typing import Any, Dict

def publish_node(state: ResearchState) -> Dict[str, Any]:
    idempotency_key = f"{state['thread_id']}-{state['turn_id']}"
 
    if is_already_published(idempotency_key):
        return {"published": True, "idempotent_skip": True}
 
    publish_to_external_system(state["final_report"], idempotency_key)
    return {"published": True}
 
# CRITICAL: No retry_policy attached to this node
# builder.add_node("publish", publish_node)
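The two helpers the publish node calls are app-specific. A minimal in-memory sketch of their contract (a production version would use a database table with a unique constraint on the key):

```python
# In-memory idempotency store for illustration; production code would use
# a database with a unique constraint on idempotency_key.
_published: dict[str, str] = {}

def is_already_published(idempotency_key: str) -> bool:
    return idempotency_key in _published

def publish_to_external_system(report: str, idempotency_key: str) -> None:
    # Record the key together with the side effect, so a crash between the
    # two can't lead to a double publish on replay
    _published[idempotency_key] = report
```

With this contract, re-running the node with the same thread_id/turn_id is a no-op after the first successful publish.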

Key insight from the skill:

The skill taught the critical distinction:

  • "Retry the model call" (safe - idempotent)
  • "Retry the tool call" (dangerous for side effects)

Provided production-ready patterns:

  • RetryPolicy for transient failures (search, summarization)
  • interrupt() for human decisions (approval, clarification)
  • Idempotency keys for side effects (publishing, notifications)

Step 5: Testing & Evaluation (langgraph-testing-evaluation)

The challenge: Manual testing doesn't scale. We needed automated unit tests for individual nodes AND trajectory evaluation for end-to-end workflows. LangSmith provides evaluation infrastructure, but the setup isn't obvious.

What the skill provided:

The langgraph-testing-evaluation skill provided:

  1. Unit test patterns for individual nodes (pytest fixtures, async handling)
  2. LangSmith evaluation workflow (dataset creation, evaluator functions, regression testing)

Unit tests for nodes:

The skill provided pytest patterns for testing individual nodes with mocked dependencies.

python
# tests/test_nodes.py
import pytest
from unittest.mock import AsyncMock, patch
 
@pytest.mark.asyncio
async def test_search_worker_returns_structured_results():
    state = ResearchState(query="LangGraph patterns", max_sources=5, thread_id="test-123")

    mock_response = [
        {"url": "https://example.com/a", "title": "A", "content": "snippet a", "score": 0.9},
        {"url": "https://example.com/b", "title": "B", "content": "snippet b", "score": 0.8},
    ]

    # search_worker calls tavily_api.search, so patch that (module path is project-specific)
    with patch("research_agent.nodes.tavily_api") as mock_api:
        mock_api.search = AsyncMock(return_value=mock_response)
        result = await search_worker(state)

    assert "search_results" in result
    assert len(result["search_results"]) == 2
    assert all(
        {"url", "title", "snippet", "relevance_score"} <= r.keys()
        for r in result["search_results"]
    )

Similar tests covered timeout handling, fact-check logic, and orchestrator routing.

LangSmith evaluation workflow:

The skill showed how to create evaluation datasets and run regression testing. We created a dataset with expected outputs (source count, fact-check count, report length) and evaluator functions to check quality.

python
from langsmith import Client
from langsmith.evaluation import evaluate
 
client = Client()
dataset = client.create_dataset("research-agent-eval")
client.create_examples(
    dataset_id=dataset.id,
    inputs=[{"query": "What are LangGraph checkpointing patterns?"}],
    outputs=[{"expected_num_sources": 10, "expected_fact_checks": 3}]
)
 
def evaluate_research_quality(run, example):
    state = run.outputs.get("final_state", {})
    return {
        "source_count_sufficient": len(state.get("search_results", [])) >= example.outputs["expected_num_sources"],
        "fact_checks_performed": len(state.get("fact_checks", [])) >= example.outputs["expected_fact_checks"],
    }
 
# run_agent wraps a single graph invocation per example (definition not shown)
results = evaluate(run_agent, data=dataset.name, evaluators=[evaluate_research_quality])

Key insight from the skill:

The skill provided the complete LangSmith evaluation workflow:

  • Dataset creation patterns
  • Evaluator function templates
  • Regression testing setup

Before this skill, we would have:

  1. Manually tested each scenario (hours of repetitive work)
  2. Missed edge cases that automated evaluation caught
  3. Had no regression testing for future changes

Step 6: Debugging with Trace Analysis (langsmith-trace-analyzer)

The challenge: In pre-production, we noticed intermittent fact-check failures (5% of requests). Manual debugging was impossible - we needed systematic trace analysis to identify the failure pattern.

What the skill provided:

The langsmith-trace-analyzer skill provided patterns for:

  1. Downloading traces filtered by error status
  2. Analyzing failure patterns by node
  3. Identifying conditions that trigger failures

The debugging scenario:

We noticed 5% error rate in pre-production. The skill showed how to download and analyze traces systematically.

python
from langsmith import Client

client = Client()

# Pull recent failed runs from the pre-production project
traces = client.list_runs(project_name="research-agent-preprod", error=True, limit=100)

# Organize by failure node (child runs must be loaded explicitly)
failures_by_node = {}
for trace in traces:
    run_tree = client.read_run(trace.id, load_child_runs=True)
    for run in run_tree.child_runs or []:
        if run.error:
            failures_by_node.setdefault(run.name, []).append(trace)
 
# Result: 95% of failures in fact_check_worker, all with source_count > 15 and execution_time > 30s

Traces revealed the pattern: fact-check node timed out when processing >15 summaries in one LLM call.

The fix - batch processing:

python
# BEFORE: Processing all summaries in one LLM call (timed out at >15 summaries)
async def fact_check_worker(state: ResearchState) -> Dict[str, Any]:
    all_summaries = "\n\n".join(state["summaries"])
    fact_checks = await llm.ainvoke(f"Fact check: {all_summaries}")
    return {"fact_checks": fact_checks}
 
# AFTER: Process in batches of 3 with a 10s timeout per batch
import asyncio

async def fact_check_worker(state: ResearchState) -> Dict[str, Any]:
    BATCH_SIZE = 3
    fact_checks = []

    for i in range(0, len(state["summaries"]), BATCH_SIZE):
        batch = state["summaries"][i:i + BATCH_SIZE]
        # Join outside the f-string: backslashes aren't allowed inside
        # f-string expressions before Python 3.12
        joined = "\n\n".join(batch)
        result = await asyncio.wait_for(llm.ainvoke(f"Fact check: {joined}"), timeout=10.0)
        fact_checks.extend(parse_fact_checks(result))

    return {"fact_checks": fact_checks}

After the fix, error rate dropped from 5% to 0.2% in this deployment. Re-running evaluations confirmed the batching approach resolved the timeout issue.

Measurement note: These percentages come from our internal LangSmith traces for this single project (computed as erroring runs / total runs for this workflow over a defined pre-production window). Treat them as directional, environment-specific results rather than universal benchmarks.

Key insight from the skill:

Traces surfaced the exact failure mode (timeout threshold) and conditions (source count > 15) that would have been invisible without structured observability.

The skill taught:

  • How to filter traces by error status
  • How to extract failure patterns from trace metadata
  • How to correlate errors with state conditions
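The correlation step is plain Python once the runs are downloaded. A sketch with simplified dicts standing in for LangSmith run objects (function and field names are illustrative):

```python
from typing import Callable

def correlate_failures(runs: list[dict], node: str, predicate: Callable[[dict], bool]) -> float:
    """Fraction of failed runs of `node` whose metadata satisfies `predicate`."""
    failures = [r for r in runs if r["name"] == node and r.get("error")]
    if not failures:
        return 0.0
    matching = [r for r in failures if predicate(r["metadata"])]
    return len(matching) / len(failures)

# Simplified stand-ins for downloaded trace runs
runs = [
    {"name": "fact_check_worker", "error": True, "metadata": {"source_count": 18}},
    {"name": "fact_check_worker", "error": True, "metadata": {"source_count": 22}},
    {"name": "search_worker", "error": False, "metadata": {"source_count": 5}},
]
ratio = correlate_failures(runs, "fact_check_worker", lambda m: m["source_count"] > 15)
```

A ratio near 1.0, as in our case, points at a condition (here, source_count > 15) that deserves a fix rather than a retry.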

Step 7: Pre-Production Deployment (langsmith-deployment)

The challenge: Deploying to pre-production in LangSmith Cloud requires valid configuration, passing tests, and monitoring setup. Mistakes (invalid graph path, missing env vars) cause silent deployment failures.

What the skill provided:

The langsmith-deployment skill provided:

  1. Configuration validation script
  2. Complete CI/CD pipeline template
  3. Monitoring setup patterns

Configuration validation:

bash
# Skill-provided validation script
$ uv run scripts/validate_config.py langgraph.json
 
✓ Schema valid
✓ Graph path exists: ./src/research_agent/graph.py:app
✓ Environment variables defined: OPENAI_API_KEY, TAVILY_API_KEY, LANGSMITH_API_KEY
✓ Dependencies installable (checked via uv)
⚠ Warning: No retry_policy on 'publish' node (expected for side effects)
✓ Checkpointer configured: PostgreSQL

Deployment ready: 5/6 checks passed (1 warning)

CI/CD pipeline:

The skill provided a GitHub Actions template that validates config and runs tests, with deployment handled by LangSmith Cloud's GitHub integration.

yaml
# .github/workflows/deploy-research-agent.yml
name: deploy-research-agent
on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v5
      - name: Run unit tests
        run: uv run pytest tests/ -v
      - name: Run evaluations
        run: uv run python tests/test_evaluation.py

  deploy:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v5
      - name: Validate langgraph.json
        run: uv run scripts/validate_config.py langgraph.json
      - name: Deploy trigger
        run: echo "LangSmith Cloud deploys from the connected GitHub repository after checks pass."
      - name: Set up monitoring
        run: echo "Configure alerts in LangSmith (UI or Deployment API) for error rate and latency thresholds."

Monitoring setup:

Alerts configured for error rate > 5%, p95 latency > 30s, and excessive retries. Daily summaries sent to Slack with run counts, error rates, and top failure patterns.
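Those thresholds reduce to simple checks over recent runs. A stdlib sketch (should_alert and the run fields are our own names for illustration, not a LangSmith API):

```python
import statistics

def should_alert(runs: list[dict], error_rate_max: float = 0.05, p95_latency_max: float = 30.0) -> list[str]:
    """Return which alert thresholds the recent runs breach."""
    alerts = []
    error_rate = sum(1 for r in runs if r["error"]) / len(runs)
    if error_rate > error_rate_max:
        alerts.append(f"error_rate={error_rate:.1%}")
    # quantiles(n=20) yields 19 cut points; the last one is the 95th percentile
    p95 = statistics.quantiles([r["latency_s"] for r in runs], n=20)[-1]
    if p95 > p95_latency_max:
        alerts.append(f"p95_latency={p95:.1f}s")
    return alerts
```

In LangSmith itself these checks are configured as alert rules on the deployment; the sketch just shows what the thresholds mean.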

Key insight from the skill:

The skill provided the complete pre-production checklist:

  • Validation before deployment (catch config errors early)
  • Testing in CI (no untested code reaches pre-production)
  • Monitoring setup (know when things break)

Not just "how to deploy" but "how to deploy safely."


What We Learned: Skills in Practice

Timeline breakdown (6 days total):

  • Day 1: Project setup, architecture selection, initial state schema (3 skills: langgraph-project-setup, langgraph-agent-patterns, langgraph-state-management)
  • Day 2-3: Worker node implementation, retry policies, human-in-the-loop (2 skills: langgraph-error-handling, continued state refinement)
  • Day 4: Testing setup, evaluation dataset creation, unit tests (1 skill: langgraph-testing-evaluation)
  • Day 5: Debugging via traces, performance optimization (batching fix) (1 skill: langsmith-trace-analyzer)
  • Day 6: Pre-production deployment, monitoring setup, handoff documentation (1 skill: langsmith-deployment)

Observed time impact in this project:

| Area | Typical effort (internal baseline) | Effort with skills (this project) |
| --- | --- | --- |
| Project setup | 2-3 hours | ~5 minutes |
| Architecture selection | 4-6 hours | ~30 minutes |
| State schema + reducers | 3-4 hours | ~1 hour |
| Error handling patterns | 2-3 hours | ~1 hour |
| Testing + evaluation setup | 6-8 hours | ~2 hours |
| Trace-based debugging | 8-12 hours | ~2 hours |
| Deployment readiness + monitoring | 4-5 hours | ~1 hour |

What skills accelerated:

  1. Zero architecture debate - langgraph-agent-patterns provided decision tree, we chose orchestrator-worker in 30 minutes vs 4-6 hours of research
  2. State schema took 1 hour, not 1 day - Skill showed exactly when to use operator.add vs custom reducers, no trial-and-error with race conditions
  3. Debugging was systematic - Trace-analyzer skill taught metadata tagging and pattern detection, we identified the batching issue in 2 hours vs days of guessing

What would have taken weeks without skills:

  • Trial-and-error with state reducers and MessagesState patterns (would have hit race conditions in pre-production, spent days debugging)
  • Implementing retry logic from scratch (would have gotten exponential backoff wrong, retried side effects, created duplicate publications)
  • Setting up proper observability and trace analysis (would have relied on logs and print statements, missed the batching issue entirely)

Key realization:

Skills didn't just save time - they eliminated entire categories of mistakes: retrying side effects (duplicate publications), wrong reducer patterns (race conditions), and sequential processing (3x slower latency). The skills framework turned Claude Code into a LangChain domain expert with production-ready patterns, not just documentation.


Takeaway

Building pre-production LangGraph agents doesn't have to start from zero. The langchain-agent-skills repository gave Claude Code domain expertise - loading the right patterns automatically instead of requiring documentation searches.

What made the difference: 7 skills covered 90% of pre-production requirements with production-ready patterns (not toy examples), progressive disclosure that kept context clean, and executable automation that validated configs and scaffolded projects.

The bottom line: 6 days from requirements to pre-production because we built on battle-tested patterns instead of reinventing state machines, retry policies, and evaluation workflows.


Try It Yourself

Install the skills in Claude Code following the setup instructions at github.com/Lubu-Labs/langchain-agent-skills. Browse the skill catalog in the repository README and start your next LangGraph project - skills trigger automatically based on your intent.

Found a production-ready pattern worth sharing? Contribute by packaging it as a skill and opening a PR to the repository.

Need help shipping pre-production agents fast? Schedule a free consultation to discuss your use case. We delivered this research agent in 6 days using these exact patterns.

