Source code for dotide.managers

from datetime import datetime
import json
from dotide.models import AccessToken, Datastream, Datapoint, Dataset


[docs]class Manager(object): """Abstract Manager.""" def _parse_datetime(self, dt): """Parse iso8601 format UTC time to datetime.""" return datetime.strptime(dt, "%Y-%m-%dT%H:%M:%S.%fZ") def _format_params(self, params): """Format params.""" def _coerce(v): if isinstance(v, list): return ','.join(v) elif isinstance(v, datetime): return v.isoformat() else: return v return {k: _coerce(params[k]) for k in params}
[docs]class AccessTokenManager(Manager): """AccessToken Manager.""" def __init__(self, client): self._client = client def _build_access_token(self, json): """Build AccessToken Model.""" return AccessToken(manager=self, access_token=json['access_token'], scopes=json['scopes'], created_at=self._parse_datetime(json['created_at']), updated_at=self._parse_datetime(json['updated_at']))
[docs] def filter(self): """Filter AccessTokens. :returns: List of AccessTokens. Usage:: >>> access_tokens = client.access_tokens.filter() """ res = self._client.list_access_tokens() return [self._build_access_token(token) for token in res]
[docs] def create(self, scopes=None): """Create an AccessToken. :param list scopes: AccessToken's effect scopes. :returns: Created AccessToken instance. Usage:: >>> access_token = client.access_tokens.create(scopes=[{ 'permissions': ['read', 'write', 'delete'], 'global': False, 'ids': ['id0'], 'tags': ['tag0'] }]) """ data = {} if scopes: data['scopes'] = scopes res = self._client.create_access_token(data=json.dumps(data)) return self._build_access_token(res)
[docs] def get(self, access_token): """Get an AccessToken. :param str access_token: AccessToken's access_token. :returns: AccessToken instance. Usage:: >>> access_token = client.access_tokens.get('your_access_token') """ res = self._client.read_access_token(access_token) return self._build_access_token(res)
[docs] def update(self, access_token, scopes=None): """Update an AccessToken.""" data = {} if scopes: data['scopes'] = scopes res = self._client.update_access_token(access_token, data=json.dumps(data)) return self._build_access_token(res)
[docs] def delete(self, access_token): """Delete an AccessToken.""" ret = self._client.delete_access_token(access_token) return ret is None
[docs]class DatastreamManager(Manager): """Datastream Manager.""" def __init__(self, client): self._client = client def _build_datastream(self, json): """Build Datastream Model.""" return Datastream(manager=self, id=json['id'], name=json['name'], type=json['type'], tags=json['tags'], properties=json['properties'], current_t=self._parse_datetime(json['current_t']), current_v=json['current_v'], created_at=self._parse_datetime(json['created_at']), updated_at=self._parse_datetime(json['updated_at']))
[docs] def filter(self, ids=None, tags=None, limit=None, offset=None): """Filter Datastreams. :param list ids: Datastream id list. :param list tags: Datastream tag list. :param int limit: Results amount limit. :param int offset: Results offset amount. :returns: List of Datastreams. Usage:: >>> datastreams = client.datastreams.filter(ids=['id0', 'id1'], tags=['tag0', 'tag1'], limit=10, offset=10 ) """ params = {k: v for k, v in ( ('ids', ids), ('tags', tags), ('limit', limit), ('offset', offset), ) if v is not None} res = self._client.list_datastreams(params=self._format_params(params)) return [self._build_datastream(datastream) for datastream in res]
[docs] def create(self, id=None, name=None, type=None, tags=None, properties=None): """Create Datastream. :param str id: Datastream's id. :param str name: Datastream's name. :param str type: Datastream's type. :param list tags: Datastream's tags. :param dict properties: Datastream's properties. :returns: Created Datastream instance. Usage:: >>> datastream = client.datastreams.create(id='id0', name='name0', type='number', tags=['tag0'], proerties={'prop0': 1} ) """ data = {k: v for k, v in ( ('id', id), ('name', name), ('type', type), ('tags', tags), ('properties', properties), ) if v is not None} res = self._client.create_datastream(data=json.dumps(data)) return self._build_datastream(res)
[docs] def get(self, id): """Get a Datastream. :param str id: Datastream's id. :returns: Datastream instance. Usage:: >>> datastream = client.datastreams.get('id0') """ res = self._client.read_datastream(id) return self._build_datastream(res)
[docs] def update(self, id, name=None, tags=None, properties=None): """Update a datastream.""" data = {k: v for k, v in ( ('name', name), ('tags', tags), ('properties', properties), ) if v is not None} res = self._client.update_datastream(id, data=json.dumps(data)) return self._build_datastream(res)
[docs] def delete(self, id): """Delete a datastream.""" ret = self._client.delete_datastream(id) return ret is None
[docs]class DatapointManager(Manager): """Datapoint Manager.""" def __init__(self, client, id): self._client = client self._id = id def _build_datapoint(self, json): """Build Datapoint Model.""" return Datapoint(t=self._parse_datetime(json['t']), v=json['v']) def _build_dataset(self, json): """Build Dataset Model.""" return Dataset(id=json['id'], datapoints=json['datapoints'], options=json.get('options', {}), summary=json.get('summary', {}))
[docs] def filter(self, start=None, end=None, order=None, t=None, limit=None, offset=None, summary=None, interval=None, function=None): """Filter Datapoints. :param datetime start: Start time. :param datetime end: End time. :param str order: Order asc or desc. :param datetime t: Exactly time. :param int limit: Results amount limit. :param int offset: Results offset amount. :param int summary: Whether contain summary in output, 0 or 1. :param int interval: Sampling interval in ms. :param str function: Sampling function. :returns: Dataset instance. Usage:: >>> dataset = datastream.datapoints.filter(start=datetime(2014, 1, 1), end=datetime.utcnow(), order='asc', limit=1000) """ params = {k: v for k, v in ( ('start', start), ('end', end), ('order', order), ('t', t), ('limit', limit), ('offset', offset), ('summary', summary), ('interval', interval), ('function', function), ) if v is not None} res = self._client.list_datapoints(self._id, params=self._format_params(params)) res['datapoints'] = [self._build_datapoint(datapoint) for datapoint in res['datapoints']] return self._build_dataset(res)
[docs] def create(self, datapoints=None, t=None, v=None): """Create datapoint(s). :param list datapoints: List of datapoints. :param datetime t: Time. :param v: Value. :returns: Created datapoint(s). Usage:: >>> datapoint = datastream.datapoints.create(t=datetime.utcnow(), v=1) >>> datapoints = datastream.datapoints.create([{'t': datetime.utcnow(), 'v': 1}]) """ if datapoints: data = [] for datapoint in datapoints: p = {} if 't' in datapoint: p['t'] = datapoint['t'].isoformat() p['v'] = datapoint.get('v', None) data.append(p) res = self._client.create_datapoint(self._id, data=data) return [self._build_datapoint(datapoint) for datapoint in res] else: data = {'v': v} if t: data['t'] = t.isoformat() res = self._client.create_datapoint(self._id, data=data) return self._build_datapoint(res)
[docs] def get(self, t): """Get datapoint by timestamp. :param datetime t: Time. :returns: Datapoint instance. Usage:: >>> datapoint = datastream.datapoints.get(datetime(2014, 1, 2, 3, 4, 5, 6000)) """ res = self._client.read_datapoint(self._id, t.isoformat()) return self._build_datapoint(res)
[docs] def delete(self, t=None, start=None, end=None): """Delete datapoints. :param datetime t: Exactly time. :param datetime start: Start time. :param datetime end: End time. :returns: True if success. Else False. Usage:: >>> datastream.datapoints.delete(datetime(2014, 1, 2, 3, 4, 5, 6000)) >>> datastream.datapoints.delete(start=datetime(2014, 1, 1), end=datetime.utcnow()) """ ret = False if t: ret = self._client.delete_datapoint(self._id, t.isoformat()) elif start and end: ret = self._client.delete_datapoints(self._id, start.isoformat(), end.isoformat()) return ret is None