message

class MessageType(*values)[source]
announce = 'announce'
identify = 'identify'
process = 'process'
init = 'init'
deinit = 'deinit'
ping = 'ping'
start = 'start'
status = 'status'
stop = 'stop'
event = 'event'
error = 'error'
epoch_ended = 'epoch_ended'
class NodeStatus(*values)[source]
stopped = 'stopped'

Node is deinitialized: it has no instantiated node object, but it still responds to messages.

waiting = 'waiting'

Node is waiting for its dependency nodes to be ready

ready = 'ready'

Node is ready to process events

running = 'running'

Node is running in free-run mode. Status is not currently updated on every process call, because that level of granularity is not relevant to the command node when it sends commands.

closed = 'closed'

Node is permanently gone and should not be expected to respond to further messages.
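
The status descriptions above can be read as a small state vocabulary. The following is a minimal sketch, not the library's own class: it mirrors the documented values in a stand-in enum, and `expects_reply` is a hypothetical helper. It assumes every status other than `closed` is responsive, which the text states explicitly only for `stopped`.

```python
from enum import Enum


class NodeStatus(str, Enum):
    """Stand-in mirroring the documented values; not the library's class."""

    stopped = "stopped"
    waiting = "waiting"
    ready = "ready"
    running = "running"
    closed = "closed"


def expects_reply(status: NodeStatus) -> bool:
    """Hypothetical helper: only `closed` is documented as unresponsive."""
    return status is not NodeStatus.closed


# String values (as they appear in serialized messages) map back to members.
print(expects_reply(NodeStatus("stopped")))  # True
print(expects_reply(NodeStatus.closed))  # False
```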

pydantic model Message[source]

JSON schema:
{
   "title": "Message",
   "type": "object",
   "properties": {
      "type": {
         "$ref": "#/$defs/MessageType"
      },
      "node_id": {
         "title": "Node Id",
         "type": "string"
      },
      "timestamp": {
         "format": "date-time",
         "title": "Timestamp",
         "type": "string"
      },
      "value": {
         "default": null,
         "title": "Value"
      }
   },
   "$defs": {
      "MessageType": {
         "enum": [
            "announce",
            "identify",
            "process",
            "init",
            "deinit",
            "ping",
            "start",
            "status",
            "stop",
            "event",
            "error",
            "epoch_ended"
         ],
         "title": "MessageType",
         "type": "string"
      }
   },
   "required": [
      "type",
      "node_id"
   ]
}

Config:
  • use_enum_values: bool = True

  • validate_by_alias: bool = True

  • serialize_by_alias: bool = True

Fields:
field node_id: str [Required]
field timestamp: datetime [Optional]
field type_: MessageType [Required] (alias 'type')
field value: Any = None
classmethod from_bytes(msg: list[bytes]) → Message[source]
to_bytes() → bytes[source]
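
To illustrate the shape described by the Message schema, here is a minimal sketch that builds a conforming payload with the standard library only. It does not use the library's `Message` class, and it does not reproduce the framing used by `to_bytes` and `from_bytes`, which is not documented on this page. `make_envelope` is a hypothetical helper, and the UTF-8 JSON encoding is an assumption made for the example.

```python
import json
from datetime import datetime, timezone

# Values copied from the MessageType enum documented above.
MESSAGE_TYPES = {
    "announce", "identify", "process", "init", "deinit", "ping",
    "start", "status", "stop", "event", "error", "epoch_ended",
}


def make_envelope(type_: str, node_id: str, value=None) -> dict:
    """Build a plain dict shaped like the Message JSON schema (hypothetical)."""
    if type_ not in MESSAGE_TYPES:
        raise ValueError(f"unknown message type: {type_!r}")
    return {
        "type": type_,  # the serialized key is the alias 'type', not 'type_'
        "node_id": node_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "value": value,  # optional in the schema; defaults to null
    }


# Round-trip through JSON bytes (an assumed encoding, for illustration only).
encoded = json.dumps(make_envelope("ping", "node_a")).encode("utf-8")
decoded = json.loads(encoded)
print(decoded["type"], decoded["node_id"])
```

Note that the Python attribute is `type_` while the serialized key is `type`; with `validate_by_alias` and `serialize_by_alias` both enabled, the alias is what appears in payloads.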
class IdentifyValue[source]
node_id: str
outbox: str
status: NodeStatus
signals: list[str] | None
slots: list[str] | None
class AnnounceValue[source]
inbox: str
nodes: dict[str, IdentifyValue]
class ErrorValue[source]
err_type: type[Exception]
err_args: tuple
traceback: str
class ProcessValue[source]
epoch: Epoch
input: dict | None
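
The value containers above nest: an `AnnounceValue` carries one `IdentifyValue` per node. The sketch below uses stand-in `TypedDict` definitions that copy the documented attributes; whether the library defines them this way is an assumption. The address strings are placeholders, `announce_from` is a hypothetical helper, and keying `nodes` by each entry's `node_id` is also an assumption.

```python
from typing import Optional, TypedDict


class IdentifyValue(TypedDict):
    """Stand-in with the documented attributes."""

    node_id: str
    outbox: str
    status: str  # one of the NodeStatus values
    signals: Optional[list[str]]
    slots: Optional[list[str]]


class AnnounceValue(TypedDict):
    """Stand-in with the documented attributes."""

    inbox: str
    nodes: dict[str, IdentifyValue]


def announce_from(inbox: str, identities: list[IdentifyValue]) -> AnnounceValue:
    """Hypothetical helper: collect identities into one announcement."""
    return {"inbox": inbox, "nodes": {i["node_id"]: i for i in identities}}


a: IdentifyValue = {
    "node_id": "a", "outbox": "placeholder-address-a",
    "status": "ready", "signals": ["value"], "slots": None,
}
b: IdentifyValue = {
    "node_id": "b", "outbox": "placeholder-address-b",
    "status": "waiting", "signals": None, "slots": ["value"],
}
announce = announce_from("placeholder-inbox", [a, b])
print(sorted(announce["nodes"]))
```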
pydantic model AnnounceMsg[source]

Command node ‘announces’ identities of other peers and the events they emit

JSON schema:
{
   "title": "AnnounceMsg",
   "description": "Command node 'announces' identities of other peers and the events they emit",
   "type": "object",
   "properties": {
      "type": {
         "const": "announce",
         "default": "announce",
         "title": "Type",
         "type": "string"
      },
      "node_id": {
         "title": "Node Id",
         "type": "string"
      },
      "timestamp": {
         "format": "date-time",
         "title": "Timestamp",
         "type": "string"
      },
      "value": {
         "$ref": "#/$defs/AnnounceValue"
      }
   },
   "$defs": {
      "AnnounceValue": {
         "properties": {
            "inbox": {
               "title": "Inbox",
               "type": "string"
            },
            "nodes": {
               "additionalProperties": {
                  "$ref": "#/$defs/IdentifyValue"
               },
               "title": "Nodes",
               "type": "object"
            }
         },
         "required": [
            "inbox",
            "nodes"
         ],
         "title": "AnnounceValue",
         "type": "object"
      },
      "IdentifyValue": {
         "properties": {
            "node_id": {
               "title": "Node Id",
               "type": "string"
            },
            "outbox": {
               "title": "Outbox",
               "type": "string"
            },
            "status": {
               "$ref": "#/$defs/NodeStatus"
            },
            "signals": {
               "anyOf": [
                  {
                     "items": {
                        "type": "string"
                     },
                     "type": "array"
                  },
                  {
                     "type": "null"
                  }
               ],
               "title": "Signals"
            },
            "slots": {
               "anyOf": [
                  {
                     "items": {
                        "type": "string"
                     },
                     "type": "array"
                  },
                  {
                     "type": "null"
                  }
               ],
               "title": "Slots"
            }
         },
         "required": [
            "node_id",
            "outbox",
            "status",
            "signals",
            "slots"
         ],
         "title": "IdentifyValue",
         "type": "object"
      },
      "NodeStatus": {
         "enum": [
            "stopped",
            "waiting",
            "ready",
            "running",
            "closed"
         ],
         "title": "NodeStatus",
         "type": "string"
      }
   },
   "required": [
      "node_id",
      "value"
   ]
}

Config:
  • use_enum_values: bool = True

  • validate_by_alias: bool = True

  • serialize_by_alias: bool = True

Fields:
field type_: Literal[MessageType.announce] = MessageType.announce (alias 'type')
field value: AnnounceValue [Required]
pydantic model IdentifyMsg[source]

A node sends its configuration to the command node on initialization

JSON schema:
{
   "title": "IdentifyMsg",
   "description": "A node sends its configuration to the command node on initialization",
   "type": "object",
   "properties": {
      "type": {
         "const": "identify",
         "default": "identify",
         "title": "Type",
         "type": "string"
      },
      "node_id": {
         "title": "Node Id",
         "type": "string"
      },
      "timestamp": {
         "format": "date-time",
         "title": "Timestamp",
         "type": "string"
      },
      "value": {
         "$ref": "#/$defs/IdentifyValue"
      }
   },
   "$defs": {
      "IdentifyValue": {
         "properties": {
            "node_id": {
               "title": "Node Id",
               "type": "string"
            },
            "outbox": {
               "title": "Outbox",
               "type": "string"
            },
            "status": {
               "$ref": "#/$defs/NodeStatus"
            },
            "signals": {
               "anyOf": [
                  {
                     "items": {
                        "type": "string"
                     },
                     "type": "array"
                  },
                  {
                     "type": "null"
                  }
               ],
               "title": "Signals"
            },
            "slots": {
               "anyOf": [
                  {
                     "items": {
                        "type": "string"
                     },
                     "type": "array"
                  },
                  {
                     "type": "null"
                  }
               ],
               "title": "Slots"
            }
         },
         "required": [
            "node_id",
            "outbox",
            "status",
            "signals",
            "slots"
         ],
         "title": "IdentifyValue",
         "type": "object"
      },
      "NodeStatus": {
         "enum": [
            "stopped",
            "waiting",
            "ready",
            "running",
            "closed"
         ],
         "title": "NodeStatus",
         "type": "string"
      }
   },
   "required": [
      "node_id",
      "value"
   ]
}

Config:
  • use_enum_values: bool = True

  • validate_by_alias: bool = True

  • serialize_by_alias: bool = True

Fields:
field type_: Literal[MessageType.identify] = MessageType.identify (alias 'type')
field value: IdentifyValue [Required]
pydantic model PingMsg[source]

Request other nodes to identify themselves and report their status

JSON schema:
{
   "title": "PingMsg",
   "description": "Request other nodes to identify themselves and report their status",
   "type": "object",
   "properties": {
      "type": {
         "const": "ping",
         "default": "ping",
         "title": "Type",
         "type": "string"
      },
      "node_id": {
         "title": "Node Id",
         "type": "string"
      },
      "timestamp": {
         "format": "date-time",
         "title": "Timestamp",
         "type": "string"
      },
      "value": {
         "default": null,
         "title": "Value",
         "type": "null"
      }
   },
   "required": [
      "node_id"
   ]
}

Config:
  • use_enum_values: bool = True

  • validate_by_alias: bool = True

  • serialize_by_alias: bool = True

Fields:
field type_: Literal[MessageType.ping] = MessageType.ping (alias 'type')
field value: None = None
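
Because `type` is a constant with a default and `value` defaults to null, the schema above requires only `node_id` for a ping. As a rough illustration, the sketch below fills those defaults on a plain dict. `complete_ping` is a hypothetical helper, not part of the library, and it leaves the optional `timestamp` untouched.

```python
def complete_ping(payload: dict) -> dict:
    """Hypothetical helper: apply the PingMsg schema defaults to a dict."""
    if "node_id" not in payload:
        raise KeyError("node_id is the only required field")
    # Defaults first, so that explicit keys in the payload take precedence.
    out = {"type": "ping", "value": None, **payload}
    if out["type"] != "ping":
        raise ValueError("type must be the constant 'ping'")
    if out["value"] is not None:
        raise ValueError("a ping carries no value")
    return out


print(complete_ping({"node_id": "node_a"}))
```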
pydantic model ProcessMsg[source]

Process a single iteration of the graph

JSON schema:
{
   "title": "ProcessMsg",
   "description": "Process a single iteration of the graph",
   "type": "object",
   "properties": {
      "type": {
         "const": "process",
         "default": "process",
         "title": "Type",
         "type": "string"
      },
      "node_id": {
         "title": "Node Id",
         "type": "string"
      },
      "timestamp": {
         "format": "date-time",
         "title": "Timestamp",
         "type": "string"
      },
      "value": {
         "$ref": "#/$defs/ProcessValue"
      }
   },
   "$defs": {
      "EpochSegment": {
         "maxItems": 2,
         "minItems": 2,
         "prefixItems": [
            {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "const": "tube",
                     "type": "string"
                  }
               ],
               "title": "Node Id"
            },
            {
               "title": "Epoch",
               "type": "integer"
            }
         ],
         "type": "array"
      },
      "ProcessValue": {
         "properties": {
            "epoch": {
               "items": {
                  "$ref": "#/$defs/EpochSegment"
               },
               "title": "Epoch",
               "type": "array"
            },
            "input": {
               "anyOf": [
                  {
                     "additionalProperties": true,
                     "type": "object"
                  },
                  {
                     "type": "null"
                  }
               ],
               "title": "Input"
            }
         },
         "required": [
            "epoch",
            "input"
         ],
         "title": "ProcessValue",
         "type": "object"
      }
   },
   "required": [
      "node_id",
      "value"
   ]
}

Config:
  • use_enum_values: bool = True

  • validate_by_alias: bool = True

  • serialize_by_alias: bool = True

Fields:
field type_: Literal[MessageType.process] = MessageType.process (alias 'type')
field value: ProcessValue [Required]

The epoch being processed, together with any process-scoped input passed to the process call.
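
The ProcessValue schema above describes an epoch as an array of two-item segments, each a node id string followed by an integer. The following sketch checks a plain dict against that shape using only the standard library. Both function names are hypothetical, and real validation is done by the pydantic model rather than by code like this.

```python
def is_epoch_segment(seg) -> bool:
    """True if seg looks like [node_id: str, epoch: int], per the schema."""
    return (
        isinstance(seg, (list, tuple))
        and len(seg) == 2
        and isinstance(seg[0], str)
        # bool is a subclass of int in Python, so exclude it explicitly
        and isinstance(seg[1], int)
        and not isinstance(seg[1], bool)
    )


def is_process_value(value) -> bool:
    """True if value has the required 'epoch' and 'input' keys and shapes."""
    return (
        isinstance(value, dict)
        and "epoch" in value
        and "input" in value
        and isinstance(value["epoch"], list)
        and all(is_epoch_segment(seg) for seg in value["epoch"])
        and (value["input"] is None or isinstance(value["input"], dict))
    )


print(is_process_value({"epoch": [["tube", 0]], "input": None}))  # True
print(is_process_value({"epoch": [["tube", 0]]}))  # False: 'input' is required
```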

pydantic model InitMsg[source]

Initialize nodes within node runners

JSON schema:
{
   "title": "InitMsg",
   "description": "Initialize nodes within node runners",
   "type": "object",
   "properties": {
      "type": {
         "const": "init",
         "default": "init",
         "title": "Type",
         "type": "string"
      },
      "node_id": {
         "title": "Node Id",
         "type": "string"
      },
      "timestamp": {
         "format": "date-time",
         "title": "Timestamp",
         "type": "string"
      },
      "value": {
         "default": null,
         "title": "Value",
         "type": "null"
      }
   },
   "required": [
      "node_id"
   ]
}

Config:
  • use_enum_values: bool = True

  • validate_by_alias: bool = True

  • serialize_by_alias: bool = True

Fields:
field type_: Literal[MessageType.init] = MessageType.init (alias 'type')
field value: None = None
pydantic model DeinitMsg[source]

Deinitializes nodes within node runners

JSON schema:
{
   "title": "DeinitMsg",
   "description": "Deinitializes nodes within node runners",
   "type": "object",
   "properties": {
      "type": {
         "const": "deinit",
         "default": "deinit",
         "title": "Type",
         "type": "string"
      },
      "node_id": {
         "title": "Node Id",
         "type": "string"
      },
      "timestamp": {
         "format": "date-time",
         "title": "Timestamp",
         "type": "string"
      },
      "value": {
         "default": null,
         "title": "Value",
         "type": "null"
      }
   },
   "required": [
      "node_id"
   ]
}

Config:
  • use_enum_values: bool = True

  • validate_by_alias: bool = True

  • serialize_by_alias: bool = True

Fields:
field type_: Literal[MessageType.deinit] = MessageType.deinit (alias 'type')
field value: None = None
pydantic model StartMsg[source]

Start free-running nodes

JSON schema:
{
   "title": "StartMsg",
   "description": "Start free-running nodes",
   "type": "object",
   "properties": {
      "type": {
         "const": "start",
         "default": "start",
         "title": "Type",
         "type": "string"
      },
      "node_id": {
         "title": "Node Id",
         "type": "string"
      },
      "timestamp": {
         "format": "date-time",
         "title": "Timestamp",
         "type": "string"
      },
      "value": {
         "anyOf": [
            {
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Value"
      }
   },
   "required": [
      "node_id"
   ]
}

Config:
  • use_enum_values: bool = True

  • validate_by_alias: bool = True

  • serialize_by_alias: bool = True

Fields:
field type_: Literal[MessageType.start] = MessageType.start (alias 'type')
field value: int | None = None
pydantic model StatusMsg[source]

Node updating its current status

JSON schema:
{
   "title": "StatusMsg",
   "description": "Node updating its current status",
   "type": "object",
   "properties": {
      "type": {
         "const": "status",
         "default": "status",
         "title": "Type",
         "type": "string"
      },
      "node_id": {
         "title": "Node Id",
         "type": "string"
      },
      "timestamp": {
         "format": "date-time",
         "title": "Timestamp",
         "type": "string"
      },
      "value": {
         "$ref": "#/$defs/NodeStatus"
      }
   },
   "$defs": {
      "NodeStatus": {
         "enum": [
            "stopped",
            "waiting",
            "ready",
            "running",
            "closed"
         ],
         "title": "NodeStatus",
         "type": "string"
      }
   },
   "required": [
      "node_id",
      "value"
   ]
}

Config:
  • use_enum_values: bool = True

  • validate_by_alias: bool = True

  • serialize_by_alias: bool = True

Fields:
field type_: Literal[MessageType.status] = MessageType.status (alias 'type')
field value: NodeStatus [Required]
pydantic model StopMsg[source]

Stop processing

JSON schema:
{
   "title": "StopMsg",
   "description": "Stop processing",
   "type": "object",
   "properties": {
      "type": {
         "const": "stop",
         "default": "stop",
         "title": "Type",
         "type": "string"
      },
      "node_id": {
         "title": "Node Id",
         "type": "string"
      },
      "timestamp": {
         "format": "date-time",
         "title": "Timestamp",
         "type": "string"
      },
      "value": {
         "default": null,
         "title": "Value",
         "type": "null"
      }
   },
   "required": [
      "node_id"
   ]
}

Config:
  • use_enum_values: bool = True

  • validate_by_alias: bool = True

  • serialize_by_alias: bool = True

Fields:
field type_: Literal[MessageType.stop] = MessageType.stop (alias 'type')
field value: None = None
pydantic model ErrorMsg[source]

An error occurred in one of the processing nodes

JSON schema:
{
   "title": "ErrorMsg",
   "description": "An error occurred in one of the processing nodes",
   "type": "object",
   "properties": {
      "type": {
         "const": "error",
         "default": "error",
         "title": "Type",
         "type": "string"
      },
      "node_id": {
         "title": "Node Id",
         "type": "string"
      },
      "timestamp": {
         "format": "date-time",
         "title": "Timestamp",
         "type": "string"
      },
      "value": {
         "$ref": "#/$defs/ErrorValue"
      }
   },
   "$defs": {
      "ErrorValue": {
         "properties": {
            "err_type": {
               "title": "Err Type"
            },
            "err_args": {
               "items": {},
               "title": "Err Args",
               "type": "array"
            },
            "traceback": {
               "title": "Traceback",
               "type": "string"
            }
         },
         "required": [
            "err_type",
            "err_args",
            "traceback"
         ],
         "title": "ErrorValue",
         "type": "object"
      }
   },
   "required": [
      "node_id",
      "value"
   ]
}

Config:
  • use_enum_values: bool = True

  • validate_by_alias: bool = True

  • serialize_by_alias: bool = True

  • arbitrary_types_allowed: bool = True

Fields:
field type_: Literal[MessageType.error] = MessageType.error (alias 'type')
field value: Annotated[ErrorValue, BeforeValidator(func=_from_jsonable_pickle, json_schema_input_type=PydanticUndefined), WrapSerializer(func=_to_jsonable_pickle, return_type=PydanticUndefined, when_used=json)] [Required]

to_exception() → Exception[source]
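
As a rough illustration of how the three `ErrorValue` fields could be used to carry an exception between processes, the sketch below captures an exception into a dict and rebuilds it on the other side. This is not the library's `to_exception` implementation, which is not shown on this page. `capture`, `rebuild`, and the `remote_traceback` attribute are hypothetical, and the pickle-based JSON serialization named in the field annotation is not reproduced.

```python
import traceback


def capture(exc: Exception) -> dict:
    """Hypothetical: pack an exception into the three ErrorValue fields."""
    return {
        "err_type": type(exc),
        "err_args": exc.args,
        "traceback": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }


def rebuild(value: dict) -> Exception:
    """Hypothetical: re-create the exception from its type and args."""
    exc = value["err_type"](*value["err_args"])
    # Keep the original traceback text around for logging (assumed attribute).
    exc.remote_traceback = value["traceback"]
    return exc


try:
    raise ValueError("bad input", 3)
except ValueError as err:
    captured = capture(err)

rebuilt = rebuild(captured)
print(type(rebuilt).__name__, rebuilt.args)
```

Rebuilding from `err_type(*err_args)` works for exceptions whose constructor accepts their own `args`; exception classes with custom constructors may need different handling.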
pydantic model EpochEndedMsg[source]

The command node signals to all nodes that an epoch has been completed, since individual nodes don’t have a complete picture of the tube’s state.

JSON schema:
{
   "title": "EpochEndedMsg",
   "description": "Command node is signaling that an epoch has been completed to all nodes,\nwhich don't have a complete picture of the tube's state.",
   "type": "object",
   "properties": {
      "type": {
         "const": "epoch_ended",
         "default": "epoch_ended",
         "title": "Type",
         "type": "string"
      },
      "node_id": {
         "title": "Node Id",
         "type": "string"
      },
      "timestamp": {
         "format": "date-time",
         "title": "Timestamp",
         "type": "string"
      },
      "value": {
         "items": {
            "$ref": "#/$defs/EpochSegment"
         },
         "title": "Value",
         "type": "array"
      }
   },
   "$defs": {
      "EpochSegment": {
         "maxItems": 2,
         "minItems": 2,
         "prefixItems": [
            {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "const": "tube",
                     "type": "string"
                  }
               ],
               "title": "Node Id"
            },
            {
               "title": "Epoch",
               "type": "integer"
            }
         ],
         "type": "array"
      }
   },
   "required": [
      "node_id",
      "value"
   ]
}

Config:
  • use_enum_values: bool = True

  • validate_by_alias: bool = True

  • serialize_by_alias: bool = True

Fields:
field type_: Literal[MessageType.epoch_ended] = MessageType.epoch_ended (alias 'type')
field value: Epoch [Required]
pydantic model EventMsg[source]

JSON schema:
{
   "title": "EventMsg",
   "type": "object",
   "properties": {
      "type": {
         "const": "event",
         "default": "event",
         "title": "Type",
         "type": "string"
      },
      "node_id": {
         "title": "Node Id",
         "type": "string"
      },
      "timestamp": {
         "format": "date-time",
         "title": "Timestamp",
         "type": "string"
      },
      "value": {
         "items": {
            "oneOf": [
               {
                  "$ref": "#/$defs/Event"
               },
               {
                  "$ref": "#/$defs/MetaEvent"
               }
            ]
         },
         "title": "Value",
         "type": "array"
      }
   },
   "$defs": {
      "EpochSegment": {
         "maxItems": 2,
         "minItems": 2,
         "prefixItems": [
            {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "const": "tube",
                     "type": "string"
                  }
               ],
               "title": "Node Id"
            },
            {
               "title": "Epoch",
               "type": "integer"
            }
         ],
         "type": "array"
      },
      "Event": {
         "description": "Container for a single value returned from a single :meth:`.Node.process` call",
         "properties": {
            "id": {
               "title": "Id",
               "type": "integer"
            },
            "timestamp": {
               "format": "date-time",
               "title": "Timestamp",
               "type": "string"
            },
            "node_id": {
               "title": "Node Id",
               "type": "string"
            },
            "signal": {
               "title": "Signal",
               "type": "string"
            },
            "epoch": {
               "items": {
                  "$ref": "#/$defs/EpochSegment"
               },
               "title": "Epoch",
               "type": "array"
            },
            "value": {
               "title": "Value"
            }
         },
         "required": [
            "id",
            "timestamp",
            "node_id",
            "signal",
            "epoch",
            "value"
         ],
         "title": "Event",
         "type": "object"
      },
      "MetaEvent": {
         "description": "All events generated by internal processes rather than nodes.\n\nUsed to coordinate the tube as well as allow code to hook into tube execution.\n\nThese are not stored in the :class:`.EventStore`,\nbut emitted by callbacks and consumed internally.\n\nSee :class:`.MetaEventType` for descriptions of the types of MetaEvents.",
         "properties": {
            "id": {
               "title": "Id",
               "type": "integer"
            },
            "timestamp": {
               "format": "date-time",
               "title": "Timestamp",
               "type": "string"
            },
            "node_id": {
               "const": "meta",
               "title": "Node Id",
               "type": "string"
            },
            "signal": {
               "$ref": "#/$defs/MetaEventType"
            },
            "epoch": {
               "items": {
                  "$ref": "#/$defs/EpochSegment"
               },
               "title": "Epoch",
               "type": "array"
            },
            "value": {
               "title": "Value"
            }
         },
         "required": [
            "id",
            "timestamp",
            "node_id",
            "signal",
            "epoch",
            "value"
         ],
         "title": "MetaEvent",
         "type": "object"
      },
      "MetaEventType": {
         "description": "Types of meta events emitted by tubes, schedulers, stores, and runners.",
         "enum": [
            "NodeReady",
            "EpochEnded"
         ],
         "title": "MetaEventType",
         "type": "string"
      }
   },
   "required": [
      "node_id",
      "value"
   ]
}

Config:
  • use_enum_values: bool = True

  • validate_by_alias: bool = True

  • serialize_by_alias: bool = True

Fields:
field type_: Literal[MessageType.event] = MessageType.event (alias 'type')
field value: list[Annotated[Annotated[Event, AfterValidator(func=_meta_signals_to_enum), Tag(tag=event)] | Annotated[MetaEvent, Tag(tag=meta)], Discriminator(discriminator=_type_discriminator, custom_error_type=None, custom_error_message=None, custom_error_context=None)]] [Required]
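
The `value` list mixes two item types tagged `event` and `meta`. In the schema, a MetaEvent always has `node_id` equal to `"meta"`. The sketch below separates items on that basis. It assumes the tag can be derived from `node_id` alone; the actual behaviour of `_type_discriminator` is not documented here. `tag_for` and `split_events` are hypothetical names, and the sample signal name `"value"` is a placeholder.

```python
def tag_for(item: dict) -> str:
    """Assumed rule: MetaEvents are the items whose node_id is 'meta'."""
    return "meta" if item.get("node_id") == "meta" else "event"


def split_events(items: list) -> tuple:
    """Partition a mixed list into (events, meta_events), preserving order."""
    events, meta = [], []
    for item in items:
        (meta if tag_for(item) == "meta" else events).append(item)
    return events, meta


items = [
    {"id": 1, "timestamp": "2024-01-01T00:00:00", "node_id": "a",
     "signal": "value", "epoch": [["tube", 0]], "value": 42},
    {"id": 2, "timestamp": "2024-01-01T00:00:01", "node_id": "meta",
     "signal": "EpochEnded", "epoch": [["tube", 0]], "value": None},
]
events, meta = split_events(items)
print(len(events), len(meta))
```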