]>
Commit | Line | Data |
---|---|---|
200690c9 IB |
1 | # This file was part of Buildbot. Buildbot is free software: you can |
2 | # redistribute it and/or modify it under the terms of the GNU General Public | |
3 | # License as published by the Free Software Foundation, version 2. | |
4 | # | |
5 | # This program is distributed in the hope that it will be useful, but WITHOUT | |
6 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | |
7 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more | |
8 | # details. | |
9 | # | |
10 | # You should have received a copy of the GNU General Public License along with | |
11 | # this program; if not, write to the Free Software Foundation, Inc., 51 | |
12 | # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
13 | # | |
14 | # Portions Copyright Buildbot Team Members | |
15 | # Portions Copyright 2010 Isotoma Limited | |
16 | ||
17 | ||
18 | import os | |
19 | ||
20 | from twisted.internet import defer | |
21 | from twisted.internet import threads | |
22 | from twisted.internet import utils | |
23 | from twisted.python import failure | |
24 | from twisted.python import log | |
25 | ||
26 | from buildbot import config | |
27 | from buildbot.util.eventual import eventually | |
28 | from buildbot.worker import AbstractLatentWorker | |
29 | ||
30 | try: | |
31 | import libvirt | |
32 | except ImportError: | |
33 | libvirt = None | |
34 | ||
35 | import random | |
36 | import string | |
37 | ||
def random_string_generator(length=6):
    """
    Return a random string of ASCII letters (upper and lower case).

    The string is built with the ``random`` module, so it is suitable for
    making names unique but not for anything security-sensitive.

    :param length: number of characters to generate; defaults to 6. A
        length of zero or less yields an empty string.
    :returns: a ``str`` of ``length`` characters.
    """
    chars = string.ascii_letters
    return ''.join(random.choice(chars) for _ in range(length))
41 | ||
class WorkQueue:

    """
    I am a class that turns parallel access into serial access.

    I exist because we want to run libvirt access in threads as we don't
    trust calls not to block, but under load libvirt doesn't seem to like
    this kind of threaded use.

    Entries in ``self.queue`` are ``(deferred, callable, args, kwargs)``
    tuples. The entry at index 0 is the one currently running; it is only
    removed once its work has finished.
    """

    def __init__(self):
        self.queue = []

    def _process(self):
        """
        Start the operation at the head of the queue, if there is one.

        The operation is expected to return a Deferred. When that Deferred
        fires (success or failure) the entry is removed, the next entry is
        scheduled, and the outcome is passed on to the Deferred that was
        handed to the caller of ``execute``.
        """
        log.msg("Looking to start a piece of work now...")

        # Nothing queued: nothing to start.
        if not self.queue:
            log.msg("_process called when there is no work")
            return

        # Peek only - the entry stays at the head while it is running, so
        # that execute() can tell the queue is busy.
        caller_deferred, operation, op_args, op_kwargs = self.queue[0]

        # A synchronous exception is turned into an already-failed Deferred
        # so that both outcomes flow through the same callbacks below.
        try:
            running = operation(*op_args, **op_kwargs)
        except Exception:
            running = defer.fail()

        def _advance(outcome):
            # Runs whether the work succeeded or failed: retire the
            # finished entry and, if more is waiting, schedule it.
            log.msg("Completed a piece of work")
            self.queue.pop(0)
            if self.queue:
                log.msg("Preparing next piece of work")
                eventually(self._process)
            return outcome

        # Order matters: the queue bookkeeping is attached before the
        # caller's Deferred is chained on.
        running.addBoth(_advance)
        running.chainDeferred(caller_deferred)

    def execute(self, cb, *args, **kwargs):
        """
        Queue ``cb(*args, **kwargs)`` and return a Deferred for its outcome.

        ``cb`` must return a Deferred. If the queue was empty the work is
        started straight away; otherwise it starts after the entries ahead
        of it have finished.
        """
        was_idle = not self.queue
        d = defer.Deferred()
        self.queue.append((d, cb, args, kwargs))
        if was_idle:
            self._process()
        return d

    def executeInThread(self, cb, *args, **kwargs):
        """
        Queue ``cb(*args, **kwargs)`` to be run via ``threads.deferToThread``
        and return a Deferred for its outcome.
        """
        return self.execute(threads.deferToThread, cb, *args, **kwargs)
97 | ||
98 | ||
# A module is effectively a singleton class, so this is OK.
# The libvirt wrapper classes in this module all submit their calls through
# this one shared instance, which is what serialises them.
queue = WorkQueue()
101 | ||
102 | ||
class Domain:

    """
    I am a wrapper around a libvirt Domain object.

    Each of my methods hands the matching method of the wrapped object to
    the module-level work queue and returns the resulting Deferred.
    """

    def __init__(self, connection, domain):
        self.domain = domain
        self.connection = connection

    def name(self):
        """Queue a call to the wrapped domain's ``name``."""
        operation = self.domain.name
        return queue.executeInThread(operation)

    def create(self):
        """Queue a call to the wrapped domain's ``create``."""
        operation = self.domain.create
        return queue.executeInThread(operation)

    def shutdown(self):
        """Queue a call to the wrapped domain's ``shutdown``."""
        operation = self.domain.shutdown
        return queue.executeInThread(operation)

    def destroy(self):
        """Queue a call to the wrapped domain's ``destroy``."""
        operation = self.domain.destroy
        return queue.executeInThread(operation)
124 | ||
class Volume:

    """
    I am a wrapper around the volume object returned by a libvirt storage
    pool's ``createXML``.
    """

    def __init__(self, connection, volume):
        self.volume = volume
        self.connection = connection

    @defer.inlineCallbacks
    def destroy(self):
        """
        Wipe the wrapped volume and then delete it.

        Both steps go through the module-level work queue. The delete is
        only queued once the wipe has completed; if the wipe fails, the
        failure propagates and the delete is not attempted.
        """
        for step in ("wipe", "delete"):
            # Look the method up only when its turn comes.
            yield queue.executeInThread(getattr(self.volume, step))
134 | ||
class Pool:

    """
    I am a wrapper around the pool object returned by libvirt's
    ``storagePoolLookupByName``.
    """

    # Wrapper class used for the volumes I create; can be overridden.
    VolumeClass = Volume

    def __init__(self, connection, pool):
        self.pool = pool
        self.connection = connection

    @defer.inlineCallbacks
    def create_volume(self, xml):
        """
        Create a volume in the wrapped pool from libvirt volume XML.

        The ``createXML`` call goes through the module-level work queue.
        The Deferred fires with a ``VolumeClass`` instance wrapping the
        created volume.
        """
        raw_volume = yield queue.executeInThread(self.pool.createXML, xml)
        wrapped = self.VolumeClass(self.connection, raw_volume)
        return wrapped
145 | ||
class Connection:

    """
    I am a wrapper around a libvirt Connection object.

    The underlying libvirt connection is opened lazily and re-opened
    whenever the cached one no longer reports itself alive.
    """

    # Wrapper classes used for the objects I hand back; can be overridden.
    DomainClass = Domain
    PoolClass = Pool

    def __init__(self, uri):
        """
        :param uri: the libvirt URI passed to ``libvirt.open``.
        """
        self.uri = uri
        self._connection = None

    @property
    def connection(self):
        """
        The live libvirt connection, (re)opened on demand.

        If the cached connection says it is not alive, or the liveness
        check itself raises, the cached connection is dropped and a new
        one is opened with ``libvirt.open(self.uri)``.

        NOTE: this runs in the caller's thread rather than through the
        work queue, so ``isAlive`` and ``libvirt.open`` are called
        directly by whoever reads this property.
        """
        if self._connection is not None:
            try:
                if not self._connection.isAlive():
                    self._connection = None
            except Exception:
                # Any error while probing is treated as a dead connection.
                # Deliberately not a bare ``except:`` so KeyboardInterrupt
                # and SystemExit are not swallowed here.
                self._connection = None
        if self._connection is None:
            self._connection = libvirt.open(self.uri)
        return self._connection

    @defer.inlineCallbacks
    def create(self, xml):
        """ I take libvirt XML and start a new VM """
        res = yield queue.executeInThread(self.connection.createXML, xml, 0)
        return self.DomainClass(self, res)

    @defer.inlineCallbacks
    def lookup_pool(self, name):
        """
        I look up a storage pool by name.

        The Deferred fires with a ``PoolClass`` instance wrapping the
        result of ``storagePoolLookupByName``.
        """
        res = yield queue.executeInThread(self.connection.storagePoolLookupByName, name)
        return self.PoolClass(self, res)
181 | ||
class LibVirtWorker(AbstractLatentWorker):

    """
    I am a latent worker that runs in a libvirt VM.

    Each start creates a fresh disk volume in the ``buildbot-disks`` pool
    and starts a domain that uses it; each stop destroys the domain and
    removes the volume again.
    """

    def __init__(self, name, password, connection, master_url, base_image=None, **kwargs):
        """
        :param name: worker name, passed to the base class.
        :param password: worker password, passed to the base class.
        :param connection: object providing ``lookup_pool(name)`` and
            ``create(xml)``, such as the ``Connection`` wrapper in this
            module.
        :param master_url: value handed to the guest as the
            ``buildbot_master_url`` OEM string.
        :param base_image: stored on the instance; not otherwise used by
            the methods of this class.
        :param kwargs: passed through to the base class.
        """
        super().__init__(name, password, **kwargs)
        if not libvirt:
            config.error(
                "The python module 'libvirt' is needed to use a LibVirtWorker")

        self.master_url = master_url
        # Generated once per worker object, so the domain and volume names
        # below stay the same across start/stop cycles of this object.
        self.random_name = random_string_generator()
        self.connection = connection
        self.base_image = base_image

        self.domain = None
        self.domain_name = "buildbot-" + self.workername + "-" + self.random_name
        self.volume = None
        self.volume_name = "buildbot-" + self.workername + "-" + self.random_name
        self.pool_name = "buildbot-disks"

    def reconfigService(self, *args, **kwargs):
        """
        Reconfigure as the base class does, defaulting
        ``build_wait_timeout`` to 0 when the caller did not set it.
        """
        if 'build_wait_timeout' not in kwargs:
            kwargs['build_wait_timeout'] = 0
        return super().reconfigService(*args, **kwargs)

    def canStartBuild(self):
        """
        Refuse builds while a domain exists but the worker has not
        connected; otherwise defer to the base class.
        """
        if self.domain and not self.isConnected():
            log.msg(
                "Not accepting builds as existing domain but worker not connected")
            return False

        return super().canStartBuild()

    @defer.inlineCallbacks
    def _prepare_image(self):
        """
        Create the temporary disk volume named ``self.volume_name`` in the
        pool ``self.pool_name`` and remember it in ``self.volume``.

        Any failure looking up the pool or creating the volume propagates
        to the caller.
        """
        log.msg("Creating temporary image {}".format(self.volume_name))
        pool = yield self.connection.lookup_pool(self.pool_name)
        vol_xml = """
        <volume type='file'>
          <name>{vol_name}</name>
          <capacity unit='G'>10</capacity>
          <target>
            <format type='qcow2'/>
            <permissions>
              <mode>0600</mode>
              <owner>0</owner>
              <group>0</group>
            </permissions>
          </target>
          <backingStore>
            <path>/etc/libvirtd/base-images/buildbot.qcow2</path>
            <format type='qcow2'/>
          </backingStore>
        </volume>
        """.format(vol_name = self.volume_name)
        self.volume = yield pool.create_volume(vol_xml)

    @defer.inlineCallbacks
    def _discard_volume(self):
        """
        Destroy the temporary volume, if there is one, and forget it.

        ``self.volume`` is cleared before the destroy is issued. Errors
        while destroying are logged and not propagated, so the returned
        Deferred does not fail because of them.
        """
        volume, self.volume = self.volume, None
        if volume is None:
            return
        try:
            yield volume.destroy()
        except Exception:
            log.err(failure.Failure(),
                    "Cannot remove temporary image {}".format(self.volume_name))

    @defer.inlineCallbacks
    def start_instance(self, build):
        """
        I start a new instance of a VM.

        I first create a fresh disk volume, then ask libvirt to start a
        domain that uses it. The master URL and my worker name are handed
        to the guest as SMBIOS OEM strings.

        The Deferred fires with a one-element list holding the domain name
        on success, or with False if the domain could not be started. In
        the failure case the volume created for this attempt is removed
        again, so that a later attempt can reuse the same volume name.
        A failure while creating the volume propagates to the caller.
        """
        domain_xml = """
        <domain type="kvm">
          <name>{domain_name}</name>
          <memory unit="GiB">2</memory>
          <vcpu>1</vcpu>
          <sysinfo type='smbios'>
            <oemStrings>
              <entry>buildbot_master_url={master_url}</entry>
              <entry>buildbot_worker_name={worker_name}</entry>
            </oemStrings>
          </sysinfo>
          <os>
            <type arch="x86_64">hvm</type>
            <smbios mode='sysinfo'/>
          </os>
          <devices>
            <emulator>/run/current-system/sw/bin/qemu-system-x86_64</emulator>
            <disk type="volume" device="disk">
              <driver name='qemu' type='qcow2' />
              <source type="volume" pool="{pool_name}" volume="{volume_name}" />
              <backingStore type='volume'>
                <format type='qcow2'/>
                <source type="volume" pool="niximages" volume="buildbot.qcow2" />
              </backingStore>
              <target dev="vda" bus="virtio"/>
            </disk>
            <input type="keyboard" bus="usb"/>
            <graphics type="vnc" port="-1" autoport="yes"/>
            <interface type="network">
              <source network="immae" />
            </interface>
          </devices>
        </domain>
        """.format(volume_name = self.volume_name, master_url = self.master_url, pool_name =
                self.pool_name, domain_name = self.domain_name, worker_name = self.workername)

        yield self._prepare_image()

        try:
            self.domain = yield self.connection.create(domain_xml)
        except Exception:
            log.err(failure.Failure(),
                    ("Cannot start a VM ({}), failing gracefully and triggering "
                     "a new build check").format(self.workername))
            self.domain = None
        else:
            return [self.domain_name]

        # The domain never came up: drop the volume made for it, otherwise
        # the next attempt would collide with the leftover volume name.
        yield self._discard_volume()
        return False

    def stop_instance(self, fast=False):
        """
        I attempt to stop a running VM.

        I destroy the domain and then remove its temporary volume. The
        returned Deferred fires with the outcome of destroying the domain;
        the volume removal is started straight after the domain destroy
        has been issued, and its errors are logged rather than raised.

        If no domain is recorded I do nothing and return an already-fired
        Deferred.

        :param fast: accepted for interface compatibility; not used here.
        """

        log.msg("Attempting to stop '{}'".format(self.workername))
        if self.domain is None:
            log.msg("I don't think that domain is even running, aborting")
            return defer.succeed(None)

        domain = self.domain
        self.domain = None

        d = domain.destroy()
        # Fire-and-forget is safe here: _discard_volume logs its own
        # errors, so this Deferred never ends up with an unhandled failure.
        self._discard_volume()

        return d