3. Advanced Deployments
3.3. Batch Loading for Data Warehouses
Tungsten Replicator normally applies SQL changes to slaves by constructing SQL statements and executing them in the exact order that transactions appear in the Tungsten History Log (THL). This works well for OLTP databases like MySQL, PostgreSQL, Oracle, and MongoDB. However, it is a poor approach for data warehouses.
Data warehouse products like Vertica or Greenplum load very slowly through JDBC interfaces (50 times slower or even more compared to MySQL). Instead, such databases supply batch loading commands that upload data in parallel. For instance, Vertica uses the COPY command, Greenplum uses gpload, InfiniDB uses cpimport, and Infobright uses LOAD DATA INFILE.
Tungsten Replicator has a batch applier named SimpleBatchApplier that groups transactions and then loads data. This is known as "batch apply." You can configure Tungsten to load tens of thousands of transactions at once using templates that apply the correct commands for your chosen data warehouse.
While we use the term batch apply, Tungsten is not batch-oriented in the sense of traditional Extract/Transform/Load tools, which may run only a small number of batches a day. Tungsten builds batches automatically as transactions arrive in the log. The mechanism is designed to be self-adjusting. If small transaction batches cause loading to be slower, Tungsten will automatically tend to adjust the batch size upwards until it no longer lags during loading.
3.3.1. How It Works
The batch applier loads data into the slave DBMS using CSV files and appropriate load commands like LOAD DATA INFILE or COPY. The basic algorithm is as follows.
While executing within a commit block, we write incoming transactions into open CSV files managed by class CsvWriter. There is one CSV file per database table. The following sample shows typical contents.
"I","84900","1","986","http://www.continent.com/software"
"D","84901","2","143",null
"I","84901","3","143","http://www.microsoft.com"
Tungsten adds three extra column values to each line of CSV output.
Column   Description
opcode   A transaction code that has the value "I" for insert and "D" for delete
seqno    The Tungsten transaction sequence number
row_id   A line number that starts with 1 and increments by 1 for each new row
Different update types are handled as follows:
• Each insert generates a single row containing all values in the row with an "I" opcode.
• Each delete generates a single row with the key and a "D" opcode. Non-key fields are null.
• Each update results in a delete with the row key followed by an insert.
• Statements are ignored. If you need DDL changes on the slave, you must apply them yourself.
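The rules above can be sketched in a few lines of Python. This is an illustration only, not Tungsten's CsvWriter: the function names are hypothetical, the key is assumed to be the first table column, and the quote-escaping rule is an assumption.

```python
def csv_line(fields):
    """Quote every value; write None as a bare null, as in the sample above."""
    return ",".join(
        "null" if f is None else '"' + str(f).replace('"', '""') + '"'
        for f in fields
    )


def rows_for_change(op, seqno, row_id, key, ncols, values=None):
    """Return (csv_lines, next_row_id) for one row change.

    ncols is the number of columns in the base table; values is the full
    row (key first) and is required for inserts and updates.
    """
    # A delete carries only the key; every non-key column is null.
    delete = ["D", seqno, row_id, key] + [None] * (ncols - 1)
    if op == "delete":
        return [csv_line(delete)], row_id + 1
    if op == "insert":
        return [csv_line(["I", seqno, row_id] + list(values))], row_id + 1
    if op == "update":
        # An update becomes a delete of the key followed by an insert.
        insert = ["I", seqno, row_id + 1] + list(values)
        return [csv_line(delete), csv_line(insert)], row_id + 2
    raise ValueError("unknown operation: " + op)
```

Feeding it an update of key 143 at sequence number 84901 reproduces the last two lines of the sample CSV shown earlier.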
Tungsten writes each row update into the corresponding CSV file for the SQL table. At commit time the following steps occur:
1. Flush and close each CSV file. This ensures that if there is a failure the files are fully visible in storage.
2. For each table, execute a merge script to move the data from CSV into the data warehouse. This script varies depending on the data warehouse type or even for a specific application. It generally consists of a sequence of operating system commands, load commands like COPY or LOAD DATA INFILE to load in the CSV data, and ordinary SQL commands to move/massage data.
3. When all tables are loaded, issue a single commit on the SQL connection.
The main requirement of merge scripts is that they must ensure rows load and that delete and insert operations apply in the correct order. Tungsten includes load scripts for MySQL and Vertica that do this automatically.
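The three commit-time steps can be sketched as follows. This is a simplified illustration, not the SimpleBatchApplier code: commit_batch, merge_table and commit are hypothetical names, and the merge and commit actions are passed in as callables so that the sketch stays independent of any particular DBMS.

```python
import os
import tempfile


def commit_batch(csv_files, merge_table, commit):
    """Run the commit-time steps for one batch.

    csv_files maps a table name to the open file object holding that
    table's CSV rows. merge_table(table, path) loads one table, and
    commit() commits the SQL connection.
    """
    # 1. Flush and close every CSV file so the data is visible in storage.
    for handle in csv_files.values():
        handle.flush()
        handle.close()
    # 2. Run the merge step once per table.
    for table, handle in csv_files.items():
        merge_table(table, handle.name)
    # 3. Commit once, only after every table has loaded.
    commit()
```

Committing once at the end means a failure in any merge step leaves the whole batch uncommitted, which is why the CSV files are made durable before loading begins.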
It is common to use staging tables to help load data. These are described in more detail in a later section.
3.3.2. Important Limitations
Tungsten currently has some important limitations for batch loading, namely:
1. Primary keys must be a single column only. Tungsten does not handle multi-column keys.
2. Binary data is not certified and may cause problems when converted to CSV as it will be converted to Unicode.
These limitations will be relaxed in future releases.
3.3.3. Batch Applier Setup
Here is how to set up batch loading on MySQL. For more information on specific data warehouse types, refer to Chapter 2, Deployment.
1. Enable row replication on the MySQL master using set global binlog_format=row or by updating my.cnf.
2. Modify the wrapper.conf file in the release to enable the correct platform encoding and timezone for the Java VM. Uncomment the following lines and edit to suit your platform.
# You may need to set the Java platform charset to replicate heterogeneously
# from MySQL using row replication. This should match the default charset
# of your MySQL tables. Common values are UTF8 and ISO_8859_1. Many Linux
# platforms default to ISO_8859_1 (latin1).
wrapper.java.additional.4=-Dfile.encoding=UTF8
# To ensure consistent handling of dates in heterogeneous and batch replication
# you should set the JVM timezone explicitly. Otherwise the JVM will default
# to the platform time, which can result in unpredictable behavior when
# applying date values to slaves. GMT is recommended to avoid inconsistencies.
wrapper.java.additional.5=-Duser.timezone=GMT
3. Install using the --batch-enabled=true option. Here's a typical installation command using tpm:
shell> ./tools/tpm batch --cluster-hosts=logos1,logos2 \
    --master-host=logos1 \
    --datasource-user=tungsten \
    --datasource-password=secret \
    --batch-enabled=true \
    --batch-load-template=mysql \
    --svc-table-engine=infinidb \
    --svc-extractor-filters=colnames,pkey \
    --property=replicator.filter.pkey.addPkeyToInserts=true \
    --property=replicator.filter.pkey.addColumnsToDeletes=true \
    --home-directory=/opt/continuent \
    --channels=1 \
    --buffer-size=1000 \
    --mysql-use-bytes-for-string=false \
    --skip-validation-check=MySQLConfigFileCheck \
    --skip-validation-check=MySQLExtractorServerIDCheck \
    --skip-validation-check=MySQLApplierServerIDCheck \
    --svc-parallelization-type=disk --start-and-report
There are a number of important options for batch loading.
• --batch-enabled=true
Enables batch loading on the slave.
• --batch-load-template=name
Selects a set of connect and merge files. (See below.)
• --svc-table-engine=name
For MySQL-based data warehouses, sets the table type for Tungsten catalogs. Must be either infinidb (InfiniDB) or brighthouse (Infobright).
• --svc-extractor-filters=colnames,pkey
Filters that must run on master to fill in column names and the table primary key from the original data.
• --property=replicator.filter.pkey.addPkeyToInserts=true
Adds primary key data to inserts.
• --property=replicator.filter.pkey.addColumnsToDeletes=true
Adds table columns to deletes.
You may force additional parameter settings using --property flags if necessary.
3.3.4. Connect and Merge Scripts
The batch apply process supports two parameterized SQL scripts, which are controlled by the following properties.
Type            Description
Connect script  Script that executes on connection to the DBMS to initialize the session
Merge script    Script that merges data at commit time from CSV to the data warehouse
Tungsten provides paired scripts for each supported data warehouse type with conventional names so that it is easy to tell them apart. To select a particular pair, use the --batch-load-template option. For instance, --batch-load-template=vertica would select the standard Vertica scripts, which are named vertica-connect.sql and vertica-merge.sql.
Connect and merge scripts follow a simple format that is described as follows.
• Any line starting with '#' is a comment.
• Any line starting with '!' is an operating system command.
• Any other non-blank line is a SQL statement.
You can extend operating system commands and SQL statements to multiple lines by indenting subsequent lines.
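To make the format concrete, here is a minimal parser sketch for these rules. It is not the replicator's own parser: parse_script is a hypothetical name, and joining continuation lines with a single space is an assumption.

```python
def parse_script(text):
    """Return a list of (kind, command) tuples; kind is "os" or "sql"."""
    commands = []
    for line in text.splitlines():
        # Skip blank lines and comments.
        if not line.strip() or line.startswith("#"):
            continue
        if line[0] in " \t":
            # An indented line continues the previous command.
            if not commands:
                raise ValueError("continuation line with no command before it")
            kind, body = commands[-1]
            commands[-1] = (kind, body + " " + line.strip())
        elif line.startswith("!"):
            commands.append(("os", line[1:].strip()))
        else:
            commands.append(("sql", line.strip()))
    return commands
```

Because a continuation is recognized only by its leading whitespace, an SQL statement that spans several lines must indent every line after the first, otherwise each line is treated as a separate statement.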
Connect scripts are very simple and normally consist only of SQL commands. The following example shows a typical connect script for MySQL-based data warehouses like InfiniDB and Infobright.
# MySQL connection script. Ensures consistent timezone treatment.
SET time_zone = '+0:00';
Merge scripts, on the other hand, are templates that also allow the following parameters. Parameters are surrounded by %% symbols, which is ugly but unlikely to be confused with SQL or other commands:
Parameter            Description
%%BASE_COLUMNS%%     Comma-separated list of base table columns
%%BASE_PKEY%%        Fully qualified base table primary key name
%%BASE_TABLE%%       Fully qualified name of the base table
%%CSV_FILE%%         Full path to CSV file
%%PKEY%%             Primary key column name
%%STAGE_PKEY%%       Fully qualified stage table primary key name
%%STAGE_SCHEMA%%     Name of the staging table schema
%%STAGE_TABLE%%      Name of the staging table
%%STAGE_TABLE_FQN%%  Fully qualified name of the staging table
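Parameter expansion of this kind amounts to a simple text substitution. The sketch below shows one way to do it. expand_template is a hypothetical helper, the example values are made up, and raising an error for an unknown parameter is a design choice of this sketch rather than documented Tungsten behavior.

```python
import re


def expand_template(line, params):
    """Replace each %%NAME%% in line with params["NAME"]."""
    def lookup(match):
        name = match.group(1)
        if name not in params:
            raise KeyError("no value for parameter %%" + name + "%%")
        return params[name]
    return re.sub(r"%%([A-Z_]+)%%", lookup, line)


# Example values only; real values come from the replicator at commit time.
example = expand_template(
    "LOAD DATA INFILE '%%CSV_FILE%%.delete' INTO TABLE %%STAGE_TABLE_FQN%%",
    {"CSV_FILE": "/tmp/bar-foo.csv", "STAGE_TABLE_FQN": "load_bar.stage_xxx_foo"},
)
```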
Here is a typical merge script containing a mix of both SQL and operating system commands.
# Merge script for MySQL.
#
# Extract deleted data keys and put in temp CSV file for deletes.
!egrep '^"D",' %%CSV_FILE%% |cut -d, -f4 > %%CSV_FILE%%.delete
# Load the delete keys.
LOAD DATA INFILE '%%CSV_FILE%%.delete' INTO TABLE %%STAGE_TABLE_FQN%%
  CHARACTER SET utf8 FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
# Delete keys that match the staging table.
DELETE %%BASE_TABLE%%
  FROM %%STAGE_TABLE_FQN%% s
  INNER JOIN %%BASE_TABLE%%
  ON s.%%PKEY%% = %%BASE_TABLE%%.%%PKEY%%
# Extract inserted data and put into temp CSV file.
!egrep '^"I",' %%CSV_FILE%% |cut -d, -f4- > %%CSV_FILE%%.insert
# Load the extracted inserts.
LOAD DATA INFILE '%%CSV_FILE%%.insert' INTO TABLE %%BASE_TABLE%%
  CHARACTER SET utf8 FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
Load scripts are stored by convention in directory tungsten-replicator/samples/scripts/batch. You can find scripts for all currently supported data warehouse types there.
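The two egrep/cut steps in the MySQL merge script split one CSV file into delete keys and insert rows. The Python sketch below performs the same split and is offered only to clarify what those steps produce. split_batch is a hypothetical name. Unlike cut, the csv module strips the surrounding quotes and copes with commas inside quoted values.

```python
import csv
import io


def split_batch(csv_text):
    """Return (delete_keys, insert_rows) from Tungsten-style CSV text.

    The first three fields (opcode, seqno, row_id) are dropped, so the
    key is the fourth field, as the merge script above assumes.
    """
    delete_keys, insert_rows = [], []
    for fields in csv.reader(io.StringIO(csv_text)):
        if not fields:
            continue
        opcode, payload = fields[0], fields[3:]
        if opcode == "D":
            delete_keys.append(payload[0])  # like: cut -d, -f4
        elif opcode == "I":
            insert_rows.append(payload)     # like: cut -d, -f4-
    return delete_keys, insert_rows
```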
3.3.5. Staging Tables
Staging tables are intermediate tables that help with data loading. There are different usage patterns for staging tables.
3.3.5.1. Staging Table Names
Tungsten assumes that staging tables, if present, follow certain conventions for naming and provides a number of configuration properties for generating staging table names that match the base tables in the data warehouse without colliding with them.
Property           Description
stageColumnPrefix  Prefix for seqno, row_id, and opcode columns generated by Tungsten
stageTablePrefix   Prefix for stage table name
stageSchemaPrefix  Prefix for the schema in which the stage tables reside
These values are set in the static properties file that defines the replication service. They can be set at install time using --property options. The following example shows typical values from a service properties file.
replicator.applier.dbms.stageColumnPrefix=tungsten_
replicator.applier.dbms.stageTablePrefix=stage_xxx_
replicator.applier.dbms.stageSchemaPrefix=load_
If your data warehouse contains a table named foo in schema bar, these properties would result in a staging table named load_bar.stage_xxx_foo. The Tungsten-generated column containing the seqno, if present, would be named tungsten_seqno.
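The naming rule is plain prefix concatenation, as the following sketch shows. The function names are hypothetical, and the default argument values simply mirror the example properties above.

```python
def stage_table_fqn(schema, table, schema_prefix="load_", table_prefix="stage_xxx_"):
    """Build the fully qualified staging table name for a base table."""
    return f"{schema_prefix}{schema}.{table_prefix}{table}"


def stage_column(name, column_prefix="tungsten_"):
    """Build the name of a Tungsten-generated staging column."""
    return column_prefix + name
```

With an empty schema prefix the staging table lands in the same schema as its base table, for example bar.stage_xxx_foo.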
Note
Staging tables are by default in the same schema as the table they update. You can put them in a different schema using the stageSchemaPrefix property as shown in the example.
3.3.5.2. Whole Record Staging
Whole record staging loads the entire CSV file into an identical table, then runs queries to apply rows to the base table or tables in the data warehouse. One of the strengths of whole record staging is that it allows you to construct a merge script that can handle any combination of INSERT, UPDATE, or DELETE operations. A weakness is that whole record staging can result in sub-optimal I/O for workloads that consist mostly of INSERT operations.
For example, suppose we have a base table created by the following CREATE TABLE command:
CREATE TABLE `mydata` (
  `id` int(11) NOT NULL,
  `f_data` float DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
A whole record staging table would look as follows.
CREATE TABLE `stage_xxx_croc_mydata` (
  `tungsten_opcode` char(1) DEFAULT NULL,
  `tungsten_seqno` int(11) DEFAULT NULL,
  `tungsten_row_id` int(11) DEFAULT NULL,
  `id` int(11) NOT NULL,
  `f_data` float DEFAULT NULL
) ENGINE=InfiniDB DEFAULT CHARSET=utf8;
Note that this table does not have a primary key defined. Most data warehouses do not use primary keys and many of them do not even permit them in the CREATE TABLE syntax.
Note also that the non-key columns must permit nulls. This is required for deletes, which contain only the Tungsten-generated columns plus the primary key.
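The following sketch illustrates one possible whole-record merge, using Python's built-in sqlite3 module as a stand-in for a data warehouse. It is not one of the merge scripts shipped with Tungsten. The table name stage_xxx_mydata, the two helper functions and the "last staged row per key wins" rule are all assumptions made for the example.

```python
import sqlite3


def make_demo_db():
    """Create an in-memory base table and a whole-record staging table."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE mydata (id INTEGER PRIMARY KEY, f_data REAL);
        CREATE TABLE stage_xxx_mydata (
            tungsten_opcode TEXT, tungsten_seqno INTEGER,
            tungsten_row_id INTEGER, id INTEGER NOT NULL, f_data REAL);
    """)
    return conn


def merge_stage(conn):
    """Apply staged changes to mydata; the last staged row per key wins."""
    # Drop every base row whose key appears anywhere in the staged batch.
    conn.execute(
        "DELETE FROM mydata WHERE id IN (SELECT id FROM stage_xxx_mydata)")
    # Re-insert only the keys whose final staged operation is an insert.
    conn.execute("""
        INSERT INTO mydata (id, f_data)
        SELECT s.id, s.f_data FROM stage_xxx_mydata s
        WHERE s.tungsten_opcode = 'I'
          AND NOT EXISTS (
              SELECT 1 FROM stage_xxx_mydata t
              WHERE t.id = s.id
                AND (t.tungsten_seqno > s.tungsten_seqno
                     OR (t.tungsten_seqno = s.tungsten_seqno
                         AND t.tungsten_row_id > s.tungsten_row_id)))
    """)
    # Empty the staging table so the next batch starts clean.
    conn.execute("DELETE FROM stage_xxx_mydata")
    conn.commit()
```

Ordering by seqno and then row_id is what lets a single merge handle any mix of inserts, updates and deletes for the same key within one batch.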
3.3.5.3. Delete Key Staging
Another approach is to load INSERT rows directly into the base data warehouse tables without staging. All you need to stage is the keys for deleted records. This reduces I/O considerably for workloads that have mostly inserts. The downside is that it may introduce ordering dependencies between DELETE and INSERT operations that require special handling by upstream applications to generate transactions that will load without conflicts.
Delete key staging tables can be as simple as the following example:
CREATE TABLE `stage_xxx_croc_mydata` (
  `id` int(11) NOT NULL
) ENGINE=InfiniDB DEFAULT CHARSET=utf8;
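The ordering dependency mentioned above can be seen in a toy model where the base table is a Python dict and each batch applies all staged deletes before loading any inserts. This is an illustration under those assumptions, not Tungsten code, and apply_delete_key_staging is a hypothetical name.

```python
def apply_delete_key_staging(base, delete_keys, insert_rows):
    """Apply all staged deletes first, then load all inserts."""
    for key in delete_keys:
        base.pop(key, None)
    for key, value in insert_rows:
        base[key] = value
    return base


# An update (delete followed by insert) loads correctly.
updated = apply_delete_key_staging({1: "old"}, [1], [(1, "new")])

# An insert followed by a delete of the same key in one batch does not:
# the delete runs first, so the row survives although the source removed it.
stale = apply_delete_key_staging({}, [2], [(2, "x")])
```

This second case is the kind of conflict that upstream applications have to avoid when delete key staging is in use.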
3.3.5.4. Staging Table Generation
Tungsten does not generate staging tables automatically. Creation of staging tables is the responsibility of users, but it can be simplified by using the ddlscan tool with the right template.
3.3.6. Character Sets
Character sets are a headache in batch loading because all updates are written to and read from CSV files, which can result in invalid transactions along the replication path. Such problems are very difficult to debug. Here are some tips to improve chances of happy replicating.
• Use UTF8 character sets consistently for all string and text data.
• Have Tungsten handle MySQL string data as strings rather than raw bytes:
shell> tpm ... --mysql-use-bytes-for-string=false
• When starting the replicator for MySQL replication, include the following option in the tpm command:
shell> tpm ... --java-file-encoding=UTF8
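A short Python illustration shows why a mismatch between the encoding used to write a CSV file and the one used to read it corrupts data silently. It is independent of Tungsten, and round_trip is a hypothetical helper.

```python
def round_trip(text, write_encoding, read_encoding):
    """Encode text as a writer would, then decode it as a reader would."""
    return text.encode(write_encoding).decode(read_encoding)


same = round_trip("café", "utf-8", "utf-8")         # unchanged
garbled = round_trip("café", "utf-8", "iso-8859-1")  # two-byte é read as two characters
```

No error is raised in the mismatched case, which is why such problems tend to surface only later as wrong data in the warehouse.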
3.3.7. Time Zones
Time zones are another headache when using batch loading. For best results applications should standardize on a single time zone, preferably UTC, and use this consistently for all data. To ensure the Java VM outputs time data correctly to CSV files, you must set the JVM time zone to be the same as the standard time zone for your data. Here is the JVM setting in wrapper.conf:
# To ensure consistent handling of dates in heterogeneous and batch replication
# you should set the JVM timezone explicitly. Otherwise the JVM will default
# to the platform time, which can result in unpredictable behavior when
# applying date values to slaves. GMT is recommended to avoid inconsistencies.
wrapper.java.additional.5=-Duser.timezone=GMT
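The effect of the time zone setting on CSV output can be illustrated in Python. The same instant produces different text depending on the zone used to format it, and it is the text that ends up in the CSV file. csv_timestamp is a hypothetical helper, and the UTC-5 zone is just an example of a platform default.

```python
from datetime import datetime, timedelta, timezone


def csv_timestamp(epoch_seconds, tz):
    """Format an instant the way it would appear as text in a CSV file."""
    return datetime.fromtimestamp(epoch_seconds, tz).strftime("%Y-%m-%d %H:%M:%S")


utc_text = csv_timestamp(0, timezone.utc)
local_text = csv_timestamp(0, timezone(timedelta(hours=-5)))
```

Here utc_text and local_text describe the same instant but differ by five hours as strings, so a loader that assumes UTC would store the wrong time for the second one.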
Note
Beware that MySQL has two very similar data types: timestamp and datetime. Timestamps are stored in UTC and convert back to local time on display. Datetimes by contrast do not convert back to local time. If you mix time zones and use both data types, your time values will be inconsistent on loading.