Skip Headers
Oracle® Database XStream Guide
11g Release 2 (11.2)

Part Number E15874-01
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Index
Index
Go to Master Index
Master Index
Go to Feedback page
Contact Us

Go to previous page
Previous
Go to next page
Next
View PDF

3 Configuring XStream

This chapter describes configuring the Oracle Database components that are used by XStream. This chapter also includes sample client applications that communicate with an XStream outbound server and inbound server.

This chapter contains these topics:

See Also:

Configuring XStream Out

An outbound server in an XStream Out configuration streams Oracle database changes to a client application. The client application attaches to the outbound server using the Oracle Call Interface (OCI) or Java interface to receive these changes.

Configuring an outbound server involves creating the Oracle Streams components that send captured database changes to the outbound server. It also involves configuring the outbound server itself, which includes specifying the connect user that the client application will use to attach to the outbound server.

This section contains these topics:

Preparing for XStream Out

This section describes the decisions to make and the tasks to complete to prepare for an XStream Out configuration.

Decide How to Configure the Oracle Streams Components

When you configure XStream Out, you must configure Oracle Streams components to capture database changes and send these changes to the outbound server in the form of logical change records (LCRs). These components include a capture process and at least one queue. The capture process can be a local capture process or a downstream capture process. For some configurations, you must also configure a propagation and an apply process.

Local capture means that a capture process runs on the source database. Downstream capture means that a capture process runs on a database other than the source database. The primary reason to use downstream capture is to reduce the load on the source database, thereby improving its performance. The primary reason to use a local capture is because it is easier to configure and maintain.

The database that captures changes made to the source database is called the capture database. One of the following databases can be the capture database:

  • Source database (local capture)

  • Destination database (downstream capture)

  • A third database (downstream capture)

If the source database or a third database is the capture database, then a propagation sends changes from the capture database to the database running the outbound server. If the database running the outbound server is the capture database, then this propagation between databases is not needed because the capture process and apply process use the same queue.

You can configure the Oracle Streams components in the following ways:

  • Local capture and outbound server on the same database: The database objects, capture process, and outbound server are all in the same database. This configuration is the easiest to configure and maintain because all of the components are contained in one database.

  • Local capture and outbound server on different databases: The database objects and capture process are in one database, and the outbound server is in another database. A propagation sends LCRs from the source database to the outbound server database. This configuration is best when you want easy configuration and maintenance and when you want to optimize the performance of the outbound server database.

  • Downstream capture and outbound server on the same database: The database objects are in one database, and the capture process and outbound server are in another database. This configuration is best when you want to optimize the performance of the database with the database objects and want to offload change capture to another database. With this configuration, most of the components run on the database with the outbound server.

  • Downstream capture and outbound server on different databases: The database objects are in one database, the outbound server is in another database, and the capture process is in a third database. This configuration is best when you want to optimize the performance of both the database with the database objects and the database with the outbound server. With this configuration, the capture process runs on a third database, and a propagation sends LCRs from the capture database to the outbound server database.

If you decide to configure a downstream capture process, then you must decide which type of downstream capture process you want to configure. The following types are available:

  • A real-time downstream capture process configuration means that redo transport services use the log writer process (LGWR) at the source database to send redo data to the downstream database, and a remote file server process (RFS) at the downstream database receives the redo data over the network and stores the redo data in the standby redo log.

  • An archived-log downstream capture process configuration means that archived redo log files from the source database are copied to the downstream database, and the capture process captures changes in these archived redo log files. These log files can be transferred automatically using redo transport services, or they can be transferred manually using a method such as FTP.

The advantage of real-time downstream capture over archived-log downstream capture is that real-time downstream capture reduces the amount of time required to capture changes made at the source database. The time is reduced because the real-time downstream capture process does not need to wait for the redo log file to be archived before it can capture changes from it. You can configure more than one real-time downstream capture process that captures changes from the same source database, but you cannot configure real-time downstream capture for multiple source databases at one downstream database.

The advantage of archived-log downstream capture over real-time downstream capture is that archived-log downstream capture allows downstream capture processes for multiple source databases at a downstream database. You can copy redo log files from multiple source databases to a single downstream database and configure multiple archived-log downstream capture processes to capture changes in these redo log files.

Tasks to Complete Before Configuring XStream Out

Preparing for an XStream Out outbound server is similar to preparing for an Oracle Streams replication environment. The components used in an Oracle Streams replication environment to capture changes and send them to an apply process are the same components used to capture changes and send them to an outbound server. These components include a capture process and one or more queues. If the capture process runs on a different database than the outbound server, then a propagation is also required.

Several of the tasks describe in this section are described in more detail in Oracle Streams Replication Administrator's Guide. This section provides an overview of each task and specific information about completing the task for an XStream Out configuration.

Complete the following tasks before configuring XStream Out:

Configure an Oracle Streams Administrator on All Databases

To configure and manage an XStream configuration, either create a new user with the appropriate privileges or grant these privileges to an existing user. You should not use the SYS or SYSTEM user as an Oracle Streams administrator, and the Oracle Streams administrator should not use the SYSTEM tablespace as its default tablespace. Create an Oracle Streams administrator on each database that is involved in the XStream configuration.

See Also:

Oracle Streams Replication Administrator's Guide for instructions about creating an Oracle Streams administrator
If Required, Configure Network Connectivity and Database Links

Network connectivity and database links are not required when all of the Oracle Streams components run on the same database. These components include the capture process, queue, and outbound server.

You must configure network connectivity and database links if you decided to configure XStream in either of the following ways:

  • The capture process and the outbound server will run on different databases.

  • Downstream capture will be used.

See "Decide How to Configure the Oracle Streams Components" for more information about these decisions.

If network connectivity is required, then configure your network and Oracle Net so that the databases can communicate with each other.

The following database links are required:

  • When the capture process runs on a different database than the outbound server, create a database link from the capture database to the outbound server database. A propagation uses this database link to send changes from the capture database to the outbound server database.

  • When you use downstream capture, create a database link from the capture database to the source database. The source database is the database that generates the redo data that the capture process uses to capture changes. The capture process uses this database link to perform administrative tasks at the source database.

The name of each database link must match the global name of the destination database, and each database link should be created in the Oracle Streams administrator's schema.

For example, if the global name of the source database is dbs1.example.com, the global name of the destination database is dbs2.example.com, and the Oracle Streams administrator is strmadmin at each database, then the following statement creates the database link from dbs1.example.com to dbs2.example.com:

CONNECT strmadmin@dbs1.example.com
Enter password: password

CREATE DATABASE LINK dbs2.example.com CONNECT TO strmadmin 
   IDENTIFIED BY password USING 'dbs2.example.com';

See Also:

Ensure That Each Source Database Is in ARCHIVELOG Mode

Each source database that generates changes that will be captured by a capture process must be in ARCHIVELOG mode. For downstream capture processes, the downstream database also must run in ARCHIVELOG mode if you plan to configure a real-time downstream capture process. The downstream database does not need to run in ARCHIVELOG mode if you plan to run only archived-log downstream capture processes on it.

If you are configuring Oracle Streams in an Oracle Real Application Clusters (Oracle RAC) environment, then the archive log files of all threads from all instances must be available to any instance running a capture process. This requirement pertains to both local and downstream capture processes.

See Also:

Oracle Database Administrator's Guide for instructions about running a database in ARCHIVELOG mode
Set Initialization Parameters Relevant to Oracle Streams

Some initialization parameters are important for the configuration, operation, reliability, and performance of Oracle Streams components. Set these parameters appropriately.

Oracle Streams Replication Administrator's Guide contains detailed information about all of the initialization parameters that are important for an Oracle Streams environment. In addition to the requirements described in Oracle Streams Replication Administrator's Guide for all Oracle Streams components, the following requirements apply to XStream outbound servers:

  • Ensure that the PROCESSES initialization parameter is set to a value large enough to accommodate the outbound server background processes and all of the other Oracle Database background processes.

  • Ensure that the SESSIONS initialization parameter is set to a value large enough to accommodate the sessions used by the outbound server background processes and all of the other Oracle Database sessions.

Configure the Oracle Streams Pool

The Oracle Streams pool is a portion of memory in the System Global Area (SGA) that is used by Oracle Streams. The Oracle Streams pool stores buffered queue messages in memory, and it provides memory for capture processes and outbound servers. The Oracle Streams pool always stores LCRs captured by a capture process, and it stores LCRs and messages that are enqueued into a buffered queue by applications. Ensure that there is enough space in the Oracle Streams pool at each database to store LCRs and run the Oracle Streams components properly.

An outbound server requires 1 MB for each outbound server. The Oracle Streams pool is initialized the first time an inbound server is started.

See Also:

Oracle Streams Replication Administrator's Guide for information about Oracle Streams pool requirements
If Required, Configure Log File Transfer to a Downstream Database

If you decided to use a local capture process, then log file transfer is not required. However, if you decided to use downstream capture that uses redo transport services to transfer archived redo log files to the downstream database automatically, then configure log file transfer from the source database to the capture database. See "Decide How to Configure the Oracle Streams Components" for information about this decision.

If Required, Add Standby Redo Logs for Real-Time Downstream Capture

If you decided to configure real-time downstream capture, then add standby redo logs to the capture database. See "Decide How to Configure the Oracle Streams Components" for information about this decision.

Configuring an XStream Outbound Server

You can create an outbound server using the following procedures in the DBMS_XSTREAM_ADM package:

  • The CREATE_OUTBOUND procedure creates an outbound server, a queue, and a capture process on a single database with one procedure call.

  • The ADD_OUTBOUND procedure only creates an outbound server. You must create the capture process and queue separately, and they must exist before you run the ADD_OUTBOUND procedure. You can configure the capture process on the same database as the outbound server or on a different database.

In both cases, you must create the client application that communicates with the outbound server and receives LCRs from the outbound server.

If you require multiple outbound servers, then you can use the CREATE_OUTBOUND procedure to create the capture process that captures database changes for the first outbound server. Next, you can run the ADD_OUTBOUND procedure to add additional outbound servers that receive the same captured changes. The capture process can reside on the same database as the outbound servers or on a different database.

This section contains these topics:

Configuring an Outbound Server, Queue, and Capture Process

The CREATE_OUTBOUND procedure in the DBMS_XSTREAM_ADM package creates a capture process, queue, and outbound server on a single database. Both the capture process and the outbound server use the queue created by the procedure. When you run the procedure, you provide the name of the new outbound server, while the procedure generates a name for the capture process and queue.

The instructions in this section can only set up the configuration described in "local capture and outbound server on the same database" in "Decide How to Configure the Oracle Streams Components". If you want all of the Oracle Streams components to run on the same database, then the CREATE_OUTBOUND procedure is the fastest and easiest way to create an outbound server.

To create an outbound server using the CREATE_OUTBOUND procedure:

  1. Complete the tasks described in "Tasks to Complete Before Configuring XStream Out".

  2. In SQL*Plus, connect to the database as the Oracle Streams administrator.

    See Oracle Database Administrator's Guide for information about connecting to a database in SQL*Plus.

  3. Run the CREATE_OUTBOUND procedure.

    For example, assume that you want to configure XStream Out in the following way:

    • The name of the outbound server is xout.

    • Data manipulation language (DML) and data definition language (DDL) changes made to the oe.orders and oe.order_items tables are sent to the outbound server.

    • DML and DDL changes made to the hr schema are sent to the outbound server.

    Given these assumptions, run the following CREATE_OUTBOUND procedure:

    DECLARE
      tables  DBMS_UTILITY.UNCL_ARRAY;
      schemas DBMS_UTILITY.UNCL_ARRAY;
      BEGIN
        tables(1)  := 'oe.orders';
        tables(2)  := 'oe.order_items';
        schemas(1) := 'hr';
      DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
        server_name     =>  'xout',
        table_names     =>  tables,
        schema_names    =>  schemas);
    END;
    /
    

    Running this procedure performs the following actions:

    • Configures supplemental logging for the oe.orders and oe.order_items tables and for all of the tables in the hr schema.

    • Creates a queue with a system-generated name that is used by the capture process and the outbound server.

    • Creates and starts a capture process with a system-generated name with rule sets that instruct it to capture DML and DDL changes to the oe.orders table, the oe.order_items table, and the hr schema.

    • Creates and starts an outbound server named xout with rule sets that instruct it to send DML and DDL changes to the oe.orders table, the oe.order_items table, and the hr schema to the client application.

    • Sets the current user as the connect user for the outbound server. In this example, the current user is the Oracle Streams administrator. The client application must connect to the database as the connect user to interact with the outbound server.

    Tip:

    To capture and send all database changes to the outbound server, specify NULL (the default) for the table_names and schema_names parameters.
  4. Create and run the client application that will connect to the outbound server and receive the LCRs. See "Sample XStream Client Application" for a sample application.

  5. To add one or more additional outbound servers that receive LCRs from the capture process created in Step 3, follow the instructions in "Adding an Additional Outbound Server to a Capture Process Stream".

Configuring an Outbound Server

The ADD_OUTBOUND procedure in the DBMS_XSTREAM_ADM package creates an outbound server. This procedure does not create the capture process or the queue. You must configure these components manually.

The instructions in this section can set up any of the configurations described in "Decide How to Configure the Oracle Streams Components". However, if you chose "local capture and outbound server on the same database," then it is usually easier to use the CREATE_OUTBOUND procedure to configure all of the components simultaneously. See "Configuring an Outbound Server, Queue, and Capture Process".

To create an outbound server using the ADD_OUTBOUND procedure:

  1. Complete the tasks described in "Tasks to Complete Before Configuring XStream Out".

  2. In SQL*Plus, connect to the database that will run the capture process (the capture database) as the Oracle Streams administrator.

    See Oracle Database Administrator's Guide for information about connecting to a database in SQL*Plus.

  3. Create the queue that will be used by the capture process.

    See Oracle Streams Replication Administrator's Guide for instructions.

  4. Create the capture process.

    See Oracle Streams Replication Administrator's Guide for instructions.

  5. Connect to the source database. The source database is the database that contains the database objects for which the capture process will capture changes.

  6. Ensure that required supplemental logging is specified for the database objects at the source database. Supplemental logging is required for the database objects for which the capture process will capture changes.

    If the capture database and the source database are the same database, then supplemental logging might have been specified during capture process creation.

    Ensure that the following supplemental logging is specified at the source database:

    • Any columns at the source database that are used in a primary key in tables for which changes are process by the outbound server must be unconditionally logged in a log group or by database supplemental logging of primary key columns.

    • Any columns at the source database that are used by a rule or a rule-based transformation must be unconditionally logged.

    See Oracle Streams Replication Administrator's Guide for instructions about specifying supplemental logging.

  7. Connect to the database that will run the outbound server as the Oracle Streams administrator.

  8. Create the queue that will be used by the outbound server.

    See Oracle Streams Replication Administrator's Guide for instructions.

  9. Run the ADD_OUTBOUND procedure.

    For example, assume that you want to configure XStream Out in the following way:

    • The name of the outbound server is xout.

    • The queue used by the outbound server is strmadmin.streams_queue.

    • The source database is db1.example.com.

    • Data manipulation language (DML) and data definition language (DDL) changes made to the oe.orders and oe.order_items tables are sent to the outbound server.

    • DML and DDL changes made to the hr schema are sent to the outbound server.

    Given these assumptions, run the following ADD_OUTBOUND procedure:

    DECLARE
      tables  DBMS_UTILITY.UNCL_ARRAY;
      schemas DBMS_UTILITY.UNCL_ARRAY;
      BEGIN
        tables(1)  := 'oe.orders';
        tables(2)  := 'oe.order_items';
        schemas(1) := 'hr';
      DBMS_XSTREAM_ADM.ADD_OUTBOUND(
        server_name     =>  'xout',
        queue_name      =>  'strmadmin.streams_queue',
        source_database =>  'db1.example.com',
        table_names     =>  tables,
        schema_names    =>  schemas);
    END;
    /
    

    Running this procedure performs the following actions:

    • Creates an outbound server named xout. The outbound server has rule sets that instruct it to send DML and DDL changes to the oe.orders table, the oe.order_items table, and the hr schema to the client application. The rules specify that these changes must have originated at the db1.example.com database. The outbound server dequeues LCRs from the queue strmadmin.streams_queue.

    • Sets the current user as the connect user for the outbound server. In this example, the current user is the Oracle Streams administrator. The client application must connect to the database as the connect user to interact with the outbound server.

    Tip:

    For the outbound server to receive all of the LCRs sent by the capture process, specify NULL (the default) for the table_names and schema_names parameters.
  10. Connect to the capture database as the Oracle Streams administrator.

  11. Create the propagation that sends logical change records (LCRs) from the capture process's queue on the local database to the queue used by the outbound server on the outbound server database.

    See Oracle Streams Replication Administrator's Guide for instructions.

  12. Create and run the client application that will connect to the outbound server and receive the LCRs. See "Sample XStream Client Application" for a sample application.

  13. Start the outbound server. For example:

    exec DBMS_APPLY_ADM.START_APPLY('xout');
    
  14. Start the capture process created in Step 4.

    See Oracle Streams Concepts and Administration for instructions.

  15. To add one or more additional outbound servers that receive LCRs from the capture process created in Step 4, follow the instructions in "Adding an Additional Outbound Server to a Capture Process Stream".

Adding an Additional Outbound Server to a Capture Process Stream

XStream Out configurations often require multiple outbound servers that process a stream of logical change records (LCRs) from a single capture process. This section describes adding an additional outbound server to a database that already includes at least one outbound server. The additional outbound server uses the same queue as another outbound server to receive the LCRs from the capture process.

Before completing the steps in this section, configure an XStream Out environment that includes at least one outbound server. The following sections describe configuring and XStream Out environment:

When an XStream Out environment exists, use the ADD_OUTBOUND procedure in the DBMS_XSTREAM_ADM package to add another outbound server to a capture process stream.

To add another outbound server to a capture process stream using the ADD_OUTBOUND procedure:

  1. In SQL*Plus, connect to the database that will run the additional outbound server as the Oracle Streams administrator.

    See Oracle Database Administrator's Guide for information about connecting to a database in SQL*Plus.

  2. Determine the name of the queue used by an existing outbound server that receives LCRs from the capture process.

    Run the "Displaying General Information About an Outbound Server" query in to determine the owner and name of the queue. This query also shows the name of the capture process and the source database name. For this example, assume that the name of the queue is streams_queue and that the owner is strmadmin.

  3. Run the ADD_OUTBOUND procedure.

    For example, assume that you want to configure the additional outbound server in the following way:

    • The name of the outbound server is xout2.

    • Data manipulation language (DML) and data definition language (DDL) changes made to the oe.orders and oe.order_items tables are sent to the outbound server.

    • DML and DDL changes made to the hr schema are sent to the outbound server.

    Given these assumptions, run the following ADD_OUTBOUND procedure:

    DECLARE
      tables  DBMS_UTILITY.UNCL_ARRAY;
      schemas DBMS_UTILITY.UNCL_ARRAY;
      BEGIN
        tables(1)  := 'oe.orders';
        tables(2)  := 'oe.order_items';
        schemas(1) := 'hr';
      DBMS_XSTREAM_ADM.ADD_OUTBOUND(
        server_name     =>  'xout2',
        queue_name      =>  'strmadmin.streams_queue',
        source_database =>  'db1.example.com',
        table_names     =>  tables,
        schema_names    =>  schemas);
    END;
    /
    

    Running this procedure performs the following actions:

    • Creates an outbound server named xout2. The outbound server has rule sets that instruct it to send DML and DDL changes to the oe.orders table, the oe.order_items table, and the hr schema to the client application. The rules specify that these changes must have originated at the db1.example.com database. The outbound server dequeues LCRs from the queue strmadmin.streams_queue.

    • Sets the current user as the connect user for the outbound server. In this example, the current user is the Oracle Streams administrator. The client application must connect to the database as the connect user to interact with the outbound server.

    Tip:

    For the outbound server to receive all of the LCRs sent by the capture process, specify NULL (the default) for the table_names and schema_names parameters.
  4. If a client application does not already exist, then create and run the client application that will connect to the outbound server and receive the LCRs. See "Sample XStream Client Application" for a sample application.

Configuring XStream In

An inbound server in an XStream In configuration receives a stream of changes from a client application. The inbound server can apply these changes to database objects in an Oracle database, or it can process the changes in a customized way. A client application can attach to an inbound server and send row changes and DDL changes encapsulated in logical change records (LCRs) using the OCI or Java interface.

Tasks to Complete Before Configuring XStream In

Complete the following tasks before configuring XStream In:

Configure an Oracle Streams Administrator

To configure and manage an XStream configuration, either create a new user with the appropriate privileges or grant these privileges to an existing user. You should not use the SYS or SYSTEM user as an Oracle Streams administrator, and the Oracle Streams administrator should not use the SYSTEM tablespace as its default tablespace. Create an Oracle Streams administrator on the database that will run the XStream inbound server.

See Also:

Oracle Streams Replication Administrator's Guide for instructions about creating an Oracle Streams administrator

Set Initialization Parameters Relevant to Oracle Streams

Some initialization parameters are important for the configuration, operation, reliability, and performance of XStream inbound servers. Set these parameters appropriately.

Oracle Streams Replication Administrator's Guide contains detailed information about all of the initialization parameters that are important for an Oracle Streams environment. In addition to the requirements described in Oracle Streams Replication Administrator's Guide for all Oracle Streams components, the following requirements apply to XStream inbound servers:

  • Ensure that the PROCESSES initialization parameter is set to a value large enough to accommodate the inbound server background processes and all of the other Oracle Database background processes.

  • Ensure that the SESSIONS initialization parameter is set to a value large enough to accommodate the sessions used by the inbound server background processes and all of the other Oracle Database sessions.

Configure the Oracle Streams Pool

The Oracle Streams pool is a portion of memory in the System Global Area (SGA) that provides memory for inbound servers. Ensure that there is enough space in the Oracle Streams pool for the inbound server to run properly. An inbound server requires 1 MB for each inbound server parallelism. For example, if parallelism is set to 4 for an inbound server, then at least 4 MB is required for the inbound server. The Oracle Streams pool also must have enough space to store LCRs before they are applied. The Oracle Streams pool is initialized the first time an inbound server is started.

Configuring an XStream Inbound Server

The CREATE_INBOUND procedure in the DBMS_XSTREAM_ADM package creates an inbound server. You must create the client application that communicates with the inbound server and sends LCRs to the inbound server.

To create an inbound server:

  1. Complete the tasks described in "Tasks to Complete Before Configuring XStream In".

  2. In SQL*Plus, connect to the database that will run the inbound server as the Oracle Streams administrator.

    See Oracle Database Administrator's Guide for information about connecting to a database in SQL*Plus.

  3. Run the CREATE_INBOUND procedure.

    For example, the following CREATE_INBOUND procedure configures inbound server named xin:

    BEGIN
      DBMS_XSTREAM_ADM.CREATE_INBOUND(
        server_name => 'xin',
        queue_name  => 'xin_queue');
    END;
    /
    

    Running this procedure performs the following actions:

    • Creates an inbound server named xin.

    • Sets the queue with the name xin_queue as the inbound server's queue, and creates this queue if it does not exist. This queue does not store LCRs sent by the client application. Instead, it stores error transactions if an LCR raises an error. The current user is the queue owner. In this example, the current user is the Oracle Streams administrator.

    • Sets the current user as the apply user for the inbound server. In this example, the current user is the Oracle Streams administrator. The client application must connect to the database as the apply user to interact with the inbound server.

    Tip:

    By default, an inbound server does not use rules or rule sets. Therefore, it processes all LCRs sent to it by the client application. To add rules and rule sets, use the DBMS_STREAMS_ADM package or the DBMS_RULE_ADM package. See Oracle Streams Concepts and Administration.
  4. Create and run the client application that will connect to the inbound server and send LCRs to it. See "Sample XStream Client Application" for a sample application.

  5. If necessary, create apply handlers for the inbound server.

    Apply handlers are optional. Apply handlers process LCRs sent to an inbound server in a customized way.

  6. Start the inbound server. For example:

    exec DBMS_APPLY_ADM.START_APPLY('xin');
    

Sample XStream Client Application

This section contains a sample XStream client application. This application illustrates the basic tasks that are required of an XStream Out and XStream In application.

The application performs the following tasks:

In an XStream Out configuration that does not send LCRs to an inbound server, the client application must obtain the processed low position in another way.

This application waits indefinitely for transactions from the outbound server. To interrupt the application, enter control-C. If the program is restarted, then the outbound server starts sending LCRs from the processed low position that was set during the previous run.

Figure 3-1 provides an overview of the XStream environment configured in this section.

Figure 3-1 Sample XStream Configuration

Description of Figure 3-1 follows
Description of "Figure 3-1 Sample XStream Configuration"

Before running the sample application, ensure that the following components exist:

The sample applications in the following sections perform the same tasks. One sample application uses the OCI API, and the other uses the Java API.

Note:

An Oracle Database installation includes several XStream demos. These demos are in the following location:
$ORACLE_HOME/rdbms/demo/xstream

Sample XStream Client Application for the Oracle Call Interface API

To run the sample XStream client application for the Oracle Call Interface API, compile and link the application file, and enter the following on a command line:

xio -ob_svr xout_name -ob_db sn_xout_db -ob_usr xout_cu -ob_pwd xout_cu_pass 
-ib_svr xin_name -ib_db sn_xin_db -ib_usr xin_au -ib_pwd xin_au_pass

where:

  • xout_name is the name of the outbound server

  • sn_xout_db is the service name for the outbound server's database

  • xout_cu is the outbound server's connect user

  • xout_cu_pass is the password for the outbound server's connect user

  • xin_name is the name of the inbound server

  • sn_xin_db is the service name for the inbound server's database

  • xin_au is the inbound server's apply user

  • xin_au_pass is the password for the inbound server's apply user

When the sample client application is running, it prints information about the row logical change records (LCRs) it is processing. The output looks similar to the following:

----------- ROW LCR Header  -----------------
  src_db_name=DB.EXAMPLE.COM
  cmd_type=UPDATE txid=17.0.74
  owner=HR oname=COUNTRIES 
 
 ----------- ROW LCR Header  -----------------
  src_db_name=DB.EXAMPLE.COM
  cmd_type=COMMIT txid=17.0.74
 
 ----------- ROW LCR Header  -----------------
  src_db_name=DB.EXAMPLE.COM
  cmd_type=UPDATE txid=12.25.77
  owner=OE oname=ORDERS 
 
 ----------- ROW LCR Header  -----------------
  src_db_name=DB.EXAMPLE.COM
  cmd_type=UPDATE txid=12.25.77
  owner=OE oname=ORDERS 

This output contains the following information for each row LCR:

  • src_db_name shows the source database for the change encapsulated in the row LCR.

  • cmd_type shows the type of data manipulation language statement that made the change.

  • txid shows the transaction ID of the transaction that includes the row LCR.

  • owner shows the owner of the database object that was changed.

  • oname shows the name of the database object that was changed.

This demo is available in the following location:

$ORACLE_HOME/rdbms/demo/xstream/oci

The file name for the demo is xio.c. See the README.txt file in the demo directory for more information about compiling and running the application.

The code for the sample application that uses the Oracle Call Interface API follows:

*/
 
#ifndef OCI_ORACLE
#include <oci.h>
#endif
 
#ifndef _STDIO_H
#include <stdio.h>
#endif
 
#ifndef _STDLIB_H
#include <stdlib.h>
#endif
 
#ifndef _STRING_H
#include <string.h>
#endif
 
#ifndef _MALLOC_H
#include <malloc.h>
#endif
 
/*---------------------------------------------------------------------- 
 *           Internal structures
 *----------------------------------------------------------------------*/ 
 
#define M_DBNAME_LEN    (128)
 
typedef struct conn_info                                     /* connect info */
{
  oratext * user;
  ub4       userlen;
  oratext * passw;
  ub4       passwlen;
  oratext * dbname;
  ub4       dbnamelen;
  oratext * svrnm;
  ub4       svrnmlen;
} conn_info_t;
 
typedef struct params
{
  conn_info_t  xout;                                        /* outbound info */
  conn_info_t  xin;                                          /* inbound info */
} params_t;
 
typedef struct oci                                            /* OCI handles */
{
  OCIEnv      *envp;                                   /* Environment handle */
  OCIError    *errp;                                         /* Error handle */
  OCIServer   *srvp;                                        /* Server handle */
  OCISvcCtx   *svcp;                                       /* Service handle */
  OCISession  *authp;
  OCIStmt    *stmtp;        
  boolean     attached;
  boolean     outbound;
} oci_t;
 
static void connect_db(conn_info_t *opt_params_p, oci_t ** ocip, ub2 char_csid,
                       ub2 nchar_csid);
static void disconnect_db(oci_t * ocip);
static void ocierror(oci_t * ocip, char * msg);
static void attach(oci_t * ocip, conn_info_t *conn, boolean outbound);
static void detach(oci_t *ocip);
static void get_lcrs(oci_t *xin_ocip, oci_t *xout_ocip);
static void get_chunks(oci_t *xin_ocip, oci_t *xout_ocip);
static void print_lcr(oci_t *ocip, void *lcrp, ub1 lcrtype, 
                      oratext **src_db_name, ub2  *src_db_namel);
static void print_chunk (ub1 *chunk_ptr, ub4 chunk_len, ub2 dty);
static void get_inputs(conn_info_t *xout_params, conn_info_t *xin_params, 
                       int argc, char ** argv);
static void get_db_charsets(conn_info_t *params_p, ub2 *char_csid, 
                            ub2 *nchar_csid);
static void set_client_charset(oci_t *outbound_ocip);
 
#define OCICALL(ocip, function) do {\
sword status=function;\
if (OCI_SUCCESS==status) break;\
else if (OCI_ERROR==status) \
{ocierror(ocip, (char *)"OCI_ERROR");\
exit(1);}\
else {printf("Error encountered %d\n", status);\
exit(1);}\
} while(0)
 
/*---------------------------------------------------------------------
 *                M A I N   P R O G R A M
 *---------------------------------------------------------------------*/
main(int argc, char **argv)
{
  /* Outbound and inbound connection info */
  conn_info_t   xout_params;
  conn_info_t   xin_params;
  oci_t        *xout_ocip = (oci_t *)NULL;
  oci_t        *xin_ocip = (oci_t *)NULL;
  ub2           obdb_char_csid = 0;                 /* outbound db char csid */
  ub2           obdb_nchar_csid = 0;               /* outbound db nchar csid */
 
  /* parse command line arguments */
  get_inputs(&xout_params, &xin_params, argc, argv); 
 
  /* Get the outbound database CHAR and NCHAR character set info */
  get_db_charsets(&xout_params, &obdb_char_csid, &obdb_nchar_csid);
 
  /* Connect to the outbound db and set the client env to the outbound charsets
   * to minimize character conversion when transferring LCRs from outbound 
   * directly to inbound server. 
   */
  connect_db(&xout_params, &xout_ocip, obdb_char_csid, obdb_nchar_csid);
 
  /* Attach to outbound server */
  attach(xout_ocip, &xout_params, TRUE);
 
  /* connect to inbound db and set the client charsets the same as the 
   * outbound db charsets.
   */
  connect_db(&xin_params, &xin_ocip, obdb_char_csid, obdb_nchar_csid);
 
  /* Attach to inbound server */
  attach(xin_ocip, &xin_params, FALSE);
 
  /* Get lcrs from outbound server and send to inbound server */
  get_lcrs(xin_ocip, xout_ocip);
 
  /* Detach from XStream servers */
  detach(xout_ocip);
  detach(xin_ocip);
 
  /* Disconnect from both databases */
  disconnect_db(xout_ocip);
  disconnect_db(xin_ocip);
 
  free(xout_ocip);
  free(xin_ocip);
  exit (0);
}
 
/*---------------------------------------------------------------------
 * connect_db - Connect to the database and set the env to the given
 * char and nchar character set ids. 
 *---------------------------------------------------------------------*/
static void connect_db(conn_info_t *params_p, oci_t **ociptr, ub2 char_csid,
                ub2 nchar_csid)
{
  oci_t        *ocip;
 
  printf ("Connect to Oracle as %.*s@%.*s ",
          params_p->userlen, params_p->user, 
          params_p->dbnamelen, params_p->dbname);
       
  if (char_csid && nchar_csid)
    printf ("using char csid=%d and nchar csid=%d", char_csid, nchar_csid);
 
  printf("\n");
 
  ocip = (oci_t *)malloc(sizeof(oci_t));
 
  if (OCIEnvNlsCreate(&ocip->envp, OCI_OBJECT, (dvoid *)0,
                     (dvoid * (*)(dvoid *, size_t)) 0,
                     (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                     (void (*)(dvoid *, dvoid *)) 0,
                     (size_t) 0, (dvoid **) 0, char_csid, nchar_csid))
  {
    ocierror(ocip, (char *)"OCIEnvCreate() failed");
  }
 
  if (OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->errp,
                     (ub4) OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0))
  {
    ocierror(ocip, (char *)"OCIHandleAlloc(OCI_HTYPE_ERROR) failed");
  }
 
  /* Logon to database */
  OCICALL(ocip,
          OCILogon(ocip->envp, ocip->errp, &ocip->svcp,
                   params_p->user, params_p->userlen,
                   params_p->passw, params_p->passwlen,
                   params_p->dbname, params_p->dbnamelen));
 
  /* allocate the server handle */
  OCICALL(ocip,
          OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->srvp,
                         OCI_HTYPE_SERVER, (size_t) 0, (dvoid **) 0));
 
  OCICALL(ocip, 
          OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->stmtp,
                     (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0));
 
  if (*ociptr == (oci_t *)NULL)
  {
    *ociptr = ocip;
  }
}
 
/*---------------------------------------------------------------------
 * get_db_charsets - Get the database CHAR and NCHAR character set ids.
 *---------------------------------------------------------------------*/
static const oratext GET_DB_CHARSETS[] =  \
 "select parameter, value from nls_database_parameters where parameter = \
 'NLS_CHARACTERSET' or parameter = 'NLS_NCHAR_CHARACTERSET'";
 
#define PARM_BUFLEN      (30)
 
static void get_db_charsets(conn_info_t *params_p, ub2 *char_csid, 
                            ub2 *nchar_csid)
{
  OCIDefine  *defnp1 = (OCIDefine *) NULL;
  OCIDefine  *defnp2 = (OCIDefine *) NULL;
  oratext     parm[PARM_BUFLEN];
  oratext     value[OCI_NLS_MAXBUFSZ];
  ub2         parm_len = 0;
  ub2         value_len = 0;
  oci_t       ocistruct; 
  oci_t      *ocip = &ocistruct;
   
  *char_csid = 0;
  *nchar_csid = 0;
  memset (ocip, 0, sizeof(ocistruct));
 
  if (OCIEnvCreate(&ocip->envp, OCI_OBJECT, (dvoid *)0,
                     (dvoid * (*)(dvoid *, size_t)) 0,
                     (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                     (void (*)(dvoid *, dvoid *)) 0,
                     (size_t) 0, (dvoid **) 0))
  {
    ocierror(ocip, (char *)"OCIEnvCreate() failed");
  }
 
  if (OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->errp,
                     (ub4) OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0))
  {
    ocierror(ocip, (char *)"OCIHandleAlloc(OCI_HTYPE_ERROR) failed");
  }
 
  OCICALL(ocip, 
          OCILogon(ocip->envp, ocip->errp, &ocip->svcp,
                   params_p->user, params_p->userlen,
                   params_p->passw, params_p->passwlen,
                   params_p->dbname, params_p->dbnamelen));
 
  OCICALL(ocip, 
          OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->stmtp,
                     (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0));
 
  /* Execute stmt to select the db nls char and nchar character set */ 
  OCICALL(ocip, 
          OCIStmtPrepare(ocip->stmtp, ocip->errp,
                         (CONST text *)GET_DB_CHARSETS,
                         (ub4)strlen((char *)GET_DB_CHARSETS),
                         (ub4)OCI_NTV_SYNTAX, (ub4)OCI_DEFAULT));
 
  OCICALL(ocip,
          OCIDefineByPos(ocip->stmtp, &defnp1,
                         ocip->errp, (ub4) 1, parm,
                         PARM_BUFLEN, SQLT_CHR, (void*) 0,
                         &parm_len, (ub2 *)0, OCI_DEFAULT));
 
  OCICALL(ocip,
          OCIDefineByPos(ocip->stmtp, &defnp2,
                         ocip->errp, (ub4) 2, value,
                         OCI_NLS_MAXBUFSZ, SQLT_CHR, (void*) 0,
                         &value_len, (ub2 *)0, OCI_DEFAULT));
 
  OCICALL(ocip, 
          OCIStmtExecute(ocip->svcp, ocip->stmtp, 
                         ocip->errp, (ub4)0, (ub4)0, 
                         (const OCISnapshot *)0,
                         (OCISnapshot *)0, (ub4)OCI_DEFAULT));
 
  while (OCIStmtFetch(ocip->stmtp, ocip->errp, 1,
                      OCI_FETCH_NEXT, OCI_DEFAULT) == OCI_SUCCESS)
  {
    value[value_len] = '\0';
    if (parm_len == strlen("NLS_CHARACTERSET") &&
        !memcmp(parm, "NLS_CHARACTERSET", parm_len))
    {
      *char_csid = OCINlsCharSetNameToId(ocip->envp, value);
      printf("Outbound database NLS_CHARACTERSET = %.*s (csid = %d) \n",
             value_len, value, *char_csid);
    }
    else if (parm_len == strlen("NLS_NCHAR_CHARACTERSET") &&
             !memcmp(parm, "NLS_NCHAR_CHARACTERSET", parm_len))
    {
      *nchar_csid = OCINlsCharSetNameToId(ocip->envp, value);
      printf("Outbound database NLS_NCHAR_CHARACTERSET = %.*s (csid = %d) \n",
             value_len, value, *nchar_csid);
    }
  }
 
  disconnect_db(ocip);
}
 
/*---------------------------------------------------------------------
 * attach - Attach to XStream server specified in connection info
 *---------------------------------------------------------------------*/
static void attach(oci_t * ocip, conn_info_t *conn, boolean outbound)
{
  sword       err;
 
  printf ("Attach to XStream %s server '%.*s'\n", 
          outbound ? "outbound" : "inbound",
          conn->svrnmlen, conn->svrnm);
 
  if (outbound)
  {
    OCICALL(ocip, 
            OCIXStreamOutAttach(ocip->svcp, ocip->errp, conn->svrnm,
                              (ub2)conn->svrnmlen, (ub1 *)0, 0, OCI_DEFAULT));
  }
  else
  {
    OCICALL(ocip, 
            OCIXStreamInAttach(ocip->svcp, ocip->errp, conn->svrnm,
                               (ub2)conn->svrnmlen, 
                               (oratext *)"From_XOUT", 9,
                               (ub1 *)0, 0, OCI_DEFAULT));
  }
 
  ocip->attached = TRUE;
  ocip->outbound = outbound;
}
 
/*---------------------------------------------------------------------
 * ping_svr - Ping inbound server by sending a commit LCR.
 *---------------------------------------------------------------------*/
static void ping_svr(oci_t *xin_ocip, void *commit_lcr,
                     ub1 *cmtpos, ub2 cmtpos_len, 
                     oratext *source_db, ub2 source_db_len)
{
  OCIDate     src_time;
  oratext     txid[128];
 
  OCICALL(xin_ocip, OCIDateSysDate(xin_ocip->errp, &src_time));
  sprintf(txid, "Ping %2d:%2d:%2d",
          src_time.OCIDateTime.OCITimeHH,
          src_time.OCIDateTime.OCITimeMI,
          src_time.OCIDateTime.OCITimeSS);
 
  /* Initialize LCR with new txid and commit position */
  OCICALL(xin_ocip,
          OCILCRHeaderSet(xin_ocip->svcp, xin_ocip->errp,
                          source_db, source_db_len,
                          (oratext *)OCI_LCR_ROW_CMD_COMMIT,
                          (ub2)strlen(OCI_LCR_ROW_CMD_COMMIT),
                          (oratext *)0, 0,                     /* null owner */
                          (oratext *)0, 0,                    /* null object */
                          (ub1 *)0, 0,                           /* null tag */
                          txid, (ub2)strlen((char *)txid),
                          &src_time, cmtpos, cmtpos_len,
                          0, commit_lcr, OCI_DEFAULT));
 
  /* Send commit lcr to inbound server. */
  if (OCIXStreamInLCRSend(xin_ocip->svcp, xin_ocip->errp, commit_lcr,
                          OCI_LCR_XROW, 0, OCI_DEFAULT) == OCI_ERROR)
  {
    ocierror(xin_ocip, (char *)"OCIXStreamInLCRSend failed in ping_svr()");
  }
}
 
/*---------------------------------------------------------------------
 * get_lcrs - Get LCRs from outbound server and send to inbound server.
 *---------------------------------------------------------------------*/
static void get_lcrs(oci_t *xin_ocip, oci_t *xout_ocip)
{
  sword       status = OCI_SUCCESS;
  void       *lcr;
  ub1         lcrtype;
  oraub8      flag;
  ub1         proclwm[OCI_LCR_MAX_POSITION_LEN];
  ub2         proclwm_len = 0;
  ub1         sv_pingpos[OCI_LCR_MAX_POSITION_LEN];
  ub2         sv_pingpos_len = 0;
  ub1         fetchlwm[OCI_LCR_MAX_POSITION_LEN];
  ub2         fetchlwm_len = 0;
  void       *commit_lcr = (void *)0;
  oratext    *lcr_srcdb = (oratext *)0;
  ub2         lcr_srcdb_len = 0;
  oratext     source_db[M_DBNAME_LEN];
  ub2         source_db_len = 0;
  ub4         lcrcnt = 0;
 
  /* create an lcr to ping the inbound server periodically by sending a
   * commit lcr.
   */
  commit_lcr = (void*)0;
  OCICALL(xin_ocip,
          OCILCRNew(xin_ocip->svcp, xin_ocip->errp, OCI_DURATION_SESSION,
                    OCI_LCR_XROW, &commit_lcr, OCI_DEFAULT));
 
  while (status == OCI_SUCCESS)
  {
    lcrcnt = 0;                         /* reset lcr count before each batch */
 
    while ((status = 
                OCIXStreamOutLCRReceive(xout_ocip->svcp, xout_ocip->errp,
                                        &lcr, &lcrtype, &flag, 
                                        fetchlwm, &fetchlwm_len, OCI_DEFAULT))
                                               == OCI_STILL_EXECUTING)
    {
      lcrcnt++;
 
      /* print header of LCR just received */
      print_lcr(xout_ocip, lcr, lcrtype, &lcr_srcdb, &lcr_srcdb_len);
 
      /* save the source db to construct ping lcr later */
      if (!source_db_len && lcr_srcdb_len)
      {
        memcpy(source_db, lcr_srcdb, lcr_srcdb_len);
        source_db_len = lcr_srcdb_len;
      }
      
      /* send the LCR just received */
      if (OCIXStreamInLCRSend(xin_ocip->svcp, xin_ocip->errp, 
                              lcr, lcrtype, flag, OCI_DEFAULT) == OCI_ERROR)
      {
        ocierror(xin_ocip, (char *)"OCIXStreamInLCRSend failed");
      }
 
      /* If LCR has chunked columns (i.e, has LOB/Long/XMLType columns) */
      if (flag & OCI_XSTREAM_MORE_ROW_DATA)
      {
        /* receive and send chunked columns */
        get_chunks(xin_ocip, xout_ocip); 
      }
    }
 
    if (status == OCI_ERROR)
      ocierror(xout_ocip, (char *)"OCIXStreamOutLCRReceive failed");
 
    /* clear the saved ping position if we just received some new lcrs */
    if (lcrcnt)
    {
      sv_pingpos_len = 0;
    }
 
    /* If no lcrs received during previous WHILE loop and got a new fetch 
     * LWM then send a commit lcr to ping the inbound server with the new
     * fetch LWM position.
     */
    else if (fetchlwm_len > 0 && source_db_len > 0 &&
        (fetchlwm_len != sv_pingpos_len ||
         memcmp(sv_pingpos, fetchlwm, fetchlwm_len))) 
    {
      /* To ensure we don't send multiple lcrs with duplicate position, send
       * a new ping only if we have saved the last ping position.
       */
      if (sv_pingpos_len > 0)
      {     
        ping_svr(xin_ocip, commit_lcr, fetchlwm, fetchlwm_len,
                 source_db, source_db_len); 
      }
 
      /* save the position just sent to inbound server */
      memcpy(sv_pingpos, fetchlwm, fetchlwm_len);
      sv_pingpos_len = fetchlwm_len;
    }
 
    /* flush inbound network to flush all lcrs to inbound server */
    OCICALL(xin_ocip,
            OCIXStreamInFlush(xin_ocip->svcp, xin_ocip->errp, OCI_DEFAULT));
 
    
    /* get processed LWM of inbound server */   
    OCICALL(xin_ocip, 
            OCIXStreamInProcessedLWMGet(xin_ocip->svcp, xin_ocip->errp,
                                        proclwm, &proclwm_len, OCI_DEFAULT));
 
    if (proclwm_len > 0)
    {
      /* Set processed LWM for outbound server */
      OCICALL(xout_ocip, 
              OCIXStreamOutProcessedLWMSet(xout_ocip->svcp, xout_ocip->errp, 
                                           proclwm, proclwm_len, OCI_DEFAULT));
    }
  }
  
  if (status != OCI_SUCCESS)
    ocierror(xout_ocip, (char *)"get_lcrs() encounters error");
}
 
/*---------------------------------------------------------------------
 * get_chunks - Get each chunk for the current LCR and send it to 
 *              the inbound server.
 *---------------------------------------------------------------------*/
static void get_chunks(oci_t *xin_ocip, oci_t *xout_ocip)
{
  oratext *colname;
  ub2      colname_len;
  ub2      coldty;
  oraub8   col_flags;
  ub2      col_csid;
  ub4      chunk_len;
  ub1     *chunk_ptr;
  oraub8   row_flag;
  sword    err;
  sb4      rtncode;
 
  do
  {
    /* Get a chunk from outbound server */
    OCICALL(xout_ocip,
            OCIXStreamOutChunkReceive(xout_ocip->svcp, xout_ocip->errp, 
                                      &colname, &colname_len, &coldty, 
                                      &col_flags, &col_csid, &chunk_len, 
                                      &chunk_ptr, &row_flag, OCI_DEFAULT));
   
    /* print chunked column info */
    printf(
      "  Chunked column name=%.*s DTY=%d  chunk len=%d csid=%d col_flag=0x%x\n",
      colname_len, colname, coldty, chunk_len, col_csid, col_flags);
 
    /* print chunk data */
    print_chunk(chunk_ptr, chunk_len, coldty);
 
    /* Send the chunk just received to inbound server */
    OCICALL(xin_ocip,
            OCIXStreamInChunkSend(xin_ocip->svcp, xin_ocip->errp, colname,
                                  colname_len, coldty, col_flags,
                                  col_csid, chunk_len, chunk_ptr,
                                  row_flag, OCI_DEFAULT));
 
  } while (row_flag & OCI_XSTREAM_MORE_ROW_DATA);
}
 
/*---------------------------------------------------------------------
 * print_chunk - Print chunked column information. Only print the first
 *               50 bytes for each chunk.
 *---------------------------------------------------------------------*/
static void print_chunk (ub1 *chunk_ptr, ub4 chunk_len, ub2 dty)
{
#define MAX_PRINT_BYTES     (50)          /* print max of 50 bytes per chunk */
 
  ub4  print_bytes;
 
  if (chunk_len == 0)
    return;
 
  print_bytes = chunk_len > MAX_PRINT_BYTES ? MAX_PRINT_BYTES : chunk_len;
 
  printf("  Data = ", chunk_len);
  if (dty == SQLT_CHR)
    printf("%.*s", print_bytes, chunk_ptr);
  else
  {
    ub2  idx;
 
    for (idx = 0; idx < print_bytes; idx++)
      printf("%02x", chunk_ptr[idx]);
  }
  printf("\n");
}
 
/*---------------------------------------------------------------------
 * print_lcr - Print header information of given lcr.
 *---------------------------------------------------------------------*/
static void print_lcr(oci_t *ocip, void *lcrp, ub1 lcrtype, 
                      oratext **src_db_name, ub2  *src_db_namel)
{
  oratext     *cmd_type;
  ub2          cmd_type_len;
  oratext     *owner;
  ub2          ownerl;
  oratext     *oname;
  ub2          onamel;
  oratext     *txid;
  ub2          txidl;
  sword        ret;
 
  printf("\n ----------- %s LCR Header  -----------------\n",
         lcrtype == OCI_LCR_XDDL ? "DDL" : "ROW");
 
  /* Get LCR Header information */
  ret = OCILCRHeaderGet(ocip->svcp, ocip->errp, 
                        src_db_name, src_db_namel,              /* source db */
                        &cmd_type, &cmd_type_len,            /* command type */
                        &owner, &ownerl,                       /* owner name */
                        &oname, &onamel,                      /* object name */
                        (ub1 **)0, (ub2 *)0,                      /* lcr tag */
                        &txid, &txidl, (OCIDate *)0,   /* txn id  & src time */
                        (ub2 *)0, (ub2 *)0,              /* OLD/NEW col cnts */
                        (ub1 **)0, (ub2 *)0,                 /* LCR position */
                        (oraub8*)0, lcrp, OCI_DEFAULT);
 
  if (ret != OCI_SUCCESS)
    ocierror(ocip, (char *)"OCILCRHeaderGet failed");
  else
  {
    printf("  src_db_name=%.*s\n  cmd_type=%.*s txid=%.*s\n",
           *src_db_namel, *src_db_name, cmd_type_len, cmd_type, txidl, txid );
 
    if (ownerl > 0)
      printf("  owner=%.*s oname=%.*s \n", ownerl, owner, onamel, oname);
  } 
}
 
/*---------------------------------------------------------------------
 * detach - Detach from XStream server
 *---------------------------------------------------------------------*/
static void detach(oci_t * ocip)
{
  sword  err = OCI_SUCCESS;
 
  printf ("Detach from XStream %s server\n",
          ocip->outbound ? "outbound" : "inbound" );
 
  if (ocip->outbound)
  {
    OCICALL(ocip, OCIXStreamOutDetach(ocip->svcp, ocip->errp, OCI_DEFAULT));
  }
  else
  {
    OCICALL(ocip, OCIXStreamInDetach(ocip->svcp, ocip->errp, 
                                     (ub1 *)0, (ub2 *)0,    /* processed LWM */
                                     OCI_DEFAULT));
  }
}
 
/*---------------------------------------------------------------------
 * disconnect_db  - Logoff from the database
 *---------------------------------------------------------------------*/
static void disconnect_db(oci_t * ocip)
{
  if (OCILogoff(ocip->svcp, ocip->errp))
  {
    ocierror(ocip, (char *)"OCILogoff() failed");
  }
 
  if (ocip->errp)
    OCIHandleFree((dvoid *) ocip->errp, (ub4) OCI_HTYPE_ERROR);
 
  if (ocip->envp)
    OCIHandleFree((dvoid *) ocip->envp, (ub4) OCI_HTYPE_ENV);
}
 
/*---------------------------------------------------------------------
 * ocierror - Print error status and exit program
 *---------------------------------------------------------------------*/
static void ocierror(oci_t * ocip, char * msg)
{
  sb4 errcode=0;
  text bufp[4096];
 
  if (ocip->errp)
  {
    OCIErrorGet((dvoid *) ocip->errp, (ub4) 1, (text *) NULL, &errcode,
                bufp, (ub4) 4096, (ub4) OCI_HTYPE_ERROR);
    printf("%s\n%s", msg, bufp);
  }
  else
    puts(msg);
 
  printf ("\n");
  exit(1);
}
 
/*--------------------------------------------------------------------
 * print_usage - Print command usage
 *---------------------------------------------------------------------*/
static void print_usage(int exitcode)
{
  puts("\nUsage: xio -ob_svr <outbound_svr> -ob_db <outbound_db>\n" 
         "           -ob_usr <conn_user> -ob_pwd <conn_user_pwd>\n" 
         "           -ib_svr <inbound_svr> -ib_db <inbound_db>\n"
         "           -ib_usr <apply_user> -ib_pwd <apply_user_pwd>\n");
  puts("  ob_svr  : outbound server name\n"
       "  ob_db   : database name of outbound server\n"
       "  ob_usr  : connect user to outbound server\n"
       "  ob_pwd  : password of outbound's connect user\n"
       "  ib_svr  : inbound server name\n"
       "  ib_db   : database name of inbound server\n"
       "  ib_usr  : apply user for inbound server\n"
       "  ib_pwd  : password of inbound's apply user\n");
 
  exit(exitcode);
}
 
/*--------------------------------------------------------------------
 * get_inputs - Get user inputs from command line
 *---------------------------------------------------------------------*/
static void get_inputs(conn_info_t *xout_params, conn_info_t *xin_params, 
                       int argc, char ** argv)
{
  char * option;
  char * value;
 
  memset (xout_params, 0, sizeof(*xout_params));
  memset (xin_params, 0, sizeof(*xin_params));
  while(--argc)
  {
    /* get the option name */
    argv++;
    option = *argv;
 
    /* check that the option begins with a "-" */
    if (!strncmp(option, (char *)"-", 1))
    {
      option ++;
    }
    else
    {
      printf("Error: bad argument '%s'\n", option);
      print_usage(1);
    }
 
    /* get the value of the option */
    --argc;
    argv++;
 
    value = *argv;    
 
    if (!strncmp(option, (char *)"ob_db", 5))
    {
      xout_params->dbname = (oratext *)value;
      xout_params->dbnamelen = strlen(value);
    }
    else if (!strncmp(option, (char *)"ob_usr", 6))
    {
      xout_params->user = (oratext *)value;
      xout_params->userlen = strlen(value);
    }
    else if (!strncmp(option, (char *)"ob_pwd", 6))
    {
      xout_params->passw = (oratext *)value;
      xout_params->passwlen = strlen(value);
    }
    else if (!strncmp(option, (char *)"ob_svr", 6))
    {
      xout_params->svrnm = (oratext *)value;
      xout_params->svrnmlen = strlen(value);
    }
    else if (!strncmp(option, (char *)"ib_db", 5))
    {
      xin_params->dbname = (oratext *)value;
      xin_params->dbnamelen = strlen(value);
    }
    else if (!strncmp(option, (char *)"ib_usr", 6))
    {
      xin_params->user = (oratext *)value;
      xin_params->userlen = strlen(value);
    }
    else if (!strncmp(option, (char *)"ib_pwd", 6))
    {
      xin_params->passw = (oratext *)value;
      xin_params->passwlen = strlen(value);
    }
    else if (!strncmp(option, (char *)"ib_svr", 6))
    {
      xin_params->svrnm = (oratext *)value;
      xin_params->svrnmlen = strlen(value);
    }
    else
    {
      printf("Error: unknown option '%s'.\n", option);
      print_usage(1);
    }
  }
 
  /* print usage and exit if any argument is not specified */
  if (!xout_params->svrnmlen || !xout_params->passwlen || 
      !xout_params->userlen || !xout_params->dbnamelen ||
      !xin_params->svrnmlen || !xin_params->passwlen || 
      !xin_params->userlen || !xin_params->dbnamelen)
  {
    printf("Error: missing command arguments. \n");
    print_usage(1);
  }
}

Sample XStream Client Application for the Java API

To run the sample XStream client application for the Oracle Call Interface API, compile and link the application file, and enter the following on a command line:

java xio xsin_oraclesid xsin_host xsin_port xsin_username 
xsin_passwd xin_servername xsout_oraclesid xsout_host xsout_port 
xsout_username xsout_passwd xsout_servername

where:

  • xsin_oraclesid is the name Oracle SID of the inbound server's database

  • xsin_host is the host name of the computer system running the inbound server

  • xsin_port is the port number of the listener for the inbound server's database

  • xsin_username is the inbound server's apply user

  • xsin_passwd is the password for the inbound server's apply user

  • xin_servername is the name of the inbound server

  • xsout_oraclesid is the name Oracle SID of the outbound server's database

  • xsout_host is the host name of the computer system running the outbound server

  • xsout_port is the port number of the listener for the outbound server's database

  • xsout_username is the outbound server's connect user

  • xsout_passwd is the password for the outbound server's connect user

  • xsout_servername is the name of the outbound server

When the sample client application is running, it prints information attaching to the inbound server and outbound server, along with the last position for each server. The output looks similar to the following:

Attached to inbound server:xin
Inbound Server Last Position is: 0000000937e100000001000000010000000937e1000000010000000101
Attached to outbound server:xout
Last Position is: 0000000937e100000001000000010000000937e1000000010000000101

This demo is available in the following location:

$ORACLE_HOME/rdbms/demo/xstream/java

The file name for the demo is xio.java. See the README.txt file in the demo directory for more information about compiling and running the application.

The code for the sample application that uses the Java API follows:

import oracle.streams.*;
import oracle.jdbc.internal.OracleConnection;
import oracle.jdbc.*;
import oracle.sql.*;
import java.sql.*;
import java.util.*;
 
public class xio
{
  public static String xsinusername = null;
  public static String xsinpasswd = null;
  public static String xsinName = null;
  public static String xsoutusername = null;
  public static String xsoutpasswd = null;
  public static String xsoutName = null;
  public static String in_url = null;
  public static String out_url = null;
  public static Connection in_conn = null;
  public static Connection out_conn = null;
  public static XStreamIn xsIn = null;
  public static XStreamOut xsOut = null;
  public static byte[] lastPosition = null;
  public static byte[] processedLowPosition = null;
    
  public static void main(String args[])
  {
    // get connection url to inbound and outbound server
    in_url = parseXSInArguments(args);    
    out_url = parseXSOutArguments(args);    
 
    // create connection to inbound and outbound server
    in_conn = createConnection(in_url, xsinusername, xsinpasswd);
    out_conn = createConnection(out_url, xsoutusername, xsoutpasswd);
 
    // attach to inbound and outbound server
    xsIn = attachInbound(in_conn);
    xsOut = attachOutbound(out_conn);
    
    // main loop to get lcrs 
    get_lcrs(xsIn, xsOut);
    
    // detach from inbound and outbound server
    detachInbound(xsIn);
    detachOutbound(xsOut);
  }
    
  // parse the arguments to get the conncetion url to inbound db
  public static String parseXSInArguments(String args[])
  {
    String trace, pref;
    String orasid, host, port;
    
    if (args.length != 12)
    {
      printUsage();
      System.exit(0);
    }
 
    orasid = args[0];
    host = args[1];
    port = args[2];
    xsinusername = args[3];
    xsinpasswd = args[4];
    xsinName = args[5];
    
    System.out.println("xsin_host = "+host);
    System.out.println("xsin_port = "+port);
    System.out.println("xsin_ora_sid = "+orasid);
 
    String in_url = "jdbc:oracle:oci:@"+host+":"+port+":"+orasid;
    System.out.println("xsin connection url: "+ in_url);
 
    return in_url;
  }
 
  // parse the arguments to get the conncetion url to outbound db
  public static String parseXSOutArguments(String args[])
  {
    String trace, pref;
    String orasid, host, port;
    
    if (args.length != 12)
    {
      printUsage();
      System.exit(0);
    }
 
    orasid = args[6];
    host = args[7];
    port = args[8];
    xsoutusername = args[9];
    xsoutpasswd = args[10];
    xsoutName = args[11];
    
    
    System.out.println("xsout_host = "+host);
    System.out.println("xsout_port = "+port);
    System.out.println("xsout_ora_sid = "+orasid);
 
    String out_url = "jdbc:oracle:oci:@"+host+":"+port+":"+orasid;
    System.out.println("xsout connection url: "+ out_url);
 
    return out_url;
  }
 
  // print out sample program usage message
  public static void printUsage()
  {
    System.out.println("");      
    System.out.println("Usage: java xio "+"<xsin_oraclesid> " + "<xsin_host> "
                                         + "<xsin_port> ");
    System.out.println("                "+"<xsin_username> " + "<xsin_passwd> "
                                         + "<xsin_servername> ");
    System.out.println("                "+"<xsout_oraclesid> " + "<xsout_host> "
                                         + "<xsout_port> ");
    System.out.println("                "+"<xsout_username> " + "<xsout_passwd> "
                                         + "<xsout_servername> ");
  }
 
  // create a connection to an Oracle Database
  public static Connection createConnection(String url, 
                                            String username, 
                                            String passwd)
  {
    try
    {
      DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
      return DriverManager.getConnection(url, username, passwd);
    }
    catch(Exception e)
    {
      System.out.println("fail to establish DB connection to: " +url);
      e.printStackTrace();
      return null;
    }
  }
 
  // attach to the XStream Inbound Server
  public static XStreamIn attachInbound(Connection in_conn)
  {
    XStreamIn xsIn = null;
    try
    {
      xsIn = XStreamIn.attach((OracleConnection)in_conn, xsinName,
                              "XSDEMOINCLIENT" , XStreamIn.DEFAULT_MODE);
 
      // use last position to decide where should we start sending LCRs  
      lastPosition = xsIn.getLastPosition();
      System.out.println("Attached to inbound server:"+xsinName);
      System.out.print("Inbound Server Last Position is: ");
      if (null == lastPosition)
      {
        System.out.println("null");
      }
      else
      {
        printHex(lastPosition);
      }
      return xsIn;
    }
    catch(Exception e)
    {
      System.out.println("cannot attach to inbound server: "+xsinName);
      System.out.println(e.getMessage());
      e.printStackTrace();
      return null;
    }        
  }
 
  // attach to the XStream Outbound Server    
  public static XStreamOut attachOutbound(Connection out_conn)
  {
    XStreamOut xsOut = null;
 
    try
    {
      // when attach to an outbound server, client needs to tell outbound
      // server the last position.
      xsOut = XStreamOut.attach((OracleConnection)out_conn, xsoutName,
                                lastPosition, XStreamOut.DEFAULT_MODE);
      System.out.println("Attached to outbound server:"+xsoutName);
      System.out.print("Last Position is: ");  
      if (lastPosition != null)
      {
        printHex(lastPosition);
      }
      else
      {
        System.out.println("NULL");
      }
      return xsOut;
    }
    catch(Exception e)
    {
      System.out.println("cannot attach to outbound server: "+xsoutName);
      System.out.println(e.getMessage());
      e.printStackTrace();
      return null;
    } 
  }
 
  // detach from the XStream Inbound Server
  public static void detachInbound(XStreamIn xsIn)
  {
    byte[] processedLowPosition = null;
    try
    {
      processedLowPosition = xsIn.detach(XStreamIn.DEFAULT_MODE);
      System.out.print("Inbound server processed low Position is: ");
      if (processedLowPosition != null)
      {
        printHex(processedLowPosition);
      }
      else
      {
        System.out.println("NULL");
      }
    }
    catch(Exception e)
    {
      System.out.println("cannot detach from the inbound server: "+xsinName);
      System.out.println(e.getMessage());
      e.printStackTrace();
    }
  }
 
  // detach from the XStream Outbound Server    
  public static void detachOutbound(XStreamOut xsOut)
  {
    try
    {
      xsOut.detach(XStreamOut.DEFAULT_MODE);
    }
    catch(Exception e)
    {
      System.out.println("cannot detach from the outbound server: "+xsoutName);
      System.out.println(e.getMessage());
      e.printStackTrace();
    }       
  }
 
  public static void get_lcrs(XStreamIn xsIn, XStreamOut xsOut)
  {
    byte[] ping_pos = null;
    byte[] fetchlwm = null;
    String src_db = null;
    
    if (null == xsIn) 
    {
      System.out.println("xstreamIn is null");
      System.exit(0);
    }
 
    if (null == xsOut)
    {
      System.out.println("xstreamOut is null");
      System.exit(0);
    }
 
    try
    {
      while(true) 
      {
        // receive an LCR from outbound server
        LCR alcr = xsOut.receiveLCR(XStreamOut.DEFAULT_MODE);
        fetchlwm = xsOut.getFetchLowWatermark();
    
        // save source db for ping lcr
        if (null != alcr)
          src_db = alcr.getSourceDatabaseName();
        
        if (xsOut.getBatchStatus() == XStreamOut.EXECUTING) // batch is active
        {
          assert alcr != null;
          // send the LCR to the inbound server
          xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE);
 
          // also get chunk data for this LCR if any
          if (alcr instanceof RowLCR)
          {
            // receive chunk from outbound then send to inbound
            if (((RowLCR)alcr).hasChunkData())
            {
              ChunkColumnValue chunk = null; 
              do
              {
                chunk = xsOut.receiveChunk(XStreamOut.DEFAULT_MODE);
                xsIn.sendChunk(chunk, XStreamIn.DEFAULT_MODE);
              } while (!chunk.isEndOfRow());
            }
          }
          processedLowPosition = alcr.getPosition();
          ping_pos = processedLowPosition;          
        }
        else  // batch is end 
        {
          assert alcr == null;
 
          // send ping lcr if we haven't received any lcr in the batch
          // but we got a new fetch lwm, then send a commit lcr to
          // ping the inbound server with the new fetch LWM position
          if (null != src_db && null != fetchlwm && 
              !samePos(fetchlwm,ping_pos))
          {
            xsIn.sendLCR(createPing(src_db, fetchlwm), 
                         XStreamIn.DEFAULT_MODE);
            ping_pos = fetchlwm;
          }
 
          // flush the network
          xsIn.flush(XStreamIn.DEFAULT_MODE);
          // get the processed_low_position from inbound server
          processedLowPosition = 
              xsIn.getProcessedLowWatermark();
          // update the processed_low_position at oubound server
          if (null != processedLowPosition)
            xsOut.setProcessedLowWatermark(processedLowPosition, 
                                           XStreamOut.DEFAULT_MODE);
        }
      }
    }
    catch(Exception e)
    {
      System.out.println("exception when processing LCRs");
      System.out.println(e.getMessage());
      e.printStackTrace();
    }
  }
 
  public static void printHex(byte[] b) 
  {
    for (int i = 0; i < b.length; ++i) 
    {
      System.out.print(
        Integer.toHexString((b[i]&0xFF) | 0x100).substring(1,3));
    }
    System.out.println("");
  }    
 
  // ping lcr is used to bump up the inbound server watermark
  private static RowLCR createPing(String src_db, byte[] pos)
  {
    java.util.Date today = new java.util.Date();
    java.sql.Timestamp now = new java.sql.Timestamp(today.getTime());
    oracle.sql.DATE src_time = new oracle.sql.DATE(now);
 
 
    RowLCR alcr = new DefaultRowLCR();
    ((RowLCR)alcr).setSourceDatabaseName(src_db);
    ((RowLCR)alcr).setSourceTime(src_time);
    ((RowLCR)alcr).setPosition(pos);
    ((RowLCR)alcr).setCommandType(RowLCR.COMMIT);
    ((RowLCR)alcr).setTransactionId("Ping: " + src_time.toString());
 
    return alcr;
  }
 
  private static boolean samePos(byte[] pos1, byte[] pos2)
  {
    int   cmp_len;
    boolean    result;
 
    if (pos1.length != pos2.length)
      return false;
    
    for (int i = 0; i<pos1.length; i++)
    {
      if (pos1[i] != pos2[i])
        return false;            
    }
 
    return true;
  }
}