Question

Streamsets for processing large CSV files?

  • 18 February 2023
  • 19 replies
  • 258 views

We currently have a C# console application for processing large CSV files. We are exploring using StreamSets to do the same work. My high-level requirements are below.

  1. Currently we have 50+ consumers calling our application by placing a CSV file in an on-premises shared drive folder.
  2. File sizes range from 10,000 to 2 million rows.
  3. The application should be able to handle processing up to 5 million rows an hour.
  4. Each file drop triggers an individual process run, so that multiple files are processed simultaneously.
  5. Below are the high-level steps that are involved in processing the file.
    1. Write the file to a database table.
    2. Enrich every record with additional data attributes.
    3. Make an API call for each record.
    4. Update the DB record with the response received from the API call.
    5. Also write the response of the API call to a result CSV file for the consumer.
    6. After all the records are processed write the result CSV file for the consumer to pick from the shared drive.
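
As a rough sizing sketch for requirement 3: 5 million rows an hour works out to about 1,389 rows per second, and because each record needs its own API call, the number of calls that must be in flight at once depends on the API latency. The 200 ms latency used below is purely an assumption for illustration and is not a measured figure; the function names are hypothetical.

```python
import math


def required_rate_per_second(rows_per_hour: int) -> float:
    """Average rows that must be completed every second."""
    return rows_per_hour / 3600


def min_concurrent_calls(rows_per_hour: int, avg_latency_seconds: float) -> int:
    """Little's law: requests in flight = arrival rate * time per request."""
    return math.ceil(required_rate_per_second(rows_per_hour) * avg_latency_seconds)


if __name__ == "__main__":
    target_rows_per_hour = 5_000_000
    assumed_latency = 0.2  # seconds per API call; an assumption, not a measurement
    print(round(required_rate_per_second(target_rows_per_hour)))
    print(min_concurrent_calls(target_rows_per_hour, assumed_latency))
```

Under that assumed latency, a single-threaded loop making one call at a time could not reach the target, which is why parallelism across files or records matters here.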

 Thank you.




@jkathawalla 

There are different ways to handle the above scenario.

Proposal 1:

Keep the source files in the SDC local directory, read the data from the files, and send it to the respective destinations.

Proposal 2:

Keep the source CSV files in an S3 bucket folder, read the data from S3 in StreamSets Data Collector, and send it to the destination.

Please check whether you can read all the files using one of the options above. If that works for you, you can move on to the next step of sending the data to the destination.

I will help you with this. Please confirm whether the file reading works as expected.

We will move step by step.

 

Thanks & Regards

Bikram_

 

@Bikram Thank you, Bikram, for the response. I like the idea of doing this step by step. I can neither keep the file in the SDC local directory nor fetch it from an Amazon S3 bucket. I have to pull it from a remote drive location, but I think I will be able to do that using the SFTP/FTP/FTPS Client stage. Agree?

@Bikram Our main concern is whether StreamSets will be able to handle multiple client files simultaneously and process them in parallel. Currently the C# application can handle up to 2 million rows an hour, which is a problem. We want to be able to scale to at least 5 million an hour.

@Bikram Another question I had was whether there is a way to write an HTTP API response to a CSV file and store it in a remote location.


@jkathawalla 

An HTTP API response generally comes in JSON format, I believe, and it can be stored in CSV format.

In this case the JSON data should be converted to CSV using the Jython, Groovy, or JavaScript processors.

Please let me know if I can help with your use case; I will be happy to.

 

Thanks & Regards

Bikram_

 

 

 

@Bikram Yes, the response of the API call is a JSON object with the below attributes. I need to create a CSV file and write it to a remote location. So if you can help me with this use case that would be wonderful. 

 

{
    "ApplicationId": "3585656b-f5b1-ed11-9ab9-005056a2dd48",
    "SourceSystem": "JXK6500",
    "SourceSystemId": "JXK6500",
    "CommunicationAllowed": "Pass",
    "ExpirationDate": "2023-03-07T09:37:51.676-05:00",
    "Priority": 1,
    "FailureReasons": null
}
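
A minimal sketch of flattening a response like the one above into CSV rows. This is plain Python 3 using only the standard library, not a StreamSets Jython Evaluator script; the function name and the choice to write null values as empty cells are assumptions made for illustration.

```python
import csv
import io
import json

# Column order taken from the sample response in the post.
FIELDS = [
    "ApplicationId",
    "SourceSystem",
    "SourceSystemId",
    "CommunicationAllowed",
    "ExpirationDate",
    "Priority",
    "FailureReasons",
]


def responses_to_csv(json_responses):
    """Convert an iterable of JSON response strings into CSV text with a header."""
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer, fieldnames=FIELDS, extrasaction="ignore", lineterminator="\n"
    )
    writer.writeheader()
    for raw in json_responses:
        record = json.loads(raw)
        # JSON null (None) and missing keys both become empty cells.
        row = {name: "" if record.get(name) is None else record[name] for name in FIELDS}
        writer.writerow(row)
    return buffer.getvalue()
```

Calling `responses_to_csv` with a list of response bodies returns one header line followed by one line per response; the text can then be written to whatever remote location the result file belongs in.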


@jkathawalla 

I have created pipelines that convert the JSON to a CSV file.

Pipeline 1: Reads JSON data from the source, converts it to a CSV file, and keeps it in a local directory.

Pipeline 2: Fetches the .csv file from the local directory and sends it to the SFTP server as a whole file.

Two jobs have been created because these pipelines depend on each other.

I created an orchestration pipeline to complete the task. After the first pipeline executes successfully, the second pipeline is triggered and sends the file to the destination.

Please find attached the pipelines and jobs for your use case and let me know if it helps.

Please do let me know if you have any concerns or questions.

 

 Thanks & Regards

Bikram_

 

@Bikram  Thank you. I will take a look at this today and provide you feedback. But truly appreciate your help. 

@Bikram Another question: how do I handle multiple files available at the source? Multiple consumers can place files on the source FTP server.

As an example, if there are 3 files available for processing, the pipeline should create 3 individual result files, one for each consumer. We cannot have all the data dumped into a single result file.


@jkathawalla 

From the SFTP server we can read all the files as whole files using the pattern (*.csv) and add a filter for which files need to be sent to the destinations.

Please let me know your use case and I will create a pipeline for you and share it.

 

Thanks & Regards

Bikram_

@Bikram Will there be any issues, from a file size standpoint, with reading multiple files as whole files? Every consumer will send files in which the “SourceSystem” attribute has a single value, unique to that consumer, for the entire file. We can use that field to filter records. Below is the layout of the CSV file, shown here as JSON.

 

{
  "SourceSystemId": "JXK6500",
  "SourceSystem": "JXK6500",
  "RecipientRole": "Member",
  "RecipientMemberCardId": "",
  "RecipientMemberDependentCode": "",
  "RecipientPlatformCode": "",
  "RecipientMemberGenkey": "",
  "RecipientSdrPersonId": "12345",
  "MessageType": "Emergency",
  "MessageDefinitionCode": "IT_9000_XYZ",
  "CommunicationType": "Manual",
  "CommunicationChannel": "VAT",
  "CommunicationToken": "30030303030",
  "MCREPreferenceId": "N/A"
}
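
The idea of using “SourceSystem” to keep each consumer's results separate can be sketched in plain Python as follows. This is only an illustration of the routing logic, not a StreamSets pipeline; the function name, the `result_<SourceSystem>.csv` naming scheme, and the second source system in the demo are hypothetical, and the sketch assumes every record has the same columns.

```python
import csv
import tempfile
from pathlib import Path


def write_results_per_source(records, out_dir):
    """Write one result CSV per distinct SourceSystem value.

    Returns a dict mapping each SourceSystem to the path of its result file.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    handles = {}  # SourceSystem -> open file handle
    writers = {}  # SourceSystem -> csv.DictWriter
    paths = {}    # SourceSystem -> result file path
    try:
        for record in records:
            source = record["SourceSystem"]
            if source not in writers:
                # Hypothetical naming scheme: result_<SourceSystem>.csv
                paths[source] = out_dir / f"result_{source}.csv"
                handles[source] = open(paths[source], "w", newline="", encoding="utf-8")
                writers[source] = csv.DictWriter(handles[source], fieldnames=list(record))
                writers[source].writeheader()
            writers[source].writerow(record)
    finally:
        # Close every file even if a record fails to write.
        for handle in handles.values():
            handle.close()
    return paths


if __name__ == "__main__":
    sample = [
        {"SourceSystemId": "JXK6500", "SourceSystem": "JXK6500", "MessageType": "Emergency"},
        # "ABC1000" is a made-up second consumer for the demo.
        {"SourceSystemId": "ABC1000", "SourceSystem": "ABC1000", "MessageType": "Emergency"},
    ]
    with tempfile.TemporaryDirectory() as tmp:
        for source, path in write_results_per_source(sample, tmp).items():
            print(source, path.name)
```

Keeping one open writer per source means records from different consumers can arrive interleaved and still end up in the right file.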

@Bikram  I am getting the below error when I try to preview the sample pipelines you created

VALIDATION_0096 - This pipeline was created in version 5.1.0 and is not compatible with current version 4.4.1...


@jkathawalla 

 

Some origins, such as Directory, have an option to multithread. The SFTP origin does not support that, so a pipeline with an SFTP origin will process one file at a time.

Is it possible to mount your on-prem file system to your Data Collector? In that case you can use the Directory origin and its multithreading functionality.

 

@saleempothiwala Thank you, Saleem, for your feedback. It is a good idea. I will check with the StreamSets platform team at my organization whether we can map the remote origin to SDC.

Assuming that is not an option, can I still do multithreading at the HTTP Client stage, which is the next stage after the SFTP/FTP/FTPS Client stage?
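
For reference, the general idea of parallelizing the per-record API calls, independent of how files are read, can be sketched in plain Python with a thread pool. This does not describe how the HTTP Client stage is configured in StreamSets; `call_api_for_records` and `fake_call_api` are hypothetical names, and the stand-in function returns a canned response instead of making a real HTTP request.

```python
from concurrent.futures import ThreadPoolExecutor


def call_api_for_records(records, call_api, max_workers=8):
    """Call `call_api` once per record using a pool of worker threads.

    Results come back in the same order as the input records.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map preserves input order even when calls finish out of order.
        return list(pool.map(call_api, records))


def fake_call_api(record):
    """Stand-in for the real API call; always returns a canned 'Pass' response."""
    return {
        "SourceSystemId": record["SourceSystemId"],
        "CommunicationAllowed": "Pass",
    }


if __name__ == "__main__":
    rows = [{"SourceSystemId": "JXK6500"} for _ in range(5)]
    for response in call_api_for_records(rows, fake_call_api, max_workers=4):
        print(response)
```

Because the work is dominated by waiting on the network, threads are usually sufficient for this kind of fan-out; the right pool size depends on what the API can tolerate.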

@Bikram  - Any updates on the below.

“From the SFTP server we can read all the files as whole files using the pattern (*.csv) and add a filter for which files need to be sent to the destinations.

Please let me know your use case and I will create a pipeline for you and share it.”


@jkathawalla 

I have prepared a pipeline that reads multiple CSV files from a local directory, filters on “SourceSystem”, and sends the data to different destinations. It is working fine.

The pipeline is attached for your reference. Let me know if you have any issues or concerns.

Reading the CSV files as whole files and then filtering on the “SourceSystem” field would be a bit complex to handle, so I came up with this solution instead. Let me know whether it helps.

 

Thanks & Regards

Bikram_

@Bikram Thank you for the sample pipeline. Unfortunately, this will not work for our production environment. We cannot read or write files in local directories. The source data is placed in a remote location and must be read from there. Unless there is a way to read from a remote location and write to a remote location, we will not be able to use StreamSets.

Hi there,

Can I have a sample solution that reads multiple CSV files in a directory and loads the results into a SQL table?


@jkathawalla 

 

Please find attached the pipeline for your reference. It reads data from multiple files and sends it to an Oracle DB using the JDBC Producer.
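
Outside StreamSets, the same "many CSV files into one table" idea can be sketched in plain Python. The sketch below uses the standard library's sqlite3 module as a stand-in for Oracle, which is an assumption made only to keep the example self-contained; the function name and the default table name `records` are hypothetical, and it assumes every file has the same header row.

```python
import csv
import sqlite3
import tempfile
from pathlib import Path


def load_csv_files(directory, connection, table="records"):
    """Load every *.csv file in `directory` into one table; return rows inserted.

    Column and table names are interpolated into SQL, so they must come from
    trusted input. Values are passed as bound parameters.
    """
    total = 0
    for path in sorted(Path(directory).glob("*.csv")):
        with open(path, newline="", encoding="utf-8") as handle:
            reader = csv.reader(handle)
            header = next(reader, None)
            if header is None:
                continue  # skip empty files
            columns = ", ".join(f'"{name}"' for name in header)
            placeholders = ", ".join("?" for _ in header)
            connection.execute(f'CREATE TABLE IF NOT EXISTS "{table}" ({columns})')
            # Reading the whole file is fine for a sketch; batch this for huge files.
            rows = list(reader)
            connection.executemany(
                f'INSERT INTO "{table}" VALUES ({placeholders})', rows
            )
            total += len(rows)
    connection.commit()
    return total


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "one.csv").write_text("SourceSystem,Priority\nJXK6500,1\n", encoding="utf-8")
        conn = sqlite3.connect(":memory:")
        print(load_csv_files(tmp, conn))
        conn.close()
```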
