Streaming aiokafka consumer messages

I’m trying to create a stream for an aiokafka consumer, without success. This is my code so far:

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiohttp_sse import sse_response

import asyncio
from views import render
import motor.motor_asyncio
import json

loop = asyncio.get_event_loop()
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://mongodb:27017')
db = client['kafka_data']

async def create_order(request):
    producer = AIOKafkaProducer(
        loop=loop, bootstrap_servers='kafka1:9092')
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()
    try:
        # Produce message
        await producer.send_and_wait("order", b"TESTE")
        return await render.json({"status": "sent", "msg": "TESTE"})
    finally:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()

async def initiate_consumer(loop):
    consumer = AIOKafkaConsumer(
        loop=loop, bootstrap_servers='kafka1:9092')
    await consumer.start()
    return consumer

async def consume(c):
    msgs = []
    async for msg in c:
        msgs.append({'msg': msg.value.decode('utf-8'), 'timestamp': msg.timestamp})
        await db.kafka.insert_one({'msg': msg.value.decode('utf-8'), 'time': msg.timestamp})
    return msgs

async def get_orders(request):
    loop = asyncio.get_event_loop()
    async with sse_response(request) as resp:
        while True:
            consumer = await initiate_consumer(loop)
            messages = await consume(consumer)
            for msg in messages:
                await resp.send(json.dumps(msg))
                await asyncio.sleep(3, loop=loop)
            await consumer.stop()

    return resp

Hi. Do you have an exception with stack trace?

Nope, I just don’t get any output. The insert into MongoDB is actually working fine, though.
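
A possible explanation, given that the MongoDB insert works but nothing is streamed: `consume` only returns once its `async for` loop finishes, and as far as I know iterating an `AIOKafkaConsumer` keeps waiting for new messages until the consumer is stopped. If that is right, `get_orders` never reaches `resp.send`. Below is a minimal sketch of forwarding each message as soon as it arrives instead of collecting a list first. The names `forward_messages`, `FakeRecord`, `fake_consumer` and `demo` are hypothetical, and the fake consumer only stands in for Kafka so the snippet runs without a broker.

```python
import asyncio
import json


async def forward_messages(consumer, send):
    """Forward every record to `send` as soon as it is received.

    `consumer` is assumed to be any async iterable of records exposing
    `.value` (bytes) and `.timestamp`, such as a started AIOKafkaConsumer.
    `send` is assumed to be an async callable taking a string, such as
    `resp.send` from aiohttp_sse.
    """
    count = 0
    async for msg in consumer:
        payload = {"msg": msg.value.decode("utf-8"), "timestamp": msg.timestamp}
        # Send inside the loop: the loop itself may never finish.
        await send(json.dumps(payload))
        count += 1
    return count


class FakeRecord:
    """Hypothetical stand-in for a Kafka record, used only for the demo."""

    def __init__(self, value, timestamp):
        self.value = value
        self.timestamp = timestamp


async def fake_consumer(records):
    """Hypothetical stand-in for a consumer: yields records, then ends."""
    for record in records:
        await asyncio.sleep(0)  # give control back to the event loop
        yield record


async def demo():
    sent = []

    async def send(data):
        # Stand-in for resp.send: just remember what would be streamed.
        sent.append(data)

    records = [FakeRecord(b"TESTE", 1), FakeRecord(b"TESTE 2", 2)]
    count = await forward_messages(fake_consumer(records), send)
    return count, sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the handler from the question, this would roughly mean starting the consumer once inside the `sse_response` block, calling `await forward_messages(consumer, resp.send)`, and stopping the consumer afterwards, rather than recreating it in a `while True` loop. The `db.kafka.insert_one` call could stay in the same loop if the messages should still be stored.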

Anyway, aiokafka has no insert_one method, so I have no idea what you wanted to write.

Can you check whether the message is arriving in Kafka?
Try with the following:
bin / --bootstrap-server localhost:9092 --topic test --from-beginning
Taken from here: