scheduler

class Scheduler(nodes: dict[str, noob.node.spec.NodeSpecification], edges: list[noob.node.base.Edge], source_nodes: list[typing.Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]] = <factory>, _logger: logging.Logger = <factory>, _clock: itertools.count = <factory>, _epochs: dict[noob.types.Epoch, noob.toposort.TopoSorter] = <factory>, _subepochs: dict[noob.types.Epoch, set[noob.types.Epoch]] = <factory>, _epoch_log: collections.deque[int] = <factory>, _subgraphs: dict[typing.Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], tuple[dict[str, noob.node.spec.NodeSpecification], list[noob.node.base.Edge]]] = <factory>, _frozen_sorters: dict[tuple[typing.Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], ...], noob.toposort.TopoSorter] = <factory>)[source]
nodes: dict[str, NodeSpecification]
edges: list[Edge]
source_nodes: list[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]]
classmethod from_specification(nodes: dict[str, NodeSpecification], edges: list[Edge]) Self[source]

Create a Scheduler instance from a mapping of NodeSpecification objects and a list of Edge objects.

property subepochs: dict[Epoch, set[Epoch]]
property graph_signals: set[tuple[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Annotated[str, AfterValidator(func=_is_identifier)]]][source]

The set of (node id, signal) tuples that are depended on in the graph.

Nodes can have many more signals than we actually care about for structuring the graph; this set contains only the ones that other nodes depend on.
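As a rough illustration of the idea (not noob's implementation), the set of depended-on signals can be collected from the edges alone. The `SketchEdge` type and its field names below are hypothetical stand-ins for this sketch; the real `Edge` class may be structured differently.

```python
from typing import NamedTuple


class SketchEdge(NamedTuple):
    """Hypothetical edge type for illustration only; not noob's Edge."""

    source_node: str
    source_signal: str
    target_node: str


def graph_signals(edges: list[SketchEdge]) -> set[tuple[str, str]]:
    """Collect the (node id, signal) pairs that at least one edge depends on."""
    return {(edge.source_node, edge.source_signal) for edge in edges}


edges = [
    SketchEdge("a", "value", "b"),
    SketchEdge("a", "value", "c"),  # same signal used twice: appears once in the set
    SketchEdge("b", "total", "c"),
]
signals = graph_signals(edges)
```

A signal that a node emits but that no edge consumes never appears in the result, which matches the description above.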

add_epoch(epoch: int | Epoch | None = None) Epoch[source]

Add another epoch with a prepared graph to the scheduler.

add_subepoch(epoch: Epoch) Epoch[source]

Add a subepoch.

Creates a topo sorter with all the nodes downstream of the node that created the epoch.

is_active(epoch: Epoch | None = None) bool[source]

Graph remains active while it holds at least one epoch that is active.

get_ready(epoch: Epoch | None = None, node_id: Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)] | None = None) list[MetaEvent][source]

Output the set of nodes that are ready across different epochs.

Parameters:
  • epoch (Epoch | None) – if an Epoch, get ready events for that epoch; if None, get ready events for all epochs.

  • node_id (str | None) – If present, only get ready events for a single node

node_is_ready(node: Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], epoch: Epoch | None = None) bool[source]

Check whether a single node is ready in a given epoch, or in any epoch.

Parameters:
  • node (NodeID) – the node to check

  • epoch (Epoch | None) – the epoch to check; if None, any epoch

node_is_done(node: Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], epoch: Epoch) bool[source]

Whether the node is expired or done in the specified epoch.

sources_finished(epoch: Epoch | None = None) bool[source]

Check whether the source nodes of the given epoch have been processed. If epoch is None, check the source nodes of the latest epoch.

update(events: MutableSequence[Event | MetaEvent] | MutableSequence[Event]) MutableSequence[Event] | MutableSequence[Event | MetaEvent][source]

When a set of events is received, update the graphs within the scheduler. Currently only TopoSorter.done() is implemented.

done(epoch: Epoch, node_id: str, signal: Annotated[str, AfterValidator(func=_is_identifier)] | None = None, with_signals: bool = True) MetaEvent | None[source]

Mark a node in a given epoch as done.

Parameters:

with_signals (bool) – When marking this node as done, also mark all its signals as done.
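The is_active / get_ready / done cycle can be sketched with the standard library's `graphlib.TopologicalSorter`, which exposes similarly named methods. This is an analogy only: it assumes a single epoch and a hypothetical four-node graph, and noob's own TopoSorter and Scheduler may behave differently (for example, with signals and multiple epochs).

```python
from graphlib import TopologicalSorter

# Hypothetical graph: each key maps to the set of nodes it depends on.
graph = {"b": {"a"}, "c": {"a"}, "d": {"b", "c"}}

sorter = TopologicalSorter(graph)
sorter.prepare()  # finalize the graph; raises CycleError if it is cyclic

order = []
while sorter.is_active():
    # get_ready() hands out each ready node exactly once.
    for node in sorted(sorter.get_ready()):
        order.append(node)
        # Marking a node done unlocks dependents whose inputs are all done.
        sorter.done(node)

print(order)  # ['a', 'b', 'c', 'd']
```

The loop ends once every node has been handed out and marked done, at which point `is_active()` returns False.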

expire(epoch: Epoch, node_id: str, signal: Annotated[str, AfterValidator(func=_is_identifier)] | None = None, with_signals: bool = True, unlock_optionals: bool = True) MetaEvent | None[source]

Mark a node as completed without making its dependent nodes ready, e.g. when the node emitted NoEvent.

epoch_completed(epoch: Epoch) bool[source]

Check if the epoch has been completed.

end_epoch(epoch: Epoch | int | None = None) MetaEvent | None[source]
enable_node(node_id: str) None[source]

Enable the edges attached to the node and switch the enable flag of its NodeSpecification to True.

disable_node(node_id: str) None[source]

Disable the edges attached to the node and switch the enable flag of its NodeSpecification to False.

clear() None[source]

Remove epoch records, restarting the scheduler

has_cycle() bool[source]

Checks that the graph is acyclic.
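One common way to perform such a check, shown here as a minimal sketch with the standard library rather than noob's actual code, is to attempt a topological sort and treat a `CycleError` as evidence of a cycle. The edge representation as `(source, target)` string pairs is an assumption made for this example.

```python
from graphlib import CycleError, TopologicalSorter


def has_cycle(edges: list[tuple[str, str]]) -> bool:
    """Return True if the directed graph given as (source, target) pairs has a cycle."""
    sorter = TopologicalSorter()
    for source, target in edges:
        sorter.add(target, source)  # target depends on source
    try:
        sorter.prepare()  # raises CycleError when a cycle exists
    except CycleError:
        return True
    return False


cyclic = has_cycle([("a", "b"), ("b", "a")])
acyclic = has_cycle([("a", "b"), ("b", "c")])
```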

generations() list[tuple[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)] | NodeSignal, ...]][source]

Get the topological generations of the graph: tuples for each set of nodes that can be run at the same time.

Order within a generation is not guaranteed to be stable.
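To make the notion of a generation concrete, here is a minimal sketch using the standard library's `graphlib`, not noob's TopoSorter. The graph format (node mapped to the set of nodes it depends on) is an assumption for this example, and each generation is sorted only to make the sketch's output deterministic.

```python
from graphlib import TopologicalSorter


def generations(graph: dict[str, set[str]]) -> list[tuple[str, ...]]:
    """Group nodes into batches whose dependencies are all in earlier batches."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    result = []
    while sorter.is_active():
        ready = sorter.get_ready()  # every node that can run right now
        result.append(tuple(sorted(ready)))
        sorter.done(*ready)  # finish the whole batch to unlock the next one
    return result


gens = generations({"b": {"a"}, "c": {"a"}, "d": {"b", "c"}})
print(gens)  # [('a',), ('b', 'c'), ('d',)]
```

Here `b` and `c` share a generation because both depend only on `a`, so they could run at the same time.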

asset_generations() dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], list[tuple[str, ...]]][source]

Like generations(), but including only the nodes with direct dependencies on assets. Used to determine when an asset should be initialized vs. received in the ZMQ Runner.

Returned as a dictionary keyed by asset ID, where each value is the list of generations for that asset.

upstream_nodes(node: Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]) set[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]][source]

All the nodes that have an effect on the given node

From:
  • Dependencies

  • If the node has optional dependencies, nodes whose NoEvents it should listen to
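A simplified sketch of collecting upstream nodes follows. It assumes edges are plain `(source, target)` pairs and that "upstream" is transitive; it covers ordinary dependencies only and does not model optional dependencies or NoEvents, so it should not be read as the library's implementation.

```python
def upstream_nodes(node: str, edges: list[tuple[str, str]]) -> set[str]:
    """Return every node from which `node` can be reached by following edges."""
    # Index the direct parents of each node.
    parents: dict[str, set[str]] = {}
    for source, target in edges:
        parents.setdefault(target, set()).add(source)

    # Walk backwards from the node, visiting each ancestor once.
    seen: set[str] = set()
    stack = [node]
    while stack:
        current = stack.pop()
        for parent in parents.get(current, ()):
            if parent not in seen:
                seen.add(parent)
                stack.append(parent)
    return seen


edges = [("a", "b"), ("b", "c"), ("x", "y")]
upstream_of_c = upstream_nodes("c", edges)
```

In this example `x` and `y` are excluded because no path leads from them to `c`.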