@probitas/client-rabbitmq
RabbitMQ client for Probitas scenario testing framework.
This package provides a RabbitMQ client designed for integration testing of message-driven applications.
Features
- Queue Operations: Declare, bind, purge, and delete queues
- Exchange Operations: Declare and delete exchanges (direct, topic, fanout, headers)
- Publishing: Publish messages with routing keys and headers
- Consuming: Consume messages with acknowledgment support
- Resource Management: Implements
AsyncDisposablefor proper cleanup
Installation
deno add jsr:@probitas/client-rabbitmq
Quick Start
import { createRabbitMqClient } from "@probitas/client-rabbitmq";
// Using string URL
const client = await createRabbitMqClient({
url: "amqp://localhost:5672",
});
// Or using connection config object
const client2 = await createRabbitMqClient({
url: {
host: "localhost",
port: 5672,
username: "guest",
password: "guest",
vhost: "/",
},
});
// Create a channel
const channel = await client.channel();
// Declare a queue
await channel.assertQueue("test-queue", { durable: true });
// Publish a message
const content = new TextEncoder().encode("Hello, World!");
await channel.sendToQueue("test-queue", content, {
contentType: "text/plain",
});
// Consume messages
for await (const msg of channel.consume("test-queue")) {
console.log("Received:", new TextDecoder().decode(msg.content));
await channel.ack(msg);
break;
}
await client.close();
Exchange and Binding
// Declare an exchange
await channel.assertExchange("events", "topic", { durable: true });
// Declare a queue and bind to exchange
await channel.assertQueue("user-events");
await channel.bindQueue("user-events", "events", "user.*");
// Publish to exchange with routing key
const content = new TextEncoder().encode(JSON.stringify({ id: 1, name: "Alice" }));
await channel.publish("events", "user.created", content, {
contentType: "application/json",
});
Using with using Statement
await using client = await createRabbitMqClient({ url: "amqp://localhost:5672" });
const channel = await client.channel();
await channel.assertQueue("test");
// Client automatically closed when block exits
Related Packages
| Package | Description |
|---|---|
@probitas/client |
Core utilities and types |
@probitas/client-sqs |
AWS SQS client |
Links
Installation
deno add jsr:@probitas/client-rabbitmqClasses
#RabbitMqChannelError
class RabbitMqChannelError extends RabbitMqErrorRabbitMqErrorError thrown when a RabbitMQ channel operation fails.
Constructor
new RabbitMqChannelError(message: string, options?: RabbitMqChannelErrorOptions)Properties
- readonly
namestring - readonly
kind"channel" - readonly
channelId?number
#RabbitMqConnectionError
class RabbitMqConnectionError extends RabbitMqErrorRabbitMqErrorError thrown when a RabbitMQ connection cannot be established.
Constructor
new RabbitMqConnectionError(message: string, options?: RabbitMqErrorOptions)Properties
- readonly
namestring - readonly
kind"connection"
#RabbitMqError
class RabbitMqError extends ClientErrorClientErrorBase error class for RabbitMQ client errors.
Constructor
new RabbitMqError(message: string, _: unknown, options?: RabbitMqErrorOptions)Properties
- readonly
namestring - readonly
code?number
#RabbitMqNotFoundError
class RabbitMqNotFoundError extends RabbitMqErrorRabbitMqErrorError thrown when a RabbitMQ resource (queue or exchange) is not found.
Constructor
new RabbitMqNotFoundError(message: string, options: RabbitMqNotFoundErrorOptions)Properties
- readonly
namestring - readonly
kind"not_found" - readonly
resourcestring
#RabbitMqPreconditionFailedError
class RabbitMqPreconditionFailedError extends RabbitMqErrorRabbitMqErrorError thrown when a RabbitMQ precondition check fails.
Constructor
new RabbitMqPreconditionFailedError(message: string, options: RabbitMqPreconditionFailedErrorOptions)Properties
- readonly
namestring - readonly
kind"precondition_failed" - readonly
reasonstring
Interfaces
#RabbitMqAckResult
interface RabbitMqAckResultAck/Nack result.
Properties
- readonly
type"rabbitmq:ack" - readonly
okboolean - readonly
durationnumber
#RabbitMqChannel
interface RabbitMqChannel extends AsyncDisposableRabbitMQ channel interface.
| Name | Description |
|---|---|
assertExchange() | — |
deleteExchange() | — |
assertQueue() | — |
deleteQueue() | — |
purgeQueue() | — |
bindQueue() | — |
unbindQueue() | — |
publish() | — |
sendToQueue() | — |
get() | — |
consume() | — |
ack() | — |
nack() | — |
reject() | — |
prefetch() | — |
close() | — |
Methods
assertExchange(name: string, type: RabbitMqExchangeType, options?: RabbitMqExchangeOptions): Promise<RabbitMqExchangeResult>Parameters
namestringoptions?RabbitMqExchangeOptions
deleteExchange(name: string, options?: CommonOptions): Promise<RabbitMqExchangeResult>Parameters
namestringoptions?CommonOptions
assertQueue(name: string, options?: RabbitMqQueueOptions): Promise<RabbitMqQueueResult>Parameters
namestringoptions?RabbitMqQueueOptions
deleteQueue(name: string, options?: CommonOptions): Promise<RabbitMqQueueResult>Parameters
namestringoptions?CommonOptions
purgeQueue(name: string, options?: CommonOptions): Promise<RabbitMqQueueResult>Parameters
namestringoptions?CommonOptions
bindQueue(queue: string, exchange: string, routingKey: string, options?: CommonOptions): Promise<RabbitMqExchangeResult>Parameters
queuestringexchangestringroutingKeystringoptions?CommonOptions
unbindQueue(queue: string, exchange: string, routingKey: string, options?: CommonOptions): Promise<RabbitMqExchangeResult>Parameters
queuestringexchangestringroutingKeystringoptions?CommonOptions
publish(exchange: string, routingKey: string, content: Uint8Array, options?: RabbitMqPublishOptions): Promise<RabbitMqPublishResult>Parameters
exchangestringroutingKeystringcontentUint8Arrayoptions?RabbitMqPublishOptions
sendToQueue(queue: string, content: Uint8Array, options?: RabbitMqPublishOptions): Promise<RabbitMqPublishResult>Parameters
queuestringcontentUint8Arrayoptions?RabbitMqPublishOptions
get(queue: string, options?: CommonOptions): Promise<RabbitMqConsumeResult>Parameters
queuestringoptions?CommonOptions
consume(queue: string, options?: RabbitMqConsumeOptions): AsyncIterable<RabbitMqMessage>Parameters
queuestringoptions?RabbitMqConsumeOptions
ack(message: RabbitMqMessage, options?: CommonOptions): Promise<RabbitMqAckResult>Parameters
messageRabbitMqMessageoptions?CommonOptions
nack(message: RabbitMqMessage, options?: RabbitMqNackOptions): Promise<RabbitMqAckResult>Parameters
messageRabbitMqMessageoptions?RabbitMqNackOptions
reject(message: RabbitMqMessage, requeue?: boolean): Promise<RabbitMqAckResult>Parameters
messageRabbitMqMessagerequeue?boolean
prefetch(count: number): Promise<void>Parameters
countnumber
close(): Promise<void>#RabbitMqChannelErrorOptions
interface RabbitMqChannelErrorOptions extends RabbitMqErrorOptionsOptions for RabbitMQ channel errors.
| Name | Description |
|---|---|
channelId | — |
Properties
- readonly
channelId?number
#RabbitMqClient
interface RabbitMqClient extends AsyncDisposableRabbitMQ client interface.
Properties
Methods
channel(): Promise<RabbitMqChannel>close(): Promise<void>#RabbitMqClientConfig
interface RabbitMqClientConfig extends CommonOptionsRabbitMQ client configuration.
| Name | Description |
|---|---|
url | RabbitMQ connection URL or configuration object. |
heartbeat | Heartbeat interval in seconds |
prefetch | Default prefetch count for channels |
Properties
RabbitMQ connection URL or configuration object.
- readonly
heartbeat?numberHeartbeat interval in seconds
- readonly
prefetch?numberDefault prefetch count for channels
#RabbitMqConnectionConfig
interface RabbitMqConnectionConfig extends CommonConnectionConfigRabbitMQ connection configuration.
Extends CommonConnectionConfig with RabbitMQ-specific options.
| Name | Description |
|---|---|
vhost | Virtual host. |
Properties
- readonly
vhost?stringVirtual host.
#RabbitMqConsumeOptions
interface RabbitMqConsumeOptions extends CommonOptionsConsume options.
Properties
- readonly
noAck?boolean - readonly
exclusive?boolean - readonly
priority?number
#RabbitMqConsumeResult
interface RabbitMqConsumeResultConsume result (single message retrieval).
Properties
- readonly
type"rabbitmq:consume" - readonly
okboolean - readonly
durationnumber
#RabbitMqErrorOptions
interface RabbitMqErrorOptions extends ErrorOptionsOptions for RabbitMQ errors.
| Name | Description |
|---|---|
code | — |
Properties
- readonly
code?number
#RabbitMqExchangeOptions
interface RabbitMqExchangeOptions extends CommonOptionsExchange options.
| Name | Description |
|---|---|
durable | — |
autoDelete | — |
internal | — |
arguments | — |
Properties
- readonly
durable?boolean - readonly
autoDelete?boolean - readonly
internal?boolean - readonly
arguments?Record<string, unknown>
#RabbitMqExchangeResult
interface RabbitMqExchangeResultExchange declaration result.
Properties
- readonly
type"rabbitmq:exchange" - readonly
okboolean - readonly
durationnumber
#RabbitMqMessage
interface RabbitMqMessageRabbitMQ message.
| Name | Description |
|---|---|
content | — |
properties | — |
fields | — |
Properties
- readonly
contentUint8Array
#RabbitMqMessageFields
interface RabbitMqMessageFieldsRabbitMQ message fields.
| Name | Description |
|---|---|
deliveryTag | — |
redelivered | — |
exchange | — |
routingKey | — |
Properties
- readonly
deliveryTagbigint - readonly
redeliveredboolean - readonly
exchangestring - readonly
routingKeystring
#RabbitMqMessageProperties
interface RabbitMqMessagePropertiesRabbitMQ message properties.
| Name | Description |
|---|---|
contentType | — |
contentEncoding | — |
headers | — |
deliveryMode | 1: non-persistent, 2: persistent |
priority | — |
correlationId | — |
replyTo | — |
expiration | — |
messageId | — |
timestamp | — |
type | — |
userId | — |
appId | — |
Properties
- readonly
contentType?string - readonly
contentEncoding?string - readonly
headers?Record<string, unknown> - readonly
deliveryMode?1 | 21: non-persistent, 2: persistent
- readonly
priority?number - readonly
correlationId?string - readonly
replyTo?string - readonly
expiration?string - readonly
messageId?string - readonly
timestamp?number - readonly
type?string - readonly
userId?string - readonly
appId?string
#RabbitMqNackOptions
interface RabbitMqNackOptions extends CommonOptionsNack options.
Properties
- readonly
requeue?boolean - readonly
allUpTo?boolean
#RabbitMqNotFoundErrorOptions
interface RabbitMqNotFoundErrorOptions extends RabbitMqErrorOptionsOptions for RabbitMQ not found errors.
| Name | Description |
|---|---|
resource | — |
Properties
- readonly
resourcestring
#RabbitMqPreconditionFailedErrorOptions
interface RabbitMqPreconditionFailedErrorOptions extends RabbitMqErrorOptionsOptions for RabbitMQ precondition failed errors.
| Name | Description |
|---|---|
reason | — |
Properties
- readonly
reasonstring
#RabbitMqPublishOptions
interface RabbitMqPublishOptions extends CommonOptionsPublish options.
| Name | Description |
|---|---|
persistent | — |
contentType | — |
contentEncoding | — |
headers | — |
correlationId | — |
replyTo | — |
expiration | — |
messageId | — |
priority | — |
Properties
- readonly
persistent?boolean - readonly
contentType?string - readonly
contentEncoding?string - readonly
headers?Record<string, unknown> - readonly
correlationId?string - readonly
replyTo?string - readonly
expiration?string - readonly
messageId?string - readonly
priority?number
#RabbitMqPublishResult
interface RabbitMqPublishResultPublish result.
Properties
- readonly
type"rabbitmq:publish" - readonly
okboolean - readonly
durationnumber
#RabbitMqQueueOptions
interface RabbitMqQueueOptions extends CommonOptionsQueue options.
| Name | Description |
|---|---|
durable | — |
exclusive | — |
autoDelete | — |
arguments | — |
messageTtl | — |
maxLength | — |
deadLetterExchange | — |
deadLetterRoutingKey | — |
Properties
- readonly
durable?boolean - readonly
exclusive?boolean - readonly
autoDelete?boolean - readonly
arguments?Record<string, unknown> - readonly
messageTtl?number - readonly
maxLength?number - readonly
deadLetterExchange?string - readonly
deadLetterRoutingKey?string
#RabbitMqQueueResult
interface RabbitMqQueueResultQueue declaration result.
| Name | Description |
|---|---|
type | — |
ok | — |
queue | — |
messageCount | — |
consumerCount | — |
duration | — |
Properties
- readonly
type"rabbitmq:queue" - readonly
okboolean - readonly
queuestring - readonly
messageCountnumber - readonly
consumerCountnumber - readonly
durationnumber
Functions
#createRabbitMqClient
async function createRabbitMqClient(config: RabbitMqClientConfig): Promise<RabbitMqClient>Create a new RabbitMQ client instance.
The client provides queue and exchange management, message publishing and consumption, and acknowledgment handling via AMQP protocol.
Parameters
configRabbitMqClientConfig- RabbitMQ client configuration
Returns
Promise<RabbitMqClient> — A promise resolving to a new RabbitMQ client instance
Examples
Basic usage with string URL
const rabbit = await createRabbitMqClient({
url: "amqp://guest:guest@localhost:5672",
});
const channel = await rabbit.channel();
await channel.assertQueue("my-queue", { durable: true });
const content = new TextEncoder().encode(JSON.stringify({ type: "ORDER" }));
await channel.sendToQueue("my-queue", content, { persistent: true });
await channel.close();
await rabbit.close();
With connection config object
const rabbit = await createRabbitMqClient({
url: {
host: "localhost",
port: 5672,
username: "guest",
password: "guest",
vhost: "/",
},
});
Exchange and binding
// Create exchange and queue
await channel.assertExchange("events", "topic", { durable: true });
await channel.assertQueue("user-events");
await channel.bindQueue("user-events", "events", "user.*");
// Publish to exchange
await channel.publish("events", "user.created", content);
Consuming messages
for await (const message of channel.consume("my-queue")) {
console.log("Received:", new TextDecoder().decode(message.content));
await channel.ack(message);
}
Get single message (polling)
const result = await channel.get("my-queue");
if (result.message) {
await channel.ack(result.message);
}
Using await using for automatic cleanup
await using rabbit = await createRabbitMqClient({
url: "amqp://localhost:5672",
});
const channel = await rabbit.channel();
// Client automatically closed when scope exits
Type Aliases
#RabbitMqExchangeType
type RabbitMqExchangeType = "direct" | "topic" | "fanout" | "headers"Exchange type.
#RabbitMqResult
type RabbitMqResult = RabbitMqPublishResult | RabbitMqConsumeResult | RabbitMqAckResult | RabbitMqQueueResult | RabbitMqExchangeResultUnion of all RabbitMQ result types.
