Question

Handling multiple destination tables in data copy/CDC pipeline

  • 21 December 2022
  • 2 replies

Hello,

Scenario: Multiple tables from the source DB are to be replicated in the target DB. Any change to the source tables should be reflected in the target tables. When a record is deleted, an entry with all columns of that record must be saved to a file. Both source and target are PostgreSQL DBs.

I have implemented this for a single table. I created one pipeline that simply performs a one-time data copy from source to target, then created a CDC pipeline like in the picture below. I am using the JDBC Producer destination.

This works as designed for a single table, but I am not sure it is the right approach for multiple tables. The CDC origin accepts a table name pattern such as "source_tab%", but the JDBC Producer destination seems to allow just one table name. That is, although CDC gives me the updates for all records in all matching tables, I can only use one table name at a time in the JDBC Producer destination.

I have tried using EL in the table name, e.g. ${record:attribute(/'change.table')}, but got a validation error.

Do I need to create multiple jobs for multiple tables and pass the table names as runtime parameters?

What is the best way to implement this? Looks like I am missing something obvious. Please help.

 


2 replies

Can anyone help?

I see this in the documentation for PostgreSQL CDC and Aurora CDC. Is something similar available for the Aurora PostgreSQL destination?

 


Hi @Dhanashri_Bhate 

 

The approach demonstrated in your post above is exactly how CDC is recommended to be implemented: a first pipeline to read all the historical data and a second pipeline to read the changes. Depending on the database of the CDC client, you can start tracking changes from a change number, the earliest changes, the latest changes, or even a specific date and time.

 

Regarding your first query, try removing the / from the record:attribute call, i.e. ${record:attribute('change.table')}.

 

For example, for Oracle CDC you can use ${record:attribute('oracle.cdc.table')}. This resolves to the table name at run time, separately for each record.
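
To illustrate what per-record resolution of the table name means, here is a rough Python sketch. It is not StreamSets code. The record layout, the "op" attribute, and the record_attribute helper are hypothetical stand-ins made up for this example, and 'change.table' is simply the attribute name used in the question. It shows one stream of changes being split by the table named in each record's attributes, with deleted records set aside as lines to be saved to a file.

```python
import json
from collections import defaultdict

# Hypothetical CDC records: header attributes plus the row's column values.
# "change.table" is the attribute name from the question; "op" is made up here.
records = [
    {"attributes": {"change.table": "source_tab1", "op": "INSERT"},
     "fields": {"id": 1, "name": "a"}},
    {"attributes": {"change.table": "source_tab2", "op": "UPDATE"},
     "fields": {"id": 7, "name": "b"}},
    {"attributes": {"change.table": "source_tab1", "op": "DELETE"},
     "fields": {"id": 1, "name": "a"}},
]


def record_attribute(record, name):
    """Illustrative stand-in for the EL call record:attribute('<name>')."""
    return record["attributes"].get(name, "")


def route(records):
    """Group inserts/updates by target table; collect deletes as JSON lines."""
    writes = defaultdict(list)
    deleted_lines = []
    for rec in records:
        # The table name is looked up per record, not fixed for the pipeline.
        table = record_attribute(rec, "change.table")
        if record_attribute(rec, "op") == "DELETE":
            # Keep every column of the deleted row, as the scenario requires.
            deleted_lines.append(json.dumps({"table": table, **rec["fields"]}))
        else:
            writes[table].append(rec["fields"])
    return dict(writes), deleted_lines


writes, deleted_lines = route(records)
print(writes)
print(deleted_lines)
```

In a real pipeline the destination does this lookup for you once the table name field holds a valid expression, so a single CDC pipeline can feed several target tables.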
