Usage

sgn-arrakis provides two SGN pipeline elements for working with Arrakis data streams: a source for reading data, and a sink for writing it.

Both elements integrate with SGN pipelines via the sgn-ts timeseries framework.

ArrakisSource

ArrakisSource streams channel data from an Arrakis server into an SGN pipeline. Source pads are named after the channels they stream.

Key parameters

  • source_pad_names -- list of channel names to stream (e.g., ["L1:GDS-CALIB_STRAIN"])
  • start -- GPS start time in seconds; if None, streaming begins at the current time
  • end or duration -- GPS end time or total duration in seconds; omit both for an endless stream
  • in_queue_timeout -- seconds to wait for a block from the server before timing out (default: 60)
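The effect of in_queue_timeout can be illustrated with the standard library alone. This is a minimal sketch, not the sgn-arrakis implementation: next_block is a hypothetical helper, and raising TimeoutError is an assumption about how a timeout might surface.

```python
import queue


def next_block(in_queue: queue.Queue, timeout: float = 60.0):
    """Return the next queued block, or raise if none arrives in time.

    Hypothetical helper; the 60 s default mirrors in_queue_timeout.
    """
    try:
        # Block for at most `timeout` seconds waiting for a block.
        return in_queue.get(timeout=timeout)
    except queue.Empty:
        # Assumption: a missing block is reported as a TimeoutError.
        raise TimeoutError(f"no block received within {timeout} s") from None
```

A larger timeout tolerates slower servers at the cost of noticing a stalled stream later.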

Stream from the current time

When start is not specified, ArrakisSource begins streaming from the current GPS time:

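from sgn_arrakis import ArrakisSource
from sgn import Pipeline, SignalEOS
from sgnts.sinks import NullSeriesSink

# Channels to stream; each one becomes a source pad of the same name.
channels = ["L1:GDS-CALIB_STRAIN"]

# No start is given, so streaming begins at the current GPS time.
src = ArrakisSource(
    source_pad_names=channels,
)
# The sink pad names match the source pad names so the two can be connected.
sink = NullSeriesSink(
    sink_pad_names=channels,
    verbose=True,
)

pipeline = Pipeline()
pipeline.connect(src, sink)
with SignalEOS():
    pipeline.run()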

Stream a fixed duration

Set start and duration (or end) to stream a bounded segment:

from sgn_arrakis import ArrakisSource

# Stream 64 seconds of data starting at GPS time 1187008882.
src = ArrakisSource(
    source_pad_names=["L1:GDS-CALIB_STRAIN"],
    start=1187008882,
    duration=64,
)
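The relationship between start, end, and duration can be sketched in plain Python. The helper resolve_end below is hypothetical and not part of sgn-arrakis. It also assumes that end and duration are mutually exclusive, which the parameter list suggests but does not state.

```python
from typing import Optional


def resolve_end(
    start: int,
    end: Optional[int] = None,
    duration: Optional[int] = None,
) -> Optional[int]:
    """Return the GPS end time in seconds, or None for an endless stream.

    Hypothetical helper for illustration only.
    """
    if end is not None and duration is not None:
        # Assumption: supplying both is treated as an error in this sketch.
        raise ValueError("specify end or duration, not both")
    if duration is not None:
        # A duration is measured from the start time.
        return start + duration
    # Either an explicit end, or None when both were omitted.
    return end
```

With the values from the example above, resolve_end(1187008882, duration=64) gives an end time of 1187008946.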

Gap handling

When Arrakis delivers blocks containing null data, ArrakisSource creates gap buffers with no data payload. Downstream elements can detect these via the buffer's data_valid flag.
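A downstream element that only wants real samples could filter on that flag. The sketch below uses a hypothetical StandInBuffer dataclass in place of the real buffer class, whose full interface is not shown here. Only the data_valid name comes from the description above; the t0 and data fields are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass
class StandInBuffer:
    """Hypothetical stand-in for a pipeline buffer; not the real class."""

    t0: int                          # assumed: buffer start time, GPS seconds
    data: Optional[Sequence[float]]  # assumed: None when the buffer is a gap
    data_valid: bool                 # False for gap buffers


def valid_buffers(buffers: Sequence[StandInBuffer]) -> List[StandInBuffer]:
    """Keep only buffers that carry a data payload."""
    return [buf for buf in buffers if buf.data_valid]


stream = [
    StandInBuffer(t0=0, data=[0.1, 0.2], data_valid=True),
    StandInBuffer(t0=1, data=None, data_valid=False),  # gap buffer
    StandInBuffer(t0=2, data=[0.3, 0.4], data_valid=True),
]
kept = valid_buffers(stream)
```

Here kept holds the first and third buffers, and the gap at t0=1 is dropped.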

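ArrakisSink

ArrakisSink publishes channel data from an SGN pipeline to an Arrakis server under a registered publisher ID.

Key parameters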

  • publisher_id -- admin-assigned publisher ID (required)
  • sink_pad_names -- list of channel names to publish; must match all channels registered to the publisher
  • block_duration -- duration of each published block in nanoseconds (default: 62,500,000 ns = 1/16 s)
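The default block_duration can be related to a channel's sample rate with a little integer arithmetic. The helper samples_per_block is hypothetical, and the requirement that a block hold a whole number of samples is an assumption of this sketch, not a documented constraint.

```python
NS_PER_SECOND = 1_000_000_000
DEFAULT_BLOCK_DURATION_NS = 62_500_000  # 1/16 s


def samples_per_block(
    sample_rate_hz: int,
    block_duration_ns: int = DEFAULT_BLOCK_DURATION_NS,
) -> int:
    """Number of samples in one block (hypothetical helper)."""
    total = sample_rate_hz * block_duration_ns
    if total % NS_PER_SECOND:
        # Assumption: a block must contain a whole number of samples.
        raise ValueError("block does not hold a whole number of samples")
    return total // NS_PER_SECOND


blocks_per_second = NS_PER_SECOND // DEFAULT_BLOCK_DURATION_NS
```

At a sample rate of 16384 Hz, each default block holds 1024 samples, and 16 blocks are published per second of data.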

Basic usage

from sgn_arrakis import ArrakisSink
from sgn import Pipeline, SignalEOS
from sgnts.sources import FakeSeriesSource

channels = ["H1:GDS-CALIB_STRAIN"]

# Test source producing a constant-valued signal at 16384 Hz.
source = FakeSeriesSource(
    source_pad_names=channels,
    signals={
        "H1:GDS-CALIB_STRAIN": {
            "signal-type": "const",
            "sample-shape": (1,),
            "sample-rate": 16384,
            "value": 0.0,
        },
    },
)
# "my-publisher-id" is a placeholder for the admin-assigned publisher ID.
# sink_pad_names must match all channels registered to that publisher.
sink = ArrakisSink(
    publisher_id="my-publisher-id",
    sink_pad_names=channels,
)

pipeline = Pipeline()
pipeline.connect(source, sink)
with SignalEOS():
    pipeline.run()

Channel validation

On startup, ArrakisSink registers with the Arrakis server and verifies that the sink pad names exactly match the channels assigned to the publisher. A ValueError is raised if there is a mismatch.
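The exact-match rule amounts to a set comparison. The following is a minimal sketch of such a check, not the code ArrakisSink runs; check_channels and the wording of its error message are hypothetical.

```python
from typing import Iterable


def check_channels(
    sink_pad_names: Iterable[str],
    registered_channels: Iterable[str],
) -> None:
    """Raise ValueError unless both channel sets are identical.

    Hypothetical helper for illustration only.
    """
    requested = set(sink_pad_names)
    registered = set(registered_channels)
    if requested != registered:
        # Report both directions of the mismatch to ease debugging.
        missing = sorted(registered - requested)
        unexpected = sorted(requested - registered)
        raise ValueError(
            f"channel mismatch: missing={missing}, unexpected={unexpected}"
        )
```

Both a missing channel and an extra channel count as a mismatch, so a publisher registered for two channels cannot be served by a sink with only one pad.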

Background publishing

ArrakisSink publishes data in a background thread so that the main pipeline execution is not blocked by network I/O. Data blocks are queued internally and published asynchronously.
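The queue-and-worker pattern described here can be sketched with the standard library. This illustrates the general technique only; the BackgroundPublisher class, its method names, and the shutdown sentinel are hypothetical and do not reflect the internals of ArrakisSink.

```python
import queue
import threading
from typing import Any, Callable

_STOP = object()  # sentinel telling the worker thread to exit


class BackgroundPublisher:
    """Hypothetical sketch: queue blocks and publish them from a thread."""

    def __init__(self, publish: Callable[[Any], None]) -> None:
        self._publish = publish
        self._queue: queue.Queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, block: Any) -> None:
        # Returns immediately; the caller never waits on network I/O.
        self._queue.put(block)

    def _run(self) -> None:
        while True:
            block = self._queue.get()
            if block is _STOP:
                break
            self._publish(block)

    def close(self) -> None:
        # Blocks queued before the sentinel are published first.
        self._queue.put(_STOP)
        self._thread.join()
```

A caller would pass the function that performs the network publish, call submit for each block, and call close at end of stream to flush the queue.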

CLI

sgn-arrakis includes a command-line tool for testing source and sink functionality.

Stream channels

Stream channel data from Arrakis and print to the console:

sgn-arrakis source L1:GDS-CALIB_STRAIN H1:GDS-CALIB_STRAIN

Publish test data

Publish constant-value test data to all channels registered to a publisher:

sgn-arrakis sink my-publisher-id

Use --include-gaps to include gap buffers in the published stream:

sgn-arrakis sink --include-gaps my-publisher-id

Options

  • --url / -u -- specify the Arrakis server URL
  • --version -- display the version
  • LOG_LEVEL environment variable -- set logging verbosity (default: DEBUG)
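An environment variable such as LOG_LEVEL is commonly mapped to a logging level as sketched below. How sgn-arrakis reads the variable internally is not shown here. The helper resolve_log_level is hypothetical, and accepting level names case-insensitively is an assumption.

```python
import logging
import os
from typing import Mapping


def resolve_log_level(
    env: Mapping[str, str] = os.environ,
    default: str = "DEBUG",
) -> int:
    """Return the numeric logging level named by LOG_LEVEL.

    Hypothetical helper; the DEBUG default mirrors the documented one.
    """
    # Assumption: level names are accepted in any letter case.
    name = env.get("LOG_LEVEL", default).upper()
    level = logging.getLevelName(name)
    if not isinstance(level, int):
        # getLevelName returns a string for names it does not know.
        raise ValueError(f"unknown log level: {name!r}")
    return level
```

The result can be passed to logging.basicConfig(level=...). In a shell, the variable is typically set for one invocation by prefixing the command, for example LOG_LEVEL=INFO before sgn-arrakis.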