
Hi, I have a product table named ‘product’ in MySQL as follows:

product_id | Product | FieldName
1          | Milk    | milk
2          | Water   | water
3          | Coffee  | coffee

 

Then, I have a fully de-normalized source table named ‘raw_transaction’ as follows:

transaction_Id   |  Date            | customer | milk   | water  | coffee  |
1        1/1/2021         John           1
2        1/1/2021         Mary                       1            1
3         1/1/2021        Anna                       1

 

Can you give me a hint on how to create a pipeline in StreamSets that uses the product table as metadata to build a dynamic query, so that I can populate a ‘FactCustomerProduct’ table as follows:

 

For each product in products
       INSERT INTO FactCustomerProduct (product_id,date_id,customer_id,transaction_id,quantity)
         SELECT p.product_id,r.date_id,customer_id,r.transaction_id,r.<fieldName>
          FROM ‘raw_transaction’ r <...]
          WHERE r.<fieldName> IS NOT NULL

 

The idea is to avoid hardcoding in the ETL. 

 

I have tried several processors to no avail.

 

Any hint is most welcome. Essentially, how can I loop through the products and dynamically generate a query that populates the fact table?

 

Thank you. 


@jmazariegos Going by the example tables, I am assuming there is no common key between the ‘products’ and ‘raw_transaction’ tables.

  1. Stage-1: JDBC Query Consumer → select * from (select *, 1 as k1 from products) as A join (select milk,water,coffee,<columns-needed>, 1 as k2 from raw_transaction) as B on A.k1=B.k2; (Since there is no common key, the dummy keys k1 and k2 are used to implement a cross join.)
  2. Stage-2: Use a Stream Selector to filter out records where the column for that product is null or empty. The condition ${str:isNullOrEmpty(record:value(str:concat("/",record:value('/FieldName'))))} can be routed to Trash, and the remaining (default) path of the Stream Selector can go to the target JDBC Producer, which inserts into FactCustomerProduct.
  3. You might also want to rename the fields coming out of the Stream Selector to match the columns in the destination table.
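
To see what the cross join plus the null filter would produce, here is a minimal sketch that simulates both stages outside StreamSets, using Python's built-in sqlite3 module. It is not StreamSets code. The helper names (make_sample_db, build_fact_rows) are hypothetical, the table is named product as in the question, and the sample quantities are my reading of the column alignment in the question (John: milk, Mary: water and coffee, Anna: water), which is an assumption.

```python
import sqlite3


def make_sample_db():
    """Build an in-memory database holding the two example tables."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE product (product_id INTEGER, Product TEXT, FieldName TEXT)"
    )
    conn.execute(
        "CREATE TABLE raw_transaction (transaction_id INTEGER, date TEXT, "
        "customer TEXT, milk INTEGER, water INTEGER, coffee INTEGER)"
    )
    conn.executemany(
        "INSERT INTO product VALUES (?, ?, ?)",
        [(1, "Milk", "milk"), (2, "Water", "water"), (3, "Coffee", "coffee")],
    )
    # Assumed quantities: the column alignment in the question is ambiguous.
    conn.executemany(
        "INSERT INTO raw_transaction VALUES (?, ?, ?, ?, ?, ?)",
        [
            (1, "1/1/2021", "John", 1, None, None),
            (2, "1/1/2021", "Mary", None, 1, 1),
            (3, "1/1/2021", "Anna", None, 1, None),
        ],
    )
    return conn


def build_fact_rows(conn):
    """Cross join via dummy keys, then keep rows whose product column is set."""
    conn.row_factory = sqlite3.Row
    rows = conn.execute(
        "SELECT A.product_id, A.FieldName, B.* "
        "FROM (SELECT *, 1 AS k1 FROM product) AS A "
        "JOIN (SELECT *, 1 AS k2 FROM raw_transaction) AS B ON A.k1 = B.k2"
    ).fetchall()

    facts = []
    for row in rows:
        # Same idea as the Stream Selector condition: read the column named
        # by FieldName and drop the record when it is null or empty.
        quantity = row[row["FieldName"]]
        if quantity is None or quantity == "":
            continue  # this record would go to Trash
        facts.append(
            (row["product_id"], row["date"], row["customer"],
             row["transaction_id"], quantity)
        )
    return facts


if __name__ == "__main__":
    for fact in sorted(build_fact_rows(make_sample_db())):
        print(fact)
```

Each surviving tuple corresponds to one row headed for FactCustomerProduct. Note that the cross join produces (number of products) × (number of transactions) records before filtering, so this approach is probably best suited to a small product table.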

If there is a foreign key relation (say on product_id) between products and raw_transaction, you can instead use a JDBC Lookup as the stage right after the origin, with a query something like the one below.

select pid, dum, ${record:value('/FieldName')} as quantity from raw_trans where ${record:value('/FieldName')} is not NULL and product_id = ${record:value('/product_id')}
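
As a rough illustration of how that lookup query expands for each product record, here is a small Python simulation of the substitution. It does not use any StreamSets API. The function name render_lookup_query and the identifier check are my own additions, and pid, dum and raw_trans are kept as the placeholders from the query above.

```python
import re

# Same shape as the lookup query, with Python placeholders standing in
# for the record:value expressions.
TEMPLATE = (
    "select pid, dum, {field} as quantity from raw_trans "
    "where {field} is not NULL and product_id = {product_id}"
)


def render_lookup_query(record):
    """Return the SQL text the lookup would run for one product record."""
    field = record["FieldName"]
    # The field name becomes part of the SQL text itself, so only plain
    # column identifiers are accepted (a precaution added in this sketch).
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", field):
        raise ValueError(f"unexpected column name: {field!r}")
    return TEMPLATE.format(field=field, product_id=int(record["product_id"]))


if __name__ == "__main__":
    products = [
        {"product_id": 1, "FieldName": "milk"},
        {"product_id": 2, "FieldName": "water"},
        {"product_id": 3, "FieldName": "coffee"},
    ]
    for product in products:
        print(render_lookup_query(product))
```

Because the column name comes from the metadata table, adding a product row is enough to generate a new query, which is the "no hardcoding" goal from the question.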
 

