X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=flakes%2Fprivate%2Fbuildbot%2Fcommon%2Flibvirt.py;fp=flakes%2Fprivate%2Fbuildbot%2Fcommon%2Flibvirt.py;h=e25062744ca2ec5e332f711770d35431ea80b6a6;hb=1a64deeb894dc95e2645a75771732c6cc53a79ad;hp=0000000000000000000000000000000000000000;hpb=fa25ffd4583cc362075cd5e1b4130f33306103f0;p=perso%2FImmae%2FConfig%2FNix.git diff --git a/flakes/private/buildbot/common/libvirt.py b/flakes/private/buildbot/common/libvirt.py new file mode 100644 index 0000000..e250627 --- /dev/null +++ b/flakes/private/buildbot/common/libvirt.py @@ -0,0 +1,318 @@ +# This file was part of Buildbot. Buildbot is free software: you can +# redistribute it and/or modify it under the terms of the GNU General Public +# License as published by the Free Software Foundation, version 2. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., 51 +# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +# Portions Copyright Buildbot Team Members +# Portions Copyright 2010 Isotoma Limited + + +import os + +from twisted.internet import defer +from twisted.internet import threads +from twisted.internet import utils +from twisted.python import failure +from twisted.python import log + +from buildbot import config +from buildbot.util.eventual import eventually +from buildbot.worker import AbstractLatentWorker + +try: + import libvirt +except ImportError: + libvirt = None + +import random +import string + +def random_string_generator(): + chars = string.ascii_letters + return ''.join(random.choice(chars) for x in range(6)) + +class WorkQueue: + + """ + I am a class that turns parallel access into serial access. + + I exist because we want to run libvirt access in threads as we don't + trust calls not to block, but under load libvirt doesn't seem to like + this kind of threaded use. + """ + + def __init__(self): + self.queue = [] + + def _process(self): + log.msg("Looking to start a piece of work now...") + + # Is there anything to do? + if not self.queue: + log.msg("_process called when there is no work") + return + + # Peek at the top of the stack - get a function to call and + # a deferred to fire when its all over + d, next_operation, args, kwargs = self.queue[0] + + # Start doing some work - expects a deferred + try: + d2 = next_operation(*args, **kwargs) + except Exception: + d2 = defer.fail() + + # Whenever a piece of work is done, whether it worked or not + # call this to schedule the next piece of work + @d2.addBoth + def _work_done(res): + log.msg("Completed a piece of work") + self.queue.pop(0) + if self.queue: + log.msg("Preparing next piece of work") + eventually(self._process) + return res + + # When the work is done, trigger d + d2.chainDeferred(d) + + def execute(self, cb, *args, **kwargs): + kickstart_processing = not self.queue + d = defer.Deferred() + self.queue.append((d, cb, args, kwargs)) + if kickstart_processing: + self._process() + return d + + def executeInThread(self, cb, *args, **kwargs): + return self.execute(threads.deferToThread, cb, *args, **kwargs) + + +# A module is effectively a singleton class, so this is OK +queue = WorkQueue() + + +class Domain: + + """ + I am a wrapper around a libvirt Domain object + """ + + def __init__(self, connection, domain): + self.connection = connection + self.domain = domain + + def name(self): + return queue.executeInThread(self.domain.name) + + def create(self): + return queue.executeInThread(self.domain.create) + + def shutdown(self): + return queue.executeInThread(self.domain.shutdown) + + def destroy(self): + return queue.executeInThread(self.domain.destroy) + +class Volume: + def __init__(self, connection, volume): + self.connection = connection + self.volume = volume + + @defer.inlineCallbacks + def destroy(self): + yield queue.executeInThread(self.volume.wipe) + yield queue.executeInThread(self.volume.delete) + +class Pool: + VolumeClass = Volume + def __init__(self, connection, pool): + self.connection = connection + self.pool = pool + + @defer.inlineCallbacks + def create_volume(self, xml): + res = yield queue.executeInThread(self.pool.createXML, xml) + return self.VolumeClass(self.connection, res) + +class Connection: + + """ + I am a wrapper around a libvirt Connection object. + """ + + DomainClass = Domain + PoolClass = Pool + + def __init__(self, uri): + self.uri = uri + self._connection = None + + @property + def connection(self): + if self._connection is not None: + try: + if not self._connection.isAlive(): + self._connection = None + except: + self._connection = None + if self._connection is None: + self._connection = libvirt.open(self.uri) + return self._connection + + @defer.inlineCallbacks + def create(self, xml): + """ I take libvirt XML and start a new VM """ + res = yield queue.executeInThread(self.connection.createXML, xml, 0) + return self.DomainClass(self, res) + + @defer.inlineCallbacks + def lookup_pool(self, name): + res = yield queue.executeInThread(self.connection.storagePoolLookupByName, name) + return self.PoolClass(self, res) + +class LibVirtWorker(AbstractLatentWorker): + + def __init__(self, name, password, connection, master_url, base_image=None, **kwargs): + super().__init__(name, password, **kwargs) + if not libvirt: + config.error( + "The python module 'libvirt' is needed to use a LibVirtWorker") + + self.master_url = master_url + self.random_name = random_string_generator() + self.connection = connection + self.base_image = base_image + + self.domain = None + self.domain_name = "buildbot-" + self.workername + "-" + self.random_name + self.volume = None + self.volume_name = "buildbot-" + self.workername + "-" + self.random_name + self.pool_name = "buildbot-disks" + + def reconfigService(self, *args, **kwargs): + if 'build_wait_timeout' not in kwargs: + kwargs['build_wait_timeout'] = 0 + return super().reconfigService(*args, **kwargs) + + def canStartBuild(self): + if self.domain and not self.isConnected(): + log.msg( + "Not accepting builds as existing domain but worker not connected") + return False + + return super().canStartBuild() + + @defer.inlineCallbacks + def _prepare_image(self): + log.msg("Creating temporary image {}".format(self.volume_name)) + pool = yield self.connection.lookup_pool(self.pool_name) + vol_xml = """ + + {vol_name} + 10 + + + + 0600 + 0 + 0 + + + + /etc/libvirtd/base-images/buildbot.qcow2 + + + + """.format(vol_name = self.volume_name) + self.volume = yield pool.create_volume(vol_xml) + + @defer.inlineCallbacks + def start_instance(self, build): + """ + I start a new instance of a VM. + + If a base_image is specified, I will make a clone of that otherwise i will + use image directly. + + If i'm not given libvirt domain definition XML, I will look for my name + in the list of defined virtual machines and start that. + """ + domain_xml = """ + + {domain_name} + 2 + 1 + + + buildbot_master_url={master_url} + buildbot_worker_name={worker_name} + + + + hvm + + + + /run/current-system/sw/bin/qemu-system-x86_64 + + + + + + + + + + + + + + + + + """.format(volume_name = self.volume_name, master_url = self.master_url, pool_name = + self.pool_name, domain_name = self.domain_name, worker_name = self.workername) + + yield self._prepare_image() + + try: + self.domain = yield self.connection.create(domain_xml) + except Exception: + log.err(failure.Failure(), + ("Cannot start a VM ({}), failing gracefully and triggering" + "a new build check").format(self.workername)) + self.domain = None + return False + + return [self.domain_name] + + def stop_instance(self, fast=False): + """ + I attempt to stop a running VM. + I make sure any connection to the worker is removed. + If the VM was using a cloned image, I remove the clone + When everything is tidied up, I ask that bbot looks for work to do + """ + + log.msg("Attempting to stop '{}'".format(self.workername)) + if self.domain is None: + log.msg("I don't think that domain is even running, aborting") + return defer.succeed(None) + + domain = self.domain + self.domain = None + + d = domain.destroy() + if self.volume is not None: + self.volume.destroy() + + return d