Wrapping a Queue in Future












0















I am writing a Tornado webserver in Python 3.7 to display the status of processes run by the multiprocessing library.



The following code works, but I'd like to be able to do it using Tornado's built-in library instead of hacking in the threading library. I haven't figured out how to do it without blocking Tornado during queue.get. I think the correct solution is to wrap the get calls in some sort of future. I've tried for hours, but haven't figured out how to do this.



Inside of my multiprocessing script:



class ProcessToMonitor(multiprocessing.Process)

def __init__(self):
multiprocessing.Process.__init__(self)
self.queue = multiprocessing.Queue()

def run():
while True:
# do stuff
self.queue.put(value)


Then, in my Tornado script



class MyWebSocket(tornado.websocket.WebSocketHandler):
connections = set()

def open(self):
self.connections.add(self)

def close(self):
self.connections.remove(self)

@classmethod
def emit(self, message):
[client.write_message(message) for client in self.connections]

def worker():
ptm = ProcessToMonitor()
ptm.start()
while True:
message = ptm.queue.get()
MyWebSocket.emit(message)

if __name__ == '__main__':
app = tornado.web.Application([
(r'/', MainHandler), # Not shown
(r'/websocket', MyWebSocket)
])
app.listen(8888)

threading.Thread(target=worker)

ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()









share|improve this question



























    0















    I am writing a Tornado webserver in Python 3.7 to display the status of processes run by the multiprocessing library.



    The following code works, but I'd like to be able to do it using Tornado's built-in library instead of hacking in the threading library. I haven't figured out how to do it without blocking Tornado during queue.get. I think the correct solution is to wrap the get calls in some sort of future. I've tried for hours, but haven't figured out how to do this.



    Inside of my multiprocessing script:



    class ProcessToMonitor(multiprocessing.Process)

    def __init__(self):
    multiprocessing.Process.__init__(self)
    self.queue = multiprocessing.Queue()

    def run():
    while True:
    # do stuff
    self.queue.put(value)


    Then, in my Tornado script



    class MyWebSocket(tornado.websocket.WebSocketHandler):
    connections = set()

    def open(self):
    self.connections.add(self)

    def close(self):
    self.connections.remove(self)

    @classmethod
    def emit(self, message):
    [client.write_message(message) for client in self.connections]

    def worker():
    ptm = ProcessToMonitor()
    ptm.start()
    while True:
    message = ptm.queue.get()
    MyWebSocket.emit(message)

    if __name__ == '__main__':
    app = tornado.web.Application([
    (r'/', MainHandler), # Not shown
    (r'/websocket', MyWebSocket)
    ])
    app.listen(8888)

    threading.Thread(target=worker)

    ioloop = tornado.ioloop.IOLoop.current()
    ioloop.start()









    share|improve this question

























      0












      0








      0








      I am writing a Tornado webserver in Python 3.7 to display the status of processes run by the multiprocessing library.



      The following code works, but I'd like to be able to do it using Tornado's built-in library instead of hacking in the threading library. I haven't figured out how to do it without blocking Tornado during queue.get. I think the correct solution is to wrap the get calls in some sort of future. I've tried for hours, but haven't figured out how to do this.



      Inside of my multiprocessing script:



      class ProcessToMonitor(multiprocessing.Process)

      def __init__(self):
      multiprocessing.Process.__init__(self)
      self.queue = multiprocessing.Queue()

      def run():
      while True:
      # do stuff
      self.queue.put(value)


      Then, in my Tornado script



      class MyWebSocket(tornado.websocket.WebSocketHandler):
      connections = set()

      def open(self):
      self.connections.add(self)

      def close(self):
      self.connections.remove(self)

      @classmethod
      def emit(self, message):
      [client.write_message(message) for client in self.connections]

      def worker():
      ptm = ProcessToMonitor()
      ptm.start()
      while True:
      message = ptm.queue.get()
      MyWebSocket.emit(message)

      if __name__ == '__main__':
      app = tornado.web.Application([
      (r'/', MainHandler), # Not shown
      (r'/websocket', MyWebSocket)
      ])
      app.listen(8888)

      threading.Thread(target=worker)

      ioloop = tornado.ioloop.IOLoop.current()
      ioloop.start()









      share|improve this question














      I am writing a Tornado webserver in Python 3.7 to display the status of processes run by the multiprocessing library.



      The following code works, but I'd like to be able to do it using Tornado's built-in library instead of hacking in the threading library. I haven't figured out how to do it without blocking Tornado during queue.get. I think the correct solution is to wrap the get calls in some sort of future. I've tried for hours, but haven't figured out how to do this.



      Inside of my multiprocessing script:



      class ProcessToMonitor(multiprocessing.Process)

      def __init__(self):
      multiprocessing.Process.__init__(self)
      self.queue = multiprocessing.Queue()

      def run():
      while True:
      # do stuff
      self.queue.put(value)


      Then, in my Tornado script



      class MyWebSocket(tornado.websocket.WebSocketHandler):
      connections = set()

      def open(self):
      self.connections.add(self)

      def close(self):
      self.connections.remove(self)

      @classmethod
      def emit(self, message):
      [client.write_message(message) for client in self.connections]

      def worker():
      ptm = ProcessToMonitor()
      ptm.start()
      while True:
      message = ptm.queue.get()
      MyWebSocket.emit(message)

      if __name__ == '__main__':
      app = tornado.web.Application([
      (r'/', MainHandler), # Not shown
      (r'/websocket', MyWebSocket)
      ])
      app.listen(8888)

      threading.Thread(target=worker)

      ioloop = tornado.ioloop.IOLoop.current()
      ioloop.start()






      python asynchronous concurrency multiprocessing tornado






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Nov 26 '18 at 3:18









      Jonathan WheelerJonathan Wheeler

      1,122822




      1,122822
























          1 Answer
          1






          active

          oldest

          votes


















          1














          queue.get isn't a blocking function, it just waits until there's an item in the queue in case the queue is empty. I can see from your code that queue.get fits perfectly for you use case inside a while loop.



          I think you're probably using it incorrectly. You'll have to make the worker function a coroutine (async/await syntax):



          async def worker():
          ...
          while True:
          message = await queue.get()
          ...




          However, if you don't want to wait for an item and would like to proceed immediately, its alternative is queue.get_nowait.



          One thing to note here is thatqueue.get_nowait will raise an exception called QueueEmpty if the queue is empty. So, you'll need to handle that exception.



          Example:



          while True:
          try:
          message = queue.get_nowait()
          except QueueEmpty:
          # wait for some time before
          # next iteration
          # otherwise this loop will
          # keep running for no reason

          MyWebSocket.emit(message)


          As you can see, you'll have to use pause the while loop for some time if the queue is empty to prevent it from overwhelming the system.



          So why not use queue.get in the first place?






          share|improve this answer


























          • It sounds like this is just going to end up polling the queue. Is there a way to set up a listener/callback?

            – Jonathan Wheeler
            Nov 26 '18 at 6:57











          • @JonathanWheeler The first example is not polling at all. Tornado will pause the coroutine at the await statement and will run it when queue.get returns an object. Pretty much what you'd do in a callback, but this has better syntax.

            – xyres
            Nov 26 '18 at 16:09













          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53474369%2fwrapping-a-queue-in-future%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          1














          queue.get isn't a blocking function, it just waits until there's an item in the queue in case the queue is empty. I can see from your code that queue.get fits perfectly for you use case inside a while loop.



          I think you're probably using it incorrectly. You'll have to make the worker function a coroutine (async/await syntax):



          async def worker():
          ...
          while True:
          message = await queue.get()
          ...




          However, if you don't want to wait for an item and would like to proceed immediately, its alternative is queue.get_nowait.



          One thing to note here is thatqueue.get_nowait will raise an exception called QueueEmpty if the queue is empty. So, you'll need to handle that exception.



          Example:



          while True:
          try:
          message = queue.get_nowait()
          except QueueEmpty:
          # wait for some time before
          # next iteration
          # otherwise this loop will
          # keep running for no reason

          MyWebSocket.emit(message)


          As you can see, you'll have to use pause the while loop for some time if the queue is empty to prevent it from overwhelming the system.



          So why not use queue.get in the first place?






          share|improve this answer


























          • It sounds like this is just going to end up polling the queue. Is there a way to set up a listener/callback?

            – Jonathan Wheeler
            Nov 26 '18 at 6:57











          • @JonathanWheeler The first example is not polling at all. Tornado will pause the coroutine at the await statement and will run it when queue.get returns an object. Pretty much what you'd do in a callback, but this has better syntax.

            – xyres
            Nov 26 '18 at 16:09


















          1














          queue.get isn't a blocking function, it just waits until there's an item in the queue in case the queue is empty. I can see from your code that queue.get fits perfectly for you use case inside a while loop.



          I think you're probably using it incorrectly. You'll have to make the worker function a coroutine (async/await syntax):



          async def worker():
          ...
          while True:
          message = await queue.get()
          ...




          However, if you don't want to wait for an item and would like to proceed immediately, its alternative is queue.get_nowait.



          One thing to note here is thatqueue.get_nowait will raise an exception called QueueEmpty if the queue is empty. So, you'll need to handle that exception.



          Example:



          while True:
          try:
          message = queue.get_nowait()
          except QueueEmpty:
          # wait for some time before
          # next iteration
          # otherwise this loop will
          # keep running for no reason

          MyWebSocket.emit(message)


          As you can see, you'll have to use pause the while loop for some time if the queue is empty to prevent it from overwhelming the system.



          So why not use queue.get in the first place?






          share|improve this answer


























          • It sounds like this is just going to end up polling the queue. Is there a way to set up a listener/callback?

            – Jonathan Wheeler
            Nov 26 '18 at 6:57











          • @JonathanWheeler The first example is not polling at all. Tornado will pause the coroutine at the await statement and will run it when queue.get returns an object. Pretty much what you'd do in a callback, but this has better syntax.

            – xyres
            Nov 26 '18 at 16:09
















          1












          1








          1







          queue.get isn't a blocking function, it just waits until there's an item in the queue in case the queue is empty. I can see from your code that queue.get fits perfectly for you use case inside a while loop.



          I think you're probably using it incorrectly. You'll have to make the worker function a coroutine (async/await syntax):



          async def worker():
          ...
          while True:
          message = await queue.get()
          ...




          However, if you don't want to wait for an item and would like to proceed immediately, its alternative is queue.get_nowait.



          One thing to note here is thatqueue.get_nowait will raise an exception called QueueEmpty if the queue is empty. So, you'll need to handle that exception.



          Example:



          while True:
          try:
          message = queue.get_nowait()
          except QueueEmpty:
          # wait for some time before
          # next iteration
          # otherwise this loop will
          # keep running for no reason

          MyWebSocket.emit(message)


          As you can see, you'll have to use pause the while loop for some time if the queue is empty to prevent it from overwhelming the system.



          So why not use queue.get in the first place?






          share|improve this answer















          queue.get isn't a blocking function, it just waits until there's an item in the queue in case the queue is empty. I can see from your code that queue.get fits perfectly for you use case inside a while loop.



          I think you're probably using it incorrectly. You'll have to make the worker function a coroutine (async/await syntax):



          async def worker():
          ...
          while True:
          message = await queue.get()
          ...




          However, if you don't want to wait for an item and would like to proceed immediately, its alternative is queue.get_nowait.



          One thing to note here is thatqueue.get_nowait will raise an exception called QueueEmpty if the queue is empty. So, you'll need to handle that exception.



          Example:



          while True:
          try:
          message = queue.get_nowait()
          except QueueEmpty:
          # wait for some time before
          # next iteration
          # otherwise this loop will
          # keep running for no reason

          MyWebSocket.emit(message)


          As you can see, you'll have to use pause the while loop for some time if the queue is empty to prevent it from overwhelming the system.



          So why not use queue.get in the first place?







          share|improve this answer














          share|improve this answer



          share|improve this answer








          edited Nov 26 '18 at 6:35

























          answered Nov 26 '18 at 6:25









          xyresxyres

          9,82232445




          9,82232445













          • It sounds like this is just going to end up polling the queue. Is there a way to set up a listener/callback?

            – Jonathan Wheeler
            Nov 26 '18 at 6:57











          • @JonathanWheeler The first example is not polling at all. Tornado will pause the coroutine at the await statement and will run it when queue.get returns an object. Pretty much what you'd do in a callback, but this has better syntax.

            – xyres
            Nov 26 '18 at 16:09





















          • It sounds like this is just going to end up polling the queue. Is there a way to set up a listener/callback?

            – Jonathan Wheeler
            Nov 26 '18 at 6:57











          • @JonathanWheeler The first example is not polling at all. Tornado will pause the coroutine at the await statement and will run it when queue.get returns an object. Pretty much what you'd do in a callback, but this has better syntax.

            – xyres
            Nov 26 '18 at 16:09



















          It sounds like this is just going to end up polling the queue. Is there a way to set up a listener/callback?

          – Jonathan Wheeler
          Nov 26 '18 at 6:57





          It sounds like this is just going to end up polling the queue. Is there a way to set up a listener/callback?

          – Jonathan Wheeler
          Nov 26 '18 at 6:57













          @JonathanWheeler The first example is not polling at all. Tornado will pause the coroutine at the await statement and will run it when queue.get returns an object. Pretty much what you'd do in a callback, but this has better syntax.

          – xyres
          Nov 26 '18 at 16:09







          @JonathanWheeler The first example is not polling at all. Tornado will pause the coroutine at the await statement and will run it when queue.get returns an object. Pretty much what you'd do in a callback, but this has better syntax.

          – xyres
          Nov 26 '18 at 16:09






















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53474369%2fwrapping-a-queue-in-future%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          404 Error Contact Form 7 ajax form submitting

          How to know if a Active Directory user can login interactively

          TypeError: fit_transform() missing 1 required positional argument: 'X'