
    i                     T    S SK r SSKJr   S SKr " S S\5      rg! \ a    Sr Nf = f)    N   )AsyncPubSubManagerc                   \   ^  \ rS rSrSrSr  SU 4S jjrS rS rS r	S r
S	 rS
 rSrU =r$ )AsyncAioPikaManager   af  Client manager that uses aio_pika for inter-process messaging under
asyncio.

This class implements a client manager backend for event sharing across
multiple processes, using RabbitMQ

To use a aio_pika backend, initialize the :class:`Server` instance as
follows::

    url = 'amqp://user:password@hostname:port//'
    server = socketio.Server(client_manager=socketio.AsyncAioPikaManager(
        url))

:param url: The connection URL for the backend messaging queue. Example
            connection URLs are ``'amqp://guest:guest@localhost:5672//'``
            for RabbitMQ.
:param channel: The channel name on which the server sends and receives
                notifications. Must be the same in all the servers.
                With this manager, the channel name is the exchange name
                in rabbitmq
:param write_only: If set to ``True``, only initialize to emit events. The
                   default of ``False`` initializes the class for emitting
                   and receiving. A write-only instance can be used
                   independently of the server to emit to clients from an
                   external process.
:param logger: a custom logger to log it. If not given, the server logger
               is used.
:param json: An alternative JSON module to use for encoding and decoding
             packets. Custom json modules must have ``dumps`` and ``loads``
             functions that are compatible with the standard library
             versions. This setting is only used when ``write_only`` is set
             to ``True``. Otherwise the JSON module configured in the
             server is used.
asyncaiopikac                    > [         c  [        S5      e[        TU ]  X#UUS9  Xl        [
        R                  " 5       U l        S U l        S U l	        S U l
        g )NzRaio_pika package is not installed (Run "pip install aio_pika" in your virtualenv).)channel
write_onlyloggerjson)aio_pikaRuntimeErrorsuper__init__urlasyncioLock_lockpublisher_connectionpublisher_channelpublisher_exchange)selfr   r
   r   r   r   	__class__s         [/home/admin/cozy_coffee/venv/lib/python3.13/site-packages/socketio/async_aiopika_manager.pyr   AsyncAioPikaManager.__init__1   s`      . / / 	" 	 	$\\^
$(!!%"&    c                 ^   #    [         R                  " U R                  5      I S h  vN $  N7fN)r   connect_robustr   )r   s    r   _connectionAsyncAioPikaManager._connection?   s      ,,TXX6666s   $-+-c                 >   #    UR                  5       I S h  vN $  N7fr   )r
   )r   
connections     r   _channelAsyncAioPikaManager._channelB   s     ''))))s   c                    #    UR                  U R                  [        R                  R                  5      I S h  vN $  N7fr   )declare_exchanger
   r   ExchangeTypeFANOUT)r   r
   s     r   	_exchangeAsyncAioPikaManager._exchangeE   s;     --dll.6.C.C.J.JL L 	L Ls   8A?Ac                 |   #    UR                  SSS0S9I S h  vN nUR                  U5      I S h  vN   U$  N N7f)NFz	x-expiresi )durable	arguments)declare_queuebind)r   r
   exchangequeues       r   _queueAsyncAioPikaManager._queueI   sK     ++E7BF6K , M Mjj"""M"s   <8<:<<c                   #    U R                   c  U R                   IS h  vN   U R                   cm  U R                  5       I S h  vN U l         U R                  U R                   5      I S h  vN U l        U R                  U R                  5      I S h  vN U l        S S S 5      IS h  vN   Sn  U R                  R                  [        R                  " U R                  R                  U5      R                  5       [        R                  R                  S9SS9I S h  vN   g  GN N N N N! , IS h  vN  (       d  f       N= f N*! [        R                   aL    U(       a#  U R!                  5       R#                  S5        Sn OWU R!                  5       R#                  S5         g [        R$                  R&                   a    [(        R*                  " 5       ef = fGM5  7f)NT)bodydelivery_mode*)routing_keyz&Cannot publish to rabbitmq... retryingFz'Cannot publish to rabbitmq... giving up)r   r   r!   r%   r   r+   r   publishr   Messager   dumpsencodeDeliveryMode
PERSISTENTAMQPException_get_loggererror
exceptionsChannelInvalidStateErrorr   CancelledError)r   dataretrys      r   _publishAsyncAioPikaManager._publishO   s    $$,zzz,,46:6F6F6H0HD-37==114 .D* 59NN..5 /D+ "z /--55$$!YY__T299;&.&;&;&F&F $'	 6    % "0H./ "zzz )) $$&,, .8 9!E$$&,,AC&&?? /,,../% s   G'D(G'!D3D+'D3-D-.'D3D/	D3G'*D1+G'3A/E "E#E 'G'+D3-D3/D31G'3E
9D<:E
G'E <G!G'G!,G'.3G!!G'c           
     >  #    Sn  U R                  5       I S h  vN  IS h  vN nU R                  U5      I S h  vN nUR                  SS9I S h  vN   U R                  U5      I S h  vN nU R	                  X45      I S h  vN nUR                  5        IS h  vN nU  S h  vN nUR                  5        IS h  vN   UR                  7v   SnS S S 5      IS h  vN   ME   N N N N N Nn NW NN N7 N! , IS h  vN  (       d  f       Mq  = f
 S S S 5      IS h  vN    O! , IS h  vN  (       d  f       O= fS S S 5      IS h  vN    O! , IS h  vN  (       d  f       O= f! [        R                   a_    U R                  5       R                  SR                  U5      5        [        R                  " U5      I S h  vN    [        US-  S5      n O6[        R                   R"                   a    [        R$                  " 5       ef = fGM  7f)Nr   )prefetch_countz3Cannot receive from rabbitmq... retrying in {} secs   <   )r!   r%   set_qosr+   r4   iteratorprocessr7   r   rA   rB   rC   formatr   sleepminrD   rE   rF   )r   retry_sleepr$   r
   r2   r3   
queue_itermessages           r   _listenAsyncAioPikaManager._listenq   s    /"&"2"2"4444$(MM*$==G!///;;;%)^^G%<<H"&++g"@@E$~~//:-7 0''.'8'8&-ll 2./ (9'8'8 5=;<@/0'8'8'8'8 .8  0///// 544444 )) 7  "((**0&*=? mmK000!+/26&&?? /,,../' s~  HE2 C*E2 C,E2 EC.EC0E,C2-EC4EC6E"D,%D)C8
*D-D,C:D,C>	D,#C<
$D,*E2 ,E2 .E0E2E4E6E8D:D,<D,>DDD	D,E%D(&E,E	2D53E	?EE2 EE2 HE/E!E/+E2 .H/E2 2AHGH"H$3HH)r   r   r   r   r   )z#amqp://guest:guest@localhost:5672//socketioFNN)__name__
__module____qualname____firstlineno____doc__namer   r!   r%   r+   r4   rI   rX   __static_attributes____classcell__)r   s   @r   r   r      s@    !F D@IM'7*L /D/ /r   r   )r   async_pubsub_managerr   r   ImportErrorr    r   r   <module>rf      s8     4
}/, }/	  Hs    ''