File manager - Edit - /usr/local/CyberPanel/lib/python3.10/site-packages/asgiref/__pycache__/testing.cpython-310.pyc
Compiled from: /usr/local/CyberPanel/lib/python3.10/site-packages/asgiref/testing.py

Readable contents recovered from the bytecode:

Imports: asyncio, contextvars, time, guarantee_single_callable, timeout (referenced as async_timeout)

class ApplicationCommunicator
    Runs an ASGI application in a test mode, allowing sending of messages to it and retrieval of messages it sends.

    __init__
        Names referenced: guarantee_single_callable, application, scope, asyncio.Queue, input_queue, output_queue, contextvars.Context, run, create_task, get, put, future

    wait (async)
        Waits for the application to stop itself and returns any exceptions.
        Names referenced: async_timeout, future, result, asyncio.CancelledError, done, cancel

    stop
        Names referenced: exceptions, future, done, cancel, result

    __del__
        Names referenced: stop, RuntimeError

    send_input (async)
        Sends a single message to the application
        Names referenced: input_queue, put, message

    receive_output (async)
        Receives a single message from the application, with optional timeout.
        Names referenced: future, done, result, async_timeout, output_queue, get, asyncio.TimeoutError, cancel, asyncio.CancelledError

    receive_nothing (async)
        Checks that there is no message to receive in the given time.
        Names referenced: timeout, interval, start, time.monotonic, output_queue, empty, asyncio.sleep
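The class docstring above describes running an ASGI application in a test mode by sending it messages and retrieving what it sends back. The following is a minimal, stdlib-only sketch of that queue-based pattern, not the asgiref implementation itself. The names MiniCommunicator, echo_app, demo and run_demo are hypothetical, the default values are assumed for illustration, and asyncio.wait_for stands in for the package's own timeout helper.

```python
import asyncio
import time


async def echo_app(scope, receive, send):
    """Hypothetical ASGI-style app: reads one message, replies in upper case."""
    message = await receive()
    await send({"type": "demo.reply", "text": message["text"].upper()})


class MiniCommunicator:
    """Illustrative stand-in for a test communicator (not the asgiref class)."""

    def __init__(self, application, scope):
        # Must be constructed while an event loop is running.
        self.input_queue = asyncio.Queue()
        self.output_queue = asyncio.Queue()
        # The app receives from input_queue and sends to output_queue.
        self.future = asyncio.create_task(
            application(scope, self.input_queue.get, self.output_queue.put)
        )

    async def send_input(self, message):
        await self.input_queue.put(message)

    async def receive_output(self, timeout=1):
        # Surface any exception if the app has already finished.
        if self.future.done():
            self.future.result()
        # asyncio.wait_for is an assumption here, used instead of the
        # package's own timeout context manager.
        return await asyncio.wait_for(self.output_queue.get(), timeout)

    async def receive_nothing(self, timeout=0.1, interval=0.01):
        # Poll the output queue until the timeout elapses.
        start = time.monotonic()
        while time.monotonic() - start < timeout:
            if not self.output_queue.empty():
                return False
            await asyncio.sleep(interval)
        return self.output_queue.empty()


async def demo():
    comm = MiniCommunicator(echo_app, {"type": "demo"})
    await comm.send_input({"type": "demo.message", "text": "hello"})
    reply = await comm.receive_output()
    nothing_left = await comm.receive_nothing()
    return reply, nothing_left


def run_demo():
    return asyncio.run(demo())
```

Calling run_demo() drives the hypothetical app through one send and one receive, then checks that no further message arrives. Keeping the two queues separate lets the test control exactly what the application sees while observing its output independently.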