Using advanced queueing v9.0.3.1
EDB Postgres Advanced Server advanced queueing provides message queueing and message processing for the EDB Postgres Advanced Server database. User-defined messages are stored in a queue. A collection of queues is stored in a queue table. Create a queue table before creating a queue that depends on it.
On the server side, procedures in the DBMS_AQADM package create and manage message queues and queue tables. Use the DBMS_AQ package to add messages to or remove messages from a queue, or to register or unregister a PL/SQL callback procedure. For more information about DBMS_AQ and DBMS_AQADM, see DBMS_AQ.
On the client side, the application uses the EDB .NET driver to enqueue and dequeue messages.
Enqueueing or dequeueing a message
For more information about using EDB Postgres Advanced Server's advanced queueing functionality, see Built-in packages.
Server-side setup
To use advanced queueing functionality in your .NET application, first create a user-defined type, a queue table, and a queue, and then start the queue on the database server. Invoke EDB-PSQL and connect to the EDB Postgres Advanced Server host database. Then run the following SPL commands at the command line.
Creating a user-defined type
To specify a RAW data type, create a user-defined type. This example shows creating a user-defined type named myxml:
CREATE TYPE myxml AS (value XML);
Creating the queue table
A queue table can hold multiple queues with the same payload type. This example shows creating a table named MSG_QUEUE_TABLE:
BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table => 'MSG_QUEUE_TABLE', queue_payload_type => 'myxml', comment => 'Message queue table'); END;
Creating the queue
This example shows creating a queue named MSG_QUEUE in the table MSG_QUEUE_TABLE:
BEGIN DBMS_AQADM.CREATE_QUEUE ( queue_name => 'MSG_QUEUE', queue_table => 'MSG_QUEUE_TABLE', comment => 'This queue contains pending messages.'); END;
Starting the queue
Once the queue is created, invoke the following SPL code at the command line to start a queue in the EDB database:
BEGIN DBMS_AQADM.START_QUEUE (queue_name => 'MSG_QUEUE'); END;
Client-side example
After you create the user-defined type, the queue table, and the queue, and then start the queue, you can enqueue or dequeue messages using the EDB .NET driver.
Enqueueing a message
To enqueue a message in your .NET application:
- Import the EnterpriseDB.EDBClient namespace.
- Create an instance of EDBAQQueue, passing the name of the queue.
- Create an EDBAQMessage and set its payload.
- Call the EDBAQQueue.Enqueue method.
- After the call, the EDBAQMessage.MessageId property is populated with a string that uniquely identifies your message.
The following code shows how to use the EDBAQQueue.Enqueue method. A custom message payload is created and then enqueued.
Note
This example uses the queue's connection, available through EDBAQQueue.Connection, to begin a transaction. If anything goes wrong, the transaction is rolled back and the queue isn't polluted.
// .NET Framework 4.7.2
using System;
using System.Threading.Tasks;
using EnterpriseDB.EDBClient;

namespace EnterpriseDB
{
    internal static class Program
    {
        // Sample message payload
        class MyXML
        {
            public string Value { get; set; }
        }

        public static async Task Main(string[] args)
        {
            // Not for production: move the connection string to app settings
            string connectionString = "Server=127.0.0.1;Port=5444;User Id=enterprisedb;Password=edb;Database=edb";

            // Note the registration of the MyXML type
            var dataSourceBuilder = new EDBDataSourceBuilder(connectionString);
            dataSourceBuilder.MapComposite<MyXML>("enterprisedb.myxml");

            using (var dataSource = dataSourceBuilder.Build())
            using (var connection = await dataSource.OpenConnectionAsync())
            using (var queue = CreateQueue("MSG_QUEUE", connection))
            {
                // Enqueue 5 messages
                int messagesToSend = 5;
                for (int i = 0; i < messagesToSend; i++)
                {
                    var payload = new MyXML()
                    {
                        Value = $"(<Message><MessageText>Test message: {i}</MessageText></Message>)"
                    };

                    if (TryEnqueueMessage(queue, payload, out var message))
                    {
                        // MessageId is populated with a unique identifier
                        Console.WriteLine($"Message {i} ({message.MessageId}) enqueued");
                    }
                    else
                    {
                        Console.WriteLine($"Message {i} enqueue failed");
                    }
                }
            }
        }

        // Creates and returns a queue ready for use in our sample
        private static EDBAQQueue CreateQueue(string queueName, EDBConnection connection)
        {
            var queue = new EDBAQQueue(queueName, connection);
            queue.MessageType = EDBAQMessageType.Udt;
            queue.EnqueueOptions.Visibility = EDBAQVisibility.ON_COMMIT;
            queue.UdtTypeName = "myxml";
            return queue;
        }

        // Enqueues the payload.
        // If the enqueue succeeds, message receives the queue message and the function returns true.
        // Otherwise, message is null and the function returns false.
        private static bool TryEnqueueMessage<T>(EDBAQQueue queue, T payload, out EDBAQMessage message)
        {
            using (EDBTransaction transaction = queue.Connection.BeginTransaction())
            {
                try
                {
                    message = new EDBAQMessage()
                    {
                        Payload = payload
                    };
                    queue.Enqueue(message);
                    transaction.Commit();
                    return true;
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Error while enqueuing message: {ex.Message}");
                    transaction?.Rollback();
                    message = null;
                    return false;
                }
            }
        }
    }
}

// .NET 8
using EnterpriseDB.EDBClient;

namespace EnterpriseDB;

internal static class Program
{
    // Sample message payload
    class MyXML
    {
        public string Value { get; set; }
    }

    public static async Task Main(string[] args)
    {
        // Not for production: move the connection string to app settings
        string connectionString = "Server=127.0.0.1;Port=5444;User Id=enterprisedb;Password=edb;Database=edb";

        // Note the registration of the MyXML type
        var dataSourceBuilder = new EDBDataSourceBuilder(connectionString);
        dataSourceBuilder.MapComposite<MyXML>("enterprisedb.myxml");

        await using var dataSource = dataSourceBuilder.Build();
        await using var connection = await dataSource.OpenConnectionAsync();
        using var queue = CreateQueue("MSG_QUEUE", connection);

        // Enqueue 5 messages
        int messagesToSend = 5;
        for (int i = 0; i < messagesToSend; i++)
        {
            var payload = new MyXML()
            {
                Value = $"(<Message><MessageText>Test message: {i}</MessageText></Message>)"
            };

            if (TryEnqueueMessage(queue, payload, out var message))
            {
                // MessageId is populated with a unique identifier
                Console.WriteLine($"Message {i} ({message.MessageId}) enqueued");
            }
            else
            {
                Console.WriteLine($"Message {i} enqueue failed");
            }
        }
    }

    // Creates and returns a queue ready for use in our sample
    private static EDBAQQueue CreateQueue(string queueName, EDBConnection connection)
    {
        var queue = new EDBAQQueue(queueName, connection);
        queue.MessageType = EDBAQMessageType.Udt;
        queue.EnqueueOptions.Visibility = EDBAQVisibility.ON_COMMIT;
        queue.UdtTypeName = "myxml";
        return queue;
    }

    // Enqueues the payload.
    // If the enqueue succeeds, message receives the queue message and the function returns true.
    // Otherwise, message is null and the function returns false.
    private static bool TryEnqueueMessage<T>(EDBAQQueue queue, T payload, out EDBAQMessage message)
    {
        using EDBTransaction transaction = queue.Connection.BeginTransaction();
        try
        {
            message = new EDBAQMessage()
            {
                Payload = payload
            };
            queue.Enqueue(message);
            transaction.Commit();
            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error while enqueuing message: {ex.Message}");
            transaction?.Rollback();
            message = null;
            return false;
        }
    }
}
Dequeueing a message
To dequeue a message in your .NET application:
- Import the EnterpriseDB.EDBClient namespace.
- Create an instance of EDBAQQueue, passing the name of the queue.
- Call the EDBAQQueue.Dequeue() method.
Note
The following code shows how to use the EDBAQQueue.Dequeue method. A queue is retrieved by its name, and an attempt is made to dequeue a message.
If a PostgresException with SqlState set to P0002 is raised, then the queue is empty or the wait time (set with queue.DequeueOptions.Wait) has expired, and the code gracefully returns a null message.
// .NET Framework 4.7.2
using System;
using System.Threading.Tasks;
using EnterpriseDB.EDBClient;

namespace EnterpriseDB
{
    internal static class Program
    {
        // Sample message payload
        class MyXML
        {
            public string Value { get; set; }
        }

        public static async Task Main(string[] args)
        {
            // Not for production: move the connection string to app settings
            string connectionString = "Server=127.0.0.1;Port=5444;User Id=enterprisedb;Password=edb;Database=edb";

            // Note the registration of the MyXML type
            var dataSourceBuilder = new EDBDataSourceBuilder(connectionString);
            dataSourceBuilder.MapComposite<MyXML>("enterprisedb.myxml");

            using (var dataSource = dataSourceBuilder.Build())
            using (var connection = await dataSource.OpenConnectionAsync())
            using (var queue = CreateQueue("MSG_QUEUE", connection))
            {
                // Dequeue 5 messages
                int messagesToDequeue = 5;
                for (int i = 0; i < messagesToDequeue; i++)
                {
                    if (TryDequeueMessage(queue, out var message))
                    {
                        Console.WriteLine($"Message {message.MessageId} dequeued");
                        if (message?.Payload is MyXML myXML)
                        {
                            Console.WriteLine($"MyXML Message received: {myXML.Value}");
                        }
                        else
                        {
                            Console.WriteLine($"Other message received");
                        }
                    }
                    else
                    {
                        Console.WriteLine($"No message");
                    }
                }
            }
        }

        // Creates and returns a queue ready for use in our sample
        private static EDBAQQueue CreateQueue(string queueName, EDBConnection connection)
        {
            var queue = new EDBAQQueue(queueName, connection);
            queue.MessageType = EDBAQMessageType.Udt;
            queue.DequeueOptions.Navigation = EDBAQNavigationMode.FIRST_MESSAGE;
            queue.DequeueOptions.Visibility = EDBAQVisibility.ON_COMMIT;
            queue.DequeueOptions.Wait = 1; // wait for 1 second
            queue.UdtTypeName = "myxml";
            return queue;
        }

        // Dequeues a payload.
        // If the dequeue succeeds, message receives the queue message and the function returns true.
        // Otherwise, message is null and the function returns false.
        private static bool TryDequeueMessage(EDBAQQueue queue, out EDBAQMessage message)
        {
            using (EDBTransaction transaction = queue.Connection.BeginTransaction())
            {
                try
                {
                    message = queue.Dequeue();
                    transaction.Commit();
                    return true;
                }
                catch (PostgresException pgException) when (pgException.SqlState == "P0002")
                {
                    // Queue empty or timeout
                    transaction.Commit();
                    message = null;
                    return false;
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Error while dequeuing message: {ex.Message}");
                    transaction?.Rollback();
                    message = null;
                    return false;
                }
            }
        }
    }
}

// .NET 8
using EnterpriseDB.EDBClient;

namespace EnterpriseDB;

internal static class Program
{
    // Sample message payload
    class MyXML
    {
        public string Value { get; set; }
    }

    public static async Task Main(string[] args)
    {
        // Not for production: move the connection string to app settings
        string connectionString = "Server=127.0.0.1;Port=5444;User Id=enterprisedb;Password=edb;Database=edb";

        // Note the registration of the MyXML type
        var dataSourceBuilder = new EDBDataSourceBuilder(connectionString);
        dataSourceBuilder.MapComposite<MyXML>("enterprisedb.myxml");

        await using var dataSource = dataSourceBuilder.Build();
        await using var connection = await dataSource.OpenConnectionAsync();
        using var queue = CreateQueue("MSG_QUEUE", connection);

        // Dequeue 5 messages
        int messagesToDequeue = 5;
        for (int i = 0; i < messagesToDequeue; i++)
        {
            if (TryDequeueMessage(queue, out var message))
            {
                Console.WriteLine($"Message {message.MessageId} dequeued");
                if (message?.Payload is MyXML myXML)
                {
                    Console.WriteLine($"MyXML Message received: {myXML.Value}");
                }
                else
                {
                    Console.WriteLine($"Other message received");
                }
            }
            else
            {
                Console.WriteLine($"No message");
            }
        }
    }

    // Creates and returns a queue ready for use in our sample
    private static EDBAQQueue CreateQueue(string queueName, EDBConnection connection)
    {
        var queue = new EDBAQQueue(queueName, connection);
        queue.MessageType = EDBAQMessageType.Udt;
        queue.DequeueOptions.Navigation = EDBAQNavigationMode.FIRST_MESSAGE;
        queue.DequeueOptions.Visibility = EDBAQVisibility.ON_COMMIT;
        queue.DequeueOptions.Wait = 1; // wait for 1 second
        queue.UdtTypeName = "myxml";
        return queue;
    }

    // Dequeues a payload.
    // If the dequeue succeeds, message receives the queue message and the function returns true.
    // Otherwise, message is null and the function returns false.
    private static bool TryDequeueMessage(EDBAQQueue queue, out EDBAQMessage message)
    {
        using EDBTransaction transaction = queue.Connection.BeginTransaction();
        try
        {
            message = queue.Dequeue();
            transaction.Commit();
            return true;
        }
        catch (PostgresException pgException) when (pgException.SqlState == "P0002")
        {
            // Queue empty or timeout
            transaction.Commit();
            message = null;
            return false;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error while dequeuing message: {ex.Message}");
            transaction?.Rollback();
            message = null;
            return false;
        }
    }
}
EDBAQ classes
The following EDBAQ classes are used in this application.
EDBAQDequeueMode
The EDBAQDequeueMode class lists all the dequeue modes available.
Value | Description |
---|---|
Browse | Reads the message without locking. |
Locked | Reads and gets a write lock on the message. |
Remove | Deletes the message after reading. This is the default value. |
Remove_NoData | Confirms receipt of the message. |
EDBAQDequeueOptions
The EDBAQDequeueOptions class lists the options available when dequeuing a message.
Property | Description |
---|---|
ConsumerName | The name of the consumer for which to dequeue the message. |
DequeueMode | Set from EDBAQDequeueMode. It represents the locking behavior linked with the dequeue option. |
Navigation | Set from EDBAQNavigationMode. It represents the position of the message to fetch. |
Visibility | Set from EDBAQVisibility. It represents whether the new message is dequeued as part of the current transaction. |
Wait | The time to wait, in seconds, for a message that matches the search criteria. |
Msgid | The message identifier. |
Correlation | The correlation identifier. |
DeqCondition | The dequeue condition, written as a Boolean expression. |
Transformation | The transformation to apply before dequeuing the message. |
DeliveryMode | The delivery mode of the dequeued message. |
EDBAQEnqueueOptions
The EDBAQEnqueueOptions class lists the options available when enqueuing a message.
Property | Description |
---|---|
Visibility | Set from EDBAQVisibility. It represents whether the new message is enqueued as part of the current transaction. |
RelativeMsgid | The relative message identifier. |
SequenceDeviation | The sequence in which to dequeue the message. |
Transformation | The transformation to apply before enqueuing the message. |
DeliveryMode | The delivery mode of the enqueued message. |
EDBAQMessage
The EDBAQMessage class represents a message to enqueue or dequeue.
Property | Description |
---|---|
Payload | The actual message to queue. |
MessageId | The ID of the queued message. |
EDBAQMessageProperties
The EDBAQMessageProperties class lists the message properties available.
Property | Description |
---|---|
Priority | The priority of the message. |
Delay | The duration after which the message is available for dequeuing, in seconds. |
Expiration | The duration for which the message is available for dequeuing, in seconds. |
Correlation | The correlation identifier. |
Attempts | The number of attempts taken to dequeue the message. |
RecipientList | The list of recipients that overrides the default queue subscribers. |
ExceptionQueue | The name of the queue to move the unprocessed messages to. |
EnqueueTime | The time when the message was enqueued. |
State | The state of the message when it's dequeued. |
OriginalMsgid | The message identifier in the last queue. |
TransactionGroup | The transaction group for the dequeued messages. |
DeliveryMode | The delivery mode of the dequeued message. |
EDBAQMessageState
The EDBAQMessageState class represents the state of the message during dequeue.
Value | Description |
---|---|
Expired | The message is moved to the exception queue. |
Processed | The message is processed and kept. |
Ready | The message is ready to be processed. |
Waiting | The message is in waiting state. The delay isn't reached. |
EDBAQMessageType
The EDBAQMessageType class represents the payload types.
Value | Description |
---|---|
Raw | The raw message type. Note: Currently, this payload type isn't supported. |
Udt | The user-defined type message. |
XML | The XML type message. Note: Currently, this payload type isn't supported. |
EDBAQNavigationMode
The EDBAQNavigationMode class represents the different types of navigation modes available.
Value | Description |
---|---|
First_Message | Returns the first available message that matches the search terms. |
Next_Message | Returns the next available message that matches the search terms. |
Next_Transaction | Returns the first message of the next transaction group. |
EDBAQQueue
The EDBAQQueue class represents a SQL statement to execute DBMS_AQ functionality on a PostgreSQL database.
Property | Description |
---|---|
Connection | The connection to use. |
Name | The name of the queue. |
MessageType | The message type that's enqueued to or dequeued from this queue, for example, EDBAQMessageType.Udt. |
UdtTypeName | The user-defined type name of the message type. |
EnqueueOptions | The enqueue options to use. |
DequeueOptions | The dequeue options to use. |
MessageProperties | The message properties to use. |
EDBAQVisibility
The EDBAQVisibility class represents the visibility options available.
Value | Description |
---|---|
Immediate | The enqueue/dequeue isn't part of the ongoing transaction. |
On_Commit | The enqueue/dequeue is part of the current transaction. |
Note
- To review the default options for these parameters, see DBMS_AQ.
- EDB advanced queueing functionality uses user-defined types for calling enqueue/dequeue operations. Server Compatibility Mode=NoTypeLoading can't be used with advanced queueing because NoTypeLoading doesn't load any user-defined types.
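As a configuration sketch of the restriction on NoTypeLoading, compare the two connection strings that follow. The first is the sample connection string used in the code on this page. The second is the same string with the Server Compatibility Mode=NoTypeLoading setting appended, shown only as a configuration to avoid with advanced queueing. The host, port, and credentials are the sample values from this page, not recommendations.

```text
Server=127.0.0.1;Port=5444;User Id=enterprisedb;Password=edb;Database=edb
Server=127.0.0.1;Port=5444;User Id=enterprisedb;Password=edb;Database=edb;Server Compatibility Mode=NoTypeLoading
```

With the second string, user-defined types such as myxml aren't loaded, so the enqueue and dequeue samples on this page aren't expected to work.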