Add automatic message encoding/decoding via codec registry#192
Add automatic message encoding/decoding via codec registry#192
Conversation
0a1a134 to
0fda775
Compare
Introduce AMQPCodecRegistry for two-stage message transformation: serialization (content-type) and compression (content-encoding). Built-in parsers: text/plain, application/json. Built-in coders: gzip, deflate (via web CompressionStream, zero deps). Custom codecs can be registered for snappy, protobuf, msgpack, etc. Publish side: session runs body through serializeAndEncode() before basicPublish when a registry is configured. Consume side: AMQPMessage.parse() reverses the pipeline (decompress then deserialize). Registry is attached automatically to messages delivered via the session's queue subscribe/get methods. Fully opt-in: no behavior change without configuring codecs. Low-level AMQPChannel API is untouched.
Cover the full publish-encode-consume-parse pipeline through the session layer: JSON via callback, default contentType/contentEncoding, explicit override, gzip compression, async generator, exchange publish, and the no-codecs fallback.
- Feature-detect CompressionStream/DecompressionStream in enableBuiltinCoders() with a clear error for Node <18 - Simplify publish body type to unknown (documented: requires codecs for non-Body values, raw types work without codecs) - Remove unused Body import from queue and exchange - Fix test: compare compressed bytes as Uint8Array, not decoded text - Fix test: properly await subscribe and cancel in callback test
- Expand enableBuiltinCoders() check to include Blob and Response - Throw clear error when contentType is set but no parser registered and body isn't a wire type (string/Buffer/Uint8Array) - Throw clear error when body is non-serializable and no contentType is specified - Add tests for both error paths
…e.ts decodeMessage is now a method on the registry instead of a standalone function in its own file.
- body: unknown is now the public field for both raw and decoded data - rawBody (marked @internal, stripped from .d.ts) holds raw bytes for the frame parser; body defaults to rawBody, codecs overwrite it - Upgrade eslint to strict + stylistic presets, fix all violations - Remove decodedBody/setDecodedBody/isDecoded in favor of body
Replace legacy @Property tags with inline JSDoc on each field. TypeDoc reads member-level comments directly, so this improves the generated docs site.
d14fab2 to
2354772
Compare
The codec branch had upgraded to strict + stylistic with rule overrides to compensate. Revert to match main's recommended config.
CodecMode controls publish-side type narrowing, not message structure. Colocate it with the related Body/PlainBody types.
Verify Body<C> narrowing, generic propagation through session/queue/ exchange/rpc classes, default type parameters, and publish constraints.
…st path - Parameterize RPCHandler<C> by CodecMode so plain sessions only allow PlainBody replies and codec sessions allow rich bodies. Remove unsafe `as Body<C>` cast. - Move decodeMessage() inside try/catch in auto-ack callback wrapper so decode errors nack the message instead of becoming unhandled rejections. - Make encodeBody() sync when no codecs are configured, avoiding an unnecessary async hop on the publish hot path. - Add RPCHandler type safety tests.
The async generator's outer catch was sniffing error messages to decide which errors to re-throw. Use AMQPError instanceof check instead — channel errors are AMQPError, decode errors are plain Error.
AMQPMessage<"plain"> keeps body: Uint8Array | null (no breaking change for existing users). AMQPMessage<"codec"> has body: unknown (decoded by the configured codec registry). The generic propagates through all session-level classes: Queue, Exchange, Subscription, GeneratorSubscription, RPCClient, RPCServer. Type casts are confined to the decode boundary. Also addresses review feedback: - Fix async generator error handling: use decodeError flag instead of instanceof AMQPError (which missed plain Error from client close) - Export PlainBody and MessageBody from public API - RPCHandler<C> now receives AMQPMessage<C> - rpcClient.call() returns AMQPMessage<C>
Don't hard-code requeue=false on decode failure. Poison message handling is an application concern (dead-lettering, etc.), not the library's.
antondalgren
left a comment
There was a problem hiding this comment.
I'm not at peace with the typing parts of the decode/parse parts. But I'm not able to wrap my head around how we can make it better and accurate.
The parts exposed to the end consumer has proper typing, which I think is good enough for now.
* WIP: Type-safe parser registry with ResolveBody and K defaults - Add createParserRegistry factory with typed ParserMap/Registry - Add ResolveBody<T, O> helper for content-type → body type cascade - Default K to never (no default contentType = PlainBody) - Make AMQPSessionOptions, AMQPSession, AMQPQueue generic on T/K - Add serializeBody with contentType constrained to parser keys - Update type-safety tests for ResolveBody and parser-aware queues - QueuePublishOptions now constrains contentType to registered parsers Known issues: downstream classes (Exchange, RPC) not yet threaded with T/K Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat: AMQPParser<In, Out>, rename Registry to ParserRegistry, add JsonSerializable Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * style: apply prettier formatting fixes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat: add createCoderRegistry factory with typed registry Adds CoderMap/CoderRegistry types and a createCoderRegistry() factory parallel to createParserRegistry(), with optional built-in gzip/deflate coders via the useDefaults flag. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat: standalone serializeAndEncode and decodeAndParse with sync fast path Extract logic from AMQPCodecRegistry into standalone exported functions that accept ParserRegistry/CoderRegistry objects directly. Returns a sync result when no coder is involved, or a Promise when encoding/decoding is needed. Also adds a standalone decodeMessage helper for use in Task 4. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat: replace CodecMode with P/C/KP/KC generics across all classes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * refactor: remove unnecessary any casts and eslint-disable comments Use ParserMap/CoderMap base types for standalone function parameters instead of ParserRegistry<any>/CoderRegistry<any>. This eliminates all type casts at call sites and ~20 eslint-disable comments. ResolveBody now uses InferParserInput<P[O]> instead of structural match. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat: remove AMQPCodecRegistry class, export decodeAndParse and inference types Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * test: rewrite codec registry tests for standalone functions Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * test: rewrite type-safety tests for P/C/KP/KC generic system Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * test: update session codec integration tests for new API Replace AMQPCodecRegistry/CodecMode with createCoderRegistry and P/C/KP/KC generics. Fix serializeBody call, unused properties params, q.publish leftover, and RPC handler return types. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: update rawBody → _rawBytes in remaining test files, extend eslint config Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: replace any with unknown/never in ParserMap types, format test.ts Method shorthand on AMQPParser gives bivariance, so AMQPParser<unknown, unknown> accepts narrower parsers. Inference helpers use unknown/never in the slot we discard so eslint no-explicit-any is satisfied without disable comments. --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Anders Bälter <anders@84codes.com>
The factory call shape `createParserRegistry({}, true)` was clunky for the
common case of "just give me JSON+text/plain": empty object literal plus a
magic boolean. Replace with two exported constants, `builtinParsers` and
`builtinCoders`. Common case becomes one import and one assignment.
Custom + builtins is now a spread:
parsers: { ...builtinParsers, "text/csv": csvParser }
Tests in 84codes/training-tool#216 motivated this.
|
Replaced the Before: import { AMQPSession, createParserRegistry } from "@cloudamqp/amqp-client"
const session = await AMQPSession.connect(url, {
parsers: createParserRegistry({}, true),
defaultContentType: "application/json",
})After: import { AMQPSession, builtinParsers } from "@cloudamqp/amqp-client"
const session = await AMQPSession.connect(url, {
parsers: builtinParsers,
defaultContentType: "application/json",
})The factory function was effectively parsers: { ...builtinParsers, "text/csv": csvParser }Generic inference still works: Motivated by the boilerplate that stood out in https://github.com/84codes/training-tool/pull/216. Tradeoffs considered:
Constants won on simplicity: one import, no call, no allocation per connect. |
AMQPExchange.bind/unbind already accept `string | AMQPExchange`. Mirror that on AMQPQueue so users don't have to reach for `exchange.name` after working with the exchange handle. await queue.bind(exchange, "user-id") // now works
AMQPQueue.publish and AMQPExchange.publish now default `deliveryMode: 2` when not set, matching the Ruby client. The high-level API already defaults `confirm: true` — persistent-by-default is the consistent stance for an API that optimizes for "don't lose data." Explicit `deliveryMode: 1` is honored. Low-level AMQPChannel.basicPublish stays as-is. RPC client/server publish paths are not touched: requests/replies via amq.rabbitmq.reply-to are inherently transient.
Summary
Adds automatic message serialization/deserialization via parser/coder registries on
AMQPSession. Two-stage pipeline: serialization (content-type) then compression (content-encoding). Fully opt-in: no behavior change unlessparsers/codersare configured. Low-levelAMQPChannelAPI untouched.text/plain,application/json. Built-in coders:gzip,deflate(webCompressionStream, zero new dependencies).msg.bodyis typed by the parser-map generic (Uint8Array | nullin plain mode, parser-output union in codec mode).Modeled after the Ruby client's message codec system. Supersedes #181 (compression-only, added native deps, wrong abstraction layer).
High-level publish defaults
AMQPQueue.publishandAMQPExchange.publishdefault to:confirm: true— wait for broker ackdeliveryMode: 2— persistent (matches the Ruby client)Both are opt-out: pass
{ confirm: false }or{ deliveryMode: 1 }to override. The low-levelAMQPChannel.basicPublishis unchanged. RPC client/server paths stay transient sinceamq.rabbitmq.reply-tois inherently transient.Usage
Custom parser:
Types
ResolveBody<P, KP>= what publish methods accept. Resolves to the rich type from the matching parser when codecs are configured,PlainBodyotherwise.PlainBody= raw wire types only (string | Uint8Array | ArrayBuffer | Buffer | null).AMQPMessage<P>= generic message.msg.bodyisUint8Array | nullin plain mode; union of parser outputs plusUint8Array | nullin codec mode.msg._rawBytes=Uint8Array | null(always raw wire bytes,@internal).Typing approach
Users never write generics. Passing
parserstoconnect()infers a typed session; omitting them gives a plain session.Under the hood, four generic parameters propagate through Session, Queue, Exchange, Message, GeneratorSubscription, RPCClient, and RPCServer:
P extends ParserMap— the parser mapC extends CoderMap— the coder mapKP extends keyof P & string— the default content-type key (constrainsdefaultContentTypeto registered keys)KC extends keyof C & string— the default content-encoding keydefaultContentTypeis type-checked against the registered parser keys, so typos are compile errors. Type casts are confined to the decode boundary.Rejected alternatives
AMQPSession<true/false>): Same mechanics butAMQPSession<false>is cryptic.createParserRegistry({}, true)): The original shape; replaced withbuiltinParsers/builtinCodersconstants because the factory was almost an identity function plus a flag — clunky for the common "just enable JSON" case.AMQPMessagewith a decoded body. Doubled the message API surface.AMQPCodecSession extends AMQPSession): Duplicates method signatures, awkward factory pattern, inheritance is the wrong tool for a config flag.Test plan
ResolveBody<P, KP>, plain session restricts toPlainBody