Oracle Database 26ai will be available on generic Linux platforms in January and soon on AIX and Windows
Purpose
Administration of Advanced Queue queues
AUTHID
CURRENT_USER
Constants
Name
Data Type
Value
12c Sharded Queue
AUTO
BINARY_INTEGER
1
CACHED
BINARY_INTEGER
2
UNCACHED
BINARY_INTEGER
3
Delivery Mode
PERSISTENT
BINARY_INTEGER
1
BUFFERED
BINARY_INTEGER
2
PERSISTENT_OR_BUFFERED
BINARY_INTEGER
3
Get/Set_Replay_Info
LAST_ENQUEUED
BINARY_INTEGER
0
LAST_ACKNOWLEDGED
BINARY_INTEGER
1
GoldenGate (OGG) Replicated Queue
REPLICATION_MODE
BINARY_INTEGER
1
PROPAGATION_MODE
BINARY_INTEGER
2
SWITCHOVER_FORCE
BINARY_INTEGER
4
LDAP
AQ_QUEUE_CONNECTION
BINARY_INTEGER
1
AQ_TOPIC_CONNECTION
BINARY_INTEGER
2
Message Grouping
TRANSACTIONAL
BINARY_INTEGER
1
NONE
BINARY_INTEGER
0
Non-Repudiation Properties
NON_REPUDIATE_SENDER
BINARY_INTEGER
1
NON_REPUDIATE_SNDRCV
BINARY_INTEGER
2
Payload
JMS_TYPE
VARCHAR2(10)
'JMS'
Protocols
TTC
BINARY_INTEGER
0
HTTP
BINARY_INTEGER
1
SMTP
BINARY_INTEGER
2
FTP
BINARY_INTEGER
4
ANYP
BINARY_INTEGER
HTTP + SMTP
LOGMINER_PROTOCOL
BINARY_INTEGER
1
LOGAPPLY_PROTOCOL
BINARY_INTEGER
2
TEST_PROTOCOL
BINARY_INTEGER
3
Queue Type
NORMAL_QUEUE
BINARY_INTEGER
0
EXCEPTION_QUEUE
BINARY_INTEGER
1
NON_PERSISTENT_QUEUE
BINARY_INTEGER
2
Retention
INFINITE
BINARY_INTEGER
-1
Retention Types
DEQUEUE_TIME
BINARY_INTEGER
0
ENQUEUE_TIME
BINARY_INTEGER
1
ENQUEUE_TIME_AND_ALL_DEQUEUED
BINARY_INTEGER
2
NUM_RETENTION_TYPES
BINARY_INTEGER
3
Sort List
PRIORITY
BINARY_INTEGER
1
ENQ_TIME
BINARY_INTEGER
2
PRIORITY_ENQ_TIME
BINARY_INTEGER
3
COMMIT_TIME
BINARY_INTEGER
4
PRIORITY_COMMIT_TIME
BINARY_INTEGER
5
ENQ_TIME_PRIORITY
BINARY_INTEGER
7
Subscriber
QUEUE_TO_QUEUE_SUBSCRIBER
BINARY_INTEGER
8
Data Types
TYPE sys.aq$_agent AS OBJECT(
name VARCHAR2(30), -- name of message producer or consumer
address VARCHAR2(1024), -- address where message to be sent
protocol NUMBER DEFAULT 0); -- must be 0
/
TYPE aq$_purge_options_t IS
RECORD(block BOOLEAN DEFAULT FALSE,
delivery_mode PLS_INTEGER DEFAULT dbms_aqadm.persistent);
/
TYPE aq$_subscriber_list_t IS TABLE OF sys.aq$_agent
INDEX BY BINARY_INTEGER;
/
TYPE queue_props_t IS RECORD (
queue_type BINARY_INTEGER DEFAULT NORMAL_QUEUE,
retry_delay NUMBER DEFAULT 0,
retention_time NUMBER DEFAULT 0,
sort_list VARCHAR2(30) DEFAULT NULL,
cache_hint BINARY_INTEGER DEFAULT AUTO);
Dependencies
SELECT name FROM dba_dependencies WHERE referenced_name = 'DBMS_AQADM' UNION
SELECT referenced_name FROM dba_dependencies WHERE name = 'DBMS_AQADM' ORDER BY 1;
Owned by SYS with EXECUTE granted to the users CTX_SYS, MDSYS, SYSTEM, WMSYS and the AQ_ADMINISTRATOR_ROLE,
EXECUTE_CATALOG_ROLE, GSMADMIN_INTERNAL, OEM_MINITOR, OGG_APPLY, OGG_APPLY_PROCREP, OGG_CAPTURE, and OGG_SHARED_CAPTURE roles
Source
{ORACLE_HOME}/rdbms/admin/dbmsaqad.sql
also see: {ORACLE_HOME}/rdbms/admin/catqueue.sql
dbms_aqadm.add_connection_to_ldap(
connection IN VARCHAR2,
host IN VARCHAR2,
port IN BINARY_INTEGER,
sid IN VARCHAR2,
driver IN VARCHAR2 DEFAULT NULL,
type IN BINARY_INTEGER DEFAULT AQ_QUEUE_CONNECTION);
TBD
Overload 2
dbms_aqadm.add_connection_to_ldap(
connection IN VARCHAR2,
jdbc_string IN VARCHAR2,
username IN VARCHAR2 DEFAULT NULL,
password IN VARCHAR2 DEFAULT NULL,
type IN BINARY_INTEGER DEFAULT AQ_QUEUE_CONNECTION);
dbms_aqadm.add_subscriber(
queue_name IN VARCHAR2,
subscriber IN sys.aq$_agent,
rule IN VARCHAR2 DEFAULT NULL,
transformation IN VARCHAR2 DEFAULT NULL
queue_to_queue IN BOOLEAN DEFAULT FALSE,
delivery_mode IN PLS_INTEGER DEFAULT dbms_aqadm.persistent);
See AQ Demo 1: Linked at page bottom
-- a rule based on a VARCHAR2 must be in the format: 'priority < 11 AND SOURCE = ''EF''');
Alters an agent registered for Oracle Streams AQ Internet access
dbms_aqadm.alter_aq_agent(
agent_name IN VARCHAR2,
certificate_location IN VARCHAR2 DEFAULT NULL,
enable_http IN BOOLEAN DEFAULT FALSE,
enable_smtp IN BOOLEAN DEFAULT FALSE,
enable_anyp IN BOOLEAN DEFAULT FALSE);
dbms_aqadm.alter_propagation_schedule(
queue_name IN VARCHAR2,
destination IN VARCHAR2 DEFAULT NULL,
duration IN NUMBER DEFAULT NULL,
next_time IN VARCHAR2 DEFAULT NULL,
latency IN NUMBER DEFAULT 60,
destination_queue IN VARCHAR2 DEFAULT NULL);
dbms_aqadm.alter_queue(
queue_name IN VARCHAR2,
max_retries IN NUMBER DEFAULT NULL,
retry_delay IN NUMBER DEFAULT NULL,
retention_time IN NUMBER DEFAULT NULL,
auto_commit IN BOOLEAN DEFAULT TRUE,
comment IN VARCHAR2 DEFAULT NULL);
Alters the existing properties of a queue table for use with RAC
dbms_aqadm.alter_queue_table(
queue_table IN VARCHAR2,
comment IN VARCHAR2 DEFAULT NULL,
primary_instance IN BINARY_INTEGER DEFAULT NULL,
secondary_instance IN BINARY_INTEGER DEFAULT NULL,
replication_mode IN BINARY_INTEGER DEFAULT NULL);
dbms_aqadm.alter_sharded_queue(
queue_name IN VARCHAR2,
max_retries IN NUMBER DEFAULT NULL,
comment IN VARCHAR2 DEFAULT NULL,
queue_properties IN QUEUE_PROPS_T DEFAULT NULL,
replication_mode IN BINARY_INTEGER DEFAULT NONE);
Changes the properties of a transactional event queue
dbms_aqadm.alter_transactional_event_queue(
queue_name IN VARCHAR2,
max_retries IN NUMBER DEFAULT NULL,
comment IN VARCHAR2 DEFAULT NULL,
queue_properties IN QUEUE_PROPS_T DEFAULT NULL,
replication_mode IN BINARY_INTEGER DEFAULT NONE);
aq$_propaq(
job IN NUMBER,
next_date IN DATE,
qname IN VARCHAR2,
schema IN VARCHAR2,
destination IN VARCHAR2 DEFAULT NULL,
toid_char IN VARCHAR2 DEFAULT NULL,
version_char IN VARCHAR2 DEFAULT NULL,
start_time IN VARCHAR2,
duration IN VARCHAR2 DEFAULT NULL,
next_time IN VARCHAR2 DEFAULT NULL,
latency IN VARCHAR2 DEFAULT '60')
RETURN DATE;
dbms_aqadm.create_aq_agent(
agent_name IN VARCHAR2,
certificate_location IN VARCHAR2 DEFAULT NULL,
enable_http IN BOOLEAN DEFAULT FALSE,
enable_smtp IN BOOLEAN DEFAULT FALSE,
enable_anyp IN BOOLEAN DEFAULT FALSE);
dbms_aqadm.create_database_kafka_topic(
topicname IN VARCHAR2,
partition_num IN NUMBER,
retentiontime IN NUMBER,
parittion_assignment_mode IN NUMBER,
replication_mode IN BINARY_INTEGER);
dbms_aqadm.create_queue(
queue_name IN VARCHAR2,
queue_table IN VARCHAR2,
queue_type IN BINARY_INTEGER DEFAULT NORMAL_QUEUE,
max_retries IN NUMBER DEFAULT NULL,
retry_delay IN NUMBER DEFAULT 0,
retention_time IN NUMBER DEFAULT 0,
dependency_tracking IN BOOLEAN DEFAULT FALSE,
comment IN VARCHAR2 DEFAULT NULL,
auto_commit IN BOOLEAN DEFAULT TRUE); -- deprecated parameter
Creates a queue table for messages of a predefined type
dbms_aqadm.create_queue_table(
queue_table IN VARCHAR2,
queue_payload_type IN VARCHAR2,
storage_clause IN VARCHAR2 DEFAULT NULL,
sort_list IN VARCHAR2 DEFAULT NULL, -- options are priority & enq_time
multiple_consumers IN BOOLEAN DEFAULT FALSE,
message_grouping IN BINARY_INTEGER DEFAULT NONE,
comment IN VARCHAR2 DEFAULT NULL,
auto_commit IN BOOLEAN DEFAULT TRUE, -- deprecated parameter
primary_instance IN BINARY_INTEGER DEFAULT 0,
secondary_instance IN BINARY_INTEGER DEFAULT 0,
compatible IN VARCHAR2 DEFAULT NULL, -- in 11g set to 10.0
non_repudiation IN BINARY_INTEGER DEFAULT 0,
secure IN BOOLEAN DEFAULT FALSE);
replication_mode IN BINARY_INTEGER DEFAULT NULL);
Creates a queue and its queue table for a sharded queue in one step
dbms_aqadm.create_sharded_queue(
queue_name IN VARCHAR2,
storage_clause IN VARCHAR2 DEFAULT NULL,
multiple_consumers IN BOOLEAN DEFAULT FALSE,
max_retries IN NUMBER DEFAULT NULL,
comment IN VARCHAR2 DEFAULT NULL);
Creates a transactional event queue for distribution across multiple RAC nodes
dbms_aqadm.create_transactional_event_queue(
queue_name IN VARCHAR2,
storage_clause IN VARCHAR2 DEFAULT NULL,
multiple_consumers IN BOOLEAN DEFAULT FALSE,
max_retries IN NUMBER DEFAULT NULL,
comment IN VARCHAR2 DEFAULT NULL,
queue_payload_type IN VARCHAR2 DEFAULT JMS_TYPE,
queue_properties IN QUEUE_PROPS_T DEFAULT NULL,
replication_mode IN BINARY_INTEGER DEFAULT NONE);
dbms_aqadm.get_type_info(
schema IN VARCHAR2,
qname IN VARCHAR2,
gettds IN BOOLEAN,
rc OUT BINARY_INTEGER,
toid OUT RAW,
version OUT NUMBER,
tds OUT LONG RAW,
queue_style OUT VARCHAR2,
network_name OUT VARCHAR2);
TBD
Overload 2
dbms_aqadm.get_type_info(
schema IN VARCHAR2,
qname IN VARCHAR2,
gettds IN BOOLEAN,
rc OUT BINARY_INTEGER,
toid OUT RAW,
version OUT NUMBER,
tds OUT LONG RAW);
dbms_aqadm.nonrepudiate_receiver(
queue_name IN VARCHAR2,
msgid IN RAW,
rcver_info IN sys.aq$_agent,
signature OUT sys.aq$_sig_prop,
payload OUT sys.standard.<ADT_1>);
TBD
Non-repudiate receiver of raw payload
Overload 2
dbms_aqadm.nonrepudiate_receiver(
queue_name IN VARCHAR2,
msgid IN RAW,
rcver_info IN sys.aq$_agent,
signature OUT sys.aq$_sig_prop,
payload OUT RAW);
dbms_aqadm.nonrepudiate_sender(
queue_name IN VARCHAR2,
msgid IN RAW,
sender_info IN sys.aq$_agent,
signature OUT sys.aq$_sig_prop,
payload OUT sys.standard.<ADT_1>);
TBD
Non-repudiate sender of raw payload
Overload 2
dbms_aqadm.nonrepudiate_sender(
queue_name IN VARCHAR2,
msgid IN RAW,
sender_info IN sys.aq$_agent,
signature OUT sys.aq$_sig_prop,
payload OUT RAW);
dbms_aqadm.purge_queue_table(
queue_table IN VARCHAR2,
purge_condition IN VARCHAR2,
purge_options IN aq$_purge_options_t);
CREATE OR REPLACE PROCEDURE purgeQtable(qtable IN VARCHAR2) AUTHID CURRENT_USER AS
po_t dbms_aqadm.aq$_purge_options_t;
qname VARCHAR2(30);
CURSOR qcur IS
SELECT name
FROM user_queues
WHERE queue_table = UPPER(qtable);
BEGIN
po_t.block := FALSE;
dbms_aqadm.purge_queue_table(USER || '.' || qtable, NULL, po_t);
FOR qrec IN qcur LOOP
qname := qrec.name;
IF INSTR(qname, '$') > 0 THEN
dbms_aqadm.start_queue(qname, enqueue=>FALSE);
ELSE
dbms_aqadm.start_queue(qname);
END IF;
END LOOP;
dbms_utility.compile_schema(USER,compile_all=>FALSE);
EXCEPTION
WHEN OTHERS THEN
dbms_output.put_line('PurgeQTable: Error Starting Queue: '||qname||': '||SQLERRM);
END purgeQtable;
/
dbms_aqadm.recover_propagation(
schema IN VARCHAR2,
queue_name IN VARCHAR2,
destination IN VARCHAR2,
protocol IN BINARY_INTEGER DEFAULT TTC,
url IN VARCHAR2 DEFAULT NULL,
username IN VARCHAR2 DEFAULT NULL,
passwd IN VARCHAR2 DEFAULT NULL,
trace IN BINARY_INTEGER DEFAULT 0,
destq IN BINARY_INTEGER DEFAULT 0);
Schedules propagation of messages from a queue to a destination identified by a specific database link
dbms_aqadm.schedule_propagation(
queue_name IN VARCHAR2,
destination IN VARCHAR2 DEFAULT NULL,
start_time IN TIMESTAMP WITH TIMEZONE DEFAULT NULL, -- data type changed in 11g
duration IN NUMBER DEFAULT NULL,
next_time IN VARCHAR2 DEFAULT NULL,
latency IN NUMBER DEFAULT 60,
destination_queue IN VARCHAR2 DEFAULT NULL);
-- Note: The file in /rdbms/admin shows start_time as data type TIMESTAMP WITH TIMEZONE but all_arguments does not.
Verifies that the source and destination queues have identical types
dbms_aqadm.verify_queue_types(
src_queue_name IN VARCHAR2,
dest_queue_name IN VARCHAR2,
destination IN VARCHAR2 DEFAULT NULL,
rc OUT BINARY_INTEGER
transformation IN VARCHAR2 DEFAULT NULL);
set serveroutput on
DECLARE
x BINARY_INTEGER;
BEGIN
dbms_aqadm.verify_queue_types('rx_queue', 'finance_queue', 'finance_link', x);
dbms_output.put_line(x);
END;
/
dbms_aqadm.verify_queue_types_get_nrp(
src_queue_name IN VARCHAR2,
dest_queue_name IN VARCHAR2,
destination IN VARCHAR2 DEFAULT NULL,
rc OUT BINARY_INTEGER,
transformation IN VARCHAR2 DEFAULT NULL);
dbms_aqadm.verify_queue_types_no_queue(
src_queue_name IN VARCHAR2,
dest_queue_name IN VARCHAR2,
destination IN VARCHAR2 DEFAULT NULL,
rc OUT BINARY_INTEGER,
transformation IN VARCHAR2 DEFAULT NULL);