I have been playing with Oracle Streams again lately. My goal is to capture changes in 10g and send them to a 9i database.

Below is the short list for setting up Change Data Capture using Oracle Streams. These steps are mostly from the docs with a few tweaks I have added. This entry only covers setting up the local capture and apply. I'll add the propagation to 9i later this week or next weekend.

First, the setup: we will use the HR account's EMPLOYEES table. We'll capture all changes to the EMPLOYEES table and insert them into an audit table. I'm not necessarily saying this is the way you should audit your database, but it makes a nice example.

I'll also add a monitoring piece to the capture process. I want to be able to see exactly what is being captured when it is being captured.

You will need to have sysdba access to follow along with me. Your database must also be in archivelog mode. The changes are picked up from the redo log.
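A quick way to confirm the archivelog requirement before going further is to ask the database itself. This is a minimal sketch, run as sysdba. It only reads the LOG_MODE column of V$DATABASE and changes nothing.

```sql
-- Should return ARCHIVELOG. NOARCHIVELOG means the redo that the
-- capture process depends on is not being archived.
SELECT log_mode FROM v$database;
```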

So, away we go!

The first step is to create our streams administrator. I will follow the guidelines from the Oracle docs exactly for this:

Connect as sysdba:

sqlplus / as sysdba

Create the streams tablespace (change the name and/or location to suit):

create tablespace streams_tbs datafile 'c:\temp\stream_tbs.dbf' size 25M
reuse autoextend on maxsize unlimited;

Create our streams administrator:

create user strmadmin identified by strmadmin
default tablespace streams_tbs
quota unlimited on streams_tbs;

I haven't quite figured out why, but we need to grant our administrator DBA privs. I think this is a bad thing. There is probably a workaround where I could do some direct grants instead, but I haven't had time to track those down.

grant dba to strmadmin;

We also want to grant streams admin privs to the user.

BEGIN
  DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(
    grantee          => 'strmadmin',
    grant_privileges => true);
END;
/

The next steps we'll run as the HR user.

conn hr/hr

Grant all access to the employee table to the streams admin:

grant all on hr.employees to strmadmin;

We also need to create the employee_audit table. Note that I am adding three columns in this table that do not exist in the employees table.

CREATE TABLE employee_audit(
  employee_id    NUMBER(6),
  first_name     VARCHAR2(20),
  last_name      VARCHAR2(25),
  email          VARCHAR2(25),
  phone_number   VARCHAR2(20),
  hire_date      DATE,
  job_id         VARCHAR2(10),
  salary         NUMBER(8,2),
  commission_pct NUMBER(2,2),
  manager_id     NUMBER(6),
  department_id  NUMBER(4),
  upd_date       DATE,
  user_name      VARCHAR2(30),
  action         VARCHAR2(30));

Grant all access to the audit table to the streams admin user:

grant all on hr.employee_audit to strmadmin;

We connect as the streams admin user:

conn strmadmin/strmadmin

We can create a logging table. You would NOT want to do this in a high-volume production system. I am doing this to illustrate user-defined monitoring and show how you can get inside the capture process.

CREATE TABLE streams_monitor (
  date_and_time TIMESTAMP(6) DEFAULT systimestamp,
  txt_msg       CLOB );

Here we create the queue. Unlike AQ, where you have to create a separate table, this step creates the queue and the underlying ANYDATA table.

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table => 'strmadmin.streams_queue_table',
    queue_name  => 'strmadmin.streams_queue');
END;
/

This call creates the capture process and its rules. It just defines that we want to capture DML and not DDL for hr.employees.

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'hr.employees',
    streams_type   => 'capture',
    streams_name   => 'capture_emp',
    queue_name     => 'strmadmin.streams_queue',
    include_dml    => true,
    include_ddl    => false,
    inclusion_rule => true);
END;
/

Tell the capture process that we want to know who made the change:

BEGIN
  DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(
    capture_name   => 'capture_emp',
    attribute_name => 'username',
    include        => true);
END;
/

We also need to tell Oracle where to start our capture. Change the source_database_name to match your database.

DECLARE
  iscn NUMBER;
BEGIN
  iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
    source_object_name   => 'hr.employees',
    source_database_name => 'ORCL',
    instantiation_scn    => iscn);
END;
/
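To double check that the instantiation SCN was recorded, a query along these lines should show one row for hr.employees. This is a sketch, assuming the account you run it from can read the DBA_APPLY_INSTANTIATED_OBJECTS dictionary view (strmadmin was granted DBA above).

```sql
-- One row per instantiated source object known to the apply side.
SELECT source_database,
       source_object_owner,
       source_object_name,
       instantiation_scn
FROM   dba_apply_instantiated_objects
WHERE  source_object_name = 'EMPLOYEES';
```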

And the fun part! This is where we define our capture procedure. I'm taking this right from the docs but I'm adding a couple of steps.

CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN ANYDATA) IS
  lcr        SYS.LCR$_ROW_RECORD;
  rc         PLS_INTEGER;
  command    VARCHAR2(30);
  old_values SYS.LCR$_ROW_LIST;
BEGIN
  -- Access the LCR
  rc := in_any.GETOBJECT(lcr);
  -- Get the object command type
  command := lcr.GET_COMMAND_TYPE();
  -- I am inserting the XML equivalent of the LCR into the monitoring table.
  insert into streams_monitor (txt_msg)
  values (command || DBMS_STREAMS.CONVERT_LCR_TO_XML(in_any) );
  -- Set the command_type in the row LCR to INSERT
  lcr.SET_COMMAND_TYPE('INSERT');
  -- Set the object_name in the row LCR to EMPLOYEE_AUDIT
  lcr.SET_OBJECT_NAME('EMPLOYEE_AUDIT');
  -- Set the new values to the old values for update and delete
  IF command IN ('DELETE', 'UPDATE') THEN
    -- Get the old values in the row LCR
    old_values := lcr.GET_VALUES('old');
    -- Copy the old values into the new values of the row LCR
    lcr.SET_VALUES('new', old_values);
    -- Set the old values in the row LCR to NULL
    lcr.SET_VALUES('old', NULL);
  END IF;
  -- Add a SYSDATE for upd_date
  lcr.ADD_COLUMN('new', 'UPD_DATE', ANYDATA.ConvertDate(SYSDATE));
  -- Add a user column
  lcr.ADD_COLUMN('new', 'user_name', lcr.GET_EXTRA_ATTRIBUTE('USERNAME') );
  -- Add an action column
  lcr.ADD_COLUMN('new', 'ACTION', ANYDATA.ConvertVarChar2(command));
  -- Make the changes
  lcr.EXECUTE(true);
  commit;
END;
/

Create the DML handlers:

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'INSERT',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL,
    apply_name          => NULL);
END;
/
BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'UPDATE',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL,
    apply_name          => NULL);
END;
/
BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'DELETE',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL,
    apply_name          => NULL);
END;
/
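If you want to confirm that all three handlers were registered, the data dictionary can tell you. This is a sketch that assumes read access to DBA_APPLY_DML_HANDLERS. You should see one row each for INSERT, UPDATE and DELETE pointing at the handler procedure.

```sql
-- Lists the DML handlers registered for hr.employees.
SELECT object_owner,
       object_name,
       operation_name,
       user_procedure
FROM   dba_apply_dml_handlers
WHERE  object_owner = 'HR'
AND    object_name  = 'EMPLOYEES'
ORDER  BY operation_name;
```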

Create the apply rule. The first call tells Streams, yet again, that we do in fact want the DML changes for hr.employees, this time on the apply side. The second call tells Streams where to put the info. Change the source_database name to match your database.

DECLARE
  emp_rule_name_dml VARCHAR2(30);
  emp_rule_name_ddl VARCHAR2(30);
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => 'hr.employees',
    streams_type    => 'apply',
    streams_name    => 'apply_emp',
    queue_name      => 'strmadmin.streams_queue',
    include_dml     => true,
    include_ddl     => false,
    source_database => 'ORCL',
    dml_rule_name   => emp_rule_name_dml,
    ddl_rule_name   => emp_rule_name_ddl);
  DBMS_APPLY_ADM.SET_ENQUEUE_DESTINATION(
    rule_name              => emp_rule_name_dml,
    destination_queue_name => 'strmadmin.streams_queue');
END;
/

We don't want to stop applying changes when there is an error, so:

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name => 'apply_emp',
    parameter  => 'disable_on_error',
    value      => 'n');
END;
/
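With disable_on_error set to 'n' the apply process keeps running, so a failed transaction will not announce itself by stopping anything. Failed transactions land in the apply error queue instead. A minimal sketch for checking on them, assuming read access to DBA_APPLY_ERROR:

```sql
-- No rows means no apply errors so far.
SELECT apply_name,
       local_transaction_id,
       error_number,
       error_message
FROM   dba_apply_error
WHERE  apply_name = 'APPLY_EMP';
```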

Turn on the apply process:

BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    apply_name => 'apply_emp');
END;
/

Turn on the capture process:

BEGIN
  DBMS_CAPTURE_ADM.START_CAPTURE(
    capture_name => 'capture_emp');
END;
/
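Before making any changes as HR, it can save some head scratching to check that both processes really are running. This sketch reads the STATUS column of DBA_CAPTURE and DBA_APPLY. Both should report ENABLED; DISABLED or ABORTED means something above did not take.

```sql
SELECT capture_name, status FROM dba_capture;

SELECT apply_name, status FROM dba_apply;
```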

Connect as HR and make some changes to Employees.

sqlplus hr/hr

INSERT INTO hr.employees VALUES(207, 'JOHN', 'SMITH', '[email protected]',
  NULL, '07-JUN-94', 'AC_ACCOUNT', 777, NULL, NULL, 110);
COMMIT;

UPDATE hr.employees SET salary=5999 WHERE employee_id=206;
COMMIT;

DELETE FROM hr.employees WHERE employee_id=207;
COMMIT;

It takes a few seconds for the data to make it to the logs and then back into the system to be applied. Run this query until you see data (remembering that it is not instantaneous):

SELECT employee_id, first_name, last_name, upd_date, action
FROM hr.employee_audit
ORDER BY employee_id;

Then you can log back into the streams admin account:

sqlplus strmadmin/strmadmin

View the XML LCR that we inserted during the capture process:

set long 9999

set pagesize 0
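The query itself is not shown at this point. A minimal sketch, assuming all you want is to read back what the handler wrote into the streams_monitor table created earlier:

```sql
-- Each row is the command type followed by the XML form of the LCR.
SELECT date_and_time, txt_msg
FROM   streams_monitor
ORDER  BY date_and_time;
```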

That's it! It's really not that much work to capture and apply changes. Of course, it's a little bit more work to cross database instances, but it's not that much. Keep an eye out for a future entry where I do just that.

One of the things that amazes me is how little code is required to accomplish this. The less code I have to write, the less code I have to maintain.

This entry builds directly on my last entry, Oracle Streams Configuration: Change Data Capture. This entry will show you how to propagate the changes you captured in that entry to a 9i database.

NOTE #1: I would recommend that you run the commands and make sure the last entry works for you before trying the code in this entry. That way you will need to debug as few moving parts as possible.

NOTE #2: I have run this code Windows to Windows, Windows to Linux, Linux to Solaris and Solaris to Solaris. The only time I had any problem at all was Solaris to Solaris. If you run into problems with propagation running but not sending data, shut down the source database and restart it. That worked for me.

NOTE #3: I have run this code 10g to 10g and 10g to 9i. It works without change between them.

NOTE #4: If you are not sure of the exact name of your database (including domain), use global_name, i.e. select * from global_name;

NOTE #5: Streams is not available with XE. Download and install EE. If you have 1 GB or more of RAM on your PC, you can download EE and use the DBCA to run two database instances. You do not physically need two machines to get this to work.

NOTE #6: I promise this is the last note. Merry Christmas and/or Happy Holidays!

Now for the fun part.

As I mentioned above, you need two instances for this. I called my first instance ORCL (how creative!) and I called my second instance SECOND. It works for me!

ORCL will be my source instance and SECOND will be my target instance. You should already have the CDC code from the last article running in ORCL.

ORCL must be in archivelog mode to run CDC. SECOND does not need archivelog mode. Having two databases running on a single PC in archivelog mode can really beat up a poor IDE drive.

You already created your streams admin user in ORCL so now do the same thing in SECOND. The code below is mostly the same code that you ran on ORCL. I made a few minor changes in case you are running both instances on a single PC:

sqlplus / as sysdba

create tablespace streams_second_tbs datafile 'c:\temp\stream_2_tbs.dbf' size 25M
reuse autoextend on maxsize unlimited;

create user strmadmin identified by strmadmin
default tablespace streams_second_tbs
quota unlimited on streams_second_tbs;

grant dba to strmadmin;

Connect as strmadmin. You need to create an AQ table, AQ queue and then start the queue. That's what the code below does.

BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'lrc_emp_t',
    queue_payload_type => 'sys.anydata',
    multiple_consumers => TRUE,
    compatible         => '8.1');
  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'lrc_emp_q',
    queue_table => 'lrc_emp_t');
  DBMS_AQADM.START_QUEUE (
    queue_name => 'lrc_emp_q');
END;
/
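To confirm the queue exists and was started, you can look it up in USER_QUEUES while still connected as strmadmin. This is only a sanity-check sketch. Both enabled columns should show YES for LRC_EMP_Q.

```sql
SELECT name,
       queue_table,
       enqueue_enabled,
       dequeue_enabled
FROM   user_queues
WHERE  name = 'LRC_EMP_Q';
```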

You also need to create a database link. You have to have one from ORCL to SECOND but for debugging, I like a link in both directions. So, while you're in SECOND, create a link:

CREATE DATABASE LINK orcl.world
CONNECT TO strmadmin IDENTIFIED BY strmadmin
USING 'orcl.world';

Log into ORCL as strmadmin and run the exact same queue-creation block there. Most of the setup for this is exactly the same between the two instances.

Create your link on this side also.

CREATE DATABASE LINK second.world
CONNECT TO strmadmin IDENTIFIED BY strmadmin
USING 'second.world';
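A broken link is an easy thing to rule out before blaming propagation. As a quick sketch, run this on ORCL as strmadmin. It assumes the link name used above and should return the global name of SECOND.

```sql
-- Fails with a TNS or login error if the link is not usable.
SELECT * FROM global_name@second.world;
```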

Ok, now we have running queues in ORCL and SECOND. While you are logged into ORCL, you will create a propagation schedule. You DO NOT need to run this in SECOND.

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name             => 'hr.employees',
    streams_name           => 'orcl_2_second',
    source_queue_name      => 'strmadmin.lrc_emp_q',
    destination_queue_name => '[email protected]',
    include_dml            => true,
    include_ddl            => FALSE,
    source_database        => 'orcl.world');
END;
/

This tells the database to take the data in the local lrc_emp_q and send it to the named destination queue.

We're almost done with the propagation now. We just need to change the code we wrote in the last article in our DML handler. Go back and review that code now.

We are going to modify the EMP_DML_HANDLER so that we get an enqueue block just above the execute statement:

CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN ANYDATA) IS
  lcr        SYS.LCR$_ROW_RECORD;
  rc         PLS_INTEGER;
  command    VARCHAR2(30);
  old_values SYS.LCR$_ROW_LIST;
BEGIN
  -- Access the LCR
  rc := in_any.GETOBJECT(lcr);
  -- Get the object command type
  command := lcr.GET_COMMAND_TYPE();
  -- I am inserting the XML equivalent of the LCR into the monitoring table.
  insert into streams_monitor (txt_msg)
  values (command || DBMS_STREAMS.CONVERT_LCR_TO_XML(in_any) );
  -- Set the command_type in the row LCR to INSERT
  lcr.SET_COMMAND_TYPE('INSERT');
  -- Set the object_name in the row LCR to EMPLOYEE_AUDIT
  lcr.SET_OBJECT_NAME('EMPLOYEE_AUDIT');
  -- Set the new values to the old values for update and delete
  IF command IN ('DELETE', 'UPDATE') THEN
    -- Get the old values in the row LCR
    old_values := lcr.GET_VALUES('old');
    -- Copy the old values into the new values of the row LCR
    lcr.SET_VALUES('new', old_values);
    -- Set the old values in the row LCR to NULL
    lcr.SET_VALUES('old', NULL);
  END IF;
  -- Add a SYSDATE value for the timestamp column
  lcr.ADD_COLUMN('new', 'UPD_DATE', ANYDATA.ConvertDate(SYSDATE));
  -- Add a user column
  lcr.ADD_COLUMN('new', 'user_name', lcr.GET_EXTRA_ATTRIBUTE('USERNAME') );
  -- Add an action column
  lcr.ADD_COLUMN('new', 'ACTION', ANYDATA.ConvertVarChar2(command));
  -- Enqueue the modified LCR for propagation to SECOND
  DECLARE
    enqueue_options    DBMS_AQ.enqueue_options_t;
    message_properties DBMS_AQ.message_properties_t;
    message_handle     RAW(16);
    recipients         DBMS_AQ.aq$_recipient_list_t;
  BEGIN
    recipients(1) := sys.aq$_agent(
      'anydata_subscriber', '[email protected]', NULL);
    message_properties.recipient_list := recipients;
    DBMS_AQ.ENQUEUE(
      queue_name         => 'strmadmin.lrc_emp_q',
      enqueue_options    => enqueue_options,
      message_properties => message_properties,
      payload            => anydata.convertObject(lcr),
      msgid              => message_handle);
  EXCEPTION
    WHEN OTHERS THEN
      -- Log the enqueue failure instead of failing the handler
      insert into streams_monitor (txt_msg)
      values ('Anydata: ' || DBMS_UTILITY.FORMAT_ERROR_STACK );
  END;
  -- Make the changes
  lcr.EXECUTE(true);
  commit;
END;
/

The declaration section above created some variables required for an enqueue. We created a subscriber (that's the name of the consumer). We will use that name to dequeue the record in the SECOND instance.

We then enqueued our LCR as an ANYDATA datatype.

I put the exception handler there in case there are any problems with our enqueue.

That's all it takes. Insert some records into the HR.employees table and commit them. Then log into strmadmin@second and select * from the lrc_emp_t table. You should have as many records there as you inserted.
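A select * on an ANYDATA queue table is not very readable, so here is a sketch of two narrower checks to run on SECOND as strmadmin. The second query assumes the AQ$lrc_emp_t view that AQ creates alongside the queue table.

```sql
-- How many messages have arrived?
SELECT COUNT(*) FROM lrc_emp_t;

-- Who are they waiting for, and in what state?
SELECT msg_state, consumer_name, enq_time
FROM   aq$lrc_emp_t
ORDER  BY enq_time;
```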

There are not a lot of moving parts so there aren't many things that will go wrong. Propagation is where I have the most troubles. You can query DBA_PROPAGATION to see if you have any propagation errors.
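As a starting point, something like the following can be run on ORCL. It is a sketch, not a complete diagnostic. The first query confirms that the propagation points at the queue and link you expect, and the second reads the AQ propagation schedule, which is where failure counts and the last error message show up.

```sql
SELECT propagation_name,
       source_queue_name,
       destination_queue_name,
       destination_dblink
FROM   dba_propagation;

SELECT qname,
       destination,
       failures,
       last_error_msg
FROM   dba_queue_schedules;
```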

That's it for moving the data from 10g to 9i. In my next article, I will show you how to dequeue the data and put it into the employee_audit table on the SECOND side.

I've been doing quite a bit recently with Oracle Streams. I recently had the need to create some test data in LCR format. If you have manual control of your LCR, you can test specific data issues a lot easier.

Anyway, a coworker and I searched the web and looked through various documentation but were not able to find a concise description of how to go about creating an LCR manually. So I decided to write one.

NOTE: Everything below is specifically for a ROW type LCR as opposed to a DDL type LCR. The concepts would be the same but the specific code would change.

First a little definition. An LCR (logical change record) is the format the capture process produces from the changes it finds in the redo logs, and it is the format Oracle Streams passes around (and probably Data Guard, although I am guessing about that). The LCR has information about what the object is as well as the old and new values. The old and new values are exactly the same as :old and :new in a trigger.

You can see the definition of the LCR format by viewing the LCR XML Schema.

An LCR is an object type. Actually, it is two table collections of an object type embedded within another object type. You can get the details of that from the supplied PL/SQL Packages and Types documentation.

Basically, an LCR has header information (source database, object name, owner, and command type) at the top level. Beneath that is column and data information in a name/value pair collection.

I tend to think of things like this in a relational format. If I put it in database terms, it might look something like:

Excuse the poor diagram. An artiste I am not.
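The diagram is not reproduced here. As a stand-in, this is a rough relational sketch of the same idea. The table and column names are hypothetical and exist only for illustration; nothing in Streams creates or uses them. The header columns simply mirror the required arguments of the constructor, and the child table plays the part of the old and new name/value collections.

```sql
-- Hypothetical tables, for illustration only.
CREATE TABLE lcr_header (
  lcr_id               NUMBER PRIMARY KEY,  -- made-up surrogate key
  source_database_name VARCHAR2(128),
  command_type         VARCHAR2(30),
  object_owner         VARCHAR2(30),
  object_name          VARCHAR2(30)
);

CREATE TABLE lcr_column_value (
  lcr_id      NUMBER REFERENCES lcr_header,
  value_type  VARCHAR2(3) CHECK (value_type IN ('old', 'new')),
  column_name VARCHAR2(30),
  column_data SYS.ANYDATA                   -- the value, wrapped in AnyData
);
```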

You can do a describe to see the methods as well as view the documentation. The important thing to note is the constructor. Normally, a constructor has the same name as the object type. In this case, they chose to name it CONSTRUCT.

SQL> desc sys.lcr$_row_record
METHOD
------
STATIC FUNCTION CONSTRUCT RETURNS LCR$_ROW_RECORD
Argument Name          Type           In/Out  Default?
---------------------  -------------  ------  --------
SOURCE_DATABASE_NAME   VARCHAR2       IN
COMMAND_TYPE           VARCHAR2       IN
OBJECT_OWNER           VARCHAR2       IN
OBJECT_NAME            VARCHAR2       IN
TAG                    RAW            IN      DEFAULT
TRANSACTION_ID         VARCHAR2       IN      DEFAULT
SCN                    NUMBER         IN      DEFAULT
OLD_VALUES             LCR$_ROW_LIST  IN      DEFAULT
NEW_VALUES             LCR$_ROW_LIST  IN      DEFAULT

Based on that info, populating the test LCR is relatively straightforward.

Just for your info, the type LCR$_ROW_LIST is a collection of LCR$_ROW_UNIT. Those types are also documented in the reference guide I mentioned above. You will not want to access those directly though. You can use the built-in LCR$_ROW_RECORD methods to populate those fields.

One thing to remember is that the data values that you are dealing with are sys.AnyData data types. That type has its own rules and deserves a blog entry all to itself.
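Until that entry exists, here is a very small sketch of the round trip: wrap a value, ask the wrapper what it holds, and get the value back out. It uses only the Convert, Access, Get and GetTypeName methods of the type. The variable names are mine.

```sql
SET SERVEROUTPUT ON
DECLARE
  v_any SYS.ANYDATA;
  v_num NUMBER;
  v_rc  PLS_INTEGER;
BEGIN
  -- Wrap a NUMBER in an AnyData
  v_any := SYS.ANYDATA.ConvertNumber(42);
  -- Ask the wrapper which type it is holding
  DBMS_OUTPUT.PUT_LINE('Type: ' || v_any.GetTypeName());
  -- Access* returns the value directly
  v_num := v_any.AccessNumber();
  DBMS_OUTPUT.PUT_LINE('AccessNumber: ' || v_num);
  -- Get* fills an OUT parameter and returns a status code
  v_rc := v_any.GetNumber(v_num);
  DBMS_OUTPUT.PUT_LINE('GetNumber: ' || v_num);
END;
/
```

The same pattern applies to the VarChar2 and Date variants used in the handler and in the function below.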

Here is a function that will create an empty LCR for you automatically for any table. Once you have the LCR you can modify the values to suit.

CREATE OR REPLACE FUNCTION create_lcr(
  p_table_owner IN all_tables.owner%TYPE,
  p_table_name  IN all_tables.table_name%TYPE,
  p_command     IN VARCHAR2 )
  RETURN sys.lcr$_row_record
AS
  v_lcr      sys.lcr$_row_record;
  v_database global_name.global_name%TYPE;
BEGIN
  -- verify the command type
  IF p_command NOT IN ('INSERT', 'UPDATE', 'DELETE') THEN
    RETURN v_lcr;
  END IF;
  -- Get the database name
  -- This could be parameterized
  SELECT global_name INTO v_database FROM global_name;
  -- Construct the LCR
  v_lcr := sys.lcr$_row_record.construct(
    source_database_name => v_database,
    command_type         => p_command,
    object_owner         => p_table_owner,
    object_name          => p_table_name );
  -- You can override the values in the constructor by calling these methods
  v_lcr.set_command_type(p_command);
  v_lcr.set_object_name(p_table_name);
  v_lcr.set_object_owner(p_table_owner);
  v_lcr.set_source_database_name(v_database);
  -- Loop through the columns and add new and old values
  FOR c1 IN (
    SELECT column_name, data_type
    FROM all_tab_columns
    WHERE owner = p_table_owner
    AND table_name = p_table_name
    ORDER BY column_id )
  LOOP
    -- Create an anydata based on column data type
    -- You would expand this for all data types
    -- I'm going to keep this example fairly simple
    CASE c1.data_type
      WHEN 'VARCHAR2' THEN
        v_lcr.add_column('new', c1.column_name, sys.AnyData.convertVarChar2(TO_CHAR(NULL)));
        v_lcr.add_column('old', c1.column_name, sys.AnyData.convertVarChar2(TO_CHAR(NULL)));
      WHEN 'DATE' THEN
        v_lcr.add_column('new', c1.column_name, sys.AnyData.convertDate(TO_DATE(NULL)));
        v_lcr.add_column('old', c1.column_name, sys.AnyData.convertDate(TO_DATE(NULL)));
      WHEN 'NUMBER' THEN
        v_lcr.add_column('new', c1.column_name, sys.AnyData.convertNumber(TO_NUMBER(NULL)));
        v_lcr.add_column('old', c1.column_name, sys.AnyData.convertNumber(TO_NUMBER(NULL)));
      ELSE
        -- Skip data types this simple example does not handle,
        -- rather than raising CASE_NOT_FOUND
        NULL;
    END CASE;
  END LOOP;
  RETURN v_lcr;
END;
/

To call this function and manipulate it, you might write something like the following:

DECLARE
  v_lcr sys.lcr$_row_record;
BEGIN
  v_lcr := create_lcr( 'HR', 'EMPLOYEES', 'INSERT' );
  -- Set some values
  v_lcr.set_value('new', 'first_name', sys.anyData.convertVarchar2('Lewis'));
  v_lcr.set_value('old', 'first_name', sys.anyData.convertVarchar2('George'));
  -- Display some values
  DBMS_OUTPUT.PUT_LINE(
    'Database: ' || v_lcr.get_source_database_name() ||
    ', Object Owner: ' || v_lcr.get_object_owner() ||
    ', Object Name: ' || v_lcr.get_object_name() ||
    ', Command: ' || v_lcr.get_command_type() );
  DBMS_OUTPUT.PUT_LINE(
    'New First Name: ' ||
    sys.anyData.accessVarchar2(v_lcr.get_value('new', 'first_name')) ||
    ', Old First Name: ' ||
    sys.anyData.accessVarchar2(v_lcr.get_value('old', 'first_name')) );
END;
/

The output from this is:

Database: XE, Object Owner: HR, Object Name: EMPLOYEES, Command: INSERT
New First Name: Lewis, Old First Name: George

And that's all there is to it.

My book is making good progress. It is due Feb 1 and I am on track for delivery. I will start posting much more often after I get past that milestone.
