
    iD                     \    S SK r S SKr S SKrSSKJr   " S S\5      rg! \ a    Sr Nf = f)    N   )PubSubManagerc                   l   ^  \ rS rSrSrSr    SU 4S jjrU 4S jrS rS r	S r
S	 rS
 rS rSrU =r$ )KombuManager   a	  Client manager that uses kombu for inter-process messaging.

This class implements a client manager backend for event sharing across
multiple processes, using RabbitMQ, Redis or any other messaging mechanism
supported by `kombu <http://kombu.readthedocs.org/en/latest/>`_.

To use a kombu backend, initialize the :class:`Server` instance as
follows::

    url = 'amqp://user:password@hostname:port//'
    server = socketio.Server(client_manager=socketio.KombuManager(url))

:param url: The connection URL for the backend messaging queue. Example
            connection URLs are ``'amqp://guest:guest@localhost:5672//'``
            and ``'redis://localhost:6379/'`` for RabbitMQ and Redis
            respectively. Consult the `kombu documentation
            <http://kombu.readthedocs.org/en/latest/userguide                /connections.html#urls>`_ for more on how to construct
            connection URLs.
:param channel: The channel name on which the server sends and receives
                notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
                   default of ``False`` initializes the class for emitting
                   and receiving. A write-only instance can be used
                   independently of the server to emit to clients from an
                   external process.
:param logger: a custom logger to log it. If not given, the server logger
               is used.
:param json: An alternative JSON module to use for encoding and decoding
             packets. Custom json modules must have ``dumps`` and ``loads``
             functions that are compatible with the standard library
             versions. This setting is only used when ``write_only`` is set
             to ``True``. Otherwise the JSON module configured in the
             server is used.
:param connection_options: additional keyword arguments to be passed to
                           ``kombu.Connection()``.
:param exchange_options: additional keyword arguments to be passed to
                         ``kombu.Exchange()``.
:param queue_options: additional keyword arguments to be passed to
                      ``kombu.Queue()``.
:param producer_options: additional keyword arguments to be passed to
                         ``kombu.Producer()``.
kombuc
                    > [         c  [        S5      e[        T
U ]  X#UUS9  Xl        U=(       d    0 U l        U=(       d    0 U l        U=(       d    0 U l        U	=(       d    0 U l        U R                  5       U l
        g )NzLKombu package is not installed (Run "pip install kombu" in your virtualenv).)channel
write_onlyloggerjson)r   RuntimeErrorsuper__init__urlconnection_optionsexchange_optionsqueue_optionsproducer_options_connectionpublisher_connection)selfr   r
   r   r   r   r   r   r   r   	__class__s             S/home/admin/cozy_coffee/venv/lib/python3.13/site-packages/socketio/kombu_manager.pyr   KombuManager.__init__:   s     =  . / / 	" 	 	$"4": 0 6B*0b 0 6B$($4$4$6!    c                   > [         TU ]  5         SnU R                  R                  S:X  a  SSKJn  U" S5      nO(SU R                  R                  ;   a  SSKJn  U" S5      nU(       d"  [        SU R                  R                  -   5      eg )	NTeventletr   )is_monkey_patchedsocketgevent)is_module_patchedz<Kombu requires a monkey patched socket library to work with )	r   
initializeserver
async_modeeventlet.patcherr   gevent.monkeyr"   r   )r   monkey_patchedr   r"   r   s       r   r#   KombuManager.initializeK   s}    ;;!!Z/:.x8N///7.x8N++0012 2 r   c                 X    [         R                  " U R                  40 U R                  D6$ )N)r   
Connectionr   r   )r   s    r   r   KombuManager._connectionZ   s"    DD,C,CDDr   c                     SSS.nUR                  U R                  5        [        R                  " U R                  40 UD6$ )NfanoutF)typedurable)updater   r   Exchanger
   )r   optionss     r   	_exchangeKombuManager._exchange]   s6    #6t,,-~~dll6g66r   c                     S[        [        R                  " 5       5      -   nSSS0S.nUR                  U R                  5        [
        R                  " XR                  5       40 UD6$ )Nzpython-socketio.Fz	x-expiresi )r0   queue_arguments)struuiduuid4r1   r   r   Queuer4   )r   
queue_namer3   s      r   _queueKombuManager._queueb   sS    '#djjl*;;
#f8MNt))*{{:~~'7C7CCr   c                     UR                   " SSU R                  5       0U R                  D6nUR                  X"R                  5      $ )Nexchange )Producerr4   r   ensurepublish)r   
connectionproducers      r   _producer_publishKombuManager._producer_publishh   sH    && @0@ @)-)>)>@  +;+;<<r   c                 n   Sn  U R                  U R                  5      nU" U R                  R                  U5      5        g ! [        [
        R                  R                  4 aL    U(       a#  U R                  5       R                  S5        Sn O%U R                  5       R                  S5         g f = fM  )NTz&Cannot publish to rabbitmq... retryingFz'Cannot publish to rabbitmq... giving up)
rG   r   r   dumpsOSErrorr   
exceptions
KombuError_get_loggererror)r   dataretryproducer_publishs       r   _publishKombuManager._publishm   s    #'#9#9--$/  !67U--889 $$&,, .8 9!E$$&,,AC s   <A AB2B21B2c              #   2  #    Sn  U R                  5       nU R                  5        nUR                  U5       n UR                  SS9nUR	                  5         UR
                  v   SnM1  ! , (       d  f       O= fS S S 5        O! , (       d  f       O= f! [        [        R                  R                  4 aV    U R                  5       R                  SR                  U5      5        [        R                  " U5        [        US-  S5      n Of = fGM  7f)Nr   T)blockz3Cannot receive from rabbitmq... retrying in {} secs   <   )r=   r   SimpleQueuegetackpayloadrK   r   rL   rM   rN   rO   formattimesleepmin)r   retry_sleepreader_queuerE   queuemessages         r   _listenKombuManager._listen   s     7#{{}%%':#//="&+iidi&;G#KKM")//1*+K	 # >= ('' U--889 7  "((**0&*=? 

;'!+/267 s]   D B B3A,,
A:	6B=B D
BB DB A7DDDD)r   r   r   r   r   r   )	z#amqp://guest:guest@localhost:5672//socketioFNNNNNN)__name__
__module____qualname____firstlineno____doc__namer   r#   r   r4   r=   rG   rS   re   __static_attributes____classcell__)r   s   @r   r   r      sK    *V D@IM;?6:7"2E7
D=
$7 7r   r   )r^   r9   r   ImportErrorpubsub_managerr   r   rA   r   r   <module>rr      s<      *D7= D7  Es     ++