Using advanced queueing v9.0.3.1

EDB Postgres Advanced Server advanced queueing provides message queueing and message processing for the EDB Postgres Advanced Server database. User-defined messages are stored in a queue. A collection of queues is stored in a queue table. Create a queue table before creating a queue that depends on it.

On the server side, procedures in the DBMS_AQADM package create and manage message queues and queue tables. Use the DBMS_AQ package to add messages to a queue, remove messages from a queue, or register or unregister a PL/SQL callback procedure. For more information about DBMS_AQ and DBMS_AQADM, see DBMS_AQ.

On the client side, the application uses the EDB.NET driver to enqueue and dequeue messages.

Enqueueing or dequeueing a message

For more information about using EDB Postgres Advanced Server's advanced queueing functionality, see Built-in packages.

Server-side setup

To use advanced queueing functionality in your .NET application, you must first create a user-defined type, a queue table, and a queue, and then start the queue on the database server. Invoke EDB-PSQL and connect to the EDB Postgres Advanced Server host database. Use the following SPL commands at the command line.

Creating a user-defined type

To specify a RAW data type, create a user-defined type. This example shows creating a user-defined type named myxml:

CREATE TYPE myxml AS (value XML);

Creating the queue table

A queue table can hold multiple queues with the same payload type. This example shows creating a table named MSG_QUEUE_TABLE:

BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'MSG_QUEUE_TABLE',
       queue_payload_type => 'myxml',
       comment => 'Message queue table');
END;

Creating the queue

This example shows creating a queue named MSG_QUEUE in the table MSG_QUEUE_TABLE:

BEGIN
DBMS_AQADM.CREATE_QUEUE ( queue_name => 'MSG_QUEUE', queue_table => 'MSG_QUEUE_TABLE', comment => 'This queue contains pending messages.');
END;

Starting the queue

Once the queue is created, invoke the following SPL code at the command line to start a queue in the EDB database:

BEGIN
DBMS_AQADM.START_QUEUE
(queue_name => 'MSG_QUEUE');
END;

Client-side example

Once you've created the user-defined type, the queue table, and the queue, and you've started the queue, you can enqueue or dequeue a message using the EDB .NET driver.

Enqueue a message

To enqueue a message in your .NET application:

  1. Import the EnterpriseDB.EDBClient namespace.
  2. Create an instance of EDBAQQueue, passing the name of the queue.
  3. Create an EDBAQMessage and set its payload.
  4. Call the EDBAQQueue.Enqueue method.
  5. Read the EDBAQMessage.MessageId property, which is populated with a string that uniquely identifies your message.

The following code shows how to use the EDBAQQueue.Enqueue method. A custom message payload is created and then enqueued.

Note

This example uses the ambient connection, available through EDBAQQueue.Connection, to begin a transaction. If anything goes wrong, the transaction is rolled back and the queue isn't polluted.

// .NET Framework 4.7.2
using System;
using System.Threading.Tasks;
using EnterpriseDB.EDBClient;

namespace EnterpriseDB
{
    internal static class Program
    {
        // Sample message payload
        class MyXML
        {
            public string Value { get; set; }
        }

        public static async Task Main(string[] args)
        {
            // not for production, move connection string to app settings
            string connectionString = "Server=127.0.0.1;Port=5444;User Id=enterprisedb;Password=edb;Database=edb";

            // Note registration of MyXml type
            var dataSourceBuilder = new EDBDataSourceBuilder(connectionString);
            dataSourceBuilder
                .MapComposite<MyXML>("enterprisedb.myxml");

            using (var dataSource = dataSourceBuilder.Build())
            using (var connection = await dataSource.OpenConnectionAsync())
            using (var queue = CreateQueue("MSG_QUEUE", connection))
            {
                // Enqueue 5 messages
                int messagesToSend = 5;
                for (int i = 0; i < messagesToSend; i++)
                {
                    var payload = new MyXML()
                    {
                        Value = $"(<Message><MessageText>Test message: {i}</MessageText></Message>)"
                    };

                    if (TryEnqueueMessage(queue, payload, out var message))
                    {
                        // MessageId is populated with a unique identifier
                        Console.WriteLine($"Message {i} ({message.MessageId}) enqueued");
                    }
                    else
                    {
                        Console.WriteLine($"Message {i} enqueue failed");
                    }
                }
            }
        }

        // Creates and returns a queue ready for use in our sample
        private static EDBAQQueue CreateQueue(string queueName, EDBConnection connection)
        {
            var queue = new EDBAQQueue(queueName, connection);
            queue.MessageType = EDBAQMessageType.Udt;
            queue.EnqueueOptions.Visibility = EDBAQVisibility.ON_COMMIT;
            queue.UdtTypeName = "myxml";

            return queue;
        }

        // Enqueues the payload
        // If the enqueuing was successful, message variable receives the queue message and the function returns true
        // otherwise message is null and the function returns false
        private static bool TryEnqueueMessage<T>(EDBAQQueue queue, T payload, out EDBAQMessage message)
        {
            using (EDBTransaction transaction = queue.Connection.BeginTransaction())
            {
                try
                {
                    message = new EDBAQMessage() { Payload = payload };
                    queue.Enqueue(message);
                    transaction.Commit();
        
                    return true;
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Error while enqueuing message: {ex.Message}");
                    transaction?.Rollback();
        
                    message = null;
                    return false;
                }
            }
        }
    }
}

// .NET 8
using EnterpriseDB.EDBClient;

namespace EnterpriseDB;

internal static class Program
{
    // Sample message payload
    class MyXML
    {
        public string Value { get; set; }
    }

    public static async Task Main(string[] args)
    {
        // not for production, move connection string to app settings
        string connectionString = "Server=127.0.0.1;Port=5444;User Id=enterprisedb;Password=edb;Database=edb";

        // Note registration of MyXml type
        var dataSourceBuilder = new EDBDataSourceBuilder(connectionString);
        dataSourceBuilder
            .MapComposite<MyXML>("enterprisedb.myxml");

        await using var dataSource = dataSourceBuilder.Build();
        await using var connection = await dataSource.OpenConnectionAsync();
        using var queue = CreateQueue("MSG_QUEUE", connection);

        // Enqueue 5 messages
        int messagesToSend = 5;
        for (int i = 0; i < messagesToSend; i++)
        {
            var payload = new MyXML()
            {
                Value = $"(<Message><MessageText>Test message: {i}</MessageText></Message>)"
            };

            if (TryEnqueueMessage(queue, payload, out var message))
            {
                // MessageId is populated with a unique identifier
                Console.WriteLine($"Message {i} ({message.MessageId}) enqueued");
            }
            else
            {
                Console.WriteLine($"Message {i} enqueue failed");
            }
        }

    }

    // Creates and returns a queue ready for use in our sample
    private static EDBAQQueue CreateQueue(string queueName, EDBConnection connection)
    {
        var queue = new EDBAQQueue(queueName, connection);
        queue.MessageType = EDBAQMessageType.Udt;
        queue.EnqueueOptions.Visibility = EDBAQVisibility.ON_COMMIT;
        queue.UdtTypeName = "myxml";

        return queue;
    }

    // Enqueues the payload
    // If the enqueuing was successful, message variable receives the queue message and the function returns true
    // otherwise message is null and the function returns false
    private static bool TryEnqueueMessage<T>(EDBAQQueue queue, T payload, out EDBAQMessage message)
    {
        using EDBTransaction transaction = queue.Connection.BeginTransaction();

        try
        {
            message = new EDBAQMessage() { Payload = payload };
            queue.Enqueue(message);
            transaction.Commit();

            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error while enqueuing message: {ex.Message}");
            transaction?.Rollback();

            message = null;
            return false;
        }
    }
}

Dequeueing a message

To dequeue a message in your .NET application:

  1. Import the EnterpriseDB.EDBClient namespace.
  2. Create an instance of EDBAQQueue, passing the name of the queue.
  3. Call the EDBAQQueue.Dequeue() method.

Note

The following code shows how to use the EDBAQQueue.Dequeue method. A queue is retrieved by its name, and an attempt is made to dequeue a message.

If a PostgresException with SqlState set to P0002 is raised, then the queue is empty or the wait time (set with queue.DequeueOptions.Wait) has expired, and the code gracefully returns a null message.

// .NET Framework 4.7.2
using System;
using System.Threading.Tasks;
using EnterpriseDB.EDBClient;

namespace EnterpriseDB
{
    internal static class Program
    {
        // Sample message payload
        class MyXML
        {
            public string Value { get; set; }
        }

        public static async Task Main(string[] args)
        {
            // not for production, move connection string to app settings
            string connectionString = "Server=127.0.0.1;Port=5444;User Id=enterprisedb;Password=edb;Database=edb";

            // Note registration of MyXml type
            var dataSourceBuilder = new EDBDataSourceBuilder(connectionString);
            dataSourceBuilder
                .MapComposite<MyXML>("enterprisedb.myxml");

            using (var dataSource = dataSourceBuilder.Build())
            using (var connection = await dataSource.OpenConnectionAsync())
            using (var queue = CreateQueue("MSG_QUEUE", connection))
            {
                // Dequeue 5 messages
                int messagesToDequeue = 5;
                for (int i = 0; i < messagesToDequeue; i++)
                {
                    if (TryDequeueMessage(queue, out var message))
                    {
                        Console.WriteLine($"Message {message.MessageId} dequeued");
                        
                        if (message?.Payload is MyXML myXML)
                        {
                            Console.WriteLine($"MyXML Message received: {myXML.Value}");
                        }
                        else
                        {
                            Console.WriteLine($"Other message received");
                        }
                    }
                    else
                    {
                        Console.WriteLine($"No message");
                    }
                }
            }
        }

        // Creates and returns a queue ready for use in our sample
        private static EDBAQQueue CreateQueue(string queueName, EDBConnection connection)
        {
            var queue = new EDBAQQueue(queueName, connection);
            queue.MessageType = EDBAQMessageType.Udt;
            queue.DequeueOptions.Navigation = EDBAQNavigationMode.FIRST_MESSAGE;
            queue.DequeueOptions.Visibility = EDBAQVisibility.ON_COMMIT;
            queue.DequeueOptions.Wait = 1; // wait for 1 second
            queue.UdtTypeName = "myxml";

            return queue;
        }

        // Dequeues a payload
        // If the dequeuing was successful, message variable receives the queue message and the function returns true
        // otherwise message is null and the function returns false
        private static bool TryDequeueMessage(EDBAQQueue queue, out EDBAQMessage message)
        {
            using (EDBTransaction transaction = queue.Connection.BeginTransaction())
            {
                try
                {
                    message = queue.Dequeue();
                    transaction.Commit();

                    return true;
                }
                catch (PostgresException pgException) when (pgException.SqlState == "P0002")
                {
                    // Queue empty or time out
                    transaction.Commit();

                    message = null;
                    return false;
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Error while dequeuing message: {ex.Message}");
                    transaction?.Rollback();

                    message = null;
                    return false;
                }
            }
        }
    }
}

// .NET 8
using EnterpriseDB.EDBClient;

namespace EnterpriseDB;
internal static class Program
{
    // Sample message payload
    class MyXML
    {
        public string Value { get; set; }
    }

    public static async Task Main(string[] args)
    {
        // not for production, move connection string to app settings
        string connectionString = "Server=127.0.0.1;Port=5444;User Id=enterprisedb;Password=edb;Database=edb";

        // Note registration of MyXml type
        var dataSourceBuilder = new EDBDataSourceBuilder(connectionString);
        dataSourceBuilder
            .MapComposite<MyXML>("enterprisedb.myxml");

        await using var dataSource = dataSourceBuilder.Build();
        await using var connection = await dataSource.OpenConnectionAsync();
        using var queue = CreateQueue("MSG_QUEUE", connection);

        // Dequeue 5 messages
        int messagesToDequeue = 5;
        for (int i = 0; i < messagesToDequeue; i++)
        {
            if (TryDequeueMessage(queue, out var message))
            {
                Console.WriteLine($"Message {message.MessageId} dequeued");
                        
                if (message?.Payload is MyXML myXML)
                {
                    Console.WriteLine($"MyXML Message received: {myXML.Value}");
                }
                else
                {
                    Console.WriteLine($"Other message received");
                }
            }
            else
            {
                Console.WriteLine($"No message");
            }
        }
    }

    // Creates and returns a queue ready for use in our sample
    private static EDBAQQueue CreateQueue(string queueName, EDBConnection connection)
    {
        var queue = new EDBAQQueue(queueName, connection);
        queue.MessageType = EDBAQMessageType.Udt;
        queue.DequeueOptions.Navigation = EDBAQNavigationMode.FIRST_MESSAGE;
        queue.DequeueOptions.Visibility = EDBAQVisibility.ON_COMMIT;
        queue.DequeueOptions.Wait = 1; // wait for 1 second
        queue.UdtTypeName = "myxml";

        return queue;
    }

    // Dequeues a payload
    // If the dequeuing was successful, message variable receives the queue message and the function returns true
    // otherwise message is null and the function returns false
    private static bool TryDequeueMessage(EDBAQQueue queue, out EDBAQMessage message)
    {
        using EDBTransaction transaction = queue.Connection.BeginTransaction();

        try
        {
            message = queue.Dequeue();
            transaction.Commit();

            return true;
        }
        catch (PostgresException pgException) when (pgException.SqlState == "P0002")
        {
            // Queue empty or time out
            transaction.Commit();

            message = null;
            return false;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error while dequeuing message: {ex.Message}");
            transaction?.Rollback();

            message = null;
            return false;
        }
    }
}
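
The TryDequeueMessage pattern above returns false when the queue is empty or the wait time expires, so it can drive a loop that reads until no message is left. The following is a minimal sketch of such a loop, not part of the EDB .NET driver: the QueueDrainSketch class, the TryDequeue delegate, and the Drain method are hypothetical names introduced only for illustration. The loop depends only on a delegate with the same shape as TryDequeueMessage, so it doesn't call the driver itself.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only; not part of the EDB .NET driver.
public static class QueueDrainSketch
{
    // Same shape as the sample's TryDequeueMessage: returns true and sets
    // message on success, returns false when no message was dequeued.
    public delegate bool TryDequeue<T>(out T message);

    // Calls tryDequeue repeatedly until it returns false or until
    // maxMessages messages have been collected, whichever comes first.
    public static List<T> Drain<T>(TryDequeue<T> tryDequeue, int maxMessages)
    {
        var received = new List<T>();
        while (received.Count < maxMessages && tryDequeue(out T message))
        {
            received.Add(message);
        }
        return received;
    }
}
```

Assuming the queue and TryDequeueMessage from the sample above, a call might look like `QueueDrainSketch.Drain<EDBAQMessage>((out EDBAQMessage m) => TryDequeueMessage(queue, out m), 100)`. Because the sample's TryDequeueMessage also returns false on unexpected errors, the loop stops on those as well. The maxMessages cap keeps the loop bounded if producers keep enqueuing.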

EDBAQ classes

The following EDBAQ classes are used in this application.

EDBAQDequeueMode

The EDBAQDequeueMode class lists all the dequeue modes available.

Value          Description
Browse         Reads the message without locking.
Locked         Reads and gets a write lock on the message.
Remove         Deletes the message after reading. This is the default value.
Remove_NoData  Confirms receipt of the message.

EDBAQDequeueOptions

The EDBAQDequeueOptions class lists the options available when dequeuing a message.

Property        Description
ConsumerName    The name of the consumer for which to dequeue the message.
DequeueMode     Set from EDBAQDequeueMode. It represents the locking behavior linked with the dequeue option.
Navigation      Set from EDBAQNavigationMode. It represents the position of the message to fetch.
Visibility      Set from EDBAQVisibility. It represents whether the new message is dequeued as part of the current transaction.
Wait            The wait time for a message as per the search criteria.
Msgid           The message identifier.
Correlation     The correlation identifier.
DeqCondition    The dequeue condition. It's a Boolean expression.
Transformation  The transformation to apply before dequeuing the message.
DeliveryMode    The delivery mode of the dequeued message.

EDBAQEnqueueOptions

The EDBAQEnqueueOptions class lists the options available when enqueuing a message.

Property           Description
Visibility         Set from EDBAQVisibility. It represents whether the new message is enqueued as part of the current transaction.
RelativeMsgid      The relative message identifier.
SequenceDeviation  The sequence when to dequeue the message.
Transformation     The transformation to apply before enqueuing the message.
DeliveryMode       The delivery mode of the enqueued message.

EDBAQMessage

The EDBAQMessage class represents a message to enqueue or dequeue.

Property   Description
Payload    The actual message to queue.
MessageId  The ID of the queued message.

EDBAQMessageProperties

The EDBAQMessageProperties class lists the message properties available.

Property          Description
Priority          The priority of the message.
Delay             The duration after which the message is available for dequeuing, in seconds.
Expiration        The duration for which the message is available for dequeuing, in seconds.
Correlation       The correlation identifier.
Attempts          The number of attempts taken to dequeue the message.
RecipientList     The recipients list that overrides the default queue subscribers.
ExceptionQueue    The name of the queue to move the unprocessed messages to.
EnqueueTime       The time when the message was enqueued.
State             The state of the message while dequeued.
OriginalMsgid     The message identifier in the last queue.
TransactionGroup  The transaction group for the dequeued messages.
DeliveryMode      The delivery mode of the dequeued message.

EDBAQMessageState

The EDBAQMessageState class represents the state of the message during dequeue.

Value      Description
Expired    The message is moved to the exception queue.
Processed  The message is processed and kept.
Ready      The message is ready to be processed.
Waiting    The message is in waiting state. The delay isn't reached.
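
To make the relationship between the Delay and Expiration message properties and the Waiting, Ready, and Expired states concrete, the following sketch classifies a message from its timing values alone. It's an illustrative model, not the server's logic and not driver code: the SketchMessageState enum and MessageTimingSketch class are hypothetical names. The sketch assumes that the expiration period starts counting when the delay ends and that a null expiration means the message never expires. It doesn't model the Processed state, which depends on a dequeue having happened.

```csharp
using System;

// Hypothetical types for illustration only; not part of the EDB .NET driver.
public enum SketchMessageState { Waiting, Ready, Expired }

public static class MessageTimingSketch
{
    // Classifies a message by time alone.
    // Assumption: expiration is counted from the moment the delay ends.
    // Assumption: a null expirationSeconds means the message never expires.
    public static SketchMessageState Classify(
        DateTime enqueueTime, int delaySeconds, int? expirationSeconds, DateTime now)
    {
        DateTime readyAt = enqueueTime.AddSeconds(delaySeconds);
        if (now < readyAt)
        {
            return SketchMessageState.Waiting; // the delay isn't reached
        }

        if (expirationSeconds.HasValue &&
            now >= readyAt.AddSeconds(expirationSeconds.Value))
        {
            return SketchMessageState.Expired; // the availability window has passed
        }

        return SketchMessageState.Ready; // available for dequeuing
    }
}
```

For example, under these assumptions a message enqueued with a delay of 10 seconds and an expiration of 30 seconds is classified as Waiting for the first 10 seconds, as Ready for the next 30 seconds, and as Expired after that.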

EDBAQMessageType

The EDBAQMessageType class represents the types for payload.

Value  Description
Raw    The raw message type. Note: Currently, this payload type isn't supported.
UDT    The user-defined type message.
XML    The XML type message. Note: Currently, this payload type isn't supported.

EDBAQNavigationMode

The EDBAQNavigationMode class represents the different types of navigation modes available.

Value             Description
First_Message     Returns the first available message that matches the search criteria.
Next_Message      Returns the next available message that matches the search criteria.
Next_Transaction  Returns the first message of the next transaction group.

EDBAQQueue

The EDBAQQueue class represents a SQL statement to execute DBMS_AQ functionality on a PostgreSQL database.

Property           Description
Connection         The connection to use.
Name               The name of the queue.
MessageType        The message type that's enqueued/dequeued from this queue, for example EDBAQMessageType.Udt.
UdtTypeName        The user-defined type name of the message type.
EnqueueOptions     The enqueue options to use.
DequeueOptions     The dequeue options to use.
MessageProperties  The message properties to use.

EDBAQVisibility

The EDBAQVisibility class represents the visibility options available.

Value      Description
Immediate  The enqueue/dequeue isn't part of the ongoing transaction.
On_Commit  The enqueue/dequeue is part of the current transaction.

Note
  • To review the default options for these parameters, see DBMS_AQ.
  • EDB advanced queueing functionality uses user-defined types for calling enqueue/dequeue operations. Server Compatibility Mode=NoTypeLoading can't be used with advanced queueing because NoTypeLoading doesn't load any user-defined types.
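
Because Server Compatibility Mode=NoTypeLoading is incompatible with advanced queueing, an application might check its connection string before creating a queue. The following is a minimal sketch of such a check that uses only the standard System.Data.Common.DbConnectionStringBuilder class to parse the string. The ConnectionStringCheckSketch class and the DisablesTypeLoading method are hypothetical names introduced for illustration and aren't part of the EDB .NET driver.

```csharp
using System;
using System.Data.Common;

// Hypothetical helper for illustration only; not part of the EDB .NET driver.
public static class ConnectionStringCheckSketch
{
    // Returns true if the connection string sets
    // Server Compatibility Mode=NoTypeLoading.
    // DbConnectionStringBuilder treats keys as case insensitive.
    public static bool DisablesTypeLoading(string connectionString)
    {
        var builder = new DbConnectionStringBuilder
        {
            ConnectionString = connectionString
        };

        return builder.TryGetValue("Server Compatibility Mode", out object value)
            && string.Equals(
                Convert.ToString(value)?.Trim(),
                "NoTypeLoading",
                StringComparison.OrdinalIgnoreCase);
    }
}
```

An application could call DisablesTypeLoading at startup and fail fast with a clear error message rather than encountering a type-resolution error on the first enqueue or dequeue.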