
Dear StreamSets

We have a requirement to transform complex XML data into JSON using XSLT. This needs to be done in Data Collector. The incoming file will contain millions of records; for each record, we need to apply the XSLT and write the output to an S3 location.

I could not find any information on XSLT support in the Data Collector documentation. Could you please help me with this query?

Note 1: We also have a similar use case: transforming JSON data into XML. Does StreamSets support using FreeMarker in a Data Collector pipeline?

Note 2: Both our XSLT and FreeMarker templates call external Java functions to support the transformation.

Note 3: Both our XSLT and FreeMarker templates are compiled once per run for better performance.
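Note 3's compile-once approach maps onto JAXP's `javax.xml.transform.Templates`: the stylesheet is compiled once, and a new, lightweight `Transformer` is created from it for each record. According to the JAXP documentation, a `Templates` instance may be shared by concurrently running threads, whereas a `Transformer` may not. The Java sketch below shows the pattern in a plain JVM; the class name `CompiledStylesheet` and the one-line stylesheet are illustrative assumptions, not part of Data Collector.

```java
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import javax.xml.transform.Templates;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

public class CompiledStylesheet {
    // Compiled once per run; a Templates object may be shared across threads.
    private final Templates templates;

    public CompiledStylesheet(String xslt) {
        try {
            templates = TransformerFactory.newInstance()
                    .newTemplates(new StreamSource(new StringReader(xslt)));
        } catch (TransformerConfigurationException e) {
            throw new IllegalArgumentException("Stylesheet failed to compile", e);
        }
    }

    // Applies the precompiled stylesheet to one record's XML text.
    public String apply(String xml) {
        try {
            StringWriter out = new StringWriter();
            // A Transformer is not thread-safe, so a new one is created per call.
            templates.newTransformer().transform(
                    new StreamSource(new StringReader(xml)), new StreamResult(out));
            return out.toString();
        } catch (TransformerException e) {
            throw new IllegalStateException("Transformation failed", e);
        }
    }

    public static void main(String[] args) {
        // Illustrative stylesheet: writes {"title":"..."} as plain text.
        String xslt = "<xsl:stylesheet version=\"1.0\" xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\">"
                + "<xsl:output method=\"text\"/>"
                + "<xsl:template match=\"/\">{\"title\":\"<xsl:value-of select=\"cd/title\"/>\"}</xsl:template>"
                + "</xsl:stylesheet>";
        CompiledStylesheet stylesheet = new CompiledStylesheet(xslt);
        List<String> records = Arrays.asList(
                "<cd><title>Empire Burlesque</title></cd>",
                "<cd><title>Hide your heart</title></cd>");
        for (String xml : records) {
            System.out.println(stylesheet.apply(xml));
        }
    }
}
```

The stack trace later in this thread includes a `ScriptingProcessorInitDestroyBindings` frame, which suggests the Groovy processor in this version has init and destroy scripts. If so, the init script would be a natural place for the compile-once step. That placement is an assumption to confirm for your version.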

Regards

Varadha

Hi Varadha,

You can use one of the scripting processors to import libraries (installed on the machine where SDC is running) that let you transform XML. For example, in the Groovy processor you can use code similar to the following:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

import org.w3c.dom.Document;

// Loop through the records in the batch
records = sdc.records
for (record in records) {
    try {
        // Create a parser for the XML text
        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
        DocumentBuilder db = dbf.newDocumentBuilder();

        // Parse the string in input field "/input/xml"
        Document doc = db.parse(new ByteArrayInputStream(record.value["input"]["xml"].getBytes("UTF-8")));
        doc.normalize();

        // Use XSLT to transform it (the XSLT is in /input/xslt)
        StreamSource xsl = new StreamSource(new ByteArrayInputStream(record.value["input"]["xslt"].getBytes("UTF-8")));
        TransformerFactory tf = TransformerFactory.newInstance();

        // Store the transformed document in the "/xml" field as a string
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        tf.newTransformer(xsl).transform(new DOMSource(doc), new StreamResult(stream));
        record.value["xml"] = stream.toString("UTF-8");

        // Write the record to the processor output
        sdc.output.write(record)
    } catch (e) {
        // Send the record to the error pipeline
        sdc.log.error(e.toString(), e)
        sdc.error.write(record, e.toString())
    }
}
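To tell whether a failure comes from the XSLT logic or from Data Collector's sandbox, the same parse-then-transform steps can be run in a plain JVM. The Java sketch below mirrors the Groovy script above (DOM parse, `DOMSource`, transform) and returns the output as a `String`. The class and method names are hypothetical. If it runs cleanly outside Data Collector but the script fails inside it, the security configuration is the more likely cause.

```java
import java.io.ByteArrayInputStream;
import java.io.StringReader;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import org.w3c.dom.Document;

public class DomXsltCheck {
    // Same steps as the Groovy script: parse the XML into a DOM, then apply the XSLT.
    public static String transform(String xml, String xslt) throws Exception {
        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
        dbf.setNamespaceAware(true); // generally advisable when a DOM is handed to XSLT
        DocumentBuilder db = dbf.newDocumentBuilder();
        Document doc = db.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        doc.normalize();

        StreamSource xsl = new StreamSource(new StringReader(xslt));
        StringWriter out = new StringWriter();
        TransformerFactory.newInstance().newTransformer(xsl)
                .transform(new DOMSource(doc), new StreamResult(out));
        return out.toString(); // a String, not the output stream object itself
    }

    public static void main(String[] args) throws Exception {
        String xml = "<catalog><cd><title>Empire Burlesque</title></cd></catalog>";
        String xslt = "<xsl:stylesheet version=\"1.0\" xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\">"
                + "<xsl:output method=\"text\"/>"
                + "<xsl:template match=\"/\"><xsl:value-of select=\"catalog/cd/title\"/></xsl:template>"
                + "</xsl:stylesheet>";
        System.out.println(transform(xml, xslt)); // prints Empire Burlesque
    }
}
```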

Cheers,

Dash

 


 Dear @Dash 

Thanks for your quick response. I am getting the error below. Could you please help?

com.streamsets.pipeline.api.base.OnRecordErrorException: SCRIPTING_04 - Script sent record to error: java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessClassInPackage.com.sun.org.apache.xerces.internal.jaxp")
    at com.streamsets.pipeline.stage.processor.scripting.ScriptingProcessorInitDestroyBindings$Err.write(ScriptingProcessorInitDestroyBindings.java:48)
    at com.streamsets.pipeline.stage.processor.scripting.ScriptingProcessorInitDestroyBindings$Err$write.call(Unknown Source)
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:133)
    at Script22.run(Script22.groovy:40)
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:317)
    at org.codehaus.groovy.jsr223.GroovyCompiledScript.eval(GroovyCompiledScript.java:72)
    at javax.script.CompiledScript.eval(CompiledScript.java:92)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runScript(AbstractScriptingProcessor.java:346)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runScript(AbstractScriptingProcessor.java:195)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runBatch(AbstractScriptingProcessor.java:190)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.process(AbstractScriptingProcessor.java:167)
    at com.streamsets.pipeline.api.base.SingleLaneProcessor.process(SingleLaneProcessor.java:95)
    at com.streamsets.pipeline.api.base.configurablestage.DProcessor.process(DProcessor.java:35)
    at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$2(StageRuntime.java:287)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:232)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:299)
    at com.streamsets.datacollector.runner.StagePipe.process(StagePipe.java:209)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.lambda$runSourceLessBatch$0(PreviewPipelineRunner.java:352)
    at com.streamsets.datacollector.runner.PipeRunner.acceptConsumer(PipeRunner.java:195)
    at com.streamsets.datacollector.runner.PipeRunner.forEachInternal(PipeRunner.java:140)
    at com.streamsets.datacollector.runner.PipeRunner.executeBatch(PipeRunner.java:120)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.runSourceLessBatch(PreviewPipelineRunner.java:348)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.runPollSource(PreviewPipelineRunner.java:330)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.run(PreviewPipelineRunner.java:220)
    at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:533)
    at com.streamsets.datacollector.runner.preview.PreviewPipeline.run(PreviewPipeline.java:39)
    at com.streamsets.datacollector.execution.preview.sync.SyncPreviewer.start(SyncPreviewer.java:226)
    at com.streamsets.datacollector.execution.preview.async.AsyncPreviewer.lambda$start$1(AsyncPreviewer.java:93)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:22)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:22)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:88)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Pipeline:

Input data:

  {
  "xml": "<catalog>  <cd>    <title>Empire Burlesque</title>    <artist>Bob Dylan</artist>    <country>USA</country>    <company>Columbia</company>    <price>10.90</price>    <year>1985</year>  </cd>  <cd>    <title>Hide your heart</title>    <artist>Bonnie Tyler</artist>    <country>UK</country>    <company>CBS Records</company>    <price>9.90</price>    <year>1988</year>  </cd></catalog>",
  "xslt": "<xsl:stylesheet version=\"1.0\" xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\"><xsl:template match=\"/\"><html> <body>  <h2>My CD Collection</h2>  <table border=\"1\">    <tr bgcolor=\"#9acd32\">      <th style=\"text-align:left\">Title</th>      <th style=\"text-align:left\">Artist</th>    </tr>    <xsl:for-each select=\"catalog/cd\">    <tr>      <td><xsl:value-of select=\"title\"/></td>      <td><xsl:value-of select=\"artist\"/></td>    </tr>    </xsl:for-each>  </table></body></html>
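The sample stylesheet above renders HTML. For the JSON output described in the original question, an XSLT 1.0 stylesheet can write JSON as plain text using `<xsl:output method="text"/>`. The Java sketch below applies such a stylesheet to a trimmed copy of the sample catalog. The stylesheet is a hypothetical example covering only the title and artist fields, and it does not escape quotes or backslashes inside values, which real data would need.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

public class CatalogToJson {
    // Hypothetical XSLT 1.0 stylesheet: writes a JSON array of {title, artist} objects.
    // Values are NOT escaped; a quote or backslash in the data would break the JSON.
    static final String XSLT =
            "<xsl:stylesheet version=\"1.0\" xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\">"
            + "<xsl:output method=\"text\"/>"
            + "<xsl:template match=\"/\">"
            + "<xsl:text>[</xsl:text>"
            + "<xsl:for-each select=\"catalog/cd\">"
            + "<xsl:if test=\"position() &gt; 1\">,</xsl:if>"
            + "{\"title\":\"<xsl:value-of select=\"title\"/>\","
            + "\"artist\":\"<xsl:value-of select=\"artist\"/>\"}"
            + "</xsl:for-each>"
            + "<xsl:text>]</xsl:text>"
            + "</xsl:template>"
            + "</xsl:stylesheet>";

    static String toJson(String xml) {
        try {
            StringWriter out = new StringWriter();
            TransformerFactory.newInstance()
                    .newTransformer(new StreamSource(new StringReader(XSLT)))
                    .transform(new StreamSource(new StringReader(xml)), new StreamResult(out));
            return out.toString();
        } catch (TransformerException e) {
            throw new IllegalStateException("XSLT transformation failed", e);
        }
    }

    public static void main(String[] args) {
        // Trimmed copy of the sample catalog (other child elements are ignored anyway)
        String xml = "<catalog>"
                + "<cd><title>Empire Burlesque</title><artist>Bob Dylan</artist></cd>"
                + "<cd><title>Hide your heart</title><artist>Bonnie Tyler</artist></cd>"
                + "</catalog>";
        // prints [{"title":"Empire Burlesque","artist":"Bob Dylan"},{"title":"Hide your heart","artist":"Bonnie Tyler"}]
        System.out.println(toJson(xml));
    }
}
```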

XSL Transformation:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

import org.w3c.dom.Document;

// Loop through the records in the batch
records = sdc.records
for (record in records) {
    try {
        // Create a parser for the XML text
        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
        DocumentBuilder db = dbf.newDocumentBuilder();

        // Parse the string in input field "/xml"
        Document doc = db.parse(new ByteArrayInputStream(record.value["xml"].getBytes("UTF-8")));
        doc.normalize();

        // Use XSLT to transform it (the XSLT is in /xslt)
        StreamSource xsl = new StreamSource(new ByteArrayInputStream(record.value["xslt"].getBytes("UTF-8")));
        TransformerFactory tf = TransformerFactory.newInstance();

        // Store the transformed document back in the "/xml" field as a string
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        tf.newTransformer(xsl).transform(new DOMSource(doc), new StreamResult(stream));
        record.value["xml"] = stream.toString("UTF-8");

        // Write the record to the processor output
        sdc.output.write(record)
    } catch (e) {
        // Send the record to the error pipeline
        sdc.log.error(e.toString(), e)
        sdc.error.write(record, e.toString())
    }
}

 


Hi Varadha,

 

Follow the instructions in our docs to update $SDC_CONF/sdc-security.policy to allow access: https://docs.streamsets.com/portal/#datacollector/latest/help/datacollector/UserGuide/Configuration/ExternalLibs.html#concept_hhl_nxz_bz
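The permission named in the error, `accessClassInPackage.com.sun.org.apache.xerces.internal.jaxp`, refers to a JDK-internal package. On a stock JDK 8, the default `DocumentBuilderFactory` implementation is, to my knowledge, `com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl`, so a script that calls methods on the factory returned by `newInstance()` touches that package. The small Java check below (class and method names are hypothetical) reports which implementation classes a given JVM and classpath actually resolve.

```java
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerFactory;

public class JaxpImplCheck {
    // Concrete class behind DocumentBuilderFactory.newInstance() on this JVM/classpath
    static String parserFactoryClass() {
        return DocumentBuilderFactory.newInstance().getClass().getName();
    }

    // Concrete class behind TransformerFactory.newInstance() on this JVM/classpath
    static String transformerFactoryClass() {
        return TransformerFactory.newInstance().getClass().getName();
    }

    public static void main(String[] args) {
        System.out.println("DocumentBuilderFactory -> " + parserFactoryClass());
        System.out.println("TransformerFactory     -> " + transformerFactoryClass());
    }
}
```

If the first line names a `com.sun.org.apache.xerces.internal` class, the JDK's built-in parser is in use. The JDK's default `TransformerFactory` lives under `com.sun.org.apache.xalan.internal`, which a security manager may restrict in the same way.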

 

Cheers,

Dash


Dear @Dash

As described in the link, I performed the following steps:

1. Created the directory:

mkdir /opt/sdc-extras


2. Ran the command:

chown -R sdc:sdc /opt/sdc-extras


3. Added the following entry to /opt/streamsets-datacollector-4.1.0/libexec/sdc-env.sh:

export STREAMSETS_LIBRARIES_EXTRA_DIR="/opt/sdc-extras/"

4. Added the entries below to $SDC_CONF/sdc-security.policy:

// user-defined external directory
grant codebase "file:///opt/sdc-extras/-" {
  permission java.security.AllPermission;
};

// Groovy will parse files in a different context, so we need to grant it additional privileges
grant codebase "file:///groovy/script" {
  permission java.lang.RuntimePermission "getClassLoader";
  permission java.util.PropertyPermission "*", "read";
};

5. The file xercesImpl-2.11.0.jar is available in the /opt/sdc-extras folder:

bash-5.0$ pwd
/opt/sdc-extras
bash-5.0$ ls -ltr xer*
-rwxrwxrwx    1 sdc      sdc        1367760 Sep  6 07:17 xercesImpl-2.11.0.jar

6. Restarted Data Collector.

But I am still getting the error below. Could you please help?

2021-09-06 08:28:47,852    ERROR    java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessClassInPackage.com.sun.org.apache.xerces.internal.jaxp")    GroovyProcessor    *da115c5f-f696-11eb-b9dc-67019588858b@149e11c1-f697-11eb-b9dc-fd846d33049d    0    RandomDataGenerator-0::testRun__05d2ea06-993a-42cb-bc09-2cd69c1f250b__149e11c1-f697-11eb-b9dc-fd846d33049d__da115c5f-f696-11eb-b9dc-67019588858b@149e11c1-f697-11eb-b9dc-fd846d33049d
java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "accessClassInPackage.com.sun.org.apache.xerces.internal.jaxp")
    at java.security.AccessControlContext.checkPermission(AccessControlContext.java:472)
    at java.security.AccessController.checkPermission(AccessController.java:886)
    at java.lang.SecurityManager.checkPermission(SecurityManager.java:549)
    at java.lang.SecurityManager.checkPackageAccess(SecurityManager.java:1564)
    at java.lang.Class.checkPackageAccess(Class.java:2372)
    at java.lang.Class.checkMemberAccess(Class.java:2351)
    at java.lang.Class.getConstructor(Class.java:1824)
    at org.codehaus.groovy.reflection.ClassLoaderForClassArtifacts.defineClassAndGetConstructor(ClassLoaderForClassArtifacts.java:83)
    at org.codehaus.groovy.runtime.callsite.CallSiteGenerator.compilePojoMethod(CallSiteGenerator.java:234)
    at org.codehaus.groovy.reflection.CachedMethod.createPojoMetaMethodSite(CachedMethod.java:262)
    at org.codehaus.groovy.runtime.callsite.PojoMetaMethodSite.createCachedMethodSite(PojoMetaMethodSite.java:162)
    at org.codehaus.groovy.runtime.callsite.PojoMetaMethodSite.createPojoMetaMethodSite(PojoMetaMethodSite.java:151)
    at groovy.lang.MetaClassImpl.createPojoCallSite(MetaClassImpl.java:3393)
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.createPojoSite(CallSiteArray.java:132)
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.createCallSite(CallSiteArray.java:166)
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)
    at Script3.run(Script3.groovy:20)
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:317)
    at org.codehaus.groovy.jsr223.GroovyCompiledScript.eval(GroovyCompiledScript.java:72)
    at javax.script.CompiledScript.eval(CompiledScript.java:92)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runScript(AbstractScriptingProcessor.java:346)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runScript(AbstractScriptingProcessor.java:195)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.runBatch(AbstractScriptingProcessor.java:190)
    at com.streamsets.pipeline.stage.processor.scripting.AbstractScriptingProcessor.process(AbstractScriptingProcessor.java:167)
    at com.streamsets.pipeline.api.base.SingleLaneProcessor.process(SingleLaneProcessor.java:95)
    at com.streamsets.pipeline.api.base.configurablestage.DProcessor.process(DProcessor.java:35)
    at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$2(StageRuntime.java:287)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:232)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:299)
    at com.streamsets.datacollector.runner.StagePipe.process(StagePipe.java:209)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.lambda$runSourceLessBatch$0(PreviewPipelineRunner.java:352)
    at com.streamsets.datacollector.runner.PipeRunner.acceptConsumer(PipeRunner.java:195)
    at com.streamsets.datacollector.runner.PipeRunner.forEachInternal(PipeRunner.java:140)
    at com.streamsets.datacollector.runner.PipeRunner.executeBatch(PipeRunner.java:120)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.runSourceLessBatch(PreviewPipelineRunner.java:348)
    at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.processBatch(PreviewPipelineRunner.java:272)
    at com.streamsets.datacollector.runner.StageRuntime$3.run(StageRuntime.java:371)
    at java.security.AccessController.doPrivileged(Native Method)
    at com.streamsets.datacollector.runner.StageRuntime.processBatch(StageRuntime.java:367)
    at com.streamsets.datacollector.runner.StageContext.processBatch(StageContext.java:287)
    at com.streamsets.pipeline.stage.devtest.RandomDataGeneratorSource$GeneratorRunnable.run(RandomDataGeneratorSource.java:298)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
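The frames `CallSiteGenerator.compilePojoMethod`, `Class.getConstructor` and `Class.checkPackageAccess` in this trace suggest the denial happens when Groovy reflects on the concrete factory class from the restricted package. Copying xercesImpl-2.11.0.jar into the extras directory does not by itself change which class `DocumentBuilderFactory.newInstance()` returns; that depends on whether the jar is on the Groovy stage library's classpath. As I understand the External Libraries documentation, jars are expected under a per-stage-library subdirectory such as `$STREAMSETS_LIBRARIES_EXTRA_DIR/<stage library name>/lib/` rather than directly in /opt/sdc-extras; please verify that against the 4.1.0 docs. Once the jar is visible, one option is to request the Apache Xerces factory by class name and fall back to the JDK default. The Java sketch below shows that pattern. The helper name is hypothetical, and whether it avoids the `AccessControlException` under Data Collector's security manager is an untested assumption (the default `TransformerFactory` may still trigger a similar check).

```java
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.FactoryConfigurationError;

public class XercesFactoryChoice {
    // Apache Xerces-J implementation class shipped in xercesImpl.jar
    static final String XERCES_FACTORY = "org.apache.xerces.jaxp.DocumentBuilderFactoryImpl";

    // Hypothetical helper: prefer the external Xerces parser, else use the JDK default.
    static DocumentBuilderFactory parserFactory() {
        try {
            return DocumentBuilderFactory.newInstance(
                    XERCES_FACTORY, Thread.currentThread().getContextClassLoader());
        } catch (FactoryConfigurationError e) {
            // xercesImpl.jar is not visible to this class loader
            return DocumentBuilderFactory.newInstance();
        }
    }

    public static void main(String[] args) {
        System.out.println("Using " + parserFactory().getClass().getName());
    }
}
```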

 


Hi @srinivasanvar!

Were you able to resolve this issue? 

