
Add automatic message encoding/decoding via codec registry #192

Open
baelter wants to merge 50 commits into main from message-codec-registry


baelter commented Mar 9, 2026

Summary

Adds automatic message serialization/deserialization via parser/coder registries on AMQPSession. Two-stage pipeline: serialization (content-type) then compression (content-encoding). Fully opt-in: no behavior change unless parsers/coders are configured. Low-level AMQPChannel API untouched.

  • Built-in parsers: text/plain, application/json. Built-in coders: gzip, deflate (web CompressionStream, zero new dependencies).
  • Users can register custom parsers/coders (snappy, protobuf, msgpack, etc.) via plain object literals.
  • Publish side encodes automatically when the session has parsers/coders configured.
  • Consume side decodes automatically; msg.body is typed by the parser-map generic (Uint8Array | null in plain mode, parser-output union in codec mode).
  • Both sides throw on unknown content-encoding (symmetric: a misconfigured publisher won't silently send uncompressed data labeled as compressed, and a consumer won't silently return compressed bytes to a callback expecting decoded data).

Modeled after the Ruby client's message codec system. Supersedes #181 (compression-only, added native deps, wrong abstraction layer).
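The two-stage pipeline and the symmetric "throw on unknown content-encoding" rule can be sketched as below. This is an illustrative sketch only, not the PR's implementation: `SketchParser`, `SketchCoder`, `sketchEncode`, `sketchDecode`, and the toy `x-reverse` coder are hypothetical names, and the coder is synchronous for brevity, whereas the built-in gzip/deflate coders are based on the web CompressionStream API.

```typescript
// Hypothetical shapes for illustration; not the library's exported types.
interface SketchParser {
  serialize(value: unknown): Uint8Array
  parse(bytes: Uint8Array): unknown
}
interface SketchCoder {
  encode(bytes: Uint8Array): Uint8Array
  decode(bytes: Uint8Array): Uint8Array
}

const sketchParsers: Record<string, SketchParser> = {
  "application/json": {
    serialize: (value) => new TextEncoder().encode(JSON.stringify(value)),
    parse: (bytes) => JSON.parse(new TextDecoder().decode(bytes)),
  },
}

// Toy reversible coder standing in for gzip/deflate (assumption).
const sketchCoders: Record<string, SketchCoder> = {
  "x-reverse": {
    encode: (bytes) => Uint8Array.from(bytes).reverse(),
    decode: (bytes) => Uint8Array.from(bytes).reverse(),
  },
}

function lookupSketchParser(contentType: string): SketchParser {
  // Simplification: this sketch always requires a registered parser.
  const parser = sketchParsers[contentType]
  if (!parser) throw new Error(`No parser registered for ${contentType}`)
  return parser
}

function lookupSketchCoder(contentEncoding: string): SketchCoder {
  // Both publish and consume go through this lookup, so both sides throw.
  const coder = sketchCoders[contentEncoding]
  if (!coder) throw new Error(`Unknown content-encoding: ${contentEncoding}`)
  return coder
}

// Publish side: stage 1 serializes (content-type), stage 2 compresses (content-encoding).
function sketchEncode(value: unknown, contentType: string, contentEncoding?: string): Uint8Array {
  const serialized = lookupSketchParser(contentType).serialize(value)
  return contentEncoding ? lookupSketchCoder(contentEncoding).encode(serialized) : serialized
}

// Consume side: the same stages in reverse order.
function sketchDecode(bytes: Uint8Array, contentType: string, contentEncoding?: string): unknown {
  const raw = contentEncoding ? lookupSketchCoder(contentEncoding).decode(bytes) : bytes
  return lookupSketchParser(contentType).parse(raw)
}
```

In this sketch, calling `sketchEncode(body, "application/json", "gzip")` throws because no `gzip` coder is registered, rather than sending uncompressed bytes labeled as gzip.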

High-level publish defaults

AMQPQueue.publish and AMQPExchange.publish default to:

  • confirm: true — wait for broker ack
  • deliveryMode: 2 — persistent (matches the Ruby client)

Both are opt-out: pass { confirm: false } or { deliveryMode: 1 } to override. The low-level AMQPChannel.basicPublish is unchanged. RPC client/server paths stay transient since amq.rabbitmq.reply-to is inherently transient.
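One way such opt-out defaults could be applied is sketched below. The helper name `withPublishDefaults` and the `SketchPublishOptions` shape are hypothetical; only the `confirm` and `deliveryMode` options and their default values come from the description above.

```typescript
// Hypothetical option shape; the real publish options carry more fields.
interface SketchPublishOptions {
  confirm?: boolean
  deliveryMode?: 1 | 2
}

// Fill in defaults only for options the caller did not set.
function withPublishDefaults(opts: SketchPublishOptions = {}): Required<SketchPublishOptions> {
  return {
    confirm: opts.confirm ?? true, // wait for broker ack unless opted out
    deliveryMode: opts.deliveryMode ?? 2, // persistent unless opted out
  }
}

const defaulted = withPublishDefaults()
const transient = withPublishDefaults({ deliveryMode: 1 })
const unconfirmed = withPublishDefaults({ confirm: false })
```

Using `??` instead of an object spread means an explicit `undefined` also falls back to the default; whether the library treats `undefined` that way is an assumption of this sketch.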

Usage

import { AMQPSession, builtinParsers, builtinCoders } from "@cloudamqp/amqp-client"

const session = await AMQPSession.connect("amqp://localhost", {
  parsers: builtinParsers,
  coders: builtinCoders,
  defaultContentType: "application/json",
  defaultContentEncoding: "gzip",
})

const q = await session.queue("my-queue")
await q.publish({ hello: "world" }) // auto JSON + gzip

await q.subscribe(async (msg) => {
  console.log(msg.body) // { hello: "world" }
})

Custom parser:

import { AMQPSession, builtinParsers } from "@cloudamqp/amqp-client"
import type { AMQPParser } from "@cloudamqp/amqp-client"

const csvParser: AMQPParser<string[][], string[][]> = {
  serialize: (rows) => new TextEncoder().encode(rows.map(r => r.join(",")).join("\n")),
  parse: (bytes) => new TextDecoder().decode(bytes).split("\n").map(l => l.split(",")),
}

const session = await AMQPSession.connect(url, {
  parsers: { ...builtinParsers, "text/csv": csvParser },
  defaultContentType: "text/csv",
})
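A parser like this can be exercised in isolation before wiring it into a session. The sketch below assumes the parser shape implied by the `csvParser` literal (a `serialize`/`parse` pair); `SketchAMQPParser` is a local stand-in, and the exported `AMQPParser` type may differ in detail.

```typescript
// Local stand-in for the library's AMQPParser type (assumed shape).
interface SketchAMQPParser<In, Out> {
  serialize(value: In): Uint8Array
  parse(bytes: Uint8Array): Out
}

const csvSketch: SketchAMQPParser<string[][], string[][]> = {
  serialize: (rows) => new TextEncoder().encode(rows.map((r) => r.join(",")).join("\n")),
  parse: (bytes) => new TextDecoder().decode(bytes).split("\n").map((l) => l.split(",")),
}

// Round trip: rows -> bytes -> rows.
const csvRows = [["id", "name"], ["1", "Ada"]]
const csvDecoded = csvSketch.parse(csvSketch.serialize(csvRows))

// Caveat: there is no quoting, so a field containing a comma is split in two.
const csvLossy = csvSketch.parse(csvSketch.serialize([["a,b"]]))
```

The second round trip shows why this naive CSV format only suits fields without commas or newlines; a production parser would need quoting rules.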

Types

  • ResolveBody<P, KP> = what publish methods accept. Resolves to the rich type from the matching parser when codecs are configured, PlainBody otherwise.
  • PlainBody = raw wire types only (string | Uint8Array | ArrayBuffer | Buffer | null).
  • AMQPMessage<P> = generic message. msg.body is Uint8Array | null in plain mode; union of parser outputs plus Uint8Array | null in codec mode.
  • msg._rawBytes = Uint8Array | null (always raw wire bytes, @internal).
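The `ResolveBody` idea can be approximated with a conditional type, as sketched below. All names here (`TypedParser`, `SketchResolveBody`, `InferInput`, `JsonValue`) are hypothetical stand-ins, `Buffer` is omitted from the plain body union to avoid Node typings, and the real definitions in the PR may be structured differently.

```typescript
interface TypedParser<In, Out> {
  serialize(value: In): Uint8Array
  parse(bytes: Uint8Array): Out
}
type TypedParserMap = Record<string, TypedParser<any, any>>

// Raw wire types only (assumed subset of the PR's PlainBody).
type SketchPlainBody = string | Uint8Array | ArrayBuffer | null

// Extract the input type a parser accepts.
type InferInput<T> = T extends TypedParser<infer In, any> ? In : never

// No default content-type key (never) -> plain body; otherwise the parser's input type.
type SketchResolveBody<P extends TypedParserMap, KP extends keyof P & string = never> =
  [KP] extends [never] ? SketchPlainBody : InferInput<P[KP]>

type JsonValue = string | number | boolean | null | JsonValue[] | { [key: string]: JsonValue }

const jsonSketchParser: TypedParser<JsonValue, JsonValue> = {
  serialize: (value) => new TextEncoder().encode(JSON.stringify(value)),
  parse: (bytes) => JSON.parse(new TextDecoder().decode(bytes)),
}
const typedParsers = { "application/json": jsonSketchParser }

type CodecBody = SketchResolveBody<typeof typedParsers, "application/json"> // JsonValue
type PlainOnly = SketchResolveBody<typeof typedParsers> // SketchPlainBody

const codecBody: CodecBody = { hello: "world" } // rich value accepted
const plainBody: PlainOnly = "raw text" // wire type accepted
// const rejected: PlainOnly = { hello: "world" } // would not compile in plain mode

const codecBytes = jsonSketchParser.serialize(codecBody)
```

Wrapping `KP` in a tuple (`[KP] extends [never]`) keeps the check from distributing over `never`, which would otherwise collapse the whole type to `never`.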

Typing approach

Users never write generics. Passing parsers to connect() infers a typed session; omitting them gives a plain session.

// Plain session — publish accepts string/Uint8Array/null, msg.body is Uint8Array | null
const plain = await AMQPSession.connect("amqp://localhost")

// Codec session — publish accepts JsonSerializable for "application/json", msg.body is decoded
const codec = await AMQPSession.connect("amqp://localhost", {
  parsers: builtinParsers,
  defaultContentType: "application/json",
})

Under the hood, four generic parameters propagate through Session, Queue, Exchange, Message, GeneratorSubscription, RPCClient, and RPCServer:

  • P extends ParserMap — the parser map
  • C extends CoderMap — the coder map
  • KP extends keyof P & string — the default content-type key (constrains defaultContentType to registered keys)
  • KC extends keyof C & string — the default content-encoding key

defaultContentType is type-checked against the registered parser keys, so typos are compile errors. Type casts are confined to the decode boundary.
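The key constraint can be illustrated with a small stand-in for `connect()`. In this sketch `sketchConfigure`, `SketchSessionOptions`, and `KeyedParser` are hypothetical names; only the `KP extends keyof P & string` constraint is taken from the list above.

```typescript
interface KeyedParser<In, Out> {
  serialize(value: In): Uint8Array
  parse(bytes: Uint8Array): Out
}
type KeyedParserMap = Record<string, KeyedParser<any, any>>

interface SketchSessionOptions<P extends KeyedParserMap, KP extends keyof P & string> {
  parsers: P
  defaultContentType: KP
}

// Stand-in for AMQPSession.connect(): it only returns the resolved default key.
function sketchConfigure<P extends KeyedParserMap, KP extends keyof P & string>(
  opts: SketchSessionOptions<P, KP>,
): KP {
  return opts.defaultContentType
}

const textSketch: KeyedParser<string, string> = {
  serialize: (value) => new TextEncoder().encode(value),
  parse: (bytes) => new TextDecoder().decode(bytes),
}
const keyedParsers = { "text/plain": textSketch, "text/csv": textSketch }

// Both generics are inferred from the argument; the caller writes none.
const chosenType = sketchConfigure({ parsers: keyedParsers, defaultContentType: "text/csv" })

// A typo such as the following is rejected by the compiler, because
// "text/cvs" is not a key of keyedParsers:
// sketchConfigure({ parsers: keyedParsers, defaultContentType: "text/cvs" })
```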

Rejected alternatives

  • Boolean generic (AMQPSession<true/false>): Same mechanics but AMQPSession<false> is cryptic.
  • Factory functions (createParserRegistry({}, true)): The original shape; replaced with builtinParsers/builtinCoders constants because the factory was almost an identity function plus a flag — clunky for the common "just enable JSON" case.
  • SessionMessage wrapper: Separate class wrapping AMQPMessage with a decoded body. Doubled the message API surface.
  • Subclass (AMQPCodecSession extends AMQPSession): Duplicates method signatures and needs an awkward factory pattern; inheritance is the wrong tool for a config flag.

Test plan

  • Unit tests for codec registry (JSON, text/plain, gzip, deflate, custom codecs, defaults, passthrough)
  • Integration tests with broker (publish/consume with codecs, subscribe callback + iterator, RPC, reconnect recovery)
  • Type safety: codec session accepts rich types via ResolveBody<P, KP>, plain session restricts to PlainBody


baelter marked this pull request as ready for review March 10, 2026 09:32
baelter requested a review from snichme March 10, 2026 09:33
Comment thread src/amqp-publisher.ts Outdated
Comment thread src/amqp-message.ts Outdated


baelter added 4 commits March 16, 2026 14:50
Introduce AMQPCodecRegistry for two-stage message transformation:
serialization (content-type) and compression (content-encoding).

Built-in parsers: text/plain, application/json.
Built-in coders: gzip, deflate (via web CompressionStream, zero deps).
Custom codecs can be registered for snappy, protobuf, msgpack, etc.

Publish side: session runs body through serializeAndEncode() before
basicPublish when a registry is configured.

Consume side: AMQPMessage.parse() reverses the pipeline (decompress
then deserialize). Registry is attached automatically to messages
delivered via the session's queue subscribe/get methods.

Fully opt-in: no behavior change without configuring codecs.
Low-level AMQPChannel API is untouched.

Cover the full publish-encode-consume-parse pipeline through the
session layer: JSON via callback, default contentType/contentEncoding,
explicit override, gzip compression, async generator, exchange publish,
and the no-codecs fallback.

- Feature-detect CompressionStream/DecompressionStream in
  enableBuiltinCoders() with a clear error for Node <18
- Simplify publish body type to unknown (documented: requires codecs
  for non-Body values, raw types work without codecs)
- Remove unused Body import from queue and exchange
- Fix test: compare compressed bytes as Uint8Array, not decoded text
- Fix test: properly await subscribe and cancel in callback test

- Expand enableBuiltinCoders() check to include Blob and Response
- Throw clear error when contentType is set but no parser registered
  and body isn't a wire type (string/Buffer/Uint8Array)
- Throw clear error when body is non-serializable and no contentType
  is specified
- Add tests for both error paths
baelter added 5 commits March 16, 2026 14:54
…e.ts

decodeMessage is now a method on the registry instead of a standalone
function in its own file.

- body: unknown is now the public field for both raw and decoded data
- rawBody (marked @internal, stripped from .d.ts) holds raw bytes for
  the frame parser; body defaults to rawBody, codecs overwrite it
- Upgrade eslint to strict + stylistic presets, fix all violations
- Remove decodedBody/setDecodedBody/isDecoded in favor of body

Replace legacy @property tags with inline JSDoc on each field.
TypeDoc reads member-level comments directly, so this improves
the generated docs site.
baelter force-pushed the message-codec-registry branch from d14fab2 to 2354772 March 16, 2026 14:00
baelter added 4 commits March 16, 2026 15:22
The codec branch had upgraded to strict + stylistic with rule overrides
to compensate. Revert to match main's recommended config.

CodecMode controls publish-side type narrowing, not message structure.
Colocate it with the related Body/PlainBody types.

Verify Body<C> narrowing, generic propagation through session/queue/
exchange/rpc classes, default type parameters, and publish constraints.

This comment was marked as resolved.

baelter added 3 commits March 17, 2026 10:59
…st path

- Parameterize RPCHandler<C> by CodecMode so plain sessions only allow
  PlainBody replies and codec sessions allow rich bodies. Remove unsafe
  `as Body<C>` cast.
- Move decodeMessage() inside try/catch in auto-ack callback wrapper so
  decode errors nack the message instead of becoming unhandled rejections.
- Make encodeBody() sync when no codecs are configured, avoiding an
  unnecessary async hop on the publish hot path.
- Add RPCHandler type safety tests.

The async generator's outer catch was sniffing error messages to decide
which errors to re-throw. Use AMQPError instanceof check instead — channel
errors are AMQPError, decode errors are plain Error.

This comment was marked as resolved.

baelter added 2 commits March 17, 2026 15:07
AMQPMessage<"plain"> keeps body: Uint8Array | null (no breaking change
for existing users). AMQPMessage<"codec"> has body: unknown (decoded by
the configured codec registry).

The generic propagates through all session-level classes: Queue,
Exchange, Subscription, GeneratorSubscription, RPCClient, RPCServer.
Type casts are confined to the decode boundary.

Also addresses review feedback:
- Fix async generator error handling: use decodeError flag instead of
  instanceof AMQPError (which missed plain Error from client close)
- Export PlainBody and MessageBody from public API
- RPCHandler<C> now receives AMQPMessage<C>
- rpcClient.call() returns AMQPMessage<C>

Don't hard-code requeue=false on decode failure. Poison message handling
is an application concern (dead-lettering, etc.), not the library's.

baelter requested a review from antondalgren March 17, 2026 14:34
@antondalgren antondalgren left a comment

I'm not at peace with the typing in the decode/parse paths, but I can't wrap my head around how to make it both better and accurate.

The parts exposed to the end consumer have proper typing, which I think is good enough for now.

baelter marked this pull request as draft March 20, 2026 08:28

* WIP: Type-safe parser registry with ResolveBody and K defaults

- Add createParserRegistry factory with typed ParserMap/Registry
- Add ResolveBody<T, O> helper for content-type → body type cascade
- Default K to never (no default contentType = PlainBody)
- Make AMQPSessionOptions, AMQPSession, AMQPQueue generic on T/K
- Add serializeBody with contentType constrained to parser keys
- Update type-safety tests for ResolveBody and parser-aware queues
- QueuePublishOptions now constrains contentType to registered parsers

Known issues: downstream classes (Exchange, RPC) not yet threaded with T/K

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: AMQPParser<In, Out>, rename Registry to ParserRegistry, add JsonSerializable

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* style: apply prettier formatting fixes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add createCoderRegistry factory with typed registry

Adds CoderMap/CoderRegistry types and a createCoderRegistry() factory
parallel to createParserRegistry(), with optional built-in gzip/deflate
coders via the useDefaults flag.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: standalone serializeAndEncode and decodeAndParse with sync fast path

Extract logic from AMQPCodecRegistry into standalone exported functions that
accept ParserRegistry/CoderRegistry objects directly. Returns a sync result
when no coder is involved, or a Promise when encoding/decoding is needed.
Also adds a standalone decodeMessage helper for use in Task 4.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: replace CodecMode with P/C/KP/KC generics across all classes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor: remove unnecessary any casts and eslint-disable comments

Use ParserMap/CoderMap base types for standalone function parameters
instead of ParserRegistry<any>/CoderRegistry<any>. This eliminates
all type casts at call sites and ~20 eslint-disable comments.

ResolveBody now uses InferParserInput<P[O]> instead of structural match.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: remove AMQPCodecRegistry class, export decodeAndParse and inference types

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: rewrite codec registry tests for standalone functions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test: rewrite type-safety tests for P/C/KP/KC generic system

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test: update session codec integration tests for new API

Replace AMQPCodecRegistry/CodecMode with createCoderRegistry and P/C/KP/KC generics.
Fix serializeBody call, unused properties params, q.publish leftover, and RPC handler return types.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: update rawBody → _rawBytes in remaining test files, extend eslint config

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: replace any with unknown/never in ParserMap types, format test.ts

Method shorthand on AMQPParser gives bivariance, so AMQPParser<unknown,
unknown> accepts narrower parsers. Inference helpers use unknown/never
in the slot we discard so eslint no-explicit-any is satisfied without
disable comments.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Anders Bälter <anders@84codes.com>
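The "sync fast path" mentioned in the commit messages above (a sync result when no coder is involved, a Promise otherwise) can be sketched as follows. `encodeWithFastPath`, `MaybePromise`, and `AsyncSketchCoder` are hypothetical names for illustration, not the PR's actual functions.

```typescript
type MaybePromise<T> = T | Promise<T>

// Hypothetical async coder shape, standing in for a gzip/deflate coder.
interface AsyncSketchCoder {
  encode(bytes: Uint8Array): Promise<Uint8Array>
}

// Return the bytes directly when there is nothing to encode,
// and only return a Promise when an async coder is actually involved.
function encodeWithFastPath(bytes: Uint8Array, coder?: AsyncSketchCoder): MaybePromise<Uint8Array> {
  if (!coder) return bytes
  return coder.encode(bytes)
}

const fastPayload = new Uint8Array([1, 2, 3])

const syncResult = encodeWithFastPath(fastPayload)
const asyncResult = encodeWithFastPath(fastPayload, {
  encode: async (b) => b.slice().reverse(), // toy transform, not real compression
})

const syncIsPromise = syncResult instanceof Promise // false: no async hop
const asyncIsPromise = asyncResult instanceof Promise // true: coder was used
```

A caller that wants to keep the benefit has to branch on the result (for example `r instanceof Promise ? await r : r`), since `await` on a plain value still defers to a later microtask.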
baelter marked this pull request as ready for review May 6, 2026 05:28

The factory call shape `createParserRegistry({}, true)` was clunky for the
common case of "just give me JSON+text/plain": empty object literal plus a
magic boolean. Replace with two exported constants, `builtinParsers` and
`builtinCoders`. Common case becomes one import and one assignment.

Custom + builtins is now a spread:
  parsers: { ...builtinParsers, "text/csv": csvParser }

Tests in 84codes/training-tool#216 motivated this.

baelter commented May 7, 2026

Replaced the createParserRegistry({}, true) / createCoderRegistry({}, true) factory shape with builtinParsers and builtinCoders constants.

Before:

import { AMQPSession, createParserRegistry } from "@cloudamqp/amqp-client"

const session = await AMQPSession.connect(url, {
  parsers: createParserRegistry({}, true),
  defaultContentType: "application/json",
})

After:

import { AMQPSession, builtinParsers } from "@cloudamqp/amqp-client"

const session = await AMQPSession.connect(url, {
  parsers: builtinParsers,
  defaultContentType: "application/json",
})

The factory function was effectively (x) => x plus a magic boolean for merging built-ins. For the common case ("just enable JSON+text/plain") that boilerplate added nothing. Custom parsers are still possible — pass an object literal, optionally spread builtinParsers to keep them:

parsers: { ...builtinParsers, "text/csv": csvParser }

Generic inference still works: keyof typeof builtinParsers gives "text/plain" | "application/json", so defaultContentType stays type-checked.

Motivated by the boilerplate that stood out in https://github.com/84codes/training-tool/pull/216.

Tradeoffs considered:

  • Boolean shortcut on options (useBuiltinParsers: true): would need conditional types or overloads to make defaultContentType infer correctly — combinatorial overload explosion when combined with custom parsers and the coder side.
  • Factory with no args (createParserRegistry() returns built-ins): saves one arg but still requires importing a factory and calling it.
  • Static helper (AMQPSession.builtins()): just an alternative form of the constants approach with extra ceremony.

Constants won on simplicity: one import, no call, no allocation per connect.

baelter added 2 commits May 7, 2026 15:21
AMQPExchange.bind/unbind already accept `string | AMQPExchange`. Mirror
that on AMQPQueue so users don't have to reach for `exchange.name` after
working with the exchange handle.

  await queue.bind(exchange, "user-id")  // now works

AMQPQueue.publish and AMQPExchange.publish now default `deliveryMode: 2`
when not set, matching the Ruby client. The high-level API already defaults
`confirm: true` — persistent-by-default is the consistent stance for an API
that optimizes for "don't lose data." Explicit `deliveryMode: 1` is honored.

Low-level AMQPChannel.basicPublish stays as-is. RPC client/server publish
paths are not touched: requests/replies via amq.rabbitmq.reply-to are
inherently transient.