
Hi,

I am new to StreamSets, so please bear with my questions 🙂 Here is my first one.

I created a simple pipeline to copy data from an employee table on SQL Server to an employee table on PostgreSQL. I used a JDBC Query Consumer to pull records incrementally using the offset value and a JDBC Producer stage to insert the records.

When I run the pipeline, it starts execution, selects all records from SQL Server, inserts them into Postgres, and then returns a java.lang.NullPointerException. It goes into retry mode and continues to fail with the same error on every retry attempt.

I tried adding more records to the table so that the next retry attempt would pull the new records based on the offset value; however, the pipeline does not pull them.

I also tried filtering null records using a Stream Selector, but it didn't work.

I also added a Pipeline Finisher stage to end the pipeline when a “no more data” event is generated, but it doesn't seem to work as expected either. The pipeline continues to throw java.lang.NullPointerException on subsequent retries.
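In case it matters, the Pipeline Finisher is connected to the consumer's event output with the usual precondition on the event type, as I understood it from the docs:

    ${record:eventType() == 'no-more-data'}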

Kindly advise as to where I am going wrong here. 

I have attached the log file (in case it helps).

Pipeline Config - 
JDBC Query Consumer stage - 
    - JDBC connection to SQL Server
    - SQL Query - SELECT empid, empName, createTS from MWPOC.dbo.employee where createTS > ${OFFSET} order by createTS;
    - Initial Offset - '2023-03-13 09:49:50.707'
    - Offset Column - createTS
    - Mode - Incremental

The offset value in the offset.json file in SDC after the pipeline stops is shown below.

"offsets" : {
    "$com.streamsets.datacollector.pollsource.offset$" : "2023-03-15 14:33:16.04"
  },
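If I am reading this right, the consumer substitutes this stored value for ${OFFSET} verbatim on the next query, so the next run should evaluate to something like:

    SELECT empid, empName, createTS from MWPOC.dbo.employee
    where createTS > 2023-03-15 14:33:16.04 order by createTS;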

 


JDBC Producer 1 stage - 
    - JDBC connection to PostgreSQL
    - schema - schema name of the PostgreSQL database
    - table name - employee

Error Stack Trace: (attached) 

Pipeline Flow: (screenshot attached)

@pasha001 

Can you please try with the initial offset as 0 and execute the pipeline after resetting the origin?

Please keep the other configurations as they are.

 

Thanks & Regards

Bikram_

 


@pasha001 

One more point: please disable the Incremental Mode option and re-run the pipeline after resetting the origin.

Kindly let me know if it helps.


@Bikram, thanks for your suggestions. The pipeline works just fine in full mode, but it creates duplicate entries in the target table, which I want to avoid.

My goal is to check for new records every 5 seconds, pull them as they are added to the source table, and copy them over to the target side. If there are no new records to process, the pipeline job should end gracefully and run again as scheduled. I thought incremental mode was the right approach for my use case, but it is not working as expected. Any suggestions to fix it are appreciated. Thanks


@pasha001 

I believe the primary key is not set up properly in your destination DB.

Please set a primary key column and query the table with the offset based on that primary key, and it should work.


Hi @Bikram, I have not set up a primary key constraint in my sample table. However, even if I set empid as the primary key column, I still want the offset to be based on the createTS (create timestamp) column to identify newly inserted records between run intervals.

I believe you can have only one offset column in the configuration. Is there a way to set more than one field (both the empid and createTS fields in my case) as the offset? If it is not possible, how can I detect and pull only newly added records from the table every time the pipeline runs, excluding those that were pulled in previous runs? Appreciate your help. Thanks


@pasha001 

We can handle the newly added records in multiple ways.

I have attached a sample pipeline for your reference.

 

Step 1:

Fetch data from the source table based on the offset column.

Step 2:

Add a lookup processor and check whether the data is already available in the destination DB, based on the primary key column (see the sketch after these steps).

Step 3:

If count == 0, the data is not present in the destination DB, and the JDBC Producer will insert the row into the destination DB.

Else, if you want, you can update the existing rows if there are any updates in them.
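For Step 2, the lookup query in the JDBC Lookup processor could look roughly like this; I am assuming the destination table is employee with empid as the key column, per your earlier posts:

    SELECT COUNT(*) AS cnt
    FROM employee
    WHERE empid = ${record:value('/empid')}

A Stream Selector after the lookup can then route records where ${record:value('/cnt') == 0} to the JDBC Producer for insert, and the remaining records to the update path.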

 

Kindly check if it helps; otherwise, I will provide you a pipeline based on the timestamp.

 

Thanks & regards

Bikram_


@Bikram this is great; however, I cannot do lookups on the target side. We are talking about copying thousands of records every 5 seconds, generated by hundreds of devices. Doing a lookup for each record on the target would take a lot of time and would impact the performance of the pipeline and the target DB. That's why we don't want to look up/update records on the target, but rather just copy them over and avoid pulling duplicates based on the timestamp.

A timestamp-based pipeline would be helpful.

Btw, I changed my query to pull records based on the empid column as the offset instead of the createTS column, and it runs perfectly fine with the incremental load. I didn't have to add a primary key constraint on the empid column, which is of integer data type. So I wonder why it has a problem when the offset is the createTS column, whose data type is a timestamp.
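For reference, the working query is roughly the following, with Offset Column set to empid and Initial Offset set to 0:

    SELECT empid, empName, createTS from MWPOC.dbo.employee
    where empid > ${OFFSET} order by empid;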

 


@pasha001 

I believe the incremental load works for primary keys of integer or string type.

Always try to fetch the data with the primary key as the offset.

If there is nothing more, please mark the issue as resolved.

Happy to help you in case of any questions/concerns.

Thanks & Regards

Bikram_


@Bikram, can you please share the pipeline with the offset based on the timestamp column? Thanks


@pasha001 

Can you please try the attached pipeline and check if it helps?


@Bikram, your pipeline flow with the timestamp field as the offset runs without throwing a NullPointerException; however, it doesn't insert data into the target table properly. I have attached an image of the target table records for your reference below. The data types for empid and empname on the source and target tables are the same. Can you tell me how you resolved the NullPointerException? Thanks



Hi @Bikram, the issue is finally resolved. I figured out the root cause of the NullPointerException and also why your pipeline was only populating the empid field and not the rest of them.

  • NullPointerException - This occurs when the value in the Initial Offset field is enclosed in single quotes and the ${OFFSET} variable used in the query is not. It works fine initially, but results in a SQL syntax error after all records are processed; the product does not identify this correctly and throws a generic NullPointerException instead. A suggestion to the product team would be to handle such situations gracefully and provide enough information in the error stack trace to help determine the root cause. The screenshot below has the query and offset value that can cause this issue (see the first sketch below as well).
  • Populating null values in a few fields - This occurs when SDC cannot find a field in the target table that matches the exact name and case of the source table field. In my case, the field names in the source table were empid, empName and createTS, whereas they were named empid, empname and createts on the target side (see the second sketch below).
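To illustrate the first point, here is a minimal sketch of the fix, assuming the quotes move out of the Initial Offset value and into the query, so every substituted offset is quoted consistently:

    -- Initial Offset: 2023-03-13 09:49:50.707  (no surrounding quotes)
    SELECT empid, empName, createTS from MWPOC.dbo.employee
    where createTS > '${OFFSET}' order by createTS;

And for the second point, one way to avoid the case mismatch without altering the tables is to alias the source columns to the target names in the query (a Field Renamer processor would work too):

    -- Offset Column would then be the aliased name, createts
    SELECT empid, empName AS empname, createTS AS createts
    from MWPOC.dbo.employee
    where createTS > '${OFFSET}' order by createts;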

This issue can now be marked as resolved. 

Thanks a ton for your help, it was a great learning experience! Much Appreciated!!!

