Thursday, September 26, 2013

Controlling Data Loading Process using Kettle



Sometimes you have a process that loads data from a flat file into a database, or from one database into your data warehouse (DW).

But what if something goes wrong? How do you know whether your load process finished successfully? And if it did not, how do you reprocess the files without creating duplicate records?

This post presents one option for addressing those problems.

The main ideas of the control process are: (1) generate an ID for each run, (2) save a timestamp at the beginning, (3) save the ID in every controlled table, and (4) save a timestamp at the end.

So, if a problem occurs and aborts the job, on the next run you will be able to identify all data inserted by that failed run, delete it, and insert it again.


I’m assuming PostgreSQL as the database. Other databases may require some adjustments.


Below is an overview of the whole process.




For this process to work, you need to:

(1) Create a table called ‘ctrl_carga’ in each schema that contains tables you want to control. This table stores the ID, start timestamp, end timestamp, job ID, and any customized fields, for each run of the job.


-- The sequence must exist before the table is created.
CREATE SEQUENCE schema.ctrl_carga_seq;

CREATE TABLE schema.ctrl_carga (
   id bigint NOT NULL DEFAULT nextval(('schema.ctrl_carga_seq'::text)::regclass),
   start_process timestamp without time zone NOT NULL DEFAULT (now())::timestamp without time zone,
   end_process timestamp without time zone,
   id_job character varying NOT NULL,
   CONSTRAINT ctrl_carga_pk PRIMARY KEY (id)  -- each run must be uniquely identified
 );


All fields above are mandatory, but you can add more fields if you need more precise control.

Let’s imagine that inside the schema you have two tables to be controlled, each loaded by a different job. The field ‘id_job’ must store a key that uniquely identifies the job.

This is important because both jobs can be aborted, so you need to retrieve the ID for job1 to delete only the data inserted by job1 during that failed run, and likewise retrieve the ID for job2 to delete only the data inserted by job2.



(2) Add the following column to each table to be controlled:

ctrl_carga_id bigint NOT NULL
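For example, a controlled table could look like the following. The table name and business columns here are hypothetical; only the ‘ctrl_carga_id’ column is required by the process:

```sql
-- 'fact_sales' is a hypothetical controlled table.
CREATE TABLE schema.fact_sales (
   sale_date date,
   amount numeric,
   ctrl_carga_id bigint NOT NULL  -- links each row to a run in ctrl_carga
);
```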


(3) Set some important global variables:


ctrl_schema: name of the schema where the tables are.
ctrl_id_job: key that identifies uniquely the job.
ctrl_database: database used.
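In Kettle these global variables can be defined in the kettle.properties file (or passed in as job parameters). A sketch with hypothetical values:

```
ctrl_schema=dw
ctrl_id_job=load_fact_sales
ctrl_database=postgresql
```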



(4) Create a transformation that generates and saves the timestamp at the beginning (start_ctrl_carga step).






If a previous run did not finish correctly, the step above will return the ID created by that run; otherwise it will create a new record and return the new ID.

And if you are using additional fields to control the load process, you also need to specify them in this step.
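The logic of this step can be sketched in SQL. Here ${ctrl_schema} and ${ctrl_id_job} are the global variables from step (3); the exact step configuration in the transformation is not shown, so treat this as a sketch of the intent:

```sql
-- Reuse the ID of an unfinished run, if one exists:
SELECT id
  FROM ${ctrl_schema}.ctrl_carga
 WHERE id_job = '${ctrl_id_job}'
   AND end_process IS NULL;

-- Otherwise, create a new run record and return its ID:
INSERT INTO ${ctrl_schema}.ctrl_carga (id_job)
VALUES ('${ctrl_id_job}')
RETURNING id;
```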


Then set a new variable with the returned ID.


(5) Create a transformation (ctrl_carga) that searches for all runs that did not finish correctly and deletes all data related to them.



First, search for the IDs of all runs that did not finish.

If for some reason the ID list is needed by other steps, it is returned using a ‘Copy rows to result’ step.



This step just joins the ID list with the database type.


Use a ‘Filter rows’ step to select the path for the data flow, because the next step is customized for each database.


As said before, this step is customized for each database. It must return a list of all tables that have the field ‘ctrl_carga_id’, except the control table ‘ctrl_carga’ itself.

So, if you are using a different database, such as MySQL, create a new customized path following the same idea.
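For PostgreSQL, a catalog query along these lines should do it (the exact SQL used in the original step is not shown, so this is a sketch):

```sql
-- List every table in the controlled schema that has the control column,
-- excluding the control table itself.
SELECT table_name
  FROM information_schema.columns
 WHERE table_schema = '${ctrl_schema}'
   AND column_name  = 'ctrl_carga_id'
   AND table_name  <> 'ctrl_carga';
```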


Since I don’t know which tables hold records related to the unfinished IDs, the step executes a delete for every ID in every controlled table. This cleans up all data from the problematic runs so it can be loaded again without duplication.

Remember, even a brand-new ID will be recognized as a problematic run (its end timestamp is still null), but no data will be deleted for it.


(6) Finally, save the timestamp at the end to indicate that the job finished successfully.
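This final step boils down to an update like the one below; ${ctrl_carga_id} stands for whatever variable name you gave the run ID in step (4):

```sql
-- Mark the current run as finished.
UPDATE ${ctrl_schema}.ctrl_carga
   SET end_process = now()
 WHERE id = ${ctrl_carga_id};
```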







See you !!


(Any corrections for English are welcome.)




