Hi, I have a product table named ‘product’ in MySQL as follows:
product_id | Product | FieldName
1 | Milk | milk
2 | Water | water
3 | Coffee | coffee
Then, I have a source fully de-normalized table named ‘raw_transaction’ as follows:
transaction_Id | Date | customer | milk | water | coffee |
1 1/1/2021 John 1
2 1/1/2021 Mary 1 1
3 1/1/2021 Anna 1
Can you give me a hint on how to create a pipeline in StreamSets that uses the product table as metadata to build a dynamic query, so that I can populate a ‘FactCustomerProduct’ table as follows:
For each product in products
INSERT INTO FactCustomerProduct (product_id,date_id,customer_id,transaction_id,quantity)
Any hint is most welcome. Essentially, how can I loop through the products and dynamically generate a query that populates the fact table?
Thank you.
Best answer by Pradeep
@jmazariegos Going by the example tables, I am assuming there is no common key between the ‘products’ and ‘raw_transaction’ tables.
Stage-1: JDBC Query Consumer → select * from (select *, 1 as k1 from products) as A join (select milk,water,coffee,<columns-needed>, 1 as k2 from raw_transaction) as B on A.k1=B.k2; (Since there is no common key, the dummy keys k1 and k2 are used to implement the cross join.)
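To see what the Stage-1 query returns, here is a minimal sketch that runs the same dummy-key join outside StreamSets. It uses Python's built-in sqlite3 as a stand-in for MySQL, and the quantities placed in the milk, water and coffee columns are assumed for illustration, since the column alignment of the sample rows in the question is not recoverable.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row

# Sample data: the per-column quantities are assumed, not taken from the question.
conn.executescript("""
CREATE TABLE products (product_id INTEGER, Product TEXT, FieldName TEXT);
INSERT INTO products VALUES (1, 'Milk', 'milk'), (2, 'Water', 'water'), (3, 'Coffee', 'coffee');
CREATE TABLE raw_transaction (
    transaction_Id INTEGER, Date TEXT, customer TEXT,
    milk INTEGER, water INTEGER, coffee INTEGER
);
INSERT INTO raw_transaction VALUES
    (1, '1/1/2021', 'John', 1, NULL, NULL),
    (2, '1/1/2021', 'Mary', 1, 1, NULL),
    (3, '1/1/2021', 'Anna', NULL, NULL, 1);
""")

# Every row on both sides gets the constant key 1, so the join condition
# always matches and each product is paired with each transaction.
CROSS_JOIN_SQL = """
SELECT A.product_id, A.FieldName,
       B.transaction_Id, B.Date, B.customer, B.milk, B.water, B.coffee
FROM (SELECT *, 1 AS k1 FROM products) AS A
JOIN (SELECT *, 1 AS k2 FROM raw_transaction) AS B ON A.k1 = B.k2
"""

joined = [dict(row) for row in conn.execute(CROSS_JOIN_SQL)]
for row in joined:
    print(row)
```

With 3 products and 3 transactions the join yields 9 records, one per (product, transaction) pair. Most of them carry a null in the column named by FieldName, which is why a filtering stage is needed next.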
Stage-2: Use a Stream Selector to keep only the records whose product column is not null. Records that match the condition ${str:isNullOrEmpty(record:value(str:concat("/",record:value('/FieldName'))))} can be routed to Trash, and the other path of the Stream Selector can go to the target JDBC Producer, which inserts into FactCustomerProduct.
You might also want to rename the fields coming out of the Stream Selector to match the columns in the destination table.
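The filter and rename logic can be sketched in plain Python to make the condition concrete. This is not StreamSets code: to_fact_row is a hypothetical helper, the records are hand-written examples of what the join might emit, and the date and customer values are passed through as-is because resolving them to date_id and customer_id is not covered in this thread.

```python
def to_fact_row(record):
    """Return a fact row, or None when the product's own column is empty."""
    # Same idea as record:value(str:concat("/", record:value('/FieldName'))):
    # FieldName holds the name of the column to read the quantity from.
    quantity = record.get(record["FieldName"])
    if quantity is None or quantity == "":
        return None  # the Stream Selector would send this record to Trash
    # Rename fields to match the destination columns.
    return {
        "product_id": record["product_id"],
        "transaction_id": record["transaction_Id"],
        "date": record["Date"],          # still needs mapping to date_id
        "customer": record["customer"],  # still needs mapping to customer_id
        "quantity": quantity,
    }


# Hand-written sample records (assumed values) for one transaction.
joined_records = [
    {"product_id": 1, "FieldName": "milk", "transaction_Id": 2,
     "Date": "1/1/2021", "customer": "Mary", "milk": 1, "water": 1, "coffee": None},
    {"product_id": 3, "FieldName": "coffee", "transaction_Id": 2,
     "Date": "1/1/2021", "customer": "Mary", "milk": 1, "water": 1, "coffee": None},
]

fact_rows = [row for row in map(to_fact_row, joined_records) if row is not None]
print(fact_rows)
```

Only the milk record survives here, because the coffee column is null for that transaction.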
If there is a foreign key relation (say product_id) between products and raw_transaction, you can instead use a JDBC Lookup as the stage after the origin, with a query something like the one below.
select pid, dum, ${record:value('/FieldName')} as quantity from raw_trans where ${record:value('/FieldName')} is not NULL and product_id = ${record:value('/product_id')}
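The lookup query above works by substituting the FieldName value into the SQL text as a column name. A rough sketch of that substitution, written as the "for each product" loop from the question, is shown here in Python with sqlite3 instead of a StreamSets pipeline. Several things are assumed: the sample quantities are made up, the fact table is reduced to the three columns that can be derived directly, and the product_id predicate is left out because the sample raw_transaction table has no such column.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE products (product_id INTEGER, Product TEXT, FieldName TEXT);
INSERT INTO products VALUES (1, 'Milk', 'milk'), (2, 'Water', 'water'), (3, 'Coffee', 'coffee');
CREATE TABLE raw_transaction (
    transaction_Id INTEGER, Date TEXT, customer TEXT,
    milk INTEGER, water INTEGER, coffee INTEGER
);
INSERT INTO raw_transaction VALUES
    (1, '1/1/2021', 'John', 1, NULL, NULL),
    (2, '1/1/2021', 'Mary', 1, 1, NULL),
    (3, '1/1/2021', 'Anna', NULL, NULL, 1);
-- Reduced fact table: date_id and customer_id are omitted in this sketch.
CREATE TABLE FactCustomerProduct (product_id INTEGER, transaction_id INTEGER, quantity INTEGER);
""")

# Column names cannot be bound as query parameters, so each FieldName is
# checked against the real column list before it is placed in the SQL text.
valid_columns = {row[1] for row in conn.execute("PRAGMA table_info(raw_transaction)")}

products = conn.execute("SELECT product_id, FieldName FROM products").fetchall()
for product_id, field_name in products:
    if field_name not in valid_columns:
        raise ValueError(f"unknown column in products.FieldName: {field_name!r}")
    conn.execute(
        "INSERT INTO FactCustomerProduct (product_id, transaction_id, quantity) "
        f'SELECT ?, transaction_Id, "{field_name}" FROM raw_transaction '
        f'WHERE "{field_name}" IS NOT NULL',
        (product_id,),
    )

fact_rows = conn.execute(
    "SELECT product_id, transaction_id, quantity FROM FactCustomerProduct "
    "ORDER BY transaction_id, product_id"
).fetchall()
print(fact_rows)
```

Checking FieldName against the table's actual columns matters in any variant of this approach, since the value ends up inside the query text rather than in a bound parameter.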