Kafka integration
=================

ligo-gracedb includes a Kafka producer and consumer that authenticate with
`SciTokens `_ over the OAUTHBEARER SASL mechanism. These classes are designed
for long-running analysis pipelines that need to publish or consume GraceDB
events in real time.

.. note::

   The Kafka classes require the ``confluent-kafka`` Python package, which
   wraps the librdkafka C library. Install the optional dependency with::

      pip install "ligo-gracedb[kafka]"

Authentication
--------------

Both the producer and the consumer discover a SciToken automatically using
`igwn-auth-utils `_. The token is located following the
`WLCG Bearer Token Discovery `_ specification, in this order:

#. ``$BEARER_TOKEN`` environment variable (token content)
#. ``$BEARER_TOKEN_FILE`` environment variable (path to file)
#. ``$XDG_RUNTIME_DIR/bt_u`` or ``/tmp/bt_u``

If ``igwn-auth-utils`` cannot find a token with the required audience or
scope, the classes fall back to reading the token directly from the paths
above.

You can obtain a fresh SciToken with ``htgettoken``::

   htgettoken -a vault.ligo.org -i igwn

By default, both classes set ``reload_cred=True``, which installs a
:py:class:`~ligo.gracedb.kafka.auth.SciTokenRefreshManager` that monitors the
token's ``exp`` claim and re-reads the token from disk before it expires. This
mirrors the ``reload_cred`` behaviour of the REST client and is essential for
long-running processes.

.. note::

   In addition to having a valid, non-expired SciToken, a user must have the
   ``gracedb.read`` scope in order to publish to Kafka. On event submission,
   GraceDB's existing per-user/per-pipeline permission model controls which
   users can publish to individual pipelines.

   Once `changes in GraceDB scopes `_ are implemented, access to
   pipeline-specific topics will be controlled on a per-user basis. Write
   access to the ``test`` topic will remain available to all LVK users with
   the ``gracedb.read`` scope.

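
The discovery order and the expiry-driven reload described above can be
sketched in plain Python. This is a simplified illustration under stated
assumptions, not the ``igwn-auth-utils`` or
:py:class:`~ligo.gracedb.kafka.auth.SciTokenRefreshManager` implementation:
the function names ``discover_bearer_token``, ``token_expiry`` and
``seconds_until_reload`` are hypothetical, no audience or scope checks are
made, and the JWT signature is not verified (the ``exp`` claim is read only to
schedule the next reload). Following the WLCG specification, the default file
name is ``bt_u`` followed by the numeric user ID; the 300 s buffer matches the
``reload_buffer`` default described under :ref:`kafka:Token reloading`.

.. code-block:: python

   import base64
   import json
   import os
   import time
   from typing import Optional


   def _read_token(path: str) -> Optional[str]:
       """Return the stripped token stored at *path*, or None if unreadable."""
       try:
           with open(path, encoding="utf-8") as handle:
               return handle.read().strip() or None
       except OSError:
           return None


   def discover_bearer_token() -> Optional[str]:
       """Hypothetical sketch of WLCG bearer token discovery (no scope checks)."""
       # 1. Token content supplied directly in the environment.
       token = os.environ.get("BEARER_TOKEN")
       if token:
           return token.strip()
       # 2. Path to a token file supplied in the environment.
       token_file = os.environ.get("BEARER_TOKEN_FILE")
       if token_file:
           return _read_token(token_file)
       # 3. Default locations: bt_u<uid> under $XDG_RUNTIME_DIR, then /tmp.
       #    os.geteuid() is POSIX-only.
       filename = f"bt_u{os.geteuid()}"
       candidates = [os.path.join("/tmp", filename)]
       runtime_dir = os.environ.get("XDG_RUNTIME_DIR")
       if runtime_dir:
           candidates.insert(0, os.path.join(runtime_dir, filename))
       for path in candidates:
           token = _read_token(path)
           if token:
               return token
       return None


   def token_expiry(token: str) -> int:
       """Return the JWT exp claim (Unix seconds) WITHOUT verifying the token."""
       payload = token.split(".")[1]
       payload += "=" * (-len(payload) % 4)  # restore stripped base64url padding
       claims = json.loads(base64.urlsafe_b64decode(payload))
       return int(claims["exp"])


   def seconds_until_reload(
       token: str, reload_buffer: float = 300.0, now: Optional[float] = None
   ) -> float:
       """Seconds to wait before re-discovering the token (never negative)."""
       now = time.time() if now is None else now
       return max(0.0, token_expiry(token) - reload_buffer - now)

A long-running process could sleep for ``seconds_until_reload(token)`` and
then call ``discover_bearer_token()`` again, picking up any token that
``htgettoken`` has written in the meantime. With ``reload_cred=True`` the
producer and consumer handle this for you.
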

Topic namespacing
-----------------

Kafka topics are **instance-namespaced**: each topic is prefixed with the
GraceDB instance name so that multiple GraceDB deployments (development, test,
playground, production) can share the same broker without message collisions.
The format is ``{instance}.{pipeline}``, for example:

- ``gracedb-dev.gstlal``
- ``gracedb-test.cwb``
- ``gracedb.aframe``

The instance name is derived automatically from the first component of the
hostname in the GraceDB service URL. For example,
``https://gracedb-dev.ligo.org/api/`` yields the instance name
``gracedb-dev``.

Both the producer and the consumer handle namespacing transparently: you pass
pipeline names (e.g. ``"gstlal"``) and the classes prepend the instance name
for you.

You can supply the instance identity in one of two ways:

- *service_url*: the GraceDB service URL; the instance name is extracted
  automatically.
- *instance_name*: an explicit instance name string.

Exactly one of these must be provided.

Three utility functions are available for working with namespaced topics
directly:

.. code-block:: python

   from ligo.gracedb.kafka import (
       derive_instance_name,
       make_topic_name,
       parse_topic_name,
   )

   derive_instance_name("https://gracedb-dev.ligo.org/api/")
   # => "gracedb-dev"

   make_topic_name("gracedb-dev", "gstlal")
   # => "gracedb-dev.gstlal"

   parse_topic_name("gracedb-dev.gstlal")
   # => ("gracedb-dev", "gstlal")

Notes on Downstream Consumers
-----------------------------

Topics on the GraceDB event submission Kafka broker correspond to approved and
testing `pipelines `_ on GraceDB. A ``test`` topic is available for submitting
events to the ``Test`` group. Please see the note above regarding topic
permissions.

Uploading a GW trigger via Kafka does not directly insert a new event into
GraceDB, unlike the traditional ``create_event`` REST API call. Instead,
publishing an event to Kafka allows multiple downstream processes to handle
GW data products simultaneously.

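
The namespacing rule above can be combined with the routing that
``create_event`` applies when a producer has no fixed topic (``Test`` group
events go to ``{instance}.test``; all other groups go to
``{instance}.{pipeline}``, lowercased; see
:ref:`kafka:Uploading events with Kafka`). The sketch below spells that
mapping out. ``instance_from_url`` and ``topic_for_submission`` are
hypothetical helpers written for illustration only; in real code, prefer
``derive_instance_name`` and ``make_topic_name`` from ``ligo.gracedb.kafka``.

.. code-block:: python

   from urllib.parse import urlparse


   def instance_from_url(service_url: str) -> str:
       """Return the first component of the service URL's hostname."""
       hostname = urlparse(service_url).hostname
       if not hostname:
           raise ValueError(f"no hostname in {service_url!r}")
       return hostname.split(".")[0]


   def topic_for_submission(service_url: str, group: str, pipeline: str) -> str:
       """Map a (group, pipeline) submission to its namespaced Kafka topic."""
       instance = instance_from_url(service_url)
       # Events in the Test group share one "test" topic per instance;
       # everything else goes to a per-pipeline topic (lowercased).
       suffix = "test" if group == "Test" else pipeline.lower()
       return f"{instance}.{suffix}"


   # Example: a gstlal CBC event submitted to gracedb-test.
   print(topic_for_submission("https://gracedb-test.ligo.org/api/", "CBC", "gstlal"))
   # gracedb-test.gstlal

Because every instance has its own prefix, a consumer subscribed to
``gracedb-dev.gstlal`` never sees production ``gracedb.gstlal`` traffic, even
on a shared broker.
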

Ahead of IR1, a dedicated service called the `gracedb-kafka-supervisor `_
runs in AWS alongside GraceDB. This application consumes Kafka messages and
injects event triggers and data products into GraceDB. See
:ref:`kafka:Skipping server-side processing` below to control how events are
handled by the ``gracedb-kafka-supervisor``.

As the low-latency alert infrastructure for IR1 and beyond evolves,
`follow-up pipelines `_ may opt to delay uploading g-events until after the
time-critical stages of alert generation.

Producer
--------

A :py:class:`~ligo.gracedb.kafka.GraceDbKafkaProducer` wraps a
``confluent_kafka.Producer`` with SciToken authentication and sensible
defaults for GraceDB usage.

Creating a producer
~~~~~~~~~~~~~~~~~~~

.. code-block:: python

   import certifi
   from ligo.gracedb.kafka import GraceDbKafkaProducer

   producer = GraceDbKafkaProducer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       topic="test",
       service_url="https://gracedb-dev.ligo.org/api/",
       ca_cert_path=certifi.where(),
   )

The ``topic`` argument is the pipeline name (e.g. ``"test"``, ``"gstlal"``).
It is automatically namespaced with the instance name derived from
``service_url``, so the actual Kafka topic above is ``gracedb-dev.test``. See
:ref:`kafka:Topic namespacing` for details.

During development, if the broker's TLS certificate does not match its
hostname, pass ``skip_hostname_verification=True``. This disables an important
TLS check, so do not use it in production.

Producing messages
~~~~~~~~~~~~~~~~~~

The :py:meth:`~ligo.gracedb.kafka.producer.GraceDbKafkaProducer.produce`
method accepts a ``dict``, ``str``, or ``bytes`` value. Dicts are serialised
to JSON automatically:

.. code-block:: python

   producer.produce({"graceid": "G12345", "group": "CBC"})
   producer.flush()  # wait for delivery confirmation
   producer.close()  # flush remaining messages and release resources

The producer also works as a context manager:

.. code-block:: python

   with GraceDbKafkaProducer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       topic="test",
       service_url="https://gracedb-dev.ligo.org/api/",
       ca_cert_path=certifi.where(),
   ) as producer:
       producer.produce({"graceid": "G12345", "group": "CBC"})

Testing connectivity
~~~~~~~~~~~~~~~~~~~~

Call :py:meth:`~ligo.gracedb.kafka.producer.GraceDbKafkaProducer.ping` to
verify end-to-end connectivity. It produces a small test message, waits for
delivery confirmation, and returns the delivery metadata:

.. code-block:: python

   result = producer.ping(topic="test")
   print(result)
   # {'topic': 'gracedb-dev.test', 'partition': 0, 'offset': 42,
   #  'message': {'type': 'ping', ...}}

Uploading events with Kafka
---------------------------

The preferred way to upload events through Kafka is the two-step
**reserve + produce** workflow provided by
:py:meth:`~ligo.gracedb.rest.GraceDb.create_event`. Passing a producer to its
``kafka`` parameter causes ``create_event`` to:

#. **Reserve a graceid** on the GraceDB server via an HTTP call to the
   reservations endpoint. This returns a ``graceid`` and ``submitter`` before
   any event data is transmitted.
#. **Produce a Kafka message** containing the reserved ``graceid``, the
   base64-encoded event file, pipeline metadata, and optional labels. The
   GraceDB server-side consumer receives this message and completes the
   upload.

This approach decouples the latency-sensitive part of event submission
(obtaining a ``graceid``) from the bulk data transfer, and provides an
optional HTTP fallback if Kafka delivery fails.

.. note::

   GraceDB does not dispatch an ``igwn-alert`` when a graceid is reserved.
   Once the reserved event has been uploaded, GraceDB dispatches a
   ``new``-type ``igwn-alert``.

Basic example
~~~~~~~~~~~~~

.. code-block:: python

   import certifi
   from ligo.gracedb.rest import GraceDb
   from ligo.gracedb.kafka import GraceDbKafkaProducer

   client = GraceDb('https://gracedb-test.ligo.org/api/', api_version='v2')
   producer = GraceDbKafkaProducer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       service_url="https://gracedb-test.ligo.org/api/",
       ca_cert_path=certifi.where(),
   )

   response = client.create_event(
       "CBC", "gstlal", "coinc.xml",
       search="AllSky",
       kafka=producer,
   )
   # Message is published to gracedb-test.gstlal
   print(response.json()["graceid"])  # e.g. "G123456"

   producer.close()

Because no ``topic`` is passed to ``GraceDbKafkaProducer`` in this example,
the topic is determined by the ``group`` and ``pipeline`` arguments passed to
``create_event``. Events in the ``Test`` group are routed to
``{instance}.test``; all other groups are routed to ``{instance}.{pipeline}``
(lowercased).

``create_event`` automatically flushes the producer after publishing, so the
message is either delivered or an error is raised before the call returns.
The flush timeout defaults to 10 seconds and can be configured with the
``flush_timeout`` parameter on the producer::

   producer = GraceDbKafkaProducer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       service_url="https://gracedb-test.ligo.org/api/",
       ca_cert_path=certifi.where(),
       flush_timeout=30,  # wait up to 30 s for delivery
   )

Using the convenience factory
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The :py:meth:`~ligo.gracedb.rest.GraceDb.create_kafka_producer` method on the
client creates a pre-configured producer. The client's service URL is passed
automatically, so topics are namespaced without any extra arguments:

.. code-block:: python

   client = GraceDb('https://gracedb-test.ligo.org/api/', api_version='v2')
   producer = client.create_kafka_producer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       topic="test",  # published as gracedb-test.test
       ca_cert_path=certifi.where(),
   )

HTTP fallback
~~~~~~~~~~~~~

If ``http_fallback=True`` and the Kafka path fails (either the reservation or
the produce step), ``create_event`` automatically falls back to a standard
HTTP POST:

.. code-block:: python

   response = client.create_event(
       "CBC", "gstlal", "coinc.xml",
       kafka=producer,
       http_fallback=True,
   )

If a graceid has already been reserved but the Kafka produce fails, the
fallback uses :py:meth:`~ligo.gracedb.rest.GraceDb.upload_reserved_event` to
complete the upload over HTTP.

Skipping server-side processing
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Two boolean flags control what the server does with the Kafka message:

- ``gracedb_upload=False``: publish the event to Kafka *without* reserving a
  graceid or processing it on the server. This is useful when a downstream
  consumer other than GraceDB handles the message.
- ``skip_supervisor=True`` (default ``False``): the server-side supervisor
  does not process the message (that is, it does not call
  ``upload_reserved_event``).

Consumer
--------

A :py:class:`~ligo.gracedb.kafka.GraceDbKafkaConsumer` wraps a
``confluent_kafka.Consumer`` with SciToken authentication.

Live streaming
~~~~~~~~~~~~~~

By default, the consumer generates an ephemeral group ID, reads from the end
of the topic (``auto.offset.reset=latest``), and auto-commits offsets. As a
result, it only sees messages produced *after* it starts:

.. code-block:: python

   import certifi
   from ligo.gracedb.kafka import GraceDbKafkaConsumer

   with GraceDbKafkaConsumer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       topics="test",
       service_url="https://gracedb-dev.ligo.org/api/",
       ca_cert_path=certifi.where(),
   ) as consumer:
       for msg in consumer.stream():
           print(msg["value"])

Pressing ``Ctrl-C`` stops the stream cleanly: the consumer installs
``SIGINT`` and ``SIGTERM`` handlers that let the current poll finish and then
make the generator exit.

Persisting read position
~~~~~~~~~~~~~~~~~~~~~~~~

Supply a stable ``group_id`` to resume from the last committed offset across
restarts:

.. code-block:: python

   import certifi
   from ligo.gracedb.kafka import GraceDbKafkaConsumer

   with GraceDbKafkaConsumer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       topics="test",
       service_url="https://gracedb-dev.ligo.org/api/",
       group_id="my-pipeline-consumer",
       ca_cert_path=certifi.where(),
   ) as consumer:
       for msg in consumer.stream():
           process(msg)  # process() stands in for your own handler

Replaying history
~~~~~~~~~~~~~~~~~

Combine a fresh ``group_id`` with ``auto_offset_reset="earliest"`` to replay
all messages still retained on the topic, starting from the earliest
available offset. The group must be new because the offset-reset policy only
applies when the group has no committed offset:

.. code-block:: python

   import certifi
   from ligo.gracedb.kafka import GraceDbKafkaConsumer

   with GraceDbKafkaConsumer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       topics="test",
       service_url="https://gracedb-dev.ligo.org/api/",
       group_id="my-replay-group",
       auto_offset_reset="earliest",
       ca_cert_path=certifi.where(),
   ) as consumer:
       for msg in consumer.stream():
           print(msg)

Polling manually
~~~~~~~~~~~~~~~~

If the ``stream()`` generator is too opinionated, call
:py:meth:`~ligo.gracedb.kafka.consumer.GraceDbKafkaConsumer.poll` directly:

.. code-block:: python

   import certifi
   from ligo.gracedb.kafka import GraceDbKafkaConsumer

   with GraceDbKafkaConsumer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       service_url="https://gracedb-dev.ligo.org/api/",
       ca_cert_path=certifi.where(),
   ) as consumer:
       consumer.subscribe("test")
       while True:
           msg = consumer.poll(timeout=1.0)  # None if nothing arrived in time
           if msg is not None:
               handle(msg)  # handle() stands in for your own handler

Token reloading
---------------

Long-running producers and consumers must cope with SciToken expiry.
By default, both classes set ``reload_cred=True``, which creates a
:py:class:`~ligo.gracedb.kafka.auth.SciTokenRefreshManager`. The manager
parses the JWT ``exp`` claim and re-discovers a fresh token ``reload_buffer``
seconds before expiry (default: 300 s). If an external process (e.g.
``htgettoken`` in a cron job) periodically writes a fresh token to disk, the
refresh manager picks it up transparently.

To disable proactive reloading:

.. code-block:: python

   producer = GraceDbKafkaProducer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       service_url="https://gracedb-dev.ligo.org/api/",
       reload_cred=False,
   )

Error handling
--------------

All Kafka-specific exceptions inherit from
:py:class:`~ligo.gracedb.kafka.KafkaError`:

.. code-block:: text

   KafkaError
   +-- KafkaAuthenticationError
   |   +-- KafkaTokenError
   +-- KafkaProducerError
   |   +-- KafkaMessageDeliveryError
   +-- KafkaConsumerError
   +-- KafkaConnectionError

Authentication failures that occur inside librdkafka (for example, an expired
token that was not refreshed in time) are surfaced as
:py:class:`~ligo.gracedb.kafka.KafkaAuthenticationError` on the next
:py:meth:`~ligo.gracedb.kafka.producer.GraceDbKafkaProducer.produce`,
:py:meth:`~ligo.gracedb.kafka.producer.GraceDbKafkaProducer.flush`, or
:py:meth:`~ligo.gracedb.kafka.consumer.GraceDbKafkaConsumer.poll` call.

API reference
-------------

Producer class
~~~~~~~~~~~~~~

.. autoclass:: ligo.gracedb.kafka.producer.GraceDbKafkaProducer
   :members:
   :undoc-members:

Consumer class
~~~~~~~~~~~~~~

.. autoclass:: ligo.gracedb.kafka.consumer.GraceDbKafkaConsumer
   :members:
   :undoc-members:

Authentication helpers
~~~~~~~~~~~~~~~~~~~~~~

.. autofunction:: ligo.gracedb.kafka.auth.get_scitoken_for_kafka

.. autofunction:: ligo.gracedb.kafka.auth.create_oauth_callback

.. autoclass:: ligo.gracedb.kafka.auth.SciTokenRefreshManager
   :members:

Topic utilities
~~~~~~~~~~~~~~~

.. autofunction:: ligo.gracedb.kafka.utils.derive_instance_name

.. autofunction:: ligo.gracedb.kafka.utils.make_topic_name

.. autofunction:: ligo.gracedb.kafka.utils.parse_topic_name

Exception classes
~~~~~~~~~~~~~~~~~

.. automodule:: ligo.gracedb.kafka.exceptions
   :members: