Scenario: Processing data in JSON format using the Encrypt/Decrypt processor and loading it to a destination.
Steps to implement the pipeline without receiving the "Invalid ciphertext type" error:
1. The first pipeline reads data in JSON format, sends it to the Encrypt/Decrypt processor with the action set to Encrypt, and loads it to a Kafka Producer with the data format set to JSON.
A few columns are encrypted using Amazon KMS and fed to the Kafka Producer. Because the JSON data format represents Byte Array fields as Base64 strings, the ciphertext arrives at the consumer Base64-encoded (see the sketch below).
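As a rough illustration, here is a minimal Python sketch (standard library only; the field name ssn and the ciphertext bytes are hypothetical) of what happens to an encrypted Byte Array field when the record is written with the JSON data format:

import base64
import json

# Hypothetical raw ciphertext bytes produced by the Encrypt action.
ciphertext = b"\x02\x80ciphertext"

# With data format JSON, the Byte Array field is represented as a
# Base64 string, so the consumer receives text rather than raw bytes.
record = {"id": 1, "ssn": base64.b64encode(ciphertext).decode("ascii")}
print(json.dumps(record))
# {"id": 1, "ssn": "AoBjaXBoZXJ0ZXh0"}

This is why the second pipeline has to reverse the encoding before decrypting.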
2. The second pipeline reads data from a Kafka Consumer with the data format set to JSON.
A Field Type Converter is used to convert the columns to Byte Array before feeding the data into the Encrypt/Decrypt processor, because that processor only accepts columns whose data type is Byte Array.
The columns converted to Byte Array are then sent one by one into a Base64 Field Decoder. Note that only one field can be passed per Base64 Field Decoder, so one decoder stage is needed for each encrypted column.
From the Base64 Field Decoders, the columns are sent to the Encrypt/Decrypt processor with the action set to Decrypt, and the data is finally written to a file/S3 destination (a sketch of the equivalent transformation follows this step).
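The following sketch (Python standard library; the field value is hypothetical and continues the example above) mirrors what the Field Type Converter and Base64 Field Decoder accomplish for one column before the Decrypt action runs:

import base64

# Encrypted field as read from Kafka with data format JSON:
# a Base64 string carrying the ciphertext.
encrypted_field = "AoBjaXBoZXJ0ZXh0"

# Field Type Converter: String -> Byte Array (both the decoder and the
# Encrypt/Decrypt processor require Byte Array input).
field_bytes = encrypted_field.encode("utf-8")

# Base64 Field Decoder: Base64 text bytes -> raw ciphertext bytes,
# the form the Decrypt action expects.
ciphertext = base64.b64decode(field_bytes)
print(ciphertext)  # b'\x02\x80ciphertext'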
Please Note:
The columns to be decrypted must be converted to Byte Array before being sent to the Decrypt processor; otherwise, an error is thrown stating that those columns should be of type Byte Array.
When the data format in the Kafka Producer/Consumer is set to JSON and the data is passed to the Encrypt/Decrypt Fields stage without using a Base64 Field Decoder, the error below occurs:
com.streamsets.datacollector.util.PipelineException: PREVIEW_0003 - Encountered error while previewing : com.amazonaws.encryptionsdk.exception.BadCiphertextException: Invalid ciphertext type.
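The likely cause: without the Base64 Field Decoder, the decrypt stage receives the UTF-8 bytes of the Base64 string itself rather than the raw ciphertext, so the leading bytes no longer match what the AWS Encryption SDK expects in a message header. A minimal sketch of the mismatch (Python standard library; byte values are hypothetical):

import base64

ciphertext = b"\x02\x80ciphertext"              # what Decrypt expects (hypothetical)
from_json = "AoBjaXBoZXJ0ZXh0".encode("utf-8")  # what it gets without the decoder

print(ciphertext[:2])  # b'\x02\x80' - plausible header bytes
print(from_json[:2])   # b'Ao' - ASCII text, rejected as an invalid ciphertext type
assert base64.b64decode(from_json) == ciphertext  # the decoder closes the gap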
As a workaround, the SDC Record data format can be used in the Kafka stages instead, but if the requirement is to use the JSON data format, the approach above is the way to achieve the goal.