Alioth Code Coverage

dev.rs48.37%

1// Copyright 2024 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod balloon;
16pub mod blk;
17pub mod entropy;
18#[path = "fs/fs.rs"]
19pub mod fs;
20#[path = "net/net.rs"]
21pub mod net;
22#[path = "vsock/vsock.rs"]
23pub mod vsock;
24
25use std::fmt::Debug;
26use std::sync::Arc;
27use std::sync::atomic::{AtomicU8, AtomicU16, AtomicU32};
28use std::sync::mpsc::{self, Receiver, Sender};
29use std::thread::JoinHandle;
30
31use bitflags::Flags;
32use snafu::ResultExt;
33
34use crate::hv::IoeventFd;
35use crate::mem::emulated::Mmio;
36use crate::mem::mapped::{Ram, RamBus};
37use crate::mem::{LayoutChanged, LayoutUpdated, MemRegion};
38use crate::sync::notifier::Notifier;
39use crate::virtio::queue::packed::PackedQueue;
40use crate::virtio::queue::split::SplitQueue;
41use crate::virtio::queue::{QUEUE_SIZE_MAX, Queue, QueueReg, VirtQueue};
42#[cfg(target_os = "linux")]
43use crate::virtio::vu::conn::VuChannel;
44use crate::virtio::{DeviceId, IrqSender, Result, VirtioFeature, error};
45
46pub trait Virtio: Debug + Send + Sync + 'static {
47 type Config: Mmio;
48 type Feature: Flags<Bits = u128> + Debug;
49
50 fn name(&self) -> &str;
51 fn id(&self) -> DeviceId;
52 fn num_queues(&self) -> u16;
53 fn config(&self) -> Arc<Self::Config>;
54 fn feature(&self) -> u128;
55 fn spawn_worker<S: IrqSender, E: IoeventFd>(
56 self,
57 event_rx: Receiver<WakeEvent<S, E>>,
58 memory: Arc<RamBus>,
59 queue_regs: Arc<[QueueReg]>,
60 ) -> Result<(JoinHandle<()>, Arc<Notifier>)>;
61 fn shared_mem_regions(&self) -> Option<Arc<MemRegion>> {
62 None
63 }
64 fn ioeventfd_offloaded(&self, _q_index: u16) -> Result<bool> {
65 Ok(false)
66 }
67 fn mem_update_callback(&self) -> Option<Box<dyn LayoutUpdated>> {
68 None
69 }
70 fn mem_change_callback(&self) -> Option<Box<dyn LayoutChanged>> {
71 None
72 }
73 #[cfg(target_os = "linux")]
74 fn set_vu_channel(&mut self, _channel: Arc<VuChannel>) {}
75}
76
77#[derive(Debug, Default)]
78pub struct Register {
79 pub device_feature: [u32; 4],
80 pub driver_feature: [AtomicU32; 4],
81 pub device_feature_sel: AtomicU8,
82 pub driver_feature_sel: AtomicU8,
83 pub queue_sel: AtomicU16,
84 pub status: AtomicU8,
85}
86
87const TOKEN_WARKER: u64 = 1 << 63;
88
89#[derive(Debug, Clone)]
90pub struct StartParam<S, E>
91where
92 S: IrqSender,
93 E: IoeventFd,
94{
95 pub(crate) feature: u128,
96 pub(crate) irq_sender: Arc<S>,
97 pub(crate) ioeventfds: Option<Arc<[E]>>,
98}
99
100#[derive(Debug, Clone)]
101pub enum WakeEvent<S, E>
102where
103 S: IrqSender,
104 E: IoeventFd,
105{
106 Notify {
107 q_index: u16,
108 },
109 Shutdown,
110 #[cfg(target_os = "linux")]
111 VuChannel {
112 channel: Arc<VuChannel>,
113 },
114 Start {
115 param: StartParam<S, E>,
116 },
117 Reset,
118}
119
120#[derive(Debug, PartialEq, Eq)]
121pub enum WorkerState {
122 Pending,
123 Running,
124 Shutdown,
125}
126
127#[derive(Debug)]
128pub struct Worker<D, S, E, B>
129where
130 S: IrqSender,
131 E: IoeventFd,
132{
133 context: Context<D, S, E>,
134 backend: B,
135}
136
137#[derive(Debug)]
138pub struct VirtioDevice<S, E>
139where
140 S: IrqSender,
141 E: IoeventFd,
142{
143 pub name: Arc<str>,
144 pub id: DeviceId,
145 pub device_config: Arc<dyn Mmio>,
146 pub device_feature: u128,
147 pub queue_regs: Arc<[QueueReg]>,
148 pub shared_mem_regions: Option<Arc<MemRegion>>,
149 pub notifier: Arc<Notifier>,
150 pub event_tx: Sender<WakeEvent<S, E>>,
151 worker_handle: Option<JoinHandle<()>>,
152}
153
154impl<S, E> VirtioDevice<S, E>
155where
156 S: IrqSender,
157 E: IoeventFd,
158{
159 fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error>> {
160 let Some(handle) = self.worker_handle.take() else {
161 return Ok(());
162 };
163 self.event_tx.send(WakeEvent::Shutdown)?;
164 self.notifier.notify()?;
165 if let Err(e) = handle.join() {
166 log::error!("{}: failed to join worker thread: {e:?}", self.name)
167 }
168 Ok(())
169 }
170
171 pub fn new<D>(
172 name: impl Into<Arc<str>>,
173 dev: D,
174 memory: Arc<RamBus>,
175 restricted_memory: bool,
176 ) -> Result<Self>
177 where
178 D: Virtio,
179 {
180 let name = name.into();
181 let id = dev.id();
182 let device_config = dev.config();
183 let mut device_feature = dev.feature();
184 if restricted_memory {
185 device_feature |= VirtioFeature::ACCESS_PLATFORM.bits()
186 } else {
187 device_feature &= !VirtioFeature::ACCESS_PLATFORM.bits()
188 }
189 let num_queues = dev.num_queues();
190 let queue_regs = (0..num_queues).map(|_| QueueReg {
191 size: AtomicU16::new(QUEUE_SIZE_MAX),
192 ..Default::default()
193 });
194 let queue_regs = queue_regs.collect::<Arc<_>>();
195
196 let shared_mem_regions = dev.shared_mem_regions();
197 let (event_tx, event_rx) = mpsc::channel();
198 let (handle, notifier) = dev.spawn_worker(event_rx, memory, queue_regs.clone())?;
199 log::debug!(
200 "{name}: created with {:x?}, {:x?}",
201 VirtioFeature::from_bits_retain(device_feature & !D::Feature::all().bits()),
202 D::Feature::from_bits_truncate(device_feature)
203 );
204 let virtio_dev = VirtioDevice {
205 name,
206 id,
207 device_feature,
208 queue_regs,
209 worker_handle: Some(handle),
210 event_tx,
211 notifier,
212 device_config,
213 shared_mem_regions,
214 };
215 Ok(virtio_dev)
216 }
217}
218
219impl<S, E> Drop for VirtioDevice<S, E>
220where
221 S: IrqSender,
222 E: IoeventFd,
223{
224 fn drop(&mut self) {
225 if let Err(e) = self.shutdown() {
226 log::error!("{}: failed to shutdown: {e}", self.name);
227 }
228 }
229}
230
231pub trait Backend<D: Virtio>: Send + 'static {
232 fn register_notifier(&mut self, token: u64) -> Result<Arc<Notifier>>;
233 fn reset(&self, dev: &mut D) -> Result<()>;
234 fn event_loop<'m, S, Q, E>(
235 &mut self,
236 memory: &'m Ram,
237 context: &mut Context<D, S, E>,
238 queues: &mut [Option<Queue<'_, 'm, Q>>],
239 param: &StartParam<S, E>,
240 ) -> Result<()>
241 where
242 S: IrqSender,
243 Q: VirtQueue<'m>,
244 E: IoeventFd;
245}
246
247pub trait BackendEvent {
248 fn token(&self) -> u64;
249}
250
251pub trait ActiveBackend<D: Virtio> {
252 type Event: BackendEvent;
253 fn handle_event(&mut self, dev: &mut D, event: &Self::Event) -> Result<()>;
254 fn handle_queue(&mut self, dev: &mut D, index: u16) -> Result<()>;
255}
256
257#[derive(Debug)]
258pub struct Context<D, S, E>
259where
260 S: IrqSender,
261 E: IoeventFd,
262{
263 pub dev: D,
264 memory: Arc<RamBus>,
265 event_rx: Receiver<WakeEvent<S, E>>,
266 queue_regs: Arc<[QueueReg]>,
267 pub state: WorkerState,
268}
269
270impl<D, S, E> Context<D, S, E>
271where
272 D: Virtio,
273 S: IrqSender,
274 E: IoeventFd,
275{
276 fn handle_wake_events<B>(&mut self, backend: &mut B) -> Result<()>29x
277 where29x
278 B: ActiveBackend<D>,29x
279 {
280 while let Ok(event) = self.event_rx.try_recv() {59x
281 match event {36x
282 WakeEvent::Notify { q_index } => backend.handle_queue(&mut self.dev, q_index)?,30x
283 WakeEvent::Shutdown => {
284 self.state = WorkerState::Shutdown;6x
285 break;6x
286 }
287 WakeEvent::Start { .. } => {
288 log::error!("{}: device has already started", self.dev.name())
289 }
290 #[cfg(target_os = "linux")]
291 WakeEvent::VuChannel { channel } => self.dev.set_vu_channel(channel),
292 WakeEvent::Reset => {
293 log::info!("{}: guest requested reset", self.dev.name());
294 self.state = WorkerState::Pending;
295 break;
296 }
297 }
298 }
299 Ok(())29x
300 }29x
301
302 fn wait_start(&mut self) -> Option<StartParam<S, E>> {6x
303 for wake_event in self.event_rx.iter() {6x
304 match wake_event {6x
305 WakeEvent::Reset => {}
306 WakeEvent::Start { param } => {6x
307 self.state = WorkerState::Running;6x
308 return Some(param);6x
309 }
310 #[cfg(target_os = "linux")]
311 WakeEvent::VuChannel { channel } => self.dev.set_vu_channel(channel),
312 WakeEvent::Shutdown => break,
313 WakeEvent::Notify { q_index } => {
314 log::error!(
315 "{}: driver notified queue {q_index} before device is ready",
316 self.dev.name()
317 )
318 }
319 }
320 }
321 self.state = WorkerState::Shutdown;
322 None
323 }6x
324
325 pub fn handle_event<B>(&mut self, event: &B::Event, backend: &mut B) -> Result<()>38x
326 where38x
327 B: ActiveBackend<D>,38x
328 {
329 if event.token() == TOKEN_WARKER {38x
330 self.handle_wake_events(backend)29x
331 } else {
332 backend.handle_event(&mut self.dev, event)9x
333 }
334 }38x
335}
336
337impl<D, S, E, B> Worker<D, S, E, B>
338where
339 D: Virtio,
340 S: IrqSender,
341 B: Backend<D>,
342 E: IoeventFd,
343{
344 pub fn spawn(6x
345 dev: D,6x
346 mut backend: B,6x
347 event_rx: Receiver<WakeEvent<S, E>>,6x
348 memory: Arc<RamBus>,6x
349 queue_regs: Arc<[QueueReg]>,6x
350 ) -> Result<(JoinHandle<()>, Arc<Notifier>)> {6x
351 let notifier = backend.register_notifier(TOKEN_WARKER)?;6x
352 let worker = Worker {6x
353 context: Context {6x
354 dev,6x
355 event_rx,6x
356 memory,6x
357 queue_regs,6x
358 state: WorkerState::Pending,6x
359 },6x
360 backend,6x
361 };6x
362 let name = worker.context.dev.name();6x
363 let handle = std::thread::Builder::new()6x
364 .name(name.to_owned())6x
365 .spawn(move || worker.do_work())6x
366 .context(error::WorkerThread)?;6x
367 Ok((handle, notifier))6x
368 }6x
369
370 fn event_loop<'m, Q>(6x
371 &mut self,6x
372 queues: &mut [Option<Queue<'_, 'm, Q>>],6x
373 ram: &'m Ram,6x
374 param: &StartParam<S, E>,6x
375 ) -> Result<()>6x
376 where6x
377 Q: VirtQueue<'m>,6x
378 E: IoeventFd,6x
379 {
380 log::debug!(6x
381 "{}: activated with {:x?}, {:x?}",
382 self.context.dev.name(),6x
383 VirtioFeature::from_bits_retain(param.feature & !D::Feature::all().bits()),6x
384 D::Feature::from_bits_truncate(param.feature)6x
385 );
386 self.backend6x
387 .event_loop(ram, &mut self.context, queues, param)6x
388 }6x
389
390 fn loop_until_reset(&mut self) -> Result<()> {6x
391 let Some(param) = self.context.wait_start() else {6x
392 return Ok(());
393 };
394 let memory = self.context.memory.clone();6x
395 let ram = memory.lock_layout();6x
396 let feature = param.feature & !VirtioFeature::ACCESS_PLATFORM.bits();6x
397 let queue_regs = self.context.queue_regs.clone();6x
398 let feature = VirtioFeature::from_bits_retain(feature);6x
399 let event_idx = feature.contains(VirtioFeature::EVENT_IDX);6x
400 if feature.contains(VirtioFeature::RING_PACKED) {6x
401 let new_queue = |reg| {
402 let Some(split_queue) = PackedQueue::new(reg, &ram, event_idx)? else {
403 return Ok(None);
404 };
405 Ok(Some(Queue::new(split_queue, reg, &ram)))
406 };
407 let queues: Result<Box<_>> = queue_regs.iter().map(new_queue).collect();
408 self.event_loop(&mut (queues?), &ram, &param)?;
409 } else {
410 let new_queue = |reg| {12x
411 let Some(split_queue) = SplitQueue::new(reg, &ram, event_idx)? else {12x
412 return Ok(None);
413 };
414 Ok(Some(Queue::new(split_queue, reg, &ram)))12x
415 };12x
416 let queues: Result<Box<_>> = queue_regs.iter().map(new_queue).collect();6x
417 self.event_loop(&mut (queues?), &ram, &param)?;6x
418 };
419 self.backend.reset(&mut self.context.dev)?;6x
420 Ok(())6x
421 }6x
422
423 fn do_work(mut self) {6x
424 while self.context.state != WorkerState::Shutdown {12x
425 if let Err(e) = self.loop_until_reset() {6x
426 log::error!("worker {}: {e:?}", self.context.dev.name(),);
427 return;
428 }6x
429 }
430 log::debug!("worker {}: done", self.context.dev.name())6x
431 }6x
432}
433
434pub trait DevParam {
435 type Device;
436 fn build(self, name: impl Into<Arc<str>>) -> Result<Self::Device>;
437 fn needs_mem_shared_fd(&self) -> bool {
438 false
439 }
440}
441