Skip to main content
AI/MLjeremylongshore

langchain-webhooks-events

"Dispatch LangChain 1.0 chain/agent events to external systems \u2014\

Stars
2,267
Source
jeremylongshore/claude-code-plugins-plus-skills
Updated
2026-05-31
Slug
jeremylongshore--claude-code-plugins-plus-skills--langchain-webhooks-events
View on GitHubRaw SKILL.md

// install — copy + paste into any project

mkdir -p .claude/skills && curl -fsSL https://raw.githubusercontent.com/jeremylongshore/claude-code-plugins-plus-skills/HEAD/plugins/saas-packs/langchain-py-pack/skills/langchain-webhooks-events/SKILL.md -o .claude/skills/langchain-webhooks-events.md

Drops the SKILL.md into .claude/skills/langchain-webhooks-events.md. Works with Claude Code, Cursor, and any agent that loads SKILL.md files from .claude/skills/.

LangChain Webhooks and Event Dispatch (Python)

Overview

A team wires per-tool webhook dispatch from their LangChain agent via FastAPI BackgroundTasks — analytics is always N seconds late because BackgroundTasks fire after the HTTP response closes, not during the stream (P60). Worse: the BaseCallbackHandler they attached via .with_config(callbacks=[h]) fires on the outer agent but is dark on the subagent's tool calls — custom callbacks are not inherited by LangGraph subgraphs (P28), they must be passed via config["callbacks"] at invoke time.

Pain-catalog anchors handled here:

  • P28 — Callbacks via with_config don't propagate to subgraphs
  • P46 — SSE streams dropped by buffering proxies (see langchain-langgraph-streaming)
  • P47 — astream_events(v2) emits thousands of events; never forward raw
  • P48 — Sync invoke() inside async endpoint blocks the event loop
  • P60 — BackgroundTasks fire post-response; wrong for per-event dispatch

This skill walks through an async AsyncCallbackHandler with fire-and-forget dispatch, per-target sinks for HTTP / Kafka / Redis Streams / SNS, HMAC-signed delivery with 1s/5s/30s retry and DLQ, idempotency keys = `run_id + event_type

  • step_index, andconfig["callbacks"]wiring that makes subagent calls visible. Typical webhook latency budget: <500ms per event. Pin:langchain-core 1.0.x, langgraph 1.0.x. Scope: server-to-server dispatch only — UI streaming is in langchain-langgraph-streaming`.

Prerequisites

  • Python 3.10+
  • langchain-core >= 1.0, < 2.0, langgraph >= 1.0, < 2.0
  • httpx >= 0.27 for async HTTP (or aiohttp)
  • One of: aiokafka, redis[hiredis] >= 5, aioboto3 (per target)
  • An event sink — a webhook endpoint, Kafka topic, Redis Stream, or SNS topic
  • A shared secret (for HMAC) stored in your secret manager, not env

Instructions

Step 1 — Write an async handler that fire-and-forget dispatches

Sync dispatch from a callback blocks the chain — a slow HTTP POST during on_tool_end serializes all downstream tokens behind it (P48). Use asyncio.create_task(...) so the dispatch runs alongside the chain:

import asyncio
import uuid
from typing import Any
from langchain_core.callbacks import AsyncCallbackHandler

class EventDispatchHandler(AsyncCallbackHandler):
    """Fire-and-forget dispatch to external sinks.

    IMPORTANT: subclass AsyncCallbackHandler (not BaseCallbackHandler) so
    on_* methods are awaited. Mixing sync and async handlers is a silent
    footgun — sync on_* blocks the event loop (P48).
    """

    def __init__(self, sink, *, run_id: str | None = None):
        self.sink = sink                      # dispatch target — Step 3
        self.run_id = run_id or str(uuid.uuid4())
        self._tasks: set[asyncio.Task] = set()

    def _dispatch(self, event_type: str, payload: dict, step_index: int) -> None:
        # Fire-and-forget. Keep a strong reference so the task isn't GC'd
        # mid-flight (asyncio quirk — orphan tasks get garbage-collected).
        task = asyncio.create_task(
            self.sink.send(
                idempotency_key=f"{self.run_id}:{event_type}:{step_index}",
                event_type=event_type,
                payload=payload,
            )
        )
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def on_tool_end(self, output: Any, *, run_id, parent_run_id=None, **kwargs):
        # 4000 char cap — keep payload under typical webhook body limits
        # while preserving enough context for downstream analytics.
        MAX_OUTPUT_CHARS = 4000
        self._dispatch(
            "tool_end",
            {"output": str(output)[:MAX_OUTPUT_CHARS], "run_id": str(run_id)},
            step_index=kwargs.get("tags", []).__len__() or 0,
        )

    async def on_chain_end(self, outputs: dict, *, run_id, **kwargs):
        # Only named chains — skip the unnamed LCEL inner nodes (P47)
        name = kwargs.get("name")
        if not name or name.startswith("RunnableLambda"):
            return
        self._dispatch("chain_end", {"name": name, "run_id": str(run_id)}, step_index=0)

    async def drain(self, timeout: float = 5.0) -> None:
        """Call before process exit so in-flight dispatches complete."""
        if self._tasks:
            await asyncio.wait(self._tasks, timeout=timeout)

See Async Callback Handler for the full handler — on_llm_end, filtering, sync-vs-async decision.

Step 2 — Pass callbacks via config so subgraphs inherit them

P28: Runnable.with_config(callbacks=[h]) binds at definition and is not inherited by LangGraph subgraphs. Pass callbacks via config at invocation:

# WRONG — subagent tool calls never fire the handler
agent_with_handler = agent.with_config({"callbacks": [handler]})
await agent_with_handler.ainvoke({"messages": [...]})

# RIGHT — callbacks in config propagate into subgraphs
await agent.ainvoke(
    {"messages": [...]},
    config={"callbacks": [handler], "configurable": {"thread_id": "t1"}},
)

Validate propagation with a probe that counts events by kwargs["name"] and asserts the subagent's name appears. See Subgraph Propagation.

Step 3 — Pick a dispatch target by delivery semantics

Match the event to the transport:

Target Delivery Typical latency Use when Failure mode
HTTP webhook At-least-once (with retry) 50-500ms Partner integrations, Zapier/Make, customer-owned endpoints Endpoint 5xx → retry 1s/5s/30s → DLQ
Kafka (aiokafka) At-least-once (idempotent producer) 5-20ms intra-region High-volume telemetry, analytics fan-in Broker unavailable → retry + local buffer
Redis Streams (XADD) At-least-once (consumer groups) 1-5ms Near-realtime worker queues, progress fan-out Redis down → retry or spill to disk
SNS At-most-once (best-effort) 10-100ms Fan-out to multiple SQS/Lambda subscribers Best-effort only; accept loss or front with SQS FIFO

Handler stays provider-agnostic; only the sink changes. A minimal HTTP sink:

import hashlib, hmac, json, os
import httpx

WEBHOOK_URL = os.environ["WEBHOOK_URL"]
SIGNING_SECRET = os.environ["WEBHOOK_SIGNING_SECRET"].encode()

class WebhookSink:
    # 256-bit HMAC — industry-standard signature strength (GitHub, Stripe use same)
    SIG_ALG = hashlib.sha256
    # Retry schedule: 1s absorbs transient blips, 5s absorbs brief 503s,
    # 30s absorbs autoscaler / cold-start incidents. Beyond 30s = stale event.
    RETRY_DELAYS = (1, 5, 30)
    REQUEST_TIMEOUT_S = 5.0

    def __init__(self, client: httpx.AsyncClient):
        self.client = client

    async def send(self, *, idempotency_key: str, event_type: str, payload: dict) -> None:
        body = json.dumps({"event": event_type, "data": payload}, sort_keys=True).encode()
        sig = hmac.new(SIGNING_SECRET, body, self.SIG_ALG).hexdigest()
        headers = {
            "Content-Type": "application/json",
            "Idempotency-Key": idempotency_key,
            "X-Signature-256": f"sha256={sig}",
        }
        for delay in self.RETRY_DELAYS:
            try:
                resp = await self.client.post(WEBHOOK_URL, content=body, headers=headers, timeout=self.REQUEST_TIMEOUT_S)
                if 200 <= resp.status_code < 300:
                    return
                if resp.status_code < 500 and resp.status_code != 429:
                    return  # 4xx (except 429) is not retryable
            except (httpx.TimeoutException, httpx.TransportError):
                pass
            await asyncio.sleep(delay)
        await self._dead_letter(idempotency_key, event_type, payload)

See Dispatch Targets for Kafka / Redis Streams / SNS sinks and per-target DLQ patterns.

Step 4 — Filter events so you don't saturate the downstream

astream_events(version="v2") emits thousands of events per invocation (P47). Never forward raw — dispatch only what the downstream consumes:

Callback method Typical decision Why
on_llm_start Skip Prompt content often contains PII; low value without masking
on_llm_new_token Skip for dispatch (UI only) 1 event per token; N/A to analytics
on_llm_end Dispatch for named chains only Token usage, final response — high value, low volume
on_chain_start Skip (P47 noise) LCEL emits one per inner runnable
on_chain_end Dispatch for named subgraphs only Stage completion — what analytics cares about
on_tool_start Optional (dispatch for audit log) Matters for compliance / tool-use audit
on_tool_end Dispatch always The key analytics signal in agent flows
on_agent_action Dispatch Cleaner signal than on_tool_start in agent graphs
on_agent_finish Dispatch Terminal event for the run

Rule of thumb: dispatch on_tool_end + named on_chain_end + on_llm_end. Everything else is noise.

Step 5 — Build idempotency keys and a retry budget

At-least-once transports mean duplicates. Build the key deterministically:

# run_id — unique per chain invocation (propagates into subgraphs)
# event_type — on_tool_end / on_chain_end / on_llm_end
# step_index — monotonic per-run counter you maintain in the handler

idempotency_key = f"{run_id}:{event_type}:{step_index}"

Retry budget: 1s → 5s → 30s (~36s total) then DLQ. Retry on 5xx / 429 / network only — 4xx (except 429) goes straight to DLQ. DLQ is a Redis Stream or S3 prefix keyed by YYYY/MM/DD/run_id/idempotency_key.json; alarm on depth growth. See Idempotency and Retry for HMAC verify, at-least-once vs at-most-once, and 24h de-dup window sizing.

Step 6 — Never dispatch from BackgroundTasks (P60)

FastAPI BackgroundTasks run after the response closes — exactly wrong for per-event dispatch. Events must go out during the chain:

# WRONG — events fire all at once after the stream ends
@app.post("/chat")
async def chat(req: Request, bg: BackgroundTasks):
    bg.add_task(agent.ainvoke, {"messages": [...]})  # late + no streaming
    return {"status": "accepted"}

# RIGHT — handler fires during the chain, each on_tool_end dispatches immediately
@app.post("/chat")
async def chat(req: ChatReq):
    handler = EventDispatchHandler(sink=webhook_sink, run_id=req.run_id)
    try:
        result = await agent.ainvoke(
            {"messages": req.messages},
            config={"callbacks": [handler], "configurable": {"thread_id": req.thread_id}},
        )
    finally:
        await handler.drain(timeout=5.0)  # flush in-flight dispatches
    return {"result": result}

drain() awaits in-flight asyncio.create_task() dispatches up to 5s so events aren't lost when the pod scales down mid-request.

Output

  • Async BaseCallbackHandler subclass with asyncio.create_task() fire-and-forget
  • Callbacks wired via config["callbacks"] at invoke time (subgraph-safe)
  • Per-target sink abstraction: HTTP / Kafka / Redis Streams / SNS
  • HMAC-signed webhook payloads with 1s/5s/30s retry and DLQ fallback
  • Idempotency key = run_id + event_type + step_index
  • Event-taxonomy filter: dispatch on_tool_end + named on_chain_end + on_llm_end
  • drain() on shutdown to flush in-flight dispatches

Error Handling

Error Cause Fix
Handler fires on outer agent but not subagent Bound via with_config at definition (P28) Pass via config["callbacks"] at invoke(...) time
Webhook analytics lags generation duration BackgroundTasks fire post-response (P60) Dispatch from the callback handler, never from BackgroundTasks
Browser / Kafka saturates on long generations Forwarded astream_events(v2) raw (P47) Filter events server-side; dispatch only on_tool_end/on_chain_end/on_llm_end
SSE stream hangs, no end event Proxy buffering (P46) Set X-Accel-Buffering: no — see langchain-langgraph-streaming
Event loop freezes on slow webhook Sync POST in callback (P48) Subclass AsyncCallbackHandler; use asyncio.create_task()
Duplicate events downstream At-least-once dispatch + retry Receiver dedupes on Idempotency-Key header with 24h cache
Orphan asyncio.create_task never runs GC collected the task Hold a strong reference in self._tasks and discard on completion
Events lost on pod shutdown In-flight tasks cancelled Call await handler.drain(timeout=5.0) in endpoint finally block
4xx webhook errors retrying 3x Retry logic retrying everything Retry only on 5xx / 429 / network; 4xx goes straight to DLQ
Signature verification fails on receiver Body re-serialized with different key order Sign the exact bytes you send; use sort_keys=True in json.dumps

Examples

Named subgraph dispatch with a LangGraph agent

Planner subagent runs search_docssummarize; outer agent needs a webhook on summarize completion. Full wiring in Subgraph Propagation.

Fan-out: webhook + Kafka + Redis Streams in one invocation

CompositeSink dispatches to multiple child sinks via asyncio.gather(..., return_exceptions=True) — one sink's failure doesn't block others. See Dispatch Targets.

HMAC-signed webhook receiver with de-dup

Verify X-Signature-256, SETNX Idempotency-Key against Redis with 24h TTL, 200 on both replay and first-seen. See Idempotency and Retry.

Resources