Question

How to rename a file with its original file name with the Local FS stage

  • 27 June 2023

This article shows how to keep the original filename when a pipeline reads files with a Directory origin and writes them with a Local FS destination, preserving the records and structure of each file.

We will review a sample pipeline that you can recreate and adapt to your deployment. In this example, we read from a CSV file and write to a CSV file:

 

 

Directory 1 origin

Configure the origin to read files from a local directory. The Directory origin adds a filename attribute to each record header, which the next stage relies on.

 

Jython Evaluator

In the Jython configuration tab, add this code:

from com.streamsets.pipeline.api.ext import DataCollectorServices

# Sample Jython code
for record in sdc.records:
    try:
        # First record seen: remember which file it came from.
        # .get() avoids a KeyError when 'previous_file' has not been set yet.
        if sdc.state.get('previous_file') in (None, sdc.NULL_STRING):
            sdc.state['previous_file'] = record.attributes['filename']

        # The file changed: flag this record with the roll attribute and
        # publish the name of the file that just finished.
        if record.attributes['filename'] != sdc.state['previous_file']:
            record.attributes['roll'] = 'True'
            DataCollectorServices.instance().put("orig_filename", sdc.state['previous_file'])
            sdc.state['previous_file'] = record.attributes['filename']

        # Write record to processor output
        sdc.output.write(record)

    except Exception as e:
        # Send record to error
        sdc.error.write(record, str(e))
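
To see what the evaluator logic above does to a stream of records, it can be run outside Data Collector. This is only a minimal sketch in plain Python: records are modeled as dicts with an attributes entry, and state and shared are hypothetical stand-ins for sdc.state and DataCollectorServices. None of these names are the StreamSets API.

```python
def tag_rolls(records, state, shared):
    """Mark the first record of each new file and remember the previous file name."""
    for record in records:
        current = record["attributes"]["filename"]
        previous = state.get("previous_file")
        if previous is None:
            # First record seen: nothing to roll yet.
            state["previous_file"] = current
        elif current != previous:
            # File changed: set the roll flag and publish the finished file's name.
            record["attributes"]["roll"] = "True"
            shared["orig_filename"] = previous
            state["previous_file"] = current
    return records


# Made-up sample input: two records from a.csv, then one from b.csv.
records = [
    {"attributes": {"filename": "a.csv"}},
    {"attributes": {"filename": "a.csv"}},
    {"attributes": {"filename": "b.csv"}},
]
state, shared = {}, {}
tag_rolls(records, state, shared)
print([r["attributes"].get("roll") for r in records])
print(shared)
```

In this sketch only the first record of b.csv carries the roll flag, and the shared store holds a.csv. Note that the name of the last file is published only once a record from a later file arrives.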

 

Local FS

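Here, configure your target folder and the data format you want to use, and enable the setting to generate events. For the roll attribute set by the Jython Evaluator to take effect, the destination's roll attribute option also needs to be enabled.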

 

Groovy Evaluator

Add this sample code to the Groovy tab:

import com.streamsets.pipeline.api.ext.DataCollectorServices

// Sample Groovy code
for (record in sdc.records) {
    try {
        // Set the orig_filename attribute on each record to the value
        // stored earlier by the Jython Evaluator
        record.attributes['orig_filename'] = DataCollectorServices.instance().get("orig_filename")

        // Write the record to the processor output
        sdc.output.write(record)
    } catch (e) {
        // Log the failure and send the record to error
        sdc.log.error(e.toString(), e)
        sdc.error.write(record, e.toString())
    }
}
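
The handoff between the two evaluators can be pictured with a small stand-in. In this sketch a module-level dict named shared plays the role of the shared key/value store, and publish and stamp are hypothetical helpers, not StreamSets functions. The event record is a plain dict with made-up values for the /filepath and /filename fields.

```python
shared = {}  # hypothetical stand-in for the shared key/value store


def publish(name):
    """Store the original file name, as the first evaluator does."""
    shared["orig_filename"] = name


def stamp(event):
    """Copy the published name onto an event record's attributes."""
    event.setdefault("attributes", {})["orig_filename"] = shared.get("orig_filename")
    return event


publish("a.csv")
# Made-up event record; the path and file name are illustrative only.
event = stamp({"fields": {"filepath": "/tmp/out/sdc-file", "filename": "sdc-file"}})
print(event["attributes"]["orig_filename"])
```

Because the store is shared, whichever name was published last is the one every later event record receives.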

 

Shell

You have to define 3 variables in the Environment tab:

filepath ${record:value('/filepath')}
filename ${record:value('/filename')}
orig_filename ${record:attribute('orig_filename')}

Then, in the Script tab, add:

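#!/bin/bash
# Directory that contains the file that was just written
dirname="${filepath%/*}"

echo "${dirname}"
echo "${filename}"
echo "${orig_filename}"

# Stop if the directory cannot be entered, so mv never runs elsewhere
cd "${dirname}" || exit 1

# Quoting keeps file names with spaces intact
mv -- "${filename}" "${orig_filename}" || exit 1

exit 0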

Note: This script will rename your file by using the ‘mv’ command.
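
For readers who want to try the rename step in isolation, the following is a rough Python equivalent of the script, run against a temporary directory. The function name rename_to_original and the sample file names are made up for illustration, and the sketch assumes that filepath ends with the file name to rename, which is what the script's cd and mv rely on.

```python
import os
import tempfile


def rename_to_original(filepath, orig_filename):
    """Rename the file at filepath to orig_filename inside the same directory."""
    dirname = os.path.dirname(filepath)  # same role as ${filepath%/*} in the script
    target = os.path.join(dirname, orig_filename)
    os.replace(filepath, target)  # like mv, this overwrites an existing target
    return target


with tempfile.TemporaryDirectory() as tmp:
    written = os.path.join(tmp, "sdc-output-file")  # made-up output file name
    with open(written, "w") as f:
        f.write("id,name\n1,alice\n")
    renamed = rename_to_original(written, "a.csv")
    print(os.path.basename(renamed), os.path.exists(written))
```

The last line prints the new base name and confirms that the old path no longer exists.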

Note 2: These scripts are not supported by StreamSets and are to be used as examples only. The scripts should be subjected to your organization's code review and functional testing policies before use.

