Kafka integration
ligo-gracedb includes a Kafka producer and consumer that authenticate with SciTokens over the OAUTHBEARER SASL mechanism. These classes are designed for long-running analysis pipelines that need to publish or consume GraceDB events in real time.
Note
The Kafka classes require the confluent-kafka C library.
Install the optional dependency with:
pip install ligo-gracedb[kafka]
Authentication
Both the producer and the consumer discover a SciToken automatically using igwn-auth-utils. The token is located following the WLCG Bearer Token Discovery specification:
$BEARER_TOKENenvironment variable (token content)$BEARER_TOKEN_FILEenvironment variable (path to file)$XDG_RUNTIME_DIR/bt_u<uid>or/tmp/bt_u<uid>
If igwn-auth-utils cannot match the required audience or scope, a
direct file read of the above paths is attempted as a fallback.
You can obtain a fresh SciToken with htgettoken:
htgettoken -a vault.ligo.org -i igwn
By default, both classes set reload_cred=True, which installs a
SciTokenRefreshManager that
monitors the token’s exp claim and re-reads the token from disk
before it expires. This mirrors the reload_cred behaviour of the
REST client and is essential for long-running processes.
Note
In addition to having a valid, non-expired SciToken, a user must
have the gracedb.read scope in order to publish to Kafka. On
event submission, GraceDB’s existing per-user/per-pipeline permission
model will control which users can publish to individual pipelines.
Once changes in GraceDB scopes are implemented, access to pipeline-specific topics will be controlled on a per-user basis.
Write access to the test topic will remain available to all LVK users
with the gracedb.read scope.
Topic namespacing
Kafka topics are instance-namespaced: each topic is prefixed with
the GraceDB instance name so that multiple GraceDB deployments
(development, test, playground, production) can share the same broker
without message collisions. The format is
<instance_name>.<pipeline>, for example:
gracedb-dev.gstlalgracedb-test.cwbgracedb.aframe
The instance name is derived automatically from the GraceDB service URL
(using the first component of the hostname). For example,
https://gracedb-dev.ligo.org/api/ yields instance name
gracedb-dev.
Both the producer and consumer handle namespacing transparently: you
pass pipeline names (e.g. "gstlal") and the classes prepend the
instance name for you. You can supply the instance identity in two
ways:
service_url — the GraceDB service URL; the instance name is extracted automatically.
instance_name — an explicit instance name string.
Exactly one of these must be provided.
Three utility functions are available for working with namespaced topics directly:
from ligo.gracedb.kafka import (
derive_instance_name,
make_topic_name,
parse_topic_name,
)
derive_instance_name("https://gracedb-dev.ligo.org/api/")
# => "gracedb-dev"
make_topic_name("gracedb-dev", "gstlal")
# => "gracedb-dev.gstlal"
parse_topic_name("gracedb-dev.gstlal")
# => ("gracedb-dev", "gstlal")
Notes on Downstream Consumers
Topics on the GraceDB event submission Kafka broker correspond to approved
and testing pipelines
on GraceDB. A test topic is available to submit events to the Test
group. Please see the above note regarding topic permissions.
Uploading a GW trigger via Kafka does not directly insert a new event
into GraceDB, as was the use case of the create_event REST API call.
Rather, publishing an event to Kafka allows the simultaneous processing
of GW data products by multiple downstream processes.
Ahead of IR1, a dedicated service called the
gracedb-kafka-supervisor
is running in AWS alongside GraceDB. This application consumes Kafka
messages and injects event triggers and data products into GraceDB.
Please see the section below on Skipping server-side processing to
control how events are handled by the gracedb-kafka-supervisor.
As the IR1 and beyond low-latency alert infrastructure evolves, follow-up pipelines may opt to delay uploading g-events until after time-critical periods of the alert generation process.
Producer
A GraceDbKafkaProducer wraps a
confluent_kafka.Producer with SciToken authentication and sensible
defaults for GraceDB usage.
Creating a producer
import certifi
from ligo.gracedb.kafka import GraceDbKafkaProducer
producer = GraceDbKafkaProducer(
bootstrap_servers="kafka-dev.ligo.org:9092",
topic="test",
service_url="https://gracedb-dev.ligo.org/api/",
ca_cert_path=certifi.where(),
)
The topic argument is the pipeline name (e.g. "test",
"gstlal"). It is automatically namespaced with the instance name
derived from service_url, so the actual Kafka topic above would be
gracedb-dev.test. See Topic namespacing for details.
During development, when the broker’s TLS certificate does not match
its hostname, pass skip_hostname_verification=True.
Producing messages
The produce()
method accepts a dict, str, or bytes value. Dicts are
serialised to JSON automatically:
producer.produce({"graceid": "G12345", "group": "CBC"})
producer.flush() # wait for delivery confirmation
producer.close() # flush remaining messages and release resources
The producer also works as a context manager:
with GraceDbKafkaProducer(
bootstrap_servers="kafka-dev.ligo.org:9092",
topic="test",
service_url="https://gracedb-dev.ligo.org/api/",
ca_cert_path=certifi.where(),
) as producer:
producer.produce({"graceid": "G12345", "group": "CBC"})
Testing connectivity
Call ping()
to verify end-to-end connectivity. This produces a small test message,
waits for delivery confirmation, and returns the delivery metadata:
result = producer.ping(topic="test")
print(result)
# {'topic': 'gracedb-test', 'partition': 0, 'offset': 42,
# 'message': {'type': 'ping', ...}}
Uploading events with Kafka
The preferred way to upload events through Kafka is the two-step
reserve + produce workflow provided by
create_event(). Passing a
producer to its kafka parameter causes create_event to:
Reserve a graceid on the GraceDB server via an HTTP call to the reservations endpoint. This returns a
graceidandsubmitterbefore any event data is transmitted.Produce a Kafka message containing the reserved
graceid, the base64-encoded event file, pipeline metadata, and optional labels. The GraceDB server-side consumer receives this message and completes the upload.
This approach decouples the latency-sensitive part of event submission
(obtaining a graceid) from the bulk data transfer, and provides an
optional HTTP fallback if Kafka delivery fails.
Note
GraceDB will not dispatch an igwn-alert when a graceid is reserved.
Once a reserved event is uploaded, then GraceDB will dispatch a “new”-type
igwn-alert.
Basic example
import certifi
from ligo.gracedb.rest import GraceDb
from ligo.gracedb.kafka import GraceDbKafkaProducer
client = GraceDb('https://gracedb-test.ligo.org/api/', api_version='v2')
producer = GraceDbKafkaProducer(
bootstrap_servers="kafka-dev.ligo.org:9092",
service_url="https://gracedb-test.ligo.org/api/",
ca_cert_path=certifi.where(),
)
response = client.create_event(
"CBC", "gstlal", "coinc.xml",
search="AllSky",
kafka=producer,
)
# Message is published to gracedb-test.gstlal
print(response.json()["graceid"]) # e.g. "G123456"
producer.close()
In the above example, by not specifying topic in the
GraceDbKafkaProducer, the producer publishes to a topic that is
determined by the group and pipeline arguments passed to
create_event. Events in the Test group are routed to
{instance}.test; all other groups are routed to
{instance}.{pipeline} (lowercased).
create_event automatically flushes the producer after publishing, so
the message is guaranteed to be delivered (or an error raised) before the
call returns. The flush timeout defaults to 10 seconds and can be
configured via the flush_timeout parameter on the producer:
producer = GraceDbKafkaProducer(
bootstrap_servers="kafka-dev.ligo.org:9092",
service_url="https://gracedb-test.ligo.org/api/",
ca_cert_path=certifi.where(),
flush_timeout=30, # wait up to 30s for delivery
)
Using the convenience factory
The create_kafka_producer() method
on the client creates a pre-configured producer. The client’s service
URL is passed automatically, so topics are namespaced without any extra
arguments:
client = GraceDb('https://gracedb-test.ligo.org/api/', api_version='v2')
producer = client.create_kafka_producer(
bootstrap_servers="kafka-dev.ligo.org:9092",
topic="test",
ca_cert_path=certifi.where(),
)
HTTP fallback
If http_fallback=True and the Kafka path fails (reservation failure
or produce failure), create_event automatically falls back to the
standard HTTP POST:
response = client.create_event(
"CBC", "gstlal", "coinc.xml",
kafka=producer,
http_fallback=True,
)
When a graceid has already been reserved but the Kafka produce fails,
the fallback uses
upload_reserved_event() to
complete the upload over HTTP.
Skipping server-side processing
Two boolean flags control what the server does with the Kafka message:
gracedb_upload=False— publish the event to Kafka without reserving a graceid or processing it on the server. Useful when a downstream consumer other than GraceDB handles the message.skip_supervisor=False— whenTrue, the server-side supervisor will not process the message (i.e. will not callupload_reserved_event).
Consumer
A GraceDbKafkaConsumer wraps a
confluent_kafka.Consumer with SciToken authentication.
Live streaming
By default the consumer generates an ephemeral group ID, reads from
the end of the topic (auto.offset.reset=latest), and auto-commits
offsets. This means it only sees messages produced after it starts:
import certifi
from ligo.gracedb.kafka import GraceDbKafkaConsumer
with GraceDbKafkaConsumer(
bootstrap_servers="kafka-dev.ligo.org:9092",
topics="test",
service_url="https://gracedb-dev.ligo.org/api/",
ca_cert_path=certifi.where(),
) as consumer:
for msg in consumer.stream():
print(msg["value"])
Pressing Ctrl-C stops the stream cleanly; the consumer installs
SIGINT and SIGTERM handlers that cause the current poll to
finish and the generator to exit.
Persisting read position
Supply a stable group_id to resume from the last committed offset
across restarts:
import certifi
with GraceDbKafkaConsumer(
bootstrap_servers="kafka-dev.ligo.org:9092",
topics="test",
service_url="https://gracedb-dev.ligo.org/api/",
group_id="my-pipeline-consumer",
ca_cert_path=certifi.where(),
) as consumer:
for msg in consumer.stream():
process(msg)
Replaying history
Combine a fresh group_id with auto_offset_reset="earliest" to
replay all messages from the beginning of the topic:
import certifi
with GraceDbKafkaConsumer(
bootstrap_servers="kafka-dev.ligo.org:9092",
topics="test",
service_url="https://gracedb-dev.ligo.org/api/",
group_id="my-replay-group",
auto_offset_reset="earliest",
ca_cert_path=certifi.where(),
) as consumer:
for msg in consumer.stream():
print(msg)
Polling manually
If the stream() generator is too opinionated, call
poll()
directly:
import certifi
consumer = GraceDbKafkaConsumer(
bootstrap_servers="kafka-dev.ligo.org:9092",
service_url="https://gracedb-dev.ligo.org/api/",
ca_cert_path=certifi.where(),
)
consumer.subscribe("test")
while True:
msg = consumer.poll(timeout=1.0)
if msg is not None:
handle(msg)
Token reloading
Long-running producers and consumers must cope with SciToken expiry.
By default both classes set reload_cred=True, which creates a
SciTokenRefreshManager. The
manager parses the JWT exp claim and re-discovers a fresh token
reload_buffer seconds before expiry (default: 300 s).
If an external process (e.g. htgettoken in a cron job) periodically
writes a fresh token to disk, the refresh manager will pick it up
transparently.
To disable proactive reloading:
producer = GraceDbKafkaProducer(
bootstrap_servers="kafka-dev.ligo.org:9092",
service_url="https://gracedb-dev.ligo.org/api/",
reload_cred=False,
)
Error handling
All Kafka-specific exceptions inherit from
KafkaError:
KafkaError
+-- KafkaAuthenticationError
| +-- KafkaTokenError
+-- KafkaProducerError
| +-- KafkaMessageDeliveryError
+-- KafkaConsumerError
+-- KafkaConnectionError
Authentication failures that occur inside librdkafka (for example, an
expired token that was not refreshed in time) are surfaced as
KafkaAuthenticationError on the next
produce(),
flush(), or
poll() call.
API reference
Producer class
- class ligo.gracedb.kafka.producer.GraceDbKafkaProducer(bootstrap_servers, topic=None, instance_name=None, service_url=None, use_ssl=True, ca_cert_path=None, skip_hostname_verification=False, token_scope='kafka.produce', token_audience=None, reload_cred=True, reload_buffer=300, flush_timeout=10, guaranteed_delivery=True, auto_refresh_token=None, **kafka_config)
Kafka producer with SciToken authentication for GraceDB events.
This producer automatically handles SciToken discovery and OAuth authentication for publishing messages to Kafka brokers.
- Parameters:
bootstrap_servers (str) – Kafka broker address (e.g., “localhost:9092”)
topic (str, optional) – Default topic name for messages (pipeline name, e.g.,
"gstlal"). Will be automatically namespaced with the instance name to produce the actual Kafka topic (e.g.,"gracedb-dev.gstlal"). When omitted, a topic must be specified atproduce()time. When using the producer withcreate_event(), the topic is determined automatically from the event’sgroupandpipelinearguments, so setting a default here is not required.instance_name (str, optional) – GraceDB instance name used to namespace topics (e.g.,
"gracedb-dev"). Exactly one of instance_name or service_url must be provided.service_url (str, optional) – GraceDB service URL from which the instance name is derived (e.g.,
"https://gracedb-dev.ligo.org/api/").use_ssl (bool) – Enable SSL/TLS encryption (default: True)
ca_cert_path (str, optional) – Path to CA certificate for SSL verification
skip_hostname_verification (bool) – Skip SSL hostname verification (default: False). Use for development/testing when cert CN doesn’t match broker hostname.
token_scope (str) – Required SciToken scope (default: “kafka.produce”)
token_audience (str, optional) – Required audience. Defaults to broker hostname
reload_cred (bool) – Proactively reload SciTokens before expiry (default: True). When enabled, a
SciTokenRefreshManagermonitors token expiration and re-reads the token from disk before it expires.reload_buffer (int) – Seconds before token expiry to trigger a reload (default: 300). Only used when
reload_cred=True.flush_timeout (int) – Seconds to wait for message delivery when
create_event()flushes the producer after publishing (default: 10).guaranteed_delivery (bool) – When
True(default), configures the producer for reliable delivery:acks=all, retries on transient failures, and delivery verification via callback. Set toFalsefor higher throughput at the risk of silent message loss.**kafka_config – Additional confluent_kafka.Producer configuration options
Example
Using with
create_event(topic determined automatically):producer = GraceDbKafkaProducer( bootstrap_servers="gracedb-dev.ligo.org:9092", service_url="https://gracedb-dev.ligo.org/api/", ca_cert_path="/tmp/cacert.pem", ) gdb.create_event( group="CBC", pipeline="gstlal", filename="coinc.xml", filecontents=data, kafka=producer, ) # Message is published to gracedb-dev.gstlal
Using with a default topic for direct
produce()calls:producer = GraceDbKafkaProducer( bootstrap_servers="gracedb-dev.ligo.org:9092", topic="test", service_url="https://gracedb-dev.ligo.org/api/", ca_cert_path="/tmp/cacert.pem", ) producer.produce(value={"graceid": "G12345"}) # Message is published to gracedb-dev.test
- close(timeout=30)
Close the producer and release resources.
This method flushes any pending messages before closing.
- Parameters:
timeout (float) – Maximum time to wait for flush in seconds
- flush(timeout=-1)
Wait for all messages in the producer queue to be delivered.
- Parameters:
timeout (float) – Maximum time to wait in seconds. -1 = wait indefinitely (default) 0 = non-blocking check >0 = wait up to timeout seconds
- Returns:
Number of messages still in queue after timeout
- Return type:
int
- Raises:
KafkaProducerError – If flush encounters an error
- property pending_messages
Number of messages currently pending delivery.
- ping(topic=None, timeout=10)
Send a test message to verify broker connectivity and authentication.
Produces a small JSON message whose body includes the
subandscopeclaims from the SciToken used to authenticate, then waits for delivery confirmation. This exercises the full path: token discovery, OAUTHBEARER handshake, SSL connection, and message delivery.- Parameters:
topic (str, optional) – Topic to ping. Uses default_topic, or
'gracedb-test'if no default is set.timeout (float) – Seconds to wait for delivery confirmation (default: 10).
- Returns:
Delivery details with keys
'topic','partition','offset', and'message'(the test payload that was sent).- Return type:
dict
- Raises:
KafkaProducerError – If the message cannot be delivered.
KafkaConnectionError – If the broker is unreachable.
- produce(value, topic=None, key=None, headers=None, partition=None, on_delivery=None)
Produce a message to Kafka.
- Parameters:
value – Message value. Can be: - dict: Automatically serialized to JSON - str: Encoded to UTF-8 bytes - bytes: Sent as-is
topic (str, optional) – Pipeline or topic name (e.g.,
"gstlal"). Automatically namespaced with the instance name (e.g.,"gracedb-dev.gstlal"). Falls back to the default topic set at construction if not specified.key (str or bytes, optional) – Message key for partitioning
headers (dict, optional) – Message headers as key-value pairs
partition (int, optional) – Specific partition to send to
on_delivery (callable, optional) – Callback for delivery reports Signature: on_delivery(err, msg)
- Raises:
ValueError – If topic is not specified and no default was set
KafkaProducerError – If message production fails
Example
>>> producer.produce( ... value={"graceid": "G12345"}, ... topic="gstlal", ... key="G12345", ... )
Consumer class
- class ligo.gracedb.kafka.consumer.GraceDbKafkaConsumer(bootstrap_servers, topics=None, instance_name=None, service_url=None, group_id=None, use_ssl=True, ca_cert_path=None, skip_hostname_verification=False, token_scope='kafka.consume', token_audience=None, reload_cred=True, reload_buffer=300, auto_refresh_token=None, auto_offset_reset='latest', **kafka_config)
Kafka consumer with SciToken authentication for GraceDB events.
This consumer automatically handles SciToken discovery and OAuth authentication for subscribing to Kafka topics and reading messages.
By default, the consumer is configured for live streaming: it generates a unique ephemeral group ID, starts reading from the end of the topic (
auto.offset.reset=latest), and automatically commits offsets. This means a fresh consumer only sees messages produced after it connects.To persist read position across restarts, supply a stable
group_id. Kafka will track committed offsets for that group, so subsequent runs resume where the previous one left off.To replay all historical messages, pass
auto_offset_reset="earliest"(useful with a newgroup_idthat has no committed offsets yet).- Parameters:
bootstrap_servers (str) – Kafka broker address (e.g., “localhost:9092”)
topics (str or list, optional) – Topic(s) to subscribe to immediately. These should be pipeline names (e.g.,
"gstlal"); they are automatically namespaced with the instance name.instance_name (str, optional) – GraceDB instance name used to namespace topics (e.g.,
"gracedb-dev"). Exactly one of instance_name or service_url must be provided.service_url (str, optional) – GraceDB service URL from which the instance name is derived (e.g.,
"https://gracedb-dev.ligo.org/api/").group_id (str, optional) – Consumer group ID. When
None(the default), a unique ephemeral ID is generated from the hostname, PID, and current time so that each consumer instance operates independently. Provide a stable string to share offsets across restarts or multiple consumers.use_ssl (bool) – Enable SSL/TLS encryption (default: True)
ca_cert_path (str, optional) – Path to CA certificate for SSL verification
skip_hostname_verification (bool) – Skip SSL hostname verification (default: False). Use for development/testing when cert CN doesn’t match broker hostname.
token_scope (str) – Required SciToken scope (default: “kafka.consume”)
token_audience (str, optional) – Required audience. Defaults to broker hostname
reload_cred (bool) – Proactively reload SciTokens before expiry (default: True). When enabled, a
SciTokenRefreshManagermonitors token expiration and re-reads the token from disk before it expires.reload_buffer (int) – Seconds before token expiry to trigger a reload (default: 300). Only used when
reload_cred=True.auto_offset_reset (str) – Where to start reading when there is no committed offset for the consumer group. Default:
"latest"(only new messages). Set to"earliest"to replay from the beginning of the topic.**kafka_config – Additional confluent_kafka.Consumer configuration options. For example, pass
**{"enable.auto.commit": False}to manage offset commits manually.
Example
Stream only new messages (default behaviour):
with GraceDbKafkaConsumer( bootstrap_servers="kafka-dev.ligo.org:9092", topics="test", ca_cert_path="/tmp/cacert.pem", ) as consumer: for msg in consumer.stream(): print(msg)
Resume from last committed offset across restarts:
with GraceDbKafkaConsumer( bootstrap_servers="kafka-dev.ligo.org:9092", topics="test", group_id="my-pipeline-consumer", ca_cert_path="/tmp/cacert.pem", ) as consumer: for msg in consumer.stream(): process(msg)
Replay all messages from the beginning of a topic:
with GraceDbKafkaConsumer( bootstrap_servers="kafka-dev.ligo.org:9092", topics="test", group_id="my-replay-group", auto_offset_reset="earliest", ca_cert_path="/tmp/cacert.pem", ) as consumer: for msg in consumer.stream(): print(msg)
- close()
Unsubscribe from topics and close the consumer.
- poll(timeout=1.0)
Poll for a single message.
- Parameters:
timeout (float) – Maximum time to wait for a message in seconds (default: 1.0)
- Returns:
Message dict with keys
topic,partition,offset,key,value,timestamp,headers, or None if no message is available.- Return type:
dict or None
- Raises:
KafkaConsumerError – If a consumer error occurs
- stop()
Signal the stream() loop to stop at the next poll iteration.
- stream(timeout=1.0, count=None)
Generator that yields messages from subscribed topics.
Installs signal handlers for SIGINT and SIGTERM so that the first signal causes the loop to exit cleanly (within at most timeout seconds). Original handlers are restored when the generator exits.
- Parameters:
timeout (float) – Poll timeout in seconds per iteration (default: 1.0)
count (int, optional) – Maximum number of messages to yield. None means stream indefinitely.
- Yields:
dict – Message dicts (same format as
poll())
- subscribe(topics)
Subscribe to one or more Kafka topics.
Topic names are automatically namespaced with the instance name. For example, subscribing to
"gstlal"with instance name"gracedb-dev"subscribes to"gracedb-dev.gstlal".- Parameters:
topics (str or list) – Pipeline name or list of pipeline names
- Raises:
KafkaConsumerError – If subscription fails
Authentication helpers
- ligo.gracedb.kafka.auth.get_scitoken_for_kafka(broker_url, scope='kafka.produce', audience=None)
Find and return a valid SciToken for Kafka authentication.
This function discovers a SciToken using igwn-auth-utils and returns the serialized JWT string suitable for Kafka OAUTHBEARER authentication.
- Parameters:
broker_url (str) – Kafka broker URL (e.g., “localhost:9092”)
scope (str) – Required token scope (default: “kafka.produce”)
audience (str, optional) – Required audience. If None, extracted from broker_url
- Returns:
Serialized JWT token string
- Return type:
str
- Raises:
KafkaTokenError – If no valid token can be found
- ligo.gracedb.kafka.auth.create_oauth_callback(broker_url, scope='kafka.produce', audience=None)
Create an OAuth callback function for confluent_kafka Producer.
The callback is called by the Kafka client to obtain the OAuth token before establishing a connection.
- Parameters:
broker_url (str) – Kafka broker URL
scope (str) – Required token scope
audience (str, optional) – Required audience
- Returns:
OAuth callback function that returns (token_str, expiry_time)
- Return type:
callable
- class ligo.gracedb.kafka.auth.SciTokenRefreshManager(broker_url, scope='kafka.produce', audience=None, refresh_buffer=300)
Manages automatic SciToken refresh for long-running Kafka producers.
This class monitors token expiration and triggers callbacks when tokens need to be refreshed.
- create_callback()
Create an OAuth callback that uses this manager.
- Returns:
OAuth callback function
- Return type:
callable
- get_token()
Get current token, refreshing if necessary.
- Returns:
Current valid token string
- Return type:
str
- Raises:
KafkaTokenError – If token cannot be obtained
Topic utilities
- ligo.gracedb.kafka.utils.derive_instance_name(service_url)
Extract the instance name from a GraceDB service URL.
The instance name is the first component of the hostname (i.e., everything before the first dot, or the full hostname if there is no dot).
- Parameters:
service_url (str) – GraceDB service URL (e.g.,
"https://gracedb-dev.ligo.org/api/")- Returns:
Instance name (e.g.,
"gracedb-dev")- Return type:
str
- Raises:
ValueError – If the hostname cannot be parsed from the URL
Examples
>>> derive_instance_name("https://gracedb-dev.ligo.org/api/") 'gracedb-dev' >>> derive_instance_name("https://gracedb.ligo.org/api/") 'gracedb' >>> derive_instance_name("https://localhost:8080/api/") 'localhost'
- ligo.gracedb.kafka.utils.make_topic_name(instance_name, pipeline)
Build an instance-namespaced Kafka topic name.
- Parameters:
instance_name (str) – GraceDB instance name (e.g.,
"gracedb-dev")pipeline (str) – Pipeline name (e.g.,
"gstlal")
- Returns:
Namespaced topic name (e.g.,
"gracedb-dev.gstlal")- Return type:
str
- ligo.gracedb.kafka.utils.parse_topic_name(namespaced_topic)
Split an instance-namespaced topic into its components.
- Parameters:
namespaced_topic (str) – Namespaced topic name (e.g.,
"gracedb-dev.gstlal")- Returns:
(instance_name, pipeline)- Return type:
tuple
- Raises:
ValueError – If the topic does not contain a dot separator
Exception classes
Exceptions for Kafka integration.
- exception ligo.gracedb.kafka.exceptions.KafkaAuthenticationError
Exception raised when Kafka authentication fails.
- exception ligo.gracedb.kafka.exceptions.KafkaConnectionError
Exception raised when connection to Kafka broker fails.
- exception ligo.gracedb.kafka.exceptions.KafkaConsumerError
Exception raised when Kafka consumer encounters an error.
- exception ligo.gracedb.kafka.exceptions.KafkaError
Base exception for Kafka-related errors.
- exception ligo.gracedb.kafka.exceptions.KafkaMessageDeliveryError
Exception raised when message delivery to Kafka fails.
- exception ligo.gracedb.kafka.exceptions.KafkaProducerError
Exception raised when Kafka producer encounters an error.
- exception ligo.gracedb.kafka.exceptions.KafkaTokenError
Exception raised when SciToken cannot be obtained or is invalid.