
    i-                     V    d dl Z d dlmZ d dlZddlmZ ddlmZ  G d de          ZdS )    N)partial   )Manager)Packetc                        e Zd ZdZdZ	 	 d fd	Z fdZ	 	 d fd	Z fd	Zd fd
	Z	d fd	Z
 fdZddZd Zd Z fdZd Zd Zd Z fdZ fdZ fdZd Z xZS )PubSubManagera  Manage a client list attached to a pub/sub backend.

    This is a base class that enables multiple servers to share the list of
    clients, with the servers communicating events through a pub/sub backend.
    The use of a pub/sub backend also allows any client connected to the
    backend to emit events addressed to Socket.IO clients.

    The actual backends must be implemented by subclasses, this class only
    provides a pub/sub generic framework.

    :param channel: The channel name on which the server sends and receives
                    notifications.
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving. A write-only instance can be used
                       independently of the server to emit to clients from an
                       external process.
    :param logger: a custom logger to log it. If not given, the server logger
                   is used.
    :param json: An alternative JSON module to use for encoding and decoding
                 packets. Custom json modules must have ``dumps`` and ``loads``
                 functions that are compatible with the standard library
                 versions. This setting is only used when ``write_only`` is set
                 to ``True``. Otherwise the JSON module configured in the
                 server is used.
    pubsubsocketioFNc                     t                                                       || _        || _        t	          j                    j        | _        || _        |	|| _	        d S d S N)
super__init__channel
write_onlyuuiduuid4hexhost_idloggerjson)selfr   r   r   r   	__class__s        B/usr/local/lib/python3.11/dist-packages/socketio/pubsub_manager.pyr   zPubSubManager.__init__&   sW    $z||'DIII     c                     t                                                       | j        s$| j                            | j                  | _        |                                                     | j	        dz              d S )Nz backend initialized.)
r   
initializer   serverstart_background_task_threadthread_get_loggerinfoname)r   r   s    r   r   zPubSubManager.initialize0   sk     	J+;;DLIIDK	,C CDDDDDr   c           
      \   |p|}|                     d          r't                                          ||||||          S |pd}|C| j        t	          d          |t          d          |                     ||          }	|||	f}nd}t          |t                    rt          |          }n|g}t          j        |          }
|
r&t          j        |          \  }}|gd |D             }d|||
||||| j        d		}|                     |           |                     |           dS )
a/  Emit a message to a single client, a room, or all the clients
        connected to the namespace.

        This method takes care or propagating the message to all the servers
        that are connected through the message queue.

        The parameters are the same as in :meth:`.Server.emit`.
        ignore_queue	namespaceroomskip_sidcallback/Nz:Callbacks can only be issued from the context of a server.z'Cannot use callback without a room set.c                 Z    g | ](}t          j        |                                          )S  )base64	b64encodedecode.0as     r   
<listcomp>z&PubSubManager.emit.<locals>.<listcomp>W   s/    NNNQF,Q//6688NNNr   emit)	methodeventdatabinaryr'   r(   r)   r*   r   )getr   r5   r   RuntimeError
ValueError_generate_ack_id
isinstancetuplelistr   data_is_binarydeconstruct_binaryr   _handle_emit_publish)r   r7   r8   r'   r(   r)   r*   tokwargsidr9   attachmentsmessager   s                r   r5   zPubSubManager.emit6   s~    zT::n%% 	#77<<tyth!   # # # $	{"" $: ; ; ;| !JKKK&&tX66Bi,HHHdE"" 	::DD6D&t,, 	P & 9$ ? ?D+ONN+NNNOD#eT#)T'X"l, , 	'"""gr   c                     |                      ||          r"t                                          ||          S d||pd| j        d}|                     |           |                     |           d S )N
disconnectr+   r6   sidr'   r   )is_connectedr   can_disconnectr   _handle_disconnectrD   )r   rM   r'   rI   r   s       r   rO   zPubSubManager.can_disconnect_   s    S),, 	#77))#y999 ".c$-$4O OG##G,,,MM'"""""r   c                     |                     d          r#t                                          ||          S d||pd| j        d}|                     |           |                     |           d S )Nr%   )r'   rK   r+   rL   )r:   r   rK   r   rP   rD   )r   rM   r'   rF   rI   r   s        r   rK   zPubSubManager.disconnectj   s    ::n%% 	@77%%cY%???)# ) 0ST\K K(((gr   c                     |                      ||          r%t                                          ||||          S d|||pd| j        d}|                     |           d S )N)eio_sid
enter_roomr+   r6   rM   r(   r'   r   )rN   r   rT   r   rD   )r   rM   r'   r(   rS   rI   r   s         r   rT   zPubSubManager.enter_roomr   su    S),, 	#77%%c9dG%LLL!-c4$-$4O OGMM'"""""r   c                     |                      ||          r#t                                          |||          S d|||pd| j        d}|                     |           d S )N
leave_roomr+   rU   )rN   r   rW   r   rD   )r   rM   r'   r(   rI   r   s        r   rW   zPubSubManager.leave_room{   sp    S),, 	#77%%c9d;;;!-c4$-$4O OGMM'"""""r   c                 v    d||pd| j         d}|                     |           |                     |           d S )N
close_roomr+   )r6   r(   r'   r   )r   _handle_close_roomrD   )r   r(   r'   rI   s       r   rY   zPubSubManager.close_room   sM    )4 ) 0ST\K K(((gr   c                      t          d          )zPublish a message on the Socket.IO channel.

        This method needs to be implemented by the different subclasses that
        support pub/sub backends.
        .This method must be implemented in a subclass.NotImplementedError)r   r8   s     r   rD   zPubSubManager._publish   s     " #. / / 	/r   c                      t          d          )zReturn the next message published on the Socket.IO channel,
        blocking until a message is available.

        This method needs to be implemented by the different subclasses that
        support pub/sub backends.
        r\   r]   )r   s    r   _listenzPubSubManager._listen   s     " #. / / 	/r   c           	         |                     d          }|                     d          }|'t          |          dk    rt          | j        |g|R  }nd }|d         }|                     d          r/d |dd          D             }t	          j        |d         |          }t          |t                    r+t          |          dk    r	|d         }nt          |          }t                      
                    |d	         ||                     d
          |                     d          |                     d          |           d S )Nr*   r      r8   r9   c                 6    g | ]}t          j        |          S r-   )r.   	b64decoder1   s     r   r4   z.PubSubManager._handle_emit.<locals>.<listcomp>   s#    AAA16+A..AAAr   r   r   r7   r'   r(   r)   r&   )r:   lenr   _return_callbackr   reconstruct_binaryr>   r@   r?   r   r5   )r   rI   remote_callbackremote_host_idr*   r8   rH   r   s          r   rC   zPubSubManager._handle_emit   sW    "++j11 Y//&3+?+?1+D+Dt4n 1 /1 1 1HH Hv;;x   	CAAQRRAAAK,T!WkBBDdD!! 	#4yyA~~AwT{{WW%t&{{;77!++f--%kk*55 	 	J 	J 	J 	J 	Jr   c                     | j         |                    d          k    rD	 |d         }|d         }|d         }n# t          $ r Y d S w xY w|                     |||           d S d S )Nr   rM   rG   args)r   r:   KeyErrortrigger_callback)r   rI   rM   rG   rk   s        r   _handle_callbackzPubSubManager._handle_callback   s    <7;;y1111enT]v   !!#r400000 21s   9 
AAc           	          || j         k    r|                     |||           d S |                     d|||||d           d S )Nr*   )r6   r   rM   r'   rG   rk   )r   rm   rD   )r   r   rM   r'   callback_idrk   s         r   rf   zPubSubManager._return_callback   sg     dl""!!#{D99999MMZG"%I!,d< < = = = = =r   c                     | j                             |                    d          |                    d          d           d S )NrM   r'   T)rM   r'   r%   )r   rK   r:   )r   rI   s     r   rP   z PubSubManager._handle_disconnect   sK    7;;u#5#5)0[)A)A,0 	 	2 	2 	2 	2 	2r   c                     |                     d          }|                     d          }|                     ||          r8t                                          |||                     d                     d S d S NrM   r'   r(   )r:   rN   r   rT   r   rI   rM   r'   r   s       r   _handle_enter_roomz PubSubManager._handle_enter_room   x    kk%  KK,,	S),, 	DGGsIw{{6/B/BCCCCC	D 	Dr   c                     |                     d          }|                     d          }|                     ||          r8t                                          |||                     d                     d S d S rs   )r:   rN   r   rW   rt   s       r   _handle_leave_roomz PubSubManager._handle_leave_room   rv   r   c                     t                                          |                    d          |                    d                     d S )Nr(   r'   )r(   r'   )r   rY   r:   )r   rI   r   s     r   rZ   z PubSubManager._handle_close_room   sK    F 3 3%,[[%=%= 	 	? 	? 	? 	? 	?r   c                    	 	 |                                  D ]}d }t          |t                    r|}n#	 | j                            |          }n#  Y nxY w|rad|v r\|                                                     d                    |d                              	 |d         dk    r|                     |           n|	                    d          | j
        k    r|d         dk    r|                     |           n|d         dk    r|                     |           ne|d         dk    r|                     |           nC|d         d	k    r|                     |           n!|d         d
k    r|                     |           t# t           $ r# | j        j                            d           Y w xY w| j        j                            d           d S # t           $ r" | j        j                            d           Y nw xY w)NTr6   zpubsub message: {}r*   r   r5   rK   rT   rW   rY   z(Handler error in pubsub listening threadz#pubsub listen() exited unexpectedlyz+Unexpected Error in pubsub listening thread)r`   r>   dictr   loadsr!   debugformatrn   r:   r   rC   rP   ru   rx   rZ   	Exceptionr   r   	exceptionerror)r   rI   r8   s      r   r   zPubSubManager._thread   s   #	A"A#||~~ L LGD!'400 !&!#'9??7#;#;DD! D LD 0 0((**001E1L1L N2, 2, - - -L#H~;; $ 5 5d ; ; ; ;!%)!4!4!D!D#'>V#;#;$($5$5d$;$;$;$;%)(^|%C%C$($;$;D$A$A$A$A%)(^|%C%C$($;$;D$A$A$A$A%)(^|%C%C$($;$;D$A$A$A$A%)(^|%C%C$($;$;D$A$A$A( L L L K.88 JL L L L LL "(()NOOO A A A",, .@ A A A A AAC#	AsN   1G AG AAG  C)F	G )F84G 7F88$G )H
	H
)r
   FNN)NNNNNr   )__name__
__module____qualname____doc__r#   r   r   r5   rO   rK   rT   rW   rY   rD   r`   rC   rn   rf   rP   ru   rx   rZ   r   __classcell__)r   s   @r   r   r   	   s        4 DDH     E E E E E EI#' ' ' ' ' 'R	# 	# 	# 	# 	#     # # # # # ## # # # #   / / // / /J J J J J21 1 1= = =2 2 2
D D D D DD D D D D? ? ? ? ?$A $A $A $A $A $A $Ar   r   )	r.   	functoolsr   r   managerr   packetr   r   r-   r   r   <module>r      s                       zA zA zA zA zAG zA zA zA zA zAr   