Events

EventData

Bases: BaseModel

Main container for an event provided to the worker function

Attributes:

event_number (EventNumber)
    Current event number relative to the trigger map provided
streams (dict[StreamName, StreamData])
    Data for each stream present in the event

Source code in dranspose/event.py
class EventData(BaseModel):
    """
    Main container for an event provided to the worker function

    Attributes:
        event_number:    Current event number relative to the trigger map provided
        streams:         Data for each stream present in the event
    """

    event_number: EventNumber
    streams: dict[StreamName, StreamData]

    @classmethod
    def from_internals(cls, msgs: list[InternalWorkerMessage]) -> "EventData":
        """
        Helper function to assemble an event from the internal messages received from the ingesters.
        This is a factory method

        Args:
             msgs: Internal messages each containing a subset of the streams for the event

        Returns:
            A new object with all data combined.
        """
        assert len(msgs) > 0, "merge at least one message"
        assert (
            len(set([m.event_number for m in msgs])) == 1
        ), f"Cannot merge data from events {[m.event_number for m in msgs]}"
        all_stream_names = [stream for m in msgs for stream in m.streams.keys()]
        assert len(all_stream_names) == len(
            set(all_stream_names)
        ), "Cannot merge data with duplicate streams"

        ret = EventData(event_number=msgs[0].event_number, streams={})
        for msg in msgs:
            ret.streams.update(msg.streams)
        return ret
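
Example (illustrative): a minimal sketch of worker-side code reading an EventData. The actual dranspose worker interface may differ; the import path for StreamName and the stream name "eiger" are assumptions.

from typing import Any

from dranspose.event import EventData
from dranspose.protocol import StreamName


def process_event(event: EventData) -> Any:
    # not every stream is present in every event, so look it up defensively
    stream = event.streams.get(StreamName("eiger"))
    if stream is None:
        return None
    # copy the zmq buffers before keeping the payload around
    data = stream.get_bytes()
    return {"event": event.event_number, "first_frame_size": len(data.frames[0])}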

from_internals(msgs) classmethod

Helper function to assemble an event from the internal messages received from the ingesters. This is a factory method.

Parameters:

msgs (list[InternalWorkerMessage], required)
    Internal messages each containing a subset of the streams for the event

Returns:

EventData
    A new object with all data combined.

Source code in dranspose/event.py
@classmethod
def from_internals(cls, msgs: list[InternalWorkerMessage]) -> "EventData":
    """
    Helper function to assemble an event from the internal messages received from the ingesters.
    This is a factory method

    Args:
         msgs: Internal messages each containing a subset of the streams for the event

    Returns:
        A new object with all data combined.
    """
    assert len(msgs) > 0, "merge at least one message"
    assert (
        len(set([m.event_number for m in msgs])) == 1
    ), f"Cannot merge data from events {[m.event_number for m in msgs]}"
    all_stream_names = [stream for m in msgs for stream in m.streams.keys()]
    assert len(all_stream_names) == len(
        set(all_stream_names)
    ), "Cannot merge data with duplicate streams"

    ret = EventData(event_number=msgs[0].event_number, streams={})
    for msg in msgs:
        ret.streams.update(msg.streams)
    return ret
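
Example (illustrative): merging two partial messages, each carrying one stream, into a single event. The import paths for EventNumber and StreamName, as well as the stream names and types, are assumptions for this sketch.

from dranspose.event import EventData, InternalWorkerMessage, StreamData
from dranspose.protocol import EventNumber, StreamName

msg_a = InternalWorkerMessage(
    event_number=EventNumber(5),
    streams={StreamName("eiger"): StreamData(typ="raw", frames=[b"image-bytes"])},
)
msg_b = InternalWorkerMessage(
    event_number=EventNumber(5),
    streams={StreamName("motor"): StreamData(typ="json", frames=[b'{"pos": 1.0}'])},
)

event = EventData.from_internals([msg_a, msg_b])
assert event.event_number == 5
assert set(event.streams) == {"eiger", "motor"}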

InternalWorkerMessage

Bases: BaseModel

A container for partial events which carries one or more streams. This is the message between ingesters and workers.

Attributes:

event_number (EventNumber)
    event number
streams (dict[StreamName, StreamData])
    one or more streams from an ingester

Source code in dranspose/event.py
class InternalWorkerMessage(BaseModel):
    """
    A container for partial events which carries one or more streams. This is the message between ingesters and workers.

    Attributes:
        event_number: event number
        streams: one or more streams from an ingester
    """

    event_number: EventNumber
    streams: dict[StreamName, StreamData] = {}

    def get_all_frames(self) -> list[zmq.Frame | bytes]:
        # flatten the frames of all streams into one list
        return [frame for stream in self.streams.values() for frame in stream.frames]
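
Example (illustrative): get_all_frames flattens the frames of all streams into one list, e.g. to forward them as a multipart zmq message. Stream names and types are made up; import paths for the NewTypes are assumed.

from dranspose.event import InternalWorkerMessage, StreamData
from dranspose.protocol import EventNumber, StreamName

msg = InternalWorkerMessage(
    event_number=EventNumber(0),
    streams={
        StreamName("eiger"): StreamData(typ="raw", frames=[b"header", b"blob"]),
        StreamName("motor"): StreamData(typ="json", frames=[b"{}"]),
    },
)
assert msg.get_all_frames() == [b"header", b"blob", b"{}"]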

ResultData

Bases: BaseModel

Container for transferring results from the worker to the reducer. It enhances the pure payload with useful metadata.

Attributes:

event_number (EventNumber)
    which event the result belongs to. NB: results may arrive out of order
worker (WorkerName)
    which worker processed it
parameters_hash (Optional[Digest])
    which version of parameters was used to process the event
payload (Any)
    the data returned from the custom worker function: process_event

Source code in dranspose/event.py
class ResultData(BaseModel):
    """
    Container for transferring results from the worker to the reducer. It enhances the pure payload with useful metadata.

    Attributes:
        event_number: which event the result belongs to. NB: results may arrive out of order
        worker: which worker processed it.
        parameters_hash: which version of parameters was used to process the event
        payload: the data returned from the custom worker function: process_event
    """

    event_number: EventNumber
    worker: WorkerName
    parameters_hash: Optional[Digest]
    payload: Any
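
Example (illustrative): what a worker might hand to the reducer. WorkerName is assumed to be importable from dranspose.protocol; parameters_hash is left as None here since computing a real Digest is out of scope for this sketch.

from dranspose.event import ResultData
from dranspose.protocol import EventNumber, WorkerName

result = ResultData(
    event_number=EventNumber(5),
    worker=WorkerName("worker-1"),
    parameters_hash=None,
    payload={"integrated_intensity": 42.0},
)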

StreamData

Bases: BaseModel

Data container for a single stream with all zmq frames belonging to it

Attributes:

typ (str)
    arbitrary type set by the ingester to allow common parsing
frames (list[Frame] | list[bytes])
    all frames received for this event for the stream

Source code in dranspose/event.py
class StreamData(BaseModel):
    """
    Data container for a single stream with all zmq frames belonging to it

    Attributes:
         typ: arbitrary type set by the ingester to allow common parsing
         frames: all frames received for this event for the stream
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)
    typ: str
    frames: list[zmq.Frame] | list[bytes]

    @computed_field  # type: ignore[misc]
    @property
    def length(self) -> int:
        """
        Calculates the length of frames.

        Returns:
             length of frames
        """
        return len(self.frames)

    def get_bytes(self) -> "StreamData":
        """
        Copies the data from the zmq buffer

        Returns:
             An object with a list of bytes.
        """
        return StreamData(
            typ=self.typ,
            frames=[
                frame.bytes if isinstance(frame, zmq.Frame) else frame
                for frame in self.frames
            ],
        )

length: int property

Calculates the length of frames.

Returns:

int
    length of frames

get_bytes()

Copies the data from the zmq buffer

Returns:

StreamData
    An object with a list of bytes.

Source code in dranspose/event.py
def get_bytes(self) -> "StreamData":
    """
    Copies the data from the zmq buffer

    Returns:
         An object with a list of bytes.
    """
    return StreamData(
        typ=self.typ,
        frames=[
            frame.bytes if isinstance(frame, zmq.Frame) else frame
            for frame in self.frames
        ],
    )
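
Example (illustrative): frames arriving as zmq.Frame objects still reference zmq message buffers; get_bytes copies them into plain bytes so the StreamData can be stored or pickled independently of the sockets.

import zmq

from dranspose.event import StreamData

live = StreamData(typ="raw", frames=[zmq.Frame(b"detector-blob")])
copied = live.get_bytes()

assert isinstance(copied.frames[0], bytes)
assert copied.frames[0] == b"detector-blob"
assert copied.length == 1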

EventNumber = NewType('EventNumber', int) module-attribute

strongly typed event number (int)

StreamName = NewType('StreamName', str) module-attribute

strongly typed stream name (str)

VirtualConstraint = NewType('VirtualConstraint', int) module-attribute

Strongly typed constraint for workers (int)

WorkerTag = NewType('WorkerTag', _WorkerTagT) module-attribute

Strongly typed worker tag (str)
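
Example (illustrative): the NewTypes add no runtime behaviour; they only let a static type checker flag mix-ups such as passing a stream name where an event number is expected. The import path dranspose.protocol is assumed.

from dranspose.protocol import EventNumber, StreamName, VirtualConstraint, WorkerTag

evn = EventNumber(42)        # still a plain int at runtime
name = StreamName("eiger")   # still a plain str at runtime
constraint = VirtualConstraint(2 * evn)
tag = WorkerTag("debug")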

VirtualWorker

Bases: BaseModel

virtual worker with a number and tags

Attributes:

tags (set[WorkerTag])
    set of tags which a worker must have to get this event
constraint (Optional[VirtualConstraint])
    a VirtualConstraint selecting which worker this event should be delivered to; if None, deliver to all workers with matching tags

Source code in dranspose/protocol.py
class VirtualWorker(BaseModel):
    """
    virtual worker with a number and tags

    Attributes:
        tags: set of tags which a worker must have to get this event
        constraint: a VirtualConstraint selecting which worker this event should be delivered to; if None, deliver to all workers with matching tags
    """

    tags: set[WorkerTag] = {GENERIC_WORKER}
    constraint: Optional[VirtualConstraint] = None
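
Example (illustrative): two ways a trigger map entry could address workers, either broadcasting to every worker carrying a given tag or pinning the event to one virtual worker via a constraint. The tag name and constraint value are made up; the import path dranspose.protocol is assumed.

from dranspose.protocol import VirtualConstraint, VirtualWorker, WorkerTag

# delivered to all workers tagged "debug"
broadcast = VirtualWorker(tags={WorkerTag("debug")}, constraint=None)

# delivered to exactly one worker, selected by the constraint number
pinned = VirtualWorker(constraint=VirtualConstraint(17))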