Sometimes you have a process that loads data from a flat file into a database, or even from one database into your DW.
But what if something goes wrong? How do you know whether your load process finished successfully? And if it didn't, how do you reprocess the files without creating duplicate records?
This post presents one option for addressing those problems.
The main ideas of the control process are: (1) generate an ID for the process, (2) save a timestamp at the beginning, (3) save the ID in all controlled tables, and (4) save a timestamp at the end.
So, if a problem occurs and aborts the job, on the next run you will be able to identify all data inserted by that job, delete it, and insert it again.
I'm assuming PostgreSQL as the database. For other databases, some adjustments might be necessary.
Below is an overview of the whole process.
For this process to work, you need to:
(1) Create a table called 'ctrl_carga' within each schema that has tables you want to control. This table stores the ID / start timestamp / end timestamp / job ID / customized fields for each run of the job.
-- Sequence used by the id column below; create it before the table.
CREATE SEQUENCE schema.ctrl_carga_seq;

CREATE TABLE schema.ctrl_carga (
    id bigint NOT NULL DEFAULT nextval(('schema.ctrl_carga_seq'::text)::regclass),
    start_process timestamp without time zone NOT NULL DEFAULT (now())::timestamp without time zone,
    end_process timestamp without time zone,
    id_job character varying NOT NULL
);
All the fields above are mandatory, but you can add more fields if you need more precise control.
Let's imagine that inside the schema you have two tables to be controlled, each loaded by a different job. The field 'id_job' must store a key that uniquely identifies the job.
This is important because both jobs can be aborted, so you need to retrieve the ID for job1 to delete only the data inserted by job1 during that problematic run, and likewise retrieve the ID for job2 to delete only the data inserted by job2.
(2) Add the following column to each table to be controlled:

ctrl_carga_id bigint NOT NULL
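For example, assuming a hypothetical controlled table called 'my_schema.sales', the column could be added like this:

-- 'my_schema.sales' is a placeholder for one of your controlled tables.
ALTER TABLE my_schema.sales
    ADD COLUMN ctrl_carga_id bigint NOT NULL;
-- If the table already has rows, add the column as nullable first,
-- backfill it, and only then set it to NOT NULL.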
(3) Set some important global variables:

ctrl_schema: name of the schema where the tables are.
ctrl_id_job: key that uniquely identifies the job.
ctrl_database: database used.
(4) Create a transformation that generates and saves the timestamp at the beginning (start_ctrl_carga step).

If some previous run didn't finish correctly, this step will return the ID created by that run; otherwise it will create a new record and return the new ID.
And if you are using additional fields to control the load process, you also need to specify those fields in the step.
Then set a new variable with the returned ID.
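This is not the exact step configuration, but a rough SQL sketch of the same logic; 'my_schema' and 'job1' are placeholders for the ctrl_schema and ctrl_id_job variables:

-- Create a new control record only if there is no unfinished run for this job.
INSERT INTO my_schema.ctrl_carga (id_job)
SELECT 'job1'
WHERE NOT EXISTS (
    SELECT 1 FROM my_schema.ctrl_carga
    WHERE id_job = 'job1' AND end_process IS NULL);

-- Return the ID of the current (unfinished) run, whether new or reused.
SELECT id FROM my_schema.ctrl_carga
WHERE id_job = 'job1' AND end_process IS NULL;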
(5) Create a transformation (ctrl_carga) that searches for all runs that didn't finish correctly and deletes all data related to them.
First, search for all IDs that didn't finish.
If for some reason the ID list is needed by other steps, the IDs are returned using a 'Copy rows to result' step.
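As a sketch, this is essentially the same check used by start_ctrl_carga, assuming the search is restricted to the current job via ctrl_id_job (placeholder names again):

-- All runs of this job that never recorded an end timestamp.
SELECT id FROM my_schema.ctrl_carga
WHERE id_job = 'job1' AND end_process IS NULL;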
The next step just joins the ID list with the database type.
As said before, this step is customized for each database. It must return a list of all tables that have the field 'ctrl_carga_id', except the control table 'ctrl_carga'.
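For PostgreSQL, one possible way to build that list is to query information_schema (a sketch; 'my_schema' stands in for the ctrl_schema variable):

-- All tables in the schema that have the ctrl_carga_id column,
-- excluding the control table itself.
SELECT table_name
FROM information_schema.columns
WHERE table_schema = 'my_schema'
  AND column_name = 'ctrl_carga_id'
  AND table_name <> 'ctrl_carga';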
So, if you are using a different database, such as MySQL, create a new customized path with the same idea.
Since I don't know which tables hold records related to the unfinished IDs, the step executes a delete for all of those IDs in all controlled tables. This cleans up all data from the problematic runs so it can be loaded again without duplication.
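As a sketch, the delete executed for each controlled table could look like this ('my_schema.sales' is a hypothetical controlled table, 'job1' stands in for ctrl_id_job):

-- Remove rows written by runs of this job that never finished.
DELETE FROM my_schema.sales
WHERE ctrl_carga_id IN (
    SELECT id FROM my_schema.ctrl_carga
    WHERE id_job = 'job1' AND end_process IS NULL);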
Remember, even a newly created ID will be recognized as a problematic run, but no data will be deleted for it, since nothing has been loaded under that ID yet.
(6) Finally, save the timestamp at the end to indicate that the job finished successfully.
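A sketch of that final update; the literal 42 stands in for the ID variable set in step (4):

-- Mark the current run as finished successfully.
UPDATE my_schema.ctrl_carga
SET end_process = now()
WHERE id = 42;  -- placeholder for the ID returned by start_ctrl_carga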
See you !!
(Any corrections
for English are welcome.)