state
- pydantic model State
A collection of assets storing objects that persist through iterations of the tube. Typical assets include database connections, large arrays and statistics that are shared across multiple processes of the tube.
The State model is a container for a set of assets that are fully instantiated. It does not handle processing the assets; that is handled by a TubeRunner.
JSON schema:
{ "title": "State", "description": "A collection of assets storing objects that persist through iterations of the tube.\nThe target demographics generally include database connections, large arrays and statistics\nthat traverse multiple processes of the tube.\n\nThe :class:`.State` model is a container for a set of assets that are fully instantiated.\nIt does not handle processing the assets -- that is handled by a TubeRunner.", "type": "object", "properties": { "assets": { "additionalProperties": { "$ref": "#/$defs/Asset" }, "title": "Assets", "type": "object" }, "dependencies": { "additionalProperties": { "$ref": "#/$defs/_AssetDependency" }, "title": "Dependencies", "type": "object" }, "scope_to_assets": { "additionalProperties": { "items": { "$ref": "#/$defs/Asset" }, "type": "array" }, "propertyNames": { "$ref": "#/$defs/AssetScope" }, "title": "Scope To Assets", "type": "object" }, "specs": { "additionalProperties": { "$ref": "#/$defs/AssetSpecification" }, "title": "Specs", "type": "object" }, "nocopy_deps": { "items": { "type": "string" }, "title": "Nocopy Deps", "type": "array", "uniqueItems": true } }, "$defs": { "Asset": { "additionalProperties": false, "description": "An asset within a processing tube.", "properties": { "id": { "title": "Id", "type": "string" }, "spec": { "$ref": "#/$defs/AssetSpecification" }, "scope": { "$ref": "#/$defs/AssetScope" }, "params": { "anyOf": [ { "additionalProperties": true, "type": "object" }, { "items": {}, "type": "array" } ], "title": "Params" }, "depends": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "title": "Depends" }, "obj": { "anyOf": [ {}, { "type": "null" } ], "default": null, "title": "Obj" }, "stored_at": { "default": [ [ "tube", -1 ] ], "items": { "$ref": "#/$defs/EpochSegment" }, "title": "Stored At", "type": "array" } }, "required": [ "id", "spec", "scope", "depends" ], "title": "Asset", "type": "object" }, "AssetScope": { "enum": [ "runner", "process", "node" ], "title": "AssetScope", 
"type": "string" }, "AssetSpecification": { "additionalProperties": false, "description": "Specification for a single asset within a tube .yaml file.", "properties": { "id": { "title": "Id", "type": "string" }, "type": { "title": "Type", "type": "string" }, "scope": { "$ref": "#/$defs/AssetScope" }, "params": { "anyOf": [ { "additionalProperties": true, "type": "object" }, { "items": {}, "type": "array" }, { "type": "null" } ], "default": null, "title": "Params" }, "depends": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Depends" }, "description": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Description" } }, "required": [ "type", "scope" ], "title": "AssetSpecification", "type": "object" }, "EpochSegment": { "maxItems": 2, "minItems": 2, "prefixItems": [ { "anyOf": [ { "type": "string" }, { "const": "tube", "type": "string" } ], "title": "Node Id" }, { "title": "Epoch", "type": "integer" } ], "type": "array" }, "_AssetDependency": { "properties": { "asset_id": { "title": "Asset Id", "type": "string" }, "signal": { "title": "Signal", "type": "string" } }, "required": [ "asset_id", "signal" ], "title": "_AssetDependency", "type": "object" } } }
- Fields:
- field assets: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Asset] [Optional]
- field dependencies: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], _AssetDependency] [Optional]
Map from node signals that assets depend on to the asset and signal ids. See AssetSpecification.depends.
Only those dependencies that require copying are included here (assets which are not used after the node that is depended on emits them don’t need to be copied to protect against mutation within the same epoch after they are stored).
- field nocopy_deps: set[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]] [Optional]
When we depend on updating an asset from a node, but nothing else in the tube depends on that signal, we don’t need to deepcopy the asset before storing it, since there’s no chance for it to be mutated after we store it. Store a set of the assets that don’t need to be copied!
- field scope_to_assets: dict[AssetScope, list[Asset]] [Optional]
Map from AssetScope to Asset, to circumvent querying scope for each asset in State.init() and State.deinit().
- field specs: dict[str, AssetSpecification] [Optional]
- classmethod from_specification(specs: dict[str, AssetSpecification], edges: list[Edge] | None = None, input_collection: InputCollection | None = None) Self
Instantiate a State model from its configuration.
- Parameters:
specs (dict[str, AssetSpecification]) – the State config to instantiate
edges (list[Edge] | None) – If present, edges for the whole graph, used to reduce copying for assets that use dependencies to store values between epochs. If no other node depends on the value that the asset depends on, the value does not have to be copied.
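The copy-reduction rule described for edges can be illustrated with a small standalone sketch. It does not use the library's classes: SketchEdge, split_dependencies and the "node.signal" string format are hypothetical stand-ins, and treating a missing edge list as "copy everything" is an assumption made here for safety.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchEdge:
    """Hypothetical stand-in for an Edge; source_signal is written 'node.signal'."""
    source_signal: str
    target_node: str


def split_dependencies(depends, edges=None):
    """Split asset dependencies into copy-required and no-copy groups.

    depends: asset id -> signal the asset is updated from (None if no dependency)
    edges:   all edges in the graph, or None if unknown
    """
    # Signals that at least one node consumes. Unknown when edges is None.
    consumed = {e.source_signal for e in edges} if edges is not None else None
    need_copy = {}  # signal -> asset id, stored value must be copied
    nocopy = set()  # asset ids whose stored value can be kept as-is
    for asset_id, signal in depends.items():
        if signal is None:
            continue
        if consumed is not None and signal not in consumed:
            # Nothing else reads this signal, so nothing can mutate it later.
            nocopy.add(asset_id)
        else:
            need_copy[signal] = asset_id
    return need_copy, nocopy


edges = [SketchEdge("a.value", "b")]
need_copy, nocopy = split_dependencies(
    {"counter": "a.value", "buffer": "c.frame", "db": None}, edges
)
```

Here "counter" depends on a signal that node b also consumes, so it lands in need_copy, while "buffer" depends on a signal nobody else reads and lands in nocopy.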
- collect(edges: list[Edge]) dict | None
Gather events into a form that can be consumed by a Node.process() method, given the collection of inbound edges (usually from Tube.in_edges()).
If none of the requested events have been emitted, return None.
If all of the requested events have been emitted, return a kwarg-like dict.
If some of the requested events are missing but others are present, return a kwarg-like dict with None for any missing events.
Todo: Add an example
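The three return cases can be sketched with a small standalone function. This is not the library's implementation: collect_events, the requested mapping (keyword name to signal key) and the emitted dict are hypothetical, chosen only to show the described behaviour.

```python
def collect_events(requested, emitted):
    """Build a kwarg-like dict from emitted events.

    requested: keyword name -> signal key that should feed it
    emitted:   signal key -> value, for events emitted so far
    """
    # Case 1: none of the requested events exist yet.
    if not any(signal in emitted for signal in requested.values()):
        return None
    # Cases 2 and 3: fill what is present, use None for anything missing.
    return {kwarg: emitted.get(signal) for kwarg, signal in requested.items()}


requested = {"left": "a.value", "right": "b.value"}

nothing = collect_events(requested, {})
everything = collect_events(requested, {"a.value": 1, "b.value": 2})
partial = collect_events(requested, {"a.value": 1})
```

In this sketch, nothing is None, everything maps both keyword names to their values, and partial keeps "left" while setting "right" to None.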
- deinit(scope: AssetScope, edges: list[Edge] | None = None) None
Run Asset.deinit() for assets that correspond to the given scope. This usually means that the Asset.obj attribute is cleared to None.
For AssetScope.node, the node’s edges should be provided to determine which assets to deinitialize, if any. If not passed, all node-scoped assets are deinitialized.
- init(scope: AssetScope, edges: list[Edge] | None = None) None
Run Asset.init() for assets that correspond to the given scope. This usually means that the Asset.obj attribute gets populated.
For AssetScope.node, the node’s edges should be provided to determine which assets to initialize, if any. If not passed, all node-scoped assets are initialized.
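The scope-based lifecycle of init() and deinit(), together with the scope_to_assets lookup, can be sketched as follows. SketchScope, SketchAsset and SketchState are hypothetical stand-ins written for illustration only; the edge-based filtering for node-scoped assets is left out because the source does not describe how edges are matched to assets.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any


class SketchScope(str, Enum):
    runner = "runner"
    process = "process"
    node = "node"


@dataclass
class SketchAsset:
    id: str
    scope: SketchScope
    obj: Any = None

    def init(self) -> None:
        # Stand-in for opening a connection or allocating an array.
        self.obj = {"resource_for": self.id}

    def deinit(self) -> None:
        self.obj = None


class SketchState:
    def __init__(self, assets):
        self.assets = {a.id: a for a in assets}
        # Group once so init/deinit do not have to check every asset's scope.
        self.scope_to_assets = {scope: [] for scope in SketchScope}
        for asset in assets:
            self.scope_to_assets[asset.scope].append(asset)

    def init(self, scope):
        for asset in self.scope_to_assets[scope]:
            asset.init()

    def deinit(self, scope):
        for asset in self.scope_to_assets[scope]:
            asset.deinit()


state = SketchState([
    SketchAsset("db", SketchScope.runner),
    SketchAsset("frame_buffer", SketchScope.process),
])
state.init(SketchScope.runner)
runner_ready = state.assets["db"].obj is not None
process_untouched = state.assets["frame_buffer"].obj is None
state.deinit(SketchScope.runner)
```

Initializing the runner scope populates only the runner-scoped asset, and deinitializing it clears obj back to None.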