storage

alchemiscale.storage.models — data models for storage components

class alchemiscale.storage.models.ComputeIDBase

Bases: str

class alchemiscale.storage.models.ComputeManagerID(_value)

Bases: ComputeIDBase

class alchemiscale.storage.models.ComputeManagerInstruction(*values)

Bases: StrEnum

class alchemiscale.storage.models.ComputeManagerRegistration(*args: Any, **kwargs: Any)

Bases: BaseModel

class alchemiscale.storage.models.ComputeManagerStatus(*values)

Bases: StrEnum

class alchemiscale.storage.models.ComputeServiceID

Bases: ComputeIDBase

class alchemiscale.storage.models.ComputeServiceRegistration(*args: Any, **kwargs: Any)

Bases: BaseModel

Registration for AlchemiscaleComputeService instances.

class alchemiscale.storage.models.Mark(target: ScopedKey)

Bases: GufeTokenizable

class alchemiscale.storage.models.NetworkMark(target: ScopedKey, state: str | NetworkStateEnum = NetworkStateEnum.active)

Bases: Mark

Mark object for AlchemicalNetworks.

network

ScopedKey of the AlchemicalNetwork this NetworkMark corresponds to. Used with Neo4j constraints to ensure that there is only one NetworkMark for a given AlchemicalNetwork.

Type:

str

state

State of the AlchemicalNetwork, stored on this NetworkMark.

Type:

NetworkStateEnum

class alchemiscale.storage.models.NetworkStateEnum(*values)

Bases: Enum

class alchemiscale.storage.models.ObjectStoreRef(*, location: str = None, obj_key: GufeKey = None, scope: Scope)

Bases: GufeTokenizable

class alchemiscale.storage.models.ProtocolDAGResultRef(*, location: str | None = None, obj_key: GufeKey, scope: Scope, ok: bool, datetime_created: datetime | None = None, creator: str | None = None)

Bases: ObjectStoreRef

class alchemiscale.storage.models.StrategyModeEnum(*values)

Bases: StrEnum

class alchemiscale.storage.models.StrategyState(*args: Any, **kwargs: Any)

Bases: BaseModel

State information for a Strategy on an AlchemicalNetwork.

class alchemiscale.storage.models.StrategyStatusEnum(*values)

Bases: StrEnum

class alchemiscale.storage.models.StrategyTaskScalingEnum(*values)

Bases: StrEnum

class alchemiscale.storage.models.Task(*, status: str | TaskStatusEnum = TaskStatusEnum.waiting, priority: int = 10, datetime_created: datetime | None = None, creator: str | None = None, extends: str | None = None, claim: str | None = None, _key: str = None)

Bases: GufeTokenizable

A Task that can be used to generate a ProtocolDAG on a compute node.

status

Status of the task.

Type:

alchemiscale.storage.models.TaskStatusEnum

priority

Priority of the task; 1 is highest, larger values indicate lower priority.

Type:

int

claim

Identifier of the compute service that has a claim on this task.

Type:

str | None

datetime_created

Time at which the Task was created.

Type:

datetime.datetime | None

class alchemiscale.storage.models.TaskArchive(*args, **kwargs)

Bases: GufeTokenizable

class alchemiscale.storage.models.TaskHub(network: ScopedKey, weight: int = 0.5)

Bases: GufeTokenizable

network

ScopedKey of the AlchemicalNetwork this TaskHub corresponds to. Used with Neo4j constraints to ensure that there is only one TaskHub for a given AlchemicalNetwork.

Type:

str

weight

Value between 0.0 and 1.0 giving the weight of this TaskHub. This number is used by a ComputeService to allocate attention to this TaskHub relative to others. TaskHubs with equal weight will be given equal attention; a TaskHub with greater weight than another will receive more attention.

Setting the weight to 0.0 will give the TaskHub no attention, effectively disabling it.

Type:

float
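To make the weighting concrete, here is a minimal sketch of weight-proportional selection, assuming a ComputeService draws among TaskHubs at random with probability proportional to weight. The function pick_taskhub and the string hub keys are hypothetical illustrations rather than alchemiscale API, and the service's actual selection logic may differ.

```python
from __future__ import annotations

import random
from collections import Counter


def pick_taskhub(weights: dict[str, float], rng: random.Random) -> str | None:
    """Hypothetical helper: draw one TaskHub key with probability proportional to weight.

    Hubs with weight 0.0 are excluded, so they never receive attention.
    Returns None if no hub has a positive weight.
    """
    hubs = [hub for hub, weight in weights.items() if weight > 0.0]
    if not hubs:
        return None
    return rng.choices(hubs, weights=[weights[hub] for hub in hubs], k=1)[0]


# Two equally weighted hubs, one with double weight, and one disabled hub.
rng = random.Random(0)
weights = {"hub-a": 0.5, "hub-b": 0.5, "hub-c": 1.0, "hub-off": 0.0}
counts = Counter(pick_taskhub(weights, rng) for _ in range(10_000))
print(counts)  # hub-c is drawn roughly twice as often as hub-a or hub-b
```

A weight of 0.0 removes a hub from the draw entirely, which matches the "effectively disabling it" behavior described above.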

class alchemiscale.storage.models.TaskProvenance(*args: Any, **kwargs: Any)

Bases: BaseModel

class alchemiscale.storage.models.TaskRestartPattern(pattern: str, max_retries: int, taskhub_scoped_key: str | ScopedKey)

Bases: GufeTokenizable

A pattern to compare returned Task tracebacks to.

pattern

A regular expression pattern that can match returned tracebacks of errored Tasks.

Type:

str

max_retries

The number of times the pattern can trigger a restart for a Task.

Type:

int

taskhub_scoped_key

The TaskHub the pattern is bound to. This is needed to properly set a unique Gufe key.

Type:

str

class alchemiscale.storage.models.TaskStatusEnum(*values)

Bases: Enum

class alchemiscale.storage.models.Tracebacks(tracebacks: list[str], source_keys: list[GufeKey], failure_keys: list[GufeKey])

Bases: GufeTokenizable

tracebacks

The tracebacks returned with the ProtocolUnitFailures.

Type:

list[str]

source_keys

The GufeKeys of the ProtocolUnits that failed.

Type:

list[GufeKey]

failure_keys

The GufeKeys of the ProtocolUnitFailures.

Type:

list[GufeKey]


alchemiscale.storage.objectstore — object store interface

class alchemiscale.storage.objectstore.S3ObjectStore(settings: S3ObjectStoreSettings)

Bases: object

Object storage for use with AWS S3.

check()

Check consistency of object store.

initialize()

Initialize object store.

Creates bucket if it does not exist.

iter_contents(prefix='')

Iterate over the labels in this storage.

Parameters:

prefix (str) – Only iterate over paths that start with the given prefix.

Returns:

Contents of this storage, which may include items without metadata.

Return type:

Iterator[str]

pull_protocoldagresult(protocoldagresult: ScopedKey | None = None, transformation: ScopedKey | None = None, location: str | None = None, ok=True) bytes

Pull the ProtocolDAGResult corresponding to the given ProtocolDAGResultRef.

Parameters:
  • protocoldagresult – ScopedKey for ProtocolDAGResult in the object store. Must be provided if location is None.

  • transformation – The ScopedKey of the Transformation this ProtocolDAGResult corresponds to. Must be provided if location is None.

  • location – The full path in the object store to the ProtocolDAGResult. If provided, this will be used to retrieve it.

Returns:

The ProtocolDAGResult corresponding to the given ProtocolDAGResultRef, in a bytes representation.

Return type:

bytes

push_protocoldagresult(protocoldagresult: bytes, protocoldagresult_ok: bool, protocoldagresult_gufekey: GufeKey, transformation: ScopedKey, creator: str | None = None) ProtocolDAGResultRef

Push given ProtocolDAGResult to this ObjectStore.

Parameters:
  • protocoldagresult – ProtocolDAGResult to store, in some bytes representation.

  • protocoldagresult_ok – True if ProtocolDAGResult completed successfully; False if failed.

  • protocoldagresult_gufekey – The GufeKey of the ProtocolDAGResult.

  • transformation – The ScopedKey of the Transformation this ProtocolDAGResult corresponds to.

Returns:

Reference to the serialized ProtocolDAGResult in the object store.

Return type:

ProtocolDAGResultRef

reset()

Remove all data from object store.

Deletes all objects, including the bucket itself.

exception alchemiscale.storage.objectstore.S3ObjectStoreError

Bases: Exception

alchemiscale.storage.objectstore.get_s3os(settings: S3ObjectStoreSettings) S3ObjectStore

Convenience function for getting an S3ObjectStore directly from settings.


alchemiscale.storage.statestore — state store interface

class alchemiscale.storage.statestore.AlchemiscaleStateStore

Bases: ABC

exception alchemiscale.storage.statestore.Neo4JStoreError

Bases: Exception

class alchemiscale.storage.statestore.Neo4jStore(settings: Neo4jStoreSettings)

Bases: AlchemiscaleStateStore

action_tasks(tasks: list[ScopedKey], taskhub: ScopedKey) list[ScopedKey | None]

Add Tasks to the TaskHub for a given AlchemicalNetwork.

Note: the Tasks must be within the same scope as the AlchemicalNetwork, and must correspond to a Transformation in the AlchemicalNetwork.

A given compute task can be represented in any number of AlchemicalNetwork TaskHubs, or none at all.

Only Tasks with status ‘waiting’, ‘running’, or ‘error’ can be actioned.

add_scope(identifier: str, cls: type[CredentialedEntity], scope: Scope)

Add a scope to the given entity.

add_task_restart_patterns(taskhub: ScopedKey, patterns: list[str], number_of_retries: int)

Add a list of restart policy patterns to a TaskHub along with the number of retries allowed.

Parameters:
  • taskhub (ScopedKey) – TaskHub for the restart patterns to enforce.

  • patterns (list[str]) – Regular expression patterns that will be compared to tracebacks returned by ProtocolUnitFailures.

  • number_of_retries (int) – The number of times the given patterns will apply to a single Task. Attempts to restart beyond this value will result in a canceled Task with an error status.

Raises:

KeyError – Raised when the provided TaskHub ScopedKey cannot be associated with a TaskHub in the database.

assemble_network(network: AlchemicalNetwork, scope: Scope, state: NetworkStateEnum | str = NetworkStateEnum.active) tuple[ScopedKey, ScopedKey, ScopedKey]

Create all nodes and relationships needed for an AlchemicalNetwork represented in an alchemiscale state store.

Parameters:
  • network – The AlchemicalNetwork to submit to the database.

  • scope – The Scope where the AlchemicalNetwork resides.

  • state – The starting state of the network as marked by the NetworkMark.

Returns:

A tuple containing the AlchemicalNetwork ScopedKey, the TaskHub ScopedKey, and the NetworkMark ScopedKey.

cancel_tasks(tasks: list[ScopedKey], taskhub: ScopedKey, tx=None) list[ScopedKey | None]

Remove Tasks from the TaskHub for a given AlchemicalNetwork.

Note: Tasks must be within the same scope as the AlchemicalNetwork.

A given Task can be represented in many AlchemicalNetwork TaskHubs, or none at all.

check()

Check consistency of database.

Will raise Neo4JStoreError if any state check fails. If no check fails, will return without any exception.

claim_taskhub_tasks(taskhub: ScopedKey, compute_service_id: ComputeServiceID, count: int = 1, protocols: list[Protocol | str] | None = None) list[ScopedKey | None]

Claim a TaskHub Task.

This method will claim Tasks from a TaskHub according to the following process:

  1. waiting Tasks with the highest priority are selected for consideration.

  2. Tasks with an EXTENDS relationship to an incomplete Task are dropped from consideration.

  3. Of those that remain, a Task is claimed stochastically based on the weight of its ACTIONS relationship with the TaskHub.

This process is repeated until count Tasks have been claimed. If no Task is available, then None is given in its place.

Parameters:
  • compute_service_id – Unique identifier for the compute service claiming the Tasks for execution.

  • count – Claim the given number of Tasks in a single transaction.

  • protocols – Protocols to restrict Task claiming to. None means no restriction. If an empty list, raises ValueError.
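The claiming process above can be sketched in memory as follows. This is a minimal illustration under stated assumptions, not the Neo4j query alchemiscale runs: TaskRecord, is_eligible, and claim_tasks are hypothetical names; zero-weight Tasks are assumed unclaimable; a claimed Task is assumed to move to running; and the EXTENDS filter (step 2) is applied before taking the highest-priority group (step 1), so a blocked top-priority Task does not prevent eligible lower-priority Tasks from being claimed.

```python
from __future__ import annotations

import random
from dataclasses import dataclass


@dataclass
class TaskRecord:
    """Hypothetical in-memory stand-in for a Task actioned on a single TaskHub."""

    key: str
    status: str                 # e.g. "waiting", "running", "complete"
    priority: int               # 1 is highest; larger values are lower priority
    weight: float               # weight of the TaskHub's ACTIONS relationship
    extends: str | None = None  # key of the Task this one EXTENDS, if any
    claim: str | None = None    # compute service holding a claim, if any


def is_eligible(task: TaskRecord, tasks: dict[str, TaskRecord]) -> bool:
    """Waiting, positively weighted (assumption), and not EXTENDing an incomplete Task."""
    if task.status != "waiting" or task.weight <= 0:
        return False
    if task.extends is None:
        return True
    parent = tasks.get(task.extends)
    return parent is not None and parent.status == "complete"


def claim_tasks(tasks: dict[str, TaskRecord], compute_service_id: str,
                count: int, rng: random.Random) -> list[str | None]:
    claimed: list[str | None] = []
    for _ in range(count):
        eligible = [t for t in tasks.values() if is_eligible(t, tasks)]
        if not eligible:
            claimed.append(None)  # no Task available: None takes its place
            continue
        # Keep only the highest-priority group (lowest priority number).
        top = min(t.priority for t in eligible)
        candidates = [t for t in eligible if t.priority == top]
        # Draw one Task with probability proportional to its ACTIONS weight.
        chosen = rng.choices(candidates, weights=[t.weight for t in candidates], k=1)[0]
        chosen.status = "running"  # assumed effect of a claim
        chosen.claim = compute_service_id
        claimed.append(chosen.key)
    return claimed


tasks = {
    "t1": TaskRecord("t1", "waiting", priority=1, weight=0.5),
    "t2": TaskRecord("t2", "waiting", priority=1, weight=0.5, extends="t3"),
    "t3": TaskRecord("t3", "running", priority=5, weight=0.5),
    "t4": TaskRecord("t4", "waiting", priority=3, weight=1.0),
}
result = claim_tasks(tasks, "compute-service-1", count=3, rng=random.Random(42))
print(result)  # ['t1', 't4', None]
```

Here t2 is skipped because it extends t3, which is still running, and the third claim finds nothing left, so None fills its slot.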

clear_errored_computemanager(compute_manager_id: ComputeManagerID, tx=None)

Remove a compute manager with an ERROR status.

Parameters:

compute_manager_id – The compute manager ID string containing the name and the UUID.

Raises:

ValueError – Raised when the ERROR compute manager cannot be found in the database.

clear_task_restart_patterns(taskhub: ScopedKey)

Clear all restart patterns from a TaskHub.

Parameters:

taskhub (ScopedKey) – The ScopedKey of the TaskHub to clear of restart patterns.

close()

Close the Neo4j driver for this instance.

compute_service_can_claim(compute_service_id: ComputeServiceID, forgive_time: datetime, max_failures: int) bool

Check if a compute service is able to claim a Task.

Parameters:
  • compute_service_id – The compute service to validate.

  • forgive_time – The time cutoff used to filter failure time reports for the compute service. Only entries occurring after this time are considered.

  • max_failures – The number of failures allowed to occur between forgive_time and now. Any value greater than this denies the claim request.

compute_services_can_claim(compute_service_ids: list[ComputeServiceID], forgive_time: datetime, max_failures: int) list[bool]

Check whether the given compute services are able to claim Tasks.

Parameters:
  • compute_service_ids – The compute services to validate.

  • forgive_time – The time cutoff used to filter failure time reports for the compute services. Only entries occurring after this time are considered.

  • max_failures – The number of failures allowed to occur between forgive_time and now. Any value greater than this denies the claim request.
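A minimal sketch of this check, assuming failure reports are available as timestamps per compute service. The dict-based failures store and the function name services_can_claim are hypothetical stand-ins for the database records and the real method.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def services_can_claim(failures: dict[str, list[datetime]], compute_service_ids: list[str],
                       forgive_time: datetime, max_failures: int) -> list[bool]:
    """Hypothetical in-memory version of the failure check described above."""
    results = []
    for csid in compute_service_ids:
        # Only failures reported after forgive_time count against the service.
        recent = [t for t in failures.get(csid, []) if t > forgive_time]
        # More than max_failures recent failures denies the claim request.
        results.append(len(recent) <= max_failures)
    return results


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
failures = {
    "cs-a": [now - timedelta(hours=2), now - timedelta(minutes=5), now - timedelta(minutes=1)],
    "cs-b": [now - timedelta(hours=3)],
}
print(services_can_claim(failures, ["cs-a", "cs-b", "cs-c"],
                         forgive_time=now - timedelta(hours=1), max_failures=1))
# [False, True, True]
```

cs-a has two failures inside the last hour, exceeding max_failures=1; cs-b's only failure predates forgive_time, and cs-c has none.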

create_credentialed_entity(entity: CredentialedEntity)

Create a new credentialed entity, such as a user or compute identity.

If an entity of this type with the same identifier already exists, then this will overwrite its properties, including credential.

create_network_mark_subgraph(network_node: Node, state=NetworkStateEnum.active)

Create the Subgraph for an AlchemicalNetwork’s NetworkMark.

Parameters:
  • network_node – A Node representing the target AlchemicalNetwork. This is a returned value from the create_network_subgraph() method.

  • state – The starting state for the AlchemicalNetwork.

Returns:

A tuple containing the NetworkMark Subgraph, the specific NetworkMark Node within the Subgraph, and the ScopedKey of the NetworkMark.

create_network_subgraph(network: AlchemicalNetwork, scope: Scope)

Create a Subgraph for the given AlchemicalNetwork.

Parameters:
  • network – The AlchemicalNetwork to generate a Subgraph for.

  • scope – The Scope where the AlchemicalNetwork resides.

Returns:

A tuple containing the AlchemicalNetwork Subgraph, the specific AlchemicalNetwork Node within the Subgraph, and the ScopedKey of the AlchemicalNetwork.

create_task(transformation: ScopedKey, extends: ScopedKey | None = None, creator: str | None = None) ScopedKey

Create a single Task for a Transformation.

This is a convenience method that wraps around the more general create_tasks method.

create_taskhub_subgraph(network_node: Node)

Create a Subgraph for an AlchemicalNetwork’s TaskHub.

Parameters:

network_node – A Node representing the target AlchemicalNetwork. This is a returned value from the create_network_subgraph() method.

Returns:

A tuple containing the TaskHub Subgraph, the specific TaskHub Node within the Subgraph, and the ScopedKey of the TaskHub.

create_tasks(transformations: list[ScopedKey], extends: list[ScopedKey | None] | None = None, creator: str | None = None) list[ScopedKey]

Create Tasks for the given Transformations.

Note: this creates Tasks; it does not action them.

Parameters:
  • transformations – The Transformations to create Tasks for. One Task is created for each Transformation ScopedKey given; to create multiple Tasks for a given Transformation, provide its ScopedKey multiple times.

  • extends – The ScopedKeys of the Tasks to use as a starting point for the created Tasks, in the same order as transformations. If None is given for a Task, it will not extend any other Task. The ProtocolDAGResult from the extended Task is used as the extends input for the new Task’s eventual call to Protocol.create.

  • creator (optional) – The creator of the Tasks.

delete_network(network: ScopedKey) ScopedKey

Delete the given AlchemicalNetwork from the database.

This will not remove any Transformations or ChemicalSystems associated with the AlchemicalNetwork, since these may be associated with other AlchemicalNetworks in the same Scope.

delete_task(task: ScopedKey) Task

Remove a compute Task from a Transformation.

This will also remove the Task from all TaskHubs it is a part of.

This method is intended for administrator use; generally, Tasks should instead have their status set to ‘deleted’ and be retained.

delete_taskhub(network: ScopedKey) ScopedKey

Delete a TaskHub for a given AlchemicalNetwork.

deregister_computemanager(compute_manager_id: ComputeManagerID)

Remove the compute manager registration from the statestore.

Uses the name and UUID from a ComputeManagerID to deregister a compute manager’s registration. First, the MANAGES relationships with the compute services’ registrations are removed. Then the compute manager registration node is removed, as long as the registration does not have the ERROR status.

Parameters:

compute_manager_id – The compute manager ID string containing the name and the UUID.

deregister_computeservice(compute_service_id: ComputeServiceID)

Remove the registration for the given ComputeServiceID from the state store.

This will remove the ComputeServiceRegistration node and all its CLAIMS relationships to Tasks.

All Tasks with CLAIMS relationships to the ComputeServiceRegistration and with status running will have their status set to waiting.
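The effect on claimed Tasks can be sketched in memory as below. TaskRecord and release_claims are hypothetical names; the real method performs these updates in Neo4j while deleting the ComputeServiceRegistration node.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TaskRecord:
    """Hypothetical stand-in for a Task node."""

    key: str
    status: str
    claim: str | None = None  # compute service with a CLAIMS relationship, if any


def release_claims(tasks: list[TaskRecord], compute_service_id: str) -> list[str]:
    """Drop every claim held by the service; running Tasks go back to waiting."""
    reset: list[str] = []
    for task in tasks:
        if task.claim != compute_service_id:
            continue
        task.claim = None  # the CLAIMS relationship is removed
        if task.status == "running":
            task.status = "waiting"
            reset.append(task.key)
    return reset


tasks = [
    TaskRecord("t1", "running", claim="cs-1"),
    TaskRecord("t2", "running", claim="cs-2"),
]
print(release_claims(tasks, "cs-1"))  # ['t1']
```

Returning released Tasks to waiting lets another compute service claim them instead of leaving them stranded under a registration that no longer exists.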

expire_computemanager_registrations(expire_time_ok: datetime, expire_time_error: datetime)

Remove expired compute managers from the statestore.

This method checks the status of compute managers and removes those that have expired based on their last status update time.

Parameters:
  • expire_time_ok – The expiration time for “OK” compute managers. Managers with a last update time earlier than the expiration cutoff will be removed.

  • expire_time_error – The expiration time for “ERROR” compute managers. Managers with a last update time earlier than the expiration cutoff will be removed.
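A minimal sketch of the selection step, assuming each registration records its status and last status update time. ManagerRegistration and expired_managers are hypothetical names, and the example windows (one hour for OK, one day for ERROR) are arbitrary illustration values, not defaults.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class ManagerRegistration:
    """Hypothetical stand-in for a compute manager registration."""

    name: str
    status: str  # "OK" or "ERROR"
    last_status_update: datetime


def expired_managers(registrations: list[ManagerRegistration],
                     expire_time_ok: datetime, expire_time_error: datetime) -> list[str]:
    """Return names of registrations whose last update precedes their status's cutoff."""
    cutoffs = {"OK": expire_time_ok, "ERROR": expire_time_error}
    return [
        reg.name
        for reg in registrations
        if reg.last_status_update < cutoffs[reg.status]
    ]


now = datetime(2024, 1, 1, 12, 0)
registrations = [
    ManagerRegistration("ok-fresh", "OK", now - timedelta(minutes=10)),
    ManagerRegistration("ok-stale", "OK", now - timedelta(hours=2)),
    ManagerRegistration("error-recent", "ERROR", now - timedelta(hours=2)),
    ManagerRegistration("error-old", "ERROR", now - timedelta(days=2)),
]
print(expired_managers(registrations, expire_time_ok=now - timedelta(hours=1),
                       expire_time_error=now - timedelta(days=1)))
# ['ok-stale', 'error-old']
```

Both registrations last updated two hours ago are evaluated against different cutoffs, so only the OK one expires.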

expire_registrations(expire_time: datetime)

Remove all registrations with last heartbeat prior to the given expire_time.

get_chemicalsystem_networks(chemicalsystem: ScopedKey) list[ScopedKey]

List ScopedKeys for AlchemicalNetworks associated with the given ChemicalSystem.

get_chemicalsystem_transformations(chemicalsystem: ScopedKey) list[ScopedKey]

List ScopedKeys for the Transformations associated with the given ChemicalSystem.

get_computemanager_instruction(compute_manager_id: ComputeManagerID, forgive_time: datetime, max_failures: int, scopes: list[Scope], protocols: list[str]) tuple[ComputeManagerInstruction, dict]

Return an instruction for a compute manager based on the contents of the statestore.

This method returns one of three instructions along with supporting data:

  1. “OK” with a list of ComputeServiceIDs and the number of available tasks

  2. “SKIP” with a list of ComputeServiceIDs

  3. “SHUTDOWN” with an error message

Parameters:
  • compute_manager_id – The compute manager ID string containing the name and the UUID.

  • forgive_time – The time at which a failure from a compute service is considered forgiven.

  • max_failures – The number of unforgiven failures (those occurring after forgive_time) a compute service is allowed to have before it is no longer allowed to claim a Task. If any managed compute services have failures that exceed this value, the returned instruction will be SKIP.

  • scopes – The scopes to consider when determining available tasks.

  • protocols – A list of Protocols to filter on. An empty list will not filter.

Returns:

A tuple whose first value is the instruction enumeration and whose second value is data associated with that instruction.

get_credentialed_entity(identifier: str, cls: type[CredentialedEntity])

Get an existing credentialed entity, such as a user or compute identity.

get_keyed_chain(scoped_key: ScopedKey) KeyedChain

Retrieve the KeyedChain form of a GufeTokenizable from the database.

Parameters:

scoped_key – The ScopedKey of the GufeTokenizable to retrieve.

Returns:

The KeyedChain form of the tokenizable.

Return type:

KeyedChain

get_network_chemicalsystems(network: ScopedKey) list[ScopedKey]

List ScopedKeys for ChemicalSystems associated with the given AlchemicalNetwork.

get_network_state(networks: list[ScopedKey]) list[str | None]

Get the states of a group of networks.

Parameters:

networks – The list of networks to get the states of.

Returns:

A list containing the states of the given networks, in the same order as they were provided. If a network was not found, None is returned at the corresponding index.

Return type:

List[Optional[str]]

get_network_status(networks: list[ScopedKey]) list[dict[str, int]]

Return status counts for all Tasks associated with the given AlchemicalNetworks.

get_network_strategy(network: ScopedKey) Strategy | None

Get the Strategy for the given AlchemicalNetwork.

Parameters:

network – ScopedKey of the AlchemicalNetwork.

Returns:

Strategy object or None if no strategy is set.

Return type:

GufeTokenizable | None

get_network_strategy_state(network: ScopedKey) StrategyState | None

Get the StrategyState for the given AlchemicalNetwork.

Parameters:

network – ScopedKey of the AlchemicalNetwork.

Returns:

Strategy state or None if no strategy is set.

Return type:

StrategyState | None

get_network_tasks(network: ScopedKey, status: TaskStatusEnum | None = None) list[ScopedKey]

List ScopedKeys for all Tasks associated with the given AlchemicalNetwork.

get_network_transformations(network: ScopedKey) list[ScopedKey]

List ScopedKeys for Transformations associated with the given AlchemicalNetwork.

get_scope_status(scope: Scope, network_state: NetworkStateEnum | str | None = NetworkStateEnum.active) dict[str, int]

Return status counts for all Tasks within the given Scope.

Parameters:
  • scope – Scope to get status for; may be non-specific.

  • network_state – Network state to restrict status returns for; may be a regex pattern.

get_strategies_for_execution(scopes: list[Scope] | None = None, min_sleep_interval: int = 0) list[tuple[ScopedKey, ScopedKey, StrategyState]]

Get strategies that are ready for execution by the Strategist service.

Returns strategies that are:

  • Not disabled

  • Not in error status

  • Due for execution based on their sleep interval

Parameters:
  • scopes – List of scopes to filter by; if None, strategies from all scopes are returned.

  • min_sleep_interval – Minimum sleep interval enforced by Strategist service

Returns:

List of (network_sk, strategy_sk, strategy_state) tuples

Return type:

list[tuple[ScopedKey, ScopedKey, StrategyState]]
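The three selection criteria above can be sketched as a simple filter. StrategyStateSketch and its fields (mode, status, sleep_interval, last_iteration) are hypothetical stand-ins, since the fields of StrategyState are not documented here; the string values "enabled", "disabled", "ok", and "error" are placeholders, not confirmed members of StrategyModeEnum or StrategyStatusEnum. Treating min_sleep_interval as a floor on each strategy's own interval is likewise an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class StrategyStateSketch:
    """Hypothetical fields; the real StrategyState model may name or type these differently."""

    mode: str                               # assumed: "disabled" means never execute
    status: str                             # assumed: "error" means do not execute
    sleep_interval: int                     # assumed: seconds between executions
    last_iteration: datetime | None = None  # assumed: time of the previous execution


def strategies_due(states: dict[str, StrategyStateSketch], now: datetime,
                   min_sleep_interval: int = 0) -> list[str]:
    due: list[str] = []
    for network, state in states.items():
        if state.mode == "disabled" or state.status == "error":
            continue
        # The Strategist's minimum interval overrides shorter per-strategy intervals.
        interval = timedelta(seconds=max(state.sleep_interval, min_sleep_interval))
        if state.last_iteration is None or now >= state.last_iteration + interval:
            due.append(network)
    return due


now = datetime(2024, 1, 1, 12, 0)
states = {
    "net-a": StrategyStateSketch("enabled", "ok", 600, now - timedelta(minutes=15)),
    "net-b": StrategyStateSketch("enabled", "ok", 600, now - timedelta(minutes=5)),
    "net-c": StrategyStateSketch("disabled", "ok", 600),
    "net-d": StrategyStateSketch("enabled", "error", 600),
    "net-e": StrategyStateSketch("enabled", "ok", 60, now - timedelta(minutes=2)),
    "net-f": StrategyStateSketch("enabled", "ok", 600),
}
print(strategies_due(states, now, min_sleep_interval=300))  # ['net-a', 'net-f']
```

net-e asks for a 60-second interval, but the 300-second minimum keeps it waiting; without the minimum it would be due.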

get_task_actioned_networks(task: ScopedKey) dict[ScopedKey, float]

Get all AlchemicalNetwork ScopedKeys whose TaskHub ACTIONS a given Task.

Parameters:

task – The ScopedKey of the Task to obtain actioned AlchemicalNetworks for.

Returns:

A dict with AlchemicalNetwork ScopedKeys whose TaskHub actions a given Task as keys, Task weights as values.

Return type:

dict[ScopedKey, float]

get_task_networks(task: ScopedKey) list[ScopedKey]

List ScopedKeys for AlchemicalNetworks associated with the given Task.

get_task_priority(tasks: list[ScopedKey]) list[int | None]

Get the priority of a list of Tasks.

Parameters:

tasks – The list of Tasks to get the priority for.

Returns:

A list of priorities in the same order as the provided Tasks. If an element is None, the Task could not be found.

Return type:

List[Optional[int]]

get_task_restart_patterns(taskhubs: list[ScopedKey]) dict[ScopedKey, set[tuple[str, int]]]

For a list of TaskHub ScopedKeys, get the associated restart patterns along with the maximum number of retries for each pattern.

Parameters:

taskhubs (list[ScopedKey]) – The ScopedKeys of the TaskHubs to get the restart patterns of.

Returns:

A dictionary with ScopedKeys of the TaskHubs provided as keys, and a set of tuples containing the patterns enforcing each TaskHub along with their associated maximum number of retries as values.

Return type:

dict[ScopedKey, set[tuple[str, int]]]

get_task_status(tasks: list[ScopedKey]) list[TaskStatusEnum]

Get the status of a list of Tasks.

Parameters:

tasks – The list of Tasks to get the status for.

Returns:

A list of the Task statuses requested; None is given for any Tasks that do not exist.

get_task_transformation(task: ScopedKey, return_gufe=True) tuple[Transformation, ProtocolDAGResultRef | None] | tuple[ScopedKey, ScopedKey | None]

Get the Transformation and ProtocolDAGResultRef to extend from (if present) for the given Task.

If return_gufe is True, returns the actual Transformation and ProtocolDAGResultRef objects (None for the latter if not present); if False, returns ScopedKeys for these instead.

get_task_weights(tasks: list[ScopedKey], taskhub: ScopedKey) list[float | None]

Get weights for the ACTIONS relationship between a TaskHub and a Task.

Parameters:
  • tasks – The ScopedKeys of the Tasks to get the weights for.

  • taskhub – The ScopedKey of the TaskHub associated with the Tasks.

Returns:

Weights for the list of Tasks, in the same order.

Return type:

list[float | None]

get_taskhub(network: ScopedKey, return_gufe: bool = False) ScopedKey | TaskHub

Get the TaskHub for the given AlchemicalNetwork.

Parameters:

return_gufe – If True, return a TaskHub instance. Otherwise, return a ScopedKey.

get_taskhub_actioned_tasks(taskhubs: list[ScopedKey]) list[dict[ScopedKey, float]]

Get the Tasks that the given TaskHubs ACTIONS.

Parameters:

taskhubs – The ScopedKeys of the TaskHubs to query.

Returns:

A list of dicts, one per TaskHub, which contains the Task ScopedKeys that are actioned on the given TaskHub as keys, with their weights as values.

Return type:

list[dict[ScopedKey, float]]

get_taskhub_tasks(taskhub: ScopedKey, return_gufe=False, tx=None) list[ScopedKey] | dict[ScopedKey, Task]

Get a list of Tasks on the TaskHub.

get_taskhub_unclaimed_tasks(taskhub: ScopedKey, return_gufe=False) list[ScopedKey] | dict[ScopedKey, Task]

Get a list of unclaimed Tasks in the TaskHub.

get_taskhub_weight(networks: list[ScopedKey]) list[float]

Get the weight for the TaskHubs associated with the given AlchemicalNetworks.

get_taskhubs(network_scoped_keys: list[ScopedKey], return_gufe: bool = False) list[ScopedKey | TaskHub]

Get the TaskHubs for the given AlchemicalNetworks.

Parameters:

return_gufe – If True, return TaskHub instances. Otherwise, return ScopedKeys.

get_transformation_actioned_tasks(transformation: ScopedKey, taskhub: ScopedKey) list[ScopedKey]

Get all Tasks for a Transformation that are actioned by the given TaskHub.

Parameters:
  • transformation – ScopedKey of the Transformation to retrieve actioned Tasks for.

  • taskhub – ScopedKey of the TaskHub to check for actioned Tasks.

Returns:

List of Task ScopedKeys that perform the given Transformation and are actioned by the given TaskHub.

Return type:

list[ScopedKey]

get_transformation_chemicalsystems(transformation: ScopedKey) list[ScopedKey]

List ScopedKeys for the ChemicalSystems associated with the given Transformation.

get_transformation_networks(transformation: ScopedKey) list[ScopedKey]

List ScopedKeys for AlchemicalNetworks associated with the given Transformation.

get_transformation_status(transformation: ScopedKey) dict[str, int]

Return status counts for all Tasks associated with the given Transformation.

get_transformation_tasks(transformation: ScopedKey, extends: ScopedKey | None = None, return_as: str = 'list', status: TaskStatusEnum | None = None) list[ScopedKey] | dict[ScopedKey, ScopedKey | None]

Get all Tasks that perform the given Transformation.

If a Task ScopedKey is given for extends, then only those Tasks that follow via any number of EXTENDS relationships will be returned.

return_as takes either list or graph as input. graph will yield a dict mapping each Task’s ScopedKey (as keys) to the Task ScopedKey it extends (as values).

Parameters:
  • transformation – ScopedKey of the Transformation to retrieve Tasks for.

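  • extends – ScopedKey of a Task; if given, only Tasks that follow from it via any number of EXTENDS relationships are returned.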
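Given the graph form returned with return_as='graph', the filtering that extends performs can be sketched client-side as a walk over EXTENDS links. tasks_extending is a hypothetical helper that uses plain string keys in place of ScopedKeys; whether the real method includes the starting Task itself is not stated here, and this sketch excludes it.

```python
from __future__ import annotations


def tasks_extending(graph: dict[str, str | None], root: str) -> list[str]:
    """Return every Task reachable from root through one or more EXTENDS links.

    graph maps each Task key to the key of the Task it extends (None if it
    extends nothing), mirroring the shape described for return_as='graph'.
    The root itself is not included.
    """
    # Invert the child -> parent mapping into parent -> children.
    children: dict[str, list[str]] = {}
    for task, parent in graph.items():
        if parent is not None:
            children.setdefault(parent, []).append(task)

    # Depth-first walk down the EXTENDS chains.
    found: list[str] = []
    stack = [root]
    while stack:
        for child in children.get(stack.pop(), []):
            found.append(child)
            stack.append(child)
    return found


graph = {"t1": None, "t2": "t1", "t3": "t2", "t4": "t1", "t5": None}
print(sorted(tasks_extending(graph, "t1")))  # ['t2', 't3', 't4']
```

Because each Task extends at most one other Task, the child-to-parent dict is enough to reconstruct the full tree of extensions.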

heartbeat_computeservice(compute_service_id: ComputeServiceID, heartbeat: datetime)

Update the heartbeat for the given ComputeServiceID.

initialize()

Initialize database.

Ensures that constraints and any other required structures are in place. Should be called on any Neo4j database before it is used with alchemiscale.

list_credentialed_entities(cls: type[CredentialedEntity])

List all existing credentialed entities of the given type, such as users or compute identities.

list_scopes(identifier: str, cls: type[CredentialedEntity]) list[Scope]

List all scopes for which the given entity has access.

log_failure_compute_service(compute_service_id: ComputeServiceID, failure_time: datetime) ComputeServiceID

Add a reported compute service failure to the database.

Parameters:
  • compute_service_id – The identifier for the compute service that failed.

  • failure_time – The time the failure should be reported as.

query_chemicalsystems(*, name=None, key=None, scope: Scope = <Scope('*-*-*')>)

Query for ChemicalSystems matching given attributes.

query_networks(*, name=None, key=None, scope: Scope | None = None, state: str | None = None) list[ScopedKey]

Query for AlchemicalNetworks matching given attributes.

query_taskhubs(scope: Scope | None = <Scope('*-*-*')>, return_gufe: bool = False) list[ScopedKey] | dict[ScopedKey, TaskHub]

Query for TaskHubs matching the given criteria.

Parameters:

return_gufe – If True, return a dict with ScopedKeys as keys, TaskHub instances as values. Otherwise, return a list of ScopedKeys.

query_tasks(*, status=None, key=None, scope: Scope = <Scope('*-*-*')>)

Query for Tasks matching given attributes.

query_transformations(*, name=None, key=None, scope: Scope = <Scope('*-*-*')>)

Query for Transformations matching given attributes.

register_computemanager(compute_manager_registration: ComputeManagerRegistration, steal: bool = False) ComputeManagerID

Register a compute manager with the statestore.

Parameters:
  • compute_manager_registration – The compute manager registration.

  • steal – Whether or not to steal the registration from an existing registration.

Returns:

The compute manager ID string containing the name and the UUID.

Return type:

ComputeManagerID

Raises:

ValueError – Raised when a compute manager is already registered with the provided name.

register_computeservice(compute_service_registration: ComputeServiceRegistration)

Register a ComputeServiceRegistration uniquely identifying a running ComputeService.

A ComputeServiceRegistration node is used for CLAIMS relationships on Tasks to avoid collisions in Task execution.

remove_credentialed_identity(identifier: str, cls: type[CredentialedEntity])

Remove a credentialed entity, such as a user or compute identity.

remove_scope(identifier: str, cls: type[CredentialedEntity], scope: Scope)

Remove a scope from the given entity.

remove_task_restart_patterns(taskhub: ScopedKey, patterns: list[str])

Remove a list of restart patterns enforcing a TaskHub from the database.

Parameters:
  • taskhub (ScopedKey) – The ScopedKey of the TaskHub that the patterns enforce.

  • patterns (list[str]) – The patterns to remove. Patterns not enforcing the TaskHub are ignored.

reset()

Remove all data from the database and undo everything set up by initialize().

resolve_task_restarts(task_scoped_keys: Iterable[ScopedKey], *, tx=None)

Determine whether or not Tasks need to be restarted or canceled and perform that action.

Parameters:

task_scoped_keys (Iterable[ScopedKey]) – An iterable of Task ScopedKeys that need to be resolved. Tasks without the error status are filtered out and ignored.
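The decision for a single Task can be sketched as below. This is an illustration under assumptions, not the alchemiscale implementation: resolve_restart is a hypothetical name; each matching pattern is assumed to consume one retry per resolution; exceeding any one matching pattern's max_retries is assumed to cancel the Task (consistent with add_task_restart_patterns above); an errored Task that matches no pattern is assumed to be left as-is. What a restart does to the Task (for example, returning it to waiting) is not modeled.

```python
from __future__ import annotations

import re


def resolve_restart(status: str, tracebacks: list[str], patterns: dict[str, int],
                    retry_counts: dict[str, int]) -> str:
    """Return "restart", "cancel", or "unchanged" for one Task.

    patterns maps each regular expression to its max_retries; retry_counts
    tracks, per pattern, how many restarts this Task has already used.
    """
    if status != "error":
        return "unchanged"  # only errored Tasks are considered
    matched = [p for p in patterns if any(re.search(p, tb) for tb in tracebacks)]
    if not matched:
        return "unchanged"  # assumed: no pattern matched, so the Task stays errored
    for pattern in matched:
        retry_counts[pattern] = retry_counts.get(pattern, 0) + 1
    if any(retry_counts[p] > patterns[p] for p in matched):
        return "cancel"  # retries exhausted: canceled, keeping its error status
    return "restart"


patterns = {r"CUDA out of memory": 2, r"ConnectionError": 1}
counts: dict[str, int] = {}
oom = ["RuntimeError: CUDA out of memory"]
print([resolve_restart("error", oom, patterns, counts) for _ in range(3)])
# ['restart', 'restart', 'cancel']
```

With max_retries set to 2, the third matching failure is the first attempt beyond the limit, so the Task is canceled rather than restarted.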

set_network_state(networks: list[ScopedKey], states: list[str]) list[ScopedKey | None]

Set the state of a group of AlchemicalNetworks.

Parameters:
  • networks – A list of networks to set the states for.

  • states – A list of states to set the networks to.

Returns:

The list of ScopedKeys for networks that were updated. If the network could not be found in the database, None is returned at the corresponding index.

Return type:

List[Optional[ScopedKey]]

set_network_strategy(network: ScopedKey, strategy: Strategy | None, strategy_state: StrategyState | None = None) ScopedKey | None

Set the compute Strategy for the given AlchemicalNetwork.

If strategy is None, removes the strategy from the network and cleans up orphaned Strategy nodes.

Parameters:
  • network – ScopedKey of the AlchemicalNetwork.

  • strategy – Strategy object (GufeTokenizable) or None to remove strategy.

  • strategy_state – Initial strategy state; if None, defaults are used.

Returns:

ScopedKey of the Strategy that was set, or None if strategy was removed.

Return type:

ScopedKey | None

set_task_complete(tasks: list[ScopedKey], raise_error: bool = False) list[ScopedKey | None]

Set the status of a list of Tasks to complete.

Only running Tasks can be set to complete.

set_task_deleted(tasks: list[ScopedKey], raise_error: bool = False) list[ScopedKey | None]

Set the status of a list of Tasks to deleted.

Any Task can be set to deleted; a deleted Task cannot change to any other status.

set_task_error(tasks: list[ScopedKey], raise_error: bool = False) list[ScopedKey | None]

Set the status of a list of Tasks to error.

Only running Tasks can be set to error.

set_task_invalid(tasks: list[ScopedKey], raise_error: bool = False) list[ScopedKey | None]

Set the status of a list of Tasks to invalid.

Any Task can be set to invalid; an invalid Task cannot change to any other status.

set_task_priority(tasks: list[ScopedKey], priority: int) list[ScopedKey | None]

Set the priority of a list of Tasks.

Parameters:
  • tasks – The list of Tasks to set the priority of.

  • priority – The priority to set the Tasks to.

Returns:

A list of the Task ScopedKeys for which priority was changed; None is given for any Tasks for which the priority could not be changed.

Return type:

List[Optional[ScopedKey]]

set_task_restart_patterns_max_retries(taskhub_scoped_key: ScopedKey, patterns: list[str], max_retries: int)

Set the maximum number of retries for one or more patterns enforcing a TaskHub.

Parameters:
  • taskhub_scoped_key (ScopedKey) – The ScopedKey of the TaskHub that the patterns enforce.

  • patterns (list[str]) – The patterns to change the maximum retries value for.

  • max_retries (int) – The new maximum retries value.

set_task_result(task: ScopedKey, protocoldagresultref: ProtocolDAGResultRef) ScopedKey

Set a ProtocolDAGResultRef pointing to a ProtocolDAGResult for the given Task.

set_task_running(tasks: list[ScopedKey], raise_error: bool = False) list[ScopedKey | None]

Set the status of a list of Tasks to running.

Only Tasks with status waiting can be set to running.

set_task_status(tasks: list[ScopedKey], status: TaskStatusEnum, raise_error: bool = False) list[ScopedKey | None]

Set the status of a list of Tasks.

This is a dispatcher method that calls the appropriate set_task_* method (e.g. set_task_running) for the requested status.

Parameters:
  • tasks – The list of Tasks to set the status of.

  • status – The status to set the Tasks to.

  • raise_error – If True, raise a ValueError if the status of a given Task cannot be changed.

Returns:

A list of the Task ScopedKeys for which status was changed; None is given for any Tasks for which the status could not be changed.

Return type:

List[Optional[ScopedKey]]

set_task_waiting(tasks: list[ScopedKey], raise_error: bool = False) list[ScopedKey | None]

Set the status of a list of Tasks to waiting.

Only Tasks with status error or running can be set to waiting.
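The transition rules stated for the set_task_* methods can be collected into one table, with set_task_status dispatching against it. This is a hypothetical model, not the store's code: the Status enum mirrors the status names used above rather than importing TaskStatusEnum, setting a terminal status to itself is assumed to count as allowed, and unlike the real method this sketch raises at the first failure after earlier Tasks were already updated.

```python
from enum import Enum
from typing import Optional


class Status(str, Enum):
    # Hypothetical stand-in mirroring the Task statuses named above.
    waiting = "waiting"
    running = "running"
    complete = "complete"
    error = "error"
    invalid = "invalid"
    deleted = "deleted"


# Allowed source statuses for each target, as described by the set_task_* methods.
ALLOWED_FROM = {
    Status.complete: {Status.running},
    Status.error: {Status.running},
    Status.running: {Status.waiting},
    Status.waiting: {Status.error, Status.running},
}
TERMINAL = {Status.invalid, Status.deleted}


def can_transition(current: Status, target: Status) -> bool:
    if current in TERMINAL:
        # A deleted or invalid Task cannot change to any other status.
        return current == target
    if target in TERMINAL:
        return True  # any Task can be set to deleted or invalid
    return current in ALLOWED_FROM[target]


def set_task_status(
    statuses: dict[str, Status],
    tasks: list[str],
    target: Status,
    raise_error: bool = False,
) -> list[Optional[str]]:
    result: list[Optional[str]] = []
    for task in tasks:
        current = statuses.get(task)  # None if the Task does not exist
        if current is not None and can_transition(current, target):
            statuses[task] = target
            result.append(task)
        elif raise_error:
            raise ValueError(f"cannot set {task} to {target.value}")
        else:
            result.append(None)
    return result
```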

set_task_weights(tasks: dict[ScopedKey, float] | list[ScopedKey], taskhub: ScopedKey, weight: float | None = None) list[ScopedKey | None]

Sets weights for the ACTIONS relationship between a TaskHub and a Task.

This is used to set the relative probabilistic execution order of a Task in a TaskHub. Note that this concept is orthogonal to priority in that tasks of higher priority will be executed before tasks of lower priority, but tasks of the same priority will be distributed according to their weights.

The weights can be set by either a list and a scalar, or a dict of {ScopedKey: weight} pairs.

Must be called after action_tasks to have any effect; otherwise, the TaskHub will not have an ACTIONS relationship to the Task.

Parameters:
  • tasks (Union[Dict[ScopedKey, float], List[ScopedKey]]) – If a dict, the keys are the ScopedKeys of the Tasks, and the values are the weights. If a list, the weights are set to the scalar value given by the weight argument.

  • taskhub (ScopedKey) – The ScopedKey of the TaskHub associated with the Tasks.

  • weight (Optional[float]) – If tasks is a list, this is the weight to set for each Task.

Returns:

A list of ScopedKeys for each Task whose weight was set. None is given for each Task whose weight was not set, either because the TaskHub has no ACTIONS relationship with the Task or because the Task does not exist.

Return type:

List[Optional[ScopedKey]]
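Two ideas from this entry fit in a short sketch: the two accepted forms of the tasks argument, and the interplay of priority and weight when choosing a Task. Both functions are hypothetical; in particular, pick_task assumes that a lower integer means a higher priority, and it uses weighted random choice within the top-priority group as a stand-in for the documented "distributed according to their weights" behavior, not as the store's actual claiming logic.

```python
import random
from typing import Optional, Union


def normalize_weights(
    tasks: Union[dict[str, float], list[str]], weight: Optional[float] = None
) -> dict[str, float]:
    """Accept either {task: weight} or a list of tasks plus one scalar weight."""
    if isinstance(tasks, dict):
        return dict(tasks)
    if weight is None:
        raise ValueError("weight is required when tasks is a list")
    return {task: weight for task in tasks}


def pick_task(
    priorities: dict[str, int], weights: dict[str, float], rng: random.Random
) -> str:
    """Pick from the highest-priority group, then by weight within that group.

    Assumes a lower integer means a higher priority.
    """
    best = min(priorities.values())
    candidates = [t for t, p in priorities.items() if p == best]
    return rng.choices(candidates, weights=[weights[t] for t in candidates], k=1)[0]
```

A lower-priority Task is never picked while a higher-priority one is available, however large its weight, which is the sense in which weight is orthogonal to priority.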

set_taskhub_weight(networks: list[ScopedKey], weights: list[float]) list[ScopedKey | None]

Set the weights for the TaskHubs associated with the given AlchemicalNetworks.

set_tasks(transformation: ScopedKey, extends: Task | None = None, count: int = 1) ScopedKey

Set a fixed number of Tasks against the given Transformation if not already present.

Note: Tasks created by this method are not added to any TaskHubs.

Parameters:
  • transformation – The Transformation to compute.

  • scope – The scope the Transformation is in; ignored if transformation is a ScopedKey.

  • extends – The Task to use as a starting point for these Tasks. The ProtocolDAGResult from the given Task will be used as the extends input for each Task’s eventual call to Protocol.create.

  • count – The total number of tasks that should exist corresponding to the specified transformation, scope, and extends.
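Because count is a target total rather than a number to add, only the shortfall needs creating. This hypothetical helper computes that shortfall from existing (transformation, extends) pairs, with strings standing in for ScopedKeys; it assumes surplus Tasks are never deleted, which the description above does not state.

```python
from collections import Counter
from typing import Optional


def tasks_to_create(
    existing: list[tuple[str, Optional[str]]],
    transformation: str,
    extends: Optional[str] = None,
    count: int = 1,
) -> int:
    """How many new Tasks are needed so `count` exist for (transformation, extends)."""
    present = Counter(existing)[(transformation, extends)]
    # Assumed: if enough Tasks already exist, none are created or removed.
    return max(count - present, 0)
```

Repeating a call with the same arguments therefore creates nothing the second time.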

transaction(ignore_exceptions=False) neo4j.Transaction

Context manager for a Neo4j Transaction.
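The commit-or-rollback pattern behind a transaction context manager can be shown without a database. FakeTransaction is a hypothetical stand-in for neo4j.Transaction, and the meaning given to ignore_exceptions here (swallow the error after rolling back) is an assumption, not documented behavior.

```python
from contextlib import contextmanager
from typing import Iterator


class FakeTransaction:
    """Hypothetical stand-in for neo4j.Transaction that records the outcome."""

    def __init__(self) -> None:
        self.committed = False
        self.rolled_back = False

    def commit(self) -> None:
        self.committed = True

    def rollback(self) -> None:
        self.rolled_back = True


@contextmanager
def transaction(ignore_exceptions: bool = False) -> Iterator[FakeTransaction]:
    tx = FakeTransaction()
    try:
        yield tx
    except Exception:
        tx.rollback()  # undo partial work on any error in the with-block
        # Assumed meaning of ignore_exceptions: suppress the error after rollback.
        if not ignore_exceptions:
            raise
    else:
        tx.commit()  # commit only if the with-block finished cleanly
```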

update_compute_manager_status(compute_manager_id: ComputeManagerID, status: ComputeManagerStatus, detail: str | None = None, saturation: float | None = None, update_time: datetime | None = None)

Update the status of a compute manager.

Statuses can either be passed in as strings or instances of the ComputeManagerStatus enumeration, though the latter is safer and preferred.

Parameters:
  • compute_manager_id – The compute manager ID string containing the name and the UUID.

  • status – An instance of the ComputeManagerStatus string enumeration, whose supported values are “OK” and “ERROR”.

  • detail – A message to be included with the status update. It is required for, and only allowed with, the ERROR status, and should tell administrators why the compute manager entered the ERROR status.

  • update_time – The time to set as the last status update time in the database. Defaults to None, which will use the current time when updating.
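The rules for status, detail, and update_time can be expressed as a small validation step. This is a hypothetical sketch: ManagerStatus mirrors the documented "OK" and "ERROR" values instead of importing ComputeManagerStatus, saturation is left out, and defaulting to the current UTC time is an assumption about which clock is used.

```python
from datetime import datetime, timezone
from enum import Enum
from typing import Optional, Union


class ManagerStatus(str, Enum):
    # Hypothetical mirror of ComputeManagerStatus ("OK" and "ERROR").
    OK = "OK"
    ERROR = "ERROR"


def validate_status_update(
    status: Union[ManagerStatus, str],
    detail: Optional[str] = None,
    update_time: Optional[datetime] = None,
) -> tuple[ManagerStatus, Optional[str], datetime]:
    # Accepts "OK"/"ERROR" strings or enum members; other values raise ValueError.
    status = ManagerStatus(status)
    if status is ManagerStatus.ERROR and not detail:
        raise ValueError("detail is required for the ERROR status")
    if status is not ManagerStatus.ERROR and detail is not None:
        raise ValueError("detail is only allowed for the ERROR status")
    # Fall back to the current time when no update_time is given.
    return status, detail, update_time or datetime.now(timezone.utc)
```

Passing the enum member rather than a bare string lets typos surface where the value is written, which is why the enumeration is the safer choice.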

update_strategy_state(network: ScopedKey, strategy_state: StrategyState) ScopedKey | None

Update the StrategyState for the given AlchemicalNetwork.

Parameters:
  • network – ScopedKey of the AlchemicalNetwork.

  • strategy_state – Updated strategy state.

Returns:

The ScopedKey of the AlchemicalNetwork if StrategyState successfully updated; None otherwise.

Return type:

ScopedKey | None

alchemiscale.storage.statestore.get_n4js(settings: Neo4jStoreSettings)

Convenience function for getting a Neo4jStore directly from settings.