from .base import (
handle_response,
)
import urllib
from .endpoints import auth_endpoints_v1, default_global_server_address
import os
import webbrowser
import requests
import json
import logging
import time
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Directory (un-expanded, "~"-relative) in which token files are looked up.
default_token_location = "~/.cloudvolume/secrets"
# File name of the current default token file.
default_token_name = "cave-secret.json"
# Older file names that are still consulted as a fallback.
deprecated_token_names = ["chunkedgraph-secret.json"]
# JSON key under which the token is stored inside a token file.
default_token_key = "token"

# Paths are joined with a literal "/" (not os.path.join) so the resulting
# strings are the same on every platform; callers expand "~" themselves.
default_token_file = "/".join((default_token_location, default_token_name))
deprecated_token_files = [
    "/".join((default_token_location, old_name))
    for old_name in deprecated_token_names
]
def write_token(token, filepath, key, overwrite=True):
    """Store ``token`` under ``key`` in the JSON file at ``filepath``.

    Any other keys already present in the file are preserved. The parent
    directory is created if it does not exist yet.

    Parameters
    ----------
    token : str
        Token value to store.
    filepath : str
        Path to the JSON key:value file. A leading ``~`` is expanded to the
        user's home directory.
    key : str
        Key under which the token is stored.
    overwrite : bool, optional
        If False, refuse to replace a key that already exists in the file.
        By default True.

    Raises
    ------
    ValueError
        If ``overwrite`` is False and ``key`` is already present in the file.
    """
    # Expand "~" once, up front, so the existence check, the directory
    # creation and both open() calls all refer to the same real path.
    filepath = os.path.expanduser(filepath)

    if os.path.exists(filepath):
        with open(filepath, "r") as f:
            secrets = json.load(f)
        # Identity check kept on purpose: only an explicit False blocks the
        # overwrite, exactly as before.
        if overwrite is False and key in secrets:
            raise ValueError(f'Key "{key}" already exists in token file "{filepath}"')
    else:
        secrets = {}
    secrets[key] = token

    secret_dir = os.path.dirname(filepath)
    # A bare file name has an empty dirname (the current directory), which
    # os.makedirs rejects; exist_ok avoids failing on an existing directory.
    if secret_dir:
        os.makedirs(secret_dir, exist_ok=True)

    with open(filepath, "w") as f:
        json.dump(secrets, f)
class AuthClient:
    """Client to find and use auth tokens to access the dynamic annotation framework services.

    Parameters
    ----------
    token_file : str, optional
        Path to a JSON key:value file holding your auth token.
        By default, a server-specific file
        "~/.cloudvolume/secrets/<server>-cave-secret.json" is used if it
        exists, otherwise "~/.cloudvolume/secrets/cave-secret.json"
        (will check deprecated token name "chunkedgraph-secret.json" as well)
    token_key : str, optional
        Key for the token in the token_file.
        By default, "token"
    token : str or None, optional
        Direct entry of the token as a string. If provided, overrides the files.
        If None, attempts to use the file paths.
    server_address : str, optional,
        URL to the auth server. By default, uses a default server address.
    """

    def __init__(
        self,
        token_file=None,
        token_key=None,
        token=None,
        server_address=default_global_server_address,
    ):
        # The server-specific path is always computed, because save_token()
        # writes to it by default even when an explicit token_file was given.
        if server_address is None:
            self._server_file_path = None
        else:
            server = urllib.parse.urlparse(server_address).netloc
            server_file = server + "-cave-secret.json"
            self._server_file_path = os.path.expanduser(
                os.path.join(default_token_location, server_file)
            )

        if token_file is None:
            # Prefer the server-specific file when it exists on disk.
            if self._server_file_path is not None and os.path.isfile(
                self._server_file_path
            ):
                token_file = self._server_file_path
            else:
                token_file = default_token_file
        self._token_file = os.path.expanduser(token_file)

        if token_key is None:
            token_key = default_token_key
        self._token_key = token_key

        if token is None:
            token = self._load_token(self._token_file, self._token_key)
            if token is None:
                # Fall back to the deprecated file names, first hit wins.
                for deprecated_file in deprecated_token_files:
                    _dep_file = os.path.expanduser(deprecated_file)
                    token = self._load_token(_dep_file, self._token_key)
                    if token is not None:
                        logger.warning(
                            f"""file location {deprecated_file} is deprecated,
rename to 'cave-secret.json' or 'SERVER_ADDRESS-cave-secret.json"""
                        )
                        break
        self._token = token
        self._server_address = server_address
        self._default_endpoint_mapping = {"auth_server_address": self._server_address}

    @property
    def token(self):
        """Secret token used to authenticate yourself to the Connectome Annotation Versioning Engine services."""
        return self._token

    @token.setter
    def token(self, new_token):
        # A token set by hand did not come from a file, so it has no key.
        self._token = new_token
        self._token_key = None

    def get_token(
        self,
        token_key=None,
    ):
        """Load a token with a given key from the client's token file.

        The loaded token (or None if the file or key is missing) replaces the
        client's current token; nothing is returned.

        Parameters
        ----------
        token_key : str or None, optional
            key in the token file JSON, by default None. If None, uses 'token'.
        """
        # Apply the documented default; previously None was used as the
        # lookup key itself, which could never match.
        if token_key is None:
            token_key = default_token_key
        self._token_key = token_key
        self._token = self._load_token(self._token_file, self._token_key)

    def _load_token(self, token_file, token_key):
        """Return the value stored under ``token_key`` in ``token_file``.

        Returns None if ``token_file`` is None, the file does not exist, or
        the key is not present in the file.
        """
        if token_file is None:
            return None
        token_file = os.path.expanduser(token_file)
        if not os.path.exists(token_file):
            return None
        with open(token_file, "r") as f:
            return json.load(f).get(token_key, None)

    @staticmethod
    def _same_path(path_a, path_b):
        """Return True if both paths name the same location after normalisation."""

        def _norm(path):
            return os.path.normcase(os.path.abspath(os.path.expanduser(path)))

        return _norm(path_a) == _norm(path_b)

    # NOTE: the ``open`` parameter shadows the builtin inside this method; the
    # name is part of the public keyword interface and is therefore kept.
    def setup_token(self, make_new=True, open=True):
        """Currently, returns instructions for getting your auth token based on the current settings and saving it to the local environment.
        New OAuth tokens are currently not able to be retrieved programmatically.

        Parameters
        ----------
        make_new : bool, optional
            If True, will make a new token, else prompt you to open a page to
            retrieve an existing token.
        open : bool, optional
            If True, opens a web browser to the web page where you can retrieve a token.
        """
        if make_new:
            return self.get_new_token(open=open)

        auth_url = auth_endpoints_v1["get_tokens"].format_map(
            self._default_endpoint_mapping
        )
        txt = f"""Tokens need to be acquired by hand. Please follow the following steps:
1) Go to: {auth_url} to view a list of your existing tokens.
2) Log in with your Google credentials copy one of the tokens from the dictionary (the string under the key 'token').
3a) Save it to your computer with: client.auth.save_token(token="PASTE_YOUR_TOKEN_HERE")
or
3b) Set it for the current session only with client.auth.token = "PASTE_YOUR_TOKEN_HERE"
Note: If you need to save or load multiple tokens, please read the documentation for details.
if you want to create a new token, or have no token use ```self.get_new_token``` instead
or use this function with the keyword argument make_new=True"""
        print(txt)
        if open:
            webbrowser.open(auth_url)
        return None

    def get_tokens(self):
        """Get the tokens setup for this users

        Returns
        -------
        list[dict]:
            The server response as processed by ``handle_response``;
            documented as a list of dictionary of tokens, each with the keys
            "id": the id of this token
            "token": the token (str)
            "user_id": the users id (should be your ID)
        """
        url = auth_endpoints_v1["get_tokens"].format_map(self._default_endpoint_mapping)
        # Context manager so the session's connections are released.
        with requests.Session() as session:
            response = session.get(url, headers=self.request_header)
            return handle_response(response)

    # NOTE: ``open`` shadows the builtin here as well; kept for compatibility.
    def get_new_token(self, open=False):
        """Currently, returns instructions for getting a new token based on the current settings and saving it to the local environment. New OAuth tokens are currently not able to be retrieved programmatically.

        Parameters
        ----------
        open : bool, optional
            If True, opens a web browser to the web page where you can generate a new token.
        """
        auth_url = auth_endpoints_v1["create_token"].format_map(
            self._default_endpoint_mapping
        )
        txt = f"""New Tokens need to be acquired by hand. Please follow the following steps:
1) Go to: {auth_url} to create a new token.
2) Log in with your Google credentials and copy the token shown afterward.
3a) Save it to your computer with: client.auth.save_token(token="PASTE_YOUR_TOKEN_HERE")
or
3b) Set it for the current session only with client.auth.token = "PASTE_YOUR_TOKEN_HERE"
Note: If you need to save or load multiple tokens, please read the documentation for details.
Warning! Creating a new token by finishing step 2 will invalidate the previous token!"""
        print(txt)
        if open:
            webbrowser.open(auth_url)
        return None

    def save_token(
        self,
        token=None,
        token_key=default_token_key,
        overwrite=False,
        token_file=None,
        switch_token=True,
        write_to_server_file=True,
    ):
        """Conveniently save a token in the correct format.

        After getting a new token by following the instructions in `authclient.get_new_token()`, you can save it with a fully default configuration by running:

        token = 'my_shiny_new_token'

        authclient.save_token(token=token)

        Now on next load, authclient=AuthClient() will make an authclient instance using this token.
        If you would like to specify more information about the json file where the token will be stored, see the parameters below.

        Parameters
        ----------
        token : str, optional
            New token to save, by default None. If None, the client's current
            token is saved.
        token_key : str, optional
            Key for the token in the token_file json, by default "token"
        overwrite : bool, optional
            Allow an existing token to be changed, by default False
        token_file : str, optional
            Path to the token file, by default None. If None, uses the token
            file the client was configured with.
        switch_token : bool, optional
            If True, switch the auth client over into using the new token, by default True
        write_to_server_file: bool, optional
            If True, will write token to a server specific file to support this machine
            interacting with multiple auth servers.

        Raises
        ------
        ValueError
            If no token file is set, or if ``overwrite`` is False and
            ``token_key`` already exists in a file being written.
        """
        if token is None:
            token = self.token

        save_token_file = token_file if token_file is not None else self._token_file
        if save_token_file is None:
            raise ValueError("No token file is set")

        server_file = self._server_file_path
        # Skip the server-file write when it is the very file written below:
        # writing it twice would make the second write fail with
        # overwrite=False, because the first one just created the key.
        if (
            write_to_server_file
            and server_file is not None
            and not self._same_path(server_file, save_token_file)
        ):
            write_token(token, server_file, token_key, overwrite=overwrite)
        write_token(token, save_token_file, token_key, overwrite=overwrite)

        if switch_token:
            self._token = token
            self._token_key = token_key
            # Stored expanded, matching what __init__ stores.
            self._token_file = os.path.expanduser(save_token_file)

    def get_group_users(self, group_id):
        """Get users in a group

        Parameters
        ----------
        group_id : int
            ID value for a given group

        Returns
        -------
        list
            The server response as processed by ``handle_response``;
            documented as a list of dicts of user ids, and an empty list if
            the group does not exist.
        """
        # Work on a copy so the per-call group_id does not leak into the
        # mapping shared by every other request of this client.
        endpoint_mapping = dict(self._default_endpoint_mapping)
        endpoint_mapping["group_id"] = group_id
        url = auth_endpoints_v1["get_group_users"].format_map(endpoint_mapping)
        with requests.Session() as session:
            response = session.get(url, headers=self.request_header)
            return handle_response(response)

    @property
    def request_header(self):
        """Formatted request header with the specified token"""
        if self.token is None:
            return {}
        return {"Authorization": f"Bearer {self.token}"}