Alioth Code Coverage

uds_vsock.rs86.82%

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Debug;
17use std::fs;
18use std::io::{BufRead, BufReader, BufWriter, ErrorKind, IoSlice, IoSliceMut, Read, Write};
19use std::mem::size_of_val;
20use std::num::Wrapping;
21use std::os::fd::AsRawFd;
22use std::os::unix::net::{UnixListener, UnixStream};
23use std::path::Path;
24use std::sync::Arc;
25use std::sync::mpsc::Receiver;
26use std::thread::JoinHandle;
27
28use crate::ffi;
29use crate::hv::IoeventFd;
30use crate::mem::mapped::RamBus;
31use crate::sync::notifier::Notifier;
32use crate::virtio::dev::vsock::{
33 ShutdownFlag, VSOCK_CID_HOST, VsockConfig, VsockFeature, VsockHeader, VsockOp, VsockType,
34 VsockVirtq,
35};
36use crate::virtio::dev::{DevParam, Virtio, WakeEvent};
37use crate::virtio::queue::{DescChain, Queue, QueueReg, Status, VirtQueue};
38use crate::virtio::worker::mio::{ActiveMio, Mio, VirtioMio};
39use crate::virtio::{DeviceId, FEATURE_BUILT_IN, IrqSender, Result, error};
40
41use mio::event::Event;
42use mio::unix::SourceFd;
43use mio::{Interest, Registry, Token};
44use serde::Deserialize;
45use serde_aco::Help;
46use zerocopy::{FromBytes, IntoBytes};
47
48const HEADER_SIZE: usize = size_of::<VsockHeader>();
49const SOCKET_TYPE: VsockType = VsockType::STREAM;
50
51#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Help)]
52pub struct UdsVsockParam {
53 /// Vsock context id.
54 pub cid: u32,
55 /// Host-side Unix domain socket path.
56 pub path: Box<Path>,
57}
58
59impl DevParam for UdsVsockParam {
60 type Device = UdsVsock;
61
62 fn build(self, name: impl Into<Arc<str>>) -> Result<UdsVsock> {3x
63 UdsVsock::new(self, name)3x
64 }3x
65}
66
67#[derive(Debug)]
68pub struct UdsVsock {
69 name: Arc<str>,
70 config: Arc<VsockConfig>,
71 path: Box<Path>,
72 listener: UnixListener,
73 connections: HashMap<(u32, u32), Connection>,
74 ports: HashMap<Token, (u32, u32)>,
75 sockets: HashMap<Token, UnixStream>,
76 host_ports: HashMap<u32, u32>,
77 next_port: u32,
78}
79
80fn get_buf_size(stream: &UnixStream) -> Result<usize> {6x
81 let mut buf_size = 0i32;6x
82 let mut arg_size = size_of_val(&buf_size) as libc::socklen_t;6x
83 ffi!(unsafe {6x
84 libc::getsockopt(6x
85 stream.as_raw_fd(),6x
86 libc::SOL_SOCKET,
87 libc::SO_SNDBUF,
88 &mut buf_size as *mut _ as _,6x
89 &mut arg_size,6x
90 )
91 })?;
92 Ok(buf_size as usize)6x
93}6x
94
95impl UdsVsock {
96 fn allocate_port(&mut self) -> Option<u32> {3x
97 let mut count: u64 = 0;3x
98 while self.host_ports.contains_key(&self.next_port) && count < u32::MAX as u64 {3x
99 self.next_port = self.next_port.wrapping_add(1);
100 count += 1;
101 }
102 if count == u32::MAX as u64 {3x
103 None
104 } else {
105 Some(self.next_port)3x
106 }
107 }3x
108
109 fn create_socket(&mut self, registry: &Registry) -> Result<()> {3x
110 let (stream, _) = self.listener.accept()?;3x
111 stream.set_nonblocking(true)?;3x
112 let token = Token(stream.as_raw_fd() as usize);3x
113 registry.register(3x
114 &mut SourceFd(&stream.as_raw_fd()),3x
115 token,3x
116 Interest::READABLE,
117 )?;
118 self.sockets.insert(token, stream);3x
119 Ok(())3x
120 }3x
121
122 fn handle_conn_request<'m, Q, S>(3x
123 &mut self,3x
124 token: Token,3x
125 socket: UnixStream,3x
126 rx_q: &mut Queue<'_, 'm, Q>,3x
127 irq_sender: &S,3x
128 ) -> Result<()>3x
129 where3x
130 Q: VirtQueue<'m>,3x
131 S: IrqSender,3x
132 {
133 let mut msg = String::new();3x
134 let writer = socket.try_clone()?;3x
135 let mut reader = BufReader::new(socket);3x
136 let buf_size = get_buf_size(&writer)?;3x
137 reader.read_line(&mut msg)?;3x
138 let port_str = msg.trim_start_matches("CONNECT ").trim_end();3x
139 let Ok(port) = port_str.parse::<u32>() else {3x
140 log::error!("{}: failed to parse port {port_str}", self.name);
141 return Ok(());
142 };
143 let Some(host_port) = self.allocate_port() else {3x
144 log::error!("{}: failed to allocate port", self.name);
145 return Ok(());
146 };
147 let hdr = VsockHeader {3x
148 src_cid: VSOCK_CID_HOST,3x
149 dst_cid: self.config.guest_cid,3x
150 src_port: host_port,3x
151 dst_port: port,3x
152 type_: SOCKET_TYPE,3x
153 op: VsockOp::REQUEST,3x
154 fwd_cnt: Wrapping(0),3x
155 buf_alloc: buf_size as u32,3x
156 ..Default::default()3x
157 };3x
158 self.respond(&hdr, irq_sender, rx_q)?;3x
159 let conn = Connection {3x
160 state: ConnState::Requested,3x
161 reader,3x
162 writer: BufWriter::new(writer),3x
163 buf_alloc: buf_size as u32,3x
164 };3x
165 self.connections.insert((host_port, port), conn);3x
166 let count = self.host_ports.entry(host_port).or_default();3x
167 *count += 1;3x
168 log::trace!(3x
169 "{}: host:{host_port}: count incremented to {count}",
170 self.name
171 );
172 self.ports.insert(token, (host_port, port));3x
173 log::trace!("{}: host:{host_port} -> vm:{port}: requested", self.name);3x
174 Ok(())3x
175 }3x
176
177 fn respond_rst<'m, Q, S>(3x
178 &self,3x
179 hdr: &VsockHeader,3x
180 irq_sender: &S,3x
181 rx_q: &mut Queue<'_, 'm, Q>,3x
182 ) -> Result<()>3x
183 where3x
184 Q: VirtQueue<'m>,3x
185 S: IrqSender,3x
186 {
187 let resp = VsockHeader {3x
188 src_cid: VSOCK_CID_HOST,3x
189 dst_cid: self.config.guest_cid,3x
190 src_port: hdr.dst_port,3x
191 dst_port: hdr.src_port,3x
192 type_: hdr.type_,3x
193 op: VsockOp::RST,3x
194 ..Default::default()3x
195 };3x
196 self.respond(&resp, irq_sender, rx_q)3x
197 }3x
198
199 fn respond<'m, Q, S>(9x
200 &self,9x
201 hdr: &VsockHeader,9x
202 irq_sender: &S,9x
203 rx_q: &mut Queue<'_, 'm, Q>,9x
204 ) -> Result<()>9x
205 where9x
206 Q: VirtQueue<'m>,9x
207 S: IrqSender,9x
208 {
209 let mut hdr_buf = hdr.as_bytes();9x
210 rx_q.handle_desc(VsockVirtq::RX.raw(), irq_sender, |desc| {9x
211 if hdr_buf.is_empty() {6x
212 return Ok(Status::Break);
213 }6x
214 let c = hdr_buf.read_vectored(&mut desc.writable)? as u32;6x
215 Ok(Status::Done { len: c })6x
216 })?;6x
217 if !hdr_buf.is_empty() {9x
218 log::error!(3x
219 "{}: queue RX: no enough writable buffers for {:?}",
220 self.name,
221 hdr.op
222 );
223 return error::InvalidBuffer.fail();3x
224 }6x
225 Ok(())6x
226 }9x
227
228 fn handle_tx_response<'m, Q, S>(3x
229 &mut self,3x
230 hdr: &VsockHeader,3x
231 rx_q: &mut Queue<'_, 'm, Q>,3x
232 irq_sender: &S,3x
233 ) -> Result<()>3x
234 where3x
235 Q: VirtQueue<'m>,3x
236 S: IrqSender,3x
237 {
238 let host_port = hdr.dst_port;3x
239 let guest_port = hdr.src_port;3x
240 let Some(conn) = self.connections.get_mut(&(host_port, guest_port)) else {3x
241 log::warn!(
242 "{}: vm:{guest_port} -> host:{host_port}: unknown connection",
243 self.name
244 );
245 return Ok(());
246 };
247 if conn.state != ConnState::Requested {3x
248 log::error!(
249 "{}: vm:{guest_port} -> host:{host_port}: found {:?}, expect {:?}",
250 self.name,
251 conn.state,
252 ConnState::Requested
253 );
254 return Ok(());
255 };3x
256 writeln!(conn.writer, "OK {host_port}")?;3x
257 conn.writer.flush()?;3x
258 conn.state = ConnState::Established {3x
259 fwd_cnt: Wrapping(0),3x
260 };3x
261 log::trace!(3x
262 "{}: host:{host_port} -> vm:{guest_port}: established",
263 self.name
264 );
265 self.transfer_rx_data(host_port, guest_port, rx_q, irq_sender)3x
266 }3x
267
268 fn remove_conn(&mut self, host_port: u32, guest_port: u32, registry: &Registry) -> Result<()> {6x
269 let Some(conn) = self.connections.remove(&(host_port, guest_port)) else {6x
270 log::warn!(
271 "{}: vm:{guest_port} -> host:{host_port}: unknown connection",
272 self.name
273 );
274 return Ok(());
275 };
276 let reader = conn.reader.into_inner();6x
277 let token = Token(reader.as_raw_fd() as usize);6x
278 self.ports.remove(&token);6x
279 if let Some(count) = self.host_ports.get_mut(&host_port) {6x
280 if *count == 1 {6x
281 self.host_ports.remove(&host_port);6x
282 log::trace!("{}: host:{host_port}: free port", self.name);6x
283 } else {
284 *count -= 1;
285 log::trace!(
286 "{}: host:{host_port}: count decremented to {count}",
287 self.name
288 );
289 }
290 } else {
291 log::error!(
292 "{}: vm:{guest_port} -> host:{host_port}: unknown host port",
293 self.name
294 );
295 }
296 registry.deregister(&mut SourceFd(&reader.as_raw_fd()))?;6x
297 Ok(())6x
298 }6x
299
300 fn handle_tx_rst(&mut self, hdr: &VsockHeader, registry: &Registry) -> Result<()> {3x
301 let host_port = hdr.dst_port;3x
302 let guest_port = hdr.src_port;3x
303 self.remove_conn(host_port, guest_port, registry)?;3x
304 log::trace!("{}: vm:{guest_port} -> host:{host_port}: reset", self.name);3x
305 Ok(())3x
306 }3x
307
308 fn handle_tx_shutdown<'m, Q, S>(6x
309 &mut self,6x
310 hdr: &VsockHeader,6x
311 registry: &Registry,6x
312 irq_sender: &S,6x
313 rx_q: &mut Queue<'_, 'm, Q>,6x
314 ) -> Result<()>6x
315 where6x
316 Q: VirtQueue<'m>,6x
317 S: IrqSender,6x
318 {
319 let host_port = hdr.dst_port;6x
320 let guest_port = hdr.src_port;6x
321 let Some(conn) = self.connections.get_mut(&(host_port, guest_port)) else {6x
322 log::warn!(
323 "{}: vm:{guest_port} -> host:{host_port}: unknown connection",
324 self.name
325 );
326 return Ok(());
327 };
328 let mut flags = if let ConnState::Shutdown { flags } = conn.state {6x
329 flags3x
330 } else {
331 ShutdownFlag::empty()3x
332 };
333 flags |= ShutdownFlag::from_bits_truncate(hdr.flags);6x
334 if flags != ShutdownFlag::all() {6x
335 conn.state = ConnState::Shutdown { flags };3x
336 log::trace!(3x
337 "{}: vm:{guest_port} -> host:{host_port}: {flags:?}",
338 self.name
339 );
340 } else {
341 if let Err(e) = self.respond_rst(hdr, irq_sender, rx_q) {3x
342 log::error!("{}: failed to respond to shutdown: {e:?}", self.name);3x
343 }
344 self.remove_conn(host_port, guest_port, registry)?;3x
345 log::trace!(3x
346 "{}: vm:{guest_port} -> host:{host_port}: shutdown",
347 self.name
348 );
349 }
350 Ok(())6x
351 }6x
352
353 fn handle_tx_request<'m, Q, S>(3x
354 &mut self,3x
355 hdr: &VsockHeader,3x
356 registry: &Registry,3x
357 irq_sender: &S,3x
358 rx_q: &mut Queue<'_, 'm, Q>,3x
359 ) -> Result<()>3x
360 where3x
361 Q: VirtQueue<'m>,3x
362 S: IrqSender,3x
363 {
364 let host_port = hdr.dst_port;3x
365 let guest_port = hdr.src_port;3x
366 let port_socket = format!("{}_{host_port}", self.path.to_string_lossy());3x
367 let reader = match UnixStream::connect(&port_socket) {3x
368 Ok(reader) => reader,3x
369 Err(e) => {
370 log::error!("{}: failed to connect to {port_socket}: {e:?}", self.name);
371 return self.respond_rst(hdr, irq_sender, rx_q);
372 }
373 };
374 reader.set_nonblocking(true)?;3x
375 let writer = reader.try_clone()?;3x
376 let token = Token(reader.as_raw_fd() as usize);3x
377 registry.register(3x
378 &mut SourceFd(&reader.as_raw_fd()),3x
379 token,3x
380 Interest::READABLE,
381 )?;
382 let buf_size = get_buf_size(&writer)?;3x
383 let conn = Connection {3x
384 reader: BufReader::new(reader),3x
385 writer: BufWriter::new(writer),3x
386 buf_alloc: buf_size as u32,3x
387 state: ConnState::Established {3x
388 fwd_cnt: Wrapping(0),3x
389 },3x
390 };3x
391 let resp = VsockHeader {3x
392 src_cid: VSOCK_CID_HOST,3x
393 dst_cid: self.config.guest_cid,3x
394 src_port: host_port,3x
395 dst_port: guest_port,3x
396 type_: hdr.type_,3x
397 op: VsockOp::RESPONSE,3x
398 fwd_cnt: Wrapping(0),3x
399 buf_alloc: buf_size as u32,3x
400 ..Default::default()3x
401 };3x
402 self.respond(&resp, irq_sender, rx_q)?;3x
403 self.connections.insert((host_port, guest_port), conn);3x
404 let count = self.host_ports.entry(host_port).or_default();3x
405 *count += 1;3x
406 log::trace!(3x
407 "{}: host:{host_port}: count incremented to {count}",
408 self.name
409 );
410 self.ports.insert(token, (host_port, guest_port));3x
411 log::trace!(3x
412 "{}: vm:{guest_port} -> host:{host_port}: established",
413 self.name
414 );
415 Ok(())3x
416 }3x
417
418 fn handle_tx_desc<'m, Q, S>(18x
419 &mut self,18x
420 desc: &mut DescChain,18x
421 registry: &Registry,18x
422 irq_sender: &S,18x
423 rx_q: &mut Queue<'_, 'm, Q>,18x
424 ) -> Result<()>18x
425 where18x
426 Q: VirtQueue<'m>,18x
427 S: IrqSender,18x
428 {
429 let name = &*self.name;18x
430 let [buf, readable @ ..] = desc.readable.as_slice() else {18x
431 return error::InvalidBuffer.fail();
432 };
433 let Some((header, body)) = buf.split_first_chunk::<HEADER_SIZE>() else {18x
434 return error::InvalidBuffer.fail();
435 };
436 let Ok(hdr) = VsockHeader::ref_from_bytes(header) else {18x
437 return error::InvalidBuffer.fail();
438 };
439 if hdr.src_cid != self.config.guest_cid || hdr.dst_cid != VSOCK_CID_HOST {18x
440 log::warn!(
441 "{name}: invalid CID pair: {} -> {}",
442 hdr.src_cid,
443 hdr.dst_cid
444 );
445 }18x
446 log::trace!(18x
447 "{name}: vm:{} -> host:{}: {:?}",
448 hdr.src_port,
449 hdr.dst_port,
450 hdr.op
451 );
452 match hdr.op {18x
453 VsockOp::REQUEST => self.handle_tx_request(hdr, registry, irq_sender, rx_q),3x
454 VsockOp::RESPONSE => self.handle_tx_response(hdr, rx_q, irq_sender),3x
455 VsockOp::RST => self.handle_tx_rst(hdr, registry),3x
456 VsockOp::RW => self.transfer_tx_data(hdr, body, readable),3x
457 VsockOp::CREDIT_UPDATE => {
458 log::info!(
459 "{name}: CREDIT_UPDATE: fwd_cnt: {}, buf_alloc: {}",
460 hdr.fwd_cnt,
461 hdr.buf_alloc
462 );
463 Ok(())
464 }
465 VsockOp::SHUTDOWN => self.handle_tx_shutdown(hdr, registry, irq_sender, rx_q),6x
466 _ => {
467 log::error!("{name}: unsupported operation: {:?}", hdr.op);
468 Ok(())
469 }
470 }
471 }18x
472
473 fn handle_tx<'m, Q, S, E>(18x
474 &mut self,18x
475 active_mio: &mut ActiveMio<'_, '_, 'm, Q, S, E>,18x
476 ) -> Result<()>18x
477 where18x
478 Q: VirtQueue<'m>,18x
479 S: IrqSender,18x
480 E: IoeventFd,18x
481 {
482 let [Some(rx_q), Some(tx_q), ..] = active_mio.queues else {18x
483 let tx_index = VsockVirtq::TX.raw();
484 return error::InvalidQueueIndex { index: tx_index }.fail();
485 };
486 let irq_sender = active_mio.irq_sender;18x
487 let registry = active_mio.poll.registry();18x
488 tx_q.handle_desc(VsockVirtq::TX.raw(), irq_sender, |desc| {18x
489 self.handle_tx_desc(desc, registry, irq_sender, rx_q)?;18x
490 Ok(Status::Done { len: 0 })18x
491 })18x
492 }18x
493
494 fn transfer_rx_data<'m, Q, S>(6x
495 &mut self,6x
496 host_port: u32,6x
497 guest_port: u32,6x
498 rx_q: &mut Queue<'_, 'm, Q>,6x
499 irq_sender: &S,6x
500 ) -> Result<()>6x
501 where6x
502 Q: VirtQueue<'m>,6x
503 S: IrqSender,6x
504 {
505 fn copy_to_rx(3x
506 hdr: &mut VsockHeader,3x
507 conn: &mut BufReader<UnixStream>,3x
508 buffers: &mut [IoSliceMut],3x
509 ) -> Result<usize> {3x
510 let mut nskip = 0;3x
511 let mut nread = 0;3x
512 for buf in buffers.iter_mut() {9x
513 let r = if HEADER_SIZE > nskip {9x
514 let Some((_, data)) = buf.split_at_mut_checked(HEADER_SIZE - nskip) else {6x
515 nskip += buf.len();3x
516 continue;3x
517 };
518 nskip = HEADER_SIZE;3x
519 if data.is_empty() {3x
520 continue;
521 }3x
522 conn.read(data)3x
523 } else {
524 conn.read(buf)3x
525 };
526 let n = match r {3x
527 Ok(0) => break,
528 Ok(n) => n,3x
529 Err(e) if e.kind() == ErrorKind::WouldBlock => break,3x
530 Err(e) => Err(e)?,
531 };
532 nread += n;3x
533 }
534 if nskip != HEADER_SIZE {3x
535 return error::InvalidBuffer.fail();
536 }3x
537 hdr.len = nread as u32;3x
538 let mut hdr_buf = hdr.as_bytes();3x
539 let _ = hdr_buf.read_vectored(buffers);3x
540 Ok(nread)3x
541 }3x
542
543 let rx_idx = VsockVirtq::RX.raw();6x
544 let Some(conn) = self.connections.get_mut(&(host_port, guest_port)) else {6x
545 log::warn!(
546 "{}: vm:{guest_port} -> host:{host_port}: unknown connection",
547 self.name
548 );
549 return Ok(());
550 };
551 let ConnState::Established { fwd_cnt } = conn.state else {6x
552 log::error!("{}: unexpected state {:?}", self.name, conn.state);
553 return Ok(());
554 };
555 let mut hdr = VsockHeader {6x
556 src_cid: VSOCK_CID_HOST,6x
557 dst_cid: self.config.guest_cid,6x
558 src_port: host_port,6x
559 dst_port: guest_port,6x
560 type_: SOCKET_TYPE,6x
561 op: VsockOp::RW,6x
562 fwd_cnt,6x
563 buf_alloc: conn.buf_alloc,6x
564 ..Default::default()6x
565 };6x
566 rx_q.handle_desc(rx_idx, irq_sender, |desc| {6x
567 let nread = copy_to_rx(&mut hdr, &mut conn.reader, &mut desc.writable)? as u32;3x
568 if nread == 0 {3x
569 return Ok(Status::Break);
570 }3x
571 log::trace!(3x
572 "{}: host:{host_port} -> vm:{guest_port}: transfered {nread} bytes",
573 self.name
574 );
575 Ok(Status::Done {3x
576 len: nread + HEADER_SIZE as u32,3x
577 })3x
578 })?;3x
579 Ok(())6x
580 }6x
581
582 fn transfer_tx_data(3x
583 &mut self,3x
584 hdr: &VsockHeader,3x
585 body: &[u8],3x
586 buffers: &[IoSlice],3x
587 ) -> Result<()> {3x
588 fn copy_to_conn(3x
589 buf: &[u8],3x
590 conn: &mut BufWriter<UnixStream>,3x
591 remain: &mut usize,3x
592 ) -> Result<()> {3x
593 if let Some(b) = buf.get(..*remain) {3x
594 conn.write_all(b)?;3x
595 *remain = 0;3x
596 } else {
597 conn.write_all(buf)?;
598 *remain -= buf.len();
599 }
600 Ok(())3x
601 }3x
602
603 let host_port = hdr.dst_port;3x
604 let guest_port = hdr.src_port;3x
605 let Some(conn) = self.connections.get_mut(&(host_port, guest_port)) else {3x
606 log::warn!(
607 "{}: vm:{guest_port} -> host:{host_port}: unknown connection",
608 self.name
609 );
610 return Ok(());
611 };
612 let ConnState::Established { fwd_cnt } = &mut conn.state else {3x
613 log::warn!("{}: invalid connection state {:?}", self.name, conn.state);
614 return Ok(());
615 };
616 let mut remain = hdr.len as usize;3x
617 if !body.is_empty() {3x
618 copy_to_conn(body, &mut conn.writer, &mut remain)?;
619 }3x
620 for buf in buffers {3x
621 if remain == 0 {3x
622 break;
623 }3x
624 copy_to_conn(buf, &mut conn.writer, &mut remain)?;3x
625 }
626 if remain != 0 {3x
627 log::error!("{}: missing {remain} bytes", self.name);
628 return error::InvalidBuffer.fail();
629 }3x
630 *fwd_cnt += hdr.len;3x
631 log::trace!(3x
632 "{}: vm:{guest_port} -> host:{host_port}: transferred {} bytes",
633 self.name,
634 hdr.len
635 );
636 conn.writer.flush()?;3x
637 Ok(())3x
638 }3x
639}
640
641impl Drop for UdsVsock {
642 fn drop(&mut self) {3x
643 let Ok(addr) = self.listener.local_addr() else {3x
644 return;
645 };
646 let Some(path) = addr.as_pathname() else {3x
647 return;
648 };
649 if let Err(e) = fs::remove_file(path) {3x
650 log::error!("{}: error removing {path:?}: {e:?}", self.name);
651 }3x
652 }3x
653}
654
655#[derive(Debug, PartialEq, Eq)]
656pub enum ConnState {
657 Requested,
658 Established { fwd_cnt: Wrapping<u32> },
659 Shutdown { flags: ShutdownFlag },
660}
661
662#[derive(Debug)]
663pub struct Connection {
664 state: ConnState,
665 reader: BufReader<UnixStream>,
666 writer: BufWriter<UnixStream>,
667 buf_alloc: u32,
668}
669
670impl UdsVsock {
671 fn new(param: UdsVsockParam, name: impl Into<Arc<str>>) -> Result<Self> {3x
672 let name = name.into();3x
673 let listener = UnixListener::bind(&param.path)?;3x
674 listener.set_nonblocking(true)?;3x
675 let vsock = UdsVsock {3x
676 name,3x
677 path: param.path,3x
678 config: Arc::new(VsockConfig {3x
679 guest_cid: param.cid,3x
680 ..Default::default()3x
681 }),3x
682 listener,3x
683 connections: HashMap::new(),3x
684 sockets: HashMap::new(),3x
685 ports: HashMap::new(),3x
686 host_ports: HashMap::new(),3x
687 next_port: 1024,3x
688 };3x
689 Ok(vsock)3x
690 }3x
691}
692
693impl Virtio for UdsVsock {
694 type Config = VsockConfig;
695 type Feature = VsockFeature;
696
697 fn id(&self) -> DeviceId {3x
698 DeviceId::SOCKET3x
699 }3x
700
701 fn name(&self) -> &str {12x
702 &self.name12x
703 }12x
704
705 fn num_queues(&self) -> u16 {3x
706 33x
707 }3x
708
709 fn config(&self) -> Arc<VsockConfig> {3x
710 self.config.clone()3x
711 }3x
712
713 fn feature(&self) -> u128 {3x
714 VsockFeature::STREAM.bits() | FEATURE_BUILT_IN3x
715 }3x
716
717 fn spawn_worker<S, E>(3x
718 self,3x
719 event_rx: Receiver<WakeEvent<S, E>>,3x
720 memory: Arc<RamBus>,3x
721 queue_regs: Arc<[QueueReg]>,3x
722 ) -> Result<(JoinHandle<()>, Arc<Notifier>)>3x
723 where3x
724 S: IrqSender,3x
725 E: IoeventFd,3x
726 {
727 Mio::spawn_worker(self, event_rx, memory, queue_regs)3x
728 }3x
729}
730
731impl VirtioMio for UdsVsock {
732 fn activate<'m, Q, S, E>(3x
733 &mut self,3x
734 _feature: u128,3x
735 active_mio: &mut ActiveMio<'_, '_, 'm, Q, S, E>,3x
736 ) -> Result<()>3x
737 where3x
738 Q: VirtQueue<'m>,3x
739 S: IrqSender,3x
740 E: IoeventFd,3x
741 {
742 active_mio.poll.registry().register(3x
743 &mut SourceFd(&self.listener.as_raw_fd()),3x
744 Token(self.listener.as_raw_fd() as usize),3x
745 Interest::READABLE,
746 )?;
747 Ok(())3x
748 }3x
749
750 fn handle_event<'m, Q, S, E>(9x
751 &mut self,9x
752 event: &Event,9x
753 active_mio: &mut ActiveMio<'_, '_, 'm, Q, S, E>,9x
754 ) -> Result<()>9x
755 where9x
756 Q: VirtQueue<'m>,9x
757 S: IrqSender,9x
758 E: IoeventFd,9x
759 {
760 let token = event.token();9x
761 let registry = active_mio.poll.registry();9x
762 let irq_sender = active_mio.irq_sender;9x
763 let rx_index = VsockVirtq::RX.raw();9x
764 let Some(Some(rx_q)) = active_mio.queues.get_mut(rx_index as usize) else {9x
765 return error::InvalidQueueIndex { index: rx_index }.fail();
766 };
767 if token.0 == self.listener.as_raw_fd() as usize {9x
768 self.create_socket(registry)3x
769 } else if let Some(socket) = self.sockets.remove(&token) {6x
770 self.handle_conn_request(token, socket, rx_q, irq_sender)3x
771 } else if let Some(port_pair) = self.ports.get(&token) {3x
772 let (host_port, guest_port) = port_pair.to_owned();3x
773 self.transfer_rx_data(host_port, guest_port, rx_q, irq_sender)3x
774 } else {
775 log::error!("{}: invalid token: {token:#x?}", self.name);
776 Ok(())
777 }
778 }9x
779
780 fn handle_queue<'m, Q, S, E>(21x
781 &mut self,21x
782 index: u16,21x
783 active_mio: &mut ActiveMio<'_, '_, 'm, Q, S, E>,21x
784 ) -> Result<()>21x
785 where21x
786 Q: VirtQueue<'m>,21x
787 S: IrqSender,21x
788 E: IoeventFd,21x
789 {
790 let index = VsockVirtq::from(index);21x
791 let name = &self.name;21x
792 match index {21x
793 VsockVirtq::TX => self.handle_tx(active_mio)?,18x
794 VsockVirtq::RX => log::debug!("{name}: queue RX buffer available"),3x
795 VsockVirtq::EVENT => log::debug!("{name}: queue EVENT buffer available"),
796 _ => log::error!("{name}: unknown queue index {index:?}"),
797 }
798 Ok(())21x
799 }21x
800
801 fn reset(&mut self, registry: &Registry) {3x
802 for (_, conn) in self.connections.drain() {3x
803 let reader = conn.reader.into_inner();
804 if let Err(err) = registry.deregister(&mut SourceFd(&reader.as_raw_fd())) {
805 log::error!("{}: failed to deregister socket: {err}", self.name);
806 }
807 }
808 for (_, socket) in self.sockets.drain() {3x
809 if let Err(err) = registry.deregister(&mut SourceFd(&socket.as_raw_fd())) {
810 log::error!("{}: failed to deregister socket: {err}", self.name);
811 }
812 }
813 if let Err(err) = registry.deregister(&mut SourceFd(&self.listener.as_raw_fd())) {3x
814 log::error!("{}: failed to deregister listener: {err}", self.name);
815 }3x
816 self.host_ports.clear();3x
817 self.next_port = 1024;3x
818 }3x
819}
820
821#[cfg(test)]
822#[path = "uds_vsock_test.rs"]
823mod tests;
824