Environment:
- StreamSets Data Collector v4.2.1.
- Kafka Multitopic Consumer origin.
Issue:
The Kafka Multitopic Consumer origin fails with the following error when the “produce single records” option is enabled and the incoming data is in JSON format:
2021-01-10 15:31:12,193 [user:test] [pipeline:test_kafka_working] [runner:] [thread:Kafka pipeline-thread1] [stage:KafkaMultitopicConsumer_01] INFO MultiKafkaSource - Multi kafka thread halted unexpectedly: KAFKA_29 - Error fetching data from Kafka: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
java.util.concurrent.ExecutionException: com.streamsets.pipeline.api.StageException: KAFKA_29 - Error fetching data from Kafka: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.streamsets.pipeline.stage.origin.multikafka.MultiKafkaSource.produce(MultiKafkaSource.java:163)
at com.streamsets.pipeline.api.base.configurablestage.DPushSource.produce(DPushSource.java:44)
at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$1(StageRuntime.java:258)
at com.streamsets.pipeline.api.impl.CreateByRef.call(CreateByRef.java:40)
at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:232)
at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:267)
at com.streamsets.datacollector.runner.SourcePipe.process(SourcePipe.java:67)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPushSource(ProductionPipelineRunner.java:420)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:386)
at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:523)
at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:100)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:63)
at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.startInternal(StandaloneRunner.java:750)
at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:743)
at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:150)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:43)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:24)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:43)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:24)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:88)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.streamsets.pipeline.api.StageException: KAFKA_29 - Error fetching data from Kafka: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at com.streamsets.pipeline.stage.origin.multikafka.KafkaMultitopicRunnable.call(KafkaMultitopicRunnable.java:102)
at com.streamsets.pipeline.stage.origin.multikafka.KafkaMultitopicRunnable.call(KafkaMultitopicRunnable.java:38)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:659)
at java.util.ArrayList.get(ArrayList.java:435)
at com.streamsets.pipeline.stage.origin.multikafka.KafkaMultitopicRunnable.createRecord(KafkaMultitopicRunnable.java:256)
at com.streamsets.pipeline.stage.origin.multikafka.KafkaMultitopicRunnable.produceRecords(KafkaMultitopicRunnable.java:146)
at com.streamsets.pipeline.stage.origin.multikafka.KafkaMultitopicRunnable.call(KafkaMultitopicRunnable.java:97)
... 5 more
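The innermost cause in the trace is ArrayList.get called with index 0 on an empty list inside createRecord. The sketch below only illustrates that failure pattern and a guarded alternative. It is not the Data Collector source: the class EmptyListSketch and the methods firstUnchecked and firstOrNull are hypothetical names, and the list of strings merely stands in for whatever the origin parsed from a message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the Data Collector implementation.
public class EmptyListSketch {

    // Mirrors the failing pattern: reads element 0 without checking the size.
    static String firstUnchecked(List<String> parsed) {
        return parsed.get(0); // throws IndexOutOfBoundsException when parsed is empty
    }

    // Guarded variant: returns null when nothing was parsed.
    static String firstOrNull(List<String> parsed) {
        return parsed.isEmpty() ? null : parsed.get(0);
    }

    public static void main(String[] args) {
        // Stands in for a message that yielded no parsed records (assumption).
        List<String> parsed = new ArrayList<>();

        System.out.println("guarded: " + firstOrNull(parsed));
        try {
            firstUnchecked(parsed);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("unguarded: " + e.getClass().getSimpleName());
        }
    }
}
```

The exact exception message depends on the Java version, so the sketch prints only the exception class name.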
Resolution:
We identified this as a defect in Data Collector v4.2.1 and logged it internally as Jira COLLECTOR-2081. The fix is planned for Data Collector v5.4.0.