--- /dev/null
+# This file was part of Buildbot. Buildbot is free software: you can
+# redistribute it and/or modify it under the terms of the GNU General Public
+# License as published by the Free Software Foundation, version 2.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+# details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program; if not, write to the Free Software Foundation, Inc., 51
+# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+# Portions Copyright Buildbot Team Members
+# Portions Copyright 2010 Isotoma Limited
+
+
+import os
+
+from twisted.internet import defer
+from twisted.internet import threads
+from twisted.internet import utils
+from twisted.python import failure
+from twisted.python import log
+
+from buildbot import config
+from buildbot.util.eventual import eventually
+from buildbot.worker import AbstractLatentWorker
+
+try:
+ import libvirt
+except ImportError:
+ libvirt = None
+
+import random
+import string
+
+def random_string_generator():
+ chars = string.ascii_letters
+ return ''.join(random.choice(chars) for x in range(6))
+
+class WorkQueue:
+
+ """
+ I am a class that turns parallel access into serial access.
+
+ I exist because we want to run libvirt access in threads as we don't
+ trust calls not to block, but under load libvirt doesn't seem to like
+ this kind of threaded use.
+ """
+
+ def __init__(self):
+ self.queue = []
+
+ def _process(self):
+ log.msg("Looking to start a piece of work now...")
+
+ # Is there anything to do?
+ if not self.queue:
+ log.msg("_process called when there is no work")
+ return
+
+ # Peek at the top of the stack - get a function to call and
+ # a deferred to fire when its all over
+ d, next_operation, args, kwargs = self.queue[0]
+
+ # Start doing some work - expects a deferred
+ try:
+ d2 = next_operation(*args, **kwargs)
+ except Exception:
+ d2 = defer.fail()
+
+ # Whenever a piece of work is done, whether it worked or not
+ # call this to schedule the next piece of work
+ @d2.addBoth
+ def _work_done(res):
+ log.msg("Completed a piece of work")
+ self.queue.pop(0)
+ if self.queue:
+ log.msg("Preparing next piece of work")
+ eventually(self._process)
+ return res
+
+ # When the work is done, trigger d
+ d2.chainDeferred(d)
+
+ def execute(self, cb, *args, **kwargs):
+ kickstart_processing = not self.queue
+ d = defer.Deferred()
+ self.queue.append((d, cb, args, kwargs))
+ if kickstart_processing:
+ self._process()
+ return d
+
+ def executeInThread(self, cb, *args, **kwargs):
+ return self.execute(threads.deferToThread, cb, *args, **kwargs)
+
+
+# A module is effectively a singleton class, so this is OK
+queue = WorkQueue()
+
+
+class Domain:
+
+ """
+ I am a wrapper around a libvirt Domain object
+ """
+
+ def __init__(self, connection, domain):
+ self.connection = connection
+ self.domain = domain
+
+ def name(self):
+ return queue.executeInThread(self.domain.name)
+
+ def create(self):
+ return queue.executeInThread(self.domain.create)
+
+ def shutdown(self):
+ return queue.executeInThread(self.domain.shutdown)
+
+ def destroy(self):
+ return queue.executeInThread(self.domain.destroy)
+
+class Volume:
+ def __init__(self, connection, volume):
+ self.connection = connection
+ self.volume = volume
+
+ @defer.inlineCallbacks
+ def destroy(self):
+ yield queue.executeInThread(self.volume.wipe)
+ yield queue.executeInThread(self.volume.delete)
+
+class Pool:
+ VolumeClass = Volume
+ def __init__(self, connection, pool):
+ self.connection = connection
+ self.pool = pool
+
+ @defer.inlineCallbacks
+ def create_volume(self, xml):
+ res = yield queue.executeInThread(self.pool.createXML, xml)
+ return self.VolumeClass(self.connection, res)
+
+class Connection:
+
+ """
+ I am a wrapper around a libvirt Connection object.
+ """
+
+ DomainClass = Domain
+ PoolClass = Pool
+
+ def __init__(self, uri):
+ self.uri = uri
+ self._connection = None
+
+ @property
+ def connection(self):
+ if self._connection is not None:
+ try:
+ if not self._connection.isAlive():
+ self._connection = None
+ except:
+ self._connection = None
+ if self._connection is None:
+ self._connection = libvirt.open(self.uri)
+ return self._connection
+
+ @defer.inlineCallbacks
+ def create(self, xml):
+ """ I take libvirt XML and start a new VM """
+ res = yield queue.executeInThread(self.connection.createXML, xml, 0)
+ return self.DomainClass(self, res)
+
+ @defer.inlineCallbacks
+ def lookup_pool(self, name):
+ res = yield queue.executeInThread(self.connection.storagePoolLookupByName, name)
+ return self.PoolClass(self, res)
+
+class LibVirtWorker(AbstractLatentWorker):
+
+ def __init__(self, name, password, connection, master_url, base_image=None, **kwargs):
+ super().__init__(name, password, **kwargs)
+ if not libvirt:
+ config.error(
+ "The python module 'libvirt' is needed to use a LibVirtWorker")
+
+ self.master_url = master_url
+ self.random_name = random_string_generator()
+ self.connection = connection
+ self.base_image = base_image
+
+ self.domain = None
+ self.domain_name = "buildbot-" + self.workername + "-" + self.random_name
+ self.volume = None
+ self.volume_name = "buildbot-" + self.workername + "-" + self.random_name
+ self.pool_name = "buildbot-disks"
+
+ def reconfigService(self, *args, **kwargs):
+ if 'build_wait_timeout' not in kwargs:
+ kwargs['build_wait_timeout'] = 0
+ return super().reconfigService(*args, **kwargs)
+
+ def canStartBuild(self):
+ if self.domain and not self.isConnected():
+ log.msg(
+ "Not accepting builds as existing domain but worker not connected")
+ return False
+
+ return super().canStartBuild()
+
+ @defer.inlineCallbacks
+ def _prepare_image(self):
+ log.msg("Creating temporary image {}".format(self.volume_name))
+ pool = yield self.connection.lookup_pool(self.pool_name)
+ vol_xml = """
+ <volume type='file'>
+ <name>{vol_name}</name>
+ <capacity unit='G'>10</capacity>
+ <target>
+ <format type='qcow2'/>
+ <permissions>
+ <mode>0600</mode>
+ <owner>0</owner>
+ <group>0</group>
+ </permissions>
+ </target>
+ <backingStore>
+ <path>/etc/libvirtd/base-images/buildbot.qcow2</path>
+ <format type='qcow2'/>
+ </backingStore>
+ </volume>
+ """.format(vol_name = self.volume_name)
+ self.volume = yield pool.create_volume(vol_xml)
+
+ @defer.inlineCallbacks
+ def start_instance(self, build):
+ """
+ I start a new instance of a VM.
+
+ If a base_image is specified, I will make a clone of that otherwise i will
+ use image directly.
+
+ If i'm not given libvirt domain definition XML, I will look for my name
+ in the list of defined virtual machines and start that.
+ """
+ domain_xml = """
+ <domain type="kvm">
+ <name>{domain_name}</name>
+ <memory unit="GiB">2</memory>
+ <vcpu>1</vcpu>
+ <sysinfo type='smbios'>
+ <oemStrings>
+ <entry>buildbot_master_url={master_url}</entry>
+ <entry>buildbot_worker_name={worker_name}</entry>
+ </oemStrings>
+ </sysinfo>
+ <os>
+ <type arch="x86_64">hvm</type>
+ <smbios mode='sysinfo'/>
+ </os>
+ <devices>
+ <emulator>/run/current-system/sw/bin/qemu-system-x86_64</emulator>
+ <disk type="volume" device="disk">
+ <driver name='qemu' type='qcow2' />
+ <source type="volume" pool="{pool_name}" volume="{volume_name}" />
+ <backingStore type='volume'>
+ <format type='qcow2'/>
+ <source type="volume" pool="niximages" volume="buildbot.qcow2" />
+ </backingStore>
+ <target dev="vda" bus="virtio"/>
+ </disk>
+ <input type="keyboard" bus="usb"/>
+ <graphics type="vnc" port="-1" autoport="yes"/>
+ <interface type="network">
+ <source network="immae" />
+ </interface>
+ </devices>
+ </domain>
+ """.format(volume_name = self.volume_name, master_url = self.master_url, pool_name =
+ self.pool_name, domain_name = self.domain_name, worker_name = self.workername)
+
+ yield self._prepare_image()
+
+ try:
+ self.domain = yield self.connection.create(domain_xml)
+ except Exception:
+ log.err(failure.Failure(),
+ ("Cannot start a VM ({}), failing gracefully and triggering"
+ "a new build check").format(self.workername))
+ self.domain = None
+ return False
+
+ return [self.domain_name]
+
+ def stop_instance(self, fast=False):
+ """
+ I attempt to stop a running VM.
+ I make sure any connection to the worker is removed.
+ If the VM was using a cloned image, I remove the clone
+ When everything is tidied up, I ask that bbot looks for work to do
+ """
+
+ log.msg("Attempting to stop '{}'".format(self.workername))
+ if self.domain is None:
+ log.msg("I don't think that domain is even running, aborting")
+ return defer.succeed(None)
+
+ domain = self.domain
+ self.domain = None
+
+ d = domain.destroy()
+ if self.volume is not None:
+ self.volume.destroy()
+
+ return d