@@ -147,6 +147,18 @@ test('can cancel a consumer', t => {
147147 . then ( ( channel ) => t . is ( channel . consumers . size , 0 ) )
148148} )
149149
150+ test ( 'will clear consumer wait timeout on cancel' , async t => {
151+ const amqp = new AMQPClient ( "amqp://127.0.0.1" )
152+ const conn = await amqp . connect ( )
153+ const ch = await conn . channel ( )
154+ const q = await ch . queue ( "" )
155+ const consumer = await q . subscribe ( { noAck : false } , ( ) => "" )
156+ const wait = consumer . wait ( 1000 ) ;
157+ consumer . cancel ( )
158+ const ok = await wait
159+ t . is ( ok , undefined )
160+ } )
161+
150162test ( 'can close a channel' , async t => {
151163 const amqp = new AMQPClient ( "amqp://127.0.0.1" )
152164 const conn = await amqp . connect ( )
@@ -534,3 +546,31 @@ test("can set frameMax", async t => {
534546test ( "can't set too small frameMax" , t => {
535547 t . throws ( ( ) => new AMQPClient ( "amqp://127.0.0.1?frameMax=" + 16 ) )
536548} )
549+
550+ test ( "can handle frames split over socket reads" , async t => {
551+ const amqp = new AMQPClient ( "amqp://127.0.0.1?frameMax=" + 4 * 1024 )
552+ const conn = await amqp . connect ( )
553+ const ch = await conn . channel ( )
554+ const q = await ch . queue ( "" )
555+ const body = "a" . repeat ( 5 )
556+ const msgs = 100000
557+ for ( let i = 0 ; i < msgs ; i ++ ) {
558+ await q . publish ( body )
559+ }
560+ let i = 0
561+ const consumer = await q . subscribe ( { noAck : true } , ( ) => { if ( ++ i === msgs ) consumer . cancel ( ) } )
562+ await consumer . wait ( 5000 )
563+ t . is ( i , msgs )
564+ } )
565+
566+ test ( "have to connect socket before opening channels" , async t => {
567+ const amqp = new AMQPClient ( "amqp://127.0.0.1" )
568+ await t . throwsAsync ( ( ) => amqp . channel ( ) , { message : / n o t c o n n e c t e d / } )
569+ } )
570+
571+ test ( "will raise if socket is closed on send" , async t => {
572+ const amqp = new AMQPClient ( "amqp://127.0.0.1" )
573+ const conn = await amqp . connect ( )
574+ if ( amqp . socket ) amqp . socket . destroy ( )
575+ await t . throwsAsync ( ( ) => conn . channel ( ) )
576+ } )
0 commit comments