Remove items in "Ready to process" from Queue Processor
We have a queue processor 'pyProcessNotification' not running and about 100k items are in the "Ready to Process" state. How can we delete/clear the queue since we don't want to trigger notifications for old items.
Pega version is 8.3.1
Thank you
-
Like (0)
-
Accepted Solution

You can create an activity with one Java step using the attached code.
Ensure to update the TopicName field in the attached code with your QP name in Capital letters for which you want to truncate the ready to process queue.
Let me know if that helps!!

@Harish_Gunneri : Thanks Harish. This worked for us.

Hi Harish,
I am also facing the issue. I have tried with the code which you shared but i am getting issue with the withTopicName method. Let me know is there any other way to clear the Ready to process queues from Queue Processor.
Error Details are below:
The method withTopicName(String) is undefined for the type StreamDataSetBuilder.
Thanks in Advance.
Regards,
Raghu Kumar K

@Harish_Gunneri This is really helpful and it helped us in clearing all the records

Could you please provide that Java code. i could not find the code in this thread.

Hello @Harish_Gunneri We are on version, 8.6 and when trying to save the activity with the java code you provided, resulted in the following compilation error
The method withTopicName(String) is undefined for the type StreamDataSetBuilder
Compile failed.
@VVODNALAItems queued to Queue Processors are pushed to Kafka, not database table.
In order to clear the queues, you can try removing the partition directories for your Queue Processor, which are created on the app filesystem. These partitions can be located in the 'kafka-data' directory on each app-node that supports 'Stream' service. For example, kafka-data can be located inside the tomcat installation directory for a Pega app running on tomcat.
Prior to removing the affected partitions from each node separately, ensure the app-servers are stopped to prevent replication between the Stream nodes. Otherwise the Stream service will repopulate the nodes with partition data soon as you remove it from any node within the cluster.
Let me know how this goes.
Pratik Agarwal
What are the steps you performed let me know. Or else did i miss anything to clear the Ready to Process items in Queue Processor.
Thanks in Advance.
Raghu K

@Harish_Gunneri , Hi Harish, We are getting com.pega.fnx.stream.spi.StreamServiceException: Stream SPI error in pega when running this code in our environment

Hi For V8.2.8, here is the code
java.util.ArrayList<String> partitionKeys = new java.util.ArrayList<>(); partitionKeys.add(".pzMessageContent.pzInsKey"); com.pega.dsm.dnode.api.dataset.DataSet dataset = com.pega.dsm.dnode.api.dataset.stream.StreamDataSet.builder() .withClassName("System-Message-QueueProcessor") .withTopicName("<QP_NAME IN CAPITAL LETTER>") .withPartitionKeys(partitionKeys) .withIdempotentSave(true) .withOperation(com.pega.dsm.dnode.api.dataset.operation.SaveOperation.NAME) .withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaGetTopicsOperation.NAME) .withOperation(com.pega.dsm.dnode.api.dataset.operation.BrowseOperation.NAME) .withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaDropOperation.NAME) .withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaTruncateOperation.NAME) .withSerde(com.pega.dsm.kafka.api.serde.ClipboardPageStreamSerde.create()) .build(tools); com.pega.dsm.dnode.api.dataset.operation.Operation operation = dataset.getOperationByName(com.pega.dsm.dnode.impl.dataset.kafka.KafkaTruncateOperation.NAME); ((com.pega.dsm.dnode.impl.dataset.kafka.KafkaTruncateOperation)operation).truncate().await(tools); |

@Harish_Gunneriwhile runnig the activity with the above java code getting this error.
Java Exception: java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.pega.fnx.stream.spi.StreamServiceException: Unknown stream ProcessFailedEvents
Can you please let me know solution for this.

Please go through the below article, hope it helps.