
eio

Eio concurrency patterns for OCaml applications. Use when Claude needs to: (1) Write concurrent OCaml code with Eio, (2) Handle network operations with cohttp-eio, (3) Manage resource lifecycles with switches, (4) Implement rate limiting or synchronization, (5) Create parallel operations with fibers, (6) Test async code with Eio_mock, (7) Integrate with bytesrw for streaming, or any other Eio-based concurrency tasks

Stars
31
Source
avsm/ocaml-claude-marketplace
Updated
2026-05-24
Slug
avsm--ocaml-claude-marketplace--eio

// install — copy + paste into any project

mkdir -p .claude/skills && curl -fsSL https://raw.githubusercontent.com/avsm/ocaml-claude-marketplace/HEAD/plugins/ocaml-dev/skills/eio/SKILL.md -o .claude/skills/eio.md

Drops the SKILL.md into .claude/skills/eio.md. Works with Claude Code, Cursor, and any agent that loads SKILL.md files from .claude/skills/.

Eio Concurrency

Core Concepts

Why Eio

Eio is an effects-based IO library for OCaml 5. Advantages over Lwt/Async:

  • Direct-style code: No monads, concurrent code looks like sequential code
  • Performance: Real stacks, no heap allocations to simulate continuations
  • Better backtraces: Exceptions show proper call traces
  • Platform optimization: Generic API with optimized backends (Linux io_uring, POSIX, Windows)

Capability-Based Design

Pass capabilities explicitly instead of using global resources:

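(* Record fields cannot use the `_` wildcard, so name concrete resource types.
   These are the Eio 1.x type names; adjust them if your version differs. *)
type t = {
  net : [`Generic] Eio.Net.ty Eio.Resource.t;
  clock : float Eio.Time.clock_ty Eio.Resource.t;
  fs : Eio.Fs.dir_ty Eio.Path.t;
}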

(* Function signature reveals what resources it needs *)
val connect : net:_ Eio.Net.t -> host:string -> connection

Do: Pass net, clock, fs explicitly. This makes dependencies clear and testable.

Don't: Call global functions like Unix.gettimeofday or reach for other ambient resources.

Structured Concurrency with Switches

Eio.Switch.run manages resource and fiber lifecycles:

Eio.Switch.run @@ fun sw ->
  let conn = connect ~sw server in
  (* conn automatically closed when sw exits *)
  process conn

Do: Create switches in the smallest possible scope.

Don't: Take a switch argument if you could create one internally.
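
To illustrate the lifecycle described above, here is a minimal sketch showing when a release hook attached to a switch runs. The helper names run_with_cleanup and note are made up for this example, and it assumes the eio and eio.mock libraries are available (the mock backend is used only so the example needs no real I/O).

```ocaml
(* Sketch: requires the eio and eio.mock libraries. *)
let run_with_cleanup () =
  let log = ref [] in
  let note msg = log := msg :: !log in
  Eio_mock.Backend.run (fun () ->
    Eio.Switch.run (fun sw ->
      (* Register a release hook; it runs when the switch finishes. *)
      Eio.Switch.on_release sw (fun () -> note "released");
      note "working");
    (* By the time Switch.run returns, the hook has already run. *)
    note "after switch");
  List.rev !log

let () = List.iter print_endline (run_with_cleanup ())
```

The hook runs after the switch body returns and before Switch.run itself returns, which is why resources attached to a switch cannot leak past its scope.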

Fibers

Fibers are lightweight threads of control. All fibers in a domain share a single core:

(* Run two operations concurrently *)
Eio.Fiber.both
  (fun () -> download file1)
  (fun () -> download file2)

(* Only one fiber executes at a time until one performs an effect *)
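
As a rough illustration of cooperative scheduling, the sketch below runs two fibers that each yield once. The names interleave, worker and note are hypothetical, and Eio.Fiber.yield stands in for any effect that suspends a fiber (such as blocking I/O). It assumes the eio and eio.mock libraries.

```ocaml
(* Sketch: requires the eio and eio.mock libraries. *)
let interleave () =
  let log = ref [] in
  let note msg = log := msg :: !log in
  let worker name () =
    note (name ^ ":start");
    Eio.Fiber.yield ();   (* an effect: gives other fibers a chance to run *)
    note (name ^ ":end")
  in
  Eio_mock.Backend.run (fun () ->
    Eio.Fiber.both (worker "a") (worker "b"));
  List.rev !log

let () = List.iter print_endline (interleave ())
```

Each worker runs uninterrupted until it yields, so no locking is needed around the shared log. The exact interleaving after the first entry depends on the scheduler and should not be relied on.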

Cancellation

Cancellation contexts form a tree. Uncaught exceptions propagate upward, cancelling siblings:

(* If one branch fails, the other gets Cancelled *)
Eio.Fiber.both
  (fun () -> may_fail ())
  (fun () -> other_work ())  (* receives Cancelled if may_fail raises *)

Use Cancel.protect for operations that must complete:

Eio.Cancel.protect @@ fun () ->
  (* This won't be cancelled even if parent is *)
  flush_and_close connection
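
The two snippets above can be combined into one runnable sketch: one fiber fails, its sibling is cancelled, and the sibling's cleanup still completes inside Cancel.protect. The function name run_cancellation and the log messages are invented for illustration; the example assumes the eio and eio.mock libraries.

```ocaml
(* Sketch: requires the eio and eio.mock libraries. *)
let run_cancellation () =
  let log = ref [] in
  let note msg = log := msg :: !log in
  Eio_mock.Backend.run (fun () ->
    try
      Eio.Fiber.both
        (fun () ->
           (* Block until this fiber is cancelled by its failing sibling. *)
           try Eio.Fiber.await_cancel ()
           with Eio.Cancel.Cancelled _ as ex ->
             Eio.Cancel.protect (fun () ->
               (* Yielding here would raise Cancelled without protect. *)
               Eio.Fiber.yield ();
               note "cleanup finished");
             raise ex)
        (fun () -> failwith "boom")
    with Failure msg -> note ("caught: " ^ msg));
  List.rev !log

let () = List.iter print_endline (run_cancellation ())
```

Re-raising the Cancelled exception after cleanup keeps the cancellation visible to the parent, while the caller still sees the original Failure.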

Common Patterns

Concurrent Operations

(* Concurrent map: one fiber per item, results returned in input order *)
let fetch_all_items api item_ids =
  Eio.Fiber.List.map (fun id -> Api.item_info api id) item_ids

(* Race with timeout *)
let with_timeout ~clock duration fn =
  Eio.Fiber.first
    (fun () -> Eio.Time.sleep clock duration; Error `Timeout)
    (fun () -> Ok (fn ()))

(* Fork background task attached to switch *)
Eio.Fiber.fork ~sw (fun () -> background_work ())
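
When the input list may be large, the optional max_fibers argument of Fiber.List.map bounds how many items are processed at once. A small sketch, assuming the eio and eio.mock libraries; fetch_lengths is a hypothetical stand-in for a real fetch function, with Fiber.yield simulating a blocking request.

```ocaml
(* Sketch: requires the eio and eio.mock libraries. *)
let fetch_lengths urls =
  Eio_mock.Backend.run (fun () ->
    Eio.Fiber.List.map ~max_fibers:2
      (fun url ->
         Eio.Fiber.yield ();   (* stand-in for a blocking request *)
         String.length url)
      urls)

let () =
  fetch_lengths ["a"; "bcd"; "ef"]
  |> List.iter (Printf.printf "%d\n")
```

Results come back in the same order as the inputs, regardless of which fiber finishes first.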

File Operations

let ( / ) = Eio.Path.( / )

(* Read file *)
let content = Eio.Path.load (fs / "config.json")

(* Write file with permissions *)
Eio.Path.save ~create:(`Or_truncate 0o600) (fs / "data.bin") content

(* Create directory if needed *)
Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 (fs / "cache")

(* Clean up directory tree *)
Eio.Path.rmtree (fs / "tmp")

(* Check file type *)
match Eio.Path.kind ~follow:true path with
| `Directory -> ...
| `Regular_file -> ...
| `Not_found -> ...
| _ -> ...  (* sockets, FIFOs, devices, and other kinds *)

Network Operations

(* TCP client *)
Eio.Net.connect ~sw net (`Tcp (addr, port))

(* TCP server: the handler receives the connection flow and the peer address *)
Eio.Net.run_server sock ~on_error:log_error
  ~max_connections:100
  (fun flow _addr -> handle_client flow)

HTTPS with cohttp-eio

let https_handler uri raw_flow =
  let tls_config = create_tls_config () in
  let host =
    Uri.host uri
    |> Option.map (fun h -> Domain_name.(host_exn (of_string_exn h)))
  in
  Tls_eio.client_of_flow ?host tls_config raw_flow

let client = Cohttp_eio.Client.make net ~https:(Some https_handler)

Buffered Reading

max_size is a required argument. Always choose a realistic bound to prevent memory exhaustion:

let body = Eio.Buf_read.(of_flow ~max_size:10_000_000 flow |> take_all)
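
To show what the limit does in practice, this sketch reads from an in-memory source and converts the Buffer_limit_exceeded exception into a result. The function name read_limited and the `Too_large tag are assumptions made for the example; it assumes the eio and eio.mock libraries.

```ocaml
(* Sketch: requires the eio and eio.mock libraries. *)
let read_limited ~max_size data =
  Eio_mock.Backend.run (fun () ->
    (* An in-memory flow, standing in for a socket or file. *)
    let flow = Eio.Flow.string_source data in
    let reader = Eio.Buf_read.of_flow ~max_size flow in
    match Eio.Buf_read.take_all reader with
    | body -> Ok body
    | exception Eio.Buf_read.Buffer_limit_exceeded -> Error `Too_large)

let () =
  match read_limited ~max_size:10 (String.make 100 'x') with
  | Ok body -> Printf.printf "read %d bytes\n" (String.length body)
  | Error `Too_large -> print_endline "input exceeded the limit"
```

Turning the exception into a domain error at this boundary lets callers reject oversized input without catching Eio-specific exceptions.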

Synchronization Primitives

Mutex:

let mutex = Eio.Mutex.create ()
(* ~protect is required; true shields the critical section from cancellation *)
Eio.Mutex.use_rw ~protect:true mutex (fun () -> shared_state := new_value)

Semaphore for rate limiting (caps the number of operations in flight, here 10):

let limiter = Eio.Semaphore.make 10
let with_rate_limit fn =
  Eio.Semaphore.acquire limiter;
  Fun.protect ~finally:(fun () -> Eio.Semaphore.release limiter) fn
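
A quick way to convince yourself that the semaphore bounds concurrency is to record the peak number of jobs running at once. In this sketch the names max_in_flight, in_flight and peak are hypothetical, and Fiber.yield stands in for real work; it assumes the eio and eio.mock libraries.

```ocaml
(* Sketch: requires the eio and eio.mock libraries. *)
let max_in_flight ~limit ~jobs =
  let sem = Eio.Semaphore.make limit in
  let in_flight = ref 0 and peak = ref 0 in
  let job _ =
    Eio.Semaphore.acquire sem;
    Fun.protect
      ~finally:(fun () -> Eio.Semaphore.release sem)
      (fun () ->
         incr in_flight;
         peak := max !peak !in_flight;
         Eio.Fiber.yield ();   (* stand-in for real work *)
         decr in_flight)
  in
  Eio_mock.Backend.run (fun () ->
    (* One fiber per job; the semaphore decides how many proceed. *)
    Eio.Fiber.List.iter job (List.init jobs Fun.id));
  !peak

let () =
  Printf.printf "peak concurrency: %d\n" (max_in_flight ~limit:3 ~jobs:10)
```

Note that this limits how many operations run simultaneously, not how many start per second; a time-based limit would also need a clock.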

Stream for producer/consumer:

let queue = Eio.Stream.create 100
Eio.Stream.add queue item      (* producer *)
let item = Eio.Stream.take queue  (* consumer, blocks if empty *)
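
A fuller producer/consumer sketch, using a small capacity so the producer blocks whenever the consumer falls behind. Wrapping items in an option and sending None as an end-of-stream marker is a convention chosen for this example, not something Eio.Stream requires; pipeline is a hypothetical name. It assumes the eio and eio.mock libraries.

```ocaml
(* Sketch: requires the eio and eio.mock libraries. *)
let pipeline items =
  (* Capacity 2: Stream.add blocks while the stream is full. *)
  let queue = Eio.Stream.create 2 in
  let received = ref [] in
  Eio_mock.Backend.run (fun () ->
    Eio.Fiber.both
      (fun () ->
         List.iter (fun x -> Eio.Stream.add queue (Some x)) items;
         Eio.Stream.add queue None)   (* end-of-stream marker *)
      (fun () ->
         let rec loop () =
           match Eio.Stream.take queue with
           | Some x -> received := x :: !received; loop ()
           | None -> ()
         in
         loop ()));
  List.rev !received

let () = List.iter (Printf.printf "%d\n") (pipeline [1; 2; 3; 4; 5])
```

The bounded capacity provides back-pressure: a fast producer cannot queue unbounded work ahead of a slow consumer.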

Condition variables (must check in loop):

let cond = Eio.Condition.create ()
let mutex = Eio.Mutex.create ()

(* Waiter - MUST use while loop *)
Eio.Mutex.use_rw ~protect:false mutex (fun () ->
  while not (check_condition ()) do
    Eio.Condition.await cond mutex
  done)

(* Signaler *)
Eio.Condition.broadcast cond
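
Putting the waiter and signaler together gives the following runnable sketch. The ready flag, the log messages and the name wait_for_flag are invented for illustration; the example assumes the eio and eio.mock libraries.

```ocaml
(* Sketch: requires the eio and eio.mock libraries. *)
let wait_for_flag () =
  let ready = ref false in
  let cond = Eio.Condition.create () in
  let mutex = Eio.Mutex.create () in
  let log = ref [] in
  let note msg = log := msg :: !log in
  Eio_mock.Backend.run (fun () ->
    Eio.Fiber.both
      (fun () ->
         (* Waiter: re-check the flag every time it is woken. *)
         Eio.Mutex.use_ro mutex (fun () ->
           while not !ready do
             Eio.Condition.await cond mutex
           done);
         note "waiter: saw ready")
      (fun () ->
         (* Signaler: change the state under the mutex, then notify. *)
         note "signaler: setting ready";
         Eio.Mutex.use_rw ~protect:true mutex (fun () -> ready := true);
         Eio.Condition.broadcast cond));
  List.rev !log

let () = List.iter print_endline (wait_for_flag ())
```

The state change happens under the mutex and the broadcast follows it, so the waiter can never miss the update between its check and its call to await.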

Integration with bytesrw

Bridge Eio flows to bytesrw for streaming binary parsing:

(* Eio flow → bytesrw reader *)
let reader_of_flow ?(buf_len = 4096) flow =
  let cs = Cstruct.create buf_len in
  let read () =
    match Eio.Flow.single_read flow cs with
    | 0 -> Bytesrw.Bytes.Slice.eod
    | n ->
        let buf = Cstruct.to_bytes (Cstruct.sub cs 0 n) in
        Bytesrw.Bytes.Slice.make buf ~first:0 ~length:n
    | exception End_of_file -> Bytesrw.Bytes.Slice.eod
  in
  Bytesrw.Bytes.Reader.make read

(* Then use with Binary.Reader *)
let r = Binary.Reader.of_reader (reader_of_flow flow)
let frame = Tc_frame.parse r  (* effects propagate through *)

Note: This involves a copy from Cstruct (Bigarray) to bytes (heap). Unavoidable until OCaml has non-moving bytes.

RNG Initialization

Critical for crypto: Initialize RNG before any crypto operations.

(* For Eio applications *)
Eio_main.run @@ fun env ->
  Mirage_crypto_rng_eio.run (module Mirage_crypto_rng.Fortuna) env @@ fun () ->
    (* RNG now available *)
    let iv = Mirage_crypto_rng.generate 12 in
    ...

(* For Unix applications (simpler but less integrated) *)
Mirage_crypto_rng_unix.initialize (module Mirage_crypto_rng.Fortuna)

Failure to initialize results in predictable random numbers, breaking crypto security.

Error Handling

Errors use Eio.Io (err, context) with nested error codes:

try
  Ok (Eio.Net.connect ~sw net addr)
with
| Eio.Io (Eio.Net.E (Connection_failure _), _) ->
    (* Specific error *)
    Error `Connection_failed
| Eio.Io (Eio.Net.E _, _) ->
    (* Any network error *)
    Error `Network_error
| Eio.Io _ ->
    (* Any Eio error *)
    Error `Io_error

Add context when re-raising (capture the backtrace first, inside the handler):

let bt = Printexc.get_raw_backtrace () in
Eio.Exn.reraise_with_context exn bt "while connecting to %s" host

For tests, hide backend details:

Eio.Exn.Backend.show := false
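
The same matching style applies to other Eio error families. Because a network failure is awkward to reproduce in a small example, the sketch below swaps in a filesystem error instead: loading a file that does not exist. The name load_opt, the `Missing tag and the file name are assumptions for illustration; the example assumes the eio and eio_main libraries.

```ocaml
(* Sketch: requires the eio and eio_main libraries. *)
let load_opt path =
  match Eio.Path.load path with
  | contents -> Ok contents
  | exception Eio.Io (Eio.Fs.E (Eio.Fs.Not_found _), _) ->
    (* Expected failure: turn it into a domain error. *)
    Error `Missing
  | exception (Eio.Io _ as ex) ->
    (* Unexpected failure: keep the exception but say what we were doing. *)
    let bt = Printexc.get_raw_backtrace () in
    Eio.Exn.reraise_with_context ex bt "loading %a" Eio.Path.pp path

let () =
  Eio_main.run @@ fun env ->
  let cwd = Eio.Stdenv.cwd env in
  match load_opt Eio.Path.(cwd / "no-such-file.example") with
  | Ok _ -> print_endline "found"
  | Error `Missing -> print_endline "missing"
```

Only the error that the caller can act on is converted; everything else propagates with extra context attached.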

Testing with Eio_mock

Library: the dune library name is eio.mock (not eio_mock); the module it provides is Eio_mock.

Test Setup Pattern

let setup_test f () =
  Mirage_crypto_rng_unix.use_default ();  (* or initialize RNG *)
  Eio_main.run @@ fun env ->
  Eio.Switch.run @@ fun sw ->
  let fs = Eio.Stdenv.fs env in
  let tmp = Eio.Path.(fs / Filename.get_temp_dir_name () / "test-dir") in
  (* ~missing_ok avoids a catch-all handler that would also swallow cancellation *)
  Eio.Path.rmtree ~missing_ok:true tmp;
  Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 tmp;
  Fun.protect
    ~finally:(fun () -> Eio.Path.rmtree ~missing_ok:true tmp)
    (fun () -> f ~sw tmp)

let test_something = setup_test @@ fun ~sw tmp ->
  (* test code here *)

Mock Flow

let test_api_call () =
  Eio_main.run @@ fun _env ->
  let flow = Eio_mock.Flow.make "response" in
  (* IMPORTANT: Always end with `Raise End_of_file *)
  Eio_mock.Flow.on_read flow [
    `Return "{\"ok\": true}";
    `Raise End_of_file;
  ];
  (* use flow *)

Simulating Chunked Data

let test_partial_reads () =
  Eio_main.run @@ fun _env ->
  let flow = Eio_mock.Flow.make "chunked" in
  Eio_mock.Flow.on_read flow [
    `Return "\x00\x01";      (* First 2 bytes *)
    `Return "\x02\x03";      (* Next 2 bytes *)
    `Raise End_of_file;
  ];
  (* Parser must handle values spanning chunks *)
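
As a concrete case of a value spanning chunks, this sketch reads a 4-byte big-endian integer that the mock delivers in two 2-byte pieces. Buf_read.take keeps reading until enough bytes are buffered, so the parser does not need to know about chunk boundaries. The function name read_u32_be_from_chunks is hypothetical, and the example runs under the mock backend rather than Eio_main; it assumes the eio and eio.mock libraries.

```ocaml
(* Sketch: requires the eio and eio.mock libraries. *)
let read_u32_be_from_chunks () =
  Eio_mock.Backend.run (fun () ->
    let flow = Eio_mock.Flow.make "chunked" in
    Eio_mock.Flow.on_read flow [
      `Return "\x00\x01";      (* first 2 bytes *)
      `Return "\x02\x03";      (* next 2 bytes *)
      `Raise End_of_file;
    ];
    let reader = Eio.Buf_read.of_flow ~max_size:1024 flow in
    (* take 4 performs as many reads as needed to collect 4 bytes. *)
    let bytes = Eio.Buf_read.take 4 reader in
    String.get_int32_be bytes 0)

let () = Printf.printf "0x%08lx\n" (read_u32_be_from_chunks ())
```

The mock flow also traces each read, which can help when a parser consumes data in an unexpected pattern.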

Mock Network

let test_network () =
  Eio_main.run @@ fun _env ->
  let net = Eio_mock.Net.make "mocknet" in
  let flow = Eio_mock.Flow.make "conn" in
  Eio_mock.Net.on_connect net [`Return flow];
  Eio_mock.Flow.on_read flow [`Return "data"; `Raise End_of_file];
  (* test network operations *)

Deadlock Detection

Eio_mock.Backend.run automatically detects deadlocks in tests.
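
A minimal sketch of what that detection looks like: a fiber awaits a promise that nothing will ever resolve, so the mock backend has no runnable work left and raises Deadlock_detected instead of hanging. The function name detects_deadlock is made up for this example; it assumes the eio and eio.mock libraries.

```ocaml
(* Sketch: requires the eio and eio.mock libraries. *)
let detects_deadlock () =
  match
    Eio_mock.Backend.run (fun () ->
      (* The resolver is dropped, so this promise can never be fulfilled. *)
      let never, _resolver = Eio.Promise.create () in
      (Eio.Promise.await never : unit))
  with
  | () -> false
  | exception Eio_mock.Backend.Deadlock_detected -> true

let () = Printf.printf "deadlock detected: %b\n" (detects_deadlock ())
```

With a real backend the same code would block forever, which is one reason to run unit tests under the mock backend where possible.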

Good and Bad Examples

Capability Passing

Bad: Using global/ambient resources

let fetch_data url =
  let now = Unix.gettimeofday () in  (* Global! *)
  let body = Http.get url in         (* Where does network come from? *)
  { timestamp = now; data = body }

Good: Explicit capability parameters

let fetch_data ~net ~clock url =
  let now = Eio.Time.now clock in
  Eio.Switch.run @@ fun sw ->
  let body = Http.get ~sw net url in
  { timestamp = now; data = body }

Switch Scope

Bad: Taking switch as argument when not needed

(* Why does this need a switch? Caller must manage lifetime *)
let read_config ~sw ~fs path =
  Eio.Path.load (fs / path)  (* No resources to manage! *)

Good: Create switch internally when needed

(* No switch needed - just reads a file *)
let read_config ~fs path =
  Eio.Path.load (fs / path)

(* Switch needed: create it internally so the connection cannot outlive the call *)
let with_connection ~net addr f =
  Eio.Switch.run @@ fun sw ->
  let conn = Eio.Net.connect ~sw net addr in
  f conn  (* conn closed when the switch exits *)

Buffered Reading

Bad: An effectively unlimited max_size allows memory exhaustion

let read_body flow =
  Eio.Buf_read.(of_flow ~max_size:max_int flow |> take_all)  (* Unbounded! *)

Good: Always specify a realistic max_size

let read_body flow =
  Eio.Buf_read.(of_flow ~max_size:10_000_000 flow |> take_all)

Condition Variables

Bad: Single check can miss wakeups

Eio.Mutex.use_rw ~protect:false mutex (fun () ->
  if not (check_condition ()) then  (* Single if! *)
    Eio.Condition.await cond mutex;
  do_work ())

Good: Loop until condition is true

Eio.Mutex.use_rw ~protect:false mutex (fun () ->
  while not (check_condition ()) do  (* Loop handles spurious wakeups *)
    Eio.Condition.await cond mutex
  done;
  do_work ())

Error Handling

Bad: Catching all Eio errors indiscriminately

let connect addr =
  try Eio.Net.connect ~sw net addr
  with Eio.Io _ -> failwith "connection failed"  (* Loses context! *)

Good: Match specific errors, preserve context

let connect ~sw ~net addr =
  try Ok (Eio.Net.connect ~sw net addr)
  with
  | Eio.Io (Eio.Net.E (Connection_failure (Refused _)), _) ->
      Error `Refused
  | Eio.Io (Eio.Net.E (Connection_failure reason), _) ->
      Error (`Connection_failed reason)
  | Eio.Io _ as exn ->
      let bt = Printexc.get_raw_backtrace () in
      Eio.Exn.reraise_with_context exn bt "connecting to %a"
        Eio.Net.Sockaddr.pp addr

Mock Testing

Bad: Mock flow without End_of_file hangs forever

let test_read () =
  Eio_main.run @@ fun _env ->
  let flow = Eio_mock.Flow.make "test" in
  Eio_mock.Flow.on_read flow [
    `Return "data";
    (* Missing End_of_file! Test hangs *)
  ];
  read_all flow

Good: Always end mock reads with End_of_file

let test_read () =
  Eio_main.run @@ fun _env ->
  let flow = Eio_mock.Flow.make "test" in
  Eio_mock.Flow.on_read flow [
    `Return "data";
    `Raise End_of_file;  (* Required! *)
  ];
  read_all flow

Cancellation Safety

Bad: Critical cleanup can be cancelled

Eio.Fiber.both
  (fun () -> may_fail ())
  (fun () ->
    do_work ();
    flush_and_close connection)  (* May be cancelled mid-flush! *)

Good: Protect critical operations

Eio.Fiber.both
  (fun () -> may_fail ())
  (fun () ->
    do_work ();
    Eio.Cancel.protect @@ fun () ->
    flush_and_close connection)  (* Completes even if cancelled *)

Common Gotchas

| Issue | Problem | Solution |
| --- | --- | --- |
| Racing reads | Both reads may complete at kernel level | Don't rely on Fiber.first for mutual exclusion |
| Signal handlers | Most operations risk deadlock | Only Eio.Condition.broadcast is safe |
| Condition spurious wakeup | May wake without signal | Always check condition in while loop |
| Mutex in signal handler | Re-entrancy issues | Use conditions instead |
| Forgetting End_of_file | Mock flow hangs | Always end mock reads with Raise End_of_file |
| Missing RNG init | Predictable crypto | Initialize before any crypto operations |

Library Integration

| Library | Purpose | Notes |
| --- | --- | --- |
| Lwt_eio | Run Lwt + Eio together | Gradual migration path |
| Async_eio | Run Async + Eio together | Experimental |
| bytesrw | Streaming byte parsing | Requires copy (Cstruct→bytes) |
| cohttp-eio | HTTP client/server | Native Eio support |
| tls-eio | TLS connections | Use with cohttp-eio for HTTPS |
| mirage-crypto-rng | Crypto RNG | Use .unix sublibrary for entropy |

Best Practices

| Practice | Description |
| --- | --- |
| Capability passing | Pass net, clock, fs explicitly |
| Structured concurrency | Use Switch.run, keep scopes small |
| Handle errors at boundaries | Convert Eio.Io to domain errors |
| Buffered reading | Use Buf_read with max_size |
| Parallel with fibers | Fiber.List.map, Fiber.both, Fiber.first |
| Isolate blocking | Eio_unix.run_in_systhread for non-Eio blocking |
| Test with mocks | Eio_mock for deterministic tests |
| Initialize RNG | Before any crypto operations |
