import re
import numpy as np
from .base import (
ClientBaseWithDatastack,
_api_endpoints,
handle_response,
)
from .auth import AuthClient
from .endpoints import (
infoservice_common,
infoservice_api_versions,
default_global_server_address,
)
from .format_utils import (
output_map,
format_raw,
)
# Key handed to `_api_endpoints` and passed to the client class as its
# `server_name` argument; presumably the endpoint-template placeholder for the
# info service's server address (confirm against `.endpoints`).
SERVER_KEY = "i_server_address"
def InfoServiceClient(
    server_address=None,
    datastack_name=None,
    auth_client=None,
    api_version="latest",
    verify=True,
    max_retries=None,
    pool_maxsize=None,
    pool_block=None,
    over_client=None,
    info_cache=None,
):
    """Build an info service client for the requested API version.

    Parameters
    ----------
    server_address : str, optional
        Address of the info service. If None, `default_global_server_address`
        is used. By default None
    datastack_name : str, optional
        Datastack the client should use as its default. By default None
    auth_client : AuthClient, optional
        Object whose `request_header` attribute supplies the authentication
        header. If None, a new `AuthClient()` is created. By default None
    api_version : int or str, optional
        Requested API version; resolved to a concrete version by
        `_api_endpoints`. By default "latest"
    verify : bool, optional
        Passed to `_api_endpoints` and to the client class. By default True
    max_retries, pool_maxsize, pool_block : optional
        Passed unchanged to the client class. By default None
    over_client : optional
        Passed unchanged to the client class. By default None
    info_cache : dict, optional
        Existing cache of datastack info records for the client to reuse.
        By default None

    Returns
    -------
    InfoServiceClientV2
        Instance of the class registered in `client_mapping` for the resolved
        API version.
    """
    if server_address is None:
        server_address = default_global_server_address

    if auth_client is None:
        auth_client = AuthClient()
    auth_header = auth_client.request_header

    # `_api_endpoints` returns both the endpoint templates and the resolved
    # API version, which is then used to pick the concrete client class.
    endpoints, api_version = _api_endpoints(
        api_version,
        SERVER_KEY,
        server_address,
        infoservice_common,
        infoservice_api_versions,
        auth_header,
        verify=verify,
    )

    InfoClient = client_mapping[api_version]
    return InfoClient(
        server_address,
        auth_header,
        api_version,
        endpoints,
        SERVER_KEY,
        datastack_name,
        verify=verify,
        max_retries=max_retries,
        pool_maxsize=pool_maxsize,
        pool_block=pool_block,
        over_client=over_client,
        info_cache=info_cache,
    )
class InfoServiceClientV2(ClientBaseWithDatastack):
    """Client for version 2 of the info service API.

    Datastack info records fetched from the server are kept in `info_cache`,
    keyed by datastack name, and are reused by the lookup methods unless
    `use_stored=False` is passed.
    """

    def __init__(
        self,
        server_address,
        auth_header,
        api_version,
        endpoints,
        server_name,
        datastack_name,
        verify=True,
        max_retries=None,
        pool_maxsize=None,
        pool_block=None,
        over_client=None,
        info_cache=None,
    ):
        """Initialize the client and, if a datastack is given, look up its aligned volume.

        Parameters
        ----------
        server_address, auth_header, api_version, endpoints, server_name, datastack_name
            Passed positionally to the base class initializer.
        verify, max_retries, pool_maxsize, pool_block, over_client : optional
            Passed by keyword to the base class initializer.
        info_cache : dict, optional
            Existing mapping of datastack name to info record to use as this
            client's cache. If None, a new empty dict is created. By default None
        """
        super().__init__(
            server_address,
            auth_header,
            api_version,
            endpoints,
            server_name,
            datastack_name,
            verify=verify,
            max_retries=max_retries,
            pool_maxsize=pool_maxsize,
            pool_block=pool_block,
            over_client=over_client,
        )
        # Test against None rather than truthiness so that a caller-supplied
        # empty dict is kept (and filled) instead of being silently replaced.
        self.info_cache = {} if info_cache is None else info_cache

        if datastack_name is not None:
            # Uses the cache when the record is already there, otherwise this
            # issues a request to the info service.
            ds_info = self.get_datastack_info(datastack_name=datastack_name)
            self._aligned_volume_name = ds_info["aligned_volume"]["name"]
            self._aligned_volume_id = ds_info["aligned_volume"]["id"]
        else:
            self._aligned_volume_name = None
            self._aligned_volume_id = None

    @property
    def aligned_volume_name(self):
        """Name of the aligned volume of the client's datastack, or None if no datastack was set."""
        return self._aligned_volume_name

    @property
    def aligned_volume_id(self):
        """Id of the aligned volume of the client's datastack, or None if no datastack was set."""
        return self._aligned_volume_id

    def get_datastacks(self):
        """Query which datastacks are available at the info service

        Returns
        -------
        list
            List of datastack names
        """
        endpoint_mapping = self.default_url_mapping
        url = self._endpoints["datastacks"].format_map(endpoint_mapping)
        response = self.session.get(url)
        return handle_response(response)

    def get_datastack_info(self, datastack_name=None, use_stored=True):
        """Gets the info record for a datastack

        Parameters
        ----------
        datastack_name : str, optional
            datastack to look up. If None, uses the one specified by the client. By default None
        use_stored : bool, optional
            If True and the information has already been queried for that datastack, then uses the cached version. If False, re-queries the information. By default True

        Returns
        -------
        dict or None
            The complete info record for the datastack

        Raises
        ------
        ValueError
            If no datastack name is given and the client has none set.
        """
        if datastack_name is None:
            datastack_name = self.datastack_name
        if datastack_name is None:
            raise ValueError("No Dataset set")

        if (not use_stored) or (datastack_name not in self.info_cache):
            endpoint_mapping = self.default_url_mapping
            endpoint_mapping["datastack_name"] = datastack_name
            url = self._endpoints["datastack_info"].format_map(endpoint_mapping)
            response = self.session.get(url)
            self.raise_for_status(response)
            self.info_cache[datastack_name] = handle_response(response)
        return self.info_cache.get(datastack_name, None)

    def _get_property(
        self,
        info_property,
        datastack_name=None,
        use_stored=True,
        format_for="raw",
        output_map=output_map,
    ):
        """Look up one field of a datastack's info record and format it.

        Parameters
        ----------
        info_property : str
            Key to read from the info record. A missing key yields None as the
            value handed to the formatter.
        datastack_name : str, optional
            Datastack to look up. If None, uses the one specified by the client. By default None
        use_stored : bool, optional
            If True, uses the cached record if available. By default True
        format_for : str, optional
            Key into `output_map` selecting the formatter. Unknown keys fall
            back to `format_raw`. By default "raw"
        output_map : dict, optional
            Mapping from format name to formatter callable.

        Returns
        -------
        The formatter's result for the looked-up value.

        Raises
        ------
        ValueError
            If no datastack name is given and the client has none set.
        """
        # `get_datastack_info` resolves the default datastack name, raises if
        # there is none, fills the cache, and returns the cached record.
        info = self.get_datastack_info(
            datastack_name=datastack_name, use_stored=use_stored
        )
        value = info.get(info_property, None)
        formatter = output_map.get(format_for, format_raw)
        return formatter(value)

    def get_aligned_volumes(self):
        """Query the info service's aligned volumes endpoint.

        Returns
        -------
        The parsed server response (as returned by `handle_response`).
        """
        endpoint_mapping = self.default_url_mapping
        url = self._endpoints["aligned_volumes"].format_map(endpoint_mapping)
        response = self.session.get(url)
        return handle_response(response)

    def get_aligned_volume_info(self, datastack_name: str = None, use_stored=True):
        """Gets the info record for a aligned_volume

        Parameters
        ----------
        datastack_name : str, optional
            datastack_name to look up. If None, uses the one specified by the client. By default None
        use_stored : bool, optional
            If True and the information has already been queried for that dataset, then uses the cached version. If False, re-queries the information. By default True

        Returns
        -------
        dict or None
            The complete info record for the aligned_volume
        """
        return self._get_property(
            "aligned_volume", datastack_name=datastack_name, use_stored=use_stored
        )

    def get_datastacks_by_aligned_volume(self, aligned_volume: str = None):
        """Lookup what datastacks are associated with this aligned volume

        Parameters
        ----------
        aligned_volume : str, optional
            Name of the aligned volume to look up. If None, uses the aligned
            volume of the client's datastack. By default None

        Returns
        -------
        list
            a list of datastack string

        Raises
        ------
        ValueError
            if no aligned volume is specified and the client has none
        """
        if aligned_volume is None:
            aligned_volume = self._aligned_volume_name
        if aligned_volume is None:
            raise ValueError(
                "Must specify aligned_volume or provide datastack_name in init"
            )

        endpoint_mapping = self.default_url_mapping
        endpoint_mapping["aligned_volume_name"] = aligned_volume
        url = self._endpoints["datastacks_from_aligned_volume"].format_map(
            endpoint_mapping
        )
        response = self.session.get(url)
        return handle_response(response)

    def get_aligned_volume_info_by_id(
        self, aligned_volume_id: int = None, use_stored=True
    ):
        """Query the info service for an aligned volume by its id.

        Parameters
        ----------
        aligned_volume_id : int, optional
            Id of the aligned volume. If None, uses the aligned volume of the
            client's datastack. By default None
        use_stored : bool, optional
            Currently unused; the record is always requested from the server.

        Returns
        -------
        The parsed server response (as returned by `handle_response`).

        Raises
        ------
        ValueError
            If no id is given and the client has no aligned volume id.
        """
        if aligned_volume_id is None:
            aligned_volume_id = self._aligned_volume_id
        if aligned_volume_id is None:
            raise ValueError(
                "Must specify aligned_volume_id or provide datastack_name in init"
            )

        endpoint_mapping = self.default_url_mapping
        endpoint_mapping["aligned_volume_id"] = aligned_volume_id
        url = self._endpoints["aligned_volume_by_id"].format_map(endpoint_mapping)
        response = self.session.get(url)
        return handle_response(response)

    def local_server(self, datastack_name=None, use_stored=True):
        """Return the "local_server" field of the datastack's info record.

        Parameters
        ----------
        datastack_name : str or None, optional
            Name of the datastack to look up. If None, uses the value specified by the client. Default is None.
        use_stored : bool, optional
            If True, uses the cached value if available. If False, re-queries the InfoService. Default is True.

        Returns
        -------
        The stored "local_server" value, unformatted.
        """
        return self._get_property(
            "local_server",
            datastack_name=datastack_name,
            use_stored=use_stored,
            output_map=output_map,
        )

    def annotation_endpoint(self, datastack_name=None, use_stored=True):
        """AnnotationEngine endpoint for a dataset.

        Parameters
        ----------
        datastack_name : str or None, optional
            Name of the datastack to look up. If None, uses the value specified by the client. Default is None.
        use_stored : bool, optional
            If True, uses the cached value if available. If False, re-queries the InfoService. Default is True.

        Returns
        -------
        str
            Location of the AnnotationEngine
        """
        local_server = self.local_server(
            datastack_name=datastack_name, use_stored=use_stored
        )
        return local_server + "/annotation"

    def image_source(self, datastack_name=None, use_stored=True, format_for="raw"):
        """Cloud path to the imagery for the dataset

        Parameters
        ----------
        datastack_name : str or None, optional
            Name of the datastack to look up. If None, uses the value specified by the client. Default is None.
        use_stored : bool, optional
            If True, uses the cached value if available. If False, re-queries the InfoService. Default is True.
        format_for : 'raw', 'cloudvolume', or 'neuroglancer', optional
            Formats the path for different uses.
            If 'raw' (default), the path in the InfoService is passed along.
            If 'cloudvolume', a "precomputed://gs://" type path is converted to a full https URL.
            If 'neuroglancer', a full https URL is converted to a "precomputed://gs://" type path.

        Returns
        -------
        str
            Formatted cloud path to the imagery
        """
        av_info = self.get_aligned_volume_info(
            datastack_name=datastack_name, use_stored=use_stored
        )
        # Apply the requested formatter, mirroring `_get_property`; previously
        # `format_for` was accepted but ignored and the raw path was returned.
        formatter = output_map.get(format_for, format_raw)
        return formatter(av_info["image_source"])

    def synapse_segmentation_source(
        self, datastack_name=None, use_stored=True, format_for="raw"
    ):
        """Cloud path to the synapse segmentation for a dataset

        Parameters
        ----------
        datastack_name : str or None, optional
            Name of the dataset to look up. If None, uses the value specified by the client. Default is None.
        use_stored : bool, optional
            If True, uses the cached value if available. If False, re-queries the InfoService. Default is True.
        format_for : 'raw', 'cloudvolume', or 'neuroglancer', optional
            Formats the path for different uses.
            If 'raw' (default), the path in the InfoService is passed along.
            If 'cloudvolume', a "precomputed://gs://" type path is converted to a full https URL.
            If 'neuroglancer', a full https URL is converted to a "precomputed://gs://" type path.

        Returns
        -------
        str
            Formatted cloud path to the synapse segmentation
        """
        return self._get_property(
            "synapse_segmentation_source",
            datastack_name=datastack_name,
            use_stored=use_stored,
            format_for=format_for,
            output_map=output_map,
        )

    def segmentation_source(
        self, datastack_name=None, format_for="raw", use_stored=True
    ):
        """Cloud path to the chunkgraph-backed Graphene segmentation for a dataset

        Parameters
        ----------
        datastack_name : str or None, optional
            Name of the datastack to look up. If None, uses the value specified by the client. Default is None.
        use_stored : bool, optional
            If True, uses the cached value if available. If False, re-queries the InfoService. Default is True.
        format_for : 'raw', 'cloudvolume', or 'neuroglancer', optional
            Formats the path for different uses.
            If 'raw' (default), the path in the InfoService is passed along.
            If 'cloudvolume', a "graphene://https://" type path is used
            If 'neuroglancer', a "graphene://https://" type path is used, as needed by Neuroglancer.

        Returns
        -------
        str
            Formatted cloud path to the Graphene segmentation
        """
        return self._get_property(
            "segmentation_source",
            datastack_name=datastack_name,
            use_stored=use_stored,
            output_map=output_map,
            format_for=format_for,
        )

    def refresh_stored_data(self):
        """Reload the stored info values from the server."""
        # Iterate over a snapshot of the keys: `get_datastack_info` writes to
        # `info_cache` while we loop.
        for ds in list(self.info_cache):
            self.get_datastack_info(datastack_name=ds, use_stored=False)

    def viewer_resolution(self, datastack_name=None, use_stored=True):
        """Get the viewer resolution metadata for this datastack

        Parameters
        ----------
        datastack_name : str or None, optional
            Name of the datastack to look up. If None, uses the value specified by the client. Default is None.
        use_stored : bool, optional
            If True, uses the cached value if available. If False, re-queries the InfoService. Default is True.

        Returns
        -------
        np.array
            voxel resolution as a len(3) np.array, ordered x, y, z
        """
        resolution = [
            self._get_property(
                f"viewer_resolution_{axis}",
                datastack_name=datastack_name,
                use_stored=use_stored,
            )
            for axis in "xyz"
        ]
        return np.array(resolution)

    def viewer_site(self, datastack_name=None, use_stored=True):
        """Get the base Neuroglancer URL for the dataset"""
        return self._get_property(
            "viewer_site",
            datastack_name=datastack_name,
            use_stored=use_stored,
        )

    def image_cloudvolume(self, **kwargs):
        """Generate a cloudvolume instance based on the image source, using authentication if needed and
        sensible default values for reading CAVE resources. By default, fill_missing is True and bounded
        is False. All keyword arguments are passed onto the CloudVolume initialization function, and defaults
        can be overridden.

        Requires cloudvolume to be installed, which is not included by default.
        """
        return self._make_cloudvolume(
            self.image_source(format_for='cloudvolume'), **kwargs
        )

    def segmentation_cloudvolume(self, use_client_secret=True, **kwargs):
        """Generate a cloudvolume instance based on the segmentation source, using authentication if needed and
        sensible default values for reading CAVE resources. By default, fill_missing is True and bounded
        is False. All keyword arguments are passed onto the CloudVolume initialization function, and defaults
        can be overridden.

        If use_client_secret is True (default) and the segmentation path starts with "graphene", the token
        from the client session's Authorization header is handed to CloudVolume as its secret.

        Requires cloudvolume to be installed, which is not included by default.
        """
        return self._make_cloudvolume(
            self.segmentation_source(format_for='cloudvolume'),
            use_client_secret=use_client_secret,
            **kwargs,
        )

    def _make_cloudvolume(self, cloudpath, use_client_secret=True, **kwargs):
        """Create a CloudVolume for `cloudpath` with this client's defaults.

        Parameters
        ----------
        cloudpath : str
            Path handed to `cloudvolume.CloudVolume`.
        use_client_secret : bool, optional
            If True and `cloudpath` starts with "graphene", the token from the
            session's Authorization header is passed as the CloudVolume
            secret. By default True
        **kwargs
            Passed to `cloudvolume.CloudVolume`. `use_https` (default True),
            `bounded` (default False) and `fill_missing` (default True) are
            given defaults here and can be overridden.

        Returns
        -------
        cloudvolume.CloudVolume

        Raises
        ------
        ImportError
            If cloudvolume is not installed.
        """
        try:
            import cloudvolume
        except ImportError:
            raise ImportError(
                "Could not import cloudvolume. Make sure it is installed. See https://pypi.org/project/cloud-volume for more info."
            )

        use_https = kwargs.pop("use_https", True)
        bounded = kwargs.pop("bounded", False)
        fill_missing = kwargs.pop("fill_missing", True)

        secrets = None
        if re.search("^graphene", cloudpath) and use_client_secret:
            # The header value has the form "<scheme> <token>"; keep the token.
            # With no Authorization header on the session, leave `secrets` as
            # None instead of failing on `None.split`.
            auth_value = self.session.headers.get("Authorization")
            if auth_value is not None:
                secrets = {"token": auth_value.split(" ")[1]}

        cv = cloudvolume.CloudVolume(
            cloudpath,
            use_https=use_https,
            fill_missing=fill_missing,
            bounded=bounded,
            secrets=secrets,
            **kwargs,
        )
        return cv
# Maps a resolved API version to the client class that implements it;
# `InfoServiceClient` indexes this with the version returned by `_api_endpoints`.
client_mapping = {version: InfoServiceClientV2 for version in (2, "latest")}