Question
Mani Bharghava Reddy (Mani Bharghav)
Senior Consultant-Technology, Evonsys, IN
Posted: November 22, 2018
Last activity: February 28, 2019
Closed

Kafka Integration with Pega 8.1

Hi All,

Can someone explain the steps to integrate with, and listen to messages from, a Kafka topic on localhost?

The steps I followed:

1) Installed Kafka on my personal machine (localhost)

2) Started the ZooKeeper and Kafka servers

3) Created a topic

4) In Pega, created a Kafka configuration instance (did not set up SASL authentication)

5) Created a Kafka data set

6) Created a data flow

7) Created a real-time data flow; after that, the data flow run says:

Stream service is not enabled. Enable it via the

8) On the Stream service landing page, under the "STREAM" tab, I saw the alert "Not enough nodes to achieve replication factor 2", and the node status is "JOINING_FAILED"

9) I then selected the item and saw the error message "ChannelException: Failed to bind to: /127.0.0.1:2181"

10) In the Pega log file:

2018-11-21 16:15:10,913 [ PegaRULES-Batch-2] [ STANDARD] [ ] [ TestApp123:01.01.01] (ernal.async.BatchRequestorTask) ERROR manibhargav - Batch activity "Embed-Decision-Service-Operation.pzAsyncExecuteOperation" threw:
com.pega.pegarules.pub.PRRuntimeError: PRRuntimeError
at com.pega.pegarules.session.internal.mgmt.base.ThreadRunner.runActivitiesAlt(ThreadRunner.java:713) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.mgmt.base.ThreadRunner.runActivities(ThreadRunner.java:572) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.mgmt.PRThreadImpl.runActivities(PRThreadImpl.java:481) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.async.agent.QueueProcessor.executeBatchTask(QueueProcessor.java:297) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.async.agent.QueueProcessor.execute(QueueProcessor.java:362) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.async.BatchRequestorTask.run(BatchRequestorTask.java:1139) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.PRSessionProviderImpl.performTargetActionWithLock(PRSessionProviderImpl.java:1366) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.PRSessionProviderImpl.doWithRequestorLocked(PRSessionProviderImpl.java:1109) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.PRSessionProviderImpl.doWithRequestorLocked(PRSessionProviderImpl.java:990) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.async.BatchRequestorTask.run(BatchRequestorTask.java:805) ~[prprivate.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to: /127.0.0.1:2181
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272) ~[netty-3.10.6.Final.jar:?]
at com.pega.charlatan.server.CharlatanNettyServer.start(CharlatanNettyServer.java:112) ~[charlatan-server-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.server.StreamServerService$StreamServiceStartOperation$3.emit(StreamServerService.java:377) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:338) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:40) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.await(DataObservableImpl.java:102) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.await(DataObservableImpl.java:91) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.operation.StartOperation.doActualServerStart(StartOperation.java:175) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.operation.StartOperation.access$400(StartOperation.java:22) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.operation.StartOperation$3.execute(StartOperation.java:156) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.OperationWithLock$LockingOperation.couldAcquireLock(OperationWithLock.java:133) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.OperationWithLock$LockingOperation.performLockOperation(OperationWithLock.java:123) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.OperationWithLock$LockingOperation.access$100(OperationWithLock.java:70) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.OperationWithLock.doWithLock(OperationWithLock.java:67) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.OperationWithLock.doWithLock(OperationWithLock.java:63) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.ServiceHelper.executeWithLockInternal(ServiceHelper.java:212) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.ServiceHelper.executeWithLock(ServiceHelper.java:168) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.operation.StartOperation.initializeServerMode(StartOperation.java:153) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.operation.StartOperation.access$200(StartOperation.java:22) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.operation.StartOperation$1.emit(StartOperation.java:83) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:338) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:40) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.await(DataObservableImpl.java:102) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.toList(DataObservableImpl.java:81) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.RemoteServiceOperation$ExecuteOperationMessage.call(RemoteServiceOperation.java:112) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.RemoteServiceOperation$ExecuteOperationMessage.call(RemoteServiceOperation.java:72) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.CallableMessageWithContext$1.run(CallableMessageWithContext.java:33) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.CallableMessageWithContext$1.run(CallableMessageWithContext.java:30) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:52) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.ServiceHelper.executeInPrpcContextInternal(ServiceHelper.java:244) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.ServiceHelper.executeInPrpcContext(ServiceHelper.java:103) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.CallableMessageWithContext.call(CallableMessageWithContext.java:30) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.AbstractDsmService.handleMessageInternal(AbstractDsmService.java:256) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.ServiceDeploymentImpl$PostMessageJobData.handleMessage(ServiceDeploymentImpl.java:579) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.ServiceDeploymentImpl$PostMessageJob.execute(ServiceDeploymentImpl.java:545) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.ServiceDeploymentImpl$PostMessageJob.execute(ServiceDeploymentImpl.java:542) ~[d-node-8.1.0-192.jar:?]
at com.pega.pegarules.cluster.internal.PRPCTask.call(PRPCTask.java:130) ~[prcluster.jar:?]
at com.pega.pegarules.cluster.internal.PRPCTask.call(PRPCTask.java:53) ~[prcluster.jar:?]
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:108) ~[guava-19.0.jar:?]
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:41) ~[guava-19.0.jar:?]
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:77) ~[guava-19.0.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_121]
at com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:44) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:41) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:52) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.PrpcThreadFactory$PrpcThread.run(PrpcThreadFactory.java:109) ~[d-node-8.1.0-192.jar:?]
Caused by: java.net.BindException: Address already in use: bind
at sun.nio.ch.Net.bind0(Native Method) ~[?:1.8.0_121]
at sun.nio.ch.Net.bind(Net.java:433) ~[?:1.8.0_121]
at sun.nio.ch.Net.bind(Net.java:425) ~[?:1.8.0_121]
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223) ~[?:1.8.0_121]
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) ~[?:1.8.0_121]
at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:391) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:315) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[netty-3.10.6.Final.jar:?]
... 3 more
11) I have also attached the diagnostic file to this post.

Please help if I have made any mistakes in the configuration on either the Kafka side or the Pega side.
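
For anyone reading the trace in step 10: the innermost cause is `java.net.BindException: Address already in use` on 127.0.0.1:2181, which suggests something was already listening on that port when the Pega stream service tried to start. Port 2181 is also ZooKeeper's default client port, so the standalone ZooKeeper from step 2 is a plausible candidate, though nothing in the post confirms it. Below is a minimal sketch for checking whether the port is occupied, using only the Java standard library. The class name `PortCheck` and the method `isPortFree` are made up for illustration and are not part of Pega or Kafka.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;

public class PortCheck {

    /**
     * Hypothetical helper: tries to open a listening socket on host:port.
     * Returns true if the bind succeeds (port is free right now) and false
     * if the bind fails with BindException, the same exception type that
     * appears at the bottom of the Pega trace.
     */
    public static boolean isPortFree(String host, int port) throws IOException {
        try (ServerSocket probe = new ServerSocket(port, 1, InetAddress.getByName(host))) {
            return true; // bind worked; the probe socket is closed on exit
        } catch (BindException e) {
            return false; // another process already holds the port
        }
    }

    public static void main(String[] args) throws IOException {
        // Address and port taken from the error message in the post.
        String host = "127.0.0.1";
        int port = 2181;
        String state = isPortFree(host, port) ? "is free" : "is already in use";
        System.out.println(host + ":" + port + " " + state);
    }
}
```

If this reports the port as in use while the local ZooKeeper is running and as free once it is stopped, that would point to a port clash between the two rather than a mistake in the Kafka configuration instance or the data set.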
***Edited by Moderator Marissa to update platform capability tags****
Data Integration Decision Management