IntervalJoin is stuck in RocksDB's seek for a long time in Flink 1.6.2

I am using Flink's interval join to join two streams within a 10-minute window, as shown below:

labelStream.intervalJoin(adLogStream)
    .between(Time.milliseconds(0), Time.milliseconds(600000))
    .process(new processFunction())
    .addSink(kafkaProducer);

labelStream and adLogStream carry protobuf-generated classes and are keyed by a Long id.
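To make the bounds in between(...) concrete, here is a minimal plain-Java sketch of the matching rule, not Flink code: a record on the left side with timestamp t pairs with records on the other side whose timestamps fall in [t + lower, t + upper], both ends inclusive (Flink's default). The class IntervalJoinSketch, its join method, and the sample timestamps are hypothetical; the nested loop over one key's records is a simplification and does not reproduce the operator's actual state handling.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of interval-join matching for a single key; not Flink code. */
final class IntervalJoinSketch {

    /** Returns {leftTs, rightTs} pairs where leftTs + lowerMs <= rightTs <= leftTs + upperMs. */
    static List<long[]> join(long[] leftTs, long[] rightTs, long lowerMs, long upperMs) {
        List<long[]> matches = new ArrayList<>();
        for (long l : leftTs) {
            // In this sketch, every right-side record is checked for each left-side record.
            for (long r : rightTs) {
                if (r >= l + lowerMs && r <= l + upperMs) {
                    matches.add(new long[] {l, r});
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        long[] labels = {0L, 300_000L};                   // hypothetical event timestamps (ms)
        long[] adLogs = {100_000L, 600_000L, 1_000_000L}; // hypothetical event timestamps (ms)
        for (long[] m : join(labels, adLogs, 0L, 600_000L)) {
            System.out.println("label@" + m[0] + " joins adLog@" + m[1]);
        }
    }
}
```

With a lower bound of 0 and an upper bound of 600000 ms, each label can only match ad-log records whose event timestamps are at most 10 minutes later than its own.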



Our two input streams are huge. After the job has run for about 30 minutes, the output rate to Kafka gradually drops.

When the output rate started to drop, I ran jstack and pstack several times to capture stack traces.

The program appears to be stuck in RocksDB's seek. I also found that some of RocksDB's SST files are read slowly during iteration.

I have tried several things:

1) Halving the input volume. This works well.
2) Replacing the protobuf objects in labelStream and adLogStream with plain Strings, so that the data volume stays the same. This works well.
3) Using PredefinedOptions such as SPINNING_DISK_OPTIMIZED and SPINNING_DISK_OPTIMIZED_HIGH_MEM. The problem persists.
4) Using newer versions of rocksdbjni. The problem persists.

Can anyone give me some suggestions? Thank you very much.

Tags: apache-flink, rocksdb

asked Nov 20 '18 at 13:12 by user2928444, edited Nov 22 '18 at 1:46

1 Answer

A few thoughts:

• You could ask on the flink-user mailing list. In general, operational questions like this are more likely to get knowledgeable responses there than on Stack Overflow.
• I've heard that giving RocksDB more off-heap memory can help, because RocksDB will use it for caching. Sorry, but I don't know the details of how to configure this.
• Perhaps increasing the parallelism would help.
• If possible, it might be interesting to try running with the heap-based state backend instead, just to see how much of the pain is caused by RocksDB.
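A back-of-envelope estimate can help when weighing the parallelism suggestion above. The sketch below assumes, as a simplification, that the join keeps each record in state for roughly the length of the join window and that keys are spread evenly across subtasks. The class JoinStateEstimate and every number in it are hypothetical, not measurements from this job.

```java
/** Rough estimate of buffered join state per parallel subtask; all inputs are hypothetical. */
final class JoinStateEstimate {

    /**
     * Approximate bytes buffered per subtask for one input side, assuming each record
     * is retained for retentionSeconds and keys are evenly distributed.
     */
    static long bytesPerSubtask(long recordsPerSecond, long retentionSeconds,
                                long bytesPerRecord, int parallelism) {
        long totalBytes = recordsPerSecond * retentionSeconds * bytesPerRecord;
        return totalBytes / parallelism;
    }

    public static void main(String[] args) {
        long window = 600; // 10-minute join window, in seconds
        System.out.println("baseline:           " + bytesPerSubtask(50_000, window, 200, 32));
        System.out.println("half the input:     " + bytesPerSubtask(25_000, window, 200, 32));
        System.out.println("double parallelism: " + bytesPerSubtask(50_000, window, 200, 64));
    }
}
```

Under these assumptions, doubling the parallelism shrinks the per-subtask state by the same factor as halving the input, which the question reports as working well.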








answered Nov 21 '18 at 12:30 by David Anderson

• Thank you. I will try these methods. – user2928444, Nov 21 '18 at 13:49


















