
    iD                     d    d dl Z d dlZ	 d dlZn# e$ r dZY nw xY wddlmZ  G d de          ZdS )    N   )PubSubManagerc                   ^     e Zd ZdZdZ	 	 	 	 d fd	Z fdZd	 Zd
 Zd Z	d Z
d Zd Z xZS )KombuManagera	  Client manager that uses kombu for inter-process messaging.

    This class implements a client manager backend for event sharing across
    multiple processes, using RabbitMQ, Redis or any other messaging mechanism
    supported by `kombu <http://kombu.readthedocs.org/en/latest/>`_.

    To use a kombu backend, initialize the :class:`Server` instance as
    follows::

        url = 'amqp://user:password@hostname:port//'
        server = socketio.Server(client_manager=socketio.KombuManager(url))

    :param url: The connection URL for the backend messaging queue. Example
                connection URLs are ``'amqp://guest:guest@localhost:5672//'``
                and ``'redis://localhost:6379/'`` for RabbitMQ and Redis
                respectively. Consult the `kombu documentation
                <http://kombu.readthedocs.org/en/latest/userguide                /connections.html#urls>`_ for more on how to construct
                connection URLs.
    :param channel: The channel name on which the server sends and receives
                    notifications. Must be the same in all the servers.
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving. A write-only instance can be used
                       independently of the server to emit to clients from an
                       external process.
    :param logger: a custom logger to log it. If not given, the server logger
                   is used.
    :param json: An alternative JSON module to use for encoding and decoding
                 packets. Custom json modules must have ``dumps`` and ``loads``
                 functions that are compatible with the standard library
                 versions. This setting is only used when ``write_only`` is set
                 to ``True``. Otherwise the JSON module configured in the
                 server is used.
    :param connection_options: additional keyword arguments to be passed to
                               ``kombu.Connection()``.
    :param exchange_options: additional keyword arguments to be passed to
                             ``kombu.Exchange()``.
    :param queue_options: additional keyword arguments to be passed to
                          ``kombu.Queue()``.
    :param producer_options: additional keyword arguments to be passed to
                             ``kombu.Producer()``.
    kombu#amqp://guest:guest@localhost:5672//socketioFNc
                    t           t          d          t                                          ||||           || _        |pi | _        |pi | _        |pi | _        |	pi | _        | 	                                | _
        d S )NzLKombu package is not installed (Run "pip install kombu" in your virtualenv).)channel
write_onlyloggerjson)r   RuntimeErrorsuper__init__urlconnection_optionsexchange_optionsqueue_optionsproducer_options_connectionpublisher_connection)selfr   r   r   r   r   r   r   r   r   	__class__s             A/usr/local/lib/python3.11/dist-packages/socketio/kombu_manager.pyr   zKombuManager.__init__:   s     =  . / / / 	Z" 	 	$ 	$ 	$"4": 0 6B*0b 0 6B$($4$4$6$6!!!    c                 
   t                                                       d}| j        j        dk    rddlm}  |d          }nd| j        j        v rddlm}  |d          }|st          d| j        j        z             d S )	NTeventletr   )is_monkey_patchedsocketgevent)is_module_patchedz<Kombu requires a monkey patched socket library to work with )	r   
initializeserver
async_modeeventlet.patcherr   gevent.monkeyr"   r   )r   monkey_patchedr   r"   r   s       r   r#   zKombuManager.initializeK   s    ;!Z//::::::..x88NN///777777..x88N 	2+012 2 2	2 	2r   c                 :    t          j        | j        fi | j        S )N)r   
Connectionr   r   )r   s    r   r   zKombuManager._connectionZ   s     DDD,CDDDr   c                 n    ddd}|                     | j                   t          j        | j        fi |S )NfanoutF)typedurable)updater   r   Exchanger   )r   optionss     r   	_exchangezKombuManager._exchange]   s>    #66t,---~dl66g666r   c                     dt          t          j                              z   }dddid}|                    | j                   t          j        ||                                 fi |S )Nzpython-socketio.Fz	x-expiresi )r.   queue_arguments)struuiduuid4r/   r   r   Queuer2   )r   
queue_namer1   s      r   _queuezKombuManager._queueb   se    '#djll*;*;;
#f8MNNt)***{:t~~'7'7CC7CCCr   c                      |j         dd|                                 i| j        }|                    ||j                  S )Nexchange )Producerr2   r   ensurepublish)r   
connectionproducers      r   _producer_publishzKombuManager._producer_publishh   sU    &:& @ @0@0@ @)-)>@ @  8+;<<<r   c                 x   d}	 	 |                      | j                  } || j                            |                     d S # t          t
          j        j        f$ rY |r*|                                 	                    d           d}n*|                                 	                    d           Y d S Y nw xY w)NTz&Cannot publish to rabbitmq... retryingFz'Cannot publish to rabbitmq... giving up)
rC   r   r   dumpsOSErrorr   
exceptions
KombuError_get_loggererror)r   dataretryproducer_publishs       r   _publishzKombuManager._publishm   s    	#'#9#9-$/ $/   !6!6777U-89    $$&&,, .8 9 9 9!EE$$&&,,AC C CEE	 E		s   =A A.B87B8c              #   J  K   d}	 	 |                                  }|                                 5 }|                    |          5 }	 |                    d          }|                                 |j        V  d}6# 1 swxY w Y   	 d d d            n# 1 swxY w Y   n# t          t          j        j	        f$ rd | 
                                                    d                    |                     t          j        |           t          |dz  d          }Y nw xY w)Nr   T)blockz3Cannot receive from rabbitmq... retrying in {} secs   <   )r:   r   SimpleQueuegetackpayloadrF   r   rG   rH   rI   rJ   formattimesleepmin)r   retry_sleepreader_queuerA   queuemessages         r   _listenzKombuManager._listen   s     	77#{{}}%%'' ,:#//== ,,&+iidi&;&;G#KKMMM")/111*+K	,, , , , , , , , ,, , , , , , , , , , , , , , , U-89 7 7 7  ""((**0&*=*=? ? ? 
;'''!+/2667	7sS   (B! B8A==B	BB	B	B! BB! BB! !A<D D )	r   r	   FNNNNNN)__name__
__module____qualname____doc__namer   r#   r   r2   r:   rC   rN   r_   __classcell__)r   s   @r   r   r      s        * *V D@IM;?6:7 7 7 7 7 7"2 2 2 2 2E E E7 7 7
D D D= = =
  $7 7 7 7 7 7 7r   r   )rX   r6   r   ImportErrorpubsub_managerr   r   r=   r   r   <module>rh      s     LLLL   EEE * ) ) ) ) )D7 D7 D7 D7 D7= D7 D7 D7 D7 D7s    