We have a requirement to transform complex XML data into JSON using XSLT, and this needs to be done in Data Collector. The incoming file will contain millions of records; for each record, we need to apply the XSLT and write the output to an S3 location.
I could not find any information about XSLT support in the Data Collector documentation. Could you please help with this question?
Note 1: We also have a similar use case that transforms JSON data into XML. Does StreamSets support using FreeMarker in a Data Collector pipeline?
Note 2: Both XSLT and FreeMarker use external Java functions to support the transformation.
Note 3: Both the XSLT stylesheets and the FreeMarker templates are compiled once per run for better performance.
Regards
Varadha
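The compile-once requirement in Note 3 maps onto the standard JAXP API: `TransformerFactory.newTemplates()` compiles a stylesheet into a thread-safe `Templates` object, and each record then gets a cheap `Transformer` from it. The following is a minimal Java sketch outside of SDC, not StreamSets code; the class name `CompiledXslt`, the `<rec>` input shape and the stylesheet are hypothetical, and the stylesheet does not escape JSON special characters.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.Templates;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

final class CompiledXslt {
    // Hypothetical XSLT 1.0 stylesheet: turns <rec><id>..</id><name>..</name></rec>
    // into one JSON line. It does NOT escape '"' or '\' inside values.
    static final String XSL =
        "<xsl:stylesheet version=\"1.0\" xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\">"
        + "<xsl:output method=\"text\"/>"
        + "<xsl:template match=\"/rec\">{\"id\":\"<xsl:value-of select=\"id\"/>\","
        + "\"name\":\"<xsl:value-of select=\"name\"/>\"}</xsl:template>"
        + "</xsl:stylesheet>";

    // Compiled once; Templates is safe to share between threads.
    private final Templates templates;

    CompiledXslt(String xsl) {
        this.templates = compile(xsl);
    }

    private static Templates compile(String xsl) {
        try {
            return TransformerFactory.newInstance()
                .newTemplates(new StreamSource(new StringReader(xsl)));
        } catch (TransformerConfigurationException e) {
            throw new IllegalArgumentException("Stylesheet does not compile", e);
        }
    }

    // Per record: a fresh Transformer from the precompiled Templates
    // (a Transformer is not thread-safe, so it is not shared).
    String transform(String xml) {
        try {
            StringWriter out = new StringWriter();
            templates.newTransformer()
                .transform(new StreamSource(new StringReader(xml)), new StreamResult(out));
            return out.toString();
        } catch (TransformerException e) {
            throw new IllegalArgumentException("Transformation failed", e);
        }
    }

    public static void main(String[] args) {
        CompiledXslt xslt = new CompiledXslt(XSL);
        System.out.println(xslt.transform("<rec><id>1</id><name>a</name></rec>"));
        System.out.println(xslt.transform("<rec><id>2</id><name>b</name></rec>"));
    }
}
```

The expensive step (parsing and compiling the stylesheet) happens once in the constructor; only `newTransformer()` and the transform itself run per record.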
Best answer by Dash
Hi Varadha,
You can use one of the scripting processors to import libraries (installed on the machine where SDC is running) that let you transform XML. For example, in a Groovy processor you can use code similar to the following.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import org.w3c.dom.Document;

// Loop through the records in the batch
records = sdc.records
for (record in records) {
    try {
        // Create a parser for the XML text
        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
        DocumentBuilder db = dbf.newDocumentBuilder();
        // Parse the string in input field "/input/xml"
        Document doc = db.parse(new ByteArrayInputStream(record.value["input"]["xml"].getBytes("UTF-8")));
        doc.normalize();
        // Use XSLT to transform it (the XSLT is in /input/xslt)
        StreamSource xsl = new StreamSource(new ByteArrayInputStream(record.value["input"]["xslt"].getBytes("UTF-8")));
        TransformerFactory tf = TransformerFactory.newInstance();
        // Store the transformed document as a string in the /xml field
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        tf.newTransformer(xsl).transform(new DOMSource(doc), new StreamResult(stream));
        record.value["xml"] = stream.toString("UTF-8");
        // Write the record to the processor output
        sdc.output.write(record)
    } catch (e) {
        // Log the exception and send the record to the error stream
        sdc.log.error(e.toString(), e)
        sdc.error.write(record, e.toString())
    }
}
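The script above creates a new factory and compiles the stylesheet from /input/xslt again for every record. If only a few distinct stylesheets occur, one option is to cache the compiled `Templates` keyed by the stylesheet text. Below is a minimal Java sketch of that idea outside SDC. `XsltCache`, `transform` and `compiledCount` are hypothetical names; the cache is unbounded and not thread-safe (one instance per thread is assumed); and it feeds a `StreamSource` straight to the transformer instead of building a DOM first.

```java
import java.io.StringReader;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import javax.xml.transform.Templates;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

// Hypothetical helper: compiles each distinct stylesheet once and reuses it.
// Not thread-safe (neither TransformerFactory nor HashMap is), so use one instance per thread.
final class XsltCache {
    private final TransformerFactory factory = TransformerFactory.newInstance();
    private final Map<String, Templates> compiled = new HashMap<>();

    // Returns the cached Templates for this stylesheet text, compiling it on first use.
    private Templates templatesFor(String xslt) throws TransformerConfigurationException {
        Templates templates = compiled.get(xslt);
        if (templates == null) {
            templates = factory.newTemplates(new StreamSource(new StringReader(xslt)));
            compiled.put(xslt, templates);
        }
        return templates;
    }

    // Transforms one XML document and returns the result as a String (not a stream).
    String transform(String xml, String xslt) {
        try {
            StringWriter out = new StringWriter();
            templatesFor(xslt).newTransformer()
                .transform(new StreamSource(new StringReader(xml)), new StreamResult(out));
            return out.toString();
        } catch (TransformerException e) {
            throw new IllegalArgumentException("XSLT transformation failed", e);
        }
    }

    // Number of distinct stylesheets compiled so far.
    int compiledCount() {
        return compiled.size();
    }

    public static void main(String[] args) {
        String xslt = "<xsl:stylesheet version=\"1.0\" xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\">"
            + "<xsl:output method=\"text\"/>"
            + "<xsl:template match=\"/\">[<xsl:value-of select=\"/a\"/>]</xsl:template>"
            + "</xsl:stylesheet>";
        XsltCache cache = new XsltCache();
        System.out.println(cache.transform("<a>one</a>", xslt)); // [one]
        System.out.println(cache.transform("<a>two</a>", xslt)); // [two]
        System.out.println(cache.compiledCount());               // 1
    }
}
```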
Senior Technical Evangelist and Developer Advocate at Snowflake
August 30, 2021
Thanks for your quick response. I am getting the error below. Could you please help?
com.streamsets.pipeline.api.base.OnRecordErrorException: SCRIPTING_04 - Script sent record to error: java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessClassInPackage.com.sun.org.apache.xerces.internal.jaxp")
at com.streamsets.pipeline.stage.processor.scripting.ScriptingProcessorInitDestroyBindings$Err.write(ScriptingProcessorInitDestroyBindings.java:48)
at com.streamsets.pipeline.stage.processor.scripting.ScriptingProcessorInitDestroyBindings$Err$write.call(Unknown Source)
at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)
at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)
at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:133)
at Script22.run(Script22.groovy:40)
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:317)
at org.codehaus.groovy.jsr223.GroovyCompiledScript.eval(GroovyCompiledScript.java:72)
at javax.script.CompiledScript.eval(CompiledScript.java:92)
at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runScript(AbstractScriptingProcessor.java:346)
at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runScript(AbstractScriptingProcessor.java:195)
at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runBatch(AbstractScriptingProcessor.java:190)
at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.process(AbstractScriptingProcessor.java:167)
at com.streamsets.pipeline.api.base.SingleLaneProcessor.process(SingleLaneProcessor.java:95)
at com.streamsets.pipeline.api.base.configurablestage.DProcessor.process(DProcessor.java:35)
at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$2(StageRuntime.java:287)
at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:232)
at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:299)
at com.streamsets.datacollector.runner.StagePipe.process(StagePipe.java:209)
at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.lambda$runSourceLessBatch$0(PreviewPipelineRunner.java:352)
at com.streamsets.datacollector.runner.PipeRunner.acceptConsumer(PipeRunner.java:195)
at com.streamsets.datacollector.runner.PipeRunner.forEachInternal(PipeRunner.java:140)
at com.streamsets.datacollector.runner.PipeRunner.executeBatch(PipeRunner.java:120)
at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.runSourceLessBatch(PreviewPipelineRunner.java:348)
at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.runPollSource(PreviewPipelineRunner.java:330)
at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.run(PreviewPipelineRunner.java:220)
at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:533)
at com.streamsets.datacollector.runner.preview.PreviewPipeline.run(PreviewPipeline.java:39)
at com.streamsets.datacollector.execution.preview.sync.SyncPreviewer.start(SyncPreviewer.java:226)
at com.streamsets.datacollector.execution.preview.async.AsyncPreviewer.lambda$start$1(AsyncPreviewer.java:93)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:22)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:22)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:88)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
// Loop through the records in the batch
records = sdc.records
for (record in records) {
    try {
        // Create a parser for the XML text
        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
        DocumentBuilder db = dbf.newDocumentBuilder();
        // Parse the string in input field "/xml"
        Document doc = db.parse(new ByteArrayInputStream(record.value["xml"].getBytes()));
        doc.normalize();
        // Use XSLT to transform it (the XSLT is in /xslt)
        StreamSource xsl = new StreamSource(new ByteArrayInputStream(record.value["xslt"].getBytes()));
        TransformerFactory tf = TransformerFactory.newInstance();
        // Store the transformed XML document in a field
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        tf.newTransformer(xsl).transform(new DOMSource(doc), new StreamResult(stream));
        record.value["xml"] = stream;
        // Write a record to the processor output
        sdc.output.write(record)
    } catch (e) {
        // Write a record to the error pipeline
        sdc.log.error(e.toString(), e)
        sdc.error.write(record, e.toString())
    }
}
4. Added the following entries to $SDC_CONF/sdc-security.policy:
// user-defined external directory
grant codebase "file:///opt/sdc-extras/-" {
permission java.security.AllPermission;
};
// Groovy will parse files in a different context, so we need to grant it additional privileges
grant codebase "file:///groovy/script" {
permission java.lang.RuntimePermission "getClassLoader";
permission java.util.PropertyPermission "*", "read";
};
6. The file xercesImpl-2.11.0.jar is available in the /opt/sdc-extras folder.
But I am still getting the error below. Could you please help?
2021-09-06 08:28:47,852 ERROR java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessClassInPackage.com.sun.org.apache.xerces.internal.jaxp") GroovyProcessor *da115c5f-f696-11eb-b9dc-67019588858b@149e11c1-f697-11eb-b9dc-fd846d33049d0 RandomDataGenerator-0::testRun__05d2ea06-993a-42cb-bc09-2cd69c1f250b__149e11c1-f697-11eb-b9dc-fd846d33049d__da115c5f-f696-11eb-b9dc-67019588858b@149e11c1-f697-11eb-b9dc-fd846d33049d
java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessClassInPackage.com.sun.org.apache.xerces.internal.jaxp")
at java.security.AccessControlContext.checkPermission(AccessControlContext.java:472)
at java.security.AccessController.checkPermission(AccessController.java:886)
at java.lang.SecurityManager.checkPermission(SecurityManager.java:549)
at java.lang.SecurityManager.checkPackageAccess(SecurityManager.java:1564)
at java.lang.Class.checkPackageAccess(Class.java:2372)
at java.lang.Class.checkMemberAccess(Class.java:2351)
at java.lang.Class.getConstructor(Class.java:1824)
at org.codehaus.groovy.reflection.ClassLoaderForClassArtifacts.defineClassAndGetConstructor(ClassLoaderForClassArtifacts.java:83)
at org.codehaus.groovy.runtime.callsite.CallSiteGenerator.compilePojoMethod(CallSiteGenerator.java:234)
at org.codehaus.groovy.reflection.CachedMethod.createPojoMetaMethodSite(CachedMethod.java:262)
at org.codehaus.groovy.runtime.callsite.PojoMetaMethodSite.createCachedMethodSite(PojoMetaMethodSite.java:162)
at org.codehaus.groovy.runtime.callsite.PojoMetaMethodSite.createPojoMetaMethodSite(PojoMetaMethodSite.java:151)
at groovy.lang.MetaClassImpl.createPojoCallSite(MetaClassImpl.java:3393)
at org.codehaus.groovy.runtime.callsite.CallSiteArray.createPojoSite(CallSiteArray.java:132)
at org.codehaus.groovy.runtime.callsite.CallSiteArray.createCallSite(CallSiteArray.java:166)
at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)
at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)
at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)
at Script3.run(Script3.groovy:20)
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:317)
at org.codehaus.groovy.jsr223.GroovyCompiledScript.eval(GroovyCompiledScript.java:72)
at javax.script.CompiledScript.eval(CompiledScript.java:92)
at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runScript(AbstractScriptingProcessor.java:346)
at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runScript(AbstractScriptingProcessor.java:195)
at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runBatch(AbstractScriptingProcessor.java:190)
at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.process(AbstractScriptingProcessor.java:167)
at com.streamsets.pipeline.api.base.SingleLaneProcessor.process(SingleLaneProcessor.java:95)
at com.streamsets.pipeline.api.base.configurablestage.DProcessor.process(DProcessor.java:35)
at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$2(StageRuntime.java:287)
at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:232)
at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:299)
at com.streamsets.datacollector.runner.StagePipe.process(StagePipe.java:209)
at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.lambda$runSourceLessBatch$0(PreviewPipelineRunner.java:352)
at com.streamsets.datacollector.runner.PipeRunner.acceptConsumer(PipeRunner.java:195)
at com.streamsets.datacollector.runner.PipeRunner.forEachInternal(PipeRunner.java:140)
at com.streamsets.datacollector.runner.PipeRunner.executeBatch(PipeRunner.java:120)
at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.runSourceLessBatch(PreviewPipelineRunner.java:348)
at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.processBatch(PreviewPipelineRunner.java:272)
at com.streamsets.datacollector.runner.StageRuntime$3.run(StageRuntime.java:371)
at java.security.AccessController.doPrivileged(Native Method)
at com.streamsets.datacollector.runner.StageRuntime.processBatch(StageRuntime.java:367)
at com.streamsets.datacollector.runner.StageContext.processBatch(StageContext.java:287)
at com.streamsets.pipeline.stage.devtest.RandomDataGeneratorSource$GeneratorRunnable.run(RandomDataGeneratorSource.java:298)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
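The package named in both traces, com.sun.org.apache.xerces.internal.jaxp, belongs to the copy of Xerces bundled inside the JDK. The standalone Xerces jar ships its factory under a different package (org.apache.xerces.jaxp). So the class being denied appears to be the JDK's internal parser rather than anything from xercesImpl-2.11.0.jar. The small standalone Java check below (a hypothetical `JaxpInfo` class, meant to be run outside SDC with the same JVM and classpath) prints which implementations `DocumentBuilderFactory.newInstance()` and `TransformerFactory.newInstance()` resolve to. On a stock JDK 8 with no other JAXP provider, these are typically the com.sun.org.apache...internal classes.

```java
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerFactory;

// Hypothetical diagnostic: reports the concrete JAXP classes chosen by the
// standard newInstance() lookup on the current JVM and classpath.
final class JaxpInfo {
    static String[] implementations() {
        return new String[] {
            DocumentBuilderFactory.newInstance().getClass().getName(),
            TransformerFactory.newInstance().getClass().getName()
        };
    }

    public static void main(String[] args) {
        for (String name : implementations()) {
            System.out.println(name);
        }
    }
}
```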