Skip to content

source

Arrakis source element.

ArrakisSource dataclass

ArrakisSource()

Bases: TSResourceSource

Source element that streams channel data from Arrakis.

Source pads should be named after the channel they will stream from Arrakis.

Parameters:

Name Type Description Default
start float | None

Start time of stream in GPS seconds, or "now" if None.

required
duration float | None

Duration of stream in seconds, or endless stream if None.

required
end float | None

End time of stream in GPS seconds. Cannot be combined with duration.

required
in_queue_timeout int

How long to wait for a block from the Arrakis server before timing out with an error.

60

worker_process

worker_process(context, *, source_pad_names, srcs, start, end)

Worker process method for streaming data from Arrakis.

This method is called repeatedly by the TSResourceSource framework. Each call should process one block of data and return.

Source code in sgn_arrakis/source.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def worker_process(
    self,
    context: WorkerContext,
    *,
    source_pad_names: list[str],
    srcs: dict[str, SourcePad],
    start: float | None,
    end: float | None,
) -> None:
    """Worker process method for streaming data from Arrakis.

    This method is called repeatedly by the TSResourceSource framework.
    Each call should process one block of data and return.

    Args:
        context:
            WorkerContext holding the shared state dict, the output
            queue, and the stop/shutdown signalling primitives.
        source_pad_names:
            Channel names to stream; each must correspond to an entry
            in ``srcs``.
        srcs:
            Mapping from channel name to the SourcePad that will carry
            its buffers.
        start:
            Start time of the stream in GPS seconds, or None for "now".
        end:
            End time of the stream in GPS seconds, or None for an
            endless stream.

    Raises:
        ValueError: If a streamed channel reports a sample rate not in
            ``Offset.ALLOWED_RATES``.
    """

    def _close_stream(stream) -> None:
        # Close the generator to clean up its worker threads, and drop
        # it from the shared state so any later invocation creates a
        # fresh stream instead of calling next() on a closed generator
        # (which would raise StopIteration and masquerade as a normal
        # end-of-stream).
        if stream is not None and hasattr(stream, "close"):
            with contextlib.suppress(Exception):
                stream.close()
        context.state.pop("stream_iter", None)

    # Initialize the stream iterator on first call; it lives in
    # context.state so it persists across invocations.
    if "stream_iter" not in context.state:
        context.state["stream_iter"] = arrakis.stream(source_pad_names, start, end)

    stream_iter = context.state["stream_iter"]

    # Honor a stop request before touching the stream.
    if context.should_stop():
        _close_stream(stream_iter)
        return

    try:
        # Pull exactly one block from the stream per invocation.
        block = next(stream_iter)

        # Fan the block's channels out to their pads.
        for name, series in block.items():
            channel = series.channel

            # FIXME: should we do this for every block?
            # NOTE: explicit raise (not assert) so the check survives
            # running under `python -O`.
            if channel.sample_rate not in Offset.ALLOWED_RATES:
                raise ValueError(
                    f"channel {name} has an invalid sample rate: {channel.sample_rate}"
                )
            # FIXME: should we do other checks?

            if series.has_nulls:
                # create a gap buffer
                # FIXME: this should take the masked array and produce a
                # set of gap and non-gap buffers. however, this isn't
                # possible by returning buffers (instead of frames) since
                # this would break the continuity assumption. once this is
                # addressed upstream we should be able to handle this
                # better
                buf = SeriesBuffer(
                    offset=Offset.fromns(series.time_ns),
                    shape=series.data.shape,
                    sample_rate=int(series.sample_rate),
                )
            else:
                buf = SeriesBuffer(
                    offset=Offset.fromns(series.time_ns),
                    data=series.data,
                    sample_rate=int(series.sample_rate),
                )

            # Hand the buffer to the main thread via the queue.
            context.output_queue.put((srcs[name], buf))

            # Re-check the stop flag after each buffer so shutdown stays
            # responsive even within a large block.
            if context.should_stop():
                _close_stream(stream_iter)
                return

    except StopIteration:
        # Stream has ended - signal an orderly shutdown.
        context.shutdown_event.set()
        return
    except Exception:
        # Log the original error with full traceback before cleanup,
        # then re-raise so the framework sees the failure.
        logger.exception("ArrakisSource worker error")
        _close_stream(stream_iter)
        raise