¿Cómo obtener mensajes de una cola de Bus de servicio de Azure en modo “PeekLock” usando AMQP?

Estamos intentando consumir Azure Service Bus en una aplicación Node. Nuestro requisito es recuperar varios mensajes de una cola .

Como Azure SDK for Node no admite la recuperación por lotes, decidimos usar AMQP. Si bien podemos recuperar mensajes usando Peek Messages como se describe aquí ( https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response#message-operations ).

Lo que estamos notando es que tan pronto como se recuperan los mensajes, se eliminan de la cola. Me pregunto si alguien tiene una idea de cómo podemos obtener mensajes en modo “PeekLock” usando AMQP y Node. Para AMQP, estamos utilizando el paquete de nodo amqp10 ( https://www.npmjs.com/package/amqp10 ).

Aquí está nuestro código para echar un vistazo a los mensajes:

const AMQPClient = require('amqp10/lib').Client, Policy = require('amqp10/lib').Policy; const protocol = 'amqps'; const keyName = 'RootManageSharedAccessKey'; const sasKey = 'My Shared Access Key' const serviceBusHost = 'account-name.servicebus.windows.net'; const uri = protocol + '://' + encodeURIComponent(keyName) + ':' + encodeURIComponent(sasKey) + '@' + serviceBusHost; const queueName = 'test1'; var client = new AMQPClient(Policy.ServiceBusQueue); client.connect(uri) .then(function () { return Promise.all([ client.createReceiver(queueName), client.createSender(queueName) ]); }) .spread(function(receiver, sender) { console.log(receiver); console.log(sender); console.log('--------------------------------------------------------------------------'); receiver.on('errorReceived', function(err) { // check for errors console.log(err); }); receiver.on('message', function(message) { console.log('Received message'); console.log(message); console.log('------------------------------------'); }); return sender.send([], { operation: 'com.microsoft:peek-message', 'message-count': 5 }); }) .error(function (e) { console.warn('connection error: ', e); }); 

Por defecto, el receptor funciona en modo de autoajuste , debe cambiarlo para establecer la disposición :

 const { Constants } = require('amqp10') // // ...create client, connect, etc... // // Second parameter of createReceiver method enables overwriting policy parameters const receiver = client.createReceiver(queueName, { attach: { rcvSettleMode: Constants.receiverSettleMode.settleOnDisposition } }) 

No olvide aceptar / rechazar / liberar un mensaje después de procesarlo:

 receiver.on('message', msg => { // // ...do something smart with a message... // receiver.accept(msg) // <- manually settle a message }) 
    Intereting Posts