-# This file was part of Buildbot. Buildbot is free software: you can
-# redistribute it and/or modify it under the terms of the GNU General Public
-# License as published by the Free Software Foundation, version 2.
-#
-# This program is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
-# details.
-#
-# You should have received a copy of the GNU General Public License along with
-# this program; if not, write to the Free Software Foundation, Inc., 51
-# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-#
-# Portions Copyright Buildbot Team Members
-# Portions Copyright 2010 Isotoma Limited
-
-
-import os
-
-from twisted.internet import defer
-from twisted.internet import threads
-from twisted.internet import utils
-from twisted.python import failure
-from twisted.python import log
-
-from buildbot import config
-from buildbot.util.eventual import eventually
-from buildbot.worker import AbstractLatentWorker
-
-try:
- import libvirt
-except ImportError:
- libvirt = None
-
-import random
-import string
-
-def random_string_generator():
- chars = string.ascii_letters
- return ''.join(random.choice(chars) for x in range(6))
-
-class WorkQueue:
-
- """
- I am a class that turns parallel access into serial access.
-
- I exist because we want to run libvirt access in threads as we don't
- trust calls not to block, but under load libvirt doesn't seem to like
- this kind of threaded use.
- """
-
- def __init__(self):
- self.queue = []
-
- def _process(self):
- log.msg("Looking to start a piece of work now...")
-
- # Is there anything to do?
- if not self.queue:
- log.msg("_process called when there is no work")
- return
-
- # Peek at the top of the stack - get a function to call and
- # a deferred to fire when its all over
- d, next_operation, args, kwargs = self.queue[0]
-
- # Start doing some work - expects a deferred
- try:
- d2 = next_operation(*args, **kwargs)
- except Exception:
- d2 = defer.fail()
-
- # Whenever a piece of work is done, whether it worked or not
- # call this to schedule the next piece of work
- @d2.addBoth
- def _work_done(res):
- log.msg("Completed a piece of work")
- self.queue.pop(0)
- if self.queue:
- log.msg("Preparing next piece of work")
- eventually(self._process)
- return res
-
- # When the work is done, trigger d
- d2.chainDeferred(d)
-
- def execute(self, cb, *args, **kwargs):
- kickstart_processing = not self.queue
- d = defer.Deferred()
- self.queue.append((d, cb, args, kwargs))
- if kickstart_processing:
- self._process()
- return d
-
- def executeInThread(self, cb, *args, **kwargs):
- return self.execute(threads.deferToThread, cb, *args, **kwargs)
-
-
-# A module is effectively a singleton class, so this is OK
-queue = WorkQueue()
-
-
-class Domain:
-
- """
- I am a wrapper around a libvirt Domain object
- """
-
- def __init__(self, connection, domain):
- self.connection = connection
- self.domain = domain
-
- def name(self):
- return queue.executeInThread(self.domain.name)
-
- def create(self):
- return queue.executeInThread(self.domain.create)
-
- def shutdown(self):
- return queue.executeInThread(self.domain.shutdown)
-
- def destroy(self):
- return queue.executeInThread(self.domain.destroy)
-
-class Volume:
- def __init__(self, connection, volume):
- self.connection = connection
- self.volume = volume
-
- @defer.inlineCallbacks
- def destroy(self):
- yield queue.executeInThread(self.volume.wipe)
- yield queue.executeInThread(self.volume.delete)
-
-class Pool:
- VolumeClass = Volume
- def __init__(self, connection, pool):
- self.connection = connection
- self.pool = pool
-
- @defer.inlineCallbacks
- def create_volume(self, xml):
- res = yield queue.executeInThread(self.pool.createXML, xml)
- return self.VolumeClass(self.connection, res)
-
-class Connection:
-
- """
- I am a wrapper around a libvirt Connection object.
- """
-
- DomainClass = Domain
- PoolClass = Pool
-
- def __init__(self, uri):
- self.uri = uri
- self._connection = None
-
- @property
- def connection(self):
- if self._connection is not None:
- try:
- if not self._connection.isAlive():
- self._connection = None
- except:
- self._connection = None
- if self._connection is None:
- self._connection = libvirt.open(self.uri)
- return self._connection
-
- @defer.inlineCallbacks
- def create(self, xml):
- """ I take libvirt XML and start a new VM """
- res = yield queue.executeInThread(self.connection.createXML, xml, 0)
- return self.DomainClass(self, res)
-
- @defer.inlineCallbacks
- def lookup_pool(self, name):
- res = yield queue.executeInThread(self.connection.storagePoolLookupByName, name)
- return self.PoolClass(self, res)
-
-class LibVirtWorker(AbstractLatentWorker):
-
- def __init__(self, name, password, connection, master_url, base_image=None, **kwargs):
- super().__init__(name, password, **kwargs)
- if not libvirt:
- config.error(
- "The python module 'libvirt' is needed to use a LibVirtWorker")
-
- self.master_url = master_url
- self.random_name = random_string_generator()
- self.connection = connection
- self.base_image = base_image
-
- self.domain = None
- self.domain_name = "buildbot-" + self.workername + "-" + self.random_name
- self.volume = None
- self.volume_name = "buildbot-" + self.workername + "-" + self.random_name
- self.pool_name = "buildbot-disks"
-
- def reconfigService(self, *args, **kwargs):
- if 'build_wait_timeout' not in kwargs:
- kwargs['build_wait_timeout'] = 0
- return super().reconfigService(*args, **kwargs)
-
- def canStartBuild(self):
- if self.domain and not self.isConnected():
- log.msg(
- "Not accepting builds as existing domain but worker not connected")
- return False
-
- return super().canStartBuild()
-
- @defer.inlineCallbacks
- def _prepare_image(self):
- log.msg("Creating temporary image {}".format(self.volume_name))
- pool = yield self.connection.lookup_pool(self.pool_name)
- vol_xml = """
- <volume type='file'>
- <name>{vol_name}</name>
- <capacity unit='G'>10</capacity>
- <target>
- <format type='qcow2'/>
- <permissions>
- <mode>0600</mode>
- <owner>0</owner>
- <group>0</group>
- </permissions>
- </target>
- <backingStore>
- <path>/etc/libvirtd/base-images/buildbot.qcow2</path>
- <format type='qcow2'/>
- </backingStore>
- </volume>
- """.format(vol_name = self.volume_name)
- self.volume = yield pool.create_volume(vol_xml)
-
- @defer.inlineCallbacks
- def start_instance(self, build):
- """
- I start a new instance of a VM.
-
- If a base_image is specified, I will make a clone of that otherwise i will
- use image directly.
-
- If i'm not given libvirt domain definition XML, I will look for my name
- in the list of defined virtual machines and start that.
- """
- domain_xml = """
- <domain type="kvm">
- <name>{domain_name}</name>
- <memory unit="GiB">2</memory>
- <vcpu>1</vcpu>
- <sysinfo type='smbios'>
- <oemStrings>
- <entry>buildbot_master_url={master_url}</entry>
- <entry>buildbot_worker_name={worker_name}</entry>
- </oemStrings>
- </sysinfo>
- <os>
- <type arch="x86_64">hvm</type>
- <smbios mode='sysinfo'/>
- </os>
- <devices>
- <emulator>/run/current-system/sw/bin/qemu-system-x86_64</emulator>
- <disk type="volume" device="disk">
- <driver name='qemu' type='qcow2' />
- <source type="volume" pool="{pool_name}" volume="{volume_name}" />
- <backingStore type='volume'>
- <format type='qcow2'/>
- <source type="volume" pool="niximages" volume="buildbot.qcow2" />
- </backingStore>
- <target dev="vda" bus="virtio"/>
- </disk>
- <input type="keyboard" bus="usb"/>
- <graphics type="vnc" port="-1" autoport="yes"/>
- <interface type="network">
- <source network="immae" />
- </interface>
- </devices>
- </domain>
- """.format(volume_name = self.volume_name, master_url = self.master_url, pool_name =
- self.pool_name, domain_name = self.domain_name, worker_name = self.workername)
-
- yield self._prepare_image()
-
- try:
- self.domain = yield self.connection.create(domain_xml)
- except Exception:
- log.err(failure.Failure(),
- ("Cannot start a VM ({}), failing gracefully and triggering"
- "a new build check").format(self.workername))
- self.domain = None
- return False
-
- return [self.domain_name]
-
- def stop_instance(self, fast=False):
- """
- I attempt to stop a running VM.
- I make sure any connection to the worker is removed.
- If the VM was using a cloned image, I remove the clone
- When everything is tidied up, I ask that bbot looks for work to do
- """
-
- log.msg("Attempting to stop '{}'".format(self.workername))
- if self.domain is None:
- log.msg("I don't think that domain is even running, aborting")
- return defer.succeed(None)
-
- domain = self.domain
- self.domain = None
-
- d = domain.destroy()
- if self.volume is not None:
- self.volume.destroy()
-
- return d