Kafka integration
=================
ligo-gracedb includes a Kafka producer and consumer that authenticate
with `SciTokens `_ over the OAUTHBEARER SASL
mechanism. These classes are designed for long-running analysis
pipelines that need to publish or consume GraceDB events in real time.
.. note::

   The Kafka classes require ``confluent-kafka``, the Python client built
   on the librdkafka C library. Install the optional dependency with::

      pip install ligo-gracedb[kafka]
Authentication
--------------
Both the producer and the consumer discover a SciToken automatically
using `igwn-auth-utils `_.
The token is located following the `WLCG Bearer Token Discovery
`_ specification:
#. ``$BEARER_TOKEN`` environment variable (token content)
#. ``$BEARER_TOKEN_FILE`` environment variable (path to file)
#. ``$XDG_RUNTIME_DIR/bt_u`` or ``/tmp/bt_u``
If ``igwn-auth-utils`` cannot match the required audience or scope, a
direct file read of the above paths is attempted as a fallback.
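The discovery order can be sketched with the standard library. The helper below, ``discover_bearer_token``, is hypothetical and not part of ligo-gracedb. It follows the WLCG specification, in which the default file names end in the numeric user ID (``bt_u`` followed by the UID). It also omits the audience and scope checks that ``igwn-auth-utils`` performs.

.. code-block:: python

   import os
   from pathlib import Path
   from typing import Optional


   def discover_bearer_token(environ=None, uid=None) -> Optional[str]:
       """Return the first bearer token found in WLCG discovery order.

       Hypothetical sketch: igwn-auth-utils additionally checks the token's
       audience and scope, which is omitted here.
       """
       env = os.environ if environ is None else environ
       if uid is None:
           uid = os.getuid()  # POSIX only

       # 1. Token content supplied directly in the environment.
       token = env.get("BEARER_TOKEN", "").strip()
       if token:
           return token

       # 2. Path to a token file supplied in the environment.
       candidates = []
       if env.get("BEARER_TOKEN_FILE"):
           candidates.append(Path(env["BEARER_TOKEN_FILE"]))

       # 3. Default per-user locations (file name ends in the numeric UID).
       if env.get("XDG_RUNTIME_DIR"):
           candidates.append(Path(env["XDG_RUNTIME_DIR"]) / f"bt_u{uid}")
       candidates.append(Path("/tmp") / f"bt_u{uid}")

       for path in candidates:
           if path.is_file():
               token = path.read_text().strip()
               if token:
                   return token
       return None

Passing ``environ`` and ``uid`` explicitly keeps the helper testable without touching the real environment.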
You can obtain a fresh SciToken with ``htgettoken``::

   htgettoken -a vault.ligo.org -i igwn
By default, both classes set ``reload_cred=True``, which installs a
:py:class:`~ligo.gracedb.kafka.auth.SciTokenRefreshManager` that
monitors the token's ``exp`` claim and re-reads the token from disk
before it expires. This mirrors the ``reload_cred`` behaviour of the
REST client and is essential for long-running processes.
.. note::

   In addition to having a valid, non-expired SciToken, a user must
   have the ``gracedb.read`` scope in order to publish to Kafka. On
   event submission, GraceDB's existing per-user/per-pipeline permission
   model will control which users can publish to individual pipelines.
   Once `changes in GraceDB scopes
   `_
   are implemented, access to pipeline-specific topics will be controlled on a
   per-user basis.
   Write access to the ``test`` topic will remain available to all LVK users
   with the ``gracedb.read`` scope.
Topic namespacing
-----------------
Kafka topics are **instance-namespaced**: each topic is prefixed with
the GraceDB instance name so that multiple GraceDB deployments
(development, test, playground, production) can share the same broker
without message collisions. The format is
``{instance}.{pipeline}``, for example:
- ``gracedb-dev.gstlal``
- ``gracedb-test.cwb``
- ``gracedb.aframe``
The instance name is derived automatically from the GraceDB service URL
(using the first component of the hostname). For example,
``https://gracedb-dev.ligo.org/api/`` yields instance name
``gracedb-dev``.
Both the producer and consumer handle namespacing transparently: you
pass pipeline names (e.g. ``"gstlal"``) and the classes prepend the
instance name for you. You can supply the instance identity in two
ways:
- *service_url* — the GraceDB service URL; the instance name is
extracted automatically.
- *instance_name* — an explicit instance name string.
Exactly one of these must be provided.
Three utility functions are available for working with namespaced topics
directly:
.. code-block:: python

   from ligo.gracedb.kafka import (
       derive_instance_name,
       make_topic_name,
       parse_topic_name,
   )

   derive_instance_name("https://gracedb-dev.ligo.org/api/")
   # => "gracedb-dev"

   make_topic_name("gracedb-dev", "gstlal")
   # => "gracedb-dev.gstlal"

   parse_topic_name("gracedb-dev.gstlal")
   # => ("gracedb-dev", "gstlal")
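The naming rules can be mirrored in a few lines of standard-library Python. This is a hypothetical re-implementation for illustration, not the library's source: the names ``instance_from_url``, ``resolve_instance``, ``topic_for`` and ``split_topic`` are made up. Splitting on the first ``.`` assumes that an instance name is a single hostname label and so never contains a dot.

.. code-block:: python

   from typing import Optional, Tuple
   from urllib.parse import urlparse


   def instance_from_url(service_url: str) -> str:
       """First hostname label, e.g. 'gracedb-dev' from gracedb-dev.ligo.org."""
       hostname = urlparse(service_url).hostname
       if not hostname:
           raise ValueError(f"no hostname in {service_url!r}")
       return hostname.split(".", 1)[0]


   def resolve_instance(service_url: Optional[str] = None,
                        instance_name: Optional[str] = None) -> str:
       """Require exactly one of service_url / instance_name."""
       if (service_url is None) == (instance_name is None):
           raise ValueError("provide exactly one of service_url or instance_name")
       if instance_name is not None:
           return instance_name
       return instance_from_url(service_url)


   def topic_for(instance: str, pipeline: str) -> str:
       return f"{instance}.{pipeline}"


   def split_topic(topic: str) -> Tuple[str, str]:
       # The first dot separates the instance from the pipeline name.
       instance, sep, pipeline = topic.partition(".")
       if not sep or not instance or not pipeline:
           raise ValueError(f"not a namespaced topic: {topic!r}")
       return instance, pipeline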
Notes on Downstream Consumers
-----------------------------
Topics on the GraceDB event submission Kafka broker correspond to approved
and testing `pipelines `_
on GraceDB. A ``test`` topic is available for submitting events to the
``Test`` group. See the note in the Authentication section above
regarding topic permissions.
Publishing a GW trigger to Kafka does not by itself insert a new event
into GraceDB, as a plain HTTP ``create_event`` REST API call does.
Instead, publishing an event to Kafka lets multiple downstream processes
consume and process the GW data products simultaneously.
Ahead of IR1, a dedicated service called the
`gracedb-kafka-supervisor `_
is running in AWS alongside GraceDB. This application consumes Kafka
messages and injects event triggers and data products into GraceDB.
Please see the section below on :ref:`kafka:Skipping server-side processing` to
control how events are handled by the ``gracedb-kafka-supervisor``.
As the low-latency alert infrastructure for IR1 and beyond evolves,
`follow-up pipelines `_ may opt to
delay uploading g-events until after the time-critical stages of alert
generation.
Producer
--------
A :py:class:`~ligo.gracedb.kafka.GraceDbKafkaProducer` wraps a
``confluent_kafka.Producer`` with SciToken authentication and sensible
defaults for GraceDB usage.
Creating a producer
~~~~~~~~~~~~~~~~~~~
.. code-block:: python

   import certifi
   from ligo.gracedb.kafka import GraceDbKafkaProducer

   producer = GraceDbKafkaProducer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       topic="test",
       service_url="https://gracedb-dev.ligo.org/api/",
       ca_cert_path=certifi.where(),
   )
The ``topic`` argument is the pipeline name (e.g. ``"test"``,
``"gstlal"``). It is automatically namespaced with the instance name
derived from ``service_url``, so the actual Kafka topic above would be
``gracedb-dev.test``. See :ref:`kafka:Topic namespacing` for details.
During development, when the broker's TLS certificate does not match
its hostname, pass ``skip_hostname_verification=True``.
Producing messages
~~~~~~~~~~~~~~~~~~
The :py:meth:`~ligo.gracedb.kafka.producer.GraceDbKafkaProducer.produce`
method accepts a ``dict``, ``str``, or ``bytes`` value. Dicts are
serialised to JSON automatically:
.. code-block:: python

   producer.produce({"graceid": "G12345", "group": "CBC"})
   producer.flush()  # wait for delivery confirmation
   producer.close()  # flush remaining messages and release resources
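One plausible way to normalise the three accepted value types into the bytes that Kafka transmits is sketched below. ``to_kafka_bytes`` is a hypothetical helper. The JSON options and the UTF-8 encoding of ``str`` values are assumptions, not documented behaviour of ``produce``.

.. code-block:: python

   import json
   from typing import Union


   def to_kafka_bytes(value: Union[dict, str, bytes]) -> bytes:
       """Encode a message value as bytes (hypothetical helper)."""
       if isinstance(value, bytes):
           return value  # already serialised; sent unchanged
       if isinstance(value, str):
           return value.encode("utf-8")
       if isinstance(value, dict):
           return json.dumps(value).encode("utf-8")
       raise TypeError(f"unsupported message value type: {type(value).__name__}")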
The producer also works as a context manager:
.. code-block:: python

   with GraceDbKafkaProducer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       topic="test",
       service_url="https://gracedb-dev.ligo.org/api/",
       ca_cert_path=certifi.where(),
   ) as producer:
       producer.produce({"graceid": "G12345", "group": "CBC"})
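The benefit of the ``with`` form can be shown with a stand-in class. ``FakeProducer`` below is hypothetical and does no networking. Consistent with the description of ``close()`` above, it assumes that leaving the ``with`` block calls ``close()``, which flushes any queued messages even when the block raises.

.. code-block:: python

   from typing import List


   class FakeProducer:
       """Stand-in illustrating the assumed context-manager contract."""

       def __init__(self) -> None:
           self.queued: List[bytes] = []
           self.delivered: List[bytes] = []
           self.closed = False

       def produce(self, value: bytes) -> None:
           if self.closed:
               raise RuntimeError("producer is closed")
           self.queued.append(value)  # buffered, not yet delivered

       def flush(self) -> int:
           self.delivered.extend(self.queued)
           self.queued.clear()
           return 0  # number of messages still waiting for delivery

       def close(self) -> None:
           self.flush()
           self.closed = True

       def __enter__(self) -> "FakeProducer":
           return self

       def __exit__(self, exc_type, exc, tb) -> bool:
           self.close()  # runs even if the body raised
           return False  # do not suppress the exception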
Testing connectivity
~~~~~~~~~~~~~~~~~~~~
Call :py:meth:`~ligo.gracedb.kafka.producer.GraceDbKafkaProducer.ping`
to verify end-to-end connectivity. This produces a small test message,
waits for delivery confirmation, and returns the delivery metadata:
.. code-block:: python

   result = producer.ping(topic="test")
   print(result)
   # {'topic': 'gracedb-dev.test', 'partition': 0, 'offset': 42,
   #  'message': {'type': 'ping', ...}}
Uploading events with Kafka
---------------------------
The preferred way to upload events through Kafka is the two-step
**reserve + produce** workflow provided by
:py:meth:`~ligo.gracedb.rest.GraceDb.create_event`. Passing a
producer to its ``kafka`` parameter causes ``create_event`` to:
#. **Reserve a graceid** on the GraceDB server via an HTTP call to the
   reservations endpoint. This returns a ``graceid`` and ``submitter``
   before any event data is transmitted.
#. **Produce a Kafka message** containing the reserved ``graceid``,
   the base64-encoded event file, pipeline metadata, and optional
   labels. The GraceDB server-side consumer receives this message and
   completes the upload.
This approach decouples the latency-sensitive part of event submission
(obtaining a ``graceid``) from the bulk data transfer, and provides an
optional HTTP fallback if Kafka delivery fails.
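The message produced in step 2 can be pictured as follows. This is a hypothetical sketch: the field names (``graceid``, ``group``, ``pipeline``, ``search``, ``labels``, ``filename``, ``filecontents``) and the use of JSON are assumptions for illustration. They are not the documented wire format consumed by GraceDB.

.. code-block:: python

   import base64
   import json
   from typing import Iterable, Optional


   def build_upload_message(graceid: str, group: str, pipeline: str,
                            filename: str, file_bytes: bytes,
                            search: Optional[str] = None,
                            labels: Iterable[str] = ()) -> bytes:
       """Assemble a JSON message for an already-reserved graceid."""
       payload = {
           "graceid": graceid,  # obtained from the reservation in step 1
           "group": group,
           "pipeline": pipeline,
           "search": search,
           "labels": list(labels),
           "filename": filename,
           # Base64 lets arbitrary (possibly binary) file bytes travel in JSON.
           "filecontents": base64.b64encode(file_bytes).decode("ascii"),
       }
       return json.dumps(payload).encode("utf-8")

Base64 inflates the file by roughly a third (four output bytes per three input bytes), which is worth keeping in mind against the broker's maximum message size.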
.. note::

   GraceDB does not dispatch an ``igwn-alert`` when a graceid is reserved.
   Once the reserved event has been uploaded, GraceDB dispatches a
   "new"-type ``igwn-alert``.
Basic example
~~~~~~~~~~~~~
.. code-block:: python

   import certifi
   from ligo.gracedb.rest import GraceDb
   from ligo.gracedb.kafka import GraceDbKafkaProducer

   client = GraceDb('https://gracedb-test.ligo.org/api/', api_version='v2')
   producer = GraceDbKafkaProducer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       service_url="https://gracedb-test.ligo.org/api/",
       ca_cert_path=certifi.where(),
   )

   response = client.create_event(
       "CBC", "gstlal", "coinc.xml",
       search="AllSky",
       kafka=producer,
   )
   # Message is published to gracedb-test.gstlal
   print(response.json()["graceid"])  # e.g. "G123456"
   producer.close()
Because the example does not pass ``topic`` to
``GraceDbKafkaProducer``, the producer publishes to a topic determined
by the ``group`` and ``pipeline`` arguments passed to
``create_event``. Events in the ``Test`` group are routed to
``{instance}.test``; all other groups are routed to
``{instance}.{pipeline}`` (lowercased).
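The routing rule amounts to a small function. ``route_topic`` is a hypothetical sketch of that rule, not the library's implementation. In particular, matching the ``Test`` group case-insensitively is an assumption.

.. code-block:: python

   def route_topic(instance: str, group: str, pipeline: str) -> str:
       """Pick the topic for create_event when the producer has no fixed topic."""
       if group.lower() == "test":
           return f"{instance}.test"  # all Test-group events share one topic
       return f"{instance}.{pipeline.lower()}"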
``create_event`` automatically flushes the producer after publishing, so
the message is guaranteed to be delivered (or an error raised) before the
call returns. The flush timeout defaults to 10 seconds and can be
configured via the ``flush_timeout`` parameter on the producer::

   producer = GraceDbKafkaProducer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       service_url="https://gracedb-test.ligo.org/api/",
       ca_cert_path=certifi.where(),
       flush_timeout=30,  # wait up to 30s for delivery
   )
Using the convenience factory
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The :py:meth:`~ligo.gracedb.rest.GraceDb.create_kafka_producer` method
on the client creates a pre-configured producer. The client's service
URL is passed automatically, so topics are namespaced without any extra
arguments:
.. code-block:: python

   import certifi
   from ligo.gracedb.rest import GraceDb

   client = GraceDb('https://gracedb-test.ligo.org/api/', api_version='v2')
   producer = client.create_kafka_producer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       topic="test",
       ca_cert_path=certifi.where(),
   )
HTTP fallback
~~~~~~~~~~~~~
If ``http_fallback=True`` and the Kafka path fails (reservation failure
or produce failure), ``create_event`` automatically falls back to the
standard HTTP POST:
.. code-block:: python

   response = client.create_event(
       "CBC", "gstlal", "coinc.xml",
       kafka=producer,
       http_fallback=True,
   )
When a graceid has already been reserved but the Kafka produce fails,
the fallback uses
:py:meth:`~ligo.gracedb.rest.GraceDb.upload_reserved_event` to
complete the upload over HTTP.
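The fallback decision can be sketched with plain callables standing in for the real network operations. All names here (``create_with_fallback``, ``reserve``, ``produce``, ``upload_reserved``, ``http_create``) are hypothetical. The sketch only shows which HTTP path is taken for each failure mode.

.. code-block:: python

   from typing import Callable


   def create_with_fallback(reserve: Callable[[], str],
                            produce: Callable[[str], None],
                            upload_reserved: Callable[[str], str],
                            http_create: Callable[[], str],
                            http_fallback: bool = True) -> str:
       """Return the graceid of the created event (hypothetical control flow)."""
       try:
           graceid = reserve()
       except Exception:
           if not http_fallback:
               raise
           # No graceid exists yet: create the event with a normal HTTP POST.
           return http_create()

       try:
           produce(graceid)
       except Exception:
           if not http_fallback:
               raise
           # A graceid is already reserved, so complete that reservation
           # over HTTP rather than creating a second event.
           return upload_reserved(graceid)
       return graceid

Completing the existing reservation in the second branch, rather than posting a fresh event, presumably avoids leaving an orphaned graceid alongside a duplicate event.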
Skipping server-side processing
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Two boolean flags control what the server does with the Kafka message:

- ``gracedb_upload``: when ``False``, the event is published to Kafka
  *without* reserving a graceid or processing it on the server. This is
  useful when a downstream consumer other than GraceDB handles the
  message.
- ``skip_supervisor`` (default ``False``): when ``True``, the server-side
  supervisor does not process the message (i.e. it does not call
  ``upload_reserved_event``).
Consumer
--------
A :py:class:`~ligo.gracedb.kafka.GraceDbKafkaConsumer` wraps a
``confluent_kafka.Consumer`` with SciToken authentication.
Live streaming
~~~~~~~~~~~~~~
By default the consumer generates an ephemeral group ID, reads from
the end of the topic (``auto.offset.reset=latest``), and auto-commits
offsets. This means it only sees messages produced *after* it starts:
.. code-block:: python

   import certifi
   from ligo.gracedb.kafka import GraceDbKafkaConsumer

   with GraceDbKafkaConsumer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       topics="test",
       service_url="https://gracedb-dev.ligo.org/api/",
       ca_cert_path=certifi.where(),
   ) as consumer:
       for msg in consumer.stream():
           print(msg["value"])
Pressing ``Ctrl-C`` stops the stream cleanly; the consumer installs
``SIGINT`` and ``SIGTERM`` handlers that cause the current poll to
finish and the generator to exit.
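The stop-on-signal behaviour can be sketched as a generator that polls until a signal handler sets a flag. ``StoppableStream`` and its ``poll_fn`` argument are hypothetical, and the real consumer's handler installation and cleanup may differ. Python only allows signal handlers to be installed from the main thread, so the sketch skips installation elsewhere.

.. code-block:: python

   import signal
   import threading
   from typing import Any, Callable, Iterator, Optional


   class StoppableStream:
       """Yield polled messages until SIGINT or SIGTERM arrives (sketch)."""

       def __init__(self, poll_fn: Callable[[float], Optional[Any]],
                    timeout: float = 1.0) -> None:
           self._poll_fn = poll_fn
           self._timeout = timeout
           self._stop = False

       def _on_signal(self, signum, frame) -> None:
           # Only set a flag; the loop exits after the current poll returns.
           self._stop = True

       def stream(self) -> Iterator[Any]:
           previous = {}
           if threading.current_thread() is threading.main_thread():
               for sig in (signal.SIGINT, signal.SIGTERM):
                   previous[sig] = signal.signal(sig, self._on_signal)
           try:
               while not self._stop:
                   msg = self._poll_fn(self._timeout)
                   if msg is not None:
                       yield msg
           finally:
               # Restore the handlers that were installed before streaming.
               for sig, handler in previous.items():
                   if handler is not None:
                       signal.signal(sig, handler)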
Persisting read position
~~~~~~~~~~~~~~~~~~~~~~~~
Supply a stable ``group_id`` to resume from the last committed offset
across restarts:
.. code-block:: python

   import certifi
   from ligo.gracedb.kafka import GraceDbKafkaConsumer

   def process(msg):
       """Placeholder for your pipeline's message handler."""
       print(msg["value"])

   with GraceDbKafkaConsumer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       topics="test",
       service_url="https://gracedb-dev.ligo.org/api/",
       group_id="my-pipeline-consumer",
       ca_cert_path=certifi.where(),
   ) as consumer:
       for msg in consumer.stream():
           process(msg)
Replaying history
~~~~~~~~~~~~~~~~~
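Combine a ``group_id`` that has never committed offsets with
``auto_offset_reset="earliest"`` to replay every message still retained
on the topic. The reset policy applies only when the group has no valid
committed offset, so reusing a group that has already committed offsets
resumes from those offsets instead:

.. code-block:: python

   import certifi
   from ligo.gracedb.kafka import GraceDbKafkaConsumer

   with GraceDbKafkaConsumer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       topics="test",
       service_url="https://gracedb-dev.ligo.org/api/",
       group_id="my-replay-group",
       auto_offset_reset="earliest",
       ca_cert_path=certifi.where(),
   ) as consumer:
       for msg in consumer.stream():
           print(msg)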
Polling manually
~~~~~~~~~~~~~~~~
If you need finer control than the ``stream()`` generator offers, call
:py:meth:`~ligo.gracedb.kafka.consumer.GraceDbKafkaConsumer.poll`
directly:

.. code-block:: python

   import certifi
   from ligo.gracedb.kafka import GraceDbKafkaConsumer

   def handle(msg):
       """Placeholder for your own message handler."""
       print(msg)

   consumer = GraceDbKafkaConsumer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       service_url="https://gracedb-dev.ligo.org/api/",
       ca_cert_path=certifi.where(),
   )
   consumer.subscribe("test")
   while True:
       msg = consumer.poll(timeout=1.0)  # None if no message arrived in time
       if msg is not None:
           handle(msg)
Token reloading
---------------
Long-running producers and consumers must cope with SciToken expiry.
By default both classes set ``reload_cred=True``, which creates a
:py:class:`~ligo.gracedb.kafka.auth.SciTokenRefreshManager`. The
manager parses the JWT ``exp`` claim and re-discovers a fresh token
``reload_buffer`` seconds before expiry (default: 300 s).
If an external process (e.g. ``htgettoken`` in a cron job) periodically
writes a fresh token to disk, the refresh manager will pick it up
transparently.
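The expiry check at the heart of this mechanism can be sketched with the standard library. ``token_expiry`` and ``needs_refresh`` are hypothetical helpers, not the ``SciTokenRefreshManager`` API. They decode the JWT payload without verifying its signature, which is acceptable only for reading ``exp`` from a token you already trust and never for authorisation decisions. They then apply the ``reload_buffer`` rule described above.

.. code-block:: python

   import base64
   import json
   import time
   from typing import Optional


   def token_expiry(token: str) -> int:
       """Return the 'exp' claim (Unix seconds) of a JWT, without verifying it."""
       try:
           payload_b64 = token.split(".")[1]
       except IndexError:
           raise ValueError("not a JWT: expected header.payload.signature") from None
       # JWTs use unpadded base64url; restore the padding before decoding.
       payload_b64 += "=" * (-len(payload_b64) % 4)
       claims = json.loads(base64.urlsafe_b64decode(payload_b64))
       return int(claims["exp"])


   def needs_refresh(token: str, reload_buffer: float = 300,
                     now: Optional[float] = None) -> bool:
       """True once we are within reload_buffer seconds of expiry."""
       now = time.time() if now is None else now
       return now >= token_expiry(token) - reload_buffer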
To disable proactive reloading:
.. code-block:: python

   from ligo.gracedb.kafka import GraceDbKafkaProducer

   producer = GraceDbKafkaProducer(
       bootstrap_servers="kafka-dev.ligo.org:9092",
       service_url="https://gracedb-dev.ligo.org/api/",
       reload_cred=False,  # disable proactive token refresh
   )
Error handling
--------------
All Kafka-specific exceptions inherit from
:py:class:`~ligo.gracedb.kafka.KafkaError`:
.. code-block:: text

   KafkaError
   +-- KafkaAuthenticationError
   |   +-- KafkaTokenError
   +-- KafkaProducerError
   |   +-- KafkaMessageDeliveryError
   +-- KafkaConsumerError
   +-- KafkaConnectionError
Authentication failures that occur inside librdkafka (for example, an
expired token that was not refreshed in time) are surfaced as
:py:class:`~ligo.gracedb.kafka.KafkaAuthenticationError` on the next
:py:meth:`~ligo.gracedb.kafka.producer.GraceDbKafkaProducer.produce`,
:py:meth:`~ligo.gracedb.kafka.producer.GraceDbKafkaProducer.flush`, or
:py:meth:`~ligo.gracedb.kafka.consumer.GraceDbKafkaConsumer.poll` call.
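Because the exceptions form a hierarchy, ``except`` clauses should run from most to least specific. The sketch below mirrors the tree above with locally defined stand-in classes so that it runs without ligo-gracedb or a broker. With the real package you would import the classes from ``ligo.gracedb.kafka.exceptions`` instead. The ``classify`` function and its recovery actions are hypothetical suggestions, not library behaviour.

.. code-block:: python

   # Stand-ins mirroring the documented hierarchy.
   class KafkaError(Exception): pass
   class KafkaAuthenticationError(KafkaError): pass
   class KafkaTokenError(KafkaAuthenticationError): pass
   class KafkaProducerError(KafkaError): pass
   class KafkaMessageDeliveryError(KafkaProducerError): pass
   class KafkaConsumerError(KafkaError): pass
   class KafkaConnectionError(KafkaError): pass


   def classify(exc: Exception) -> str:
       """Map a Kafka exception to a (hypothetical) recovery action."""
       try:
           raise exc
       except KafkaAuthenticationError:
           # Also catches KafkaTokenError: obtain a fresh token first.
           return "refresh-token"
       except KafkaMessageDeliveryError:
           return "retry-delivery"
       except KafkaConnectionError:
           return "reconnect"
       except KafkaError:
           # Any other Kafka failure, e.g. KafkaConsumerError.
           return "log-and-abort"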
API reference
-------------
Producer class
~~~~~~~~~~~~~~
.. autoclass:: ligo.gracedb.kafka.producer.GraceDbKafkaProducer
   :members:
   :undoc-members:
Consumer class
~~~~~~~~~~~~~~
.. autoclass:: ligo.gracedb.kafka.consumer.GraceDbKafkaConsumer
   :members:
   :undoc-members:
Authentication helpers
~~~~~~~~~~~~~~~~~~~~~~
.. autofunction:: ligo.gracedb.kafka.auth.get_scitoken_for_kafka
.. autofunction:: ligo.gracedb.kafka.auth.create_oauth_callback
.. autoclass:: ligo.gracedb.kafka.auth.SciTokenRefreshManager
   :members:
Topic utilities
~~~~~~~~~~~~~~~
.. autofunction:: ligo.gracedb.kafka.utils.derive_instance_name
.. autofunction:: ligo.gracedb.kafka.utils.make_topic_name
.. autofunction:: ligo.gracedb.kafka.utils.parse_topic_name
Exception classes
~~~~~~~~~~~~~~~~~
.. automodule:: ligo.gracedb.kafka.exceptions
   :members: