Wrapping a Queue in Future
I am writing a Tornado webserver in Python 3.7 to display the status of processes run by the multiprocessing
library.
The following code works, but I'd like to be able to do it using Tornado's built-in library instead of hacking in the threading library. I haven't figured out how to do it without blocking Tornado during queue.get
. I think the correct solution is to wrap the get
calls in some sort of future. I've tried for hours, but haven't figured out how to do this.
Inside of my multiprocessing script:
class ProcessToMonitor(multiprocessing.Process)
def __init__(self):
multiprocessing.Process.__init__(self)
self.queue = multiprocessing.Queue()
def run():
while True:
# do stuff
self.queue.put(value)
Then, in my Tornado script
class MyWebSocket(tornado.websocket.WebSocketHandler):
connections = set()
def open(self):
self.connections.add(self)
def close(self):
self.connections.remove(self)
@classmethod
def emit(self, message):
[client.write_message(message) for client in self.connections]
def worker():
ptm = ProcessToMonitor()
ptm.start()
while True:
message = ptm.queue.get()
MyWebSocket.emit(message)
if __name__ == '__main__':
app = tornado.web.Application([
(r'/', MainHandler), # Not shown
(r'/websocket', MyWebSocket)
])
app.listen(8888)
threading.Thread(target=worker)
ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()
python asynchronous concurrency multiprocessing tornado
add a comment |
I am writing a Tornado webserver in Python 3.7 to display the status of processes run by the multiprocessing
library.
The following code works, but I'd like to be able to do it using Tornado's built-in library instead of hacking in the threading library. I haven't figured out how to do it without blocking Tornado during queue.get
. I think the correct solution is to wrap the get
calls in some sort of future. I've tried for hours, but haven't figured out how to do this.
Inside of my multiprocessing script:
class ProcessToMonitor(multiprocessing.Process)
def __init__(self):
multiprocessing.Process.__init__(self)
self.queue = multiprocessing.Queue()
def run():
while True:
# do stuff
self.queue.put(value)
Then, in my Tornado script
class MyWebSocket(tornado.websocket.WebSocketHandler):
connections = set()
def open(self):
self.connections.add(self)
def close(self):
self.connections.remove(self)
@classmethod
def emit(self, message):
[client.write_message(message) for client in self.connections]
def worker():
ptm = ProcessToMonitor()
ptm.start()
while True:
message = ptm.queue.get()
MyWebSocket.emit(message)
if __name__ == '__main__':
app = tornado.web.Application([
(r'/', MainHandler), # Not shown
(r'/websocket', MyWebSocket)
])
app.listen(8888)
threading.Thread(target=worker)
ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()
python asynchronous concurrency multiprocessing tornado
add a comment |
I am writing a Tornado webserver in Python 3.7 to display the status of processes run by the multiprocessing
library.
The following code works, but I'd like to be able to do it using Tornado's built-in library instead of hacking in the threading library. I haven't figured out how to do it without blocking Tornado during queue.get
. I think the correct solution is to wrap the get
calls in some sort of future. I've tried for hours, but haven't figured out how to do this.
Inside of my multiprocessing script:
class ProcessToMonitor(multiprocessing.Process)
def __init__(self):
multiprocessing.Process.__init__(self)
self.queue = multiprocessing.Queue()
def run():
while True:
# do stuff
self.queue.put(value)
Then, in my Tornado script
class MyWebSocket(tornado.websocket.WebSocketHandler):
connections = set()
def open(self):
self.connections.add(self)
def close(self):
self.connections.remove(self)
@classmethod
def emit(self, message):
[client.write_message(message) for client in self.connections]
def worker():
ptm = ProcessToMonitor()
ptm.start()
while True:
message = ptm.queue.get()
MyWebSocket.emit(message)
if __name__ == '__main__':
app = tornado.web.Application([
(r'/', MainHandler), # Not shown
(r'/websocket', MyWebSocket)
])
app.listen(8888)
threading.Thread(target=worker)
ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()
python asynchronous concurrency multiprocessing tornado
I am writing a Tornado webserver in Python 3.7 to display the status of processes run by the multiprocessing
library.
The following code works, but I'd like to be able to do it using Tornado's built-in library instead of hacking in the threading library. I haven't figured out how to do it without blocking Tornado during queue.get
. I think the correct solution is to wrap the get
calls in some sort of future. I've tried for hours, but haven't figured out how to do this.
Inside of my multiprocessing script:
class ProcessToMonitor(multiprocessing.Process)
def __init__(self):
multiprocessing.Process.__init__(self)
self.queue = multiprocessing.Queue()
def run():
while True:
# do stuff
self.queue.put(value)
Then, in my Tornado script
class MyWebSocket(tornado.websocket.WebSocketHandler):
connections = set()
def open(self):
self.connections.add(self)
def close(self):
self.connections.remove(self)
@classmethod
def emit(self, message):
[client.write_message(message) for client in self.connections]
def worker():
ptm = ProcessToMonitor()
ptm.start()
while True:
message = ptm.queue.get()
MyWebSocket.emit(message)
if __name__ == '__main__':
app = tornado.web.Application([
(r'/', MainHandler), # Not shown
(r'/websocket', MyWebSocket)
])
app.listen(8888)
threading.Thread(target=worker)
ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()
python asynchronous concurrency multiprocessing tornado
python asynchronous concurrency multiprocessing tornado
asked Nov 26 '18 at 3:18
Jonathan WheelerJonathan Wheeler
1,122822
1,122822
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
queue.get
isn't a blocking function, it just waits until there's an item in the queue in case the queue is empty. I can see from your code that queue.get
fits perfectly for you use case inside a while loop.
I think you're probably using it incorrectly. You'll have to make the worker
function a coroutine (async
/await
syntax):
async def worker():
...
while True:
message = await queue.get()
...
However, if you don't want to wait for an item and would like to proceed immediately, its alternative is queue.get_nowait
.
One thing to note here is thatqueue.get_nowait
will raise an exception called QueueEmpty
if the queue is empty. So, you'll need to handle that exception.
Example:
while True:
try:
message = queue.get_nowait()
except QueueEmpty:
# wait for some time before
# next iteration
# otherwise this loop will
# keep running for no reason
MyWebSocket.emit(message)
As you can see, you'll have to use pause the while loop for some time if the queue is empty to prevent it from overwhelming the system.
So why not use queue.get
in the first place?
It sounds like this is just going to end up polling the queue. Is there a way to set up a listener/callback?
– Jonathan Wheeler
Nov 26 '18 at 6:57
@JonathanWheeler The first example is not polling at all. Tornado will pause the coroutine at theawait
statement and will run it whenqueue.get
returns an object. Pretty much what you'd do in a callback, but this has better syntax.
– xyres
Nov 26 '18 at 16:09
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53474369%2fwrapping-a-queue-in-future%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
queue.get
isn't a blocking function, it just waits until there's an item in the queue in case the queue is empty. I can see from your code that queue.get
fits perfectly for you use case inside a while loop.
I think you're probably using it incorrectly. You'll have to make the worker
function a coroutine (async
/await
syntax):
async def worker():
...
while True:
message = await queue.get()
...
However, if you don't want to wait for an item and would like to proceed immediately, its alternative is queue.get_nowait
.
One thing to note here is thatqueue.get_nowait
will raise an exception called QueueEmpty
if the queue is empty. So, you'll need to handle that exception.
Example:
while True:
try:
message = queue.get_nowait()
except QueueEmpty:
# wait for some time before
# next iteration
# otherwise this loop will
# keep running for no reason
MyWebSocket.emit(message)
As you can see, you'll have to use pause the while loop for some time if the queue is empty to prevent it from overwhelming the system.
So why not use queue.get
in the first place?
It sounds like this is just going to end up polling the queue. Is there a way to set up a listener/callback?
– Jonathan Wheeler
Nov 26 '18 at 6:57
@JonathanWheeler The first example is not polling at all. Tornado will pause the coroutine at theawait
statement and will run it whenqueue.get
returns an object. Pretty much what you'd do in a callback, but this has better syntax.
– xyres
Nov 26 '18 at 16:09
add a comment |
queue.get
isn't a blocking function, it just waits until there's an item in the queue in case the queue is empty. I can see from your code that queue.get
fits perfectly for you use case inside a while loop.
I think you're probably using it incorrectly. You'll have to make the worker
function a coroutine (async
/await
syntax):
async def worker():
...
while True:
message = await queue.get()
...
However, if you don't want to wait for an item and would like to proceed immediately, its alternative is queue.get_nowait
.
One thing to note here is thatqueue.get_nowait
will raise an exception called QueueEmpty
if the queue is empty. So, you'll need to handle that exception.
Example:
while True:
try:
message = queue.get_nowait()
except QueueEmpty:
# wait for some time before
# next iteration
# otherwise this loop will
# keep running for no reason
MyWebSocket.emit(message)
As you can see, you'll have to use pause the while loop for some time if the queue is empty to prevent it from overwhelming the system.
So why not use queue.get
in the first place?
It sounds like this is just going to end up polling the queue. Is there a way to set up a listener/callback?
– Jonathan Wheeler
Nov 26 '18 at 6:57
@JonathanWheeler The first example is not polling at all. Tornado will pause the coroutine at theawait
statement and will run it whenqueue.get
returns an object. Pretty much what you'd do in a callback, but this has better syntax.
– xyres
Nov 26 '18 at 16:09
add a comment |
queue.get
isn't a blocking function, it just waits until there's an item in the queue in case the queue is empty. I can see from your code that queue.get
fits perfectly for you use case inside a while loop.
I think you're probably using it incorrectly. You'll have to make the worker
function a coroutine (async
/await
syntax):
async def worker():
...
while True:
message = await queue.get()
...
However, if you don't want to wait for an item and would like to proceed immediately, its alternative is queue.get_nowait
.
One thing to note here is thatqueue.get_nowait
will raise an exception called QueueEmpty
if the queue is empty. So, you'll need to handle that exception.
Example:
while True:
try:
message = queue.get_nowait()
except QueueEmpty:
# wait for some time before
# next iteration
# otherwise this loop will
# keep running for no reason
MyWebSocket.emit(message)
As you can see, you'll have to use pause the while loop for some time if the queue is empty to prevent it from overwhelming the system.
So why not use queue.get
in the first place?
queue.get
isn't a blocking function, it just waits until there's an item in the queue in case the queue is empty. I can see from your code that queue.get
fits perfectly for you use case inside a while loop.
I think you're probably using it incorrectly. You'll have to make the worker
function a coroutine (async
/await
syntax):
async def worker():
...
while True:
message = await queue.get()
...
However, if you don't want to wait for an item and would like to proceed immediately, its alternative is queue.get_nowait
.
One thing to note here is thatqueue.get_nowait
will raise an exception called QueueEmpty
if the queue is empty. So, you'll need to handle that exception.
Example:
while True:
try:
message = queue.get_nowait()
except QueueEmpty:
# wait for some time before
# next iteration
# otherwise this loop will
# keep running for no reason
MyWebSocket.emit(message)
As you can see, you'll have to use pause the while loop for some time if the queue is empty to prevent it from overwhelming the system.
So why not use queue.get
in the first place?
edited Nov 26 '18 at 6:35
answered Nov 26 '18 at 6:25
xyresxyres
9,82232445
9,82232445
It sounds like this is just going to end up polling the queue. Is there a way to set up a listener/callback?
– Jonathan Wheeler
Nov 26 '18 at 6:57
@JonathanWheeler The first example is not polling at all. Tornado will pause the coroutine at theawait
statement and will run it whenqueue.get
returns an object. Pretty much what you'd do in a callback, but this has better syntax.
– xyres
Nov 26 '18 at 16:09
add a comment |
It sounds like this is just going to end up polling the queue. Is there a way to set up a listener/callback?
– Jonathan Wheeler
Nov 26 '18 at 6:57
@JonathanWheeler The first example is not polling at all. Tornado will pause the coroutine at theawait
statement and will run it whenqueue.get
returns an object. Pretty much what you'd do in a callback, but this has better syntax.
– xyres
Nov 26 '18 at 16:09
It sounds like this is just going to end up polling the queue. Is there a way to set up a listener/callback?
– Jonathan Wheeler
Nov 26 '18 at 6:57
It sounds like this is just going to end up polling the queue. Is there a way to set up a listener/callback?
– Jonathan Wheeler
Nov 26 '18 at 6:57
@JonathanWheeler The first example is not polling at all. Tornado will pause the coroutine at the
await
statement and will run it when queue.get
returns an object. Pretty much what you'd do in a callback, but this has better syntax.– xyres
Nov 26 '18 at 16:09
@JonathanWheeler The first example is not polling at all. Tornado will pause the coroutine at the
await
statement and will run it when queue.get
returns an object. Pretty much what you'd do in a callback, but this has better syntax.– xyres
Nov 26 '18 at 16:09
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53474369%2fwrapping-a-queue-in-future%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown