How can I reduce lag in a Kafka consumer/producer?
I am looking for ways to improve my Scala Kafka code. To reduce lag, what should I change in the consumer and the producer?
This is code I got from someone else.
I know this code is not difficult, but I have never seen Scala code before and I am just beginning to learn about Kafka, so I have a hard time finding the problem.



```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.util.Try

class KafkaMessenger(val servers: String, val sender: String) {
  val props = new Properties()
  props.put("bootstrap.servers", servers)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  // Setting of the old Scala producer; the Java KafkaProducer does not recognise it
  // (its send() is always asynchronous)
  props.put("producer.type", "async")

  val producer = new KafkaProducer[String, String](props)

  // Returns once the record is buffered; Try only catches errors thrown synchronously,
  // not delivery failures reported later
  def send(topic: String, message: Any): Try[Unit] = Try {
    producer.send(new ProducerRecord(topic, message.toString))
  }

  def close(): Unit = producer.close()
}

object KafkaMessenger {
  // Opens a producer, sends one message, then close() blocks until pending sends complete
  def apply(host: String, topic: String, sender: String, message: String): Unit = {
    val messenger = new KafkaMessenger(host, sender)
    messenger.send(topic, message)
    messenger.close()
  }
}
```


And this is the consumer code:



```scala
import java.util.Properties
import java.util.concurrent.Executors

import com.satreci.g2gs.common.impl.utils.KafkaMessageTypes._
import kafka.admin.AdminUtils
import kafka.consumer._
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
import org.slf4j.LoggerFactory

import scala.language.postfixOps

class KafkaListener(val zookeeper: String,
                    val groupId: String,
                    val topic: String,
                    val handleMessage: ByteArrayMessage => Unit,
                    val workJson: String = ""
                   ) extends AutoCloseable {
  private lazy val logger = LoggerFactory.getLogger(this.getClass)
  val config: ConsumerConfig = createConsumerConfig(zookeeper, groupId)
  val consumer: ConsumerConnector = Consumer.create(config)
  val sessionTimeoutMs: Int = 10 * 1000
  val connectionTimeoutMs: Int = 8 * 1000
  val zkClient: ZkClient = ZkUtils.createZkClient(zookeeper, sessionTimeoutMs, connectionTimeoutMs)
  val zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeper), false)

  // Settings for the old ZooKeeper-based high-level consumer
  def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
    val props = new Properties()
    props.put("zookeeper.connect", zookeeper)
    props.put("group.id", groupId)
    props.put("auto.offset.reset", "smallest")
    props.put("zookeeper.session.timeout.ms", "5000")
    props.put("zookeeper.sync.time.ms", "200")
    props.put("auto.commit.interval.ms", "1000")
    props.put("partition.assignment.strategy", "roundrobin")
    new ConsumerConfig(props)
  }

  def run(threadCount: Int = 1): Unit = {
    // One stream per thread; a stream only receives messages from the partitions assigned to it
    val streams = consumer.createMessageStreamsByFilter(Whitelist(topic), threadCount)

    // If the topic is missing, create it with 1 partition and replication factor 1
    if (!AdminUtils.topicExists(zkUtils, topic)) {
      AdminUtils.createTopic(zkUtils, topic, 1, 1)
    }

    val executor = Executors.newFixedThreadPool(threadCount)
    for (stream <- streams) {
      executor.submit(new MessageConsumer(stream))
    }
    logger.debug(s"KafkaListener start with $threadCount thread(s) (topic=$topic)")
  }

  override def close(): Unit = {
    consumer.shutdown()
    logger.debug(s"$topic Listener close")
  }

  // Handles the messages of one stream sequentially on its own thread
  class MessageConsumer(val stream: MessageStream) extends Runnable {
    override def run(): Unit = {
      val it = stream.iterator()
      while (it.hasNext()) {
        val message = it.next().message()
        if (workJson == "") {
          handleMessage(message)
        }
        else {
          // Appends "/#/" and workJson to the message before handling it
          val strMessage = new String(message)
          val newMessage = s"$strMessage/#/$workJson"
          val outMessage = newMessage.toCharArray.map(c => c.toByte)
          handleMessage(outMessage)
        }
      }
    }
  }
}
```
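A side note on the `workJson` branch above, separate from lag: `c.toByte` keeps only the low 8 bits of each UTF-16 `Char`, so any non-ASCII text (Korean, for example) is corrupted, and `new String(message)` decodes with the platform default charset. A minimal sketch of the same concatenation with explicit UTF-8, assuming `ByteArrayMessage` is an alias for `Array[Byte]` (its definition in `KafkaMessageTypes` is not shown); `WorkJsonSuffix` and `appendWorkJson` are hypothetical names:

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper mirroring the workJson branch of MessageConsumer.
object WorkJsonSuffix {
  // Decodes and re-encodes with explicit UTF-8 instead of using the platform
  // default charset and truncating each UTF-16 Char to a single Byte.
  def appendWorkJson(message: Array[Byte], workJson: String): Array[Byte] =
    if (workJson.isEmpty) message
    else s"${new String(message, UTF_8)}/#/$workJson".getBytes(UTF_8)
}
```

Inside `MessageConsumer.run`, the `if`/`else` could then become `handleMessage(WorkJsonSuffix.appendWorkJson(message, workJson))`. For comparison, `"조".toCharArray.map(c => c.toByte)` yields the single byte `0x70`, which decodes as `p`.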


Specifically, I want to change the structure that creates a new KafkaProducer object every time a message is sent. There also seem to be many other improvements that could reduce lag.
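One way to avoid creating a producer per message, sketched under the assumption that the `kafka-clients` library is on the classpath: create a single `KafkaProducer` when the application starts, share it between all callers (the Kafka documentation describes `KafkaProducer` as thread-safe and notes that sharing one instance is generally faster than having several), and close it once at shutdown. `SharedMessenger` and `producerProps` are hypothetical names, and `producer.type` is left out because it is a setting of the old Scala producer that the Java `KafkaProducer` does not recognise.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object SharedMessenger {
  // Same settings as KafkaMessenger, minus the unrecognised "producer.type".
  def producerProps(servers: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", servers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }
}

// One long-lived producer for the whole application instead of one per message.
class SharedMessenger(servers: String) extends AutoCloseable {
  private val producer = new KafkaProducer[String, String](SharedMessenger.producerProps(servers))

  // Returns as soon as the record is buffered; delivery failures are reported
  // in the callback instead of being silently dropped.
  def send(topic: String, message: Any): Unit =
    producer.send(new ProducerRecord[String, String](topic, message.toString), new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        if (exception != null) System.err.println(s"send to $topic failed: $exception")
    })

  // Waits for buffered records to be sent and releases resources; call once at shutdown.
  override def close(): Unit = producer.close()
}
```

Usage, with a placeholder broker address and topic: create `val messenger = new SharedMessenger("localhost:9092")` once, call `messenger.send("my-topic", "hello")` wherever `KafkaMessenger(host, topic, sender, message)` was called, and call `messenger.close()` when the application stops (for example from `sys.addShutdownHook`). This removes the per-message cost of connecting to the brokers, fetching topic metadata, and waiting in `close()` for each record.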

scala apache-kafka kafka-consumer-api kafka-producer-api

edited Nov 21 at 14:43 by cricket_007
asked Nov 21 at 4:57 by 조현욱

1 Answer

Increase the number of consumer (KafkaListener) instances that use the same group id.
This increases the consumption rate, so the lag between what the producer writes and what the consumers have read will eventually shrink.
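A sketch of this idea with the Java `KafkaConsumer` API from `kafka-clients`, in favour of which the ZooKeeper-based `kafka.consumer` classes used in the question were deprecated. It assumes kafka-clients 2.0 or later for `poll(Duration)`; `GroupWorkers`, the 500 ms poll timeout, and the `handle` callback are illustrative. Each worker thread owns its own `KafkaConsumer`, because the class is not safe for concurrent use, and all workers share one `group.id`, so the group coordinator splits the topic's partitions among them.

```scala
import java.time.Duration
import java.util.{Collections, Properties}
import java.util.concurrent.{Executors, TimeUnit}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.WakeupException

object GroupWorkers {
  // Settings shared by every worker; the common group.id is what makes the
  // brokers divide the topic's partitions among the workers.
  def consumerProps(bootstrap: String, groupId: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrap)
    props.put("group.id", groupId)
    props.put("auto.offset.reset", "earliest") // new-consumer name for "smallest"
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props
  }
}

class GroupWorkers(bootstrap: String, groupId: String, topic: String,
                   handle: Array[Byte] => Unit, workers: Int) extends AutoCloseable {
  // One KafkaConsumer per worker thread: a KafkaConsumer must not be used concurrently.
  private val consumers = Vector.fill(workers)(
    new KafkaConsumer[Array[Byte], Array[Byte]](GroupWorkers.consumerProps(bootstrap, groupId)))
  private val executor = Executors.newFixedThreadPool(workers)

  def start(): Unit = consumers.foreach { consumer =>
    executor.submit(new Runnable {
      override def run(): Unit =
        try {
          consumer.subscribe(Collections.singletonList(topic))
          while (true) {
            // Records are processed on this thread, so a slow handle() adds directly to lag.
            val it = consumer.poll(Duration.ofMillis(500)).iterator()
            while (it.hasNext) handle(it.next().value())
          }
        } catch {
          case _: WakeupException => () // thrown by poll() after close() calls wakeup()
        } finally consumer.close()
    })
  }

  // wakeup() is the KafkaConsumer method that is safe to call from another thread.
  override def close(): Unit = {
    consumers.foreach(_.wakeup())
    executor.shutdown()
    executor.awaitTermination(30, TimeUnit.SECONDS)
  }
}
```

For example, `val workers = new GroupWorkers("localhost:9092", "my-group", "my-topic", bytes => (), 3)`, then `workers.start()`, and `workers.close()` on shutdown (placeholder values). Running the same code in several processes with the same group id scales out across machines in the same way.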

answered Nov 21 at 6:15 by Swapnil Chougule

• This assumes there's more than one partition to consume from
  – cricket_007
  Nov 21 at 14:41

• Thanks for your answer. I'll consider it.
  – 조현욱
  Nov 22 at 1:14

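Following up on the comment: `KafkaListener.run` in the question creates a missing topic with `AdminUtils.createTopic(zkUtils, topic, 1, 1)`, that is, with one partition, so neither extra threads nor extra instances can consume it in parallel. A sketch of creating the topic with more partitions, or raising the count of an existing topic, with the Java `AdminClient` (assumes kafka-clients 1.0 or later, where `createPartitions` is available; `TopicPartitions` and its methods are hypothetical names):

```scala
import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, NewPartitions, NewTopic}

object TopicPartitions {
  def adminProps(bootstrap: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrap)
    props
  }

  // Creates a topic with several partitions (instead of 1), so up to
  // `partitions` consumers in one group can read it in parallel.
  def createTopic(bootstrap: String, topic: String, partitions: Int, replication: Short): Unit = {
    val admin = AdminClient.create(adminProps(bootstrap))
    try admin.createTopics(Collections.singleton(new NewTopic(topic, partitions, replication))).all().get()
    finally admin.close()
  }

  // Raises the total partition count of an existing topic to `total`.
  def increasePartitions(bootstrap: String, topic: String, total: Int): Unit = {
    val admin = AdminClient.create(adminProps(bootstrap))
    try admin.createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(total))).all().get()
    finally admin.close()
  }
}
```

For example, `TopicPartitions.increasePartitions("localhost:9092", "my-topic", 4)` (placeholder values). Kafka can only increase a topic's partition count, and for keyed records the change alters which partition a key maps to; the producer in the question sends records without keys, so that should not matter here.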