import itertools
import json
import logging
import re
import warnings
from datetime import datetime, timezone
from typing import Iterable, Union
from urllib.error import HTTPError
import numpy as np
import pandas as pd
import pyarrow as pa
import pytz
from cachetools import TTLCache, cached
from IPython.display import HTML
from .auth import AuthClient
from .base import (
BaseEncoder,
ClientBase,
_api_endpoints,
handle_response,
)
from .endpoints import materialization_api_versions, materialization_common
from .mytimer import MyTimeIt
from .tools.table_manager import TableManager, ViewManager
logger = logging.getLogger(__name__)
SERVER_KEY = "me_server_address"
DEFAULT_COMPRESSION = "zstd"
def deserialize_query_response(response):
"""Deserialize pyarrow responses"""
content_type = response.headers.get("Content-Type")
if content_type == "data.arrow":
with pa.ipc.open_stream(response.content) as reader:
df = reader.read_pandas()
return df
elif content_type == "x-application/pyarrow":
try:
return pa.deserialize(response.content)
except (NameError, AttributeError):
raise ValueError(
"Deserialization of this request requires an older version of pyarrow "
"(version 3 works). Update the Materialization deployment or locally "
"downgrade pyarrow."
)
else:
raise ValueError(
f'Unknown response type: {response.headers.get("Content-Type")}'
)
def convert_position_columns(df, given_resolution, desired_resolution):
"""function to take a dataframe with x,y,z position columns and convert
them to the desired resolution from the given resolution
Args:
df (pd.DataFrame): dataframe to alter
given_resolution (Iterable[float]): what the given resolution is
desired_resoultion (Iterable[float]): what the desired resolution is
Returns:
pd.DataFrame: [description]
"""
gr = np.array(given_resolution)
dr = np.array(desired_resolution)
sf = gr / dr
posmap = {"x": 0, "y": 1, "z": 2}
if np.all(sf == 1):
return df
else:
grps = itertools.groupby(df.columns, key=lambda x: x[:-2])
for _, g in grps:
gl = list(g)
t = "".join([k[-1:] for k in gl])
if t == "xyz":
for col in gl:
df[col] = df[col] * sf[posmap[col[-1]]]
return df
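# Usage sketch (illustrative; the column names and values are hypothetical):
# scaling columns stored at a [4, 4, 40] nm voxel resolution down to nanometers.
#
#   df = pd.DataFrame(
#       {"pt_position_x": [100], "pt_position_y": [200], "pt_position_z": [10]}
#   )
#   df = convert_position_columns(df, [4, 4, 40], [1, 1, 1])
#   # pt_position_x -> 400, pt_position_y -> 800, pt_position_z -> 400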
def concatenate_position_columns(df, inplace=False):
"""function to take a dataframe with x,y,z position columns and replace them
with one column per position with an xyz numpy array. Edits occur in place
if inplace=True.
Args:
df (pd.DataFrame): dataframe to alter
inplace (bool): whether to perform edits in place
Returns:
pd.DataFrame: dataframe with each x,y,z column triplet collapsed into a single array-valued column
"""
if inplace:
df2 = df
else:
df2 = df.copy()
grps = itertools.groupby(df2.columns, key=lambda x: x[:-2])
for base, g in grps:
gl = list(g)
t = "".join([k[-1:] for k in gl])
if t == "xyz":
df2[base] = [np.array(x) for x in df2[gl].values.tolist()]
if inplace:
df2.drop(gl, axis=1, inplace=inplace)
else:
df2 = df2.drop(gl, axis=1, inplace=inplace)
return df2
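# Usage sketch (illustrative; the column names are hypothetical): collapsing
# split x, y, z columns back into a single array-valued column.
#
#   df = pd.DataFrame(
#       {"pt_position_x": [1], "pt_position_y": [2], "pt_position_z": [3]}
#   )
#   df = concatenate_position_columns(df)
#   # df["pt_position"].iloc[0] is now array([1, 2, 3])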
def convert_timestamp(ts: Union[datetime, str, float, None]):
"""Convert a timestamp given as "now", a datetime, a float epoch, an ISO string, or None into a timezone-aware UTC datetime."""
if ts == "now":
ts = datetime.now(timezone.utc)
if isinstance(ts, datetime):
if ts.tzinfo is None:
return pytz.UTC.localize(dt=ts)
else:
return ts.astimezone(timezone.utc)
elif isinstance(ts, float):
return datetime.fromtimestamp(ts, tz=timezone.utc)
elif ts is None:
return pd.Timestamp.max.to_pydatetime()
dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%f")
return dt.replace(tzinfo=timezone.utc)
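# Usage sketch (illustrative): each of these normalizes to a timezone-aware
# UTC datetime.
#
#   convert_timestamp("now")                         # current time in UTC
#   convert_timestamp(datetime(2023, 1, 1))          # naive datetime localized to UTC
#   convert_timestamp("2023-01-01T00:00:00.000000")  # ISO-format string parsed as UTC
#   convert_timestamp(None)                          # pd.Timestamp.max (no upper bound)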
def MaterializationClient(
server_address,
datastack_name=None,
auth_client=None,
cg_client=None,
synapse_table=None,
api_version="latest",
version=None,
verify=True,
max_retries=None,
pool_maxsize=None,
pool_block=None,
desired_resolution=None,
over_client=None,
):
"""Factory for returning AnnotationClient
Parameters
----------
server_address : str
server address to connect to (e.g. https://minniev1.microns-daf.com)
datastack_name : str
Name of the datastack.
auth_client : AuthClient or None, optional
Authentication client to use to connect to server. If None, do not use authentication.
api_version : str or int (default: 'latest')
Which version of the API to use: 0 for the legacy client (e.g. www.dynamicannotationframework.com),
2 for the new API (e.g. minniev1.microns-daf.com),
or 'latest' for the most recent version (currently 2)
cg_client: caveclient.chunkedgraph.ChunkedGraphClient
chunkedgraph client for live materializations
synapse_table: str
default synapse table for queries
version : int or None, optional
Default version to query. If None, defaults to the latest version.
desired_resolution : Iterable[float] or None, optional
If given, should be a list or array of the desired resolution you want queries returned in
useful for materialization queries.
Returns
-------
MaterializationClientV2 or MaterializationClientV3
A materialization client for the given server address and datastack
"""
if auth_client is None:
auth_client = AuthClient()
auth_header = auth_client.request_header
endpoints, api_version = _api_endpoints(
api_version,
SERVER_KEY,
server_address,
materialization_common,
materialization_api_versions,
auth_header,
fallback_version=2,
verify=verify,
)
MatClient = client_mapping[api_version]
return MatClient(
server_address,
auth_header,
api_version,
endpoints,
SERVER_KEY,
datastack_name,
cg_client=cg_client,
synapse_table=synapse_table,
version=version,
verify=verify,
max_retries=max_retries,
pool_maxsize=pool_maxsize,
pool_block=pool_block,
over_client=over_client,
desired_resolution=desired_resolution,
)
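# Usage sketch (illustrative; the server address and datastack name are
# hypothetical): constructing a standalone client with the factory.
#
#   client = MaterializationClient(
#       "https://minniev1.microns-daf.com",
#       datastack_name="my_datastack",
#       desired_resolution=[1, 1, 1],
#   )
#   client.get_versions()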
class MaterializationClientV2(ClientBase):
def __init__(
self,
server_address,
auth_header,
api_version,
endpoints,
server_name,
datastack_name,
cg_client=None,
synapse_table=None,
version=None,
verify=True,
max_retries=None,
pool_maxsize=None,
pool_block=None,
over_client=None,
desired_resolution=None,
):
super(MaterializationClientV2, self).__init__(
server_address,
auth_header,
api_version,
endpoints,
server_name,
verify=verify,
max_retries=max_retries,
pool_maxsize=pool_maxsize,
pool_block=pool_block,
over_client=over_client,
)
self._datastack_name = datastack_name
if version is None:
version = self.most_recent_version()
self._version = version
if cg_client is None:
if self.fc is not None:
self.cg_client = self.fc.chunkedgraph
else:
self.cg_client = cg_client
self.synapse_table = synapse_table
self.desired_resolution = desired_resolution
self._tables = None
self._views = None
@property
def datastack_name(self):
return self._datastack_name
@property
def version(self):
return self._version
@property
def homepage(self):
url = (
f"{self._server_address}/materialize/views/datastack/{self._datastack_name}"
)
return HTML(f'<a href="{url}" target="_blank">Materialization Engine</a>')
@version.setter
def version(self, x):
if int(x) in self.get_versions():
self._version = int(x)
else:
raise ValueError("Version not in materialized database")
def most_recent_version(self, datastack_name=None):
"""get the most recent version of materialization
for this datastack name
Args:
datastack_name (str, optional): datastack name to find most
recent materialization of.
If None, uses the one specified in the client.
"""
versions = self.get_versions(datastack_name=datastack_name)
return np.max(np.array(versions))
def get_versions(self, datastack_name=None, expired=False):
"""get versions available
Args:
datastack_name (str, optional): datastack to query. Defaults to None, which uses the one specified in the client.
expired (bool, optional): whether to include expired versions. Defaults to False.
"""
if datastack_name is None:
datastack_name = self.datastack_name
endpoint_mapping = self.default_url_mapping
endpoint_mapping["datastack_name"] = datastack_name
url = self._endpoints["versions"].format_map(endpoint_mapping)
query_args = {"expired": expired}
response = self.session.get(url, params=query_args)
self.raise_for_status(response)
return response.json()
@property
def tables(self):
if self._tables is None:
self._tables = TableManager(self.fc)
return self._tables
@property
def views(self):
if self._views is None:
self._views = ViewManager(self.fc)
return self._views
def get_tables(self, datastack_name=None, version=None):
"""Gets a list of table names for a datastack
Parameters
----------
datastack_name : str or None, optional
Name of the datastack, by default None.
If None, uses the one specified in the client.
Will be set correctly if you are using the framework_client
version: int or None, optional
the version to query, else get the tables in the most recent version
Returns
-------
list
List of table names
"""
if datastack_name is None:
datastack_name = self.datastack_name
if version is None:
version = self.version
endpoint_mapping = self.default_url_mapping
endpoint_mapping["datastack_name"] = datastack_name
endpoint_mapping["version"] = version
# TODO fix up latest version
url = self._endpoints["tables"].format_map(endpoint_mapping)
response = self.session.get(url)
self.raise_for_status(response)
return response.json()
def get_annotation_count(self, table_name: str, datastack_name=None, version=None):
"""Get number of annotations in a table
Parameters
----------
table_name (str):
name of table to count annotations in
datastack_name: str or None, optional,
Name of the datastack_name. If None, uses the one specified in the client.
version: int or None, optional
the version to query, else get the tables in the most recent version
Returns
-------
int
number of annotations
"""
if datastack_name is None:
datastack_name = self.datastack_name
if version is None:
version = self.version
endpoint_mapping = self.default_url_mapping
endpoint_mapping["datastack_name"] = datastack_name
endpoint_mapping["table_name"] = table_name
endpoint_mapping["version"] = version
url = self._endpoints["table_count"].format_map(endpoint_mapping)
response = self.session.get(url)
self.raise_for_status(response)
return response.json()
def get_timestamp(self, version: int = None, datastack_name: str = None):
"""Get datetime.datetime timestamp for a materialization version.
Parameters
----------
version : int or None, optional
Materialization version, by default None. If None, defaults to the value set in the client.
datastack_name : str or None, optional
Datastack name, by default None. If None, defaults to the value set in the client.
Returns
-------
datetime.datetime
Datetime when the materialization version was frozen.
"""
meta = self.get_version_metadata(version=version, datastack_name=datastack_name)
return convert_timestamp(meta["time_stamp"])
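# Usage sketch (illustrative; the version number is hypothetical): fetching
# the freeze time of a specific materialization version.
#
#   ts = client.get_timestamp(version=500)   # timezone-aware UTC datetime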
def _format_query_components(
self,
datastack_name,
version,
tables,
select_columns,
suffix_map,
filter_in_dict,
filter_out_dict,
filter_equal_dict,
filter_spatial_dict,
filter_regex_dict,
return_pyarrow,
split_positions,
offset,
limit,
desired_resolution,
use_view=False,
random_sample: int = None,
):
endpoint_mapping = self.default_url_mapping
endpoint_mapping["datastack_name"] = datastack_name
endpoint_mapping["version"] = version
data = {}
query_args = {}
query_args["return_pyarrow"] = return_pyarrow
query_args["arrow_format"] = return_pyarrow
query_args["split_positions"] = split_positions
if random_sample:
query_args["random_sample"] = random_sample
if len(tables) == 1:
if use_view:
endpoint_mapping["view_name"] = tables[0]
url = self._endpoints["view_query"].format_map(endpoint_mapping)
else:
endpoint_mapping["table_name"] = tables[0]
url = self._endpoints["simple_query"].format_map(endpoint_mapping)
else:
data["tables"] = tables
url = self._endpoints["join_query"].format_map(endpoint_mapping)
if filter_in_dict is not None:
data["filter_in_dict"] = filter_in_dict
if filter_out_dict is not None:
data["filter_notin_dict"] = filter_out_dict
if filter_equal_dict is not None:
data["filter_equal_dict"] = filter_equal_dict
if filter_spatial_dict is not None:
data["filter_spatial_dict"] = filter_spatial_dict
if filter_regex_dict is not None:
data["filter_regex_dict"] = filter_regex_dict
if select_columns is not None:
if isinstance(select_columns, list):
data["select_columns"] = select_columns
elif isinstance(select_columns, dict):
data["select_column_map"] = select_columns
else:
raise ValueError(
"select columns should be a dictionary with tables as keys and values of column names in table (no suffixes)"
)
if offset is not None:
data["offset"] = offset
if suffix_map is not None:
if isinstance(suffix_map, list):
data["suffixes"] = suffix_map
elif isinstance(suffix_map, dict):
data["suffix_map"] = suffix_map
else:
raise ValueError(
"suffixes should be a dictionary with tables as keys and values as suffixes"
)
if limit is not None:
assert limit > 0
data["limit"] = limit
if desired_resolution is not None:
data["desired_resolution"] = desired_resolution
if return_pyarrow:
encoding = DEFAULT_COMPRESSION
else:
encoding = "gzip"
return url, data, query_args, encoding
def _resolve_merge_reference(
self, merge_reference, table, datastack_name, materialization_version
):
if merge_reference:
md = self.get_table_metadata(
table_name=table,
datastack_name=datastack_name,
version=materialization_version,
log_warning=False,
)
if md["reference_table"] is None:
target_table = None
else:
if len(md["reference_table"]) == 0:
target_table = None
else:
target_table = md["reference_table"]
else:
target_table = None
if target_table is not None:
tables = [[table, "target_id"], [md["reference_table"], "id"]]
if self._api_version == 2:
suffix_map = ["", "ref"]
elif self._api_version > 2:
suffix_map = {table: "", md["reference_table"]: "_ref"}
else:
tables = [table]
suffix_map = None
return tables, suffix_map
def query_table(
self,
table: str,
filter_in_dict=None,
filter_out_dict=None,
filter_equal_dict=None,
filter_spatial_dict=None,
filter_regex_dict=None,
select_columns=None,
offset: int = None,
limit: int = None,
datastack_name: str = None,
return_df: bool = True,
split_positions: bool = False,
materialization_version: int = None,
timestamp: datetime = None,
metadata: bool = True,
merge_reference: bool = True,
desired_resolution: Iterable = None,
get_counts: bool = False,
random_sample: int = None,
):
"""generic query on materialization tables
Args:
table: 'str'
filter_in_dict (dict , optional):
keys are column names, values are allowed entries.
Defaults to None.
filter_out_dict (dict, optional):
keys are column names, values are not allowed entries.
Defaults to None.
filter_equal_dict (dict, optional):
inner layer: keys are column names, values are specified entry.
Defaults to None.
filter_spatial_dict (dict, optional):
inner layer: keys are column names, values are bounding boxes
as [[min_x, min_y,min_z],[max_x, max_y, max_z]]
Expressed in units of the voxel_resolution of this dataset.
filter_regex_dict (dict, optional):
inner layer: keys are column names, values are regex strings
offset (int, optional): offset into the query result. Defaults to None.
limit (int, optional): maximum number of results to return
(the server will also enforce an upper limit, see get_server_config)
select_columns (list of str, optional): columns to select. Defaults to None.
datastack_name (str, optional): datastack to query.
If None defaults to one specified in client.
return_df (bool, optional): whether to return as a dataframe
default True, if False, data is returned as json (slower)
split_positions (bool, optional): whether to break position columns into x,y,z columns
default False, if False data is returned as one column with [x,y,z] array (slower)
materialization_version (int, optional): version to query.
If None defaults to one specified in client.
timestamp (datetime.datetime, optional): timestamp to query
If passed, will do a live query. Error if also passing a materialization version
metadata: (bool, optional) : toggle to return metadata (default True)
If True (and return_df is also True), return table and query metadata in the df.attrs dictionary.
merge_reference: (bool, optional) : toggle to automatically join reference table
If True, metadata will be queried and, if the table is a reference table, a join
will be performed on the reference table to return its rows as well
desired_resolution: (Iterable[float], optional) : desired resolution to return all spatial points in
If None, defaults to the value specified in the client; if that is also None, points are returned
as stored in the table, in the resolution specified by the table metadata
get_counts: (bool, optional) : if True, return only the count of rows matching the query
random_sample: (int, optional) : if given, will do a tablesample of the table to return that many annotations
Returns:
pd.DataFrame: a pandas dataframe of results of query
"""
if desired_resolution is None:
desired_resolution = self.desired_resolution
if timestamp is not None:
if materialization_version is not None:
raise ValueError("cannot specify timestamp and materialization version")
else:
return self.live_query(
table,
timestamp,
filter_in_dict=filter_in_dict,
filter_out_dict=filter_out_dict,
filter_equal_dict=filter_equal_dict,
filter_spatial_dict=filter_spatial_dict,
filter_regex_dict=filter_regex_dict,
select_columns=select_columns,
offset=offset,
limit=limit,
datastack_name=datastack_name,
split_positions=split_positions,
post_filter=True,
metadata=metadata,
merge_reference=merge_reference,
desired_resolution=desired_resolution,
random_sample=random_sample,
)
if materialization_version is None:
materialization_version = self.version
if datastack_name is None:
datastack_name = self.datastack_name
tables, suffix_map = self._resolve_merge_reference(
merge_reference, table, datastack_name, materialization_version
)
url, data, query_args, encoding = self._format_query_components(
datastack_name,
materialization_version,
tables,
select_columns,
suffix_map,
{table: filter_in_dict} if filter_in_dict is not None else None,
{table: filter_out_dict} if filter_out_dict is not None else None,
{table: filter_equal_dict} if filter_equal_dict is not None else None,
{table: filter_spatial_dict} if filter_spatial_dict is not None else None,
{table: filter_regex_dict} if filter_regex_dict is not None else None,
return_df,
True,
offset,
limit,
desired_resolution,
random_sample=random_sample,
)
if get_counts:
query_args["count"] = True
headers = {"Content-Type": "application/json", "Accept-Encoding": encoding}
response = self.session.post(
url,
data=json.dumps(data, cls=BaseEncoder),
headers=headers,
params=query_args,
stream=not return_df,
)
self.raise_for_status(response)
if return_df:
with warnings.catch_warnings():
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=DeprecationWarning)
df = deserialize_query_response(response)
if desired_resolution is not None:
if not response.headers.get("dataframe_resolution", None):
if len(desired_resolution) != 3:
raise ValueError(
"desired resolution needs to be of length 3, for xyz"
)
vox_res = self.get_table_metadata(
table,
datastack_name,
materialization_version,
log_warning=False,
)["voxel_resolution"]
df = convert_position_columns(df, vox_res, desired_resolution)
if metadata:
attrs = self._assemble_attributes(
tables,
filters={
"inclusive": filter_in_dict,
"exclusive": filter_out_dict,
"equal": filter_equal_dict,
"spatial": filter_spatial_dict,
"regex": filter_regex_dict,
},
select_columns=select_columns,
offset=offset,
limit=limit,
live_query=timestamp is not None,
timestamp=string_format_timestamp(timestamp),
materialization_version=materialization_version,
desired_resolution=response.headers.get(
"dataframe_resolution", desired_resolution
),
column_names=response.headers.get("column_names", None),
)
df.attrs.update(attrs)
if split_positions:
return df
else:
return concatenate_position_columns(df, inplace=True)
else:
return response.json()
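# Usage sketch (illustrative; the table name, column names, and root ID are
# hypothetical): filtering a table to one presynaptic cell at the client's
# default materialization version.
#
#   df = client.query_table(
#       "synapse_table",
#       filter_equal_dict={"pre_pt_root_id": 864691135181682535},
#       select_columns=["id", "pre_pt_root_id", "post_pt_root_id"],
#   )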
def join_query(
self,
tables,
filter_in_dict=None,
filter_out_dict=None,
filter_equal_dict=None,
filter_spatial_dict=None,
filter_regex_dict=None,
select_columns=None,
offset: int = None,
limit: int = None,
suffixes: list = None,
datastack_name: str = None,
return_df: bool = True,
split_positions: bool = False,
materialization_version: int = None,
metadata: bool = True,
desired_resolution: Iterable = None,
random_sample: int = None,
):
"""generic query on materialization tables
Args:
tables: list of lists with length 2 or 'str'
list of two lists: first entries are table names, second
entries are the columns used for the join
filter_in_dict (dict of dicts, optional):
outer layer: keys are table names
inner layer: keys are column names, values are allowed entries.
Defaults to None.
filter_out_dict (dict of dicts, optional):
outer layer: keys are table names
inner layer: keys are column names, values are not allowed entries.
Defaults to None.
filter_equal_dict (dict of dicts, optional):
outer layer: keys are table names
inner layer: keys are column names, values are specified entry.
Defaults to None.
filter_spatial_dict (dict of dicts, optional):
outer layer: keys are table names:
inner layer: keys are column names, values are bounding boxes
as [[min_x, min_y,min_z],[max_x, max_y, max_z]]
Expressed in units of the voxel_resolution of this dataset.
Defaults to None
filter_regex_dict (dict of dicts, optional):
outer layer: keys are table names:
inner layer: keys are column names, values are regex strings
Defaults to None
select_columns (dict of lists of str, optional): keys are table names, values are the list of columns from that table.
Defaults to None, which will select all columns. Will be passed to the server as select_column_map.
Passing a list will be passed as select_columns, which is deprecated.
offset (int, optional): result offset to use. Defaults to None.
limit (int, optional): maximum number of results to return (the server will also enforce an upper limit, see get_server_config)
suffixes (dict, optional): suffixes to use for duplicate columns, keys are table names, values are the suffix
datastack_name (str, optional): datastack to query.
If None defaults to one specified in client.
return_df (bool, optional): whether to return as a dataframe
default True, if False, data is returned as json (slower)
split_positions (bool, optional): whether to break position columns into x,y,z columns
default False, if False data is returned as one column with [x,y,z] array (slower)
materialization_version (int, optional): version to query.
If None defaults to one specified in client.
metadata: (bool, optional) : toggle to return metadata
If True (and return_df is also True), return table and query metadata in the df.attrs dictionary.
desired_resolution (Iterable, optional):
What resolution to convert position columns to. Defaults to None, which uses the client default.
random_sample: (int, optional) : if given, will do a tablesample of the table to return that many annotations
Returns:
pd.DataFrame: a pandas dataframe of results of query
"""
if materialization_version is None:
materialization_version = self.version
if datastack_name is None:
datastack_name = self.datastack_name
if desired_resolution is None:
desired_resolution = self.desired_resolution
url, data, query_args, encoding = self._format_query_components(
datastack_name,
materialization_version,
tables,
select_columns,
suffixes,
filter_in_dict,
filter_out_dict,
filter_equal_dict,
filter_spatial_dict,
filter_regex_dict,
return_df,
True,
offset,
limit,
desired_resolution,
random_sample=random_sample,
)
response = self.session.post(
url,
data=json.dumps(data, cls=BaseEncoder),
headers={"Content-Type": "application/json", "Accept-Encoding": encoding},
params=query_args,
stream=not return_df,
)
self.raise_for_status(response)
if return_df:
with warnings.catch_warnings():
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=DeprecationWarning)
df = deserialize_query_response(response)
if metadata:
attrs = self._assemble_attributes(
tables,
suffixes=suffixes,
desired_resolution=response.headers.get(
"dataframe_resolution", desired_resolution
),
filters={
"inclusive": filter_in_dict,
"exclusive": filter_out_dict,
"equal": filter_equal_dict,
"spatial": filter_spatial_dict,
"regex": filter_regex_dict,
},
select_columns=select_columns,
offset=offset,
limit=limit,
live_query=False,
timestamp=None,
materialization_version=materialization_version,
column_names=response.headers.get("column_names", None),
)
df.attrs.update(attrs)
if split_positions:
return df
else:
return concatenate_position_columns(df, inplace=True)
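# Usage sketch (illustrative; the table and column names are hypothetical):
# joining a reference table onto its target table by id.
#
#   df = client.join_query(
#       [["cell_type_table", "target_id"], ["soma_table", "id"]],
#       suffixes={"cell_type_table": "_ct", "soma_table": "_soma"},
#   )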
def map_filters(self, filters, timestamp, timestamp_past):
"""translate a list of filter dictionaries
from a point in the future to a point in the past
Args:
filters (list[dict]): filter dictionaries keyed by column name
timestamp (datetime.datetime): timestamp the filters are expressed at
timestamp_past (datetime.datetime): timestamp to translate the filters to
Returns:
list[dict], dict: the translated filters, and a map from past root IDs to their future root IDs
"""
timestamp = convert_timestamp(timestamp)
timestamp_past = convert_timestamp(timestamp_past)
new_filters = []
root_ids = []
for filter_dict in filters:
if filter_dict is not None:
for col, val in filter_dict.items():
if col.endswith("root_id"):
if not isinstance(val, (Iterable, np.ndarray)):
root_ids.append([val])
else:
root_ids.append(val)
# if there are no root_ids then we can safely return now
if len(root_ids) == 0:
return filters, {}
root_ids = np.unique(np.concatenate(root_ids))
filter_timed_end = self.cg_client.is_latest_roots(root_ids, timestamp=timestamp)
filter_timed_start = self.cg_client.get_root_timestamps(root_ids) < timestamp
filter_timestamp = np.logical_and(filter_timed_start, filter_timed_end)
if not np.all(filter_timestamp):
roots_too_old = root_ids[~filter_timed_end]
roots_too_recent = root_ids[~filter_timed_start]
if len(roots_too_old) > 0:
too_old_str = f"{roots_too_old} are expired, "
else:
too_old_str = ""
if len(roots_too_recent) > 0:
too_recent_str = f"{roots_too_recent} are too recent, "
else:
too_recent_str = ""
raise ValueError(
f"Timestamp incompatible with IDs: {too_old_str}{too_recent_str} use chunkedgraph client to find valid ID(s)"
)
id_mapping = self.cg_client.get_past_ids(
root_ids, timestamp_past=timestamp_past, timestamp_future=timestamp
)
for filter_dict in filters:
if filter_dict is None:
new_filters.append(filter_dict)
else:
new_dict = {}
for col, root_ids in filter_dict.items():
if col.endswith("root_id"):
if not isinstance(root_ids, (Iterable, np.ndarray)):
new_dict[col] = id_mapping["past_id_map"][root_ids]
else:
new_dict[col] = np.concatenate(
[id_mapping["past_id_map"][v] for v in root_ids]
)
else:
new_dict[col] = root_ids
new_filters.append(new_dict)
return new_filters, id_mapping["future_id_map"]
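# Usage sketch (illustrative; the root ID is hypothetical, and a configured
# cg_client is required): translating a filter expressed with present-day
# root IDs back to the IDs that existed at the last materialization.
#
#   past_filters, future_map = client.map_filters(
#       [{"pre_pt_root_id": [864691135181682535]}, None, None],
#       timestamp=datetime.now(timezone.utc),
#       timestamp_past=client.get_timestamp(),
#   )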
def _update_rootids(self, df: pd.DataFrame, timestamp: datetime, future_map: dict):
# post-process the dataframe to update all the root_id columns
# with the most up-to-date root IDs
if len(future_map) == 0:
future_map = None
if future_map is not None:
# pyarrow can make dataframes read only. Copying resets that.
df = df.copy()
sv_columns = [c for c in df.columns if c.endswith("supervoxel_id")]
with MyTimeIt("is_latest_roots"):
all_root_ids = np.empty(0, dtype=np.int64)
# go through the columns and collect all the root_ids to check
# to see if they need updating
for sv_col in sv_columns:
root_id_col = sv_col[: -len("supervoxel_id")] + "root_id"
# use the future map to update root IDs
if future_map is not None:
df[root_id_col].replace(future_map, inplace=True)
all_root_ids = np.append(all_root_ids, df[root_id_col].values.copy())
uniq_root_ids = np.unique(all_root_ids)
del all_root_ids
uniq_root_ids = uniq_root_ids[uniq_root_ids != 0]
# logging.info(f"uniq_root_ids {uniq_root_ids}")
is_latest_root = self.cg_client.is_latest_roots(
uniq_root_ids, timestamp=timestamp
)
latest_root_ids = uniq_root_ids[is_latest_root]
latest_root_ids = np.concatenate([[0], latest_root_ids])
# go through the columns and collect all the supervoxel ids to update
all_svids = np.empty(0, dtype=np.int64)
all_is_latest = []
all_svid_lengths = []
for sv_col in sv_columns:
with MyTimeIt(f"find svids {sv_col}"):
root_id_col = sv_col[: -len("supervoxel_id")] + "root_id"
svids = df[sv_col].values
root_ids = df[root_id_col]
is_latest_root = np.isin(root_ids, latest_root_ids)
all_is_latest.append(is_latest_root)
n_svids = len(svids[~is_latest_root])
all_svid_lengths.append(n_svids)
logger.info(f"{sv_col} has {n_svids} to update")
all_svids = np.append(all_svids, svids[~is_latest_root])
logger.info(f"num zero svids: {np.sum(all_svids==0)}")
logger.info(f"all_svids dtype {all_svids.dtype}")
logger.info(f"all_svid_lengths {all_svid_lengths}")
with MyTimeIt("get_roots"):
# find the up to date root_ids for those supervoxels
updated_root_ids = self.cg_client.get_roots(all_svids, timestamp=timestamp)
del all_svids
# loop through the columns again, replacing outdated root ids with the
# roots looked up from their supervoxel ids
k = 0
for is_latest_root, n_svids, sv_col in zip(
all_is_latest, all_svid_lengths, sv_columns
):
with MyTimeIt(f"replace_roots {sv_col}"):
root_id_col = sv_col[: -len("supervoxel_id")] + "root_id"
root_ids = df[root_id_col].values.copy()
uroot_id = updated_root_ids[k : k + n_svids]
k += n_svids
root_ids[~is_latest_root] = uroot_id
# ran into an issue with pyarrow producing read-only columns
df[root_id_col] = None
df[root_id_col] = root_ids
return df
def ingest_annotation_table(
self,
table_name: str,
datastack_name: str = None,
):
"""Trigger supervoxel lookup and rootID looksup of new annotations in a table.
Args:
table_name (str): table to drigger
datastack_name (str, optional): datastack to trigger it. Defaults to what is set in client.
Returns:
response: status code of response from server
"""
if datastack_name is None:
datastack_name = self.datastack_name
endpoint_mapping = self.default_url_mapping
endpoint_mapping["datastack_name"] = datastack_name
endpoint_mapping["table_name"] = table_name
url = self._endpoints["ingest_annotation_table"].format_map(endpoint_mapping)
response = self.session.post(url)
return handle_response(response)
def lookup_supervoxel_ids(
self,
table_name: str,
annotation_ids: list = None,
datastack_name: str = None,
):
"""Trigger supervoxel lookups of new annotations in a table.
Args:
table_name (str): table to trigger
annotation_ids: (list, optional): list of annotation ids to lookup. Default is None,
which will trigger lookup of entire table.
datastack_name (str, optional): datastack to trigger it. Defaults to what is set in client.
Returns:
response: status code of response from server
"""
if datastack_name is None:
datastack_name = self.datastack_name
if annotation_ids is not None:
data = {"annotation_ids": annotation_ids}
else:
data = {}
endpoint_mapping = self.default_url_mapping
endpoint_mapping["datastack_name"] = datastack_name
endpoint_mapping["table_name"] = table_name
url = self._endpoints["lookup_supervoxel_ids"].format_map(endpoint_mapping)
response = self.session.post(
url,
data=json.dumps(data, cls=BaseEncoder),
headers={
"Content-Type": "application/json",
"Accept-Encoding": "",
},
)
return handle_response(response)
def live_live_query(
self,
table: str,
timestamp: datetime,
joins=None,
filter_in_dict=None,
filter_out_dict=None,
filter_equal_dict=None,
filter_spatial_dict=None,
select_columns=None,
offset: int = None,
limit: int = None,
datastack_name: str = None,
split_positions: bool = False,
metadata: bool = True,
suffixes: dict = None,
desired_resolution: Iterable = None,
allow_missing_lookups: bool = False,
random_sample: int = None,
):
"""Beta method for querying cave annotation tables with rootIDs and annotations at a particular
timestamp. Note: this method requires more explicit mapping of filters and selection to table
as its designed to test a more general endpoint that should eventually support complex joins.
Args:
table (str): principle table to query
timestamp (datetime): timestamp to use for querying
joins (list): a list of joins, where each join is a list of [table1,column1, table2, column2]
filter_in_dict (dict, optional): a dictionary with tables as keys, values are dicts with column keys and list values to accept. Defaults to None.
filter_out_dict (dict, optional): a dictionary with tables as keys, values are dicts with column keys and list values to reject. Defaults to None.
filter_equal_dict (dict, optional): a dictionary with tables as keys, values are dicts with column keys and values to equate. Defaults to None.
filter_spatial_dict (dict, optional): a dictionary with tables as keys, values are dicts with column keys and values of 2x3 list of bounds. Defaults to None.
select_columns (dict, optional): a dictionary with tables as keys, values are lists of columns. Defaults to None.
offset (int, optional): value to offset query by. Defaults to None.
limit (int, optional): limit of query. Defaults to None.
datastack_name (str, optional): datastack to query. Defaults to set by client.
split_positions (bool, optional): whether to split positions into separate columns, True is faster. Defaults to False.
metadata (bool, optional): whether to attach metadata to dataframe. Defaults to True.
suffixes (dict, optional): what suffixes to use on joins, keys are table_names, values are suffixes. Defaults to None.
desired_resolution (Iterable, optional): What resolution to convert position columns to. Defaults to None will use defaults.
allow_missing_lookups (bool, optional): If there are annotations without supervoxels and rootids yet, allow results. Defaults to False.
random_sample: (int, optional) : if given, will do a tablesample of the table to return that many annotations
Example:
live_live_query("table_name", datetime.datetime.now(datetime.timezone.utc),
joins=[[table_name, table_column, joined_table, joined_column],
[joined_table, joincol2, third_table, joincol_third]],
suffixes={
"table_name": "suffix1",
"joined_table": "suffix2",
"third_table": "suffix3"
},
select_columns={
"table_name": ["column", "names"],
"joined_table": ["joined_column"]
},
filter_in_dict={
"table_name": {
"column_name": [included, values]
}
},
filter_out_dict={
"table_name": {
"column_name": [excluded, values]
}
},
filter_equal_dict={
"table_name": {
"column_name": value
}
},
filter_spatial_dict={
"table_name": {
"column_name": [[min_x, min_y, min_z], [max_x, max_y, max_z]]
}
})
Returns:
pd.DataFrame: result of query
"""
logging.warning(
"Deprecation: this method is to facilitate beta testing of this feature, \
it will likely get removed in future versions. "
)
timestamp = convert_timestamp(timestamp)
return_df = True
if datastack_name is None:
datastack_name = self.datastack_name
if desired_resolution is None:
desired_resolution = self.desired_resolution
endpoint_mapping = self.default_url_mapping
endpoint_mapping["datastack_name"] = datastack_name
data = {}
query_args = {}
query_args["return_pyarrow"] = True
query_args["arrow_format"] = True
query_args["merge_reference"] = False
query_args["allow_missing_lookups"] = allow_missing_lookups
if random_sample:
query_args["random_sample"] = random_sample
data["table"] = table
data["timestamp"] = timestamp
url = self._endpoints["live_live_query"].format_map(endpoint_mapping)
if joins is not None:
data["join_tables"] = joins
if filter_in_dict is not None:
data["filter_in_dict"] = filter_in_dict
if filter_out_dict is not None:
data["filter_notin_dict"] = filter_out_dict
if filter_equal_dict is not None:
data["filter_equal_dict"] = filter_equal_dict
if filter_spatial_dict is not None:
data["filter_spatial_dict"] = filter_spatial_dict
if select_columns is not None:
data["select_columns"] = select_columns
if offset is not None:
data["offset"] = offset
if limit is not None:
assert limit > 0
data["limit"] = limit
if suffixes is not None:
data["suffixes"] = suffixes
encoding = DEFAULT_COMPRESSION
response = self.session.post(
url,
data=json.dumps(data, cls=BaseEncoder),
headers={
"Content-Type": "application/json",
"Accept-Encoding": encoding,
},
params=query_args,
stream=not return_df,
verify=self.verify,
)
self.raise_for_status(response)
if desired_resolution is None:
desired_resolution = self.desired_resolution
with MyTimeIt("deserialize"):
with warnings.catch_warnings():
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=DeprecationWarning)
df = deserialize_query_response(response)
if desired_resolution is not None:
if len(desired_resolution) != 3:
raise ValueError(
"desired resolution needs to be of length 3, for xyz"
)
vox_res = self.get_table_metadata(
table_name=table,
datastack_name=datastack_name,
log_warning=False,
)["voxel_resolution"]
df = convert_position_columns(df, vox_res, desired_resolution)
if not split_positions:
concatenate_position_columns(df, inplace=True)
if metadata:
try:
attrs = self._assemble_attributes(
table,
join_query=False,
filters={
"inclusive": filter_in_dict,
"exclusive": filter_out_dict,
"equal": filter_equal_dict,
"spatial": filter_spatial_dict,
},
select_columns=select_columns,
offset=offset,
limit=limit,
live_query=timestamp is not None,
timestamp=string_format_timestamp(timestamp),
materialization_version=None,
desired_resolution=desired_resolution,
)
df.attrs.update(attrs)
except HTTPError as e:
raise Exception(
str(e)
+ " Metadata could not be loaded; try metadata=False if it is not needed"
)
return df
def live_query(
self,
table: str,
timestamp: datetime,
filter_in_dict=None,
filter_out_dict=None,
filter_equal_dict=None,
filter_spatial_dict=None,
filter_regex_dict=None,
select_columns=None,
offset: int = None,
limit: int = None,
datastack_name: str = None,
split_positions: bool = False,
post_filter: bool = True,
metadata: bool = True,
merge_reference: bool = True,
desired_resolution: Iterable = None,
random_sample: int = None,
):
"""generic query on materialization tables
Args:
table: 'str'
timestamp (datetime.datetime): time to materialize (in utc)
pass datetime.datetime.now(datetime.timezone.utc) for present time
filter_in_dict (dict , optional):
keys are column names, values are allowed entries.
Defaults to None.
filter_out_dict (dict, optional):
keys are column names, values are not allowed entries.
Defaults to None.
filter_equal_dict (dict, optional):
inner layer: keys are column names, values are specified entry.
Defaults to None.
filter_spatial_dict (dict, optional):
inner layer: keys are column names, values are bounding boxes
as [[min_x, min_y,min_z],[max_x, max_y, max_z]]
Expressed in units of the voxel_resolution of this dataset.
Defaults to None
filter_regex_dict (dict, optional):
inner layer: keys are column names, values are regex strings
offset (int, optional): offset into the query result. Defaults to None.
limit (int, optional): maximum number of results to return
(the server will also enforce an upper limit, see get_server_config)
select_columns (list of str, optional): columns to select. Defaults to None.
datastack_name (str, optional): datastack to query.
If None defaults to one specified in client.
split_positions (bool, optional): whether to break position columns into x,y,z columns
default False, if False data is returned as one column with [x,y,z] array (slower)
post_filter (bool, optional): whether to filter the result down based upon the filters specified.
If False, the query will return rows matching the filters translated into their past IDs,
with present root_ids in the root_id columns. So if, for example, a cell had a false
merger split off since the last materialization, annotations on that incorrect portion
of the cell will be included if this is False, but filtered out if this is True. (Default=True)
metadata: (bool, optional) : toggle to return metadata
If True (and return_df is also True), return table and query metadata in the df.attrs dictionary.
merge_reference: (bool, optional) : toggle to automatically join reference table
If True, metadata will be queried and, if the table is a reference table, a join
will be performed on the reference table to return its rows as well
desired_resolution: (Iterable[float], optional) : desired resolution to return all spatial points in
If None, defaults to the value specified in the client; if that is also None, points are returned
as stored in the table, in the resolution specified by the table metadata
random_sample: (int, optional) : if given, will do a tablesample of the table to return that many annotations
Returns:
pd.DataFrame: a pandas dataframe of results of query
"""
timestamp = convert_timestamp(timestamp)
return_df = True
if self.cg_client is None:
raise ValueError("You must have a cg_client to run live_query")
if datastack_name is None:
datastack_name = self.datastack_name
if desired_resolution is None:
desired_resolution = self.desired_resolution
with MyTimeIt("find_mat_version"):
# we want to find the most recent materialization
# in which the timestamp given is in the future
mds = self.get_versions_metadata()
materialization_version = None
# make sure the materializations are increasing in ID/time
for md in sorted(mds, key=lambda x: x["id"]):
ts = md["time_stamp"]
if timestamp >= ts:
materialization_version = md["version"]
if timestamp == ts:
# If the timestamp exactly matches a version, use the standard query_table.
return self.query_table(
table=table,
filter_in_dict=filter_in_dict,
filter_out_dict=filter_out_dict,
filter_equal_dict=filter_equal_dict,
filter_spatial_dict=filter_spatial_dict,
filter_regex_dict=filter_regex_dict,
select_columns=select_columns,
offset=offset,
limit=limit,
datastack_name=datastack_name,
split_positions=split_positions,
materialization_version=materialization_version,
metadata=metadata,
merge_reference=merge_reference,
desired_resolution=desired_resolution,
return_df=True,
random_sample=random_sample,
)
else:
timestamp_start = ts
# if none of the available versions are before
# this timestamp, then we cannot support the query
if materialization_version is None:
raise (
ValueError(
"""The timestamp you passed is not recent enough
for the materialization versions that are available"""
)
)
# first we want to translate all these filters into the IDs at the
# most recent materialization
with MyTimeIt("map_filters"):
past_filters, future_map = self.map_filters(
[filter_in_dict, filter_out_dict, filter_equal_dict],
timestamp,
timestamp_start,
)
past_filter_in_dict, past_filter_out_dict, past_equal_dict = past_filters
if past_equal_dict is not None:
# when doing a filter equal in the past
# we translate it to a filter_in, as 1 ID might
# be multiple IDs in the past.
# so we want to update the filter_in dict
cols = [col for col in past_equal_dict.keys()]
for col in cols:
if col.endswith("root_id"):
if past_filter_in_dict is None:
past_filter_in_dict = {}
past_filter_in_dict[col] = past_equal_dict.pop(col)
if len(past_equal_dict) == 0:
past_equal_dict = None
tables, suffix_map = self._resolve_merge_reference(
merge_reference, table, datastack_name, materialization_version
)
with MyTimeIt("package query"):
url, data, query_args, encoding = self._format_query_components(
datastack_name,
materialization_version,
tables,
select_columns,
suffix_map,
{table: past_filter_in_dict}
if past_filter_in_dict is not None
else None,
{table: past_filter_out_dict}
if past_filter_out_dict is not None
else None,
{table: past_equal_dict} if past_equal_dict is not None else None,
{table: filter_spatial_dict}
if filter_spatial_dict is not None
else None,
{table: filter_regex_dict} if filter_regex_dict is not None else None,
True,
True,
offset,
limit,
desired_resolution,
random_sample=random_sample,
)
logger.debug(f"query_args: {query_args}")
logger.debug(f"query data: {data}")
with MyTimeIt("query materialize"):
response = self.session.post(
url,
data=json.dumps(data, cls=BaseEncoder),
headers={
"Content-Type": "application/json",
"Accept-Encoding": encoding,
},
params=query_args,
stream=not return_df,
verify=self.verify,
)
self.raise_for_status(response)
if desired_resolution is None:
desired_resolution = self.desired_resolution
with MyTimeIt("deserialize"):
with warnings.catch_warnings():
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=DeprecationWarning)
df = deserialize_query_response(response)
if desired_resolution is not None:
if not response.headers.get("dataframe_resolution", None):
if len(desired_resolution) != 3:
raise ValueError(
"desired resolution needs to be of length 3, for xyz"
)
vox_res = self.get_table_metadata(
table,
datastack_name,
materialization_version,
log_warning=False,
)["voxel_resolution"]
df = convert_position_columns(df, vox_res, desired_resolution)
if not split_positions:
concatenate_position_columns(df, inplace=True)
# post process the dataframe to update all the root_ids columns
# with the most up to date get roots
df = self._update_rootids(df, timestamp, future_map)
# apply the original filters to remove rows
# from this result which are not relevant
if post_filter:
with MyTimeIt("post_filter"):
if filter_in_dict is not None:
for col, val in filter_in_dict.items():
df = df[df[col].isin(val)]
if filter_out_dict is not None:
for col, val in filter_out_dict.items():
df = df[~df[col].isin(val)]
if filter_equal_dict is not None:
for col, val in filter_equal_dict.items():
df = df[df[col] == val]
if metadata:
attrs = self._assemble_attributes(
table,
join_query=False,
filters={
"inclusive": filter_in_dict,
"exclusive": filter_out_dict,
"equal": filter_equal_dict,
"spatial": filter_spatial_dict,
"regex": filter_regex_dict,
},
select_columns=select_columns,
offset=offset,
limit=limit,
live_query=timestamp is not None,
timestamp=string_format_timestamp(timestamp),
materialization_version=None,
desired_resolution=response.headers.get(
"dataframe_resolution", desired_resolution
),
)
df.attrs.update(attrs)
return df
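# Usage sketch (illustrative; the table name and root ID are hypothetical, and
# a configured cg_client is required): querying a table as of the present
# moment, with root IDs updated live.
#
#   df = client.live_query(
#       "synapse_table",
#       datetime.now(timezone.utc),
#       filter_in_dict={"post_pt_root_id": [864691135181682535]},
#   )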
def synapse_query(
self,
pre_ids: Union[int, Iterable, np.ndarray] = None,
post_ids: Union[int, Iterable, np.ndarray] = None,
bounding_box=None,
bounding_box_column: str = "post_pt_position",
timestamp: datetime = None,
remove_autapses: bool = True,
include_zeros: bool = True,
limit: int = None,
offset: int = None,
split_positions: bool = False,
desired_resolution: Iterable[float] = None,
materialization_version: int = None,
synapse_table: str = None,
datastack_name: str = None,
metadata: bool = True,
):
"""Convience method for quering synapses. Will use the synapse table specified in the info service by default.
It will also remove autapses by default. NOTE: This is not designed to allow querying of the entire synapse table.
A query with no filters will return only a limited number of rows (configured by the server) and will do so in a non-deterministic fashion.
Please contact your dataset administrator if you want access to the entire table.
Args:
pre_ids (Union[int, Iterable], optional): presynaptic cell(s) to query. Defaults to None.
post_ids (Union[int, Iterable], optional): postsynaptic cell(s) to query. Defaults to None.
timestamp (datetime.datetime, optional): timestamp to query.
If passed, recalculate the query at this timestamp; do not pass together with materialization_version
bounding_box: [[min_x, min_y, min_z],[max_x, max_y, max_z]] bounding box to filter
synapse locations. Expressed in units of the voxel_resolution of this dataset (optional)
bounding_box_column (str, optional): which synapse location column to filter by (Default to "post_pt_position")
remove_autapses (bool, optional): post-hoc filter out autapses (synapses whose pre and post root IDs are equal). Defaults to True.
include_zeros (bool, optional): whether to include synapses to/from id=0 (out of segmentation). Defaults to True.
limit (int, optional): number of synapses to limit, Defaults to None (server side limit applies)
offset (int, optional): number of synapses to offset query, Defaults to None (no offset).
split_positions (bool, optional): whether to return positions as separate x,y,z columns (faster)
defaults to False
desired_resolution : Iterable[float] or None, optional
If given, should be a list or array of the desired resolution you want queries returned in
useful for materialization queries.
synapse_table (str, optional): synapse table to query. If None, defaults to self.synapse_table.
datastack_name: (str, optional): datastack to query
materialization_version (int, optional): version to query.
Defaults to the version set in the client if not specified
metadata: (bool, optional) : toggle to return metadata
If True, return table and query metadata in the df.attrs dictionary.
"""
filter_in_dict = {}
filter_equal_dict = {}
filter_out_dict = None
filter_spatial_dict = None
if synapse_table is None:
if self.synapse_table is None:
raise ValueError(
"Must define synapse table in class init or pass to method"
)
synapse_table = self.synapse_table
if not include_zeros:
filter_out_dict = {"pre_pt_root_id": [0], "post_pt_root_id": [0]}
if pre_ids is not None:
if isinstance(pre_ids, (Iterable, np.ndarray)):
filter_in_dict["pre_pt_root_id"] = pre_ids
else:
filter_equal_dict["pre_pt_root_id"] = pre_ids
if post_ids is not None:
if isinstance(post_ids, (Iterable, np.ndarray)):
filter_in_dict["post_pt_root_id"] = post_ids
else:
filter_equal_dict["post_pt_root_id"] = post_ids
if bounding_box is not None:
filter_spatial_dict = {bounding_box_column: bounding_box}
df = self.query_table(
synapse_table,
filter_in_dict=filter_in_dict,
filter_out_dict=filter_out_dict,
filter_equal_dict=filter_equal_dict,
filter_spatial_dict=filter_spatial_dict,
offset=offset,
limit=limit,
split_positions=split_positions,
desired_resolution=desired_resolution,
materialization_version=materialization_version,
timestamp=timestamp,
datastack_name=datastack_name,
metadata=metadata,
merge_reference=False,
)
if metadata:
df.attrs["remove_autapses"] = remove_autapses
if remove_autapses:
return df.query("pre_pt_root_id!=post_pt_root_id")
else:
return df
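# Usage sketch (illustrative; the root ID is hypothetical): all synapses onto
# a single cell, with autapses removed by default.
#
#   syn_df = client.synapse_query(post_ids=864691135181682535)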
def _assemble_attributes(
self, tables, suffixes=None, desired_resolution=None, is_view=False, **kwargs
):
if isinstance(tables, str):
tables = [tables]
if isinstance(desired_resolution, str):
desired_resolution = np.array(
[float(r) for r in desired_resolution.split(", ")]
)
join_query = len(tables) > 1
materialization_version = kwargs.get("materialization_version", None)
attrs = {
"datastack_name": self.datastack_name,
}
if not join_query:
attrs["join_query"] = False
if is_view:
meta = self.get_view_metadata(
tables[0],
log_warning=False,
materialization_version=materialization_version,
)
else:
try:
meta = self.get_table_metadata(
tables[0], log_warning=False, version=materialization_version
)
except HTTPError:
meta = self.fc.annotation.get_table_metadata(tables[0])
for k, v in meta.items():
if re.match("^table", k):
attrs[k] = v
else:
attrs[f"table_{k}"] = v
if desired_resolution is None:
attrs["dataframe_resolution"] = attrs["table_voxel_resolution"]
else:
attrs["dataframe_resolution"] = desired_resolution
else:
attrs["join_query"] = True
attrs["tables"] = {}
table_attrs = attrs["tables"]
if suffixes is None:
suffixes = ["_x", "_y"]
for (tname, jcol), s in zip(tables, suffixes):
table_attrs[tname] = {}
try:
meta = self.get_table_metadata(tname, log_warning=False)
except HTTPError:
meta = self.fc.annotation.get_table_metadata(tname)
for k, v in meta.items():
if re.match("^table", k):
table_attrs[tname][k] = v
else:
table_attrs[tname][f"table_{k}"] = v
table_attrs[tname]["join_column"] = jcol
table_attrs[tname]["suffix"] = s
if desired_resolution is None:
res = []
for tn in attrs["tables"]:
res.append(attrs["tables"][tn]["table_voxel_resolution"])
if np.atleast_2d(np.unique(np.array(res), axis=0)).shape[0] == 1:
attrs["dataframe_resolution"] = res[0]
else:
attrs["dataframe_resolution"] = "mixed_resolutions"
else:
attrs["dataframe_resolution"] = desired_resolution
attrs.update(kwargs)
return attrs
class MaterializationClientV3(MaterializationClientV2):
def __init__(self, *args, **kwargs):
super(MaterializationClientV3, self).__init__(*args, **kwargs)
def live_live_query(
self,
table: str,
timestamp: datetime,
joins=None,
filter_in_dict=None,
filter_out_dict=None,
filter_equal_dict=None,
filter_spatial_dict=None,
filter_regex_dict=None,
select_columns=None,
offset: int = None,
limit: int = None,
datastack_name: str = None,
split_positions: bool = False,
metadata: bool = True,
suffixes: dict = None,
desired_resolution: Iterable = None,
allow_missing_lookups: bool = False,
allow_invalid_root_ids: bool = False,
random_sample: int = None,
):
"""Beta method for querying cave annotation tables with rootIDs and annotations at a particular
timestamp. Note: this method requires more explicit mapping of filters and selection to table
as its designed to test a more general endpoint that should eventually support complex joins.
Args:
table (str): principle table to query
timestamp (datetime): timestamp to use for querying
joins (list): a list of joins, where each join is a list of [table1,column1, table2, column2]
filter_in_dict (dict, optional): a dictionary with tables as keys, values are dicts with column keys and list values to accept. Defaults to None.
filter_out_dict (dict, optional): a dictionary with tables as keys, values are dicts with column keys and list values to reject. Defaults to None.
filter_equal_dict (dict, optional): a dictionary with tables as keys, values are dicts with column keys and values to equate. Defaults to None.
filter_spatial_dict (dict, optional): a dictionary with tables as keys, values are dicts with column keys and values of 2x3 list of bounds. Defaults to None.
filter_regex_dict (dict, optional): a dictionary with tables as keys, values are dicts with column keys and values of regex strings. Defaults to None.
select_columns (dict, optional): a dictionary with tables as keys, values are lists of columns. Defaults to None.
offset (int, optional): value to offset query by. Defaults to None.
limit (int, optional): limit of query. Defaults to None.
datastack_name (str, optional): datastack to query. Defaults to set by client.
split_positions (bool, optional): whether to split positions into separate columns, True is faster. Defaults to False.
metadata (bool, optional): whether to attach metadata to dataframe. Defaults to True.
suffixes (dict, optional): what suffixes to use on joins, keys are table_names, values are suffixes. Defaults to None.
desired_resolution (Iterable, optional): What resolution to convert position columns to. Defaults to None will use defaults.
allow_missing_lookups (bool, optional): If there are annotations without supervoxels and rootids yet, allow results. Defaults to False.
allow_invalid_root_ids (bool, optional): If True, ignore root ids not valid at the given timestamp, otherwise raise an Error. Defaults to False.
random_sample (int, optional): If given, will do a tablesample of the table to return that many annotations
Example:
live_live_query("table_name", datetime.datetime.now(datetime.timezone.utc),
joins=[[table_name, table_column, joined_table, joined_column],
[joined_table, joincol2, third_table, joincol_third]],
suffixes={
"table_name": "suffix1",
"joined_table": "suffix2",
"third_table": "suffix3"
},
select_columns={
"table_name": ["column", "names"],
"joined_table": ["joined_column"]
},
filter_in_dict={
"table_name": {
"column_name": [included, values]
}
},
filter_out_dict={
"table_name": {
"column_name": [excluded, values]
}
},
filter_equal_dict={
"table_name": {
"column_name": value
}
},
filter_spatial_dict={
"table_name": {
"column_name": [[min_x, min_y, min_z], [max_x, max_y, max_z]]
}
},
filter_regex_dict={
"table_name": {
"column_name": "regex"
}
})
Returns:
pd.DataFrame: result of query
"""
logging.warning(
"Deprecation: this method is to facilitate beta testing of this feature, \
it will likely get removed in future versions. "
)
timestamp = convert_timestamp(timestamp)
return_df = True
if datastack_name is None:
datastack_name = self.datastack_name
endpoint_mapping = self.default_url_mapping
endpoint_mapping["datastack_name"] = datastack_name
data = {}
query_args = {}
query_args["return_pyarrow"] = True
query_args["arrow_format"] = True
query_args["merge_reference"] = False
query_args["allow_missing_lookups"] = allow_missing_lookups
query_args["allow_invalid_root_ids"] = allow_invalid_root_ids
if random_sample:
query_args["random_sample"] = random_sample
data["table"] = table
data["timestamp"] = timestamp
url = self._endpoints["live_live_query"].format_map(endpoint_mapping)
if joins is not None:
data["join_tables"] = joins
if filter_in_dict is not None:
data["filter_in_dict"] = filter_in_dict
if filter_out_dict is not None:
data["filter_notin_dict"] = filter_out_dict
if filter_equal_dict is not None:
data["filter_equal_dict"] = filter_equal_dict
if filter_spatial_dict is not None:
data["filter_spatial_dict"] = filter_spatial_dict
if filter_regex_dict is not None:
data["filter_regex_dict"] = filter_regex_dict
if select_columns is not None:
data["select_columns"] = select_columns
if offset is not None:
data["offset"] = offset
if limit is not None:
assert limit > 0
data["limit"] = limit
if suffixes is not None:
data["suffixes"] = suffixes
if desired_resolution is None:
desired_resolution = self.desired_resolution
if desired_resolution is not None:
data["desired_resolution"] = desired_resolution
encoding = DEFAULT_COMPRESSION
response = self.session.post(
url,
data=json.dumps(data, cls=BaseEncoder),
headers={
"Content-Type": "application/json",
"Accept-Encoding": encoding,
},
params=query_args,
stream=not return_df,
verify=self.verify,
)
self.raise_for_status(response)
with MyTimeIt("deserialize"):
with warnings.catch_warnings():
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=DeprecationWarning)
df = deserialize_query_response(response)
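# If the server did not report a dataframe_resolution header, positions
# come back in the table's stored voxel resolution and are converted locally.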
if desired_resolution is not None:
if not response.headers.get("dataframe_resolution", None):
if len(desired_resolution) != 3:
raise ValueError(
"desired resolution needs to be of length 3, for xyz"
)
vox_res = self.get_table_metadata(
table,
datastack_name,
log_warning=False,
)["voxel_resolution"]
df = convert_position_columns(df, vox_res, desired_resolution)
if not split_positions:
concatenate_position_columns(df, inplace=True)
if metadata:
try:
attrs = self._assemble_attributes(
table,
join_query=False,
filters={
"inclusive": filter_in_dict,
"exclusive": filter_out_dict,
"equal": filter_equal_dict,
"spatial": filter_spatial_dict,
"regex": filter_regex_dict,
},
select_columns=select_columns,
offset=offset,
limit=limit,
live_query=timestamp is not None,
timestamp=string_format_timestamp(timestamp),
materialization_version=None,
desired_resolution=response.headers.get(
"dataframe_resolution", desired_resolution
),
)
df.attrs.update(attrs)
except HTTPError as e:
raise Exception(
str(e)
+ " Metadata could not be loaded; try metadata=False if it is not needed"
) from e
return df
[docs] def get_views(self, version: int = None, datastack_name: str = None):
"""get all available views for a version
Args:
version (int, optional): version to query.
Defaults to None. (will use the version set by the client)
datastack_name (str, optional): datastack to query. Defaults to None.
(will use datastack set by client)
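Example:
A minimal sketch; assumes this object is reachable as client.materialize
on a configured top-level client, and the version number is hypothetical:
views = client.materialize.get_views(version=100)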
Returns:
list[str]: a list of views
"""
if datastack_name is None:
datastack_name = self.datastack_name
if version is None:
version = self.version
endpoint_mapping = self.default_url_mapping
endpoint_mapping["datastack_name"] = datastack_name
endpoint_mapping["version"] = version
url = self._endpoints["get_views"].format_map(endpoint_mapping)
response = self.session.get(url, verify=self.verify)
self.raise_for_status(response)
return response.json()
[docs] def get_view_schema(
self,
view_name: str,
materialization_version: int = None,
datastack_name: str = None,
log_warning: bool = True,
):
"""get schema for a view
Args:
view_name (str): name of view to query
materialization_version (int, optional): version to query.
Defaults to None. (will use the version set by the client)
datastack_name (str, optional): datastack to query. Defaults to None.
(will use the datastack set by the client)
log_warning (bool, optional): whether to log warnings. Defaults to True.
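Example:
A minimal sketch; the view name is hypothetical:
schema = client.materialize.get_view_schema("my_view")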
Returns:
dict: schema of view
"""
if datastack_name is None:
datastack_name = self.datastack_name
if materialization_version is None:
materialization_version = self.version
endpoint_mapping = self.default_url_mapping
endpoint_mapping["view_name"] = view_name
endpoint_mapping["datastack_name"] = datastack_name
endpoint_mapping["version"] = materialization_version
url = self._endpoints["view_schema"].format_map(endpoint_mapping)
response = self.session.get(url, verify=self.verify)
self.raise_for_status(response, log_warning=log_warning)
return response.json()
[docs] def get_view_schemas(
self,
materialization_version: int = None,
datastack_name: str = None,
log_warning: bool = True,
):
"""get schemas for all views
Args:
materialization_version (int, optional): version to query.
Defaults to None. (will use the version set by the client)
datastack_name (str, optional): datastack to query. Defaults to None.
(will use the datastack set by the client)
log_warning (bool, optional): whether to log warnings. Defaults to True.
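Example:
A minimal sketch; assumes the response maps view names to schema dicts:
schemas = client.materialize.get_view_schemas()
for view_name in schemas:
print(view_name)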
Returns:
dict: schemas of all views
"""
if datastack_name is None:
datastack_name = self.datastack_name
if materialization_version is None:
materialization_version = self.version
endpoint_mapping = self.default_url_mapping
endpoint_mapping["datastack_name"] = datastack_name
endpoint_mapping["version"] = materialization_version
url = self._endpoints["view_schemas"].format_map(endpoint_mapping)
response = self.session.get(url, verify=self.verify)
self.raise_for_status(response, log_warning=log_warning)
return response.json()
[docs] def query_view(
self,
view_name: str,
filter_in_dict=None,
filter_out_dict=None,
filter_equal_dict=None,
filter_spatial_dict=None,
filter_regex_dict=None,
select_columns=None,
offset: int = None,
limit: int = None,
datastack_name: str = None,
return_df: bool = True,
split_positions: bool = False,
materialization_version: int = None,
metadata: bool = True,
merge_reference: bool = True,
desired_resolution: Iterable = None,
get_counts: bool = False,
random_sample: int = None,
):
"""generic query on a view
Args:
table: 'str'
filter_in_dict (dict , optional):
keys are column names, values are allowed entries.
Defaults to None.
filter_out_dict (dict, optional):
keys are column names, values are not allowed entries.
Defaults to None.
filter_equal_dict (dict, optional):
inner layer: keys are column names, values are specified entry.
Defaults to None.
filter_spatial (dict, optional):
inner layer: keys are column names, values are bounding boxes
as [[min_x, min_y,min_z],[max_x, max_y, max_z]]
Expressed in units of the voxel_resolution of this dataset.
filter_regex_dict (dict, optional):
inner layer: keys are column names, values are regex strings.
offset (int, optional): offset in query result
limit (int, optional): maximum results to return (server will set upper limit, see get_server_config)
select_columns (list of str, optional): columns to select. Defaults to None.
suffixes: (list[str], optional): suffixes to use on duplicate columns
offset (int, optional): result offset to use. Defaults to None.
will only return top K results.
datastack_name (str, optional): datastack to query.
If None defaults to one specified in client.
return_df (bool, optional): whether to return as a dataframe
default True, if False, data is returned as json (slower)
split_positions (bool, optional): whether to break position columns into x,y,z columns
default False, if False data is returned as one column with [x,y,z] array (slower)
materialization_version (int, optional): version to query.
If None defaults to one specified in client.
metadata: (bool, optional) : toggle to return metadata (default True)
If True (and return_df is also True), return table and query metadata in the df.attr dictionary.
merge_reference: (bool, optional) : toggle to automatically join reference table
If True, metadata will be queries and if its a reference table it will perform a join
on the reference table to return the rows of that
desired_resolution: (Iterable[float], Optional) : desired resolution you want all spatial points returned in
If None, defaults to one specified in client, if that is None then points are returned
as stored in the table and should be in the resolution specified in the table metadata
random_sample: (int, optional) : if given, will do a tablesample of the table to return that many annotations
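Example:
A minimal sketch; the view and column names are hypothetical:
df = client.materialize.query_view(
"my_view",
filter_in_dict={"my_column": ["allowed_value"]},
limit=500,
)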
Returns:
pd.DataFrame: a pandas dataframe of results of query
"""
if desired_resolution is None:
desired_resolution = self.desired_resolution
if materialization_version is None:
materialization_version = self.version
if datastack_name is None:
datastack_name = self.datastack_name
url, data, query_args, encoding = self._format_query_components(
datastack_name,
materialization_version,
[view_name],
select_columns,
None,
{view_name: filter_in_dict} if filter_in_dict is not None else None,
{view_name: filter_out_dict} if filter_out_dict is not None else None,
{view_name: filter_equal_dict} if filter_equal_dict is not None else None,
{view_name: filter_spatial_dict}
if filter_spatial_dict is not None
else None,
{view_name: filter_regex_dict} if filter_regex_dict is not None else None,
return_df,
True,
offset,
limit,
desired_resolution,
use_view=True,
random_sample=random_sample,
)
if get_counts:
query_args["count"] = True
response = self.session.post(
url,
data=json.dumps(data, cls=BaseEncoder),
headers={"Content-Type": "application/json", "Accept-Encoding": encoding},
params=query_args,
stream=not return_df,
verify=self.verify,
)
self.raise_for_status(response)
if return_df:
with warnings.catch_warnings():
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=DeprecationWarning)
df = deserialize_query_response(response)
if metadata:
attrs = self._assemble_attributes(
[view_name],
is_view=True,
filters={
"inclusive": filter_in_dict,
"exclusive": filter_out_dict,
"equal": filter_equal_dict,
"spatial": filter_spatial_dict,
"regex": filter_regex_dict,
},
select_columns=select_columns,
offset=offset,
limit=limit,
live_query=False,
timestamp=None,
materialization_version=materialization_version,
desired_resolution=response.headers.get(
"dataframe_resolution", desired_resolution
),
column_names=response.headers.get("column_names", None),
)
df.attrs.update(attrs)
if split_positions:
return df
else:
return concatenate_position_columns(df, inplace=True)
else:
return response.json()
[docs] def get_unique_string_values(self, table: str, datastack_name: str = None):
"""get unique string values for a table
Args:
table: 'str'
datastack_name (str, optional): datastack to query.
If None, defaults to the one specified in the client.
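Example:
A minimal sketch; the table name is hypothetical:
values = client.materialize.get_unique_string_values("my_table")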
Returns:
dict[str]: a dictionary of column names and unique values
"""
if datastack_name is None:
datastack_name = self.datastack_name
endpoint_mapping = self.default_url_mapping
endpoint_mapping["datastack_name"] = datastack_name
endpoint_mapping["table_name"] = table
url = self._endpoints["unique_string_values"].format_map(endpoint_mapping)
response = self.session.get(url, verify=self.verify)
self.raise_for_status(response)
return response.json()
# included for historical reasons, there was a typo in the class name
MaterializatonClientV2 = MaterializationClientV2
MaterializatonClientV3 = MaterializationClientV3
client_mapping = {
2: MaterializationClientV2,
3: MaterializationClientV3,
"latest": MaterializationClientV2,
}
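# A minimal usage sketch (the server address and datastack name below are
# hypothetical; api_version="latest" is resolved through client_mapping above):
#
# client = MaterializationClient(
#     "https://example.materialization.server",
#     datastack_name="my_datastack",
#     api_version="latest",
# )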