storage¶
alchemiscale.storage.models
— data models for storage components¶
- class alchemiscale.storage.models.ComputeServiceRegistration(*args: Any, **kwargs: Any)¶
Bases:
BaseModel
Registration for AlchemiscaleComputeService instances.
- class alchemiscale.storage.models.ObjectStoreRef(*args: Any, **kwargs: Any)¶
Bases:
GufeTokenizable
- class alchemiscale.storage.models.ProtocolDAGResultRef(*args: Any, **kwargs: Any)¶
Bases:
ObjectStoreRef
- class alchemiscale.storage.models.Task(*args: Any, **kwargs: Any)¶
Bases:
GufeTokenizable
A Task that can be used to generate a ProtocolDAG on a compute node.
- status¶
Status of the task.
- datetime_created¶
- Type:
datetime.datetime | None
- class alchemiscale.storage.models.TaskArchive(*args: Any, **kwargs: Any)¶
Bases:
GufeTokenizable
- class alchemiscale.storage.models.TaskHub(*args: Any, **kwargs: Any)¶
Bases:
GufeTokenizable
- network¶
ScopedKey of the AlchemicalNetwork this TaskHub corresponds to. Used to ensure that there is only one TaskHub for a given AlchemicalNetwork using neo4j constraints.
- Type:
- weight¶
Value between 0.0 and 1.0 giving the weight of this TaskHub. A ComputeService uses this number to allocate attention to this TaskHub relative to others. TaskHubs with equal weight will be given equal attention; a TaskHub with greater weight than another will receive more attention.
Setting the weight to 0.0 will give the TaskHub no attention, effectively disabling it.
- Type:
float
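As a rough illustration of how TaskHub weights could translate into attention, the sketch below picks among TaskHubs with probability proportional to weight. The helper name choose_taskhub, the string keys, and the use of random.choices are assumptions made for this example; this is not the ComputeService's actual selection code.

```python
import random
from typing import Dict, Optional


def choose_taskhub(weights: Dict[str, float], rng: random.Random) -> Optional[str]:
    """Pick one TaskHub with probability proportional to its weight (hypothetical helper)."""
    # A weight of 0.0 disables a TaskHub, so it is excluded before sampling.
    enabled = {hub: w for hub, w in weights.items() if w > 0.0}
    if not enabled:
        return None
    hubs = list(enabled)
    return rng.choices(hubs, weights=[enabled[h] for h in hubs], k=1)[0]


rng = random.Random(0)  # fixed seed so the sketch is repeatable
hub_weights = {"hub-a": 0.5, "hub-b": 0.5, "hub-c": 0.0}
picks = [choose_taskhub(hub_weights, rng) for _ in range(1000)]
```

Here hub-a and hub-b share attention roughly equally, while hub-c, with weight 0.0, is never picked.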
alchemiscale.storage.objectstore
— object store interface¶
- class alchemiscale.storage.objectstore.S3ObjectStore(session: boto3.Session, bucket: str, prefix: str, endpoint_url=None)¶
Bases:
object
Object storage for use with AWS S3.
- check()¶
Check consistency of object store.
- initialize()¶
Initialize object store.
Creates bucket if it does not exist.
- iter_contents(prefix='')¶
Iterate over the labels in this storage.
- pull_protocoldagresult(protocoldagresult: ScopedKey | None = None, transformation: ScopedKey | None = None, location: str | None = None, return_as='gufe', ok=True) gufe.protocols.ProtocolDAGResult | dict | str ¶
Pull the ProtocolDAGResult corresponding to the given ProtocolDAGResultRef.
- Parameters:
protocoldagresult – ScopedKey for ProtocolDAGResult in the object store. Must be provided if location is None.
transformation – The ScopedKey of the Transformation this ProtocolDAGResult corresponds to. Must be provided if location is None.
location – The full path in the object store to the ProtocolDAGResult. If provided, this will be used to retrieve it.
return_as (['gufe', 'dict', 'json']) – Form in which to return result; this is provided to avoid unnecessary deserializations where desired.
- Returns:
The ProtocolDAGResult corresponding to the given ProtocolDAGResultRef.
- Return type:
gufe.protocols.ProtocolDAGResult | dict | str (depending on return_as)
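The argument rules for pull_protocoldagresult can be sketched in plain Python. This is only an illustration: resolve_location and convert are hypothetical helpers, the "transformation/protocoldagresult" path layout is invented, and from_dict stands in for whatever builds a ProtocolDAGResult from a dict.

```python
import json
from typing import Any, Callable, Optional


def resolve_location(
    protocoldagresult: Optional[str] = None,
    transformation: Optional[str] = None,
    location: Optional[str] = None,
) -> str:
    """Decide which object-store path to read (hypothetical helper)."""
    if location is not None:
        # An explicit location is used directly.
        return location
    if protocoldagresult is None or transformation is None:
        raise ValueError(
            "protocoldagresult and transformation are required when location is None"
        )
    # Invented path layout, for illustration only.
    return f"{transformation}/{protocoldagresult}"


def convert(
    raw_json: str,
    return_as: str = "gufe",
    from_dict: Callable[[dict], Any] = dict,
) -> Any:
    """Return the stored JSON in the requested form, deserializing only as far as needed."""
    if return_as == "json":
        return raw_json  # no deserialization at all
    data = json.loads(raw_json)
    if return_as == "dict":
        return data
    if return_as == "gufe":
        return from_dict(data)  # stand-in for constructing a ProtocolDAGResult
    raise ValueError(f"unknown return_as: {return_as!r}")
```

Requesting 'json' skips parsing entirely, which is the kind of saving the return_as parameter is described as offering.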
- push_protocoldagresult(protocoldagresult: gufe.protocols.ProtocolDAGResult, transformation: ScopedKey, creator: str | None = None) ProtocolDAGResultRef ¶
Push given ProtocolDAGResult to this ObjectStore.
- Parameters:
protocoldagresult – ProtocolDAGResult to store.
transformation – The ScopedKey of the Transformation this ProtocolDAGResult corresponds to.
- Returns:
Reference to the serialized ProtocolDAGResult in the object store.
- Return type:
ProtocolDAGResultRef
- reset()¶
Remove all data from object store.
Deletes all objects, including the bucket itself.
- alchemiscale.storage.objectstore.get_s3os(settings: S3ObjectStoreSettings, endpoint_url=None) S3ObjectStore ¶
Convenience function for getting an S3ObjectStore directly from settings.
alchemiscale.storage.statestore
— state store interface¶
- class alchemiscale.storage.statestore.Neo4jStore(graph: py2neo.Graph)¶
Bases:
AlchemiscaleStateStore
- action_tasks(tasks: List[ScopedKey], taskhub: ScopedKey) List[ScopedKey | None] ¶
Add Tasks to the TaskHub for a given AlchemicalNetwork.
Note: the Tasks must be within the same scope as the AlchemicalNetwork, and must correspond to a Transformation in the AlchemicalNetwork.
A given compute task can be represented in any number of AlchemicalNetwork TaskHubs, or none at all.
Only Tasks with status ‘waiting’, ‘running’, or ‘error’ can be actioned.
- add_scope(identifier: str, cls: type[alchemiscale.security.models.CredentialedEntity], scope: Scope)¶
Add a scope to the given entity.
- cancel_tasks(tasks: List[ScopedKey], taskhub: ScopedKey) List[ScopedKey | None] ¶
Remove Tasks from the TaskHub for a given AlchemicalNetwork.
Note: Tasks must be within the same scope as the AlchemicalNetwork.
A given Task can be represented in many AlchemicalNetwork TaskHubs, or none at all.
- check()¶
Check consistency of database.
Will raise Neo4JStoreError if any state check fails. If no check fails, will return without any exception.
- claim_taskhub_tasks(taskhub: ScopedKey, compute_service_id: ComputeServiceID, count: int = 1) List[ScopedKey | None] ¶
Claim a TaskHub Task.
This method will claim Tasks from a TaskHub according to the following process:
1. waiting Tasks with the highest priority are selected for consideration.
2. Tasks with an EXTENDS relationship to an incomplete Task are dropped from consideration.
3. Of those that remain, a Task is claimed stochastically based on the weight of its ACTIONS relationship with the TaskHub.
This process is repeated until count Tasks have been claimed. If no Task is available, then None is given in its place.
- Parameters:
compute_service_id – Unique identifier for the compute service claiming the Tasks for execution.
count – Claim the given number of Tasks in a single transaction.
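The claiming process described for claim_taskhub_tasks can be sketched with an in-memory stand-in for the graph. Several details here are assumptions rather than documented behavior: ToyTask and claim_tasks are hypothetical names, a smaller priority number is treated as higher priority, blocked Tasks are filtered out before the top priority level is determined, Tasks with zero weight are never picked, and a claimed Task is marked running.

```python
import random
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class ToyTask:
    key: str
    status: str = "waiting"
    priority: int = 10             # assumption: smaller number = higher priority
    weight: float = 0.5            # weight of the ACTIONS relationship
    extends: Optional[str] = None  # key of the Task this one extends, if any


def claim_tasks(tasks: Dict[str, ToyTask], count: int, rng: random.Random) -> List[Optional[str]]:
    """Claim up to `count` Tasks; None fills each slot that could not be claimed."""
    claimed: List[Optional[str]] = []
    for _ in range(count):
        # Only waiting Tasks with a positive weight are considered.
        eligible = [t for t in tasks.values() if t.status == "waiting" and t.weight > 0]
        # Drop Tasks that extend a Task which is not yet complete.
        eligible = [
            t for t in eligible
            if t.extends is None or tasks[t.extends].status == "complete"
        ]
        if not eligible:
            claimed.append(None)
            continue
        # Keep only the highest-priority group.
        top = min(t.priority for t in eligible)
        group = [t for t in eligible if t.priority == top]
        # Choose one Task at random, proportionally to its weight.
        chosen = rng.choices(group, weights=[t.weight for t in group], k=1)[0]
        chosen.status = "running"  # assumption: a claimed Task becomes running
        claimed.append(chosen.key)
    return claimed


toy_tasks = {
    "a": ToyTask("a", priority=1),
    "b": ToyTask("b", priority=1, extends="c"),
    "c": ToyTask("c", priority=5),
}
result = claim_tasks(toy_tasks, count=3, rng=random.Random(0))
```

In this toy run, b is never claimable because the Task it extends is not complete, so the third slot is filled with None.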
- create_credentialed_entity(entity: CredentialedEntity)¶
Create a new credentialed entity, such as a user or compute identity.
If an entity of this type with the same identifier already exists, then this will overwrite its properties, including credential.
- create_network(network: gufe.AlchemicalNetwork, scope: Scope)¶
Add an AlchemicalNetwork to the target neo4j database, even if some of its components already exist in the database.
- create_task(transformation: ScopedKey, extends: ScopedKey | None = None, creator: str | None = None) ScopedKey ¶
Add a compute Task to a Transformation.
Note: this creates a compute Task, but does not add it to any TaskHubs.
- Parameters:
transformation – The Transformation to compute.
scope – The scope the Transformation is in; ignored if transformation is a ScopedKey.
extends – The ScopedKey of the Task to use as a starting point for this Task. Will use the ProtocolDAGResult from the given Task as the extends input for the Task’s eventual call to Protocol.create.
- create_taskhub(network: ScopedKey) ScopedKey ¶
Create a TaskHub for the given AlchemicalNetwork.
An AlchemicalNetwork can have only one associated TaskHub. A TaskHub is required to action Tasks for a given AlchemicalNetwork.
This method will only create a TaskHub for an AlchemicalNetwork if it doesn’t already exist; it will return the scoped key for the TaskHub either way.
- delete_network(network: ScopedKey) ScopedKey ¶
Delete the given AlchemicalNetwork from the database.
This will not remove any Transformations or ChemicalSystems associated with the AlchemicalNetwork, since these may be associated with other AlchemicalNetworks in the same Scope.
- delete_task(task: ScopedKey) Task ¶
Remove a compute Task from a Transformation.
This will also remove the Task from all TaskHubs it is a part of.
This method is intended for administrator use; generally Tasks should instead have their status set to ‘deleted’ and be retained.
- deregister_computeservice(compute_service_id: ComputeServiceID)¶
Remove the registration for the given ComputeServiceID from the state store.
This will remove the ComputeServiceRegistration node and all its CLAIMS relationships to Tasks.
All Tasks with CLAIMS relationships to the ComputeServiceRegistration and with status running will have their status set to waiting.
- expire_registrations(expire_time: datetime)¶
Remove all registrations with last heartbeat prior to the given expire_time.
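A minimal sketch of the expiry rule, assuming heartbeats are tracked in a plain dict keyed by a compute service identifier. The find_expired helper and the identifiers are hypothetical; the actual method operates on registrations stored in the state store.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def find_expired(heartbeats: Dict[str, datetime], expire_time: datetime) -> List[str]:
    """Return identifiers whose last heartbeat is strictly before expire_time."""
    return sorted(cs_id for cs_id, beat in heartbeats.items() if beat < expire_time)


now = datetime(2024, 1, 1, 12, 0, 0)
heartbeats = {
    "service-1": now - timedelta(minutes=1),
    "service-2": now - timedelta(hours=2),
}
# Expire anything that has not sent a heartbeat in the last 30 minutes.
expired = find_expired(heartbeats, expire_time=now - timedelta(minutes=30))
```

A caller would typically compute expire_time as the current time minus some tolerated silence interval, as in the last line.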
- get_chemicalsystem_networks(chemicalsystem: ScopedKey) List[ScopedKey] ¶
List ScopedKeys for AlchemicalNetworks associated with the given ChemicalSystem.
- get_chemicalsystem_transformations(chemicalsystem: ScopedKey) List[ScopedKey] ¶
List ScopedKeys for the Transformations associated with the given ChemicalSystem.
- get_credentialed_entity(identifier: str, cls: type[alchemiscale.security.models.CredentialedEntity])¶
Get an existing credentialed entity, such as a user or compute identity.
- get_network_chemicalsystems(network: ScopedKey) List[ScopedKey] ¶
List ScopedKeys for ChemicalSystems associated with the given AlchemicalNetwork.
- get_network_status(network: ScopedKey) Dict[str, int] ¶
Return status counts for all Tasks associated with the given AlchemicalNetwork.
- get_network_tasks(network: ScopedKey, status: TaskStatusEnum | None = None) List[ScopedKey] ¶
List ScopedKeys for all Tasks associated with the given AlchemicalNetwork.
- get_network_transformations(network: ScopedKey) List[ScopedKey] ¶
List ScopedKeys for Transformations associated with the given AlchemicalNetwork.
- get_scope_status(scope: Scope) Dict[str, int] ¶
Return status counts for all Tasks within the given Scope.
- get_task_networks(task: ScopedKey) List[ScopedKey] ¶
List ScopedKeys for AlchemicalNetworks associated with the given Task.
- get_task_status(tasks: List[ScopedKey]) List[TaskStatusEnum] ¶
Get the status of a list of Tasks.
- Parameters:
tasks – The list of Tasks to get the status for.
- Returns:
A list of the statuses of the given Tasks.
- Return type:
List[TaskStatusEnum]
- get_task_transformation(task: ScopedKey, return_gufe=True) Tuple[gufe.Transformation, ProtocolDAGResultRef | None] | Tuple[ScopedKey, ScopedKey | None] ¶
Get the Transformation and ProtocolDAGResultRef to extend from (if present) for the given Task.
If return_gufe is True, returns actual Transformation and ProtocolDAGResultRef object (None if not present); if False, returns ScopedKeys for these instead.
- get_task_weights(tasks: List[ScopedKey], taskhub: ScopedKey) List[float | None] ¶
Get weights for the ACTIONS relationship between a TaskHub and a Task.
- Parameters:
tasks – The ScopedKeys of the Tasks to get the weights for.
taskhub – The ScopedKey of the TaskHub associated with the Tasks.
- Returns:
Weights for the list of Tasks, in the same order.
- Return type:
List[float | None]
- get_taskhub(network: ScopedKey, return_gufe: bool = False) ScopedKey | TaskHub ¶
Get the TaskHub for the given AlchemicalNetwork.
- Parameters:
return_gufe – If True, return a TaskHub instance. Otherwise, return a ScopedKey.
- get_taskhub_tasks(taskhub: ScopedKey, return_gufe=False) List[ScopedKey] | Dict[ScopedKey, Task] ¶
Get a list of Tasks on the TaskHub.
- get_taskhub_unclaimed_tasks(taskhub: ScopedKey, return_gufe=False) List[ScopedKey] | Dict[ScopedKey, Task] ¶
Get a list of unclaimed Tasks in the TaskHub.
- get_transformation_chemicalsystems(transformation: ScopedKey) List[ScopedKey] ¶
List ScopedKeys for the ChemicalSystems associated with the given Transformation.
- get_transformation_networks(transformation: ScopedKey) List[ScopedKey] ¶
List ScopedKeys for AlchemicalNetworks associated with the given Transformation.
- get_transformation_status(transformation: ScopedKey) Dict[str, int] ¶
Return status counts for all Tasks associated with the given Transformation.
- get_transformation_tasks(transformation: ScopedKey, extends: ScopedKey | None = None, return_as: str = 'list', status: TaskStatusEnum | None = None) List[ScopedKey] | Dict[ScopedKey, ScopedKey | None] ¶
Get all Tasks that perform the given Transformation.
If a Task ScopedKey is given for extends, then only those Tasks that follow via any number of EXTENDS relationships will be returned.
return_as takes either list or graph as input. graph will yield a dict mapping each Task’s ScopedKey (as keys) to the Task ScopedKey it extends (as values).
- Parameters:
transformation – ScopedKey of the Transformation to retrieve Tasks for.
extends –
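To make the graph form concrete, the sketch below takes a dict of the shape described for return_as='graph' (each Task key mapped to the key of the Task it extends, or None) and collects every Task that follows a given Task through EXTENDS relationships. The tasks_extending helper and the string keys are hypothetical, and the sketch assumes the starting Task itself is not part of the result.

```python
from typing import Dict, List, Optional


def tasks_extending(graph: Dict[str, Optional[str]], start: str) -> List[str]:
    """Return Tasks that follow `start` via one or more EXTENDS hops."""
    # Invert the mapping: Task -> Tasks that directly extend it.
    children: Dict[str, List[str]] = {}
    for task, parent in graph.items():
        if parent is not None:
            children.setdefault(parent, []).append(task)
    found: List[str] = []
    stack = [start]
    while stack:
        current = stack.pop()
        for child in children.get(current, []):
            found.append(child)
            stack.append(child)
    return sorted(found)


extends_graph = {"t1": None, "t2": "t1", "t3": "t2", "t4": None, "t5": "t1"}
followers = tasks_extending(extends_graph, "t1")
```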
- heartbeat_computeservice(compute_service_id: ComputeServiceID, heartbeat: datetime)¶
Update the heartbeat for the given ComputeServiceID.
- initialize()¶
Initialize database.
Ensures that constraints and any other required structures are in place. Should be used on any Neo4j database prior to use for Alchemiscale.
- list_credentialed_entities(cls: type[alchemiscale.security.models.CredentialedEntity])¶
List all credentialed entities of the given type, such as users or compute identities.
- list_scopes(identifier: str, cls: type[alchemiscale.security.models.CredentialedEntity]) List[Scope] ¶
List all scopes for which the given entity has access.
- query_chemicalsystems(*, name=None, key=None, scope: ~alchemiscale.models.Scope = <Scope('*-*-*')>)¶
Query for ChemicalSystems matching given attributes.
- query_networks(*, name=None, key=None, scope: ~alchemiscale.models.Scope | None = <Scope('*-*-*')>, return_gufe: bool = False)¶
Query for AlchemicalNetworks matching given attributes.
- query_taskhubs(scope: ~alchemiscale.models.Scope | None = <Scope('*-*-*')>, return_gufe: bool = False) List[ScopedKey] | Dict[ScopedKey, TaskHub] ¶
Query for TaskHubs matching the given criteria.
- Parameters:
return_gufe – If True, return a dict with ScopedKeys as keys, TaskHub instances as values. Otherwise, return a list of ScopedKeys.
- query_tasks(*, status=None, key=None, scope: ~alchemiscale.models.Scope = <Scope('*-*-*')>)¶
Query for Tasks matching given attributes.
- query_transformations(*, name=None, key=None, scope: ~alchemiscale.models.Scope = <Scope('*-*-*')>)¶
Query for Transformations matching given attributes.
- register_computeservice(compute_service_registration: ComputeServiceRegistration)¶
Register a ComputeServiceRegistration uniquely identifying a running ComputeService.
A ComputeServiceRegistration node is used for CLAIMS relationships on Tasks to avoid collisions in Task execution.
- remove_credentialed_identity(identifier: str, cls: type[alchemiscale.security.models.CredentialedEntity])¶
Remove a credentialed entity, such as a user or compute identity.
- remove_scope(identifier: str, cls: type[alchemiscale.security.models.CredentialedEntity], scope: Scope)¶
Remove a scope from the given entity.
- reset()¶
Remove all data from database; undo all components in initialize.
- set_strategy(strategy: Strategy, network: ScopedKey) ScopedKey ¶
Set the compute Strategy for the given AlchemicalNetwork.
- set_task_complete(tasks: List[ScopedKey], raise_error: bool = False) List[ScopedKey | None] ¶
Set the status of a list of Tasks to complete.
Only running Tasks can be set to complete.
- set_task_deleted(tasks: List[ScopedKey], raise_error: bool = False) List[ScopedKey | None] ¶
Set the status of a list of Tasks to deleted.
Any Task can be set to deleted; a deleted Task cannot change to any other status.
- set_task_error(tasks: List[ScopedKey], raise_error: bool = False) List[ScopedKey | None] ¶
Set the status of a list of Tasks to error.
Only running Tasks can be set to error.
- set_task_invalid(tasks: List[ScopedKey], raise_error: bool = False) List[ScopedKey | None] ¶
Set the status of a list of Tasks to invalid.
Any Task can be set to invalid; an invalid Task cannot change to any other status.
- set_task_result(task: ScopedKey, protocoldagresultref: ProtocolDAGResultRef) ScopedKey ¶
Set a ProtocolDAGResultRef pointing to a ProtocolDAGResult for the given Task.
- set_task_running(tasks: List[ScopedKey], raise_error: bool = False) List[ScopedKey | None] ¶
Set the status of a list of Tasks to running.
Only Tasks with status waiting can be set to running.
- set_task_status(tasks: List[ScopedKey], status: TaskStatusEnum, raise_error: bool = False) List[ScopedKey | None] ¶
Set the status of a list of Tasks.
This is a master method that calls the appropriate method for the status.
- Parameters:
tasks – The list of Tasks to set the status of.
status – The status to set the Task to.
raise_error – If True, raise a ValueError if the status of a given Task cannot be changed.
- Returns:
A list of the Task ScopedKeys for which status was changed; None is given for any Tasks for which the status could not be changed.
- Return type:
List[Optional[ScopedKey]]
- set_task_waiting(tasks: List[ScopedKey], raise_error: bool = False) List[ScopedKey | None] ¶
Set the status of a list of Tasks to waiting.
Only Tasks with status error or running can be set to waiting.
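The transition rules stated for the set_task_* methods can be collected into one table. The sketch below is a toy model under stated assumptions: statuses are plain strings in a dict, set_status is a hypothetical helper, and the two "any Task" rules are read as excluding Tasks already in the other terminal status (invalid or deleted).

```python
from typing import Dict, List, Optional

ALL_STATUSES = {"waiting", "running", "complete", "error", "invalid", "deleted"}

# For each target status, the current statuses from which it may be set.
ALLOWED_FROM = {
    "waiting": {"error", "running"},
    "running": {"waiting"},
    "complete": {"running"},
    "error": {"running"},
    "invalid": ALL_STATUSES - {"deleted"},  # assumption: deleted stays deleted
    "deleted": ALL_STATUSES - {"invalid"},  # assumption: invalid stays invalid
}


def set_status(
    statuses: Dict[str, str],
    tasks: List[str],
    new_status: str,
    raise_error: bool = False,
) -> List[Optional[str]]:
    """Apply new_status where allowed; None marks Tasks that could not be changed."""
    changed: List[Optional[str]] = []
    for key in tasks:
        current = statuses.get(key)
        if current in ALLOWED_FROM[new_status]:
            statuses[key] = new_status
            changed.append(key)
        elif raise_error:
            raise ValueError(f"cannot change {key} from {current!r} to {new_status!r}")
        else:
            changed.append(None)
    return changed


toy_statuses = {"a": "waiting", "b": "complete", "c": "running"}
first = set_status(toy_statuses, ["a", "b", "c"], "running")
```

Only a moves to running here: b is complete and c is already running, so both get None in the returned list.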
- set_task_weights(tasks: Dict[ScopedKey, float] | List[ScopedKey], taskhub: ScopedKey, weight: float | None = None) List[ScopedKey | None] ¶
Sets weights for the ACTIONS relationship between a TaskHub and a Task.
This is used to set the relative probabilistic execution order of a Task in a TaskHub. Note that this concept is orthogonal to priority in that tasks of higher priority will be executed before tasks of lower priority, but tasks of the same priority will be distributed according to their weights.
The weights can be set by either a list and a scalar, or a dict of {ScopedKey: weight} pairs.
Must be called after action_tasks to have any effect; otherwise, the TaskHub will not have an ACTIONS relationship to the Task.
- Parameters:
tasks (Union[Dict[ScopedKey, float], List[ScopedKey]]) – If a dict, the keys are the ScopedKeys of the Tasks, and the values are the weights. If a list, the weights are set to the scalar value given by the weight argument.
taskhub (ScopedKey) – The ScopedKey of the TaskHub associated with the Tasks.
weight (Optional[float]) – If tasks is a list, this is the weight to set for each Task.
- Returns:
A list of ScopedKeys for each Task whose weight was set. None is given for any Task whose weight was not set; this could be because the TaskHub doesn’t have an ACTIONS relationship with the Task, or because the Task doesn’t exist at all.
- Return type:
List[ScopedKey | None]
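The two accepted input forms for set_task_weights, and the None placeholders in the result, can be sketched as follows. The names normalize_weights and apply_weights are hypothetical, the actions dict stands in for the TaskHub's ACTIONS relationships, and rejecting a scalar weight alongside a dict is an assumption of this sketch.

```python
from typing import Dict, List, Optional, Union


def normalize_weights(
    tasks: Union[Dict[str, float], List[str]],
    weight: Optional[float] = None,
) -> Dict[str, float]:
    """Turn either input form into a {task key: weight} mapping."""
    if isinstance(tasks, dict):
        if weight is not None:
            raise ValueError("weight must be None when tasks is a dict")
        return dict(tasks)
    if weight is None:
        raise ValueError("weight is required when tasks is a list")
    return {key: weight for key in tasks}


def apply_weights(actions: Dict[str, float], new_weights: Dict[str, float]) -> List[Optional[str]]:
    """Update weights for actioned Tasks; None marks Tasks without an ACTIONS entry."""
    results: List[Optional[str]] = []
    for key, value in new_weights.items():
        if key in actions:
            actions[key] = value
            results.append(key)
        else:
            results.append(None)
    return results


actions = {"task-1": 0.5, "task-2": 0.5}
outcome = apply_weights(actions, normalize_weights(["task-1", "task-3"], weight=0.9))
```

Because task-3 was never actioned on this toy TaskHub, its slot in the result is None and no weight is stored for it.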
- set_taskhub_weight(network: ScopedKey, weight: float)¶
Set the weight for the TaskHub associated with the given AlchemicalNetwork.
- set_tasks(transformation: ScopedKey, extends: Task | None = None, count: int = 1) ScopedKey ¶
Set a fixed number of Tasks against the given Transformation if not already present.
Note: Tasks created by this method are not added to any TaskHubs.
- Parameters:
transformation – The Transformation to compute.
scope – The scope the Transformation is in; ignored if transformation is a ScopedKey.
extends – The Task to use as a starting point for this Task. Will use the ProtocolDAGResult from the given Task as the extends input for the Task’s eventual call to Protocol.create.
count – The total number of tasks that should exist corresponding to the specified transformation, scope, and extends.
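Since count is a target total rather than a number to add, the arithmetic can be sketched in one line. The helper tasks_to_create is hypothetical, and the sketch assumes surplus Tasks are left in place rather than removed.

```python
def tasks_to_create(existing: int, count: int = 1) -> int:
    """Number of new Tasks needed so that `count` exist in total."""
    # Never negative: assumption that Tasks beyond `count` are left untouched.
    return max(0, count - existing)


needed = tasks_to_create(existing=1, count=3)
```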
- transaction(readonly=False, ignore_exceptions=False) py2neo.database.Transaction ¶
Context manager for a py2neo Transaction.
- alchemiscale.storage.statestore.get_n4js(settings: Neo4jStoreSettings)¶
Convenience function for getting a Neo4jStore directly from settings.