We have a requirement to transform complex XML data into JSON using XSLT. This needs to be done in DataCollector. The incoming file will contain millions of records, and for each record we need to apply the XSLT and write the output to an S3 location.
I could not find any information on XSLT support in the DataCollector documentation. Could you please help me with this query?
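One way to get JSON out of XSLT is to have the stylesheet emit plain text. Below is a minimal sketch using the XSLT processor that ships with the JDK (JAXP). It assumes a hypothetical input shape `<record id="..."><name>...</name></record>`; the stylesheet, the class name `XmlToJsonXslt`, and the field names are illustrative, not taken from the thread. The JDK's built-in processor implements XSLT 1.0, and this sketch does not escape quotes or backslashes inside values, so real data would need additional escaping.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

public class XmlToJsonXslt {

    // Hypothetical XSLT 1.0 stylesheet for input shaped like
    // <record id="..."><name>...</name></record>.
    // method="text" makes the processor emit plain text (our JSON), not XML.
    // Assumption: values contain no quotes or backslashes (no JSON escaping).
    static final String XSLT =
        "<xsl:stylesheet version=\"1.0\" xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\">"
      + "<xsl:output method=\"text\" encoding=\"UTF-8\"/>"
      + "<xsl:template match=\"/record\">"
      + "{\"id\":\"<xsl:value-of select=\"@id\"/>\","
      + "\"name\":\"<xsl:value-of select=\"name\"/>\"}"
      + "</xsl:template>"
      + "</xsl:stylesheet>";

    // Apply an XSLT stylesheet (given as a string) to an XML string.
    static String transform(String xml, String xslt) throws TransformerException {
        Transformer t = TransformerFactory.newInstance()
            .newTransformer(new StreamSource(new StringReader(xslt)));
        StringWriter out = new StringWriter();
        t.transform(new StreamSource(new StringReader(xml)), new StreamResult(out));
        return out.toString();
    }

    public static void main(String[] args) throws TransformerException {
        String xml = "<record id=\"42\"><name>Ada</name></record>";
        System.out.println(transform(xml, XSLT)); // {"id":"42","name":"Ada"}
    }
}
```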
Note 1: We also have a similar use case that transforms JSON data to XML. Does StreamSets support using FreeMarker in a DataCollector pipeline?
Note 2: Both the XSLT and the FreeMarker transformations use external Java functions.
Note 3: Both the XSLT and the FreeMarker templates are compiled once per run for better performance.
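In JAXP, "compile once per run" corresponds to the standard `javax.xml.transform.Templates` interface: `TransformerFactory.newTemplates(...)` parses and compiles the stylesheet a single time, and each record then gets a `Transformer` from the compiled form. The Javadoc describes `Templates` as thread-safe, while a single `Transformer` must not be used by concurrent threads, so this minimal sketch creates one `Transformer` per call. The class name `CompiledXslt` and the toy stylesheet are hypothetical.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.Templates;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

public class CompiledXslt {
    // Compiled form of the stylesheet; Templates is documented as thread-safe.
    private final Templates templates;

    public CompiledXslt(String xslt) throws TransformerConfigurationException {
        // Parse and compile the stylesheet exactly once.
        this.templates = TransformerFactory.newInstance()
            .newTemplates(new StreamSource(new StringReader(xslt)));
    }

    // Apply the precompiled stylesheet to one record. A Transformer is not
    // thread-safe, so a fresh one is taken from the Templates for each call.
    public String apply(String xml) throws TransformerException {
        StringWriter out = new StringWriter();
        templates.newTransformer()
            .transform(new StreamSource(new StringReader(xml)), new StreamResult(out));
        return out.toString();
    }

    public static void main(String[] args) throws TransformerException {
        String xslt =
            "<xsl:stylesheet version=\"1.0\" xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\">"
          + "<xsl:output method=\"text\"/>"
          + "<xsl:template match=\"/v\">[<xsl:value-of select=\".\"/>]</xsl:template>"
          + "</xsl:stylesheet>";
        CompiledXslt compiled = new CompiledXslt(xslt); // compile once
        for (String xml : new String[] {"<v>a</v>", "<v>b</v>"}) {
            System.out.println(compiled.apply(xml));    // reuse for every record
        }
    }
}
```

Running `main` should print `[a]` and `[b]`; the stylesheet is compiled only once, no matter how many records are processed.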
Regards
Varadha
Best answer by Dash
Hi Varadha,
You can use one of the scripting processors to import libraries (installed on the machine where SDC is running) that enable you to transform XML. For example, in the Groovy processor you can use code similar to the following:
```groovy
import javax.xml.parsers.DocumentBuilder
import javax.xml.parsers.DocumentBuilderFactory
import javax.xml.transform.TransformerFactory
import javax.xml.transform.dom.DOMSource
import javax.xml.transform.stream.StreamResult
import javax.xml.transform.stream.StreamSource
import org.w3c.dom.Document

// Loop through the records in the batch
records = sdc.records
for (record in records) {
    try {
        // Create a parser for the XML text
        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance()
        DocumentBuilder db = dbf.newDocumentBuilder()

        // Parse the string in input field "/input/xml"
        Document doc = db.parse(new ByteArrayInputStream(record.value["input"]["xml"].getBytes("UTF-8")))
        doc.normalize()

        // Use XSLT to transform it (the XSLT is in /input/xslt)
        StreamSource xsl = new StreamSource(new ByteArrayInputStream(record.value["input"]["xslt"].getBytes("UTF-8")))
        TransformerFactory tf = TransformerFactory.newInstance()

        // Store the transformed document in a field as a string
        ByteArrayOutputStream stream = new ByteArrayOutputStream()
        tf.newTransformer(xsl).transform(new DOMSource(doc), new StreamResult(stream))
        record.value["xml"] = stream.toString("UTF-8")

        // Write a record to the processor output
        sdc.output.write(record)
    } catch (e) {
        // Write a record to the error pipeline
        sdc.log.error(e.toString(), e)
        sdc.error.write(record, e.toString())
    }
}
```
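To try the per-record logic outside SDC, here is a minimal plain-Java sketch of the same loop. It assumes a hypothetical `Rec` class standing in for an SDC record, and two lists standing in for `sdc.output` and `sdc.error`; none of these names are part of the StreamSets API. The transformed text is stored as a `String` rather than a `ByteArrayOutputStream`, and a record that fails (here, malformed XML) goes to the error list without stopping the batch. Because each record carries its own stylesheet in this design, the stylesheet is compiled per record; when every record shares one stylesheet, compiling it once (Note 3 in the question) avoids that cost.

```java
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

public class PerRecordXslt {

    // Hypothetical stand-in for an SDC record carrying /input/xml and /input/xslt.
    static final class Rec {
        final String xml;
        final String xslt;
        String output; // transformed text, kept as a String rather than a stream
        String error;  // reason the record was routed to the error list
        Rec(String xml, String xslt) { this.xml = xml; this.xslt = xslt; }
    }

    // Transform every record; a failure only affects that record, mirroring
    // sdc.output.write(record) vs. sdc.error.write(record, ...) in the Groovy script.
    static void processBatch(List<Rec> batch, List<Rec> output, List<Rec> errors) {
        TransformerFactory tf = TransformerFactory.newInstance();
        for (Rec rec : batch) {
            try {
                StringWriter out = new StringWriter();
                // Each record carries its own stylesheet, so it is compiled per record.
                tf.newTransformer(new StreamSource(new StringReader(rec.xslt)))
                  .transform(new StreamSource(new StringReader(rec.xml)), new StreamResult(out));
                rec.output = out.toString();
                output.add(rec);
            } catch (Exception e) { // like Groovy's catch (e)
                rec.error = e.toString();
                errors.add(rec);
            }
        }
    }

    public static void main(String[] args) {
        String xslt =
            "<xsl:stylesheet version=\"1.0\" xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\">"
          + "<xsl:output method=\"text\"/>"
          + "<xsl:template match=\"/v\">[<xsl:value-of select=\".\"/>]</xsl:template>"
          + "</xsl:stylesheet>";
        List<Rec> batch = Arrays.asList(
            new Rec("<v>a</v>", xslt),
            new Rec("<v>oops", xslt),   // malformed XML: should land in errors
            new Rec("<v>b</v>", xslt));
        List<Rec> output = new ArrayList<>();
        List<Rec> errors = new ArrayList<>();
        processBatch(batch, output, errors);
        System.out.println(output.size() + " transformed, " + errors.size() + " sent to error");
    }
}
```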
Senior Technical Evangelist and Developer Advocate at Snowflake
August 30, 2021
Thanks for your quick response. I am getting the error below. Could you please help?
```
com.streamsets.pipeline.api.base.OnRecordErrorException: SCRIPTING_04 - Script sent record to error: java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessClassInPackage.com.sun.org.apache.xerces.internal.jaxp")
    at com.streamsets.pipeline.stage.processor.scripting.ScriptingProcessorInitDestroyBindings$Err.write(ScriptingProcessorInitDestroyBindings.java:48)
    at com.streamsets.pipeline.stage.processor.scripting.ScriptingProcessorInitDestroyBindings$Err$write.call(Unknown Source)
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:133)
    at Script22.run(Script22.groovy:40)
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:317)
    at org.codehaus.groovy.jsr223.GroovyCompiledScript.eval(GroovyCompiledScript.java:72)
    at javax.script.CompiledScript.eval(CompiledScript.java:92)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runScript(AbstractScriptingProcessor.java:346)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runScript(AbstractScriptingProcessor.java:195)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runBatch(AbstractScriptingProcessor.java:190)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.process(AbstractScriptingProcessor.java:167)
    at com.streamsets.pipeline.api.base.SingleLaneProcessor.process(SingleLaneProcessor.java:95)
    at com.streamsets.pipeline.api.base.configurablestage.DProcessor.process(DProcessor.java:35)
    at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$2(StageRuntime.java:287)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:232)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:299)
    at com.streamsets.datacollector.runner.StagePipe.process(StagePipe.java:209)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.lambda$runSourceLessBatch$0(PreviewPipelineRunner.java:352)
    at com.streamsets.datacollector.runner.PipeRunner.acceptConsumer(PipeRunner.java:195)
    at com.streamsets.datacollector.runner.PipeRunner.forEachInternal(PipeRunner.java:140)
    at com.streamsets.datacollector.runner.PipeRunner.executeBatch(PipeRunner.java:120)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.runSourceLessBatch(PreviewPipelineRunner.java:348)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.runPollSource(PreviewPipelineRunner.java:330)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.run(PreviewPipelineRunner.java:220)
    at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:533)
    at com.streamsets.datacollector.runner.preview.PreviewPipeline.run(PreviewPipeline.java:39)
    at com.streamsets.datacollector.execution.preview.sync.SyncPreviewer.start(SyncPreviewer.java:226)
    at com.streamsets.datacollector.execution.preview.async.AsyncPreviewer.lambda$start$1(AsyncPreviewer.java:93)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:22)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:22)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:88)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
```groovy
// Loop through the records in the batch
records = sdc.records
for (record in records) {
    try {
        // Create a parser for the XML text
        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
        DocumentBuilder db = dbf.newDocumentBuilder();

        // Parse the string in input field "/xml"
        Document doc = db.parse(new ByteArrayInputStream(record.value["xml"].getBytes()));
        doc.normalize();

        // Use XSLT to transform it (the XSLT is in /xslt)
        StreamSource xsl = new StreamSource(new ByteArrayInputStream(record.value["xslt"].getBytes()));
        TransformerFactory tf = TransformerFactory.newInstance();

        // Store the transformed XML document in a field
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        tf.newTransformer(xsl).transform(new DOMSource(doc), new StreamResult(stream));
        record.value["xml"] = stream;

        // Write a record to the processor output
        sdc.output.write(record)
    } catch (e) {
        // Write a record to the error pipeline
        sdc.log.error(e.toString(), e)
        sdc.error.write(record, e.toString())
    }
}
```
```
// Groovy will parse files in a different context, so we need to grant it additional privileges
grant codebase "file:///groovy/script" {
    permission java.lang.RuntimePermission "getClassLoader";
    permission java.util.PropertyPermission "*", "read";
};
```
6. The file "xercesImpl-2.11.0.jar" is available in the /opt/sdc-extras folder.
But I am still getting the error below. Could you please help?
```
2021-09-06 08:28:47,852 ERROR java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessClassInPackage.com.sun.org.apache.xerces.internal.jaxp") GroovyProcessor *da115c5f-f696-11eb-b9dc-67019588858b@149e11c1-f697-11eb-b9dc-fd846d33049d 0 RandomDataGenerator-0::testRun__05d2ea06-993a-42cb-bc09-2cd69c1f250b__149e11c1-f697-11eb-b9dc-fd846d33049d__da115c5f-f696-11eb-b9dc-67019588858b@149e11c1-f697-11eb-b9dc-fd846d33049d
java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessClassInPackage.com.sun.org.apache.xerces.internal.jaxp")
    at java.security.AccessControlContext.checkPermission(AccessControlContext.java:472)
    at java.security.AccessController.checkPermission(AccessController.java:886)
    at java.lang.SecurityManager.checkPermission(SecurityManager.java:549)
    at java.lang.SecurityManager.checkPackageAccess(SecurityManager.java:1564)
    at java.lang.Class.checkPackageAccess(Class.java:2372)
    at java.lang.Class.checkMemberAccess(Class.java:2351)
    at java.lang.Class.getConstructor(Class.java:1824)
    at org.codehaus.groovy.reflection.ClassLoaderForClassArtifacts.defineClassAndGetConstructor(ClassLoaderForClassArtifacts.java:83)
    at org.codehaus.groovy.runtime.callsite.CallSiteGenerator.compilePojoMethod(CallSiteGenerator.java:234)
    at org.codehaus.groovy.reflection.CachedMethod.createPojoMetaMethodSite(CachedMethod.java:262)
    at org.codehaus.groovy.runtime.callsite.PojoMetaMethodSite.createCachedMethodSite(PojoMetaMethodSite.java:162)
    at org.codehaus.groovy.runtime.callsite.PojoMetaMethodSite.createPojoMetaMethodSite(PojoMetaMethodSite.java:151)
    at groovy.lang.MetaClassImpl.createPojoCallSite(MetaClassImpl.java:3393)
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.createPojoSite(CallSiteArray.java:132)
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.createCallSite(CallSiteArray.java:166)
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)
    at Script3.run(Script3.groovy:20)
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:317)
    at org.codehaus.groovy.jsr223.GroovyCompiledScript.eval(GroovyCompiledScript.java:72)
    at javax.script.CompiledScript.eval(CompiledScript.java:92)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runScript(AbstractScriptingProcessor.java:346)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runScript(AbstractScriptingProcessor.java:195)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runBatch(AbstractScriptingProcessor.java:190)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.process(AbstractScriptingProcessor.java:167)
    at com.streamsets.pipeline.api.base.SingleLaneProcessor.process(SingleLaneProcessor.java:95)
    at com.streamsets.pipeline.api.base.configurablestage.DProcessor.process(DProcessor.java:35)
    at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$2(StageRuntime.java:287)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:232)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:299)
    at com.streamsets.datacollector.runner.StagePipe.process(StagePipe.java:209)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.lambda$runSourceLessBatch$0(PreviewPipelineRunner.java:352)
    at com.streamsets.datacollector.runner.PipeRunner.acceptConsumer(PipeRunner.java:195)
    at com.streamsets.datacollector.runner.PipeRunner.forEachInternal(PipeRunner.java:140)
    at com.streamsets.datacollector.runner.PipeRunner.executeBatch(PipeRunner.java:120)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.runSourceLessBatch(PreviewPipelineRunner.java:348)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.processBatch(PreviewPipelineRunner.java:272)
    at com.streamsets.datacollector.runner.StageRuntime$3.run(StageRuntime.java:371)
    at java.security.AccessController.doPrivileged(Native Method)
    at com.streamsets.datacollector.runner.StageRuntime.processBatch(StageRuntime.java:367)
    at com.streamsets.datacollector.runner.StageContext.processBatch(StageContext.java:287)
    at com.streamsets.pipeline.stage.devtest.RandomDataGeneratorSource$GeneratorRunnable.run(RandomDataGeneratorSource.java:298)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```