Spark - combine filter results from all executors












I have 3 executors in my Spark Streaming job, which consumes from Kafka. The executor count depends on the number of partitions in the topic. When a message is consumed from this topic, I start a query on Hazelcast. Every executor finds results from a filtering operation on Hazelcast, and the executors return duplicated results: the status of the data has not yet been updated when one executor returns it, so another executor finds the same data.

My question is: is there a way to combine all the results found by the executors during streaming into a single list?







java apache-spark hazelcast














edited Nov 23 '18 at 11:21







masay

















asked Nov 23 '18 at 9:00
masay













  • Use accumulators. Please share your code.

    – Taha Naqvi
    Nov 23 '18 at 10:23













  • Thanks for your comment. I have added more detail to my question. Accumulators are still on the table, and I am reading about them.

    – masay
    Nov 23 '18 at 11:22































2 Answers
Spark executors are distributed across the cluster, so deduplicating data across them is difficult. You have the following options:

1. Use accumulators. The problem here is that accumulator values are not consistent while the job is running, so you may end up reading stale data.

2. Offload this work to an external system: store your output in external storage that can deduplicate it (probably HBase). The efficiency of this storage system becomes key here.
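As a rough illustration of option 2, the sketch below uses a plain `ConcurrentHashMap` as a stand-in for the external store; it is not HBase or Hazelcast code. Each simulated executor tries to claim a record by its key with `putIfAbsent`, and only the first claim succeeds, so the combined list contains no duplicates. The class name `DedupSketch`, the `claim` and `results` helpers, and the record ids are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DedupSketch {
    // Stand-in for an external store shared by all executors (assumption).
    private final ConcurrentMap<String, Boolean> claimed = new ConcurrentHashMap<>();
    // Combined, duplicate-free results.
    private final List<String> combined = Collections.synchronizedList(new ArrayList<>());

    // Returns true only for the first caller that presents a given id.
    public boolean claim(String id) {
        boolean first = claimed.putIfAbsent(id, Boolean.TRUE) == null;
        if (first) {
            combined.add(id);
        }
        return first;
    }

    // Returns a snapshot copy of the combined results.
    public List<String> results() {
        synchronized (combined) {
            return new ArrayList<>(combined);
        }
    }

    public static void main(String[] args) {
        DedupSketch store = new DedupSketch();
        // Overlapping filter results found by three simulated executors.
        List<List<String>> perExecutor = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("b", "c"),
                Arrays.asList("a", "c", "d"));
        for (List<String> found : perExecutor) {
            for (String id : found) {
                store.claim(id);
            }
        }
        System.out.println(store.results()); // prints [a, b, c, d]
    }
}
```

In a real job the claim would have to be an atomic operation on whatever store you pick, otherwise two executors can still both see the record as unclaimed.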


I hope this helps.

















answered Nov 28 '18 at 6:47
Harjeet Kumar

























To avoid reading duplicate data, you need to maintain the offsets somewhere, preferably in HBase. Every time you consume data from Kafka, read the stored offsets from HBase, check which offset has already been consumed for each topic, and then start reading and writing from there. After each successful write, you must update the stored offset.
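The offset bookkeeping described above can be sketched roughly as follows. A `HashMap` stands in for the HBase table and a list stands in for the output sink; no Kafka or HBase client calls are used. The names `OffsetStoreSketch`, `process`, `written`, and the topic `events` are hypothetical. The sketch keys offsets by topic and partition, since Kafka offsets are tracked per partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetStoreSketch {
    // Stand-in for the HBase table: "topic-partition" -> next offset to process.
    private final Map<String, Long> nextOffset = new HashMap<>();
    // Stand-in for the output sink.
    private final List<String> written = new ArrayList<>();

    // Processes a record only if its offset has not been handled yet.
    public boolean process(String topic, int partition, long offset, String value) {
        String key = topic + "-" + partition;
        long expected = nextOffset.getOrDefault(key, 0L);
        if (offset < expected) {
            return false; // already consumed, skip the duplicate
        }
        written.add(value);              // the "write" step
        nextOffset.put(key, offset + 1); // update the offset only after the write
        return true;
    }

    public List<String> written() {
        return written;
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        store.process("events", 0, 0L, "m0");
        store.process("events", 0, 1L, "m1");
        store.process("events", 0, 1L, "m1"); // redelivered, ignored
        store.process("events", 1, 0L, "n0"); // different partition, own offset
        System.out.println(store.written()); // prints [m0, m1, n0]
    }
}
```

If the write and the offset update cannot be made atomic, a crash between the two steps can still replay a record, so the write itself should ideally be idempotent.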



Do you think this would solve the issue?

















answered Nov 29 '18 at 14:24
H Roy





























