Source code for noob.event
import sys
from enum import StrEnum
from typing import Annotated as A
from typing import Any, Generic, Literal
from pydantic import AfterValidator, Discriminator, Tag, TypeAdapter
from noob.const import META_SIGNAL
from noob.types import Epoch, Picklable, SerializableDatetime
if sys.version_info < (3, 12):
from typing_extensions import TypedDict, TypeVar
elif sys.version_info < (3, 13):
from typing import TypedDict
from typing_extensions import TypeVar
else:
from typing import TypedDict, TypeVar
# Type variable for the payload carried in ``Event.value``; an unparameterized
# ``Event`` falls back to ``Any``. The ``default=`` argument is the reason
# ``TypeVar`` is taken from ``typing_extensions`` on Python < 3.13 above.
_TEvent = TypeVar("_TEvent", default=Any)
[docs]
class Event(TypedDict, Generic[_TEvent]):
    """
    Container for a single value returned from a single :meth:`.Node.process` call.

    Declared as a :class:`~typing.TypedDict` that is generic over ``_TEvent``,
    the type of the payload stored under ``value``.
    """

    id: int
    """Unique ID for each event"""
    timestamp: SerializableDatetime
    """Timestamp of when the event was received by the :class:`.TubeRunner`"""
    node_id: str
    """ID of node that emitted the event"""
    signal: str
    """name of the signal that emitted the event"""
    epoch: Epoch
    """Epoch number the event was emitted in"""
    value: Picklable[_TEvent]
    """Value emitted by the processing node"""
[docs]
def is_event(instance: Any) -> bool:
    """
    Structural check for whether ``instance`` looks like an :class:`.Event`.

    TypedDicts can't be used in ``isinstance`` checks, so instead confirm
    that the object is a ``dict`` containing every key annotated on
    :class:`.Event`. Only key presence is checked; values are not inspected.
    """
    return isinstance(instance, dict) and all(
        field in instance for field in Event.__annotations__
    )
def _type_discriminator(v: dict | Event | MetaEvent) -> str:
    """
    Pick the union tag for an incoming event mapping.

    Returns ``"meta"`` when the mapping's ``node_id`` equals ``"meta"``,
    and ``"event"`` otherwise (including when ``node_id`` is absent).
    """
    return "meta" if v.get("node_id") == "meta" else "event"
def _meta_signals_to_enum(evt: Event) -> Event:
    """
    If we are a meta signal, ensure the value is cast to the enum rather than string value.

    When ``evt["value"]`` is a string that starts with ``META_SIGNAL`` and the
    remainder (after stripping that prefix) names a member of ``MetaSignal``,
    the value is replaced in place with that enum member. In every other case
    the event is left untouched.

    Args:
        evt: The event to inspect; mutated in place when a match is found.

    Returns:
        The same ``evt`` object that was passed in.
    """
    value = evt["value"]
    if isinstance(value, str) and value.startswith(META_SIGNAL):
        # Strip only the leading marker: ``str.replace`` would also delete any
        # later occurrences of META_SIGNAL inside the value and could turn an
        # unrelated string into a valid member name.
        member = MetaSignal.__members__.get(value.removeprefix(META_SIGNAL))
        if member is not None:
            evt["value"] = member
    return evt
# Tagged union of the two event shapes. ``Discriminator(_type_discriminator)``
# calls the function on the input and uses the returned string ("event" or
# "meta") to select the branch carrying the matching ``Tag``.
EventUnion = A[
    A[
        Event,
        # After validation, convert string-encoded meta signals to enum members.
        AfterValidator(_meta_signals_to_enum),
        Tag("event"),
    ]
    | A[MetaEvent, Tag("meta")],
    Discriminator(_type_discriminator),
]
# Module-level pydantic ``TypeAdapter`` built around ``EventUnion``.
EventAdapter = TypeAdapter[EventUnion](EventUnion)