Source code for geodesic.tesseract.job
import time
import warnings
from typing import TYPE_CHECKING, Optional, Union, List, Tuple, Literal
import numpy as np
from geodesic.account.projects import _ProjectDescr, get_active_project, get_project
from geodesic.descriptors import (
_BBoxDescr,
_GeometryDescr,
_IntDescr,
_ListDescr,
_StringDescr,
_TypeConstrainedDescr,
)
from geodesic.service import ServiceClient
from geodesic.tesseract.components import (
AssetSpecListDescr,
GlobalProperties,
JobResponse,
Step,
Bucket,
Webhook,
StepInput,
StepOutput,
Container,
Alert,
MultiscaleOptions,
parse_container,
)
from geodesic.tesseract.regex import job_id_re
from geodesic.utils import DeferredImport, MockImport, hamming_distance, deprecated
from geodesic.bases import _APIObject
from geodesic.client import raise_on_error
from geodesic import Dataset
from geodesic.stac import FeatureCollection, Item, Feature
if TYPE_CHECKING:
import datetime
from geodesic.account import Project
from geodesic.tesseract import (
AssetBands,
OutputTimeBins,
PixelsOptions,
WarpOptions,
RasterizeOptions,
)
tqdm = DeferredImport("tqdm")
ipywidgets = DeferredImport("ipywidgets")
ipyleaflet = DeferredImport("ipyleaflet")
mapping = DeferredImport("geodesic.mapping")
display = DeferredImport("IPython.display")
try:
import networkx as nx
except ImportError:
warnings.warn(
"networkx is not installed. topological checks on tesseract jobs will not be performed"
)
nx = None
try:
import zarr
except ImportError:
zarr = MockImport("zarr")
tesseract_client = ServiceClient("tesseract", 1)
[docs]def list_jobs(
search: str = None,
project=None,
status: Literal["completed", "running", "deleted", "all"] = "all",
) -> "JobList":
"""depreciated function, use `get_jobs()` instead"""
warnings.warn(
"list_jobs() is depreciated and will be removed in v1.0.0, use get_jobs() instead",
UserWarning,
)
return get_jobs(search=search, project=project, status=status)
[docs]def get_jobs(
search: str = None,
project=None,
status: Literal["completed", "running", "deleted", "all"] = "all",
) -> "JobList":
"""returns a list of Tesseract Jobs
Args:
search: a search string used to search within the jobs name, alias or description. This will do simple partial string matching.
project: the name of the project to search jobs. Defaults to the current active project.
status: return only jobs that have this status. this can be 'completed, 'running', 'deleted', or 'all' (default) to return jobs in any state.
Returns:
a :class:`JobList` of matching Jobs
Example:
To list Tesseract jobs related to Hurricane Ida we could run the `list_jobs()` function like so:
>>> jobs = list_jobs(
... search="ida",
... project="ida-demo",
... status="completed"
... )
>>> jobs[0].alias
"Ida Road Model"
"""
if project is None:
project = get_active_project()
else:
if isinstance(project, str):
project = get_project(project)
elif not isinstance(project, Project):
raise ValueError("project must be string or Project")
if status.lower() not in ["completed", "running", "deleted", "all"]:
raise ValueError("status must be one of ['completed', 'running', 'deleted', 'all']")
params = {}
if search is not None:
params["search"] = search
params["project"] = project.uid
resp = tesseract_client.get(f"jobs", **params)
raise_on_error(resp)
js = resp.json()
try:
if js["jobs"] is None:
return JobList([])
except KeyError:
if js == {}:
return JobList([])
jl = [Job(**j) for j in js["jobs"]]
jobs = JobList(jl)
if status.lower() == "all":
return jobs
# Since there isnt a way to filter serverside by state we need
# to check each of the return jobs and remove ones that dont match.
filtered_jobs = {}
for name, job in jobs.items():
st = job.status()["state"]
if st == status.lower():
filtered_jobs[name] = job
return filtered_jobs
list_jobs = deprecated("v1.0.0", "list_jobs")(get_jobs)
[docs]class Job(_APIObject):
"""represents a Tesseract Job
The class can be initialized either with a dictionary (\\*\\*spec) that represents the request for the particular
type, or can be given an job ID. If a job ID is provided it will query for that job on the tesseract service
and then update this class with the specifics of that job.
Args:
\\*\\*spec: A dictionary representing the job request.
job_id: The job ID string. If provided the job will be initialized
with info by making a request to tesseract.
"""
name = _StringDescr(doc="a unique name for the dataset created by this job.")
alias = _StringDescr(doc="a human readable name for the dataset created by this job")
description = _StringDescr(doc="a longer description for the dataset created by this job")
project = _ProjectDescr(doc="the project that this job will be assigned to")
workers = _IntDescr(doc="the number of workers to use for this job")
bbox = _BBoxDescr(
doc="the rectangular extent of this job. Can be further filtered by a geometry"
)
bbox_epsg = _IntDescr(doc="the EPSG code of the bounding box spatial reference.")
output_epsg = _IntDescr(
doc="the EPSG code of the output spatial reference. Pixel size will be with respect to this."
)
geometry = _GeometryDescr(
doc="A geometry to filter the job with only assets intersecting this will be processed. \
Inputs can be WKT, WKB, GeoJSON, or a anything that implements a __geo_interface__"
)
global_properties = _TypeConstrainedDescr(
(GlobalProperties, dict),
doc="DEPRECATED. Will be removed in v1.0.0. Properties applied to unspecified fields in an asset spec",
deprecated=True,
)
asset_specs = AssetSpecListDescr(
doc="the initial assets to compute in the job",
deprecated=True,
)
workers = _IntDescr(
doc="Number of workers to use for each step in the job. Can also be specified on each step individually."
)
steps = _ListDescr(
item_type=(Step, dict),
doc="A list of steps to execute",
coerce_items=True,
)
hooks = _ListDescr(
item_type=(Webhook, dict),
doc="NOT YET IMPLEMENTED. A list of webhooks to execute when job is complete",
)
output = _TypeConstrainedDescr(
(Bucket, dict),
doc="the output, other than default storage",
)
def __init__(self, job_id: str = None, **spec):
self.project = get_active_project()
self._submitted = False
self._dataset = None
self._item = None
self._bounds = None
self._widget = None
self._service = tesseract_client
# status values
self._state = None
self._n_quarks = None
self._n_completed = None
# geometries
self._query_geom = None
self._quark_geoms = None
self.job_id = None
if job_id is not None and len(spec) == 0:
self.load(job_id=job_id)
if len(spec) > 0:
self.job_id = job_id
super().__init__(**spec)
[docs] def load(self, job_id: str, dry_run: bool = False) -> None:
"""Loads job information for `job_id` if the job exists
Args:
job_id (str): The job ID to load
dry_run (bool): If True, only loads the job information, not the dataset or item.
"""
job_resp = raise_on_error(
self._service.get(f"jobs/{job_id}", project=self.project.uid)
).json()
if "jobs" not in job_resp or len(job_resp["jobs"]) == 0:
raise ValueError(f"job '{job_id}' not found in project '{self.project.uid}'")
self.update(job_resp["jobs"][0])
self.job_id = job_resp["jobs"][0].get("job_id", job_id)
if dry_run:
return
# If this isn't a dry run, load the other data.
ds = raise_on_error(
self._service.get(f"jobs/{job_id}/dataset", project=self.project.uid)
).json()
self._dataset = Dataset(**ds)
si = raise_on_error(
self._service.get(f"jobs/{job_id}/item", project=self.project.uid)
).json()
self._item = Item(**si)
self._query_geom = getattr(self._item, "geometry", None)
self.status(return_quark_geoms=True)
[docs] def submit(
self,
overwrite: bool = False,
dry_run: bool = False,
timeout_seconds: float = 30.0,
) -> JobResponse:
"""Submits a job to be processed by tesseract
This function will take the job defined by this class and submit it to the tesseract api for processing.
Once submitted the dataset and items fields will be populated containing the SeerAI dataset and STAC item
respectively. Keep in mind that even though the links to files in the STAC item will be populated, the job
may not yet be completed and so some of the chunks may not be finished.
Args:
overwrite: if the job exists, deletes it and creates a new one
dry_run: runs this as a dry run (no work submitted, only estimated.)
timeout_seconds: how long to wait for the job to be submitted before timing out.
"""
# If this job has a job_id, delete the existing job
if self.job_id is not None:
if overwrite:
self.delete_and_wait()
else:
self.status()
# If the current job state is "dry_run" and we're submitting a non-dry run, delete the job and wait.
if self.state == "dry_run" and not dry_run:
self.delete_and_wait(timeout_seconds=timeout_seconds)
else:
self.load(self.job_id, dry_run=self.state == "dry_run")
return
req = dict(self)
req["dry_run"] = dry_run
# submit the job
response = self._service.post("submit", **req)
res = response.json()
# If there's an error, get the job ID from that error
if "error" in res:
detail = res["error"].get("detail", "")
job_id_match = job_id_re.search(detail)
# If the job already exists and we don't already have the job_id set, get and set the job_id
if "exists" in detail and job_id_match and self.job_id is None:
job_id = job_id_match.group(1)
self.job_id = job_id
# Recursively call this, now that we have the job_id
return self.submit(
overwrite=overwrite,
dry_run=dry_run,
timeout_seconds=timeout_seconds,
)
else:
raise_on_error(response)
res = JobResponse(**res)
job_id = res.get("job_id", None)
if job_id is None:
raise ValueError("no job_id was returned, something went wrong")
self.job_id = job_id
self.load(job_id, dry_run=dry_run)
self._submitted = True
res.warn()
return res
def delete_and_wait(self, timeout_seconds=30.0):
self.delete(remove_data=True)
timeout = time.time() + timeout_seconds
timed_out = True
while time.time() < timeout:
self.status()
if self._state == "deleted":
timed_out = False
break
time.sleep(1.0)
if timed_out:
raise ValueError(
"Job submission timed out waiting for deletion to complete. Job is still deleting,"
" please try again later"
)
@property
def dataset(self):
return self._dataset
@property
def item(self):
return self._item
@property
def state(self):
return self._state
[docs] def zarr(self, asset_name: str = None):
"""
Returns the Zarr group for the corresponding asset name
Args:
asset_name: name of the asset to open and return
Returns:
zarr file pointing to the results.
"""
if self._item is None or self._n_completed != self._n_quarks:
raise ValueError("computation not completed")
try:
assets = self._item.assets
except AttributeError:
raise AttributeError("item has no assets")
try:
asset = assets[asset_name]
except KeyError:
raise KeyError(f"asset {asset_name} does not exist")
href = asset.href
return zarr.open(href)
[docs] def ndarray(self, asset_name: str):
"""
Returns a numpy.ndarray for specified asset name.
USE WITH CAUTION! RETURNS ALL OF WHAT COULD BE A
HUGE ARRAY
Args:
asset_name: name of the asset to open and return
Returns:
numpy array of all the results.
"""
return self.zarr(asset_name)["tesseract"][:]
[docs] def status(
self,
return_quark_geoms: bool = False,
return_quark_status: bool = False,
return_alerts: bool = False,
warn: bool = False,
):
"""Status queries the tesseract service for the jobs status.
Args:
return_quark_geoms(bool): Should the query to the service ask for all of the quarks geometries.
If True it will populate the geometry in this class.
return_quark_status(bool): If True will query for the status of each individual quark associated with
the job.
return_alerts(bool): If True, will return all alerts (planning errors, warnings, etc) for the job.
warn(bool): If any alerts are returned, warns the user with a Python warning
Returns:
A dictionary with the response from the Tesseract service
"""
if not self.job_id:
raise Exception("job_id not set, cannot get status")
q = {
"return_quark_geoms": return_quark_geoms,
"return_quark_status": return_quark_status,
"return_alerts": return_alerts,
"project": self.project.uid,
}
res = raise_on_error(self._service.get(f"jobs/{self.job_id}/status", **q)).json()
status = res.get("job_status", None)
if status is None:
print(res)
raise Exception("could not get job status")
self._n_quarks = status.get("n_quarks", None)
self._n_completed = status.get("n_quarks_completed", 0)
self._state = status.get("state", None)
if return_quark_geoms:
quark_geoms = status.get("features", None)
if quark_geoms is None:
raise Exception("job status returned no geometries")
self.quark_geoms = FeatureCollection(**quark_geoms)
self._status = status
self.alerts = [Alert(**w) for w in status.get("alerts", [])]
if warn:
for alert in self.alerts:
alert.warn()
return status
[docs] def add_create_assets_step(
self,
name: str,
asset_name: str,
workers: int = 1,
dataset: Optional[Union["Dataset", str]] = None,
dataset_project: Optional[Union["Project", str]] = None,
stac_items: Optional[List[Union[dict, "Feature", "Item"]]] = None,
asset_bands: Optional[List[Union["AssetBands", dict]]] = None,
output_time_bins: Optional[Union["OutputTimeBins", dict]] = None,
pixels_options: Optional[Union["PixelsOptions", dict]] = None,
warp_options: Optional[Union["WarpOptions", dict]] = None,
rasterize_options: Optional[Union["RasterizeOptions", dict]] = None,
no_data: Optional[List[Union[int, float, complex, str]]] = None,
pixel_dtype: Optional[Union[str, np.dtype]] = None,
fill_value: Optional[Union[str, int, float, complex]] = None,
ids: Optional[List[str]] = None,
filter: Optional[dict] = None,
datetime: Optional[Union[Tuple[Union["datetime.datetime", str]], str]] = None,
chip_size: Optional[int] = 512,
output_bands: Optional[List[str]] = None,
compression: Optional[str] = "blosc",
page_size: Optional[int] = 1000,
) -> "Job":
"""add a Data input to this Tesseract Job
This adds a data input to this Job. Although there are many arguments, many of them don't
need to be specified. The following rules apply
* You MUST specified either a ``dataset`` or ``stac_items``, but not both. Specifying both is undefined and will
raise an Exception.
* You MUST specify ``pixels_options``, or ``rasterize_options``, or leave both as None. Specifying both is undefined
and will raise an exception. If you specify neither, the Features/Items will be added in vector/GeoJSON format
* If pixels_options is specified, you must specify ``asset_bands`` as there is no general way to know what asset
and band list from the dataset is desired.
* You do not need to specify the ``dataset_project`` unless the ``dataset``'\s project is ambiguous based on the
name alone. This will check the ``active_project`` first, followed by ``global``, and raise an exception if the
``dataset`` is not in either. If you specify a ``Dataset`` object, the ``dataset_project`` will be pulled from that.
This method returns ``self`` so that it can be chained together with other methods.
Args:
name: the name of the step. This must be unqiue across the whole job.
asset_name: the name of the output ``asset`` in Tesseract that this will create. This name can be referenced \
by future ``Step``'\s in the job.
workers: the number of workers to use for this step. (default=1)
dataset: the name of the ``Dataset`` or a ``Dataset`` object that has been saved in Entanglement.
dataset_project: the project that the ``Dataset`` belongs to. This is to resolve ambiguity between ``Dataset``'\s \
that have the same name as each other.
stac_items: A list of Features or STAC Items that used in lieu of a ``Dataset`` as this step's inputs. Do not \
specify more than a handful of features via this method as the job performance may suffer or the ``Job`` \
may fail to submit successfully.
asset_bands: a list of ``asset``/``bands`` combinations. The combination of the ``asset`` and the list of ``bands`` \
will be extracted from the dataset, if available. It's not always possible for Tesseract to guarantee \
that the asset/bands are available without starting the job. Double check your arguments to avoid ``Job`` \
failure after the job has been submitted.
output_time_times: a specification of how to create the output time bins for the job.
pixels_options: If this is set, Tesseract will assume that this step will create a tensor output from either \
the specified ``dataset`` or the ``stac_items`` provided.
warp_options (deprecated): If this is set, Tesseract will assume that this step will create a tensor output \
from either the specified ``dataset`` or the ``stac_items`` provided.
rasterize_options: If this is set, Tesseract will assume that this step will create a tensor output by \
rasterizing either the feature outputs from querying the ``dataset`` or using the provided ``stac_items``.
no_data: For pixels jobs, this will be used as the no_data value for the input rasters.
pixel_dtype: The data type of tensor outputs. Not needed for features.
fill_value: The value to set as the no data value for tensor output. This will be set as the "fill_value" in \
the resulting zarr output file.
ids: A list of IDs to filter the dataset to. Useful if you know exactly what data you wish for Tesseract \
to use.
filter: a CQL2 JSON filter as a Python dict. This will be used to filter the data if the ``dataset`` supports \
filtering.
datetime: The range to query for input items from the ``dataset``. This may be specified either as a tuple of \
datetimes/rfc3339 strings or as a STAC style range, \
'YYYY-mm-ddTHH:MM:SS[Z][+-HH:MM]/YYYY-mm-ddTHH:MM:SS[Z][+-HH:MM]', 'YYYY-mm-ddTHH:MM:SS[Z][+-HH:MM]/..' \
or '../YYYY-mm-ddTHH:MM:SS[Z][+-HH:MM]'
chip_size: for tensor outputs, what size in pixels should each of the chips be? This can be \
256>= ``chip_size`` >= 2048
output_bands: a list of string names of what the output bands should be called. Length must match the \
asset_bans total count of bins.
compression: what compression algorithm to use on compressed tensor chunks. 'blosc' is default and usually \
very effective.
page_size: how many items to query at a time from Boson. For complex features, this may need to be a smaller \
value (default 1000 is usually fine), but for simpler features using a large value will speed up \
the processing.
Returns:
This ``Job`` after this step has been added. This is so that these can be chained. If you want to suppress
the output, call like so: _ = job.add_data_input(...)
Examples:
Add an ``asset`` from the "srtm-gl1" dataset. This will use the pixels functionality to reproject/resample
>>> job = Job()
>>> _ = job.add_data_input(
... name="add-srtm",
... asset_name="elevation",
... dataset="srtm-gl1",
... asset_bands=[{"asset": "elevation", "bands": [0]}],
... pixels_options={
... "pixel_size": 30.0
... },
... chip_size=2048
... )
Add an ``asset`` from a feature dataset. This will use the rasterize functionality to rasterize the features
>>> job = Job()
>>> _ = job.add_data_input(
... name="add-usa-counties",
... asset_name="counties",
... dataset="usa-counties",
... rasterize_options={
... "pixel_size": [500.0, 500.0],
... "value": "FIPS"
... },
... chip_size=1024
... )
Add the same as the previous step, but do not rasterize
>>> job = Job()
>>> _ = job.add_data_input(
... name="add-usa-counties",
... asset_name="counties",
... dataset="usa-counties",
... )
"""
if warp_options is not None and pixels_options is None:
pixels_options = warp_options
# Check dataset OR stac_items is specified
if dataset is None and stac_items is None:
raise ValueError("must specify either 'dataset' or 'stac_items'")
if dataset is not None and stac_items is not None:
raise ValueError("must specify either 'dataset' or 'stac_items', but not both")
_type = "tensor"
# Check that pixels_options OR rasterize_options are not None or BOTH are None
if pixels_options is not None and rasterize_options is not None:
raise ValueError(
"must specify only one of pixels_options or rasterize_options, but not both"
)
# Check pixels dependent settings
if pixels_options is not None and asset_bands is None:
raise ValueError("must specify 'asset_bands' for pixels requests")
if pixels_options is None and rasterize_options is None:
chip_size = None
compression = None
_type = "features"
if pixels_options is not None:
pixel_size = pixels_options.get("pixel_size")
try:
len(pixel_size)
except TypeError:
pixel_size = [pixel_size, pixel_size]
pixels_options["pixel_size"] = pixel_size
# Prepare the StepInput kwargs for everything that's not None
input_kwargs = {
k: v
for k, v in [
("dataset", dataset),
("dataset_project", dataset_project),
("stac_items", stac_items),
("asset_bands", asset_bands),
("filter", filter),
("datetime", datetime),
("ids", ids),
("no_data", no_data),
("page_size", page_size),
]
if v is not None
}
band_count = 0
if asset_bands is not None:
for ab in asset_bands:
band_count += len(ab.get("bands", []))
if output_bands is not None:
if len(output_bands) != band_count:
raise ValueError(
f"len(output_bands) ({len(output_bands)}) must equal the total band count ({band_count})"
)
# Prepare the StepOutput kwargs for everything that's not None
output_kwargs = {"asset_name": asset_name}
output_kwargs.update(
{
k: v
for k, v in [
("output_time_bins", output_time_bins),
("pixels_options", pixels_options),
("rasterize_options", rasterize_options),
("chip_size", chip_size),
("compression", compression),
("pixel_dtype", pixel_dtype),
("fill_value", fill_value),
("type", _type),
("output_bands", output_bands),
]
if v is not None
}
)
# Create the actual Step
step = Step(
name=name,
type="create-assets",
workers=workers,
inputs=[StepInput(**input_kwargs)],
outputs=[StepOutput(**output_kwargs)],
)
return self.add_step(step)
add_data_input = add_create_assets_step
[docs] def add_model_step(
self,
name: str,
container: Union[str, Container, dict],
inputs: List[Union[StepInput, dict]],
outputs: List[Union[StepOutput, dict]],
args: dict = {},
gpu: bool = False,
workers: int = 1,
):
"""add a Model step to this Tesseract Job
This adds a model step to this Job and runs some validation.
This method returns `self` so that it can be chained together with other methods.
Args:
name: the name to give this step
container: either a Container object or the image tag of the container for the model
inputs: a list of StepInputs. Must refer to previous steps in the model
outputs: a list of StepOutputs detailing the output of this model
args: an optional list of arguements for this container at runtime. These will be provided to the user
inference func if written so-as to accept arguments
gpu: if this model requires a GPU to run, set to True. Unless your code is specifically configured for an
NVIDIA GPU and your image has the appropriate drivers, this will not be necessary or improve
performance of non-GPU optimized code.
workers: How many workers to split this step over
Returns:
self - this Job
Examples:
Add a step that runs a harmonic regression model using a previous asset step called 'landsat'.
>>> from geodesic.tesseract import Job, Container, StepInput, StepOutput
>>> job = Job()
... job.add_model_step(
... name="run-har-reg",
... container=Container(
... repository="us-central1-docker.pkg.dev/double-catfish-291717/seerai-docker/images/",
... image="har-reg",
... tag="v0.0.7",
... args={"forder": 4}
... ),
... inputs=[StepInput(
... asset_name="landsat",
... dataset_project=proj,
... spatial_chunk_shape=(512, 512),
... type="tensor",
... time_bin_selection=T.BinSelection(all=True),
... )],
... outputs=[
... StepOutput(
... asset_name="brightness-params",
... chunk_shape=(1, 10, 512, 512),
... type="tensor",
... pixel_dtype="<f8",
... fill_value="nan",
... ),
... StepOutput(
... asset_name="greenness-params",
... chunk_shape=(1, 10, 512, 512),
... type="tensor",
... pixel_dtype="<f8",
... fill_value="nan",
... ),
... StepOutput(
... asset_name="wetness-params",
... chunk_shape=(1, 10, 512, 512),
... type="tensor",
... pixel_dtype="<f8",
... fill_value="nan",
... )
... ],
... workers=10
... )
"""
if isinstance(container, str):
ref = container
try:
container = parse_container(ref)
except Exception:
raise ValueError(f"unable to parse container image ref '{ref}'")
container = Container(**container)
if args:
container.args = args
step = Step(
name=name,
type="model",
container=container,
inputs=inputs,
outputs=outputs,
gpu=gpu,
workers=workers,
)
return self.add_step(step)
def add_step(self, step: Step) -> "Job":
if nx is not None:
if "step:" + step.name in self.dag.nodes:
raise ValueError(
"this job already has a step named '{step.name}'. Step names must be unique"
)
step_type = step.get("type")
if step_type is None:
raise ValueError("must specify step type for every step")
self.steps.append(Step(**step))
if nx is not None:
if not self.is_dag():
raise ValueError("job's graph is not acyclic")
return self
[docs] def update_step_params(
self, step_name: str, input_index=None, output_index=None, **params
) -> bool:
"""updates the parameters for an existing step by looking up the step by name and then applying parameters
This method can be used to update the info in a step that's already been added to a job. You can modify
parameters at either the top level of the Step or in any of the inputs or outputs by specifying an
``input_index`` or an ``output_index``.
Args:
step_name: must match one of the steps in the job. This is the step that will be updated
input_index: the index of the input you would like to modify
output_index: the index of the output you would like to modify
**params: key/values to set on the Step, StepInput, or StepOutput selected
Returns:
True if the step passes DAG validation, False otherwise
Examples:
Rename the step:
>>> job.update_step_params('old_name', name='new_name')
Change the input dataset for 0th input
>>> job.update_step_params('step', input_index=0, dataset="new-dataset")
Change the 3rd output's pixel_dtype
>>> job.update_step_params('step', output_index=3, pixel_dtype=np.float32)
"""
selected_step = None
closest = None
closest_distance = 256
for step in self.steps:
cur_name = step.name
d = hamming_distance(cur_name, step_name)
if d == 0:
selected_step = step
break
if d < closest_distance:
closest = cur_name
closest_distance = d
if selected_step is None:
extra = ""
if closest is not None and closest_distance < len(step_name) / 2:
extra = f"closest to provided step name '{closest}'"
raise ValueError(f"could not find step with name '{step_name}' ({extra})")
if input_index is not None and output_index is not None:
raise ValueError("can only update one input or output, not both")
# By default, update parameters in the step
obj = selected_step
# if the input_index is specified, update the parameters in that input
if input_index is not None:
obj = selected_step.inputs[input_index]
if output_index is not None:
obj = selected_step.outputs[output_index]
for key, value in params.items():
obj[key] = value
try:
return self.is_dag()
except ImportError:
return True
def is_dag(self) -> bool:
if nx is None:
raise ImportError("is_dag requires networkx to be installed")
return nx.is_directed_acyclic_graph(self.dag)
[docs] def delete(self, remove_data: bool = False):
"""Deletes a job in the Tesseract service.
Unless specified, data created by this job will remain in the underlying storage. Set
`remove_data` to True to remove created asset data.
Args:
remove_data: Delete underlying data created by this job
"""
if not self.job_id:
raise Exception("job_id not set, cannot delete")
_ = raise_on_error(
self._service.delete(
f"jobs/{self.job_id}", remove_data=remove_data, project=self.project.uid
)
).json()
self._submitted = False
@property
def dag(self):
graph = nx.DiGraph()
for asset in self.asset_specs:
graph.add_node(asset.name)
# add asset/step nodes and asset/step and step/asset edges
for step in self.steps:
if step.get("type") == "rechunk" or step.get("type") == "multiscale":
continue
graph.add_node("step:" + step.name, type="step", color="red")
for input in step.inputs:
if input.get("asset_name") is not None:
graph.add_node("asset:" + input.asset_name, type="asset", color="green")
graph.add_edge(input.asset_name, step.name)
for output in step.outputs:
if output.asset_name not in graph.nodes:
graph.add_node("asset:" + output.asset_name, type="asset")
graph.add_edge(step.name, output.asset_name)
return graph
def _build_widget(self):
# Progress bar
self._prog = ipywidgets.IntProgress(
value=self._n_completed,
min=0,
max=self._n_quarks,
step=1,
description="Running: ",
bar_style="",
orientation="horizontal",
)
self._title = ipywidgets.HTML(value=self._get_title())
self._ratio = ipywidgets.HTML(value=self._get_ratio())
zoom, center = mapping.calc_zoom_center(self._item["bbox"])
self.map = mapping.Map(center=center, zoom=zoom, scroll_wheel_zoom=True)
self.map.add_control(ipyleaflet.LayersControl(position="topright"))
vb = ipywidgets.VBox([self._title, self._ratio, self._prog])
w = ipywidgets.HBox([vb, self.map])
self._widget = w
def _add_item_layer(self):
if not self._item:
return
disp = Item(**self._item)
disp.geometry = disp.geometry.buffer(np.sqrt(disp.geometry.area) * 0.05).envelope
fci = {"type": "FeatureCollection", "features": [disp]}
query_layer = ipyleaflet.GeoJSON(
data=fci,
style={
"opacity": 1,
"color": "#e2e6d5",
"fillOpacity": 0.0,
"weight": 1,
"dashArray": "4 4",
},
hover_style={"fillOpacity": 0.75},
)
query_layer.name = "Requested Extent"
self.map.add_layer(query_layer)
def _add_quark_layer(self):
if not self.quark_geoms:
return
fc = {"type": "FeatureCollection", "features": self.quark_geoms.features}
self._quark_layer = ipyleaflet.GeoJSON(
data=fc,
style={},
hover_style={
"fillOpacity": 0.75,
},
style_callback=self._quark_style,
)
self._quark_layer.name = "Quark Extents"
self.map.add_layer(self._quark_layer)
def widget(self):
try:
ipywidgets.VBox
except ImportError:
raise ValueError("ipywidgets must be installed to view widget")
if self._state == "dry_run":
return (
ipywidgets.HTML(
value='<h2 style="color: red;">Job is currently in "dry_run" state. Submit before watching job</h2>'
),
False,
)
if not self.job_id:
raise Exception("job_id not set, nothing to watch")
quark_status = self.status(return_quark_status=True)
self.quark_geoms_lookup = {}
for q in self.quark_geoms.features:
self.quark_geoms_lookup[q["id"]] = q
for k, status in quark_status["quark_status"].items():
self.quark_geoms_lookup[k].properties["status"] = status
self._build_widget()
self._add_item_layer()
self._add_quark_layer()
return self._widget, True
def _quark_style(self, feature):
# Default Style
style = {
"opacity": 0.5,
"color": "#888888",
"fillColor": "#888888",
"fillOpacity": 0.05,
}
sts = feature["properties"].get("status", "incomplete")
if sts == "incomplete":
style["fillOpacity"] = 0.0
return style
elif sts == "running":
style["fillColor"] = "yellow"
style["color"] = "yellow"
style["opacity"] = 1.0
elif sts == "failed":
style["fillColor"] = "red"
style["color"] = "red"
style["opacity"] = 0.0
elif sts == "completed":
style["fillColor"] = "green"
style["color"] = "green"
style["opacity"] = 0.0
return style
[docs] def watch(self):
"""Monitor the tesseract job with the SeerAI widget.
Will create a jupyter widget that will watch the progress of this tesseract job.
"""
have_ipywidgets = True
try:
ipywidgets.VBox
except ImportError:
have_ipywidgets = False
if not self.job_id:
if have_ipywidgets:
display.display(
ipywidgets.HTML(
value='<h2 style="color: red;">No Job ID - submit job before watching</h2>'
)
)
return
raise ValueError("no job_id: has job been submitted?")
while self.state in ("planning"):
time.sleep(1.0)
self.status()
self.status(return_quark_status=True, return_quark_geoms=True)
if not have_ipywidgets:
return self.watch_terminal()
widget, valid = self.widget()
display.display(widget)
if not valid:
return
keep_watching = True
while keep_watching:
self._update_widget()
time.sleep(1)
if self._n_completed == self._n_quarks:
break
def watch_terminal(self):
with tqdm.tqdm(total=self._n_quarks) as progress:
while True:
state = self._state
if state == "dry_run":
break
progress.set_description(f"Job State: {self._state}")
self.status()
progress.update(self._n_completed)
time.sleep(1)
if self._n_completed == self._n_quarks:
break
print(f"Job State: {self._state}")
def _update_widget(self):
quark_status = self.status(return_quark_status=True, return_quark_geoms=True)
for k, status in quark_status["quark_status"].items():
self.quark_geoms_lookup[k].properties["status"] = status
feats = {
"type": "FeatureCollection",
"features": [f for _, f in self.quark_geoms_lookup.items()],
}
self._quark_layer.data = feats
# set numerics
self._prog.value = self._n_completed
# When the progress bar is first created self._n_quarks is sometimes None for some reason (I think depending on when the watch function is called), so lets set it here just to be safe.
self._prog.max = self._n_quarks if self._n_quarks is not None else 100
self._title.value = self._get_title()
self._ratio.value = self._get_ratio()
def _get_title(self):
return f"<h2>Job: {self.alias} - {self._state}</h2>"
def _get_ratio(self):
return f"<h2>{self._n_completed} / {self._n_quarks}</h2>"
[docs] def add_rechunk_step(
self, step_name: str, asset_name: str, chunk_shape: List[int], workers: int = 1
):
"""adds a rechunk step to the job
A rechunk step will create a new zarr array for a given asset called "rechunk" which will with copy
of the tesseract array with the new given chunk shape. Note it is best to not decrease any dimension of the
chunk shape too much. For example (1,1,1,1000) to (1,1,1000, 1) will be an extrememly inefficent operation
(not to mention impractical).
Args:
step_name: name for the rechunking step
asset_name: output asset name from a previous step to rechunk
chunk_shape: a list of integers of length four which will
workers: the number of workers to use for this step. (default=1)
**params: key/values to set on the Step, StepInput, or StepOutput selected
Example:
Add rechunk an asset from (1,1,1000,1000) to (4,2,1000,1000):
>>> job.add_rechunk_step(
... step_name='rechunk_sentinel',
... asset_name="sentinel-out",
... chunk_shape=[4,2,1000,1000],
... workers=3
... )
"""
step = Step(
name=step_name,
type="rechunk",
inputs=[{"asset_name": asset_name}],
outputs=[{"asset_name": asset_name, "chunk_shape": chunk_shape}],
workers=workers,
)
return self.add_step(step)
[docs] def add_multiscale_step(
self, step_name: str, asset_name: str, min_zoom: int = 0, workers: int = 1
):
"""adds a multiscales step to the job
Args:
step_name: name for the multiscale step
asset_name: output asset name from a previous step to create multiscales for
min_zoom: minimum zoom level for the multiscale step to generate. This might be useful
if the asset is large with a small pixel size and all 20 zoom levels are not desired.
workers: the number of workers to use for this step. (default=1)
Example:
Add a multiscale step for asset sentinel
>>> job.add_multiscale_step(
... step_name='multiscale_sentinel',
... asset_name="sentinel-out",
... min_zoom=10,
... workers=3
... )
"""
step = Step(
name=step_name,
type="multiscale",
inputs=[{"asset_name": asset_name}],
outputs=[
StepOutput(
asset_name=asset_name,
multiscale_options=MultiscaleOptions({"min_zoom": min_zoom}),
)
],
workers=workers,
)
return self.add_step(step)
[docs]class JobList(_APIObject):
def __init__(self, jobs, ids=[]):
self.ids = ids
if len(ids) != len(jobs):
self.ids = [job.name for job in jobs]
for job in jobs:
self._set_item(job.name, job)
def __getitem__(self, k) -> Job:
if isinstance(k, str):
return super().__getitem__(k)
elif isinstance(k, int):
jid = self.ids[k]
return super().__getitem__(jid)
else:
raise KeyError("invalid job id")
def _ipython_display_(self, **kwargs):
raise NotImplementedError