Question

Write to sdc in groovy scripting

  • 19 August 2022
  • 5 replies
  • 296 views

I am pretty new to StreamSets and Groovy, and I am changing my input from “Dev Raw Data Source” to “Groovy Scripting”. The issue I face is that the code below, which I have in my “Groovy Scripting” stage, throws an error.

Groovy Scripting:

import com.streamsets.pipeline.stage.origin.scripting.*

record = sdc.createRecord("bulkUpdateRequestTemplate")
bulkUpdateRequestTemplate = "{ \"payload\": { \"objects\": { \"filter\": \"(equals(type,'configuration/entityTypes/Contract') and equals(attributes.ContractStatus, 'ACTIVE') and lt (attributes.ExpirationDate,'currentTimestampEpoch'))\", \"options\": \"searchByOv,ovOnly\" }, \"actions\": [ { \"operation\": \"UpdateAttribute\", \"operationParameters\": { \"attributeURI\": \"configuration/entityTypes/Contract/attributes/ContractStatus\", \"attributeValue\": \"INACTIVE\" } } ] } }"

record.value = bulkUpdateRequestTemplate
sdc.state['bulkUpdateRequestTemplate'] = record

Error:

com.streamsets.pipeline.api.StageException: SCRIPTING_10 - Script error in user script: javax.script.ScriptException: groovy.lang.MissingPropertyException: No such property: state for class: com.streamsets.pipeline.stage.origin.scripting.ScriptingOriginBindings
at com.streamsets.pipeline.stage.origin.scripting.AbstractScriptingSource.produce(AbstractScriptingSource.java:124)
at com.streamsets.pipeline.api.base.configurablestage.DPushSource.produce(DPushSource.java:44)
at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$1(StageRuntime.java:258)
at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:232)
at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:267)
at com.streamsets.datacollector.runner.SourcePipe.process(SourcePipe.java:67)
at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.runPushSource(PreviewPipelineRunner.java:233)
at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.run(PreviewPipelineRunner.java:218)
at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:535)
at com.streamsets.datacollector.runner.preview.PreviewPipeline.run(PreviewPipeline.java:39)
at com.streamsets.datacollector.execution.preview.sync.SyncPreviewer.start(SyncPreviewer.java:227)
at com.streamsets.datacollector.execution.preview.async.AsyncPreviewer.lambda$start$1(AsyncPreviewer.java:93)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:44)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:25)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:44)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:25)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:88)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Am I missing something here? Please help.
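
Separately from the error, the hand-escaped JSON string in the script above is easy to get wrong. A minimal sketch of building the same payload from nested maps with `groovy.json.JsonOutput`, which ships with Groovy, is shown here. It assumes the `groovy-json` classes are available in the Data Collector scripting environment, which this thread does not confirm; the variable names `filter` and `template` are only illustrative.

```groovy
import groovy.json.JsonOutput

// Filter expression copied from the script in the question.
def filter = "(equals(type,'configuration/entityTypes/Contract') " +
        "and equals(attributes.ContractStatus, 'ACTIVE') " +
        "and lt (attributes.ExpirationDate,'currentTimestampEpoch'))"

// Same structure as the hand-escaped string, written as nested maps and lists.
def template = [
    payload: [
        objects: [
            filter : filter,
            options: 'searchByOv,ovOnly'
        ],
        actions: [
            [
                operation          : 'UpdateAttribute',
                operationParameters: [
                    attributeURI  : 'configuration/entityTypes/Contract/attributes/ContractStatus',
                    attributeValue: 'INACTIVE'
                ]
            ]
        ]
    ]
]

// JsonOutput takes care of quoting and escaping.
def bulkUpdateRequestTemplate = JsonOutput.toJson(template)
println bulkUpdateRequestTemplate
```

The resulting string can be assigned to `record.value` in the same way as the original literal.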


5 replies


@nidheesh I was able to run your exact code in the Groovy Evaluator and see the output.

I had to add this line of code to write the record to the output:

sdc.output.write(record)

Is this the Groovy origin or the processor? I ask because it is giving me the same error.


@abbeygad 

Groovy processor.
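
That distinction likely explains the error: the exception in the question names `ScriptingOriginBindings`, which suggests the script ran in the Groovy Scripting origin, where `sdc.state` is not available. A minimal sketch of emitting the same record from the origin follows. It uses only the batch calls that appear in the origin sample script further down this thread (`sdc.createBatch()`, `add`, `process`); the offset value `'1'` and the shortened placeholder JSON are assumptions made for illustration, and the script has not been run against a Data Collector instance.

```groovy
// Sketch for the Groovy Scripting origin; `sdc` is supplied by Data Collector.
entityName = ''   // single threaded: only one offset key is needed

cur_batch = sdc.createBatch()
record = sdc.createRecord('bulkUpdateRequestTemplate')

// Placeholder payload; substitute the full template string from the question.
record.value = '{ "payload": { } }'
cur_batch.add(record)

// Hand the batch to the pipeline. '1' is an arbitrary offset for this
// one-record sketch.
cur_batch.process(entityName, '1')
```

In the origin, records reach the pipeline through a batch rather than through `sdc.output.write(record)`, which is the call used in the processor.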

ok

Can I get help with using a Groovy script for S3? See the sample code:

import java.io.ByteArrayInputStream;
import java.io.File;
import java.util.List;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.util.StringUtils;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.Bucket;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3ObjectSummary;

// First, import the ClientConfiguration and Protocol classes.
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;

// CREATING A CONNECTION: Create a connection that can interact with the server.
// The keys are defined before they are used to build the credentials.
String accessKey = 'xxxxxx';
String secretKey = 'xxxxxx';
AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);

// Define the client configuration, and add the client configuration as an argument for the S3 client.
ClientConfiguration clientConfig = new ClientConfiguration();
clientConfig.setProtocol(Protocol.HTTP);

// Placeholder endpoint; replace with your own S3-compatible endpoint.
AmazonS3 conn = new AmazonS3Client(credentials, clientConfig);
conn.setEndpoint("objects.dreamhost.com");

// LISTING OWNED BUCKETS: This gets a list of Buckets that you own. This also prints out the bucket name and creation date of each bucket.
List<Bucket> buckets = conn.listBuckets();
for (Bucket bucket : buckets) {
    System.out.println(bucket.getName() + "\t" +
            StringUtils.fromDate(bucket.getCreationDate()));
}

// single threaded - no entityName because we need only one offset
entityName = ''

// get the previously committed offset or start at 0
if (sdc.lastOffsets.containsKey(entityName)) {
    offset = sdc.lastOffsets.get(entityName) as int
} else {
    offset = 0
}

if (sdc.userParams.containsKey('recordPrefix')) {
    prefix = sdc.userParams.get('recordPrefix')
} else {
    prefix = ''
}

cur_batch = sdc.createBatch()
record = sdc.createRecord('generated data')

hasNext = true
while (hasNext) {
    try {
        offset = offset + 1
        record = sdc.createRecord('generated data')
        value = prefix + entityName + ':' + offset.toString()
        record.value = value
        cur_batch.add(record)

        // if the batch is full, process it and start a new one
        if (cur_batch.size() >= sdc.batchSize) {
            // blocks until all records are written to all destinations
            // (or failure) and updates offset
            // in accordance with delivery guarantee
            cur_batch.process(entityName, offset.toString())
            cur_batch = sdc.createBatch()
            if (sdc.isStopped()) {
                hasNext = false
            }
        }
    } catch (Exception e) {
        cur_batch.addError(record, e.toString())
        cur_batch.process(entityName, offset.toString())
        hasNext = false
    }
}

if (cur_batch.size() + cur_batch.errorCount() + cur_batch.eventCount() > 0) {
    cur_batch.process(entityName, offset.toString())
}
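
One way the S3 listing and the batch loop above might be combined is sketched below: each bucket name becomes one record, and batches are flushed with the same `process` call. This is a sketch under assumptions, not a tested pipeline. It assumes the AWS SDK for Java v1 classes are available to the Groovy Scripting stage, reuses the placeholder keys and endpoint from the sample, and uses a running bucket count as the offset, which is an illustrative choice.

```groovy
// Sketch for the Groovy Scripting origin; `sdc` is provided by Data Collector.
// Assumes the AWS SDK for Java v1 classes are available to the stage.
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client

// Placeholder credentials and endpoint, as in the sample above.
def conn = new AmazonS3Client(new BasicAWSCredentials('xxxxxx', 'xxxxxx'))
conn.setEndpoint('objects.dreamhost.com')

entityName = ''   // single threaded: only one offset key is needed
offset = 0
cur_batch = sdc.createBatch()

for (bucket in conn.listBuckets()) {
    offset = offset + 1
    record = sdc.createRecord('bucket:' + bucket.getName())
    record.value = bucket.getName()   // one record per bucket name
    cur_batch.add(record)

    // if the batch is full, process it and start a new one
    if (cur_batch.size() >= sdc.batchSize) {
        cur_batch.process(entityName, offset.toString())
        cur_batch = sdc.createBatch()
    }
}

// process whatever is left in the last batch
if (cur_batch.size() > 0) {
    cur_batch.process(entityName, offset.toString())
}
```

Unlike the sample loop, this sketch does not read `sdc.lastOffsets`, so a restarted pipeline would list every bucket again.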


 
