uds_vsock.rs86.82%
1
// Copyright 2025 Google LLC2
//3
// Licensed under the Apache License, Version 2.0 (the "License");4
// you may not use this file except in compliance with the License.5
// You may obtain a copy of the License at6
//7
// https://www.apache.org/licenses/LICENSE-2.08
//9
// Unless required by applicable law or agreed to in writing, software10
// distributed under the License is distributed on an "AS IS" BASIS,11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12
// See the License for the specific language governing permissions and13
// limitations under the License.14
15
use std::collections::HashMap;16
use std::fmt::Debug;17
use std::fs;18
use std::io::{BufRead, BufReader, BufWriter, ErrorKind, IoSlice, IoSliceMut, Read, Write};19
use std::mem::size_of_val;20
use std::num::Wrapping;21
use std::os::fd::AsRawFd;22
use std::os::unix::net::{UnixListener, UnixStream};23
use std::path::Path;24
use std::sync::Arc;25
use std::sync::mpsc::Receiver;26
use std::thread::JoinHandle;27
28
use crate::ffi;29
use crate::hv::IoeventFd;30
use crate::mem::mapped::RamBus;31
use crate::sync::notifier::Notifier;32
use crate::virtio::dev::vsock::{33
ShutdownFlag, VSOCK_CID_HOST, VsockConfig, VsockFeature, VsockHeader, VsockOp, VsockType,34
VsockVirtq,35
};36
use crate::virtio::dev::{DevParam, Virtio, WakeEvent};37
use crate::virtio::queue::{DescChain, Queue, QueueReg, Status, VirtQueue};38
use crate::virtio::worker::mio::{ActiveMio, Mio, VirtioMio};39
use crate::virtio::{DeviceId, FEATURE_BUILT_IN, IrqSender, Result, error};40
41
use mio::event::Event;42
use mio::unix::SourceFd;43
use mio::{Interest, Registry, Token};44
use serde::Deserialize;45
use serde_aco::Help;46
use zerocopy::{FromBytes, IntoBytes};47
48
const HEADER_SIZE: usize = size_of::<VsockHeader>();49
const SOCKET_TYPE: VsockType = VsockType::STREAM;50
51
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Help)]52
pub struct UdsVsockParam {53
/// Vsock context id.54
pub cid: u32,55
/// Host-side Unix domain socket path.56
pub path: Box<Path>,57
}58
59
impl DevParam for UdsVsockParam {60
type Device = UdsVsock;61
62
fn build(self, name: impl Into<Arc<str>>) -> Result<UdsVsock> {3x63
UdsVsock::new(self, name)3x64
}3x65
}66
67
#[derive(Debug)]68
pub struct UdsVsock {69
name: Arc<str>,70
config: Arc<VsockConfig>,71
path: Box<Path>,72
listener: UnixListener,73
connections: HashMap<(u32, u32), Connection>,74
ports: HashMap<Token, (u32, u32)>,75
sockets: HashMap<Token, UnixStream>,76
host_ports: HashMap<u32, u32>,77
next_port: u32,78
}79
80
fn get_buf_size(stream: &UnixStream) -> Result<usize> {6x81
let mut buf_size = 0i32;6x82
let mut arg_size = size_of_val(&buf_size) as libc::socklen_t;6x83
ffi!(unsafe {6x84
libc::getsockopt(6x85
stream.as_raw_fd(),6x86
libc::SOL_SOCKET,87
libc::SO_SNDBUF,88
&mut buf_size as *mut _ as _,6x89
&mut arg_size,6x90
)91
})?;92
Ok(buf_size as usize)6x93
}6x94
95
impl UdsVsock {96
fn allocate_port(&mut self) -> Option<u32> {3x97
let mut count: u64 = 0;3x98
while self.host_ports.contains_key(&self.next_port) && count < u32::MAX as u64 {3x99
self.next_port = self.next_port.wrapping_add(1);100
count += 1;101
}102
if count == u32::MAX as u64 {3x103
None104
} else {105
Some(self.next_port)3x106
}107
}3x108
109
fn create_socket(&mut self, registry: &Registry) -> Result<()> {3x110
let (stream, _) = self.listener.accept()?;3x111
stream.set_nonblocking(true)?;3x112
let token = Token(stream.as_raw_fd() as usize);3x113
registry.register(3x114
&mut SourceFd(&stream.as_raw_fd()),3x115
token,3x116
Interest::READABLE,117
)?;118
self.sockets.insert(token, stream);3x119
Ok(())3x120
}3x121
122
fn handle_conn_request<'m, Q, S>(3x123
&mut self,3x124
token: Token,3x125
socket: UnixStream,3x126
rx_q: &mut Queue<'_, 'm, Q>,3x127
irq_sender: &S,3x128
) -> Result<()>3x129
where3x130
Q: VirtQueue<'m>,3x131
S: IrqSender,3x132
{133
let mut msg = String::new();3x134
let writer = socket.try_clone()?;3x135
let mut reader = BufReader::new(socket);3x136
let buf_size = get_buf_size(&writer)?;3x137
reader.read_line(&mut msg)?;3x138
let port_str = msg.trim_start_matches("CONNECT ").trim_end();3x139
let Ok(port) = port_str.parse::<u32>() else {3x140
log::error!("{}: failed to parse port {port_str}", self.name);141
return Ok(());142
};143
let Some(host_port) = self.allocate_port() else {3x144
log::error!("{}: failed to allocate port", self.name);145
return Ok(());146
};147
let hdr = VsockHeader {3x148
src_cid: VSOCK_CID_HOST,3x149
dst_cid: self.config.guest_cid,3x150
src_port: host_port,3x151
dst_port: port,3x152
type_: SOCKET_TYPE,3x153
op: VsockOp::REQUEST,3x154
fwd_cnt: Wrapping(0),3x155
buf_alloc: buf_size as u32,3x156
..Default::default()3x157
};3x158
self.respond(&hdr, irq_sender, rx_q)?;3x159
let conn = Connection {3x160
state: ConnState::Requested,3x161
reader,3x162
writer: BufWriter::new(writer),3x163
buf_alloc: buf_size as u32,3x164
};3x165
self.connections.insert((host_port, port), conn);3x166
let count = self.host_ports.entry(host_port).or_default();3x167
*count += 1;3x168
log::trace!(3x169
"{}: host:{host_port}: count incremented to {count}",170
self.name171
);172
self.ports.insert(token, (host_port, port));3x173
log::trace!("{}: host:{host_port} -> vm:{port}: requested", self.name);3x174
Ok(())3x175
}3x176
177
fn respond_rst<'m, Q, S>(3x178
&self,3x179
hdr: &VsockHeader,3x180
irq_sender: &S,3x181
rx_q: &mut Queue<'_, 'm, Q>,3x182
) -> Result<()>3x183
where3x184
Q: VirtQueue<'m>,3x185
S: IrqSender,3x186
{187
let resp = VsockHeader {3x188
src_cid: VSOCK_CID_HOST,3x189
dst_cid: self.config.guest_cid,3x190
src_port: hdr.dst_port,3x191
dst_port: hdr.src_port,3x192
type_: hdr.type_,3x193
op: VsockOp::RST,3x194
..Default::default()3x195
};3x196
self.respond(&resp, irq_sender, rx_q)3x197
}3x198
199
fn respond<'m, Q, S>(9x200
&self,9x201
hdr: &VsockHeader,9x202
irq_sender: &S,9x203
rx_q: &mut Queue<'_, 'm, Q>,9x204
) -> Result<()>9x205
where9x206
Q: VirtQueue<'m>,9x207
S: IrqSender,9x208
{209
let mut hdr_buf = hdr.as_bytes();9x210
rx_q.handle_desc(VsockVirtq::RX.raw(), irq_sender, |desc| {9x211
if hdr_buf.is_empty() {6x212
return Ok(Status::Break);213
}6x214
let c = hdr_buf.read_vectored(&mut desc.writable)? as u32;6x215
Ok(Status::Done { len: c })6x216
})?;6x217
if !hdr_buf.is_empty() {9x218
log::error!(3x219
"{}: queue RX: no enough writable buffers for {:?}",220
self.name,221
hdr.op222
);223
return error::InvalidBuffer.fail();3x224
}6x225
Ok(())6x226
}9x227
228
fn handle_tx_response<'m, Q, S>(3x229
&mut self,3x230
hdr: &VsockHeader,3x231
rx_q: &mut Queue<'_, 'm, Q>,3x232
irq_sender: &S,3x233
) -> Result<()>3x234
where3x235
Q: VirtQueue<'m>,3x236
S: IrqSender,3x237
{238
let host_port = hdr.dst_port;3x239
let guest_port = hdr.src_port;3x240
let Some(conn) = self.connections.get_mut(&(host_port, guest_port)) else {3x241
log::warn!(242
"{}: vm:{guest_port} -> host:{host_port}: unknown connection",243
self.name244
);245
return Ok(());246
};247
if conn.state != ConnState::Requested {3x248
log::error!(249
"{}: vm:{guest_port} -> host:{host_port}: found {:?}, expect {:?}",250
self.name,251
conn.state,252
ConnState::Requested253
);254
return Ok(());255
};3x256
writeln!(conn.writer, "OK {host_port}")?;3x257
conn.writer.flush()?;3x258
conn.state = ConnState::Established {3x259
fwd_cnt: Wrapping(0),3x260
};3x261
log::trace!(3x262
"{}: host:{host_port} -> vm:{guest_port}: established",263
self.name264
);265
self.transfer_rx_data(host_port, guest_port, rx_q, irq_sender)3x266
}3x267
268
fn remove_conn(&mut self, host_port: u32, guest_port: u32, registry: &Registry) -> Result<()> {6x269
let Some(conn) = self.connections.remove(&(host_port, guest_port)) else {6x270
log::warn!(271
"{}: vm:{guest_port} -> host:{host_port}: unknown connection",272
self.name273
);274
return Ok(());275
};276
let reader = conn.reader.into_inner();6x277
let token = Token(reader.as_raw_fd() as usize);6x278
self.ports.remove(&token);6x279
if let Some(count) = self.host_ports.get_mut(&host_port) {6x280
if *count == 1 {6x281
self.host_ports.remove(&host_port);6x282
log::trace!("{}: host:{host_port}: free port", self.name);6x283
} else {284
*count -= 1;285
log::trace!(286
"{}: host:{host_port}: count decremented to {count}",287
self.name288
);289
}290
} else {291
log::error!(292
"{}: vm:{guest_port} -> host:{host_port}: unknown host port",293
self.name294
);295
}296
registry.deregister(&mut SourceFd(&reader.as_raw_fd()))?;6x297
Ok(())6x298
}6x299
300
fn handle_tx_rst(&mut self, hdr: &VsockHeader, registry: &Registry) -> Result<()> {3x301
let host_port = hdr.dst_port;3x302
let guest_port = hdr.src_port;3x303
self.remove_conn(host_port, guest_port, registry)?;3x304
log::trace!("{}: vm:{guest_port} -> host:{host_port}: reset", self.name);3x305
Ok(())3x306
}3x307
308
fn handle_tx_shutdown<'m, Q, S>(6x309
&mut self,6x310
hdr: &VsockHeader,6x311
registry: &Registry,6x312
irq_sender: &S,6x313
rx_q: &mut Queue<'_, 'm, Q>,6x314
) -> Result<()>6x315
where6x316
Q: VirtQueue<'m>,6x317
S: IrqSender,6x318
{319
let host_port = hdr.dst_port;6x320
let guest_port = hdr.src_port;6x321
let Some(conn) = self.connections.get_mut(&(host_port, guest_port)) else {6x322
log::warn!(323
"{}: vm:{guest_port} -> host:{host_port}: unknown connection",324
self.name325
);326
return Ok(());327
};328
let mut flags = if let ConnState::Shutdown { flags } = conn.state {6x329
flags3x330
} else {331
ShutdownFlag::empty()3x332
};333
flags |= ShutdownFlag::from_bits_truncate(hdr.flags);6x334
if flags != ShutdownFlag::all() {6x335
conn.state = ConnState::Shutdown { flags };3x336
log::trace!(3x337
"{}: vm:{guest_port} -> host:{host_port}: {flags:?}",338
self.name339
);340
} else {341
if let Err(e) = self.respond_rst(hdr, irq_sender, rx_q) {3x342
log::error!("{}: failed to respond to shutdown: {e:?}", self.name);3x343
}344
self.remove_conn(host_port, guest_port, registry)?;3x345
log::trace!(3x346
"{}: vm:{guest_port} -> host:{host_port}: shutdown",347
self.name348
);349
}350
Ok(())6x351
}6x352
353
fn handle_tx_request<'m, Q, S>(3x354
&mut self,3x355
hdr: &VsockHeader,3x356
registry: &Registry,3x357
irq_sender: &S,3x358
rx_q: &mut Queue<'_, 'm, Q>,3x359
) -> Result<()>3x360
where3x361
Q: VirtQueue<'m>,3x362
S: IrqSender,3x363
{364
let host_port = hdr.dst_port;3x365
let guest_port = hdr.src_port;3x366
let port_socket = format!("{}_{host_port}", self.path.to_string_lossy());3x367
let reader = match UnixStream::connect(&port_socket) {3x368
Ok(reader) => reader,3x369
Err(e) => {370
log::error!("{}: failed to connect to {port_socket}: {e:?}", self.name);371
return self.respond_rst(hdr, irq_sender, rx_q);372
}373
};374
reader.set_nonblocking(true)?;3x375
let writer = reader.try_clone()?;3x376
let token = Token(reader.as_raw_fd() as usize);3x377
registry.register(3x378
&mut SourceFd(&reader.as_raw_fd()),3x379
token,3x380
Interest::READABLE,381
)?;382
let buf_size = get_buf_size(&writer)?;3x383
let conn = Connection {3x384
reader: BufReader::new(reader),3x385
writer: BufWriter::new(writer),3x386
buf_alloc: buf_size as u32,3x387
state: ConnState::Established {3x388
fwd_cnt: Wrapping(0),3x389
},3x390
};3x391
let resp = VsockHeader {3x392
src_cid: VSOCK_CID_HOST,3x393
dst_cid: self.config.guest_cid,3x394
src_port: host_port,3x395
dst_port: guest_port,3x396
type_: hdr.type_,3x397
op: VsockOp::RESPONSE,3x398
fwd_cnt: Wrapping(0),3x399
buf_alloc: buf_size as u32,3x400
..Default::default()3x401
};3x402
self.respond(&resp, irq_sender, rx_q)?;3x403
self.connections.insert((host_port, guest_port), conn);3x404
let count = self.host_ports.entry(host_port).or_default();3x405
*count += 1;3x406
log::trace!(3x407
"{}: host:{host_port}: count incremented to {count}",408
self.name409
);410
self.ports.insert(token, (host_port, guest_port));3x411
log::trace!(3x412
"{}: vm:{guest_port} -> host:{host_port}: established",413
self.name414
);415
Ok(())3x416
}3x417
418
fn handle_tx_desc<'m, Q, S>(18x419
&mut self,18x420
desc: &mut DescChain,18x421
registry: &Registry,18x422
irq_sender: &S,18x423
rx_q: &mut Queue<'_, 'm, Q>,18x424
) -> Result<()>18x425
where18x426
Q: VirtQueue<'m>,18x427
S: IrqSender,18x428
{429
let name = &*self.name;18x430
let [buf, readable @ ..] = desc.readable.as_slice() else {18x431
return error::InvalidBuffer.fail();432
};433
let Some((header, body)) = buf.split_first_chunk::<HEADER_SIZE>() else {18x434
return error::InvalidBuffer.fail();435
};436
let Ok(hdr) = VsockHeader::ref_from_bytes(header) else {18x437
return error::InvalidBuffer.fail();438
};439
if hdr.src_cid != self.config.guest_cid || hdr.dst_cid != VSOCK_CID_HOST {18x440
log::warn!(441
"{name}: invalid CID pair: {} -> {}",442
hdr.src_cid,443
hdr.dst_cid444
);445
}18x446
log::trace!(18x447
"{name}: vm:{} -> host:{}: {:?}",448
hdr.src_port,449
hdr.dst_port,450
hdr.op451
);452
match hdr.op {18x453
VsockOp::REQUEST => self.handle_tx_request(hdr, registry, irq_sender, rx_q),3x454
VsockOp::RESPONSE => self.handle_tx_response(hdr, rx_q, irq_sender),3x455
VsockOp::RST => self.handle_tx_rst(hdr, registry),3x456
VsockOp::RW => self.transfer_tx_data(hdr, body, readable),3x457
VsockOp::CREDIT_UPDATE => {458
log::info!(459
"{name}: CREDIT_UPDATE: fwd_cnt: {}, buf_alloc: {}",460
hdr.fwd_cnt,461
hdr.buf_alloc462
);463
Ok(())464
}465
VsockOp::SHUTDOWN => self.handle_tx_shutdown(hdr, registry, irq_sender, rx_q),6x466
_ => {467
log::error!("{name}: unsupported operation: {:?}", hdr.op);468
Ok(())469
}470
}471
}18x472
473
fn handle_tx<'m, Q, S, E>(18x474
&mut self,18x475
active_mio: &mut ActiveMio<'_, '_, 'm, Q, S, E>,18x476
) -> Result<()>18x477
where18x478
Q: VirtQueue<'m>,18x479
S: IrqSender,18x480
E: IoeventFd,18x481
{482
let [Some(rx_q), Some(tx_q), ..] = active_mio.queues else {18x483
let tx_index = VsockVirtq::TX.raw();484
return error::InvalidQueueIndex { index: tx_index }.fail();485
};486
let irq_sender = active_mio.irq_sender;18x487
let registry = active_mio.poll.registry();18x488
tx_q.handle_desc(VsockVirtq::TX.raw(), irq_sender, |desc| {18x489
self.handle_tx_desc(desc, registry, irq_sender, rx_q)?;18x490
Ok(Status::Done { len: 0 })18x491
})18x492
}18x493
494
fn transfer_rx_data<'m, Q, S>(6x495
&mut self,6x496
host_port: u32,6x497
guest_port: u32,6x498
rx_q: &mut Queue<'_, 'm, Q>,6x499
irq_sender: &S,6x500
) -> Result<()>6x501
where6x502
Q: VirtQueue<'m>,6x503
S: IrqSender,6x504
{505
fn copy_to_rx(3x506
hdr: &mut VsockHeader,3x507
conn: &mut BufReader<UnixStream>,3x508
buffers: &mut [IoSliceMut],3x509
) -> Result<usize> {3x510
let mut nskip = 0;3x511
let mut nread = 0;3x512
for buf in buffers.iter_mut() {9x513
let r = if HEADER_SIZE > nskip {9x514
let Some((_, data)) = buf.split_at_mut_checked(HEADER_SIZE - nskip) else {6x515
nskip += buf.len();3x516
continue;3x517
};518
nskip = HEADER_SIZE;3x519
if data.is_empty() {3x520
continue;521
}3x522
conn.read(data)3x523
} else {524
conn.read(buf)3x525
};526
let n = match r {3x527
Ok(0) => break,528
Ok(n) => n,3x529
Err(e) if e.kind() == ErrorKind::WouldBlock => break,3x530
Err(e) => Err(e)?,531
};532
nread += n;3x533
}534
if nskip != HEADER_SIZE {3x535
return error::InvalidBuffer.fail();536
}3x537
hdr.len = nread as u32;3x538
let mut hdr_buf = hdr.as_bytes();3x539
let _ = hdr_buf.read_vectored(buffers);3x540
Ok(nread)3x541
}3x542
543
let rx_idx = VsockVirtq::RX.raw();6x544
let Some(conn) = self.connections.get_mut(&(host_port, guest_port)) else {6x545
log::warn!(546
"{}: vm:{guest_port} -> host:{host_port}: unknown connection",547
self.name548
);549
return Ok(());550
};551
let ConnState::Established { fwd_cnt } = conn.state else {6x552
log::error!("{}: unexpected state {:?}", self.name, conn.state);553
return Ok(());554
};555
let mut hdr = VsockHeader {6x556
src_cid: VSOCK_CID_HOST,6x557
dst_cid: self.config.guest_cid,6x558
src_port: host_port,6x559
dst_port: guest_port,6x560
type_: SOCKET_TYPE,6x561
op: VsockOp::RW,6x562
fwd_cnt,6x563
buf_alloc: conn.buf_alloc,6x564
..Default::default()6x565
};6x566
rx_q.handle_desc(rx_idx, irq_sender, |desc| {6x567
let nread = copy_to_rx(&mut hdr, &mut conn.reader, &mut desc.writable)? as u32;3x568
if nread == 0 {3x569
return Ok(Status::Break);570
}3x571
log::trace!(3x572
"{}: host:{host_port} -> vm:{guest_port}: transfered {nread} bytes",573
self.name574
);575
Ok(Status::Done {3x576
len: nread + HEADER_SIZE as u32,3x577
})3x578
})?;3x579
Ok(())6x580
}6x581
582
fn transfer_tx_data(3x583
&mut self,3x584
hdr: &VsockHeader,3x585
body: &[u8],3x586
buffers: &[IoSlice],3x587
) -> Result<()> {3x588
fn copy_to_conn(3x589
buf: &[u8],3x590
conn: &mut BufWriter<UnixStream>,3x591
remain: &mut usize,3x592
) -> Result<()> {3x593
if let Some(b) = buf.get(..*remain) {3x594
conn.write_all(b)?;3x595
*remain = 0;3x596
} else {597
conn.write_all(buf)?;598
*remain -= buf.len();599
}600
Ok(())3x601
}3x602
603
let host_port = hdr.dst_port;3x604
let guest_port = hdr.src_port;3x605
let Some(conn) = self.connections.get_mut(&(host_port, guest_port)) else {3x606
log::warn!(607
"{}: vm:{guest_port} -> host:{host_port}: unknown connection",608
self.name609
);610
return Ok(());611
};612
let ConnState::Established { fwd_cnt } = &mut conn.state else {3x613
log::warn!("{}: invalid connection state {:?}", self.name, conn.state);614
return Ok(());615
};616
let mut remain = hdr.len as usize;3x617
if !body.is_empty() {3x618
copy_to_conn(body, &mut conn.writer, &mut remain)?;619
}3x620
for buf in buffers {3x621
if remain == 0 {3x622
break;623
}3x624
copy_to_conn(buf, &mut conn.writer, &mut remain)?;3x625
}626
if remain != 0 {3x627
log::error!("{}: missing {remain} bytes", self.name);628
return error::InvalidBuffer.fail();629
}3x630
*fwd_cnt += hdr.len;3x631
log::trace!(3x632
"{}: vm:{guest_port} -> host:{host_port}: transferred {} bytes",633
self.name,634
hdr.len635
);636
conn.writer.flush()?;3x637
Ok(())3x638
}3x639
}640
641
impl Drop for UdsVsock {642
fn drop(&mut self) {3x643
let Ok(addr) = self.listener.local_addr() else {3x644
return;645
};646
let Some(path) = addr.as_pathname() else {3x647
return;648
};649
if let Err(e) = fs::remove_file(path) {3x650
log::error!("{}: error removing {path:?}: {e:?}", self.name);651
}3x652
}3x653
}654
655
#[derive(Debug, PartialEq, Eq)]656
pub enum ConnState {657
Requested,658
Established { fwd_cnt: Wrapping<u32> },659
Shutdown { flags: ShutdownFlag },660
}661
662
#[derive(Debug)]663
pub struct Connection {664
state: ConnState,665
reader: BufReader<UnixStream>,666
writer: BufWriter<UnixStream>,667
buf_alloc: u32,668
}669
670
impl UdsVsock {671
fn new(param: UdsVsockParam, name: impl Into<Arc<str>>) -> Result<Self> {3x672
let name = name.into();3x673
let listener = UnixListener::bind(¶m.path)?;3x674
listener.set_nonblocking(true)?;3x675
let vsock = UdsVsock {3x676
name,3x677
path: param.path,3x678
config: Arc::new(VsockConfig {3x679
guest_cid: param.cid,3x680
..Default::default()3x681
}),3x682
listener,3x683
connections: HashMap::new(),3x684
sockets: HashMap::new(),3x685
ports: HashMap::new(),3x686
host_ports: HashMap::new(),3x687
next_port: 1024,3x688
};3x689
Ok(vsock)3x690
}3x691
}692
693
impl Virtio for UdsVsock {694
type Config = VsockConfig;695
type Feature = VsockFeature;696
697
fn id(&self) -> DeviceId {3x698
DeviceId::SOCKET3x699
}3x700
701
fn name(&self) -> &str {12x702
&self.name12x703
}12x704
705
fn num_queues(&self) -> u16 {3x706
33x707
}3x708
709
fn config(&self) -> Arc<VsockConfig> {3x710
self.config.clone()3x711
}3x712
713
fn feature(&self) -> u128 {3x714
VsockFeature::STREAM.bits() | FEATURE_BUILT_IN3x715
}3x716
717
fn spawn_worker<S, E>(3x718
self,3x719
event_rx: Receiver<WakeEvent<S, E>>,3x720
memory: Arc<RamBus>,3x721
queue_regs: Arc<[QueueReg]>,3x722
) -> Result<(JoinHandle<()>, Arc<Notifier>)>3x723
where3x724
S: IrqSender,3x725
E: IoeventFd,3x726
{727
Mio::spawn_worker(self, event_rx, memory, queue_regs)3x728
}3x729
}730
731
impl VirtioMio for UdsVsock {732
fn activate<'m, Q, S, E>(3x733
&mut self,3x734
_feature: u128,3x735
active_mio: &mut ActiveMio<'_, '_, 'm, Q, S, E>,3x736
) -> Result<()>3x737
where3x738
Q: VirtQueue<'m>,3x739
S: IrqSender,3x740
E: IoeventFd,3x741
{742
active_mio.poll.registry().register(3x743
&mut SourceFd(&self.listener.as_raw_fd()),3x744
Token(self.listener.as_raw_fd() as usize),3x745
Interest::READABLE,746
)?;747
Ok(())3x748
}3x749
750
fn handle_event<'m, Q, S, E>(9x751
&mut self,9x752
event: &Event,9x753
active_mio: &mut ActiveMio<'_, '_, 'm, Q, S, E>,9x754
) -> Result<()>9x755
where9x756
Q: VirtQueue<'m>,9x757
S: IrqSender,9x758
E: IoeventFd,9x759
{760
let token = event.token();9x761
let registry = active_mio.poll.registry();9x762
let irq_sender = active_mio.irq_sender;9x763
let rx_index = VsockVirtq::RX.raw();9x764
let Some(Some(rx_q)) = active_mio.queues.get_mut(rx_index as usize) else {9x765
return error::InvalidQueueIndex { index: rx_index }.fail();766
};767
if token.0 == self.listener.as_raw_fd() as usize {9x768
self.create_socket(registry)3x769
} else if let Some(socket) = self.sockets.remove(&token) {6x770
self.handle_conn_request(token, socket, rx_q, irq_sender)3x771
} else if let Some(port_pair) = self.ports.get(&token) {3x772
let (host_port, guest_port) = port_pair.to_owned();3x773
self.transfer_rx_data(host_port, guest_port, rx_q, irq_sender)3x774
} else {775
log::error!("{}: invalid token: {token:#x?}", self.name);776
Ok(())777
}778
}9x779
780
fn handle_queue<'m, Q, S, E>(21x781
&mut self,21x782
index: u16,21x783
active_mio: &mut ActiveMio<'_, '_, 'm, Q, S, E>,21x784
) -> Result<()>21x785
where21x786
Q: VirtQueue<'m>,21x787
S: IrqSender,21x788
E: IoeventFd,21x789
{790
let index = VsockVirtq::from(index);21x791
let name = &self.name;21x792
match index {21x793
VsockVirtq::TX => self.handle_tx(active_mio)?,18x794
VsockVirtq::RX => log::debug!("{name}: queue RX buffer available"),3x795
VsockVirtq::EVENT => log::debug!("{name}: queue EVENT buffer available"),796
_ => log::error!("{name}: unknown queue index {index:?}"),797
}798
Ok(())21x799
}21x800
801
fn reset(&mut self, registry: &Registry) {3x802
for (_, conn) in self.connections.drain() {3x803
let reader = conn.reader.into_inner();804
if let Err(err) = registry.deregister(&mut SourceFd(&reader.as_raw_fd())) {805
log::error!("{}: failed to deregister socket: {err}", self.name);806
}807
}808
for (_, socket) in self.sockets.drain() {3x809
if let Err(err) = registry.deregister(&mut SourceFd(&socket.as_raw_fd())) {810
log::error!("{}: failed to deregister socket: {err}", self.name);811
}812
}813
if let Err(err) = registry.deregister(&mut SourceFd(&self.listener.as_raw_fd())) {3x814
log::error!("{}: failed to deregister listener: {err}", self.name);815
}3x816
self.host_ports.clear();3x817
self.next_port = 1024;3x818
}3x819
}820
821
#[cfg(test)]822
#[path = "uds_vsock_test.rs"]823
mod tests;824