]>
Commit | Line | Data |
---|---|---|
1 | # This file was part of Buildbot. Buildbot is free software: you can | |
2 | # redistribute it and/or modify it under the terms of the GNU General Public | |
3 | # License as published by the Free Software Foundation, version 2. | |
4 | # | |
5 | # This program is distributed in the hope that it will be useful, but WITHOUT | |
6 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | |
7 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more | |
8 | # details. | |
9 | # | |
10 | # You should have received a copy of the GNU General Public License along with | |
11 | # this program; if not, write to the Free Software Foundation, Inc., 51 | |
12 | # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
13 | # | |
14 | # Portions Copyright Buildbot Team Members | |
15 | # Portions Copyright 2010 Isotoma Limited | |
16 | ||
17 | ||
18 | import os | |
19 | ||
20 | from twisted.internet import defer | |
21 | from twisted.internet import threads | |
22 | from twisted.internet import utils | |
23 | from twisted.python import failure | |
24 | from twisted.python import log | |
25 | ||
26 | from buildbot import config | |
27 | from buildbot.util.eventual import eventually | |
28 | from buildbot.worker import AbstractLatentWorker | |
29 | ||
30 | try: | |
31 | import libvirt | |
32 | except ImportError: | |
33 | libvirt = None | |
34 | ||
35 | import random | |
36 | import string | |
37 | ||
def random_string_generator():
    """Return a string of six characters picked at random from the ASCII letters."""
    alphabet = string.ascii_letters
    picked = [random.choice(alphabet) for _ in range(6)]
    return ''.join(picked)
41 | ||
class WorkQueue:

    """
    I serialise operations that callers would otherwise run in parallel.

    libvirt calls are pushed to threads because they may block, but libvirt
    does not seem to cope well with concurrent threaded use under load, so
    I only ever let one queued operation run at a time.
    """

    def __init__(self):
        # Pending entries are (deferred, callable, args, kwargs) tuples; the
        # entry at index 0 is the one currently running.
        self.queue = []

    def _process(self):
        """Start the operation at the head of the queue, if there is one."""
        log.msg("Looking to start a piece of work now...")

        if not self.queue:
            log.msg("_process called when there is no work")
            return

        # Only peek: the entry stays queued until its work has finished, so
        # that execute() can tell an operation is still in flight.
        done, operation, op_args, op_kwargs = self.queue[0]

        # The operation is expected to hand back a deferred; a synchronous
        # exception is turned into an already-failed deferred.
        try:
            running = operation(*op_args, **op_kwargs)
        except Exception:
            running = defer.fail()

        def _finished(outcome):
            # Runs on success and on failure alike: drop the finished entry
            # and schedule whatever is waiting behind it.
            log.msg("Completed a piece of work")
            del self.queue[0]
            if self.queue:
                log.msg("Preparing next piece of work")
                eventually(self._process)
            return outcome

        running.addBoth(_finished)

        # Hand the outcome over to the deferred the caller is holding.
        running.chainDeferred(done)

    def execute(self, cb, *args, **kwargs):
        """
        Queue ``cb(*args, **kwargs)``, which is expected to return a deferred.

        Returns a deferred that receives the outcome of the operation once
        it has had its turn.
        """
        # An empty queue means nothing is running, so processing has to be
        # started by hand; otherwise the running entry will chain to this one.
        was_idle = not self.queue
        done = defer.Deferred()
        self.queue.append((done, cb, args, kwargs))
        if was_idle:
            self._process()
        return done

    def executeInThread(self, cb, *args, **kwargs):
        """Queue ``cb(*args, **kwargs)`` to be run via ``threads.deferToThread``."""
        return self.execute(threads.deferToThread, cb, *args, **kwargs)
97 | ||
98 | ||
# One queue shared by every wrapper in this module; a module behaves like a
# singleton, so a plain module-level instance is enough.
queue = WorkQueue()
101 | ||
102 | ||
class Domain:

    """
    I wrap a libvirt Domain object and run its calls through the module queue.
    """

    def __init__(self, connection, domain):
        self.domain = domain
        self.connection = connection

    def _run(self, method_name):
        # Look the method up on the wrapped domain and queue it for a thread.
        return queue.executeInThread(getattr(self.domain, method_name))

    def name(self):
        """Queue ``domain.name``; returns the queue's deferred."""
        return self._run("name")

    def create(self):
        """Queue ``domain.create``; returns the queue's deferred."""
        return self._run("create")

    def shutdown(self):
        """Queue ``domain.shutdown``; returns the queue's deferred."""
        return self._run("shutdown")

    def destroy(self):
        """Queue ``domain.destroy``; returns the queue's deferred."""
        return self._run("destroy")
124 | ||
class Volume:

    """
    I wrap a libvirt storage volume object.
    """

    def __init__(self, connection, volume):
        self.volume = volume
        self.connection = connection

    @defer.inlineCallbacks
    def destroy(self):
        """Queue ``volume.wipe`` and, once that has finished, ``volume.delete``."""
        for step in ("wipe", "delete"):
            yield queue.executeInThread(getattr(self.volume, step))
134 | ||
class Pool:

    """
    I wrap a libvirt storage pool object.
    """

    # Wrapper class used for volumes created through this pool
    VolumeClass = Volume

    def __init__(self, connection, pool):
        self.pool = pool
        self.connection = connection

    @defer.inlineCallbacks
    def create_volume(self, xml):
        """
        Queue ``pool.createXML(xml)`` and wrap what it returns in VolumeClass.
        """
        raw_volume = yield queue.executeInThread(self.pool.createXML, xml)
        wrapped = self.VolumeClass(self.connection, raw_volume)
        return wrapped
145 | ||
class Connection:

    """
    I wrap a libvirt Connection object, opened from a URI at construction.
    """

    # Wrapper classes used for the objects this connection hands out
    DomainClass = Domain
    PoolClass = Pool

    def __init__(self, uri):
        self.uri = uri
        self.connection = libvirt.open(uri)

    @defer.inlineCallbacks
    def create(self, xml):
        """ I take libvirt XML, start a new VM from it and return it wrapped """
        raw_domain = yield queue.executeInThread(
            self.connection.createXML, xml, 0)
        wrapped = self.DomainClass(self, raw_domain)
        return wrapped

    @defer.inlineCallbacks
    def lookup_pool(self, name):
        """ I look a storage pool up by name and return it wrapped """
        raw_pool = yield queue.executeInThread(
            self.connection.storagePoolLookupByName, name)
        wrapped = self.PoolClass(self, raw_pool)
        return wrapped
169 | ||
class LibVirtWorker(AbstractLatentWorker):

    """
    I am a latent worker that runs each build in a libvirt domain created
    on demand from inline XML, backed by a temporary qcow2 volume.

    The domain and the volume share one name, built from the worker name and
    a random suffix chosen once at construction time.
    """

    def __init__(self, name, password, connection, master_url,
                 base_image=None, **kwargs):
        """
        name, password: passed on to AbstractLatentWorker.
        connection: object providing ``lookup_pool(name)`` and ``create(xml)``
            returning deferreds (see the Connection class).
        master_url: written into the domain's SMBIOS OEM strings.
        base_image: stored on the instance; nothing in this class reads it.
        """
        super().__init__(name, password, **kwargs)
        if not libvirt:
            config.error(
                "The python module 'libvirt' is needed to use a LibVirtWorker")

        self.master_url = master_url
        self.random_name = random_string_generator()
        self.connection = connection
        self.base_image = base_image

        self.domain = None
        self.domain_name = "buildbot-" + self.workername + "-" + self.random_name
        self.volume = None
        self.volume_name = "buildbot-" + self.workername + "-" + self.random_name
        self.pool_name = "buildbot-disks"

    def reconfigService(self, *args, **kwargs):
        """Reconfigure, defaulting ``build_wait_timeout`` to 0 when not given."""
        if 'build_wait_timeout' not in kwargs:
            kwargs['build_wait_timeout'] = 0
        return super().reconfigService(*args, **kwargs)

    def canStartBuild(self):
        """Refuse builds while a domain exists but the worker is not connected."""
        if self.domain and not self.isConnected():
            log.msg(
                "Not accepting builds as existing domain but worker not connected")
            return False

        return super().canStartBuild()

    @defer.inlineCallbacks
    def _prepare_image(self):
        """
        Create the temporary volume for this worker in the storage pool and
        remember it in ``self.volume``.
        """
        log.msg("Creating temporary image {}".format(self.volume_name))
        pool = yield self.connection.lookup_pool(self.pool_name)
        vol_xml = """
            <volume type='file'>
              <name>{vol_name}</name>
              <capacity unit='G'>10</capacity>
              <target>
                <format type='qcow2'/>
                <permissions>
                  <mode>0600</mode>
                  <owner>0</owner>
                  <group>0</group>
                </permissions>
              </target>
              <backingStore>
                <path>/etc/libvirtd/base-images/buildbot.qcow2</path>
                <format type='qcow2'/>
              </backingStore>
            </volume>
            """.format(vol_name=self.volume_name)
        self.volume = yield pool.create_volume(vol_xml)

    @defer.inlineCallbacks
    def _discard_volume(self):
        """
        Destroy the temporary volume, if any, and forget about it.

        Failures are logged rather than raised: this is best-effort cleanup
        and must not mask the outcome of the operation that triggered it.
        """
        volume, self.volume = self.volume, None
        if volume is None:
            return
        try:
            yield volume.destroy()
        except Exception:
            log.err(failure.Failure(),
                    "Cannot remove temporary image {}".format(self.volume_name))

    @defer.inlineCallbacks
    def start_instance(self, build):
        """
        I start a new instance of a VM.

        I first create a temporary volume in my storage pool, then ask the
        connection to create a domain from XML that uses that volume as its
        disk and carries the master URL and worker name as SMBIOS OEM
        strings.

        I return (through a deferred) a list holding the domain name on
        success. If the domain cannot be created I log the error, remove the
        temporary volume again and return False. A failure while creating
        the volume itself is not caught here.
        """
        domain_xml = """
            <domain type="kvm">
              <name>{domain_name}</name>
              <memory unit="GiB">2</memory>
              <vcpu>1</vcpu>
              <sysinfo type='smbios'>
                <oemStrings>
                  <entry>buildbot_master_url={master_url}</entry>
                  <entry>buildbot_worker_name={worker_name}</entry>
                </oemStrings>
              </sysinfo>
              <os>
                <type arch="x86_64">hvm</type>
                <smbios mode='sysinfo'/>
              </os>
              <devices>
                <emulator>/run/current-system/sw/bin/qemu-system-x86_64</emulator>
                <disk type="volume" device="disk">
                  <driver name='qemu' type='qcow2' />
                  <source type="volume" pool="{pool_name}" volume="{volume_name}" />
                  <backingStore type='volume'>
                    <format type='qcow2'/>
                    <source type="volume" pool="niximages" volume="buildbot.qcow2" />
                  </backingStore>
                  <target dev="vda" bus="virtio"/>
                </disk>
                <input type="keyboard" bus="usb"/>
                <graphics type="vnc" port="-1" autoport="yes"/>
                <interface type="network">
                  <source network="immae" />
                </interface>
              </devices>
            </domain>
            """.format(volume_name=self.volume_name,
                       master_url=self.master_url,
                       pool_name=self.pool_name,
                       domain_name=self.domain_name,
                       worker_name=self.workername)

        yield self._prepare_image()

        try:
            self.domain = yield self.connection.create(domain_xml)
        except Exception:
            log.err(failure.Failure(),
                    ("Cannot start a VM ({}), failing gracefully and triggering "
                     "a new build check").format(self.workername))
            self.domain = None
            # stop_instance() does nothing while self.domain is None, so the
            # volume has to be removed here; otherwise it would linger and,
            # the volume name being fixed per worker, get in the way of the
            # next attempt.
            yield self._discard_volume()
            return False

        return [self.domain_name]

    @defer.inlineCallbacks
    def stop_instance(self, fast=False):
        """
        I attempt to stop a running VM.

        I destroy the domain and then remove the temporary volume. The
        deferred I return fires with the result of destroying the domain
        (or its failure), but only once the volume cleanup has finished too.
        Errors while removing the volume are logged, not raised.

        ``fast`` is accepted for interface compatibility and not used.
        """

        log.msg("Attempting to stop '{}'".format(self.workername))
        if self.domain is None:
            log.msg("I don't think that domain is even running, aborting")
            return None

        domain, self.domain = self.domain, None

        # Queue both operations right away so the volume cleanup sits
        # directly behind the domain destruction in the work queue.
        domain_done = domain.destroy()
        volume_done = self._discard_volume()

        try:
            result = yield domain_done
        finally:
            # Wait for the cleanup even when destroying the domain failed.
            yield volume_done

        return result