
I am using a StreamSets pipeline to upload a file from a GCS bucket to an SFTP location, and occasionally I get the error below. The error occurs when the file is renamed after it has been uploaded to the SFTP location. I am not sure how to fix it.

 

"errorCode":"REMOTE_UPLOAD_03","errorMessage":"REMOTE_UPLOAD_03 - Error happened when writing to file 'net.schmizz.sshj.sftp.SFTPException: Resource busy. Failed to temp/_tmp_testTrigger.txt to temp/testTrigger.txt because the file is currently in progress.'. Reason: {}","errorTimestamp":1667283307037,"errorStackTrace":"net.schmizz.sshj.sftp.SFTPException: Resource busy. Failed to rename temp/_tmp_testTrigger.txt to temp/testTrigger.txt because the file is currently in progress.\n\tat net.schmizz.sshj.sftp.Response.error(Response.java:140)\n\tat 

net.schmizz.sshj.sftp.Response.ensureStatusIs(Response.java:133)\n\tat net.schmizz.sshj.sftp.Response.ensureStatusPacketIsOK(Response.java:125)\n\tat net.schmizz.sshj.sftp.SFTPEngine.rename(SFTPEngine.java:245)\n\tat net.schmizz.sshj.sftp.SFTPClient.rename(SFTPClient.java:124)\n\tat net.schmizz.sshj.sftp.SFTPClient.rename(SFTPClient.java:119)\n\tat com.streamsets.pipeline.lib.remote.ChrootSFTPClient.renameInternal(ChrootSFTPClient.java:237)\n\tat com.streamsets.pipeline.lib.remote.ChrootSFTPClient.rename(ChrootSFTPClient.java:215)\n\tat com.streamsets.pipeline.lib.remote.SFTPRemoteFile.commitOutputStream(SFTPRemoteFile.java:51)\n\tat com.streamsets.pipeline.stage.destination.remote.RemoteUploadTarget.write(RemoteUploadTarget.java:123)\n\tat com.streamsets.pipeline.api.base.configurablestage.DTarget.write(DTarget.java:34)\n\tat com.streamsets.datacollector.runner.StageRuntime.lambda$execute$2(StageRuntime.java:291)\n\tat com.streamsets.pipeline.api.impl.CreateByRef.call(CreateByRef.java:40)\n\tat com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:232)\n\tat com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:299)\n\tat com.streamsets.datacollector.runner.StagePipe.process(StagePipe.java:209)\n\tat com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.processPipe(ProductionPipelineRunner.java:859)\n\tat com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.lambda$executeRunner$3(ProductionPipelineRunner.java:903)\n\tat com.streamsets.datacollector.runner.PipeRunner.acceptConsumer(PipeRunner.java:195)\n\tat com.streamsets.datacollector.runner.PipeRunner.forEachInternal(PipeRunner.java:140)\n\tat com.streamsets.datacollector.runner.PipeRunner.executeBatch(PipeRunner.java:120)\n\tat 
com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.executeRunner(ProductionPipelineRunner.java:902)\n\tat com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runSourceLessBatch(ProductionPipelineRunner.java:880)\n\tat com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPollSource(ProductionPipelineRunner.java:602)\n\tat com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:388)\n\tat com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:525)\n\tat com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:100)\n\tat com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:63)\n\tat com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.startInternal(StandaloneRunner.java:746)\n\tat com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:739)\n\tat com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:150)\n\tat com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)\n\tat com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:43)\n\tat com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:24)\n\tat com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)\n\tat com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)\n\tat com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:43)\n\tat com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:24)\n\tat 
com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)\n\tat com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:88)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat 

Hi @mskarthik648, as you mentioned, the error is caused by the rename that happens just after the file is uploaded to the SFTP location.

I think you can take advantage of the SDC behavior of prefixing file names with _tmp_ until they have been completely copied.

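SDC does this so that the receiving system will not access the file while it is still being copied. Can you change the receiving system to ignore the _tmp_ files for renaming or whatever downstream processing you are doing? If the receiving system does not wait until the files have been completely copied, it may still be holding a file when SDC tries to rename it, which would be consistent with the "Resource busy" error.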
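To illustrate the idea of ignoring in-progress files, here is a minimal sketch of a filter a downstream consumer could apply to a directory listing. It assumes the consumer is something you can script, and that the in-progress prefix is `_tmp_` as seen in the error above; the function names are made up for this example.

```python
import posixpath

# Prefix seen in the error above (temp/_tmp_testTrigger.txt).
# Assumption: every in-progress upload uses this same prefix.
TMP_PREFIX = "_tmp_"


def is_in_progress(path: str) -> bool:
    """Return True if the file name (not the directory) starts with the prefix."""
    return posixpath.basename(path).startswith(TMP_PREFIX)


def ready_files(paths):
    """Keep only the files that are no longer being written."""
    return [p for p in paths if not is_in_progress(p)]


if __name__ == "__main__":
    listing = ["temp/_tmp_testTrigger.txt", "temp/otherFile.txt"]
    for path in ready_files(listing):
        print("safe to process:", path)
```

Checking only the base name means a directory that happens to be called `_tmp_something` does not hide the completed files inside it.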

 

OR

Another option is to transfer the file to a different directory on the SFTP server and, once the transfer is complete, use a Shell Executor to move the file into the "actual" destination directory, triggered by the events that can be generated when a file is completely transferred. The SFTP destination can generate a "file-closed" event, or a "wholeFileProcessed" event (generated in whole file mode when the destination completes streaming a whole file).
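As a rough sketch of the "stage, then move" step, the helper below works out the final path for a staged file and performs the move. The directory layout and function names are hypothetical, and the `rename` argument is an assumption added for illustration: it defaults to a local `os.replace`, but against a remote SFTP server you would pass the rename call of whatever SFTP client your script uses. How the staged file path is taken from the event record is not shown here.

```python
import os
import posixpath


def final_path(staged_path: str, final_dir: str) -> str:
    """Build the destination path: same file name, different directory."""
    return posixpath.join(final_dir, posixpath.basename(staged_path))


def promote(staged_path: str, final_dir: str, rename=os.replace) -> str:
    """Move a fully uploaded file from the staging directory to final_dir.

    `rename` is any callable taking (source, target). The default works on
    a local filesystem; a remote server needs an SFTP client's rename instead.
    """
    target = final_path(staged_path, final_dir)
    rename(staged_path, target)
    return target
```

Because the move only runs after the completion event, the file never appears in the final directory while it is still being written.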
 


@mskarthik648 

If the file already exists and you want to overwrite it, you can select the overwrite option in the SFTP destination.

 

 


@Rishi @Bikram ,

Yes, I have set the option in the SFTP client to overwrite the file if it already exists.

 

As for the renaming issue, I am not renaming the file myself. The SFTP client itself creates the file as _tmp_<original file name> and, once the upload is complete, tries to rename it to the original file name.

 

