Task exception was never retrieved when using Asyncio and Websockets

  python, python-3.x, python-asyncio, websocket

I keep getting an error while using Asyncio and Websockets in a multiprocessing python script. Each time this error occurs, it causes my program to basically hang. My other processes seem to continue to work, but the websocket process no longer communicates. Any suggestion on what my problem is and how to solve it?

The error in the logs is as follows:

Nov 29 15:28:34 fbcradiocast1 python3[22482]: Task exception was never retrieved
Nov 29 15:28:34 fbcradiocast1 python3[22482]: future: <Task finished coro=<websocket_server.<locals>.broadcast() done, defined at /home/wruser/radio/python/fbcradiocast.py:329> exception=Connectio$
Nov 29 15:28:34 fbcradiocast1 python3[22482]: Traceback (most recent call last):
Nov 29 15:28:34 fbcradiocast1 python3[22482]:   File "/home/wruser/radio/python/fbcradiocast.py", line 342, in broadcast
Nov 29 15:28:34 fbcradiocast1 python3[22482]:     return_exceptions=False,)
Nov 29 15:28:34 fbcradiocast1 python3[22482]:   File "/usr/local/lib/python3.7/dist-packages/websockets/protocol.py", line 555, in send
Nov 29 15:28:34 fbcradiocast1 python3[22482]:     await self.ensure_open()
Nov 29 15:28:34 fbcradiocast1 python3[22482]:   File "/usr/local/lib/python3.7/dist-packages/websockets/protocol.py", line 803, in ensure_open
Nov 29 15:28:34 fbcradiocast1 python3[22482]:     raise self.connection_closed_exc()
Nov 29 15:28:34 fbcradiocast1 python3[22482]: websockets.exceptions.ConnectionClosedOK: code = 1000 (OK), no reason

The function running websockets is as follows:

def websocket_server(event, q):
    # imports needed only by this function
    import asyncio
    import websockets
    import queue

    # we will ignore signal and let main process terminate child processes
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    # Get PID of process
    pid = os.getpid()
    mylog.logger.info(f"Starting process with PID {pid}")

    CLIENTS = set()
    workQueue = queue.Queue(10)

    async def broadcast():
        while True:
            # when event is set, we'll stop loop
            if event.is_set():
                loop.stop()
            elif not workQueue.empty():
                data = workQueue.get()
                if (data.find("update_airplay") != -1):
                    q.put(data);

                mylog.logger.info(f"Processing {data}")
                await asyncio.gather(
                *[ws.send(data) for ws in CLIENTS],
                return_exceptions=False,)
     
            await asyncio.sleep(0.01)

    async def handler(websocket, path):
        CLIENTS.add(websocket)
        #mylog.logger.info("[WEBSOCKET_SERVER] Add client")
        try:
            async for msg in websocket:
                workQueue.put(msg)
        except websockets.ConnectionClosedError:
            mylog.logger.warning("Connection closed")   
        finally:
            CLIENTS.remove(websocket)
            #mylog.logger.info("Remove client")

    loop = asyncio.get_event_loop()
    loop.create_task(broadcast())

    # read config file to get server and port and then assign variable the command
    config = configparser.ConfigParser()
    config.read('config.ini')
    server = config['WEBSOCKET']['server']
    port = config['WEBSOCKET']['port']
    start_server = websockets.serve(handler, server, port)

    try:
        # start the server and looping it forever
        loop.run_until_complete(start_server)
        loop.run_forever()
    except Exception as err: # catch *all* exceptions
        mylog.logger.error(f"{err}")
        radio_lib.sendmail("websocket_server function in fbcradiocast", f"Error in websocket_server: {err}")

Source: Python Questions

LEAVE A COMMENT