
Hi there,

 

I have a requirement to read data from a Snowflake DB and then use a specific column value as a filter condition for reading from a SQL Server DB. Has anyone dealt with such a design in a Data Collector pipeline?

I am stuck, since I can ONLY read from one source at a time using the JDBC Query Consumer origin, and I don’t think I can pass values from one query as a parameter to another.

 

Example:

Get the max date of a column from a Snowflake table and use that date value as a filter condition for a SQL Server table.

 

Thanks,

Srini


Hi @Srinivasan Sankar,

You can try the following:

  1. Use the ‘JDBC Lookup’ stage to fetch the max date of a column from the Snowflake table (a sample lookup query is sketched below).
  2. Then use a Stream Selector stage to keep only the records that match on that Snowflake column value.
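For step 1, the lookup query could be as simple as the sketch below. The table and column names (my_snowflake_table, C1) are placeholders, and max_c1 is just the alias under which the JDBC Lookup stage would write the value into each record:

SELECT MAX(C1) AS max_c1 FROM my_snowflake_table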

I have tried something similar locally using a Snowflake query in the JDBC Lookup stage. In the pipeline below, ‘Dev Raw Data Source 1’ stands in for your SQL Server table.

 

Below is the Stream Selector condition that filters records based on the Snowflake table column C1 value.
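A condition along these lines should work; both field paths here are assumptions — that the lookup writes the Snowflake max value into /max_c1, and that the date field on the incoming record is /order_date:

${record:value('/order_date') > record:value('/max_c1')}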

 


Thanks @Pradeep, I did think about using the JDBC Lookup stage, but the whole idea is to limit the data I read from the SQL Server side. The source system custodian does not want me to read all the data from SQL Server.


Hi @Srinivasan Sankar, I believe that a two-pipeline approach would be a good solution for your needs. The first pipeline would retrieve the field value, and the second pipeline would use it to query the SQL Server DB.

@Rishi, could you please elaborate on your answer? How would you call the second pipeline, and how does the value get passed from one pipeline to another?


Hello @Srinivasan Sankar, this is what I was thinking:

Pipeline 1

Dev Raw Data (stop after first batch) → JDBC Lookup (fetch the max date of a column from the Snowflake table) → Field Type Converter (convert the date to the format SQL Server requires) → Local FS (with a Shell executor stage attached to the event stream)

The Shell executor will move this file to the SDC resources directory ($SDC_RESOURCES).
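A minimal sketch of that Shell executor script follows. It assumes the stage's environment variables map FILE_PATH to ${record:value('/filepath')} from the Local FS file-closure event, that $SDC_RESOURCES is visible to the script, and that the offset file is named offset_date to match the loadResource call below:

#!/bin/bash
# FILE_PATH is assumed to be set in the stage's environment variables
# from the file-closure event, e.g. ${record:value('/filepath')}
mv "$FILE_PATH" "$SDC_RESOURCES/offset_date"
# runtime:loadResource(..., true) requires owner-only file permissions
chmod 600 "$SDC_RESOURCES/offset_date"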

 

Pipeline 2

JDBC Query Consumer → rest of your pipeline logic → Destination

 

Now we use the previously generated output file as the initial offset, reading it via this function:

${runtime:loadResource("offset_date", true)}
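As a sketch of how this plugs into the JDBC Query Consumer: the expression goes in the Initial Offset property, and the query references ${OFFSET}. The table and column names below (dbo.orders, last_updated) are placeholders:

SQL Query: SELECT * FROM dbo.orders WHERE last_updated > '${OFFSET}' ORDER BY last_updated
Offset Column: last_updated
Initial Offset: ${runtime:loadResource("offset_date", true)}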

 

Ref: https://docs.streamsets.com/portal/transformer/3.16.x/help/transformer/Pipelines/Runtime.html#concept_fj3_dst_tlb

 

 

