
    i*                     |    d dl Z 	 d dlZn# e$ r dZY nw xY wddlmZ  e j        d          Z G d de          ZdS )    N   )PubSubManagersocketioc                   >     e Zd ZdZdZ	 	 d fd	Zd Zd	 Zd
 Z xZ	S )KafkaManagera}  Kafka based client manager.

    This class implements a Kafka backend for event sharing across multiple
    processes.

    To use a Kafka backend, initialize the :class:`Server` instance as
    follows::

        url = 'kafka://hostname:port'
        server = socketio.Server(client_manager=socketio.KafkaManager(url))

    :param url: The connection URL for the Kafka server. For a default Kafka
                store running on the same host, use ``kafka://``. For a highly
                available deployment of Kafka, pass a list with all the
                connection URLs available in your cluster.
    :param channel: The channel name (topic) on which the server sends and
                    receives notifications. Must be the same in all the
                    servers.
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving. A write-only instance can be used
                       independently of the server to emit to clients from an
                       external process.
    :param logger: a custom logger to log it. If not given, the server logger
                   is used.
    :param json: An alternative JSON module to use for encoding and decoding
                 packets. Custom json modules must have ``dumps`` and ``loads``
                 functions that are compatible with the standard library
                 versions. This setting is only used when ``write_only`` is set
                 to ``True``. Otherwise the JSON module configured in the
                 server is used.
    kafkakafka://localhost:9092r   FNc                 \   t           t          d          t                                          ||||           t	          |t
                    r|gn|}d |D             | _        t          j        | j                  | _        t          j	        | j
        | j                  | _        d S )NzZkafka-python package is not installed (Run "pip install kafka-python" in your virtualenv).)channel
write_onlyloggerjsonc                 2    g | ]}|d k    r
|dd         ndS )zkafka://   Nzlocalhost:9092 ).0urls     A/usr/local/lib/python3.11/dist-packages/socketio/kafka_manager.py
<listcomp>z)KafkaManager.__init__.<locals>.<listcomp>;   s?     , , ," '*Z&7&73qrr77=M , , ,    )bootstrap_servers)r   RuntimeErrorsuper__init__
isinstancestr
kafka_urlsKafkaProducerproducerKafkaConsumerr   consumer)selfr   r   r   r   r   urls	__class__s          r   r   zKafkaManager.__init__0   s    =  . / / / 	Z" 	 	$ 	$ 	$ #3,,5uu#, ,&*, , ,+doNNN+DL>BoO O Or   c                     | j                             | j        | j                            |                     | j                                          d S )N)value)r   sendr   r   dumpsflush)r"   datas     r   _publishzKafkaManager._publishA   sG    4<tyt/D/DEEEr   c              #   $   K   | j         E d {V  d S N)r!   )r"   s    r   _kafka_listenzKafkaManager._kafka_listenE   s&      =         r   c              #   j   K   |                                  D ]}|j        | j        k    r	|j        V  d S r-   )r.   topicr   r&   )r"   messages     r   _listenzKafkaManager._listenH   sI      ))++ 	$ 	$G},,m###	$ 	$r   )r	   r   FNN)
__name__
__module____qualname____doc__namer   r+   r.   r2   __classcell__)r$   s   @r   r   r      s         @ D=G59O O O O O O"  ! ! !$ $ $ $ $ $ $r   r   )loggingr   ImportErrorpubsub_managerr   	getLoggerr   r   r   r   r   <module>r=      s    LLLL   EEE * ) ) ) ) )		:	&	&>$ >$ >$ >$ >$= >$ >$ >$ >$ >$s    