import json
from typing import Iterable, List, Mapping
import numpy as np
import pandas as pd
from .auth import AuthClient
from .base import BaseEncoder, ClientBase, _api_endpoints, handle_response
from .endpoints import annotation_api_versions, annotation_common
from .tools import stage
SERVER_KEY = "ae_server_address"
[docs]def AnnotationClient(
server_address,
dataset_name=None,
aligned_volume_name=None,
auth_client=None,
api_version="latest",
verify=True,
max_retries=None,
pool_maxsize=None,
pool_block=None,
over_client=None,
):
"""Factory for returning AnnotationClient
Parameters
----------
server_address : str
server_address to use to connect to (i.e. https://minniev1.microns-daf.com)
datastack_name : str
Name of the datastack.
auth_client : AuthClient or None, optional
Authentication client to use to connect to server. If None, do not use authentication.
api_version : str or int (default: latest)
What version of the api to use, 0: Legacy client (i.e www.dynamicannotationframework.com)
2: new api version, (i.e. minniev1.microns-daf.com)
'latest': default to the most recent (current 2)
verify : str (default : True)
whether to verify https
max_retries : Int or None, optional
Set the number of retries per request, by default None. If None, defaults to requests package default.
pool_block : Bool or None, optional
If True, restricts pool of threads to max size, by default None. If None, defaults to requests package default.
pool_maxsize : Int or None, optional
Sets the max number of threads in the pool, by default None. If None, defaults to requests package default.
over_client:
client to overwrite configuration with
Returns
-------
ClientBaseWithDatastack
List of datastack names for available datastacks on the annotation engine
"""
if auth_client is None:
auth_client = AuthClient()
auth_header = auth_client.request_header
endpoints, api_version = _api_endpoints(
api_version,
SERVER_KEY,
server_address,
annotation_common,
annotation_api_versions,
auth_header,
verify=verify,
)
AnnoClient = client_mapping[api_version]
if api_version > 1:
return AnnoClient(
server_address,
auth_header,
api_version,
endpoints,
SERVER_KEY,
aligned_volume_name,
verify=verify,
max_retries=max_retries,
pool_maxsize=pool_maxsize,
pool_block=pool_block,
over_client=over_client,
)
else:
return AnnoClient(
server_address,
auth_header,
api_version,
endpoints,
SERVER_KEY,
dataset_name,
verify=verify,
max_retries=max_retries,
pool_maxsize=pool_maxsize,
pool_block=pool_block,
over_client=over_client,
)
[docs]class AnnotationClientV2(ClientBase):
def __init__(
self,
server_address,
auth_header,
api_version,
endpoints,
server_name,
aligned_volume_name,
verify=True,
max_retries=None,
pool_maxsize=None,
pool_block=None,
over_client=None,
schema_client=None,
):
super(AnnotationClientV2, self).__init__(
server_address,
auth_header,
api_version,
endpoints,
server_name,
verify=verify,
max_retries=max_retries,
pool_maxsize=pool_maxsize,
pool_block=pool_block,
over_client=over_client,
)
self._aligned_volume_name = aligned_volume_name
if schema_client is None:
self._schema_client = self.fc.schema
else:
self._schema_client = schema_client
@property
def aligned_volume_name(self):
return self._aligned_volume_name
[docs] def get_tables(self, aligned_volume_name: str = None):
"""Gets a list of table names for a aligned_volume_name
Parameters
----------
aligned_volume_name : str or None, optional
Name of the aligned_volume, by default None.
If None, uses the one specified in the client.
Will be set correctly if you are using the framework_client
Returns
-------
list
List of table names
"""
if aligned_volume_name is None:
aligned_volume_name = self.aligned_volume_name
endpoint_mapping = self.default_url_mapping
endpoint_mapping["aligned_volume_name"] = aligned_volume_name
url = self._endpoints["tables"].format_map(endpoint_mapping)
response = self.session.get(url)
return handle_response(response)
[docs] def get_annotation_count(self, table_name: str, aligned_volume_name: str = None):
"""Get number of annotations in a table
Parameters
----------
table_name (str):
name of table to mark for deletion
aligned_volume_name: str or None, optional,
Name of the aligned_volume. If None, uses the one specified in the client.
Returns
-------
int
number of annotations
"""
if aligned_volume_name is None:
aligned_volume_name = self.aligned_volume_name
endpoint_mapping = self.default_url_mapping
endpoint_mapping["aligned_volume_name"] = aligned_volume_name
endpoint_mapping["table_name"] = table_name
url = self._endpoints["table_count"].format_map(endpoint_mapping)
response = self.session.get(url)
return handle_response(response)
[docs] def delete_table(self, table_name: str, aligned_volume_name: str = None):
"""Marks a table for deletion requires super admin privileges
Parameters
----------
table_name (str):
name of table to mark for deletion
aligned_volume_name: str or None, optional,
Name of the aligned_volume. If None, uses the one specified in the client.
Returns
-------
json
Response JSON
"""
if aligned_volume_name is None:
aligned_volume_name = self.aligned_volume_name
endpoint_mapping = self.default_url_mapping
endpoint_mapping["aligned_volume_name"] = aligned_volume_name
endpoint_mapping["table_name"] = table_name
url = self._endpoints["table_info"].format_map(endpoint_mapping)
response = self.session.delete(url)
return handle_response(response)
[docs] def create_table(
self,
table_name: str,
schema_name: str,
description: str,
voxel_resolution: List[float],
reference_table: str = None,
track_target_id_updates: bool = None,
flat_segmentation_source: str = None,
user_id: int = None,
aligned_volume_name: str = None,
write_permission: str = "PRIVATE",
read_permission: str = "PUBLIC",
notice_text: str = None,
):
"""Creates a new data table based on an existing schema
Parameters
----------
table_name: str
Name of the new table. Cannot be the same as an existing table
schema_name: str
Name of the schema for the new table.
descrption: str
Human readable description for what is in the table.
Should include information about who generated the table
What data it covers, and how it should be interpreted.
And who should you talk to if you want to use it.
An Example:
a manual synapse table to detect chandelier synapses
on 81 PyC cells with complete AISs
[created by Agnes - agnesb@alleninstitute.org, uploaded by Forrest]
voxel_resolution: list[float]
voxel resolution points will be uploaded in, typically nm, i.e [1,1,1] means nanometers
[4,4,40] would be 4nm, 4nm, 40nm voxels
reference_table: str or None
If the schema you are using is a reference schema
Meaning it is an annotation of another annotation.
Then you need to specify what the target table
those annotations are in.
track_target_id_updates: bool or None
Indicates whether to automatically update reference table's foreign key
if target annotation table row is updated.
flat_segmentation_source: str or None
the source to a flat segmentation that corresponds to this table
i.e. precomputed:\\gs:\\mybucket\this_tables_annotation
user_id: int
If you are uploading this schema on someone else's behalf
and you want to link this table with their ID, you can specify it here
Otherwise, the table will be created with your userID in the user_id column.
aligned_volume_name: str or None, optional,
Name of the aligned_volume. If None, uses the one specified in the client.
write_permission: str, optional
What permissions to give the table for writing. One of
PRIVATE: only you can write to this table (DEFAULT)
GROUP: only members that share a group with you can write (excluding some groups)
PUBLIC: Anyone can write to this table. Note all data is logged, and deletes are done
by marking rows as deleted, so all data is always recoverable
read_permission: str, optional
What permissions to give the table for reading. One of
PRIVATE: only you can read this table. Intended to be used for sorting out bugs.
GROUP: only members that share a group with you can read (intended for within group vetting)
PUBLIC: anyone with permissions to read this datastack can read this data (DEFAULT)
notice_text: str, optional
Text the user will see when querying this table. Can be used to warn users of flaws,
and uncertainty in the data, or to advertise citations that should be used with this table.
Defaults to None, no text. If you want to remove text, send empty string.
Returns
-------
json
Response JSON
Examples
--------
Basic annotation table:
description = "Some description about the table"
voxel_res = [4,4,40]
client.create_table("some_synapse_table", "synapse", description, voxel_res)
"""
if read_permission not in ["PRIVATE", "GROUP", "PUBLIC"]:
raise ValueError("read_permission must be one of PRIVATE GROUP or PUBLIC")
if write_permission not in ["PRIVATE", "GROUP", "PUBLIC"]:
raise ValueError("write_permission must be one of PRIVATE GROUP or PUBLIC")
if aligned_volume_name is None:
aligned_volume_name = self.aligned_volume_name
endpoint_mapping = self.default_url_mapping
endpoint_mapping["aligned_volume_name"] = aligned_volume_name
url = self._endpoints["tables"].format_map(endpoint_mapping)
metadata = {
"description": description,
"voxel_resolution_x": float(voxel_resolution[0]),
"voxel_resolution_y": float(voxel_resolution[1]),
"voxel_resolution_z": float(voxel_resolution[2]),
"read_permission": read_permission,
"write_permission": write_permission,
}
if user_id is not None:
metadata["user_id"] = user_id
if notice_text is not None:
metadata["notice_text"] = notice_text
if reference_table is not None:
metadata["table_metadata"] = {
"reference_table": reference_table,
"track_target_id_updates": track_target_id_updates,
}
if flat_segmentation_source is not None:
metadata["flat_segmentation_source"] = flat_segmentation_source
data = {
"schema_type": schema_name,
"table_name": table_name,
"metadata": metadata,
}
response = self.session.post(url, json=data)
return handle_response(response, as_json=False)
[docs] def get_annotation(
self,
table_name: str,
annotation_ids: (int or Iterable),
aligned_volume_name: str = None,
):
"""Retrieve an annotation or annotations by id(s) and table name.
Parameters
----------
table_name : str
Name of the table
annotation_ids : int or iterable
ID or IDS of the annotation to retreive
aligned_volume_name : str or None, optional
Name of the aligned_volume. If None, uses the one specified in the client.
Returns
-------
list
Annotation data
"""
if aligned_volume_name is None:
aligned_volume_name = self.aligned_volume_name
endpoint_mapping = self.default_url_mapping
endpoint_mapping["aligned_volume_name"] = aligned_volume_name
endpoint_mapping["table_name"] = table_name
url = self._endpoints["annotations"].format_map(endpoint_mapping)
try:
iter(annotation_ids)
except TypeError:
annotation_ids = [annotation_ids]
params = {"annotation_ids": ",".join([str(a) for a in annotation_ids])}
response = self.session.get(url, params=params)
return handle_response(response)
[docs] def post_annotation(
self, table_name: str, data: (dict or List), aligned_volume_name: str = None
):
"""Post one or more new annotations to a table in the AnnotationEngine.
All inserted annotations will be marked as 'valid'. To invalidate
annotations refer to 'update_annotation', 'update_annotation_df'
and 'delete_annotation' methods.
Parameters
----------
table_name : str
Name of the table where annotations will be added
data : dict or list,
A list of (or a single) dict of schematized annotation data matching the target table.
aligned_volume_name : str or None, optional
Name of the aligned_volume. If None, uses the one specified in the client.
Returns
-------
json
Response JSON
"""
if aligned_volume_name is None:
aligned_volume_name = self.aligned_volume_name
if isinstance(data, dict):
data = [data]
endpoint_mapping = self.default_url_mapping
endpoint_mapping["aligned_volume_name"] = aligned_volume_name
endpoint_mapping["table_name"] = table_name
url = self._endpoints["annotations"].format_map(endpoint_mapping)
try:
iter(data)
except TypeError:
data = [data]
data = {"annotations": data}
response = self.session.post(
url,
data=json.dumps(data, cls=BaseEncoder),
headers={"Content-Type": "application/json"},
)
return handle_response(response)
[docs] @staticmethod
def process_position_columns(
df: pd.DataFrame, position_columns: (Iterable[str] or Mapping[str, str] or None)
):
"""Process a dataframe into a list of dictionaries, nesting thing
Args:
df (pd.DataFrame): dataframe to process
position_columns (Iterable[str] or Mapping[str, str] or None): see post_annotation_df
Returns:
json list of annotations ready for posting
"""
if position_columns is None:
position_columns = [c for c in df.columns if c.endswith("_position")]
if isinstance(position_columns, (list, np.ndarray, pd.Index)):
position_columns = {c: c.rsplit("_", 1)[0] for c in position_columns}
if type(position_columns) != dict:
raise ValueError("position_columns must be a list, dict or None")
data = df.to_dict(orient="records")
for d in data:
for k, v in position_columns.items():
pos = d.pop(k)
d[v] = {"position": pos}
return data
[docs] def post_annotation_df(
self,
table_name: str,
df: pd.DataFrame,
position_columns: (Iterable[str] or Mapping[str, str] or None),
aligned_volume_name=None,
):
"""Post one or more new annotations to a table in the AnnotationEngine.
All inserted annotations will be marked as 'valid'. To invalidate
annotations see 'update_annotation', 'update_annotation_df'
and 'delete_annotation' methods.
Parameters
----------
table_name : str
Name of the table where annotations will be added
df : pd.DataFrame
A pandas dataframe containing the annotations. Columns should be fields in schema,
position columns need to be called out in position_columns argument.
position_columns: dict or (list or np.array or pd.Index) or None
if None, will look for all columns with 'X_position' in the name and assume they go
in fields called "X".
if Iterable assumes each column given ends in _position.
(i.e. ['pt_position'] if 'pt' is the name of the position field in schema)
if Mapping, keys are names of columns in dataframe, values are the names of the fields
(i.e. {'pt_column': 'pt'} would be correct if you had one column named 'pt_column'
which needed to go into a schema with a position column called 'pt')
aligned_volume_name : str or None, optional
Name of the aligned_volume. If None, uses the one specified in the client.
Returns
-------
json
Response JSON
"""
if aligned_volume_name is None:
aligned_volume_name = self.aligned_volume_name
data = self.process_position_columns(df, position_columns)
return self.post_annotation(
table_name, data, aligned_volume_name=aligned_volume_name
)
[docs] def update_annotation(
self, table_name: str, data: (dict or List), aligned_volume_name: str = None
):
"""Update one or more new annotations to a table in the AnnotationEngine.
Updating is implemented by invalidating the old annotation
and inserting a new annotation row, which will receive a new primary key ID.
Notes
-----
If annotations ids were user provided upon insertion the database will
autoincrement from the current max id in the table.
Parameters
----------
table_name : str
Name of the table where annotations will be added
data : dict or list,
A list of (or a single) dict of schematized annotation data matching the target table.
each dict must contain an "id" field which is the ID of the annotation to update
aligned_volume_name : str or None, optional
Name of the aligned_volume. If None, uses the one specified in the client.
Returns
-------
json
Response JSON: a list of new annotation IDs.
"""
if aligned_volume_name is None:
aligned_volume_name = self.aligned_volume_name
if isinstance(data, dict):
data = [data]
endpoint_mapping = self.default_url_mapping
endpoint_mapping["aligned_volume_name"] = aligned_volume_name
endpoint_mapping["table_name"] = table_name
url = self._endpoints["annotations"].format_map(endpoint_mapping)
try:
iter(data)
except TypeError:
data = [data]
data = {"annotations": data}
response = self.session.put(
url,
data=json.dumps(data, cls=BaseEncoder),
headers={"Content-Type": "application/json"},
)
return handle_response(response)
[docs] def update_annotation_df(
self,
table_name: str,
df: pd.DataFrame,
position_columns: (Iterable[str] or Mapping[str, str] or None),
aligned_volume_name=None,
):
"""Update one or more annotations to a table in the AnnotationEngine using a
dataframe as format. Updating is implemented by invalidating the old annotation
and inserting a new annotation row, which will receive a new primary key ID.
Notes
-----
If annotations ids were user provided upon insertion the database will
autoincrement from the current max id in the table.
Parameters
----------
table_name : str
Name of the table where annotations will be added
df : pd.DataFrame
A pandas dataframe containing the annotations. Columns should be fields in schema,
position columns need to be called out in position_columns argument.
position_columns: dict or (list or np.array or pd.Index) or None
if None, will look for all columns with 'X_position' in the name and assume they go
in fields called "X".
if Iterable assumes each column given ends in _position.
(i.e. ['pt_position'] if 'pt' is the name of the position field in schema)
if Mapping, keys are names of columns in dataframe, values are the names of the fields
(i.e. {'pt_column': 'pt'} would be correct if you had one column named 'pt_column'
which needed to go into a schema with a position column called 'pt')
aligned_volume_name : str or None, optional
Name of the aligned_volume. If None, uses the one specified in the client.
Returns
-------
json
Response JSON
"""
if aligned_volume_name is None:
aligned_volume_name = self.aligned_volume_name
data = self.process_position_columns(df, position_columns)
return self.update_annotation(
table_name, data, aligned_volume_name=aligned_volume_name
)
[docs] def delete_annotation(
self,
table_name: str,
annotation_ids: (dict or List),
aligned_volume_name: str = None,
):
"""Delete one or more annotations in a table. Annotations that are
deleted are recorded as 'non-valid' but are not physically removed from the table.
Parameters
----------
table_name : str
Name of the table where annotations will be added
data : dict or list,
A list of (or a single) dict of schematized annotation data matching the target table.
each dict must contain an "id" field which is the ID of the annotation to update
aligned_volume_name : str or None, optional
Name of the aligned_volume. If None, uses the one specified in the client.
Returns
-------
json
Response JSON: a list of new annotation IDs.
"""
if aligned_volume_name is None:
aligned_volume_name = self.aligned_volume_name
endpoint_mapping = self.default_url_mapping
endpoint_mapping["aligned_volume_name"] = aligned_volume_name
endpoint_mapping["table_name"] = table_name
url = self._endpoints["annotations"].format_map(endpoint_mapping)
try:
iter(annotation_ids)
except TypeError:
annotation_ids = [annotation_ids]
data = {"annotation_ids": annotation_ids}
response = self.session.delete(
url,
data=json.dumps(data, cls=BaseEncoder),
headers={"Content-Type": "application/json"},
)
return handle_response(response)
[docs] def stage_annotations(
self,
table_name=None,
schema_name=None,
update=False,
id_field=False,
table_resolution=None,
annotation_resolution=None,
):
"""
Get a StagedAnnotations object to help produce correctly formatted annotations for a given table or schema.
StagedAnnotation objects can be uploaded directly with `upload_staged_annotations`.
Parameters
----------
table_name : str, optional
Table name to stage annotations for, by default None.
schema_name : str, optional
Schema name to use to make annotations. Only needed if the table_name is not set, by default None
update : bool, optional
Set to True if individual annotations are going to be updated, by default False.
id_field : bool, optional
Set to True if id fields are to be specified. Not needed if update is True, which always needs id fields. Optional, by default False
table_resolution : list-like or None, optional
Voxel resolution of spatial points in the table in nanometers. This is found automatically from the info service if a table name is provided, by default None.
If annotation_resolution is also set, this allows points to be scaled correctly for the table.
annotation_resolution : list-like, optional
Voxel resolution of spatial points provided by the user when creating annotations. If the table resolution is also available (manually or from the info service),
annotations are correctly rescaled for the volume. By default, None.
"""
if table_name is not None:
obj_name = table_name
table_meta = self.get_table_metadata(table_name)
schema_name = table_meta["schema_type"]
table_resolution = table_meta["voxel_resolution"]
else:
if schema_name is None:
raise ValueError("Must specify either table name or schema name")
obj_name = schema_name
schema = self._schema_client.schema_definition(schema_name)
return stage.StagedAnnotations(
schema,
name=obj_name,
update=update,
id_field=id_field,
annotation_resolution=annotation_resolution,
table_resolution=table_resolution,
table_name=table_name,
)
[docs] def upload_staged_annotations(
self,
staged_annos: stage.StagedAnnotations,
aligned_volume_name: str = None,
):
"""
Upload annotations directly from an Annotation Guide object.
This method uses the options specified in the object, including table name and if the annotation is an update or not.
Parameters
----------
staged_annos : guide.AnnotationGuide
AnnotationGuide object with a specified table name and a collection of annotations already filled in.
aligned_volume_name : str or None, optional
Name of the aligned_volume. If None, uses the one specified in the client.
Returns
-------
List or dict
If new annotations are posted, a list of ids.
If annotations are being updated, a dictionary with the mapping from old ids to new ids.
"""
if staged_annos.table_name is None:
raise ValueError(
"Only annotation guide objects with a specified table name can be used here"
)
if staged_annos.is_update:
return self.update_annotation(
staged_annos.table_name,
staged_annos.annotation_list,
aligned_volume_name=aligned_volume_name,
)
else:
return self.post_annotation(
staged_annos.table_name,
staged_annos.annotation_list,
aligned_volume_name=aligned_volume_name,
)
client_mapping = {
2: AnnotationClientV2,
"latest": AnnotationClientV2,
}