@@ -226,6 +226,63 @@ test('closed socket closes client', async () => {
226226 expect ( amqp . closed ) . toBe ( true )
227227} )
228228
229+ test ( 'connection loss closes channels and consumers' , async ( ) => {
230+ const amqp = getNewClient ( )
231+ const conn = await amqp . connect ( )
232+ const ch = await conn . channel ( )
233+ const q = await ch . queue ( "" )
234+
235+ const consumer = await q . subscribe ( { noAck : false } , ( ) => { } )
236+
237+ // Set up error handler to track when consumer is closed
238+ const originalConsumerWait = consumer . wait ( )
239+
240+ const socket = amqp [ "socket" ]
241+ assert ( socket , "Socket must be created" )
242+
243+ // Simulate unclean connection loss by closing the socket
244+ const closed = new Promise ( ( resolve ) => socket . addEventListener ( 'close' , resolve ) )
245+ socket . close ( )
246+ await closed
247+
248+ // Check that connection, channel, and consumer are all marked as closed
249+ expect ( amqp . closed ) . toBe ( true )
250+ expect ( ch . closed ) . toBe ( true )
251+
252+ // Consumer wait should reject with an error
253+ await expect ( originalConsumerWait ) . rejects . toThrow ( )
254+
255+ // Verify that operations on closed objects throw errors
256+ await expect ( ch . queue ( ) ) . rejects . toThrow ( / c l o s e d / )
257+ await expect ( q . publish ( "test" ) ) . rejects . toThrow ( )
258+ } )
259+
260+ test ( 'connection loss triggers onerror callback' , async ( ) => {
261+ const amqp = getNewClient ( )
262+ const conn = await amqp . connect ( )
263+
264+ let errorReceived : AMQPError | null = null
265+ conn . onerror = vi . fn ( ( err : AMQPError ) => {
266+ errorReceived = err
267+ } )
268+
269+ const socket = amqp [ "socket" ]
270+ assert ( socket , "Socket must be created" )
271+
272+ // Simulate unclean connection loss
273+ const closed = new Promise ( ( resolve ) => socket . addEventListener ( 'close' , resolve ) )
274+ socket . close ( )
275+ await closed
276+
277+ // Check that error callback was called
278+ expect ( conn . onerror ) . toHaveBeenCalled ( )
279+ expect ( errorReceived ) . toBeTruthy ( )
280+ if ( errorReceived ) {
281+ expect ( ( errorReceived as AMQPError ) . message ) . toMatch ( / c o n n e c t i o n n o t c l e a n l y c l o s e d / )
282+ }
283+ expect ( amqp . closed ) . toBe ( true )
284+ } )
285+
229286test ( 'wait for publish confirms' , async ( ) => {
230287 const amqp = getNewClient ( )
231288 const conn = await amqp . connect ( )
0 commit comments