asyncio queue multi producer (sync) single consumer
an asyncio program has two task that produce messages which are put on a queue, another task consume the queue.
one producer produce periodic task.
the other producer has to be synced with the consumer, it has to await till its own message have been consumed
import asyncio
import logging
import sys
logging.basicConfig( stream=sys.stdout,format='%(asctime)-5s: %(funcName)-15s: %(message)s',datefmt='%I:%M:%S',level=logging.INFO)
logger = logging.getLogger()
async def sync_producer(queue):
for x in range(5):
item = f"sync producer{x}"
logger.info(f"{item} ")
await queue.put(item)# <= at this point I want to await that the message have been consumed
logger.info(f"sync producer finish")
async def periodic_producer(queue):
x=0
while True:
await asyncio.sleep(1)
item = f"periodic producer {x}"
logger.info(f"{item} ")
queue.put_nowait(item)
x+=1
async def consumer(queue):
while True:
item = await queue.get()
logger.info(f"{item}")
queue.task_done()
await asyncio.sleep(1)
async def main():
queue = asyncio.Queue()
consumer_task = asyncio.create_task(consumer(queue))
periodic_producer_task = asyncio.create_task(periodic_producer(queue))
producer_task = asyncio.create_task(sync_producer(queue))
await producer_task
periodic_producer_task.cancel()
await queue.join()
consumer_task.cancel()
asyncio.run(main())
The example does not work as i want beacause await queue.put(item)
does'not await queue task_done()
.
A possible workaround could be to put on the queue (event,item)
where event = asyncio.Event()
and then await event
. Is that a "good" workaraound?
python-3.x python-asyncio
add a comment |
an asyncio program has two task that produce messages which are put on a queue, another task consume the queue.
one producer produce periodic task.
the other producer has to be synced with the consumer, it has to await till its own message have been consumed
import asyncio
import logging
import sys
logging.basicConfig( stream=sys.stdout,format='%(asctime)-5s: %(funcName)-15s: %(message)s',datefmt='%I:%M:%S',level=logging.INFO)
logger = logging.getLogger()
async def sync_producer(queue):
for x in range(5):
item = f"sync producer{x}"
logger.info(f"{item} ")
await queue.put(item)# <= at this point I want to await that the message have been consumed
logger.info(f"sync producer finish")
async def periodic_producer(queue):
x=0
while True:
await asyncio.sleep(1)
item = f"periodic producer {x}"
logger.info(f"{item} ")
queue.put_nowait(item)
x+=1
async def consumer(queue):
while True:
item = await queue.get()
logger.info(f"{item}")
queue.task_done()
await asyncio.sleep(1)
async def main():
queue = asyncio.Queue()
consumer_task = asyncio.create_task(consumer(queue))
periodic_producer_task = asyncio.create_task(periodic_producer(queue))
producer_task = asyncio.create_task(sync_producer(queue))
await producer_task
periodic_producer_task.cancel()
await queue.join()
consumer_task.cancel()
asyncio.run(main())
The example does not work as i want beacause await queue.put(item)
does'not await queue task_done()
.
A possible workaround could be to put on the queue (event,item)
where event = asyncio.Event()
and then await event
. Is that a "good" workaraound?
python-3.x python-asyncio
The producer waiting for a queued item to be processed appears to be contrary to the premise of a queue: that of producing and consuming (processing) being decoupled from each other. Ifsync_producer
needs to await the processing, can't it just call the processing function and not bother with queuing in the first place?
– user4815162342
Nov 24 '18 at 12:06
In my real case the consumer will be a CAN bus instance which publish messages on the Bus. I need many normal producer and one single sync cproducer. I don't know if it will be safe to have more corutines using the same CAN bus instance at the same time
– Enzo Scossa-Romano
Nov 24 '18 at 20:20
you cancelledperiodic_producer_task
while it is sleeping.
– kcorlidy
Nov 25 '18 at 2:11
add a comment |
an asyncio program has two task that produce messages which are put on a queue, another task consume the queue.
one producer produce periodic task.
the other producer has to be synced with the consumer, it has to await till its own message have been consumed
import asyncio
import logging
import sys
logging.basicConfig( stream=sys.stdout,format='%(asctime)-5s: %(funcName)-15s: %(message)s',datefmt='%I:%M:%S',level=logging.INFO)
logger = logging.getLogger()
async def sync_producer(queue):
for x in range(5):
item = f"sync producer{x}"
logger.info(f"{item} ")
await queue.put(item)# <= at this point I want to await that the message have been consumed
logger.info(f"sync producer finish")
async def periodic_producer(queue):
x=0
while True:
await asyncio.sleep(1)
item = f"periodic producer {x}"
logger.info(f"{item} ")
queue.put_nowait(item)
x+=1
async def consumer(queue):
while True:
item = await queue.get()
logger.info(f"{item}")
queue.task_done()
await asyncio.sleep(1)
async def main():
queue = asyncio.Queue()
consumer_task = asyncio.create_task(consumer(queue))
periodic_producer_task = asyncio.create_task(periodic_producer(queue))
producer_task = asyncio.create_task(sync_producer(queue))
await producer_task
periodic_producer_task.cancel()
await queue.join()
consumer_task.cancel()
asyncio.run(main())
The example does not work as i want beacause await queue.put(item)
does'not await queue task_done()
.
A possible workaround could be to put on the queue (event,item)
where event = asyncio.Event()
and then await event
. Is that a "good" workaraound?
python-3.x python-asyncio
an asyncio program has two task that produce messages which are put on a queue, another task consume the queue.
one producer produce periodic task.
the other producer has to be synced with the consumer, it has to await till its own message have been consumed
import asyncio
import logging
import sys
logging.basicConfig( stream=sys.stdout,format='%(asctime)-5s: %(funcName)-15s: %(message)s',datefmt='%I:%M:%S',level=logging.INFO)
logger = logging.getLogger()
async def sync_producer(queue):
for x in range(5):
item = f"sync producer{x}"
logger.info(f"{item} ")
await queue.put(item)# <= at this point I want to await that the message have been consumed
logger.info(f"sync producer finish")
async def periodic_producer(queue):
x=0
while True:
await asyncio.sleep(1)
item = f"periodic producer {x}"
logger.info(f"{item} ")
queue.put_nowait(item)
x+=1
async def consumer(queue):
while True:
item = await queue.get()
logger.info(f"{item}")
queue.task_done()
await asyncio.sleep(1)
async def main():
queue = asyncio.Queue()
consumer_task = asyncio.create_task(consumer(queue))
periodic_producer_task = asyncio.create_task(periodic_producer(queue))
producer_task = asyncio.create_task(sync_producer(queue))
await producer_task
periodic_producer_task.cancel()
await queue.join()
consumer_task.cancel()
asyncio.run(main())
The example does not work as i want beacause await queue.put(item)
does'not await queue task_done()
.
A possible workaround could be to put on the queue (event,item)
where event = asyncio.Event()
and then await event
. Is that a "good" workaraound?
python-3.x python-asyncio
python-3.x python-asyncio
asked Nov 24 '18 at 10:58
Enzo Scossa-RomanoEnzo Scossa-Romano
1
1
The producer waiting for a queued item to be processed appears to be contrary to the premise of a queue: that of producing and consuming (processing) being decoupled from each other. Ifsync_producer
needs to await the processing, can't it just call the processing function and not bother with queuing in the first place?
– user4815162342
Nov 24 '18 at 12:06
In my real case the consumer will be a CAN bus instance which publish messages on the Bus. I need many normal producer and one single sync cproducer. I don't know if it will be safe to have more corutines using the same CAN bus instance at the same time
– Enzo Scossa-Romano
Nov 24 '18 at 20:20
you cancelledperiodic_producer_task
while it is sleeping.
– kcorlidy
Nov 25 '18 at 2:11
add a comment |
The producer waiting for a queued item to be processed appears to be contrary to the premise of a queue: that of producing and consuming (processing) being decoupled from each other. Ifsync_producer
needs to await the processing, can't it just call the processing function and not bother with queuing in the first place?
– user4815162342
Nov 24 '18 at 12:06
In my real case the consumer will be a CAN bus instance which publish messages on the Bus. I need many normal producer and one single sync cproducer. I don't know if it will be safe to have more corutines using the same CAN bus instance at the same time
– Enzo Scossa-Romano
Nov 24 '18 at 20:20
you cancelledperiodic_producer_task
while it is sleeping.
– kcorlidy
Nov 25 '18 at 2:11
The producer waiting for a queued item to be processed appears to be contrary to the premise of a queue: that of producing and consuming (processing) being decoupled from each other. If
sync_producer
needs to await the processing, can't it just call the processing function and not bother with queuing in the first place?– user4815162342
Nov 24 '18 at 12:06
The producer waiting for a queued item to be processed appears to be contrary to the premise of a queue: that of producing and consuming (processing) being decoupled from each other. If
sync_producer
needs to await the processing, can't it just call the processing function and not bother with queuing in the first place?– user4815162342
Nov 24 '18 at 12:06
In my real case the consumer will be a CAN bus instance which publish messages on the Bus. I need many normal producer and one single sync cproducer. I don't know if it will be safe to have more corutines using the same CAN bus instance at the same time
– Enzo Scossa-Romano
Nov 24 '18 at 20:20
In my real case the consumer will be a CAN bus instance which publish messages on the Bus. I need many normal producer and one single sync cproducer. I don't know if it will be safe to have more corutines using the same CAN bus instance at the same time
– Enzo Scossa-Romano
Nov 24 '18 at 20:20
you cancelled
periodic_producer_task
while it is sleeping.– kcorlidy
Nov 25 '18 at 2:11
you cancelled
periodic_producer_task
while it is sleeping.– kcorlidy
Nov 25 '18 at 2:11
add a comment |
0
active
oldest
votes
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53457420%2fasyncio-queue-multi-producer-sync-single-consumer%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
0
active
oldest
votes
0
active
oldest
votes
active
oldest
votes
active
oldest
votes
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53457420%2fasyncio-queue-multi-producer-sync-single-consumer%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
The producer waiting for a queued item to be processed appears to be contrary to the premise of a queue: that of producing and consuming (processing) being decoupled from each other. If
sync_producer
needs to await the processing, can't it just call the processing function and not bother with queuing in the first place?– user4815162342
Nov 24 '18 at 12:06
In my real case the consumer will be a CAN bus instance which publish messages on the Bus. I need many normal producer and one single sync cproducer. I don't know if it will be safe to have more corutines using the same CAN bus instance at the same time
– Enzo Scossa-Romano
Nov 24 '18 at 20:20
you cancelled
periodic_producer_task
while it is sleeping.– kcorlidy
Nov 25 '18 at 2:11