Question

Mani Bharghav Member since 2017 36 posts
Evonsys
Posted: November 22, 2018
Last activity: February 28, 2019
Closed

Kafka Integration with Pega 8.1

Hi All,

Can someone explain the steps to integrate Pega with Kafka and listen to messages from a Kafka topic on localhost?

These are the steps I followed:

1) Installed Kafka on my personal machine (localhost).

2) Started the ZooKeeper and Kafka servers.

3) Created a topic.

4) In Pega, created a Kafka configuration instance (without SASL authentication).

5) Created a Kafka dataset.

6) Created a dataflow.

7) Created a real-time dataflow run. The run then reports:

Stream service is not enabled. Enable it via the

8) On the Stream services landing page, under the "STREAM" tab, I saw the alert "Not enough nodes to achieve replication factor 2", and the node status is "JOINING_FAILED".

9) I then selected the item and saw the error message "ChannelException: Failed to bind to: /127.0.0.1:2181".

10) The Pega log file shows:

2018-11-21 16:15:10,913 [ PegaRULES-Batch-2] [ STANDARD] [ ] [ TestApp123:01.01.01] (ernal.async.BatchRequestorTask) ERROR manibhargav - Batch activity "Embed-Decision-Service-Operation.pzAsyncExecuteOperation" threw:
com.pega.pegarules.pub.PRRuntimeError: PRRuntimeError
at com.pega.pegarules.session.internal.mgmt.base.ThreadRunner.runActivitiesAlt(ThreadRunner.java:713) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.mgmt.base.ThreadRunner.runActivities(ThreadRunner.java:572) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.mgmt.PRThreadImpl.runActivities(PRThreadImpl.java:481) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.async.agent.QueueProcessor.executeBatchTask(QueueProcessor.java:297) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.async.agent.QueueProcessor.execute(QueueProcessor.java:362) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.async.BatchRequestorTask.run(BatchRequestorTask.java:1139) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.PRSessionProviderImpl.performTargetActionWithLock(PRSessionProviderImpl.java:1366) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.PRSessionProviderImpl.doWithRequestorLocked(PRSessionProviderImpl.java:1109) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.PRSessionProviderImpl.doWithRequestorLocked(PRSessionProviderImpl.java:990) ~[prprivate.jar:?]
at com.pega.pegarules.session.internal.async.BatchRequestorTask.run(BatchRequestorTask.java:805) ~[prprivate.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to: /127.0.0.1:2181
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272) ~[netty-3.10.6.Final.jar:?]
at com.pega.charlatan.server.CharlatanNettyServer.start(CharlatanNettyServer.java:112) ~[charlatan-server-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.server.StreamServerService$StreamServiceStartOperation$3.emit(StreamServerService.java:377) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:338) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:40) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.await(DataObservableImpl.java:102) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.await(DataObservableImpl.java:91) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.operation.StartOperation.doActualServerStart(StartOperation.java:175) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.operation.StartOperation.access$400(StartOperation.java:22) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.operation.StartOperation$3.execute(StartOperation.java:156) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.OperationWithLock$LockingOperation.couldAcquireLock(OperationWithLock.java:133) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.OperationWithLock$LockingOperation.performLockOperation(OperationWithLock.java:123) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.OperationWithLock$LockingOperation.access$100(OperationWithLock.java:70) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.OperationWithLock.doWithLock(OperationWithLock.java:67) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.OperationWithLock.doWithLock(OperationWithLock.java:63) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.ServiceHelper.executeWithLockInternal(ServiceHelper.java:212) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.ServiceHelper.executeWithLock(ServiceHelper.java:168) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.operation.StartOperation.initializeServerMode(StartOperation.java:153) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.operation.StartOperation.access$200(StartOperation.java:22) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.operation.StartOperation$1.emit(StartOperation.java:83) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:338) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:40) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.await(DataObservableImpl.java:102) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.toList(DataObservableImpl.java:81) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.RemoteServiceOperation$ExecuteOperationMessage.call(RemoteServiceOperation.java:112) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.RemoteServiceOperation$ExecuteOperationMessage.call(RemoteServiceOperation.java:72) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.CallableMessageWithContext$1.run(CallableMessageWithContext.java:33) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.CallableMessageWithContext$1.run(CallableMessageWithContext.java:30) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:52) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.ServiceHelper.executeInPrpcContextInternal(ServiceHelper.java:244) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.ServiceHelper.executeInPrpcContext(ServiceHelper.java:103) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.CallableMessageWithContext.call(CallableMessageWithContext.java:30) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.api.prpc.service.AbstractDsmService.handleMessageInternal(AbstractDsmService.java:256) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.ServiceDeploymentImpl$PostMessageJobData.handleMessage(ServiceDeploymentImpl.java:579) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.ServiceDeploymentImpl$PostMessageJob.execute(ServiceDeploymentImpl.java:545) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.service.ServiceDeploymentImpl$PostMessageJob.execute(ServiceDeploymentImpl.java:542) ~[d-node-8.1.0-192.jar:?]
at com.pega.pegarules.cluster.internal.PRPCTask.call(PRPCTask.java:130) ~[prcluster.jar:?]
at com.pega.pegarules.cluster.internal.PRPCTask.call(PRPCTask.java:53) ~[prcluster.jar:?]
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:108) ~[guava-19.0.jar:?]
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:41) ~[guava-19.0.jar:?]
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:77) ~[guava-19.0.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_121]
at com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:44) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:41) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:52) ~[d-node-8.1.0-192.jar:?]
at com.pega.dsm.dnode.impl.prpc.PrpcThreadFactory$PrpcThread.run(PrpcThreadFactory.java:109) ~[d-node-8.1.0-192.jar:?]
Caused by: java.net.BindException: Address already in use: bind
at sun.nio.ch.Net.bind0(Native Method) ~[?:1.8.0_121]
at sun.nio.ch.Net.bind(Net.java:433) ~[?:1.8.0_121]
at sun.nio.ch.Net.bind(Net.java:425) ~[?:1.8.0_121]
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223) ~[?:1.8.0_121]
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) ~[?:1.8.0_121]
at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:391) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:315) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[netty-3.10.6.Final.jar:?]
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[netty-3.10.6.Final.jar:?]
... 3 more
11) I have also attached the diagnostic file to this post.
Please help me find any mistakes in my configuration on either the Kafka side or the Pega side.
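
For anyone reproducing this: the innermost cause in the trace above is "java.net.BindException: Address already in use: bind" on 127.0.0.1:2181. Port 2181 is ZooKeeper's default client port, so the ZooKeeper started in step 2 may already be holding it, but that is an assumption and not something the log confirms. A minimal Python sketch to check whether anything is listening on that port before starting the Pega stream service (the helper name `port_in_use` is only illustrative):

```python
import socket


def port_in_use(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success instead of raising an exception.
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    # Host and port are taken from the error message in the log above.
    host, port = "127.0.0.1", 2181
    state = "already in use" if port_in_use(host, port) else "free"
    print(f"{host}:{port} is {state}")
```

If the port shows as already in use while the Pega stream node is still in JOINING_FAILED, some other process owns it. This check only tells you that a listener exists, not which process it is.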
***Edited by Moderator Marissa to update platform capability tags****
Data Integration Decision Management