asyncio queue multi producer (sync) single consumer












0















an asyncio program has two task that produce messages which are put on a queue, another task consume the queue.



one producer produce periodic task.
the other producer has to be synced with the consumer, it has to await till its own message have been consumed



import asyncio
import logging
import sys

logging.basicConfig( stream=sys.stdout,format='%(asctime)-5s: %(funcName)-15s: %(message)s',datefmt='%I:%M:%S',level=logging.INFO)
logger = logging.getLogger()

async def sync_producer(queue):
for x in range(5):
item = f"sync producer{x}"
logger.info(f"{item} ")
await queue.put(item)# <= at this point I want to await that the message have been consumed
logger.info(f"sync producer finish")

async def periodic_producer(queue):
x=0
while True:
await asyncio.sleep(1)
item = f"periodic producer {x}"
logger.info(f"{item} ")
queue.put_nowait(item)
x+=1

async def consumer(queue):
while True:
item = await queue.get()
logger.info(f"{item}")
queue.task_done()
await asyncio.sleep(1)

async def main():
queue = asyncio.Queue()
consumer_task = asyncio.create_task(consumer(queue))
periodic_producer_task = asyncio.create_task(periodic_producer(queue))
producer_task = asyncio.create_task(sync_producer(queue))

await producer_task
periodic_producer_task.cancel()
await queue.join()
consumer_task.cancel()

asyncio.run(main())


The example does not work as i want beacause await queue.put(item) does'not await queue task_done().
A possible workaround could be to put on the queue (event,item) where event = asyncio.Event() and then await event. Is that a "good" workaraound?










share|improve this question























  • The producer waiting for a queued item to be processed appears to be contrary to the premise of a queue: that of producing and consuming (processing) being decoupled from each other. If sync_producer needs to await the processing, can't it just call the processing function and not bother with queuing in the first place?

    – user4815162342
    Nov 24 '18 at 12:06













  • In my real case the consumer will be a CAN bus instance which publish messages on the Bus. I need many normal producer and one single sync cproducer. I don't know if it will be safe to have more corutines using the same CAN bus instance at the same time

    – Enzo Scossa-Romano
    Nov 24 '18 at 20:20













  • you cancelled periodic_producer_task while it is sleeping.

    – kcorlidy
    Nov 25 '18 at 2:11


















0















an asyncio program has two task that produce messages which are put on a queue, another task consume the queue.



one producer produce periodic task.
the other producer has to be synced with the consumer, it has to await till its own message have been consumed



import asyncio
import logging
import sys

logging.basicConfig( stream=sys.stdout,format='%(asctime)-5s: %(funcName)-15s: %(message)s',datefmt='%I:%M:%S',level=logging.INFO)
logger = logging.getLogger()

async def sync_producer(queue):
for x in range(5):
item = f"sync producer{x}"
logger.info(f"{item} ")
await queue.put(item)# <= at this point I want to await that the message have been consumed
logger.info(f"sync producer finish")

async def periodic_producer(queue):
x=0
while True:
await asyncio.sleep(1)
item = f"periodic producer {x}"
logger.info(f"{item} ")
queue.put_nowait(item)
x+=1

async def consumer(queue):
while True:
item = await queue.get()
logger.info(f"{item}")
queue.task_done()
await asyncio.sleep(1)

async def main():
queue = asyncio.Queue()
consumer_task = asyncio.create_task(consumer(queue))
periodic_producer_task = asyncio.create_task(periodic_producer(queue))
producer_task = asyncio.create_task(sync_producer(queue))

await producer_task
periodic_producer_task.cancel()
await queue.join()
consumer_task.cancel()

asyncio.run(main())


The example does not work as i want beacause await queue.put(item) does'not await queue task_done().
A possible workaround could be to put on the queue (event,item) where event = asyncio.Event() and then await event. Is that a "good" workaraound?










share|improve this question























  • The producer waiting for a queued item to be processed appears to be contrary to the premise of a queue: that of producing and consuming (processing) being decoupled from each other. If sync_producer needs to await the processing, can't it just call the processing function and not bother with queuing in the first place?

    – user4815162342
    Nov 24 '18 at 12:06













  • In my real case the consumer will be a CAN bus instance which publish messages on the Bus. I need many normal producer and one single sync cproducer. I don't know if it will be safe to have more corutines using the same CAN bus instance at the same time

    – Enzo Scossa-Romano
    Nov 24 '18 at 20:20













  • you cancelled periodic_producer_task while it is sleeping.

    – kcorlidy
    Nov 25 '18 at 2:11
















0












0








0








an asyncio program has two task that produce messages which are put on a queue, another task consume the queue.



one producer produce periodic task.
the other producer has to be synced with the consumer, it has to await till its own message have been consumed



import asyncio
import logging
import sys

logging.basicConfig( stream=sys.stdout,format='%(asctime)-5s: %(funcName)-15s: %(message)s',datefmt='%I:%M:%S',level=logging.INFO)
logger = logging.getLogger()

async def sync_producer(queue):
for x in range(5):
item = f"sync producer{x}"
logger.info(f"{item} ")
await queue.put(item)# <= at this point I want to await that the message have been consumed
logger.info(f"sync producer finish")

async def periodic_producer(queue):
x=0
while True:
await asyncio.sleep(1)
item = f"periodic producer {x}"
logger.info(f"{item} ")
queue.put_nowait(item)
x+=1

async def consumer(queue):
while True:
item = await queue.get()
logger.info(f"{item}")
queue.task_done()
await asyncio.sleep(1)

async def main():
queue = asyncio.Queue()
consumer_task = asyncio.create_task(consumer(queue))
periodic_producer_task = asyncio.create_task(periodic_producer(queue))
producer_task = asyncio.create_task(sync_producer(queue))

await producer_task
periodic_producer_task.cancel()
await queue.join()
consumer_task.cancel()

asyncio.run(main())


The example does not work as i want beacause await queue.put(item) does'not await queue task_done().
A possible workaround could be to put on the queue (event,item) where event = asyncio.Event() and then await event. Is that a "good" workaraound?










share|improve this question














an asyncio program has two task that produce messages which are put on a queue, another task consume the queue.



one producer produce periodic task.
the other producer has to be synced with the consumer, it has to await till its own message have been consumed



import asyncio
import logging
import sys

logging.basicConfig( stream=sys.stdout,format='%(asctime)-5s: %(funcName)-15s: %(message)s',datefmt='%I:%M:%S',level=logging.INFO)
logger = logging.getLogger()

async def sync_producer(queue):
for x in range(5):
item = f"sync producer{x}"
logger.info(f"{item} ")
await queue.put(item)# <= at this point I want to await that the message have been consumed
logger.info(f"sync producer finish")

async def periodic_producer(queue):
x=0
while True:
await asyncio.sleep(1)
item = f"periodic producer {x}"
logger.info(f"{item} ")
queue.put_nowait(item)
x+=1

async def consumer(queue):
while True:
item = await queue.get()
logger.info(f"{item}")
queue.task_done()
await asyncio.sleep(1)

async def main():
queue = asyncio.Queue()
consumer_task = asyncio.create_task(consumer(queue))
periodic_producer_task = asyncio.create_task(periodic_producer(queue))
producer_task = asyncio.create_task(sync_producer(queue))

await producer_task
periodic_producer_task.cancel()
await queue.join()
consumer_task.cancel()

asyncio.run(main())


The example does not work as i want beacause await queue.put(item) does'not await queue task_done().
A possible workaround could be to put on the queue (event,item) where event = asyncio.Event() and then await event. Is that a "good" workaraound?







python-3.x python-asyncio






share|improve this question













share|improve this question











share|improve this question




share|improve this question










asked Nov 24 '18 at 10:58









Enzo Scossa-RomanoEnzo Scossa-Romano

1




1













  • The producer waiting for a queued item to be processed appears to be contrary to the premise of a queue: that of producing and consuming (processing) being decoupled from each other. If sync_producer needs to await the processing, can't it just call the processing function and not bother with queuing in the first place?

    – user4815162342
    Nov 24 '18 at 12:06













  • In my real case the consumer will be a CAN bus instance which publish messages on the Bus. I need many normal producer and one single sync cproducer. I don't know if it will be safe to have more corutines using the same CAN bus instance at the same time

    – Enzo Scossa-Romano
    Nov 24 '18 at 20:20













  • you cancelled periodic_producer_task while it is sleeping.

    – kcorlidy
    Nov 25 '18 at 2:11





















  • The producer waiting for a queued item to be processed appears to be contrary to the premise of a queue: that of producing and consuming (processing) being decoupled from each other. If sync_producer needs to await the processing, can't it just call the processing function and not bother with queuing in the first place?

    – user4815162342
    Nov 24 '18 at 12:06













  • In my real case the consumer will be a CAN bus instance which publish messages on the Bus. I need many normal producer and one single sync cproducer. I don't know if it will be safe to have more corutines using the same CAN bus instance at the same time

    – Enzo Scossa-Romano
    Nov 24 '18 at 20:20













  • you cancelled periodic_producer_task while it is sleeping.

    – kcorlidy
    Nov 25 '18 at 2:11



















The producer waiting for a queued item to be processed appears to be contrary to the premise of a queue: that of producing and consuming (processing) being decoupled from each other. If sync_producer needs to await the processing, can't it just call the processing function and not bother with queuing in the first place?

– user4815162342
Nov 24 '18 at 12:06







The producer waiting for a queued item to be processed appears to be contrary to the premise of a queue: that of producing and consuming (processing) being decoupled from each other. If sync_producer needs to await the processing, can't it just call the processing function and not bother with queuing in the first place?

– user4815162342
Nov 24 '18 at 12:06















In my real case the consumer will be a CAN bus instance which publish messages on the Bus. I need many normal producer and one single sync cproducer. I don't know if it will be safe to have more corutines using the same CAN bus instance at the same time

– Enzo Scossa-Romano
Nov 24 '18 at 20:20







In my real case the consumer will be a CAN bus instance which publish messages on the Bus. I need many normal producer and one single sync cproducer. I don't know if it will be safe to have more corutines using the same CAN bus instance at the same time

– Enzo Scossa-Romano
Nov 24 '18 at 20:20















you cancelled periodic_producer_task while it is sleeping.

– kcorlidy
Nov 25 '18 at 2:11







you cancelled periodic_producer_task while it is sleeping.

– kcorlidy
Nov 25 '18 at 2:11














0






active

oldest

votes











Your Answer






StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");

StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});

function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});


}
});














draft saved

draft discarded


















StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53457420%2fasyncio-queue-multi-producer-sync-single-consumer%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown

























0






active

oldest

votes








0






active

oldest

votes









active

oldest

votes






active

oldest

votes
















draft saved

draft discarded




















































Thanks for contributing an answer to Stack Overflow!


  • Please be sure to answer the question. Provide details and share your research!

But avoid



  • Asking for help, clarification, or responding to other answers.

  • Making statements based on opinion; back them up with references or personal experience.


To learn more, see our tips on writing great answers.




draft saved


draft discarded














StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53457420%2fasyncio-queue-multi-producer-sync-single-consumer%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown





















































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown

































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown







Popular posts from this blog

404 Error Contact Form 7 ajax form submitting

How to know if a Active Directory user can login interactively

TypeError: fit_transform() missing 1 required positional argument: 'X'