alchemiscale.compute.client — client for interacting with compute API

class alchemiscale.compute.client.AlchemiscaleComputeClient(api_url: str, identifier: str, key: str, max_retries: int = 5, retry_base_seconds: float = 2.0, retry_max_seconds: float = 60.0, verify: bool = True)

Bases: AlchemiscaleBaseClient

Client used by compute services to interact with the compute API service.

claim_taskhub_tasks(taskhub: ScopedKey, compute_service_id: ComputeServiceID, count: int = 1) Task

Claim a Task from the specified TaskHub.

query_taskhubs(scopes: List[Scope], return_gufe=False) List[ScopedKey] | Dict[ScopedKey, TaskHub]

Return all TaskHubs corresponding to the given Scopes.

exception alchemiscale.compute.client.AlchemiscaleComputeClientError(*args, **kwargs)

Bases: AlchemiscaleBaseClientError

alchemiscale.compute.service — compute services for FEC execution

class alchemiscale.compute.service.AsynchronousComputeService(api_url)

Bases: SynchronousComputeService

Asynchronous compute service.

This service can be used in production cases, though it does not make use of Folding@Home.


Start the service; will keep going until told to stop.

class alchemiscale.compute.service.InterruptableSleep

Bases: object

A class for sleeping that can be interrupted.

This class uses threading Events to wake up from a sleep before the entire sleep duration has run. If the sleep is interrupted, then a SleepInterrupted exception is raised.

This class is a functor, so an instance can be passed as the delay function to a Python sched.scheduler.
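
The mechanism described above can be sketched with the standard library alone. This is a minimal illustration, not the alchemiscale implementation: InterruptibleDelay, DelayInterrupted, and interrupt() are hypothetical stand-ins, and the real class may differ in its method names and behavior.

```python
import sched
import threading
import time


class DelayInterrupted(BaseException):
    """Hypothetical stand-in for SleepInterrupted."""


class InterruptibleDelay:
    """Hypothetical stand-in for InterruptableSleep (a sketch, not the real class)."""

    def __init__(self):
        self._event = threading.Event()

    def __call__(self, delay):
        # Event.wait returns True if the event was set before the timeout elapsed.
        if self._event.wait(delay):
            raise DelayInterrupted()

    def interrupt(self):
        # Wake up any thread currently blocked in __call__.
        self._event.set()


delay = InterruptibleDelay()
scheduler = sched.scheduler(time.monotonic, delay)
scheduler.enter(30, 1, lambda: None)  # far enough away to force a long sleep

# Interrupt the sleep from another thread shortly after the scheduler starts.
threading.Timer(0.1, delay.interrupt).start()

started = time.monotonic()
try:
    scheduler.run()
    interrupted = False
except DelayInterrupted:
    interrupted = True
elapsed = time.monotonic() - started
```

Because the instance is callable with a single delay argument, it satisfies the delayfunc contract of sched.scheduler; the scheduler's run() call returns control as soon as the event is set rather than after the full 30 seconds.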

exception alchemiscale.compute.service.SleepInterrupted

Bases: BaseException

Exception class used to signal that an InterruptableSleep was interrupted.

This (like KeyboardInterrupt) derives from BaseException to prevent it from being handled with “except Exception”.
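
The effect of deriving from BaseException can be shown with a small sketch. Interrupted is a hypothetical class defined here for illustration only; it stands in for SleepInterrupted.

```python
class Interrupted(BaseException):
    """Hypothetical exception deriving from BaseException, like SleepInterrupted."""


def run_step():
    try:
        raise Interrupted()
    except Exception:
        # Not reached: Interrupted is not a subclass of Exception.
        return "swallowed"


try:
    outcome = run_step()
except Interrupted:
    # The broad handler inside run_step did not catch it, so it arrives here.
    outcome = "propagated"
```

A generic error handler around task execution therefore cannot accidentally absorb the interruption signal; only code that names the exception explicitly (or catches BaseException) will see it.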

class alchemiscale.compute.service.SynchronousComputeService(api_url: str, identifier: str, key: str, name: str, shared_basedir: PathLike, scratch_basedir: PathLike, keep_shared: bool = False, keep_scratch: bool = False, n_retries: int = 3, sleep_interval: int = 30, heartbeat_interval: int = 300, scopes: List[Scope] | None = None, claim_limit: int = 1, loglevel='WARN', logfile: Path | None = None, client_max_retries=5, client_retry_base_seconds=2.0, client_retry_max_seconds=60.0, client_verify=True)

Bases: object

Fully synchronous compute service.

This service is intended for use as a reference implementation, and for testing/debugging protocols.


Deliver a heartbeat to the compute API, indicating this service is still alive.

claim_tasks(count=1) List[ScopedKey | None]

Get a Task to execute from the compute API.

Returns None if no Task matching the service configuration was available.

execute(task: ScopedKey) ScopedKey

Executes given Task.

Returns the ScopedKey of the ProtocolDAGResultRef after it is pushed to the database.


Start up the heartbeat, sleeping for self.heartbeat_interval.
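
A heartbeat loop of this kind can be sketched as follows. This is an assumption about the general pattern, not the service's actual code: beat_loop, send_heartbeat, and stop_event are hypothetical names, and the interval is shortened so the example finishes quickly.

```python
import threading
import time


def beat_loop(send_heartbeat, interval, stop_event):
    """Call send_heartbeat, then wait `interval` seconds, until stop_event is set."""
    while not stop_event.is_set():
        send_heartbeat()
        # wait() acts as an interruptible sleep: it returns early once stop_event is set.
        stop_event.wait(interval)


beats = []
stop = threading.Event()
thread = threading.Thread(
    target=beat_loop,
    args=(lambda: beats.append("alive"), 0.01, stop),
    daemon=True,
)
thread.start()

# Let a few heartbeats go out, then shut the loop down.
while len(beats) < 3:
    time.sleep(0.01)
stop.set()
thread.join(timeout=1.0)
```

Waiting on an event instead of calling time.sleep lets the loop exit promptly on shutdown rather than after a full interval, which matters when the interval is several minutes.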

start(max_tasks: int | None = None, max_time: int | None = None)

Start the service.

Limits can be set on the number of Tasks executed and on the number of seconds to run. Whichever limit is reached first triggers the service to exit.

  • max_tasks – Max number of Tasks to execute before exiting. If None, the service will have no task limit.

  • max_time – Max number of seconds to run before exiting. If None, the service will have no time limit.
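
The "first limit reached wins" behavior described above can be sketched as a plain loop. run_until_limit, execute_next, and the injectable clock are hypothetical and exist only for this illustration; the real start() method also claims Tasks, sleeps between polls, and manages the heartbeat.

```python
import time


def run_until_limit(execute_next, max_tasks=None, max_time=None, clock=time.monotonic):
    """Run execute_next() repeatedly until a task or time limit is reached.

    Hypothetical helper; returns the number of tasks executed.
    With both limits set to None the loop never exits on its own.
    """
    started = clock()
    executed = 0
    while True:
        if max_tasks is not None and executed >= max_tasks:
            break
        if max_time is not None and clock() - started >= max_time:
            break
        execute_next()
        executed += 1
    return executed


# The task limit is reached long before the one-hour time limit.
done = []
count_limited = run_until_limit(lambda: done.append("task"), max_tasks=3, max_time=3600)

# A fake clock advancing one "second" per reading makes the time limit hit first.
ticks = iter(range(100))
time_limited = run_until_limit(
    lambda: None, max_tasks=10, max_time=5, clock=lambda: next(ticks)
)
```

Checking both limits before each execution means a Task is never started once either limit has already been met.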

task_to_protocoldag(task: ScopedKey) Tuple[gufe.protocols.protocoldag.ProtocolDAG, gufe.Transformation, gufe.protocols.protocoldag.ProtocolDAGResult | None]

Given a Task, produce a corresponding ProtocolDAG that can be executed.

Also gives the Transformation that this ProtocolDAG corresponds to. If the Task extends another Task, then the ProtocolDAGResult for that other Task is also given; otherwise None is given.