store¶
Tube runners for running tubes
- EventDict¶
A nested dictionary to store events for rapid access (vs. the old implementation which was just a big list to filter).
Stores by epoch, node_id, and signal, like events = {'epoch': {'node_id': {'signal': [...]}}}
Should be made with a defaultdict to avoid annoying nested indexing problems
alias of
dict[Epoch,dict[Annotated[str,AfterValidator(func=_is_identifier),AfterValidator(func=_not_reserved)],dict[Annotated[str,AfterValidator(func=_is_identifier)],list[Event]]]]
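The nesting described above can be sketched with the standard library alone. This is a minimal illustration, not the library's own construction: `make_event_dict` is a hypothetical helper, integer epochs are an assumption, and plain dicts stand in for `Event` objects.

```python
from collections import defaultdict


def make_event_dict():
    """Build an epoch -> node_id -> signal -> list-of-events mapping.

    Hypothetical helper for illustration only.
    """
    return defaultdict(lambda: defaultdict(lambda: defaultdict(list)))


events = make_event_dict()

# Plain dicts stand in for Event objects (an assumption for this sketch).
events[0]["node_a"]["value"].append({"id": 0, "value": 1.5})
events[0]["node_a"]["value"].append({"id": 1, "value": 2.5})
events[1]["node_b"]["flag"].append({"id": 2, "value": True})

# Direct lookup by epoch, node_id, and signal; no filtering of a flat list.
latest = events[0]["node_a"]["value"][-1]
```

One caveat of this design: reading a missing key on a `defaultdict` inserts it, so existence checks should use `in` rather than indexing.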
- class EventStore(events: EventDict = <factory>, counter: itertools.count = <factory>, _event_condition: threading.Condition = <factory>, _subepochs: dict[Epoch, set[Epoch]] = <factory>)[source]¶
Container class for storing and retrieving events by node and slot
- events: dict[Epoch, dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], dict[Annotated[str, AfterValidator(func=_is_identifier)], list[Event]]]]¶
- counter: count¶
- add(event: Event) Event[source]¶
Add an existing event to the store, returning it.
Mostly an abstraction layer to give ourselves room above the events list in case we want to change the internal implementation of how events are stored.
- add_value(signals: list[Signal], value: Any, node_id: str, epoch: Epoch) list[Event][source]¶
Add the result of a Node.process() call to the event store.
Split the dictionary of values into separate Events, store along with current timestamp.
Note
Nodes can emit explicit, pre-created event objects, either a single Event or a Sequence of events. These events are passed through as-is rather than being wrapped in new events.
This is an “advanced” feature and can cause unpredictable behavior if the emitted events have incorrect epochs, signals, etc.
- Parameters:
signals (list[Signal]) – Signals from which the value was emitted by a Node.process() call
value (Any) – Value emitted by a Node.process() call. Gets wrapped in a list if the length of signals is 1. Otherwise, it’s zipped with signals
node_id (str) – ID of the node that emitted the events
epoch (Epoch) – Epoch count that the signal was emitted in
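The wrap-or-zip behavior described for `value` can be sketched as follows. This is only an illustration under assumptions: `SketchEvent` and `split_value` are hypothetical names, signals are reduced to plain name strings, epochs are integers, and the pass-through of pre-created events mentioned in the note is not modeled.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from itertools import count
from typing import Any


@dataclass
class SketchEvent:
    """Hypothetical stand-in for an Event."""
    id: int
    timestamp: datetime
    node_id: str
    signal: str
    epoch: int
    value: Any


def split_value(signal_names, value, node_id, epoch, counter):
    """Pair each signal name with its value and wrap each pair in an event."""
    # A single signal receives the whole value, even if that value is a tuple.
    values = [value] if len(signal_names) == 1 else list(value)
    now = datetime.now(timezone.utc)
    return [
        SketchEvent(next(counter), now, node_id, name, epoch, val)
        for name, val in zip(signal_names, values)
    ]
```

With one signal, a tuple return value stays intact as a single event's value; with several signals, each element becomes its own event.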
- get(node_id: str, signal: str, epoch: Epoch | Literal[-1]) Event[source]¶
Get the event with the matching node_id and signal name from a given epoch.
If epoch is -1, return the most recent event.
- Raises:
KeyError – if no event with the matching node_id and signal name exists
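A rough sketch of the lookup, including the `-1` case, over a plain nested dict. The function name `get_event` is hypothetical, and the sketch assumes integer epochs and reads "most recent" as the last event in the highest epoch that has one; the library may define recency differently.

```python
def get_event(events, node_id, signal, epoch):
    """Look up the last event for (node_id, signal) in an epoch.

    `events` is a nested dict: epoch -> node_id -> signal -> list.
    """
    if epoch == -1:
        # Walk epochs from newest to oldest until a match is found.
        for ep in sorted(events, reverse=True):
            found = events[ep].get(node_id, {}).get(signal)
            if found:
                return found[-1]
        raise KeyError((node_id, signal))

    found = events.get(epoch, {}).get(node_id, {}).get(signal)
    if not found:
        raise KeyError((epoch, node_id, signal))
    return found[-1]
```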
- collect(edges: list[Edge], epoch: Epoch | Literal[-1], eventmap: str | None = None) dict | None[source]¶
Gather events into a form that can be consumed by a Node.process() method, given the collection of inbound edges (usually from Tube.in_edges()).
If none of the requested events have been emitted, return None.
If all of the requested events have been emitted, return a kwarg-like dict.
If some of the requested events are missing but others are present, exclude the missing keys from the returned dict (None is a valid value for an event, so if a key is present and the value is None, the event was emitted with a value of None).
If epoch is -1, get the events from the most recent epoch where all events are present, and if no epoch has a full set of events, return None.
- Parameters:
eventmap (str) – If present, return an EventMap in this key
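The return cases listed above can be sketched like this. It is not the library's implementation: `SketchEdge`, its field names, and `collect_sketch` are hypothetical, epochs are assumed to be integers, stored lists hold raw values rather than `Event` objects, the `eventmap` option is omitted, and target slots are assumed unique across edges.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchEdge:
    """Hypothetical stand-in for an Edge."""
    source_node: str
    source_signal: str
    target_slot: str


def collect_sketch(events, edges, epoch):
    """Return a kwarg-like dict for the edges, or None if nothing was emitted."""
    if epoch == -1:
        # Newest epoch first; accept only an epoch with every edge satisfied.
        for ep in sorted(events, reverse=True):
            got = collect_sketch(events, edges, ep)
            if got is not None and len(got) == len(edges):
                return got
        return None

    in_epoch = events.get(epoch, {})
    kwargs = {}
    for edge in edges:
        found = in_epoch.get(edge.source_node, {}).get(edge.source_signal)
        if found:  # a non-empty list, even if its last value is None
            kwargs[edge.target_slot] = found[-1]
    # Missing events leave their keys out; no events at all yields None.
    return kwargs or None
```

Because presence is tested on the list of emissions rather than on the value, an emitted `None` still appears as a key in the result.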
- collect_events(edges: list[Edge], epoch: Epoch | Literal[-1]) list[Event] | None[source]¶
Collect the event objects from a set of dependencies indicated by edges in a given epoch.
If none of the requested events are present, return None
If some of the requested events are missing but others are present, return an incomplete list.
- Parameters:
edges (list[Edge]) – List of edges from which to collect events
epoch (int) – Epoch to select from. If -1, get the latest complete set of events.
- clear(epoch: Epoch | None = None) None[source]¶
Clear events for a specific or all epochs.
Does not reset the counter (to continue giving unique ids to the next round’s events)
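Why the counter survives a clear can be seen in a small sketch. `SketchStore` is a hypothetical, much-reduced stand-in that keeps a flat list per integer epoch; only the use of `itertools.count` for ids is taken from the class signature above.

```python
from itertools import count


class SketchStore:
    """Hypothetical reduced store: epoch -> list of (event_id, value)."""

    def __init__(self):
        self.events = {}
        self.counter = count()

    def add(self, epoch, value):
        event_id = next(self.counter)
        self.events.setdefault(epoch, []).append((event_id, value))
        return event_id

    def clear(self, epoch=None):
        """Drop one epoch, or everything; the counter is left untouched."""
        if epoch is None:
            self.events.clear()
        else:
            self.events.pop(epoch, None)
```

Since `clear` never replaces `self.counter`, ids handed out after a clear cannot collide with ids from earlier rounds.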
- static transform_events(edges: list[Edge], events: list[Event], as_events: Literal[False] = False) dict[source]¶
- static transform_events(edges: list[Edge], events: list[Event], as_events: Literal[True] = True) dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Event]
Transform the values of a set of events to a dict that can be consumed by the target node’s process method.
- i.e.: return a dictionary whose keys are the target_signals of the edges using the value of the matching event.
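The two overloads can be sketched with one function and a flag. Everything named here is hypothetical (`SketchEdge`, `SketchEvent`, `transform_sketch`, and the field names, with `target_slot` standing in for the edge's target key), and matching events to edges by source node and signal is an assumption about how the pairing works.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SketchEdge:
    """Hypothetical stand-in for an Edge."""
    source_node: str
    source_signal: str
    target_slot: str


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for an Event."""
    node_id: str
    signal: str
    value: Any


def transform_sketch(edges, events, as_events=False):
    """Map each edge's target key to the matching event or its value."""
    by_source = {(ev.node_id, ev.signal): ev for ev in events}
    out = {}
    for edge in edges:
        event = by_source.get((edge.source_node, edge.source_signal))
        if event is not None:
            out[edge.target_slot] = event if as_events else event.value
    return out
```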