Connecting to Destinations
In the previous sections, we have seen how to connect to the source databases. In this section, we will go through how to connect to the destination databases. Previously, we were transfering data to a demo sink, but now we will transfer data to a real sink.
For this purpose, we will be using Aerospike as the sink database. However, you can use any of the supported sinks, which you can find here.
Aerospike is a distributed, scalable NoSQL database that is optimized for flash storage and solid-state drives. It is a key-value store that is designed to be fast and reliable.
Connecting to Aerospike
Pre-requisites
Make sure you have Aerospike is installed and running. If not, you can follow the instructions here. Or you can use the Aerospike docker image as shown below.
docker pull aerospike
We now want to connect to Aerospike using Dozer to replicate the data from our sources. For this, we will add details about the Aerospike sink to the config.yaml file.
connections:
- config: !Aerospike
namespace: test
sets: []
hosts: localhost:3000
name: sink_connection
sinks:
- table_name: <source table>
config: !Aerospike
connection: sink_connection
namespace: test
set_name: <set name>
The dozer-config.yaml file should look like this:
version: 1
app_name: simple-pg-sample
connections:
- name: pagila_conn
config: !Postgres
user: postgres
password: postgres
host: localhost
port: 5433
database: pagila
- config: !Aerospike
namespace: test
sets: []
hosts: localhost:3000
name: sink_connection
sources:
- name: films
table_name: film
connection: pagila_conn
columns:
- film_id
- title
- rental_rate
sinks:
- table_name: films
config: !Aerospike
connection: sink_connection
namespace: test
set_name: films
Now start the replication by running the following command:
dozer run --enable-progress
You should see a screen like the following
INFO Source: Initializing input schema: film
+-------------+---------+----------+-------+
| Field | Type | Nullable | PK |
+-------------+---------+----------+-------+
| film_id | Int | false | true |
+-------------+---------+----------+-------+
| title | String | false | false |
+-------------+---------+----------+-------+
| rental_rate | Decimal | false | false |
+-------------+---------+----------+-------+
INFO [pipeline] Validation completed
INFO Starting Internal Server on 0.0.0.0:50053
▹▹▹▹▹ film: 0: 0/s INFO NOTICE: publication "dozer_publication_pagila_conn" does not exist, skipping
INFO Snapshotting for connection pagila_conn took 6.535752ms
Replication
Now that we have the Aerospike sink set up, we can start the replication process. We will update film table in the source database and see if the changes are reflected in the sink database.
-- Increase the price of all films by 10%
UPDATE film
SET rental_rate = rental_rate * 1.1;
-- Mark all films released before 2005 as old
UPDATE film
SET release_year = 'Old'
WHERE release_year < 2005;
-- Set the rental duration of all films to 7 days
UPDATE film
SET rental_duration = 7;
-- Increase the replacement cost of all films by 5
UPDATE film
SET replacement_cost = replacement_cost + 5;
After running the above sql queries, you should see the changes reflected in the Aerospike sink.
INFO NOTICE: publication "dozer_publication_pagila_conn" does not exist, skipping
INFO Snapshotting for connection pagila_conn took 6.535752ms
INFO [pagila_conn] Starting Replication: 0/24F7998, "dozer_publication_pagila_conn"
▹▹▸▹▹ film: 3072: 3.7526/s