@probitas/client-rabbitmq

RabbitMQ client for the Probitas scenario testing framework.

This package provides a RabbitMQ client designed for integration testing of message-driven applications.

Features

  • Queue Operations: Declare, bind, purge, and delete queues
  • Exchange Operations: Declare and delete exchanges (direct, topic, fanout, headers)
  • Publishing: Publish messages with routing keys and headers
  • Consuming: Consume messages with acknowledgment support
  • Resource Management: Implements AsyncDisposable for proper cleanup

Installation

deno add jsr:@probitas/client-rabbitmq

Quick Start

import { createRabbitMqClient } from "@probitas/client-rabbitmq";

// Using string URL
const client = await createRabbitMqClient({
  url: "amqp://localhost:5672",
});

// Or using connection config object
const client2 = await createRabbitMqClient({
  url: {
    host: "localhost",
    port: 5672,
    username: "guest",
    password: "guest",
    vhost: "/",
  },
});

// Create a channel
const channel = await client.channel();

// Declare a queue
await channel.assertQueue("test-queue", { durable: true });

// Publish a message
const content = new TextEncoder().encode("Hello, World!");
await channel.sendToQueue("test-queue", content, {
  contentType: "text/plain",
});

// Consume messages
for await (const msg of channel.consume("test-queue")) {
  console.log("Received:", new TextDecoder().decode(msg.content));
  await channel.ack(msg);
  break;
}

await client.close();

Exchange and Binding

// Declare an exchange
await channel.assertExchange("events", "topic", { durable: true });

// Declare a queue and bind to exchange
await channel.assertQueue("user-events");
await channel.bindQueue("user-events", "events", "user.*");

// Publish to exchange with routing key
const content = new TextEncoder().encode(JSON.stringify({ id: 1, name: "Alice" }));
await channel.publish("events", "user.created", content, {
  contentType: "application/json",
});
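The binding key "user.*" above relies on AMQP topic-exchange matching, where "*" stands for exactly one dot-separated word and "#" stands for zero or more words. The sketch below is a standalone illustration of those rules for reasoning about which routing keys a binding should receive in a test. The helper name topicMatches is hypothetical and is not part of @probitas/client-rabbitmq; the broker performs the real matching.

```typescript
// Illustrative matcher for AMQP topic patterns (hypothetical helper, not a package API).
// "*" matches exactly one word; "#" matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern === "" ? [] : pattern.split(".");
  const k = routingKey === "" ? [] : routingKey.split(".");

  const match = (pi: number, ki: number): boolean => {
    // Pattern exhausted: succeed only if the key is exhausted too.
    if (pi === p.length) return ki === k.length;
    if (p[pi] === "#") {
      // "#" either consumes nothing (advance the pattern) or one word (stay on "#").
      return match(pi + 1, ki) || (ki < k.length && match(pi, ki + 1));
    }
    if (ki === k.length) return false;
    if (p[pi] === "*" || p[pi] === k[ki]) return match(pi + 1, ki + 1);
    return false;
  };

  return match(0, 0);
}

console.log(topicMatches("user.*", "user.created"));    // true
console.log(topicMatches("user.*", "user.created.v2")); // false
console.log(topicMatches("user.#", "user.created.v2")); // true
```

With the binding shown above, a message published as "user.created" reaches the "user-events" queue, while "user.created.v2" would need a "user.#" binding.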

Using with the await using Statement

await using client = await createRabbitMqClient({ url: "amqp://localhost:5672" });
const channel = await client.channel();

await channel.assertQueue("test");
// Client automatically closed when block exits

Package                 Description
@probitas/client        Core utilities and types
@probitas/client-sqs    AWS SQS client

Classes

RabbitMqChannelError

class RabbitMqChannelError extends RabbitMqError

Error thrown when a RabbitMQ channel operation fails.

Constructor

new RabbitMqChannelError(message: string, options?: RabbitMqChannelErrorOptions)

Properties

  • readonly name: string
  • readonly kind: "channel"
  • readonly channelId?: number

RabbitMqConnectionError

class RabbitMqConnectionError extends RabbitMqError

Error thrown when a RabbitMQ connection cannot be established.

Constructor

new RabbitMqConnectionError(message: string, options?: RabbitMqErrorOptions)

Properties

  • readonly name: string
  • readonly kind: "connection"

RabbitMqError

class RabbitMqError extends ClientError

Base error class for RabbitMQ client errors.

Constructor

new RabbitMqError(message: string, _: unknown, options?: RabbitMqErrorOptions)

Properties

  • readonly name: string
  • readonly code?: number

RabbitMqNotFoundError

class RabbitMqNotFoundError extends RabbitMqError

Error thrown when a RabbitMQ resource (queue or exchange) is not found.

Constructor

new RabbitMqNotFoundError(message: string, options: RabbitMqNotFoundErrorOptions)

Properties

  • readonly name: string
  • readonly kind: "not_found"
  • readonly resource: string

RabbitMqPreconditionFailedError

class RabbitMqPreconditionFailedError extends RabbitMqError

Error thrown when a RabbitMQ precondition check fails.

Constructor

new RabbitMqPreconditionFailedError(message: string, options: RabbitMqPreconditionFailedErrorOptions)

Properties

  • readonly name: string
  • readonly kind: "precondition_failed"
  • readonly reason: string
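
The kind values listed for the error classes above can serve as a discriminant when a test needs to react differently to each failure. The following is a minimal sketch under stated assumptions: it uses structural stand-in types rather than importing the real classes, and the names KnownRabbitMqFailure, isKnownFailure, and describeFailure are hypothetical. In a real scenario, instanceof checks against the exported classes may be the better fit.

```typescript
// Structural stand-ins for the documented error shapes (assumption: real code
// would import the error classes from the package instead of redefining them).
type KnownRabbitMqFailure =
  | { kind: "connection"; message: string }
  | { kind: "channel"; message: string; channelId?: number }
  | { kind: "not_found"; message: string; resource: string }
  | { kind: "precondition_failed"; message: string; reason: string };

const KNOWN_KINDS = ["connection", "channel", "not_found", "precondition_failed"];

// Loose type guard: checks only the `kind` tag, not the remaining fields.
function isKnownFailure(err: unknown): err is KnownRabbitMqFailure {
  if (typeof err !== "object" || err === null) return false;
  const kind = (err as { kind?: unknown }).kind;
  return typeof kind === "string" && KNOWN_KINDS.includes(kind);
}

// Turn a caught value into a short, human-readable description.
function describeFailure(err: unknown): string {
  if (!isKnownFailure(err)) return "unexpected error";
  switch (err.kind) {
    case "connection":
      return `broker unreachable: ${err.message}`;
    case "channel":
      return `channel ${err.channelId ?? "unknown"} failed: ${err.message}`;
    case "not_found":
      return `missing resource: ${err.resource}`;
    case "precondition_failed":
      return `precondition failed: ${err.reason}`;
  }
}
```

A catch block could pass the caught value straight to describeFailure, since the function accepts unknown and falls back to a generic message for anything without a recognized kind.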

Interfaces

RabbitMqAckResult

interface RabbitMqAckResult

Ack/Nack result.

Properties

  • readonly type: "rabbitmq:ack"
  • readonly ok: boolean
  • readonly duration: number

RabbitMqChannel

interface RabbitMqChannel extends AsyncDisposable

RabbitMQ channel interface.

Methods

  • assertExchange(name: string, type: RabbitMqExchangeType, options?: RabbitMqExchangeOptions): Promise<RabbitMqExchangeResult>
  • deleteExchange(name: string, options?: CommonOptions): Promise<RabbitMqExchangeResult>
  • assertQueue(name: string, options?: RabbitMqQueueOptions): Promise<RabbitMqQueueResult>
  • deleteQueue(name: string, options?: CommonOptions): Promise<RabbitMqQueueResult>
  • purgeQueue(name: string, options?: CommonOptions): Promise<RabbitMqQueueResult>
  • bindQueue(queue: string, exchange: string, routingKey: string, options?: CommonOptions): Promise<RabbitMqExchangeResult>
  • unbindQueue(queue: string, exchange: string, routingKey: string, options?: CommonOptions): Promise<RabbitMqExchangeResult>
  • publish(exchange: string, routingKey: string, content: Uint8Array, options?: RabbitMqPublishOptions): Promise<RabbitMqPublishResult>
  • sendToQueue(queue: string, content: Uint8Array, options?: RabbitMqPublishOptions): Promise<RabbitMqPublishResult>
  • get(queue: string, options?: CommonOptions): Promise<RabbitMqConsumeResult>
  • consume(queue: string, options?: RabbitMqConsumeOptions): AsyncIterable<RabbitMqMessage>
  • ack(message: RabbitMqMessage, options?: CommonOptions): Promise<RabbitMqAckResult>
  • nack(message: RabbitMqMessage, options?: RabbitMqNackOptions): Promise<RabbitMqAckResult>
  • reject(message: RabbitMqMessage, requeue?: boolean): Promise<RabbitMqAckResult>
  • prefetch(count: number): Promise<void>
  • close(): Promise<void>

RabbitMqChannelErrorOptions

interface RabbitMqChannelErrorOptions extends RabbitMqErrorOptions

Options for RabbitMQ channel errors.

Properties

  • readonly channelId?: number

RabbitMqClient

interface RabbitMqClient extends AsyncDisposable

RabbitMQ client interface.

Properties

  • config

Methods

  • channel(): Promise<RabbitMqChannel>
  • close(): Promise<void>

RabbitMqClientConfig

interface RabbitMqClientConfig extends CommonOptions

RabbitMQ client configuration.

Properties

  • readonly url: string | RabbitMqConnectionConfig

    RabbitMQ connection URL or configuration object.

  • readonly heartbeat?: number

    Heartbeat interval in seconds.

  • readonly prefetch?: number

    Default prefetch count for channels.

RabbitMqConnectionConfig

interface RabbitMqConnectionConfig extends CommonConnectionConfig

RabbitMQ connection configuration.

Extends CommonConnectionConfig with RabbitMQ-specific options.

Properties

  • readonly vhost?: string

    Virtual host.

RabbitMqConsumeOptions

interface RabbitMqConsumeOptions extends CommonOptions

Consume options.

Properties

  • readonly noAck?: boolean
  • readonly exclusive?: boolean
  • readonly priority?: number

RabbitMqConsumeResult

interface RabbitMqConsumeResult

Consume result (single message retrieval).

Properties

  • readonly type: "rabbitmq:consume"
  • readonly ok: boolean
  • readonly message: RabbitMqMessage | null
  • readonly duration: number

RabbitMqErrorOptions

interface RabbitMqErrorOptions extends ErrorOptions

Options for RabbitMQ errors.

Properties

  • readonly code?: number

RabbitMqExchangeOptions

interface RabbitMqExchangeOptions extends CommonOptions

Exchange options.

Properties

  • readonly durable?: boolean
  • readonly autoDelete?: boolean
  • readonly internal?: boolean
  • readonly arguments?: Record<string, unknown>

RabbitMqExchangeResult

interface RabbitMqExchangeResult

Exchange declaration result.

Properties

  • readonly type: "rabbitmq:exchange"
  • readonly ok: boolean
  • readonly duration: number

RabbitMqMessage

interface RabbitMqMessage

RabbitMQ message.

Properties

  • content
  • properties
  • fields

RabbitMqMessageFields

interface RabbitMqMessageFields

RabbitMQ message fields.

Properties

  • readonly deliveryTag: bigint
  • readonly redelivered: boolean
  • readonly exchange: string
  • readonly routingKey: string

RabbitMqMessageProperties

interface RabbitMqMessageProperties

RabbitMQ message properties.

Properties

  • readonly contentType?: string
  • readonly contentEncoding?: string
  • readonly headers?: Record<string, unknown>
  • readonly deliveryMode?: 1 | 2

    1: non-persistent, 2: persistent

  • readonly priority?: number
  • readonly correlationId?: string
  • readonly replyTo?: string
  • readonly expiration?: string
  • readonly messageId?: string
  • readonly timestamp?: number
  • readonly type?: string
  • readonly userId?: string
  • readonly appId?: string

RabbitMqNackOptions

interface RabbitMqNackOptions extends CommonOptions

Nack options.

Properties

  • readonly requeue?: boolean
  • readonly allUpTo?: boolean

RabbitMqNotFoundErrorOptions

interface RabbitMqNotFoundErrorOptions extends RabbitMqErrorOptions

Options for RabbitMQ not found errors.

Properties

  • readonly resource: string

RabbitMqPreconditionFailedErrorOptions

interface RabbitMqPreconditionFailedErrorOptions extends RabbitMqErrorOptions

Options for RabbitMQ precondition failed errors.

Properties

  • readonly reason: string

RabbitMqPublishOptions

interface RabbitMqPublishOptions extends CommonOptions

Publish options.

Properties

  • readonly persistent?: boolean
  • readonly contentType?: string
  • readonly contentEncoding?: string
  • readonly headers?: Record<string, unknown>
  • readonly correlationId?: string
  • readonly replyTo?: string
  • readonly expiration?: string
  • readonly messageId?: string
  • readonly priority?: number

RabbitMqPublishResult

interface RabbitMqPublishResult

Publish result.

Properties

  • readonly type: "rabbitmq:publish"
  • readonly ok: boolean
  • readonly duration: number

RabbitMqQueueOptions

interface RabbitMqQueueOptions extends CommonOptions

Queue options.

Properties

  • readonly durable?: boolean
  • readonly exclusive?: boolean
  • readonly autoDelete?: boolean
  • readonly arguments?: Record<string, unknown>
  • readonly messageTtl?: number
  • readonly maxLength?: number
  • readonly deadLetterExchange?: string
  • readonly deadLetterRoutingKey?: string
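
The messageTtl, maxLength, deadLetterExchange, and deadLetterRoutingKey options have well-known counterparts among RabbitMQ's optional queue arguments (x-message-ttl in milliseconds, x-max-length, x-dead-letter-exchange, x-dead-letter-routing-key). The sketch below shows one plausible translation into an arguments record. Whether the client performs this mapping internally, and how it resolves conflicts with an explicit arguments entry, is an assumption; toQueueArguments and QueueConvenienceOptions are hypothetical names used only for illustration.

```typescript
// Subset of the documented queue options that this sketch translates.
interface QueueConvenienceOptions {
  messageTtl?: number;            // assumed to be milliseconds, as x-message-ttl is
  maxLength?: number;
  deadLetterExchange?: string;
  deadLetterRoutingKey?: string;
  arguments?: Record<string, unknown>;
}

// Hypothetical helper: merge convenience options into RabbitMQ x-arguments.
// In this sketch the convenience options win over same-named explicit arguments.
function toQueueArguments(o: QueueConvenienceOptions): Record<string, unknown> {
  const args: Record<string, unknown> = { ...o.arguments };
  if (o.messageTtl !== undefined) args["x-message-ttl"] = o.messageTtl;
  if (o.maxLength !== undefined) args["x-max-length"] = o.maxLength;
  if (o.deadLetterExchange !== undefined) {
    args["x-dead-letter-exchange"] = o.deadLetterExchange;
  }
  if (o.deadLetterRoutingKey !== undefined) {
    args["x-dead-letter-routing-key"] = o.deadLetterRoutingKey;
  }
  return args;
}

console.log(toQueueArguments({ messageTtl: 60000, deadLetterExchange: "dlx" }));
```

Keeping the translation in one small function makes it straightforward to assert on the exact arguments a test queue was declared with.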

RabbitMqQueueResult

interface RabbitMqQueueResult

Queue declaration result.

Properties

  • readonly type: "rabbitmq:queue"
  • readonly ok: boolean
  • readonly queue: string
  • readonly messageCount: number
  • readonly consumerCount: number
  • readonly duration: number

Functions

createRabbitMqClient

async function createRabbitMqClient(config: RabbitMqClientConfig): Promise<RabbitMqClient>

Create a new RabbitMQ client instance.

The client provides queue and exchange management, message publishing and consumption, and acknowledgment handling via the AMQP protocol.

Parameters

  • config: RabbitMqClientConfig

Returns

Promise<RabbitMqClient>: A promise resolving to a new RabbitMQ client instance

Examples

Basic usage with string URL

const rabbit = await createRabbitMqClient({
  url: "amqp://guest:guest@localhost:5672",
});

const channel = await rabbit.channel();
await channel.assertQueue("my-queue", { durable: true });

const content = new TextEncoder().encode(JSON.stringify({ type: "ORDER" }));
await channel.sendToQueue("my-queue", content, { persistent: true });

await channel.close();
await rabbit.close();
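
Because publish and sendToQueue take a Uint8Array, JSON payloads need an explicit encode step on the way in and a decode step on the way out. A small pair of helpers keeps that symmetric. This is a sketch only: encodeJson and decodeJson are hypothetical names, not exports of the package, and decodeJson trusts the caller's type parameter without validating the parsed value.

```typescript
// Serialize any JSON-compatible value to UTF-8 bytes for use as message content.
function encodeJson(value: unknown): Uint8Array {
  return new TextEncoder().encode(JSON.stringify(value));
}

// Parse UTF-8 bytes back into a value. The cast is unchecked: validate the
// result separately if the payload comes from an untrusted producer.
function decodeJson<T>(content: Uint8Array): T {
  return JSON.parse(new TextDecoder().decode(content)) as T;
}

const bytes = encodeJson({ type: "ORDER" });
const order = decodeJson<{ type: string }>(bytes);
console.log(order.type); // ORDER
```

In the example above, encodeJson({ type: "ORDER" }) would produce the same bytes as the inline TextEncoder call.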

With connection config object

const rabbit = await createRabbitMqClient({
  url: {
    host: "localhost",
    port: 5672,
    username: "guest",
    password: "guest",
    vhost: "/",
  },
});
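
The object form carries the same information as the string URL. As a rough sketch of that correspondence, the helper below assembles an AMQP URI from the fields used in the example. It assumes the field names shown above and a default port of 5672; toAmqpUrl and ConnectionParts are hypothetical, and how the client converts the object internally is not documented here. Note that the vhost is percent-encoded, so the default vhost "/" becomes "%2F".

```typescript
// Fields taken from the connection config example; all but host are optional here.
interface ConnectionParts {
  host: string;
  port?: number;
  username?: string;
  password?: string;
  vhost?: string;
}

// Hypothetical helper: build an amqp:// URI from connection parts.
function toAmqpUrl(c: ConnectionParts): string {
  const auth = c.username !== undefined
    ? `${encodeURIComponent(c.username)}:${encodeURIComponent(c.password ?? "")}@`
    : "";
  const port = c.port ?? 5672; // conventional AMQP port, assumed as the default
  const vhost = c.vhost !== undefined ? `/${encodeURIComponent(c.vhost)}` : "";
  return `amqp://${auth}${c.host}:${port}${vhost}`;
}

console.log(
  toAmqpUrl({ host: "localhost", port: 5672, username: "guest", password: "guest", vhost: "/" }),
); // amqp://guest:guest@localhost:5672/%2F
```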

Exchange and binding

// Create exchange and queue
await channel.assertExchange("events", "topic", { durable: true });
await channel.assertQueue("user-events");
await channel.bindQueue("user-events", "events", "user.*");

// Publish to exchange
await channel.publish("events", "user.created", content);

Consuming messages

for await (const message of channel.consume("my-queue")) {
  console.log("Received:", new TextDecoder().decode(message.content));
  await channel.ack(message);
}

Get single message (polling)

const result = await channel.get("my-queue");
if (result.message) {
  await channel.ack(result.message);
}
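
In integration tests the message is often not in the queue yet when get is first called, so a bounded retry loop around it is a common pattern. The sketch below is written against a minimal structural interface so it can run with a fake channel. pollForMessage, PollableChannel, and the attempts and delayMs parameters are hypothetical, not part of the package, and the caller is still responsible for acknowledging the returned message.

```typescript
// Minimal shape needed from a channel: a `get` that resolves to a result
// whose `message` is null when the queue is empty.
interface PollableChannel<M> {
  get(queue: string): Promise<{ message: M | null }>;
}

// Call `get` up to `attempts` times, pausing `delayMs` between empty polls.
// Resolves to the first message found, or null if none arrived in time.
async function pollForMessage<M>(
  channel: PollableChannel<M>,
  queue: string,
  attempts = 10,
  delayMs = 100,
): Promise<M | null> {
  for (let i = 0; i < attempts; i++) {
    const result = await channel.get(queue);
    if (result.message !== null) return result.message;
    if (i < attempts - 1) {
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
  return null;
}
```

With a real channel this might be used as `const msg = await pollForMessage(channel, "my-queue")`, followed by `channel.ack(msg)` when msg is not null.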

Using await using for automatic cleanup

await using rabbit = await createRabbitMqClient({
  url: "amqp://localhost:5672",
});

const channel = await rabbit.channel();
// Client automatically closed when scope exits

Type Aliases

RabbitMqExchangeType

type RabbitMqExchangeType = "direct" | "topic" | "fanout" | "headers"

Exchange type.

RabbitMqResult

type RabbitMqResult = RabbitMqPublishResult | RabbitMqConsumeResult | RabbitMqAckResult | RabbitMqQueueResult | RabbitMqExchangeResult

Union of all RabbitMQ result types.
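
Since every member of the union carries a distinct literal type field, a switch on that field narrows a value to the matching result shape. The sketch below redeclares the documented shapes locally so it runs without the package; in real code the types would be imported instead. The summarize function and the simplified Message type are hypothetical.

```typescript
// Local copies of the documented result shapes (assumption: kept in sync by hand).
interface Message { readonly content: Uint8Array }
type PublishResult = { readonly type: "rabbitmq:publish"; readonly ok: boolean; readonly duration: number };
type ConsumeResult = { readonly type: "rabbitmq:consume"; readonly ok: boolean; readonly message: Message | null; readonly duration: number };
type AckResult = { readonly type: "rabbitmq:ack"; readonly ok: boolean; readonly duration: number };
type QueueResult = {
  readonly type: "rabbitmq:queue"; readonly ok: boolean; readonly queue: string;
  readonly messageCount: number; readonly consumerCount: number; readonly duration: number;
};
type ExchangeResult = { readonly type: "rabbitmq:exchange"; readonly ok: boolean; readonly duration: number };
type Result = PublishResult | ConsumeResult | AckResult | QueueResult | ExchangeResult;

// Produce a one-line summary; each case sees only the fields of its own variant.
function summarize(result: Result): string {
  const status = result.ok ? "ok" : "failed";
  switch (result.type) {
    case "rabbitmq:queue":
      return `${status}: queue "${result.queue}" holds ${result.messageCount} message(s)`;
    case "rabbitmq:consume":
      return result.message === null
        ? `${status}: queue was empty`
        : `${status}: got ${result.message.content.length} byte(s)`;
    default:
      return `${status}: ${result.type}`;
  }
}
```

A helper like this can back a generic assertion or logging step that accepts any operation result.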
