Data Collector - Field Renamer stage

  • 6 December 2021


I am on Data Collector 3.21.x

I observed the following behavior and would like to share it with the community. I am unsure whether this is a bug or a gap that needs to be addressed in the documentation.

 

My pipeline is simple: it reads from a SQL Server DB table and writes to AWS S3.

I use an origin stage followed by a “Field Renamer” stage.

 

Scenario 1: 

When I use the JDBC Multitable Consumer origin, i.e., I read all the fields from the source table, I do not need to use the “REPLACE” option on the “Rename” tab of the “Field Renamer” stage. Refer to the pictures below.

 

Pipeline Design

 

 

Field Renamer Stage Config.

 

 

 

Scenario 2:

When I use the JDBC Query Consumer stage as my origin and issue a SELECT statement that is not SELECT * FROM …, i.e., I select only a few columns (for example: SELECT <column1> FROM <table1>), I have to choose the REPLACE option for “Target Field Already Exists”; otherwise my pipeline fails with the error shown below.

 

Pipeline Design

 

com.streamsets.pipeline.api.base.OnRecordErrorException: FIELD_RENAMER_01 - Target Fields '/<column1>' cannot be overwritten for record 'SELECT <column1> FROM XXXX.dbo.<table1>::rowCount:0'
    at com.streamsets.pipeline.stage.processor.fieldrenamer.FieldRenamerProcessor.process(FieldRenamerProcessor.java:364)
    at com.streamsets.pipeline.api.base.SingleLaneRecordProcessor.process(SingleLaneRecordProcessor.java:53)
    at com.streamsets.pipeline.api.base.SingleLaneProcessor.process(SingleLaneProcessor.java:95)
    at com.streamsets.pipeline.api.base.configurablestage.DProcessor.process(DProcessor.java:35)
    at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$2(StageRuntime.java:299)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:244)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:311)
    at com.streamsets.datacollector.runner.StagePipe.process(StagePipe.java:221)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.lambda$runSourceLessBatch$0(PreviewPipelineRunner.java:364)
    at com.streamsets.datacollector.runner.PipeRunner.acceptConsumer(PipeRunner.java:207)
    at com.streamsets.datacollector.runner.PipeRunner.forEachInternal(PipeRunner.java:152)
    at com.streamsets.datacollector.runner.PipeRunner.executeBatch(PipeRunner.java:132)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.runSourceLessBatch(PreviewPipelineRunner.java:360)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.runPollSource(PreviewPipelineRunner.java:342)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.run(PreviewPipelineRunner.java:232)
    at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:545)
    at com.streamsets.datacollector.runner.preview.PreviewPipeline.run(PreviewPipeline.java:51)
    at com.streamsets.datacollector.execution.preview.sync.SyncPreviewer.start(SyncPreviewer.java:238)
    at com.streamsets.datacollector.execution.preview.async.AsyncPreviewer.lambda$start$1(AsyncPreviewer.java:105)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:34)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:34)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:100)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

 

Here <column1> and <table1> are placeholders for a column and a table in my SQL Server DB.

 

 

Field Renamer Stage Config.

 

To overcome the above error, I had to set the REPLACE option as shown below.

 

 

Let me know if my understanding is correct. 

 

I am not sure whether this is a bug or a gap in the documentation. When the origin stage is “JDBC Query Consumer”, whether you SELECT * or SELECT only a subset of columns appears to determine whether you need to adjust the “Field Renamer” stage.
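
For anyone trying to reason about the setting, here is a minimal Python sketch of how I understand “Target Field Already Exists” to behave. It is a toy model on a plain dict, not the StreamSets implementation: the function name `rename_field` and the option strings `"to_error"` and `"replace"` are made up for illustration, and it does not explain why the target field already exists in Scenario 2.

```python
def rename_field(record, source, target, on_target_exists="to_error"):
    """Rename record[source] to record[target] in a toy dict-based record.

    Hypothetical model only: 'to_error' and 'replace' are illustrative
    option names, not the identifiers Data Collector uses internally.
    """
    if on_target_exists not in ("to_error", "replace"):
        raise ValueError(f"unknown option: {on_target_exists}")
    if source not in record:
        return dict(record)  # nothing to rename
    if target != source and target in record and on_target_exists == "to_error":
        # Mirrors the wording of the FIELD_RENAMER_01 message in the post.
        raise ValueError(f"Target Fields '/{target}' cannot be overwritten")
    renamed = dict(record)  # leave the input record untouched
    renamed[target] = renamed.pop(source)  # with 'replace', overwrites target
    return renamed


# A record where the target name is already present.
record = {"column1": "a", "COLUMN1": "b"}
replaced = rename_field(record, "column1", "COLUMN1", "replace")

try:
    rename_field(record, "column1", "COLUMN1", "to_error")
    failed = False
except ValueError:
    failed = True
```

In this model the rename only fails when the target name is already present in the record and the option is not the replace one, which is the condition the error message in Scenario 2 describes.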

