RabbitMQ amqplib Code Snippets
1. Safe Connection Setup
import amqp from 'amqplib';
// Open the AMQP connection using the URL taken from the RABBITMQ_URL environment variable.
const connection = await amqp.connect(process.env.RABBITMQ_URL);
// An 'error' event with no listener is thrown by EventEmitter as an uncaught
// exception, so attach a listener before doing any further work on the connection.
connection.on('error', (err) => {
  console.error('RabbitMQ connection error:', err);
  // Keep a non-zero exit status so a process supervisor still sees the failure.
  process.exitCode = 1;
});

const channel = await connection.createChannel();
// A channel closed by the broker (for example after a failed declaration) emits
// 'error' as well; log it instead of letting it surface as an uncaught exception.
channel.on('error', (err) => {
  console.error('RabbitMQ channel error:', err);
});

// Define the DLX (Dead Letter Exchange) and the queue that collects failed jobs.
await channel.assertExchange('dlx', 'direct', { durable: true });
await channel.assertQueue('jobs.failed', { durable: true });
// Messages dead-lettered to 'dlx' with this routing key end up in 'jobs.failed'.
await channel.bindQueue('jobs.failed', 'dlx', 'jobs.routing-key');
2. Declaring a Reliable Work Queue
// Options for the main work queue: durable, with dead-lettering routed through 'dlx'.
const emailsQueueOptions = {
  // The queue definition survives RabbitMQ broker restarts.
  durable: true,
  arguments: {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': 'jobs.routing-key',
  },
};

// Declare the main queue with the DLX arguments above.
await channel.assertQueue('emails', emailsQueueOptions);
3. Worker Consumer Implementation
// Limit unacknowledged deliveries to one at a time so a slow worker does not
// accumulate a backlog of in-flight messages. Awaited so that a rejected RPC
// surfaces here instead of as an unhandled promise rejection.
await channel.prefetch(1);

// Register the worker. Awaiting the call surfaces registration failures
// (e.g. the queue does not exist) at this point in the script.
await channel.consume('emails', async (msg) => {
  // A falsy message means the broker cancelled this consumer; there is nothing to settle.
  if (!msg) {
    console.warn('Consumer for "emails" was cancelled by the broker');
    return;
  }

  let succeeded = false;
  try {
    const jobData = JSON.parse(msg.content.toString());
    await processEmail(jobData);
    succeeded = true;
  } catch (err) {
    console.error('Job failed:', err);
  }

  // Settle the delivery outside the processing try-block: an ack failure must not
  // be reported as a job failure, and a throw from ack/nack (e.g. the channel has
  // already closed) must not escape this async callback as an unhandled rejection.
  try {
    if (succeeded) {
      // Acknowledge successful execution (removes the message from the queue).
      channel.ack(msg);
    } else {
      // NACK with requeue=false so the broker dead-letters the message to the DLX.
      channel.nack(msg, false, false);
    }
  } catch (err) {
    console.error('Failed to settle message:', err);
  }
});