Question

3
Replies
647
Views
Nikhil_Garge Member since 2015 4 posts
Virtusa Corporation
Posted: 1 year ago
Last activity: 11 months ago

Error Reading JSON message from KAFKA topic

Hello Every one,

I am working on 7.4 and for my Use case

I am streaming the JSON message to Kafka topic which is configured in the external infrastructure (ECP).The hand shake is successful with below steps-

1. I am able to stream messages to Kafka topic where i am storing the JSON message in one string say .pyNote and stream to KAFKA successfully using DataSet-Execute method

I am trying to to retrieve the message and i tried below 2 options -

A. DataSet-Execute method on step page ReadEventsFromKafka of work class with operation as "browse" and store results in "OperationResult" of Code-Pega-List but i am getting error while reading from the topic "com.pega.pegarules.pub.PRRuntimeException: Exception during data set execution"

B. DataFlow-Execute method on step page ReadEventsFromKafka of work class with operation as "Start" with below steps-

--> call data flow ReadKafkaMessages

--> define the dataset name in the first shape and then defined the custom activity(ReadMsg) in the next shape and using the same step page ReadEventsFromKafka but i am not able to read it and on tracer it gives below error.

Note:: SharedKafka2_MatrixONBEvents is also configured as consumer and producer both.

Can you please help here

m.pega.dsm.dnode.api.dataflow.StageException: Exception in stage: SharedKafka2_MatrixONBEvents at com.pega.dsm.dnode.api.dataflow.StageException.create(StageException.java:39) at com.pega.dsm.dnode.api.dataflow.DataFlowStage$StageOutputSubscriber.onError(DataFlowStage.java:512) at com.pega.dsm.dnode.api.dataflow.DataFlowStage$StageInputSubscriber.onError(DataFlowStage.java:380) at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.onError(DataObservableImpl.java:305) at com.pega.dsm.dnode.api.stream.DataSubscriber.onError(DataSubscriber.java:60) at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.onError(DataObservableImpl.java:305) at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:175) at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.access$000(KafkaBrowseOperation.java:50) at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation$1.emit(KafkaBrowseOperation.java:100) at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:338) at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:40) at com.pega.dsm.dnode.impl.stream.DataObservableImpl$3.emit(DataObservableImpl.java:161) at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:338) at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:40) at com.pega.dsm.dnode.api.dataflow.DataFlow$3.run(DataFlow.java:417) at com.pega.dsm.dnode.api.dataflow.DataFlow$3.run(DataFlow.java:411) at com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:52) at com.pega.dsm.dnode.impl.dataflow.DataFlowThreadContext$1.run(DataFlowThreadContext.java:161) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522) at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:108) at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:41) at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:77) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:44) at com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:41) at com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:52) at com.pega.dsm.dnode.impl.prpc.PrpcThreadFactory$PrpcThread.run(PrpcThreadFactory.java:109) Caused by: com.pega.dsm.dnode.api.ExceptionWithInputRecord: java.lang.IllegalStateException: Cannot parse json ... 22 more Caused by: java.lang.IllegalStateException: Cannot parse json at com.pega.dsm.dnode.util.ClipboardPageJsonConverter.adoptJson(ClipboardPageJsonConverter.java:411) at com.pega.dsm.dnode.util.ClipboardPageJsonConverter.adoptJson(ClipboardPageJsonConverter.java:246) at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.convertRecordToClipboardPage(KafkaBrowseOperation.java:217) at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:163) ... 21 more Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'test': was expecting 'null', 'true', 'false' or NaN at [Source: (com.pega.dsm.dnode.util.ClipboardPageJsonConverter$ByteBufferInputStream); line: 1, column: 6] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:673) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3502) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchToken2(UTF8StreamJsonParser.java:2837) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchTrue(UTF8StreamJsonParser.java:2771) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723) at com.pega.dsm.dnode.util.ClipboardPageJsonConverter.adoptJson(ClipboardPageJsonConverter.java:287) ... 24 more

***Edited by Moderator: Pallavi to update platform capability tags***

Data Integration Java and Activities Decision Management
Share this page LinkedIn