
    i0                     V    S SK r S SKrS SKJr  S SKrSSKJr  SSKJr   " S S\5      r	g)    N)partial   )AsyncManager)Packetc                      ^  \ rS rSrSrSr  SU 4S jjrU 4S jr  SU 4S jjrU 4S jr	U 4S jr
SU 4S	 jjrU 4S
 jrSS jrS rS rU 4S jrS rS rS rU 4S jrU 4S jrU 4S jrS rSrU =r$ )AsyncPubSubManager
   at  Manage a client list attached to a pub/sub backend under asyncio.

This is a base class that enables multiple servers to share the list of
clients, with the servers communicating events through a pub/sub backend.
The use of a pub/sub backend also allows any client connected to the
backend to emit events addressed to Socket.IO clients.

The actual backends must be implemented by subclasses, this class only
provides a pub/sub generic framework for asyncio applications.

:param channel: The channel name on which the server sends and receives
                notifications.
:param write_only: If set to ``True``, only initialize to emit events. The
                   default of ``False`` initializes the class for emitting
                   and receiving. A write-only instance can be used
                   independently of the server to emit to clients from an
                   external process.
:param logger: a custom logger to log it. If not given, the server logger
               is used.
:param json: An alternative JSON module to use for encoding and decoding
             packets. Custom json modules must have ``dumps`` and ``loads``
             functions that are compatible with the standard library
             versions. This setting is only used when ``write_only`` is set
             to ``True``. Otherwise the JSON module configured in the
             server is used.
asyncpubsubc                    > [         TU ]  5         Xl        X l        [        R
                  " 5       R                  U l        X0l        Ub  X@l	        g g N)
super__init__channel
write_onlyuuiduuid4hexhost_idloggerjson)selfr   r   r   r   	__class__s        Z/home/admin/cozy_coffee/venv/lib/python3.13/site-packages/socketio/async_pubsub_manager.pyr   AsyncPubSubManager.__init__'   s@    $zz|''I     c                    > [         TU ]  5         U R                  (       d*  U R                  R	                  U R
                  5      U l        U R                  5       R                  U R                  S-   5        g )Nz backend initialized.)
r   
initializer   serverstart_background_task_threadthread_get_loggerinfoname)r   r   s    r   r   AsyncPubSubManager.initialize1   sP    ++;;DLLIDK		,C CDr   c           
        >#    U=(       d    UnUR                  S5      (       a  [        TU ]	  XX4UUS9I Sh  vN $ U=(       d    SnUb<  U R                  c  [	        S5      eUc  [        S5      eU R                  XF5      n	XCU	4nOSn[        U[        5      (       a  [        U5      nOU/n[        R                  " U5      n
U
(       aP  [        R                  " U5      u  p+U/U Vs/ s H'  n[        R                  " U5      R                  5       PM)     snQnSXXUXVU R                   S.	nU R#                  U5      I Sh  vN   U R%                  U5      I Sh  vN   g GN(s  snf  N& N7f)	a*  Emit a message to a single client, a room, or all the clients
connected to the namespace.

This method takes care or propagating the message to all the servers
that are connected through the message queue.

The parameters are the same as in :meth:`.Server.emit`.

Note: this method is a coroutine.
ignore_queue	namespaceroomskip_sidcallbackN/z:Callbacks can only be issued from the context of a server.z'Cannot use callback without a room set.emit)	methodeventdatabinaryr)   r*   r+   r,   r   )getr   r.   r   RuntimeError
ValueError_generate_ack_id
isinstancetuplelistr   data_is_binarydeconstruct_binarybase64	b64encodedecoder   _handle_emit_publish)r   r0   r1   r)   r*   r+   r,   tokwargsidr2   attachmentsamessager   s                 r   r.   AsyncPubSubManager.emit7   sl     zT::n%%yh! & # # # $	{{"" $: ; ;| !JKK&&t6B,HHdE"":D6D&&t, & 9 9$ ?DO+N+QF,,Q/668+NOD#e#T'"ll, (((mmG$$$7#* O
 	)$s@   5E,E B/E,(.E#+E,E(E,E*E,#E,*E,c                    >#    U R                  X5      (       a  [        TU ]	  X5      I S h  vN $ U R                  SUU=(       d    SU R                  S.5      I S h  vN   g  N6 N7f)N
disconnectr-   r/   sidr)   r   )is_connectedr   can_disconnectr@   r   )r   rK   r)   r   s      r   rM   !AsyncPubSubManager.can_disconnectb   se     S,,/??? --<.7.>3,0LL!: ; ; ; @;s!   )A'A#0A'A%A'%A'c                   >#    UR                  S5      (       a  [        TU ]	  XS9I S h  vN $ SUU=(       d    SU R                  S.nU R	                  U5      I S h  vN   U R                  U5      I S h  vN   g  NQ N  N	7f)Nr'   )r)   rI   r-   rJ   )r3   r   rI   r   _handle_disconnectr@   )r   rK   r)   rB   rF   r   s        r   rI   AsyncPubSubManager.disconnectl   s     ::n%%+ , * * *)# ) 0ST\\K%%g...mmG$$$* 	/$s3   'BA<2BA>B6B 7B>B Bc                    >#    U R                  X5      (       a  [        TU ]	  XUUS9I S h  vN $ SXU=(       d    SU R                  S.nU R	                  U5      I S h  vN   g  N8 N7f)N)eio_sid
enter_roomr-   r/   rK   r*   r)   r   )rL   r   rT   r   r@   )r   rK   r)   r*   rS   rF   r   s         r   rT   AsyncPubSubManager.enter_roomu   ss     S,,+CD4; , = = = ".c$-$4OG--(((=
 )s!   )A)A%2A)A' A)'A)c                    >#    U R                  X5      (       a  [        TU ]	  XU5      I S h  vN $ SXU=(       d    SU R                  S.nU R	                  U5      I S h  vN   g  N8 N7f)N
leave_roomr-   rU   )rL   r   rX   r   r@   )r   rK   r)   r*   rF   r   s        r   rX   AsyncPubSubManager.leave_room   sd     S,,+CDAAA!-c$-$4OG--(((	 B )s!   *A*A&2A* A(!A*(A*c                    #    SUU=(       d    SU R                   S.nU R                  U5      I S h  vN   U R                  U5      I S h  vN   g  N N7f)N
close_roomr-   )r/   r*   r)   r   )r   _handle_close_roomr@   )r   r*   r)   rF   s       r   r[   AsyncPubSubManager.close_room   sN     )4 ) 0ST\\K%%g...mmG$$$ 	/$s!   /AAA
AAAc                     #    [        S5      e7f)zPublish a message on the Socket.IO channel.

This method needs to be implemented by the different subclasses that
support pub/sub backends.
.This method must be implemented in a subclass.NotImplementedError)r   r1   s     r   r@   AsyncPubSubManager._publish   s      " #. / 	/   c                     #    [        S5      e7f)zReturn the next message published on the Socket.IO channel,
blocking until a message is available.

This method needs to be implemented by the different subclasses that
support pub/sub backends.
r_   r`   )r   s    r   _listenAsyncPubSubManager._listen   s      " #. / 	/rc   c           	      r  >#    UR                  S5      nUR                  S5      nUb'  [        U5      S:X  a  [        U R                  U/UQ76 nOS nUS   nUR                  S5      (       aC  USS   Vs/ s H  n[        R
                  " U5      PM     nn[        R                  " US   U5      n[        U[        5      (       a   [        U5      S:X  a  US   nO[        U5      n[        TU ]1  US   UUR                  S	5      UR                  S
5      UR                  S5      US9I S h  vN   g s  snf  N
7f)Nr,   r      r1   r2   r   r   r0   r)   r*   r+   r(   )r3   lenr   _return_callbackr<   	b64decoder   reconstruct_binaryr7   r9   r8   r   r.   )	r   rF   remote_callbackremote_host_idr,   r1   rE   rD   r   s	           r   r?   AsyncPubSubManager._handle_emit   s,     "++j1 Y/&3+?1+Dt44n 1 /1H Hv;;x  8<QRA16++A.KA,,T!WkBDdD!!4yA~AwT{gl77+T%,[[%= 'F 3$+KK
$;$,	  . 	. 	. B	.s   A1D74 D0BD7*D5+D7c                    #    U R                   UR                  S5      :X  a+   US   nUS   nUS   nU R                  X#U5      I S h  vN   g g ! [         a     g f = f N7f)Nr   rK   rC   args)r   r3   KeyErrortrigger_callback)r   rF   rK   rC   rq   s        r   _handle_callback#AsyncPubSubManager._handle_callback   sn     <<7;;y11enT]v ''666 2
  6s3    A A A AA 
AA AA c           	         #    XR                   :X  a  U R                  X$U5      I S h  vN   g U R                  SUX#XES.5      I S h  vN   g  N$ N7f)Nr,   )r/   r   rK   r)   rC   rq   )r   rs   r@   )r   r   rK   r)   callback_idrq   s         r   rj   #AsyncPubSubManager._return_callback   s\      ll"''$???--:'(+'2!B C C C @Cs!   %AAAAAAc                    #    U R                   R                  UR                  S5      UR                  S5      SS9I S h  vN   g  N7f)NrK   r)   T)rK   r)   r'   )r   rI   r3   )r   rF   s     r   rP   %AsyncPubSubManager._handle_disconnect   s@     kk$$U);/6{{;/G26 % 8 	8 	8s   =AA Ac                    >#    UR                  S5      nUR                  S5      nU R                  X#5      (       a(  [        TU ]  X#UR                  S5      5      I S h  vN   g g  N7fNrK   r)   r*   )r3   rL   r   rT   r   rF   rK   r)   r   s       r   _handle_enter_room%AsyncPubSubManager._handle_enter_room   Y     kk% KK,	S,,'$SW[[5HIII -I   AA'A%A'c                    >#    UR                  S5      nUR                  S5      nU R                  X#5      (       a(  [        TU ]  X#UR                  S5      5      I S h  vN   g g  N7fr|   )r3   rL   r   rX   r}   s       r   _handle_leave_room%AsyncPubSubManager._handle_leave_room   r   r   c                 z   >#    [         TU ]  UR                  S5      UR                  S5      S9I S h  vN   g  N7f)Nr*   r)   )r*   r)   )r   r[   r3   )r   rF   r   s     r   r\   %AsyncPubSubManager._handle_close_room   s<     g gkk&&9+2;;{+C ! E 	E 	Es   0;9;c                   #      U R                  5         S h  vN nS n[        U[        5      (       a  UnO U R                  R	                  U5      nU(       d  MH  SU;   d  MP  U R                  5       R                  SR                  US   5      5         US   S:X  a  U R                  U5      I S h  vN   M  UR                  S5      U R                  :w  a  US   S:X  a  U R                  U5      I S h  vN   M  US   S:X  a  U R                  U5      I S h  vN   GM  US   S:X  a  U R                  U5      I S h  vN   GM3  US   S:X  a  U R                  U5      I S h  vN   GMX  US   S	:X  a  U R                  U5      I S h  vN   GM}  GM  GM   GN!    GNL= f N N N Nh NE N"! [         R"                   a    e [$         a*    U R&                  R(                  R+                  S
5         GM  f = f
 U R&                  R(                  R-                  S5        g ! [         R"                   a     g [$         a(    U R&                  R(                  R+                  S5         Of = fGMj  7f)Nr/   zpubsub message: {}r,   r   r.   rI   rT   rX   r[   z(Handler error in pubsub listening threadz#pubsub listen() exited unexpectedlyz+Unexpected Error in pubsub listening thread)re   r7   dictr   loadsr"   debugformatrt   r3   r   r?   rP   r~   r   r\   asyncioCancelledError	Exceptionr   r   	exceptionerror)r   rF   r1   s      r   r    AsyncPubSubManager._thread   sD    &A%)\\^ L'D!'400&!#'99??7#;D tD 0((*001E1L1L N2, -L#H~;&*&;&;D&A A A!%)!4!D#'>V#;*.*;*;D*A$A$A%)(^|%C*.*A*A$*G$G$G%)(^|%C*.*A*A$*G$G$G%)(^|%C*.*A*A$*G$G$G%)(^|%C*.*A*A$*G$G$G &D "EL!  !B %B$G$G$G$G&55 "!( L KK..88 JL LL9 &4> ""(()NO))  A"",, .@ AAK s>  I.H! G:FG:H! FH! H! %1H! F/4F#5F/9H! ;<F/7F%8F/<H! >F/F'F/ H! #F/ F)F/H! F/%F+&F/*H! -F/
F-F/	H! G:F H! #F/%F/'F/)F/+F/-F//AG72H! 6G77)H!  I.!I(5I.7.I(%I.'I((I.)r   r   r   r   r!   r   )socketioFNN)NNNNNr   )__name__
__module____qualname____firstlineno____doc__r$   r   r   r.   rM   rI   rT   rX   r[   r@   re   r?   rt   rj   rP   r~   r   r\   r    __static_attributes____classcell__)r   s   @r   r   r   
   s    4 DDHE KO%))%V;%))%//.47	C8
JJE(A (Ar   r   )
r   r<   	functoolsr   r   async_managerr   packetr   r    r   r   <module>r      s'        ' CA CAr   