Source code for toxicbuild.common.interfaces

# -*- coding: utf-8 -*-
# Copyright 2019 Juca Crispim <juca@poraodojuca.net>

# This file is part of toxicbuild.

# toxicbuild is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# toxicbuild is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with toxicbuild. If not, see <http://www.gnu.org/licenses/>.


from collections import OrderedDict
import datetime
import importlib
import json

from toxicbuild.core import requests
from toxicbuild.core.utils import string2datetime
from .client import get_hole_client
from .utils import is_datetime, format_datetime, get_hole_client_settings


__doc__ = """Module with base models that are populated using a remote api.
"""


[docs]class BaseInterface: # These references are fields that refer to other objects. # Note that this references are not always references on # database, they may be (and most are) embedded documents # that are simply treated as other objects. references = {} settings = None def __init__(self, requester, ordered_kwargs): # here is where we transform the dictonaries from the # master's response into objects that are references. # Note that we can't use **kwargs here because we want to # keep the order of the attrs. self.__ordered__ = [k for k in ordered_kwargs.keys()] for name, cls in self.references.items(): cls = self._get_ref_cls(cls) if not isinstance(ordered_kwargs.get(name), (dict, cls)): ordered_kwargs[name] = [cls(requester, kw) if not isinstance(kw, cls) else kw for kw in ordered_kwargs.get(name, [])] else: obj = ordered_kwargs[name] ordered_kwargs[name] = cls(requester, obj) if not isinstance( obj, cls) else obj for key, value in ordered_kwargs.items(): if is_datetime(value): value = string2datetime(value) setattr(self, key, value) self.__ordered__.append(key) self.requester = requester def __eq__(self, other): return isinstance(self, type(other)) and self.id == other.id def __hash__(self): return hash(self.id) def _get_ref_cls(self, cls): if isinstance(cls, str): module, cls_name = cls.rsplit('.', 1) module = importlib.import_module(module) cls = getattr(module, cls_name) return cls
[docs] def to_dict(self, dtformat=None, tzname=None): """Transforms a model into a dict. :param dtformat: Format for datetimes. :param tzname: A timezone name. """ attrs = [a for a in self.__ordered__ if not a.startswith('_')] d = OrderedDict() for attr in attrs: objattr = getattr(self, attr) is_ref = attr == 'references' if not (callable(objattr) and not is_ref): # pragma no branch if isinstance(objattr, datetime.datetime): objattr = format_datetime(objattr, dtformat, tzname) d[attr] = objattr return d
[docs] def to_json(self, *args, **kwargs): """Transforms a model into a json. :param args: Positional arguments passed to :meth:`~toxicbuild.common.interfaces.BaseInterface.to_dict`. :param kwargs: Named arguments passed to :meth:`~toxicbuild.common.interfaces.BaseInterface.to_dict`. """ d = self.to_dict(*args, **kwargs) return json.dumps(d)
@classmethod def _handle_name_or_id(cls, prefix, kw): name = kw.pop('name', None) key = '{}_name_or_id'.format(prefix) if name: kw[key] = name obj_id = kw.pop('id', None) if obj_id: kw[key] = obj_id
[docs]class NotificationInterface(BaseInterface): """Integration with the notifications api.""" def __init__(self, ordered_kwargs): super().__init__(None, ordered_kwargs)
[docs] @classmethod def api_url(cls): return getattr(cls.settings, 'NOTIFICATIONS_API_URL', None)
[docs] @classmethod def api_token(cls): return getattr(cls.settings, 'NOTIFICATIONS_API_TOKEN', None)
@classmethod def _get_headers(cls): return {'Authorization': 'token: {}'.format(cls.api_token())} @classmethod def _get_notif_url(cls, notif_name): url = '{}/{}'.format(cls.api_url(), notif_name) return url
[docs] @classmethod async def list(cls, obj_id=None): """Lists all the notifications available. :param obj_id: The of of an repository. If not None, the notifications will return the values of the configuration for that repository.""" url = '{}/list/'.format(cls.api_url()) if obj_id: url += obj_id headers = cls._get_headers() r = await requests.get(url, headers=headers) notifications = r.json()['notifications'] return [cls(n) for n in notifications]
[docs] @classmethod async def enable(cls, repo_id, notif_name, **config): """Enables a notification for a given repository. :param repo_id: The id of the repository to enable the notification. :param notif_name: The name of the notification. :param config: A dictionary with the config values for the notification. """ url = cls._get_notif_url(notif_name) config['repository_id'] = repo_id headers = cls._get_headers() r = await requests.post(url, headers=headers, data=json.dumps(config)) return r
[docs] @classmethod async def disable(cls, repo_id, notif_name): """Disables a notification for a given repository. :param repo_id: The id of the repository to enable the notification. :param notif_name: The name of the notification. """ url = cls._get_notif_url(notif_name) config = {'repository_id': repo_id} headers = cls._get_headers() r = await requests.delete(url, headers=headers, data=json.dumps(config)) return r
[docs] @classmethod async def update(cls, repo_id, notif_name, **config): """Updates a notification for a given repository. :param repo_id: The id of the repository to enable the notification. :param notif_name: The name of the notification. :param config: A dictionary with the new config values for the notification. """ url = cls._get_notif_url(notif_name) config['repository_id'] = repo_id headers = cls._get_headers() r = await requests.put(url, headers=headers, data=json.dumps(config)) return r
[docs]class BaseHoleInterface(BaseInterface): # This is for the cli only. Do not use. _client = None
[docs] @classmethod async def get_client(cls, requester): """Returns a client connected to master.""" if cls._client: if not cls._client._connected: await cls._client.connect() return cls._client client_settings = get_hole_client_settings(cls.settings) client = await get_hole_client(requester, **client_settings) return client
[docs]class UserInterface(BaseHoleInterface): """A user created in the master""" def __init__(self, requester, ordered_kwargs): if requester is None: requester = self super().__init__(requester, ordered_kwargs)
[docs] @classmethod async def get(cls, **kwargs): """Returns a user. :param requester: The user who is requesting the operation. :param kwargs: kwargs to get the user.""" requester = cls._get_root_user() with await cls.get_client(requester) as client: user_dict = await client.user_get(**kwargs) user = cls(requester, user_dict) return user
@classmethod def _get_root_user(cls): root_id = cls.settings.ROOT_USER_ID return cls(None, {'id': root_id})
[docs] @classmethod async def authenticate(cls, username_or_email, password): kw = {'username_or_email': username_or_email, 'password': password} with await cls.get_client(None) as client: user_dict = await client.user_authenticate(**kw) user = cls(None, user_dict) return user
[docs] @classmethod async def change_password(cls, requester, old_password, new_password): kw = {'username_or_email': requester.email, 'old_password': old_password, 'new_password': new_password} with await cls.get_client(requester) as client: await client.user_change_password(**kw) return True
[docs] @classmethod async def request_password_reset(cls, email, reset_link): """Request the reset of the user's password. Sends an email with a link to reset the password. """ subject = 'Reset password requested' message = "Follow the link {} to reset your password.".format( reset_link) requester = cls._get_root_user() kw = {'email': email, 'subject': subject, 'message': message} with await cls.get_client(requester) as client: await client.user_send_reset_password_email(**kw) return True
[docs] @classmethod async def change_password_with_token(cls, token, password): """Changes the user password using a token. The token was generated when ``request_password_reset`` was called and a link with the token was sent to the user email. """ kw = {'token': token, 'password': password} requester = cls._get_root_user() with await cls.get_client(requester) as client: await client.user_change_password_with_token(**kw) return True
[docs] @classmethod async def add(cls, email, username, password, allowed_actions): requester = cls._get_root_user() kw = {'username': username, 'email': email, 'password': password, 'allowed_actions': allowed_actions} with await cls.get_client(requester) as client: user_dict = await client.user_add(**kw) user = cls(None, user_dict) return user
[docs] async def delete(self): kw = {'id': str(self.id)} with await type(self).get_client(self.requester) as client: resp = await client.user_remove(**kw) return resp
[docs] @classmethod async def exists(cls, **kwargs): """Checks if a user with some given information exists. :param kwargs: Named arguments to match the user""" requester = cls._get_root_user() with await cls.get_client(requester) as client: exists = await client.user_exists(**kwargs) return exists
[docs]class SlaveInterface(BaseHoleInterface):
[docs] @classmethod async def add(cls, requester, name, port, token, owner, use_ssl=True, validate_cert=True, on_demand=False, host=None, instance_type=None, instance_confs=None): """Adds a new slave. :param name: Slave name. :param host: Slave host. :param port: Slave port. :param token: Authentication token. :param owner: The slave owner :param use_ssl: Indicates if the slave uses a ssl connection. :pram validate_cert: Should the slave certificate be validated? :param on_demand: Does this slave have an on-demand instance? :param instance_type: Type of the on-demand instance. :param instance_confs: Configuration parameters for the on-demand instance. """ kw = {'slave_name': name, 'slave_host': host, 'slave_port': port, 'slave_token': token, 'owner_id': str(owner.id), 'use_ssl': use_ssl, 'validate_cert': validate_cert, 'on_demand': on_demand, 'instance_type': instance_type, 'instance_confs': instance_confs} with await cls.get_client(requester) as client: slave_dict = await client.slave_add(**kw) slave = cls(requester, slave_dict) return slave
[docs] @classmethod async def get(cls, requester, **kwargs): """Returns a slave. :param requester: The user who is requesting the operation. :param kwargs: kwargs to get the slave.""" cls._handle_name_or_id('slave', kwargs) with await cls.get_client(requester) as client: slave_dict = await client.slave_get(**kwargs) slave = cls(requester, slave_dict) return slave
[docs] @classmethod async def list(cls, requester): """Lists all slaves. :param requester: The user who is requesting the operation.""" with await cls.get_client(requester) as client: slaves = await client.slave_list() slave_list = [cls(requester, slave) for slave in slaves] return slave_list
[docs] async def delete(self): """Delete a slave.""" with await self.get_client(self.requester) as client: resp = await client.slave_remove(slave_name_or_id=self.id) return resp
[docs] async def update(self, **kwargs): """Updates a slave""" with await self.get_client(self.requester) as client: resp = await client.slave_update(slave_name_or_id=self.id, **kwargs) return resp
[docs]class BuilderInterface(BaseHoleInterface):
[docs] @classmethod async def list(cls, requester, **kwargs): """Lists builders already used.""" with await cls.get_client(requester) as client: builders = await client.builder_list(**kwargs) builders_list = [cls(requester, builder) for builder in builders] return builders_list
[docs]class StepInterface(BaseHoleInterface):
[docs] @classmethod async def get(cls, requester, uuid): """Returns information about a step. :param uuid: The uuid of the step.""" with await cls.get_client(requester) as client: sdict = await client.buildstep_get(step_uuid=uuid) step = cls(requester, sdict) return step
[docs]class BuildInterface(BaseHoleInterface): references = {'steps': StepInterface, 'builder': BuilderInterface}
[docs] def to_dict(self, *args, **kwargs): """Converts a build object in to a dictionary. :param args: Positional arguments passed to :meth:`~toxicbuild.common.interfaces.BaseInterface.to_dict`. :param kwargs: Named arguments passed to :meth:`~toxicbuild.common.interfaces.BaseInterface.to_dict`. """ d = super().to_dict(*args, **kwargs) d['builder'] = d['builder'].to_dict(*args, **kwargs) d['steps'] = [s.to_dict(*args, **kwargs) for s in d.get('steps', [])] return d
[docs] @classmethod async def get(cls, requester, uuid): """Returns information about abuild. :param uuid: The uuid of the build.""" with await cls.get_client(requester) as client: build_dict = await client.build_get(build_uuid=uuid) build = cls(requester, build_dict) return build
[docs]class RepositoryInterface(BaseHoleInterface): """Interface for a repository.""" references = { 'slaves': SlaveInterface, 'last_buildset': 'toxicbuild.common.interfaces.BuildSetInterface' }
[docs] @classmethod async def add(cls, requester, name, url, owner, vcs_type, update_seconds=300, slaves=None, parallel_builds=None, schedule_poller=True, branches=None, external_id=None, external_full_name=None, fetch_url=None, envvars=None): """Adds a new repository. :param requester: The user who is requesting the operation. :param name: Repository's name. :param url: Repository's url. :param owner: The repository owner :param vcs_type: VCS type used on the repository. :param update_seconds: Interval to update the repository code. :param slaves: List with slave names for this reporitory. :params parallel_builds: How many paralles builds this repository executes. If None, there is no limit. :param schedule_poller: Should this repository be scheduled for polling? If this repository comes from an integration (with github, gitlab, etc...) this should be False. :param branches: A list of branches configuration that trigger builds. :param external_id: The id of the repository in an external service. :param external_full_name: The full name in an external service. :param fetch_url: If the repository uses a differente url to fetch code (ie: it has an auth token url) this is the fetch_url. :param envvars: Environment variables that will be used in every build in this repository. """ kw = {'repo_name': name, 'repo_url': url, 'vcs_type': vcs_type, 'update_seconds': update_seconds, 'parallel_builds': parallel_builds, 'owner_id': str(owner.id), 'schedule_poller': schedule_poller, 'branches': branches, 'envvars': envvars or {}, 'external_id': external_id, 'external_full_name': external_full_name, 'fetch_url': fetch_url} kw.update({'slaves': slaves or []}) with await cls.get_client(requester) as client: repo_dict = await client.repo_add(**kw) repo = cls(requester, repo_dict) return repo
[docs] @classmethod async def get(cls, requester, **kwargs): """Returns a repository. :param requester: The user who is requesting the operation. :param kwargs: kwargs to get the repository.""" cls._handle_name_or_id('repo', kwargs) with await cls.get_client(requester) as client: repo_dict = await client.repo_get(**kwargs) repo = cls(requester, repo_dict) return repo
[docs] @classmethod async def list(cls, requester, **kwargs): """Lists all repositories. :param requester: The user who is requesting the operation.""" with await cls.get_client(requester) as client: repos = await client.repo_list(**kwargs) repo_list = [cls(requester, repo) for repo in repos] return repo_list
[docs] async def delete(self): """Delete a repository.""" with await self.get_client(self.requester) as client: resp = await client.repo_remove(repo_name_or_id=self.id) return resp
[docs] async def add_slave(self, slave): """Adds a slave to the repository. :param slave: A Slave instance.""" with await self.get_client(self.requester) as client: resp = await client.repo_add_slave(repo_name_or_id=self.id, slave_name_or_id=slave.id) return resp
[docs] async def remove_slave(self, slave): """Removes a slave from the repository. :param slave: A Slave instance. """ with await self.get_client(self.requester) as client: resp = await client.repo_remove_slave( repo_name_or_id=self.id, slave_name_or_id=slave.id) return resp
[docs] async def add_branch(self, branch_name, notify_only_latest): """Adds a branch config to a repositoiry. :param branch_name: The name of the branch. :param notify_only_latest: If we should create builds for all revisions or only for the lastest one.""" with await self.get_client(self.requester) as client: resp = await client.repo_add_branch( repo_name_or_id=self.id, branch_name=branch_name, notify_only_latest=notify_only_latest) return resp
[docs] async def remove_branch(self, branch_name): """Removes a branch config from a repository. :param branch_name: The name of the branch.""" with await self.get_client(self.requester) as client: resp = await client.repo_remove_branch( repo_name_or_id=self.id, branch_name=branch_name) return resp
[docs] async def update(self, **kwargs): """Updates a slave""" with await self.get_client(self.requester) as client: resp = await client.repo_update(repo_name_or_id=self.id, **kwargs) return resp
[docs] async def start_build(self, branch, builder_name_or_id=None, named_tree=None): """Starts a (some) build(s) for a repository. :param branch: The name of the branch. :param builder_name_or_id: The name of the builder that will execute the build :param named_tree: The named_tree that will be builded. If no named_tree the last one will be used. """ with await self.get_client(self.requester) as client: resp = await client.repo_start_build( repo_name_or_id=self.id, branch=branch, builder_name_or_id=builder_name_or_id, named_tree=named_tree) return resp
[docs] def to_dict(self, *args, **kwargs): """Transforms a repository into a dictionary. :param args: Positional arguments passed to :meth:`~toxicbuild.common.interfaces.BaseInterface.to_dict`. :param kwargs: Named arguments passed to :meth:`~toxicbuild.common.interfaces.BaseInterface.to_dict`. """ d = super().to_dict(*args, **kwargs) d['slaves'] = [s.to_dict(*args, **kwargs) for s in d['slaves']] if self.last_buildset: d['last_buildset'] = self.last_buildset.to_dict(*args, **kwargs) return d
[docs] async def cancel_build(self, build_uuid): """Cancels a build from the repository. :param build_uuid: The uuid of the build.""" with await self.get_client(self.requester) as client: resp = await client.repo_cancel_build(repo_name_or_id=self.id, build_uuid=build_uuid) return resp
[docs] async def enable(self): """Enables this repository.""" with await self.get_client(self.requester) as client: resp = await client.repo_enable(repo_name_or_id=self.id) return resp
[docs] async def disable(self): """Disables this repository.""" with await self.get_client(self.requester) as client: resp = await client.repo_disable(repo_name_or_id=self.id) return resp
[docs] async def request_code_update(self, repo_branches=None, external=None, wait_for_lock=False): """Request the code update of the repository. :param repo_branches: A dictionary with information about the branches to be updated. If no ``repo_branches`` all branches in the repo config will be updated. The dictionary has the following format. .. code-block:: python {'branch-name': {'notify_only_latest': True}} :param external: If we should update code from an external (not the origin) repository, `external` is the information about this remote repo. :param wait_for_lock: Indicates if we should wait for the release of the lock or simply return if we cannot get a lock. """ with await self.get_client(self.requester) as client: resp = await client.repo_request_code_update( repo_name_or_id=self.id, repo_branches=repo_branches, external=external, wait_for_lock=wait_for_lock) return resp
[docs] async def add_envvars(self, **envvars): """Adds environment variables to use in the builds of this repository. :param envvars: Environment variables in the format {var: val, ...} """ with await self.get_client(self.requester) as client: resp = await client.repo_add_envvars( repo_name_or_id=self.id, **envvars) return resp
[docs] async def rm_envvars(self, **envvars): """Removes environment variables from this repository. :param envvars: Environment variables in the format {var: val, ...} """ with await self.get_client(self.requester) as client: resp = await client.repo_rm_envvars( repo_name_or_id=self.id, **envvars) return resp
[docs] async def replace_envvars(self, **envvars): """Replaces environment variables of this repository. :param envvars: Environment variables in the format {var: val, ...} """ with await self.get_client(self.requester) as client: resp = await client.repo_replace_envvars( repo_name_or_id=self.id, **envvars) return resp
[docs] async def list_branches(self): """Lists the branches known by this repositor. """ repo_name_or_id = self.id or self.full_name with await self.get_client(self.requester) as client: resp = await client.repo_list_branches( repo_name_or_id=repo_name_or_id) return resp
[docs]class BuildSetInterface(BaseHoleInterface): references = {'builds': BuildInterface, 'repository': RepositoryInterface}
[docs] @classmethod async def list(cls, requester, repo_name_or_id=None, summary=True, branch=None): """Lists buildsets. If ``repo_name_or_id`` only builds of this repsitory will be listed. :param repo_name: Name of a repository. :param summary: If True, no builds information will be returned. :param branch: List buildsets for this branch. If None list buildsets from all branches. """ with await cls.get_client(requester) as client: buildsets = await client.buildset_list( repo_name_or_id=repo_name_or_id, offset=10, summary=summary, branch=branch) buildset_list = [cls(requester, buildset) for buildset in buildsets] return buildset_list
[docs] def to_dict(self, *args, **kwargs): """Returns a dictionary based in a BuildSet object. :param args: Positional arguments passed to :meth:`~toxicbuild.common.interfaces.BaseInterface.to_dict`. :param kwargs: Named arguments passed to :meth:`~toxicbuild.common.interfaces.BaseInterface.to_dict`. """ d = super().to_dict(*args, **kwargs) d['builds'] = [b.to_dict(*args, **kwargs) for b in d.get('builds', [])] d['repository'] = self.repository.to_dict(*args, **kwargs) \ if self.repository else None return d
[docs] @classmethod async def get(cls, requester, buildset_id): """Returns an instance of BuildSet. :param buildset_id: The id of the buildset to get. """ with await cls.get_client(requester) as client: buildset = await client.buildset_get(buildset_id=buildset_id) return cls(requester, buildset)