Kafka integration

ligo-gracedb includes a Kafka producer and consumer that authenticate with SciTokens over the OAUTHBEARER SASL mechanism. These classes are designed for long-running analysis pipelines that need to publish or consume GraceDB events in real time.

Note

The Kafka classes require the confluent-kafka C library. Install the optional dependency with:

pip install ligo-gracedb[kafka]

Authentication

Both the producer and the consumer discover a SciToken automatically using igwn-auth-utils. The token is located following the WLCG Bearer Token Discovery specification:

  1. $BEARER_TOKEN environment variable (token content)

  2. $BEARER_TOKEN_FILE environment variable (path to file)

  3. $XDG_RUNTIME_DIR/bt_u<uid> or /tmp/bt_u<uid>

If igwn-auth-utils cannot match the required audience or scope, a direct file read of the above paths is attempted as a fallback.

You can obtain a fresh SciToken with htgettoken:

htgettoken -a vault.ligo.org -i igwn

By default, both classes set reload_cred=True, which installs a SciTokenRefreshManager that monitors the token’s exp claim and re-reads the token from disk before it expires. This mirrors the reload_cred behaviour of the REST client and is essential for long-running processes.

Note

In addition to having a valid, non-expired SciToken, a user must have the gracedb.read scope in order to publish to Kafka. On event submission, GraceDB’s existing per-user/per-pipeline permission model will control which users can publish to individual pipelines.

Once changes in GraceDB scopes are implemented, access to pipeline-specific topics will be controlled on a per-user basis.

Write access to the test topic will remain available to all LVK users with the gracedb.read scope.

Topic namespacing

Kafka topics are instance-namespaced: each topic is prefixed with the GraceDB instance name so that multiple GraceDB deployments (development, test, playground, production) can share the same broker without message collisions. The format is <instance_name>.<pipeline>, for example:

  • gracedb-dev.gstlal

  • gracedb-test.cwb

  • gracedb.aframe

The instance name is derived automatically from the GraceDB service URL (using the first component of the hostname). For example, https://gracedb-dev.ligo.org/api/ yields instance name gracedb-dev.

Both the producer and consumer handle namespacing transparently: you pass pipeline names (e.g. "gstlal") and the classes prepend the instance name for you. You can supply the instance identity in two ways:

  • service_url — the GraceDB service URL; the instance name is extracted automatically.

  • instance_name — an explicit instance name string.

Exactly one of these must be provided.

Three utility functions are available for working with namespaced topics directly:

from ligo.gracedb.kafka import (
    derive_instance_name,
    make_topic_name,
    parse_topic_name,
)

derive_instance_name("https://gracedb-dev.ligo.org/api/")
# => "gracedb-dev"

make_topic_name("gracedb-dev", "gstlal")
# => "gracedb-dev.gstlal"

parse_topic_name("gracedb-dev.gstlal")
# => ("gracedb-dev", "gstlal")

Notes on Downstream Consumers

Topics on the GraceDB event submission Kafka broker correspond to approved and testing pipelines on GraceDB. A test topic is available to submit events to the Test group. Please see the above note regarding topic permissions.

Uploading a GW trigger via Kafka does not directly insert a new event into GraceDB, as was the use case of the create_event REST API call. Rather, publishing an event to Kafka allows the simultaneous processing of GW data products by multiple downstream processes.

Ahead of IR1, a dedicated service called the gracedb-kafka-supervisor is running in AWS alongside GraceDB. This application consumes Kafka messages and injects event triggers and data products into GraceDB. Please see the section below on Skipping server-side processing to control how events are handled by the gracedb-kafka-supervisor.

As the IR1 and beyond low-latency alert infrastructure evolves, follow-up pipelines may opt to delay uploading g-events until after time-critical periods of the alert generation process.

Producer

A GraceDbKafkaProducer wraps a confluent_kafka.Producer with SciToken authentication and sensible defaults for GraceDB usage.

Creating a producer

import certifi
from ligo.gracedb.kafka import GraceDbKafkaProducer

producer = GraceDbKafkaProducer(
    bootstrap_servers="kafka-dev.ligo.org:9092",
    topic="test",
    service_url="https://gracedb-dev.ligo.org/api/",
    ca_cert_path=certifi.where(),
)

The topic argument is the pipeline name (e.g. "test", "gstlal"). It is automatically namespaced with the instance name derived from service_url, so the actual Kafka topic above would be gracedb-dev.test. See Topic namespacing for details.

During development, when the broker’s TLS certificate does not match its hostname, pass skip_hostname_verification=True.

Producing messages

The produce() method accepts a dict, str, or bytes value. Dicts are serialised to JSON automatically:

producer.produce({"graceid": "G12345", "group": "CBC"})
producer.flush()       # wait for delivery confirmation
producer.close()       # flush remaining messages and release resources

The producer also works as a context manager:

with GraceDbKafkaProducer(
    bootstrap_servers="kafka-dev.ligo.org:9092",
    topic="test",
    service_url="https://gracedb-dev.ligo.org/api/",
    ca_cert_path=certifi.where(),
) as producer:
    producer.produce({"graceid": "G12345", "group": "CBC"})

Testing connectivity

Call ping() to verify end-to-end connectivity. This produces a small test message, waits for delivery confirmation, and returns the delivery metadata:

result = producer.ping(topic="test")
print(result)
# {'topic': 'gracedb-test', 'partition': 0, 'offset': 42,
#  'message': {'type': 'ping', ...}}

Uploading events with Kafka

The preferred way to upload events through Kafka is the two-step reserve + produce workflow provided by create_event(). Passing a producer to its kafka parameter causes create_event to:

  1. Reserve a graceid on the GraceDB server via an HTTP call to the reservations endpoint. This returns a graceid and submitter before any event data is transmitted.

  2. Produce a Kafka message containing the reserved graceid, the base64-encoded event file, pipeline metadata, and optional labels. The GraceDB server-side consumer receives this message and completes the upload.

This approach decouples the latency-sensitive part of event submission (obtaining a graceid) from the bulk data transfer, and provides an optional HTTP fallback if Kafka delivery fails.

Note

GraceDB will not dispatch an igwn-alert when a graceid is reserved. Once a reserved event is uploaded, then GraceDB will dispatch a “new”-type igwn-alert.

Basic example

import certifi
from ligo.gracedb.rest import GraceDb
from ligo.gracedb.kafka import GraceDbKafkaProducer

client = GraceDb('https://gracedb-test.ligo.org/api/', api_version='v2')

producer = GraceDbKafkaProducer(
    bootstrap_servers="kafka-dev.ligo.org:9092",
    service_url="https://gracedb-test.ligo.org/api/",
    ca_cert_path=certifi.where(),
)

response = client.create_event(
    "CBC", "gstlal", "coinc.xml",
    search="AllSky",
    kafka=producer,
)
# Message is published to gracedb-test.gstlal
print(response.json()["graceid"])   # e.g. "G123456"

producer.close()

In the above example, by not specifying topic in the GraceDbKafkaProducer, the producer publishes to a topic that is determined by the group and pipeline arguments passed to create_event. Events in the Test group are routed to {instance}.test; all other groups are routed to {instance}.{pipeline} (lowercased).

create_event automatically flushes the producer after publishing, so the message is guaranteed to be delivered (or an error raised) before the call returns. The flush timeout defaults to 10 seconds and can be configured via the flush_timeout parameter on the producer:

producer = GraceDbKafkaProducer(
    bootstrap_servers="kafka-dev.ligo.org:9092",
    service_url="https://gracedb-test.ligo.org/api/",
    ca_cert_path=certifi.where(),
    flush_timeout=30,  # wait up to 30s for delivery
)

Using the convenience factory

The create_kafka_producer() method on the client creates a pre-configured producer. The client’s service URL is passed automatically, so topics are namespaced without any extra arguments:

client = GraceDb('https://gracedb-test.ligo.org/api/', api_version='v2')
producer = client.create_kafka_producer(
    bootstrap_servers="kafka-dev.ligo.org:9092",
    topic="test",
    ca_cert_path=certifi.where(),
)

HTTP fallback

If http_fallback=True and the Kafka path fails (reservation failure or produce failure), create_event automatically falls back to the standard HTTP POST:

response = client.create_event(
    "CBC", "gstlal", "coinc.xml",
    kafka=producer,
    http_fallback=True,
)

When a graceid has already been reserved but the Kafka produce fails, the fallback uses upload_reserved_event() to complete the upload over HTTP.

Skipping server-side processing

Two boolean flags control what the server does with the Kafka message:

  • gracedb_upload=False — publish the event to Kafka without reserving a graceid or processing it on the server. Useful when a downstream consumer other than GraceDB handles the message.

  • skip_supervisor=False — when True, the server-side supervisor will not process the message (i.e. will not call upload_reserved_event).

Consumer

A GraceDbKafkaConsumer wraps a confluent_kafka.Consumer with SciToken authentication.

Live streaming

By default the consumer generates an ephemeral group ID, reads from the end of the topic (auto.offset.reset=latest), and auto-commits offsets. This means it only sees messages produced after it starts:

import certifi
from ligo.gracedb.kafka import GraceDbKafkaConsumer

with GraceDbKafkaConsumer(
    bootstrap_servers="kafka-dev.ligo.org:9092",
    topics="test",
    service_url="https://gracedb-dev.ligo.org/api/",
    ca_cert_path=certifi.where(),
) as consumer:
    for msg in consumer.stream():
        print(msg["value"])

Pressing Ctrl-C stops the stream cleanly; the consumer installs SIGINT and SIGTERM handlers that cause the current poll to finish and the generator to exit.

Persisting read position

Supply a stable group_id to resume from the last committed offset across restarts:

import certifi

with GraceDbKafkaConsumer(
    bootstrap_servers="kafka-dev.ligo.org:9092",
    topics="test",
    service_url="https://gracedb-dev.ligo.org/api/",
    group_id="my-pipeline-consumer",
    ca_cert_path=certifi.where(),
) as consumer:
    for msg in consumer.stream():
        process(msg)

Replaying history

Combine a fresh group_id with auto_offset_reset="earliest" to replay all messages from the beginning of the topic:

import certifi

with GraceDbKafkaConsumer(
    bootstrap_servers="kafka-dev.ligo.org:9092",
    topics="test",
    service_url="https://gracedb-dev.ligo.org/api/",
    group_id="my-replay-group",
    auto_offset_reset="earliest",
    ca_cert_path=certifi.where(),
) as consumer:
    for msg in consumer.stream():
        print(msg)

Polling manually

If the stream() generator is too opinionated, call poll() directly:

import certifi

consumer = GraceDbKafkaConsumer(
    bootstrap_servers="kafka-dev.ligo.org:9092",
    service_url="https://gracedb-dev.ligo.org/api/",
    ca_cert_path=certifi.where(),
)
consumer.subscribe("test")

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is not None:
        handle(msg)

Token reloading

Long-running producers and consumers must cope with SciToken expiry. By default both classes set reload_cred=True, which creates a SciTokenRefreshManager. The manager parses the JWT exp claim and re-discovers a fresh token reload_buffer seconds before expiry (default: 300 s).

If an external process (e.g. htgettoken in a cron job) periodically writes a fresh token to disk, the refresh manager will pick it up transparently.

To disable proactive reloading:

producer = GraceDbKafkaProducer(
    bootstrap_servers="kafka-dev.ligo.org:9092",
    service_url="https://gracedb-dev.ligo.org/api/",
    reload_cred=False,
)

Error handling

All Kafka-specific exceptions inherit from KafkaError:

KafkaError
+-- KafkaAuthenticationError
|   +-- KafkaTokenError
+-- KafkaProducerError
|   +-- KafkaMessageDeliveryError
+-- KafkaConsumerError
+-- KafkaConnectionError

Authentication failures that occur inside librdkafka (for example, an expired token that was not refreshed in time) are surfaced as KafkaAuthenticationError on the next produce(), flush(), or poll() call.

API reference

Producer class

class ligo.gracedb.kafka.producer.GraceDbKafkaProducer(bootstrap_servers, topic=None, instance_name=None, service_url=None, use_ssl=True, ca_cert_path=None, skip_hostname_verification=False, token_scope='kafka.produce', token_audience=None, reload_cred=True, reload_buffer=300, flush_timeout=10, guaranteed_delivery=True, auto_refresh_token=None, **kafka_config)

Kafka producer with SciToken authentication for GraceDB events.

This producer automatically handles SciToken discovery and OAuth authentication for publishing messages to Kafka brokers.

Parameters:
  • bootstrap_servers (str) – Kafka broker address (e.g., “localhost:9092”)

  • topic (str, optional) – Default topic name for messages (pipeline name, e.g., "gstlal"). Will be automatically namespaced with the instance name to produce the actual Kafka topic (e.g., "gracedb-dev.gstlal"). When omitted, a topic must be specified at produce() time. When using the producer with create_event(), the topic is determined automatically from the event’s group and pipeline arguments, so setting a default here is not required.

  • instance_name (str, optional) – GraceDB instance name used to namespace topics (e.g., "gracedb-dev"). Exactly one of instance_name or service_url must be provided.

  • service_url (str, optional) – GraceDB service URL from which the instance name is derived (e.g., "https://gracedb-dev.ligo.org/api/").

  • use_ssl (bool) – Enable SSL/TLS encryption (default: True)

  • ca_cert_path (str, optional) – Path to CA certificate for SSL verification

  • skip_hostname_verification (bool) – Skip SSL hostname verification (default: False). Use for development/testing when cert CN doesn’t match broker hostname.

  • token_scope (str) – Required SciToken scope (default: “kafka.produce”)

  • token_audience (str, optional) – Required audience. Defaults to broker hostname

  • reload_cred (bool) – Proactively reload SciTokens before expiry (default: True). When enabled, a SciTokenRefreshManager monitors token expiration and re-reads the token from disk before it expires.

  • reload_buffer (int) – Seconds before token expiry to trigger a reload (default: 300). Only used when reload_cred=True.

  • flush_timeout (int) – Seconds to wait for message delivery when create_event() flushes the producer after publishing (default: 10).

  • guaranteed_delivery (bool) – When True (default), configures the producer for reliable delivery: acks=all, retries on transient failures, and delivery verification via callback. Set to False for higher throughput at the risk of silent message loss.

  • **kafka_config – Additional confluent_kafka.Producer configuration options

Example

Using with create_event (topic determined automatically):

producer = GraceDbKafkaProducer(
    bootstrap_servers="gracedb-dev.ligo.org:9092",
    service_url="https://gracedb-dev.ligo.org/api/",
    ca_cert_path="/tmp/cacert.pem",
)
gdb.create_event(
    group="CBC", pipeline="gstlal", filename="coinc.xml",
    filecontents=data, kafka=producer,
)
# Message is published to gracedb-dev.gstlal

Using with a default topic for direct produce() calls:

producer = GraceDbKafkaProducer(
    bootstrap_servers="gracedb-dev.ligo.org:9092",
    topic="test",
    service_url="https://gracedb-dev.ligo.org/api/",
    ca_cert_path="/tmp/cacert.pem",
)
producer.produce(value={"graceid": "G12345"})
# Message is published to gracedb-dev.test
close(timeout=30)

Close the producer and release resources.

This method flushes any pending messages before closing.

Parameters:

timeout (float) – Maximum time to wait for flush in seconds

flush(timeout=-1)

Wait for all messages in the producer queue to be delivered.

Parameters:

timeout (float) – Maximum time to wait in seconds. -1 = wait indefinitely (default) 0 = non-blocking check >0 = wait up to timeout seconds

Returns:

Number of messages still in queue after timeout

Return type:

int

Raises:

KafkaProducerError – If flush encounters an error

property pending_messages

Number of messages currently pending delivery.

ping(topic=None, timeout=10)

Send a test message to verify broker connectivity and authentication.

Produces a small JSON message whose body includes the sub and scope claims from the SciToken used to authenticate, then waits for delivery confirmation. This exercises the full path: token discovery, OAUTHBEARER handshake, SSL connection, and message delivery.

Parameters:
  • topic (str, optional) – Topic to ping. Uses default_topic, or 'gracedb-test' if no default is set.

  • timeout (float) – Seconds to wait for delivery confirmation (default: 10).

Returns:

Delivery details with keys 'topic', 'partition', 'offset', and 'message' (the test payload that was sent).

Return type:

dict

Raises:
produce(value, topic=None, key=None, headers=None, partition=None, on_delivery=None)

Produce a message to Kafka.

Parameters:
  • value – Message value. Can be: - dict: Automatically serialized to JSON - str: Encoded to UTF-8 bytes - bytes: Sent as-is

  • topic (str, optional) – Pipeline or topic name (e.g., "gstlal"). Automatically namespaced with the instance name (e.g., "gracedb-dev.gstlal"). Falls back to the default topic set at construction if not specified.

  • key (str or bytes, optional) – Message key for partitioning

  • headers (dict, optional) – Message headers as key-value pairs

  • partition (int, optional) – Specific partition to send to

  • on_delivery (callable, optional) – Callback for delivery reports Signature: on_delivery(err, msg)

Raises:
  • ValueError – If topic is not specified and no default was set

  • KafkaProducerError – If message production fails

Example

>>> producer.produce(
...     value={"graceid": "G12345"},
...     topic="gstlal",
...     key="G12345",
... )

Consumer class

class ligo.gracedb.kafka.consumer.GraceDbKafkaConsumer(bootstrap_servers, topics=None, instance_name=None, service_url=None, group_id=None, use_ssl=True, ca_cert_path=None, skip_hostname_verification=False, token_scope='kafka.consume', token_audience=None, reload_cred=True, reload_buffer=300, auto_refresh_token=None, auto_offset_reset='latest', **kafka_config)

Kafka consumer with SciToken authentication for GraceDB events.

This consumer automatically handles SciToken discovery and OAuth authentication for subscribing to Kafka topics and reading messages.

By default, the consumer is configured for live streaming: it generates a unique ephemeral group ID, starts reading from the end of the topic (auto.offset.reset=latest), and automatically commits offsets. This means a fresh consumer only sees messages produced after it connects.

To persist read position across restarts, supply a stable group_id. Kafka will track committed offsets for that group, so subsequent runs resume where the previous one left off.

To replay all historical messages, pass auto_offset_reset="earliest" (useful with a new group_id that has no committed offsets yet).

Parameters:
  • bootstrap_servers (str) – Kafka broker address (e.g., “localhost:9092”)

  • topics (str or list, optional) – Topic(s) to subscribe to immediately. These should be pipeline names (e.g., "gstlal"); they are automatically namespaced with the instance name.

  • instance_name (str, optional) – GraceDB instance name used to namespace topics (e.g., "gracedb-dev"). Exactly one of instance_name or service_url must be provided.

  • service_url (str, optional) – GraceDB service URL from which the instance name is derived (e.g., "https://gracedb-dev.ligo.org/api/").

  • group_id (str, optional) – Consumer group ID. When None (the default), a unique ephemeral ID is generated from the hostname, PID, and current time so that each consumer instance operates independently. Provide a stable string to share offsets across restarts or multiple consumers.

  • use_ssl (bool) – Enable SSL/TLS encryption (default: True)

  • ca_cert_path (str, optional) – Path to CA certificate for SSL verification

  • skip_hostname_verification (bool) – Skip SSL hostname verification (default: False). Use for development/testing when cert CN doesn’t match broker hostname.

  • token_scope (str) – Required SciToken scope (default: “kafka.consume”)

  • token_audience (str, optional) – Required audience. Defaults to broker hostname

  • reload_cred (bool) – Proactively reload SciTokens before expiry (default: True). When enabled, a SciTokenRefreshManager monitors token expiration and re-reads the token from disk before it expires.

  • reload_buffer (int) – Seconds before token expiry to trigger a reload (default: 300). Only used when reload_cred=True.

  • auto_offset_reset (str) – Where to start reading when there is no committed offset for the consumer group. Default: "latest" (only new messages). Set to "earliest" to replay from the beginning of the topic.

  • **kafka_config – Additional confluent_kafka.Consumer configuration options. For example, pass **{"enable.auto.commit": False} to manage offset commits manually.

Example

Stream only new messages (default behaviour):

with GraceDbKafkaConsumer(
    bootstrap_servers="kafka-dev.ligo.org:9092",
    topics="test",
    ca_cert_path="/tmp/cacert.pem",
) as consumer:
    for msg in consumer.stream():
        print(msg)

Resume from last committed offset across restarts:

with GraceDbKafkaConsumer(
    bootstrap_servers="kafka-dev.ligo.org:9092",
    topics="test",
    group_id="my-pipeline-consumer",
    ca_cert_path="/tmp/cacert.pem",
) as consumer:
    for msg in consumer.stream():
        process(msg)

Replay all messages from the beginning of a topic:

with GraceDbKafkaConsumer(
    bootstrap_servers="kafka-dev.ligo.org:9092",
    topics="test",
    group_id="my-replay-group",
    auto_offset_reset="earliest",
    ca_cert_path="/tmp/cacert.pem",
) as consumer:
    for msg in consumer.stream():
        print(msg)
close()

Unsubscribe from topics and close the consumer.

poll(timeout=1.0)

Poll for a single message.

Parameters:

timeout (float) – Maximum time to wait for a message in seconds (default: 1.0)

Returns:

Message dict with keys topic, partition, offset, key, value, timestamp, headers, or None if no message is available.

Return type:

dict or None

Raises:

KafkaConsumerError – If a consumer error occurs

stop()

Signal the stream() loop to stop at the next poll iteration.

stream(timeout=1.0, count=None)

Generator that yields messages from subscribed topics.

Installs signal handlers for SIGINT and SIGTERM so that the first signal causes the loop to exit cleanly (within at most timeout seconds). Original handlers are restored when the generator exits.

Parameters:
  • timeout (float) – Poll timeout in seconds per iteration (default: 1.0)

  • count (int, optional) – Maximum number of messages to yield. None means stream indefinitely.

Yields:

dict – Message dicts (same format as poll())

subscribe(topics)

Subscribe to one or more Kafka topics.

Topic names are automatically namespaced with the instance name. For example, subscribing to "gstlal" with instance name "gracedb-dev" subscribes to "gracedb-dev.gstlal".

Parameters:

topics (str or list) – Pipeline name or list of pipeline names

Raises:

KafkaConsumerError – If subscription fails

Authentication helpers

ligo.gracedb.kafka.auth.get_scitoken_for_kafka(broker_url, scope='kafka.produce', audience=None)

Find and return a valid SciToken for Kafka authentication.

This function discovers a SciToken using igwn-auth-utils and returns the serialized JWT string suitable for Kafka OAUTHBEARER authentication.

Parameters:
  • broker_url (str) – Kafka broker URL (e.g., “localhost:9092”)

  • scope (str) – Required token scope (default: “kafka.produce”)

  • audience (str, optional) – Required audience. If None, extracted from broker_url

Returns:

Serialized JWT token string

Return type:

str

Raises:

KafkaTokenError – If no valid token can be found

ligo.gracedb.kafka.auth.create_oauth_callback(broker_url, scope='kafka.produce', audience=None)

Create an OAuth callback function for confluent_kafka Producer.

The callback is called by the Kafka client to obtain the OAuth token before establishing a connection.

Parameters:
  • broker_url (str) – Kafka broker URL

  • scope (str) – Required token scope

  • audience (str, optional) – Required audience

Returns:

OAuth callback function that returns (token_str, expiry_time)

Return type:

callable

class ligo.gracedb.kafka.auth.SciTokenRefreshManager(broker_url, scope='kafka.produce', audience=None, refresh_buffer=300)

Manages automatic SciToken refresh for long-running Kafka producers.

This class monitors token expiration and triggers callbacks when tokens need to be refreshed.

create_callback()

Create an OAuth callback that uses this manager.

Returns:

OAuth callback function

Return type:

callable

get_token()

Get current token, refreshing if necessary.

Returns:

Current valid token string

Return type:

str

Raises:

KafkaTokenError – If token cannot be obtained

Topic utilities

ligo.gracedb.kafka.utils.derive_instance_name(service_url)

Extract the instance name from a GraceDB service URL.

The instance name is the first component of the hostname (i.e., everything before the first dot, or the full hostname if there is no dot).

Parameters:

service_url (str) – GraceDB service URL (e.g., "https://gracedb-dev.ligo.org/api/")

Returns:

Instance name (e.g., "gracedb-dev")

Return type:

str

Raises:

ValueError – If the hostname cannot be parsed from the URL

Examples

>>> derive_instance_name("https://gracedb-dev.ligo.org/api/")
'gracedb-dev'
>>> derive_instance_name("https://gracedb.ligo.org/api/")
'gracedb'
>>> derive_instance_name("https://localhost:8080/api/")
'localhost'
ligo.gracedb.kafka.utils.make_topic_name(instance_name, pipeline)

Build an instance-namespaced Kafka topic name.

Parameters:
  • instance_name (str) – GraceDB instance name (e.g., "gracedb-dev")

  • pipeline (str) – Pipeline name (e.g., "gstlal")

Returns:

Namespaced topic name (e.g., "gracedb-dev.gstlal")

Return type:

str

ligo.gracedb.kafka.utils.parse_topic_name(namespaced_topic)

Split an instance-namespaced topic into its components.

Parameters:

namespaced_topic (str) – Namespaced topic name (e.g., "gracedb-dev.gstlal")

Returns:

(instance_name, pipeline)

Return type:

tuple

Raises:

ValueError – If the topic does not contain a dot separator

Exception classes

Exceptions for Kafka integration.

exception ligo.gracedb.kafka.exceptions.KafkaAuthenticationError

Exception raised when Kafka authentication fails.

exception ligo.gracedb.kafka.exceptions.KafkaConnectionError

Exception raised when connection to Kafka broker fails.

exception ligo.gracedb.kafka.exceptions.KafkaConsumerError

Exception raised when Kafka consumer encounters an error.

exception ligo.gracedb.kafka.exceptions.KafkaError

Base exception for Kafka-related errors.

exception ligo.gracedb.kafka.exceptions.KafkaMessageDeliveryError

Exception raised when message delivery to Kafka fails.

exception ligo.gracedb.kafka.exceptions.KafkaProducerError

Exception raised when Kafka producer encounters an error.

exception ligo.gracedb.kafka.exceptions.KafkaTokenError

Exception raised when SciToken cannot be obtained or is invalid.