Skip to content

Commit 4475334

Browse files
antondalgren, claude, baelter
authored
Codec registry refactor (#198)
* WIP: Type-safe parser registry with ResolveBody and K defaults

  - Add createParserRegistry factory with typed ParserMap/Registry
  - Add ResolveBody<T, O> helper for content-type → body type cascade
  - Default K to never (no default contentType = PlainBody)
  - Make AMQPSessionOptions, AMQPSession, AMQPQueue generic on T/K
  - Add serializeBody with contentType constrained to parser keys
  - Update type-safety tests for ResolveBody and parser-aware queues
  - QueuePublishOptions now constrains contentType to registered parsers

  Known issues: downstream classes (Exchange, RPC) not yet threaded with T/K

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: AMQPParser<In, Out>, rename Registry to ParserRegistry, add JsonSerializable

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* style: apply prettier formatting fixes

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add createCoderRegistry factory with typed registry

  Adds CoderMap/CoderRegistry types and a createCoderRegistry() factory parallel to createParserRegistry(), with optional built-in gzip/deflate coders via the useDefaults flag.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: standalone serializeAndEncode and decodeAndParse with sync fast path

  Extract logic from AMQPCodecRegistry into standalone exported functions that accept ParserRegistry/CoderRegistry objects directly. Returns a sync result when no coder is involved, or a Promise when encoding/decoding is needed. Also adds a standalone decodeMessage helper for use in Task 4.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: replace CodecMode with P/C/KP/KC generics across all classes

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor: remove unnecessary any casts and eslint-disable comments

  Use ParserMap/CoderMap base types for standalone function parameters instead of ParserRegistry<any>/CoderRegistry<any>. This eliminates all type casts at call sites and ~20 eslint-disable comments. ResolveBody now uses InferParserInput<P[O]> instead of structural match.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: remove AMQPCodecRegistry class, export decodeAndParse and inference types

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: rewrite codec registry tests for standalone functions

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test: rewrite type-safety tests for P/C/KP/KC generic system

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test: update session codec integration tests for new API

  Replace AMQPCodecRegistry/CodecMode with createCoderRegistry and P/C/KP/KC generics. Fix serializeBody call, unused properties params, q.publish leftover, and RPC handler return types.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: update rawBody → _rawBytes in remaining test files, extend eslint config

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: replace any with unknown/never in ParserMap types, format test.ts

  Method shorthand on AMQPParser gives bivariance, so AMQPParser<unknown, unknown> accepts narrower parsers. Inference helpers use unknown/never in the slot we discard so eslint no-explicit-any is satisfied without disable comments.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Anders Bälter <anders@84codes.com>
1 parent bbd9944 commit 4475334

18 files changed

Lines changed: 824 additions & 551 deletions

eslint.config.mjs

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,13 @@ export default tseslint.config(
1010
eslint.configs.recommended,
1111
tseslint.configs.recommended,
1212
prettierConfig,
13+
{
14+
files: ["src/**/*.ts", "test/**/*.ts", "test-browser/**/*.ts"],
15+
rules: {
16+
// {} is used extensively as a default for ParserMap/CoderMap generics
17+
"@typescript-eslint/no-empty-object-type": ["error", { allowObjectTypes: "always" }],
18+
},
19+
},
1320
{
1421
files: ["examples/*.js", "scripts/*.js"],
1522
languageOptions: {

src/amqp-base-client.ts

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -668,7 +668,7 @@ export abstract class AMQPBaseClient {
668668
if (message) {
669669
message.bodySize = bodySize
670670
message.properties = properties
671-
message.rawBody = new Uint8Array(bodySize)
671+
message._rawBytes = new Uint8Array(bodySize)
672672
if (bodySize === 0) channel.onMessageReady(message)
673673
} else {
674674
this.logger?.warn("Header frame but no message")
@@ -677,9 +677,9 @@ export abstract class AMQPBaseClient {
677677
}
678678
case AMQPFrame.Type.BODY: {
679679
const message = channel.delivery || channel.getMessage || channel.returned
680-
if (message && message.rawBody) {
680+
if (message && message._rawBytes) {
681681
const bodyPart = new Uint8Array(view.buffer, view.byteOffset + i, frameSize)
682-
message.rawBody.set(bodyPart, message.bodyPos)
682+
message._rawBytes.set(bodyPart, message.bodyPos)
683683
message.bodyPos += frameSize
684684
i += frameSize
685685
if (message.bodyPos === message.bodySize) channel.onMessageReady(message)

src/amqp-channel.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -885,7 +885,7 @@ export class AMQPChannel {
885885
* @param message
886886
*/
887887
onMessageReady(message: AMQPMessage): void {
888-
message.body = message.rawBody
888+
message.body = message._rawBytes
889889
if (this.delivery) {
890890
delete this.delivery
891891
this.deliver(message)

src/amqp-codec-registry.ts

Lines changed: 134 additions & 142 deletions
Original file line number | Diff line number | Diff line change
@@ -2,10 +2,80 @@ import type { AMQPMessage } from "./amqp-message.js"
22
import type { AMQPProperties } from "./amqp-properties.js"
33
import { isPlainBody } from "./amqp-publisher.js"
44

5+
// 1. Define a type that represents a map of different Parsers
6+
export type ParserMap = {
7+
[K: string]: AMQPParser<unknown, unknown>
8+
}
9+
10+
// 2. The ParserRegistry type uses a Mapped Type to preserve the unique In/Out of each key
11+
export type ParserRegistry<T extends ParserMap> = {
12+
readonly [K in keyof T & string]: T[K]
13+
}
14+
15+
export type JsonSerializable =
16+
| string
17+
| number
18+
| boolean
19+
| null
20+
| JsonSerializable[]
21+
| { [key: string]: JsonSerializable }
22+
23+
type BuiltinParsers = {
24+
"text/plain": AMQPParser<string, string>
25+
"application/json": AMQPParser<JsonSerializable, unknown>
26+
}
27+
28+
// 3. The factory function
29+
export function createParserRegistry<T extends ParserMap>(parsers: T, useDefaultParsers?: false): ParserRegistry<T>
30+
export function createParserRegistry<T extends ParserMap>(
31+
parsers: T,
32+
useDefaultParsers: true,
33+
): ParserRegistry<T & BuiltinParsers>
34+
export function createParserRegistry<T extends ParserMap>(
35+
parsers: T,
36+
useDefaultParsers?: boolean,
37+
): ParserRegistry<T & BuiltinParsers> | ParserRegistry<T> {
38+
if (useDefaultParsers) {
39+
return { "text/plain": PlainParser, "application/json": JSONParser, ...parsers }
40+
}
41+
return parsers
42+
}
43+
44+
export type InferParserInput<P> = P extends AMQPParser<infer TInput, unknown> ? TInput : never
45+
export type InferParserOutput<P> = P extends AMQPParser<never, infer Out> ? Out : never
46+
47+
export type CoderMap = { [K: string]: AMQPCoder }
48+
export type CoderRegistry<T extends CoderMap> = { readonly [K in keyof T & string]: T[K] }
49+
50+
type BuiltinCoders = { gzip: AMQPCoder; deflate: AMQPCoder }
51+
52+
export function createCoderRegistry<T extends CoderMap>(coders: T, useDefaults?: false): CoderRegistry<T>
53+
export function createCoderRegistry<T extends CoderMap>(coders: T, useDefaults: true): CoderRegistry<T & BuiltinCoders>
54+
export function createCoderRegistry<T extends CoderMap>(
55+
coders: T,
56+
useDefaults?: boolean,
57+
): CoderRegistry<T & BuiltinCoders> | CoderRegistry<T> {
58+
if (useDefaults) {
59+
if (
60+
typeof CompressionStream === "undefined" ||
61+
typeof DecompressionStream === "undefined" ||
62+
typeof Blob === "undefined" ||
63+
typeof Response === "undefined"
64+
) {
65+
throw new Error(
66+
"Built-in coders require CompressionStream, DecompressionStream, Blob, and Response " +
67+
"(Node 18+, modern browsers). Register custom coders via createCoderRegistry() instead.",
68+
)
69+
}
70+
return { gzip: GzipCoder, deflate: DeflateCoder, ...coders }
71+
}
72+
return coders
73+
}
74+
575
/** Handles serialization/deserialization based on content-type. */
6-
export interface AMQPParser<T = unknown> {
7-
serialize(body: T, properties: AMQPProperties): Uint8Array
8-
parse(body: Uint8Array, properties: AMQPProperties): T
76+
export interface AMQPParser<In = unknown, Out = unknown> {
77+
serialize(body: In, properties: AMQPProperties): Uint8Array
78+
parse(body: Uint8Array, properties: AMQPProperties): Out
979
}
1080

1181
/** Handles compression/decompression based on content-encoding. */
@@ -22,7 +92,7 @@ function toBytes(data: string | Uint8Array | ArrayBuffer | Buffer | null): Uint8
2292
return new Uint8Array(data)
2393
}
2494

25-
const PlainParser: AMQPParser<string> = {
95+
const PlainParser: AMQPParser<string, string> = {
2696
serialize(body: string): Uint8Array {
2797
return new TextEncoder().encode(String(body))
2898
},
@@ -31,8 +101,8 @@ const PlainParser: AMQPParser<string> = {
31101
},
32102
}
33103

34-
const JSONParser: AMQPParser = {
35-
serialize(body: unknown): Uint8Array {
104+
const JSONParser: AMQPParser<JsonSerializable, unknown> = {
105+
serialize(body: JsonSerializable): Uint8Array {
36106
return new TextEncoder().encode(JSON.stringify(body))
37107
},
38108
parse(body: Uint8Array): unknown {
@@ -68,156 +138,78 @@ const DeflateCoder: AMQPCoder = {
68138
},
69139
}
70140

71-
/**
72-
* Registry for message parsers (content-type) and coders (content-encoding).
73-
*
74-
* Built-in parsers: `text/plain`, `application/json`.
75-
* Built-in coders: `gzip`, `deflate`.
76-
*
77-
* @example
78-
* ```ts
79-
* const codecs = new AMQPCodecRegistry()
80-
* codecs.enableBuiltinCodecs()
81-
* const session = await AMQPSession.connect(url, {
82-
* codecs,
83-
* defaultContentType: "application/json",
84-
* })
85-
* ```
86-
*/
87-
export class AMQPCodecRegistry {
88-
private readonly parsers = new Map<string, AMQPParser>()
89-
private readonly coders = new Map<string, AMQPCoder>()
90-
91-
registerParser<T>(contentType: string, parser: AMQPParser<T>): this {
92-
this.parsers.set(contentType, parser)
93-
return this
94-
}
95-
96-
registerCoder(contentEncoding: string, coder: AMQPCoder): this {
97-
this.coders.set(contentEncoding, coder)
98-
return this
99-
}
100-
101-
findParser(contentType: string): AMQPParser | undefined {
102-
return this.parsers.get(contentType)
103-
}
104-
105-
findCoder(contentEncoding: string): AMQPCoder | undefined {
106-
return this.coders.get(contentEncoding)
107-
}
108-
109-
enableBuiltinParsers(): this {
110-
this.parsers.set("text/plain", PlainParser)
111-
this.parsers.set("application/json", JSONParser)
112-
return this
113-
}
114-
115-
enableBuiltinCoders(): this {
116-
if (
117-
typeof CompressionStream === "undefined" ||
118-
typeof DecompressionStream === "undefined" ||
119-
typeof Blob === "undefined" ||
120-
typeof Response === "undefined"
121-
) {
122-
throw new Error(
123-
"Built-in coders require CompressionStream, DecompressionStream, Blob, and Response " +
124-
"(Node 18+, modern browsers). Register custom coders via registerCoder() instead.",
125-
)
126-
}
127-
this.coders.set("gzip", GzipCoder)
128-
this.coders.set("deflate", DeflateCoder)
129-
return this
130-
}
131-
132-
enableBuiltinCodecs(): this {
133-
this.enableBuiltinParsers()
134-
this.enableBuiltinCoders()
135-
return this
136-
}
137-
138-
/**
139-
* Serialize and encode a body for publishing.
140-
* Returns the transformed body and updated properties.
141-
*/
142-
async serializeAndEncode(
143-
body: unknown,
144-
properties: AMQPProperties,
145-
defaults?: { contentType?: string; contentEncoding?: string },
146-
): Promise<{ body: Uint8Array; properties: AMQPProperties }> {
147-
const props = { ...properties }
148-
if (defaults?.contentType && !props.contentType) props.contentType = defaults.contentType
149-
if (defaults?.contentEncoding && !props.contentEncoding) props.contentEncoding = defaults.contentEncoding
150-
151-
let bytes: Uint8Array
152-
if (props.contentType) {
153-
const parser = this.parsers.get(props.contentType)
154-
if (parser) {
155-
bytes = parser.serialize(body, props)
156-
} else if (isPlainBody(body)) {
157-
bytes = toBytes(body)
158-
} else {
159-
throw new Error(
160-
`No parser registered for content-type "${props.contentType}" and body is not a string/Buffer/Uint8Array. ` +
161-
`Register a parser via registerParser() or use enableBuiltinParsers().`,
162-
)
163-
}
141+
export function serializeAndEncode(
142+
parsers: ParserMap,
143+
coders: CoderMap,
144+
body: unknown,
145+
properties: AMQPProperties,
146+
defaults?: { contentType?: string; contentEncoding?: string },
147+
): Promise<{ body: Uint8Array; properties: AMQPProperties }> | { body: Uint8Array; properties: AMQPProperties } {
148+
const props = { ...properties }
149+
if (defaults?.contentType && !props.contentType) props.contentType = defaults.contentType
150+
if (defaults?.contentEncoding && !props.contentEncoding) props.contentEncoding = defaults.contentEncoding
151+
152+
let bytes: Uint8Array
153+
if (props.contentType) {
154+
const parser = parsers[props.contentType]
155+
if (parser) {
156+
bytes = parser.serialize(body, props)
164157
} else if (isPlainBody(body)) {
165158
bytes = toBytes(body)
166159
} else {
167160
throw new Error(
168-
"Cannot serialize body: no contentType specified and body is not a string/Buffer/Uint8Array. " +
169-
"Set contentType or configure a defaultContentType on the session.",
161+
`No parser registered for content-type "${props.contentType}" and body is not a string/Buffer/Uint8Array.`,
170162
)
171163
}
164+
} else if (isPlainBody(body)) {
165+
bytes = toBytes(body)
166+
} else {
167+
throw new Error(
168+
"Cannot serialize body: no contentType specified and body is not a string/Buffer/Uint8Array. " +
169+
"Set contentType or configure a defaultContentType on the session.",
170+
)
171+
}
172172

173-
if (props.contentEncoding) {
174-
const coder = this.coders.get(props.contentEncoding)
175-
if (!coder) {
176-
throw new Error(
177-
`No coder registered for content-encoding "${props.contentEncoding}". ` +
178-
`Register a coder via registerCoder() or use enableBuiltinCoders().`,
179-
)
180-
}
181-
bytes = await coder.encode(bytes, props)
173+
if (props.contentEncoding) {
174+
const coder = coders[props.contentEncoding]
175+
if (!coder) {
176+
throw new Error(`No coder registered for content-encoding "${props.contentEncoding}".`)
182177
}
183-
184-
return { body: bytes, properties: props }
178+
return coder.encode(bytes, props).then((encoded: Uint8Array) => ({ body: encoded, properties: props }))
185179
}
186180

187-
/**
188-
* Decode and parse a message body.
189-
* Reverses the encoding pipeline: decompress then deserialize.
190-
*/
191-
async decodeAndParse(body: Uint8Array, properties: AMQPProperties): Promise<unknown> {
192-
let bytes = body
193-
194-
if (properties.contentEncoding) {
195-
const coder = this.coders.get(properties.contentEncoding)
196-
if (!coder) {
197-
throw new Error(
198-
`No coder registered for content-encoding "${properties.contentEncoding}". ` +
199-
`Register a coder via registerCoder() or use enableBuiltinCoders().`,
200-
)
201-
}
202-
bytes = await coder.decode(bytes, properties)
203-
}
181+
return { body: bytes, properties: props }
182+
}
204183

205-
if (properties.contentType) {
206-
const parser = this.parsers.get(properties.contentType)
207-
if (parser) {
208-
return parser.parse(bytes, properties)
209-
}
184+
export function decodeAndParse(
185+
parsers: ParserMap,
186+
coders: CoderMap,
187+
body: Uint8Array,
188+
properties: AMQPProperties,
189+
): Promise<unknown> | unknown {
190+
if (properties.contentEncoding) {
191+
const coder = coders[properties.contentEncoding]
192+
if (!coder) {
193+
throw new Error(`No coder registered for content-encoding "${properties.contentEncoding}".`)
210194
}
195+
return coder.decode(body, properties).then((decoded: Uint8Array) => {
196+
if (properties.contentType) {
197+
const parser = parsers[properties.contentType]
198+
if (parser) return parser.parse(decoded, properties)
199+
}
200+
return decoded
201+
})
202+
}
211203

212-
return bytes
204+
if (properties.contentType) {
205+
const parser = parsers[properties.contentType]
206+
if (parser) return parser.parse(body, properties)
213207
}
208+
return body
209+
}
214210

215-
/** Decode a message body, replacing the raw bytes on `msg.body`. */
216-
async decodeMessage(msg: AMQPMessage): Promise<void> {
217-
if (msg.rawBody) {
218-
// After decoding, body holds the parsed value (not raw bytes).
219-
// The caller is responsible for presenting the correct generic to consumers.
220-
;(msg as { body: unknown }).body = await this.decodeAndParse(msg.rawBody, msg.properties)
221-
}
211+
export async function decodeMessage(msg: AMQPMessage, parsers: ParserMap, coders: CoderMap): Promise<void> {
212+
if (msg._rawBytes) {
213+
;(msg as { body: unknown }).body = await decodeAndParse(parsers, coders, msg._rawBytes, msg.properties)
222214
}
223215
}

0 commit comments

Comments
 (0)