heliping 发表于 2023-06-13 17:00
需要类似c++的进程间内存共享。 使用场景为车载系统的同一主机下的不同终端的进程间通信。 由于处理视频流,需要高速通信。
shm库亲测可用。多谢各位大佬。 https://crates.io/crates/shm
试试dbus吧,https://docs.rs/zbus
用过C和C#的nng,感觉还不错,看到有rust的绑定,不过似乎停止维护了
原理和平台接口大同小异,如果对你的程序来说是非常关键的功能,那么最好自己能亲手实现并改进它。
-- 👇 heliping: 多谢大神, 请问有linux的方案吗?
-- 👇 gqf2008:
/// 进程间自旋锁 #[cfg(target_os = "windows")] use super::windows::{Error, SharedMemory}; use core::hint; use std::cell::UnsafeCell; use std::intrinsics::*; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; pub struct Mutex<T>(SharedMemory, UnsafeCell<T>); unsafe impl<T> Send for Mutex<T> where T: Send {} unsafe impl<T> Sync for Mutex<T> where T: Sync {} pub struct MutexGuard<'a, T>(&'a Mutex<T>, PhantomData<T>); impl<'a, T> Drop for MutexGuard<'a, T> { fn drop(&mut self) { let _ = self.0.unlock(); } } impl<'a, T> Deref for MutexGuard<'a, T> { type Target = T; fn deref(&self) -> &T { unsafe { &*self.0 .1.get() } } } impl<'a, T> DerefMut for MutexGuard<'a, T> { fn deref_mut(&mut self) -> &mut T { unsafe { &mut *self.0 .1.get() } } } impl<T> Mutex<T> { pub fn new<S: AsRef<str>>(key: S, resource: T) -> Result<Self, Error> { Ok(Self(SharedMemory::new(key, 8)?, UnsafeCell::new(resource))) } } impl<T> Mutex<T> { pub fn lock(&self) -> MutexGuard<'_, T> { let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) }; loop { if compare_exchange_weak(shm, 0, 1) { return MutexGuard(self, PhantomData); } if num_cpus::get() > 1 { for n in (0..2048).map(|i| i << 1).into_iter() { (0..n).for_each(|_| hint::spin_loop()); if compare_exchange_weak(shm, 0, 1) { return MutexGuard(self, PhantomData); } } } std::thread::yield_now(); } } pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, ()> { let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) }; if compare_exchange_weak(shm, 0, 1) { Ok(MutexGuard(self, PhantomData)) } else { Err(()) } } pub fn is_locked(&self) -> bool { let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) }; load_relaxed(shm) == 1 } fn unlock(&self) { let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) }; compare_exchange(shm, 1, 0); } fn try_unlock(&self) -> Result<(), ()> { let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) }; if compare_exchange(shm, 1, 0) { Ok(()) } else { Err(()) } } pub fn force_unlock(&self) { let shm = unsafe { 
&mut *(self.0.as_mut_ptr() as *mut u64) }; store_seqcst(shm, 0) } } pub struct RwLock<T: ?Sized> { lock: SharedMemory, data: T, } const READER: usize = 1 << 2; const UPGRADED: usize = 1 << 1; const WRITER: usize = 1; pub struct RwLockReadGuard<'a, T: 'a + ?Sized> { lock: &'a SharedMemory, data: &'a T, } pub struct RwLockWriteGuard<'a, T: 'a + ?Sized> { inner: &'a RwLock<T>, data: &'a mut T, } pub struct RwLockUpgradableGuard<'a, T: 'a + ?Sized> { inner: &'a RwLock<T>, data: &'a T, } unsafe impl<T: ?Sized + Send> Send for RwLock<T> {} unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {} #[inline] fn compare_exchange_weak<T: Copy>(dst: *mut T, old: T, new: T) -> bool { let (_val, ok) = unsafe { atomic_cxchgweak_acquire_relaxed(dst, old, new) }; ok } #[inline] fn compare_exchange<T: Copy>(dst: *mut T, old: T, new: T) -> bool { let (_val, ok) = unsafe { atomic_cxchg_acquire_relaxed(dst, old, new) }; ok } #[inline] fn store_seqcst<T: Copy>(dst: *mut T, val: T) { unsafe { atomic_store_seqcst(dst, val) } } #[inline] fn load_relaxed<T: Copy>(dst: *mut T) -> T { unsafe { atomic_load_relaxed(dst) } } use std::ffi::c_void; use std::ffi::CString; use std::mem::MaybeUninit; use std::ops::{Deref, DerefMut}; use std::ptr::null_mut as NULL; use std::ptr::NonNull; use std::time::Duration; use thiserror::Error; use winapi::shared::winerror::ERROR_SUCCESS; use winapi::um::errhandlingapi::GetLastError; use winapi::um::handleapi::CloseHandle; use winapi::um::handleapi::INVALID_HANDLE_VALUE; use winapi::um::memoryapi::*; use winapi::um::processthreadsapi::*; use winapi::um::securitybaseapi::AdjustTokenPrivileges; use winapi::um::synchapi::*; use winapi::um::winbase::*; use winapi::um::winnt::*; #[derive(Debug, Error)] pub enum Error { #[error("OsWaitAbandoned")] WaitAbandoned, #[error("OsWaitTimeout")] WaitTimeout, #[error("OsError {0}")] OsError(u32), #[error("OsCallError {0}")] Error(i32), #[error("OsErrorMappingSize {0}/{1}")] ErrorMappingSize(usize, usize), } pub struct 
Semaphore(NonNull<c_void>); impl Drop for Semaphore { fn drop(&mut self) { unsafe { CloseHandle(self.0.as_ptr()); } } } impl Semaphore { pub fn new<S: AsRef<str>>(key: S, init: i32, max: i32) -> Result<Self, Error> { let key = CString::new(key.as_ref()).unwrap(); let handle = unsafe { CreateSemaphoreA(NULL(), init, max, key.as_ptr()) }; if handle.is_null() { return Err(last_error()); } Ok(Self(unsafe { NonNull::new_unchecked(handle) })) } } impl Semaphore { pub fn release(&self, n: i32) -> Result<(), Error> { if unsafe { ReleaseSemaphore(self.0.as_ptr(), n, NULL()) } == 0 { return Err(last_error()); } Ok(()) } pub fn wait(&self) -> Result<(), Error> { unsafe { wait_for_single_object(self.0.as_ptr(), None) } } pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> { unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) } } } pub struct Event(NonNull<c_void>); impl Drop for Event { fn drop(&mut self) { unsafe { CloseHandle(self.0.as_ptr()); } } } impl Event { pub fn new<S: AsRef<str>>(key: S, manual_reset: bool, init_state: bool) -> Result<Self, Error> { let key = CString::new(key.as_ref()).unwrap(); let handle = unsafe { CreateEventA( NULL(), if manual_reset { 1 } else { 0 }, if init_state { 1 } else { 0 }, key.as_ptr(), ) }; if handle.is_null() { return Err(last_error()); } Ok(Self(unsafe { NonNull::new_unchecked(handle) })) } } // 自动模式下的WaitForSingleObject + SetEvent,一次唤醒一个线程 // 手段模式下的WaitForSingleObject + SetEvent + ResetEvent,一次唤醒一批线程 // 手段模式下的WaitForSingleObject + PulseEvent ,一次唤醒所有线程 impl Event { pub fn set(&self) -> Result<(), Error> { if unsafe { SetEvent(self.0.as_ptr()) } == 0 { return Err(last_error()); } Ok(()) } pub fn reset(&self) -> Result<(), Error> { if unsafe { ResetEvent(self.0.as_ptr()) } == 0 { return Err(last_error()); } Ok(()) } pub fn pulse(&self) -> Result<(), Error> { if unsafe { PulseEvent(self.0.as_ptr()) } == 0 { return Err(last_error()); } Ok(()) } pub fn wait(&self) -> Result<(), Error> { unsafe { 
wait_for_single_object(self.0.as_ptr(), None) } } pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> { unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) } } } pub struct Mutex(NonNull<c_void>); impl Drop for Mutex { fn drop(&mut self) { unsafe { CloseHandle(self.0.as_ptr()); } } } impl Mutex { pub fn new<S: AsRef<str>>(key: S, owner: bool) -> Result<Self, Error> { let key = CString::new(key.as_ref()).unwrap(); let handle = unsafe { CreateMutexA(NULL(), if owner { 1 } else { 0 }, key.as_ptr()) }; if handle.is_null() { return Err(last_error()); } Ok(Self(unsafe { NonNull::new_unchecked(handle) })) } } impl Mutex { pub fn unlock(&self) -> Result<(), Error> { if unsafe { ReleaseMutex(self.0.as_ptr()) } == 0 { return Err(last_error()); } Ok(()) } pub fn lock(&self) -> Result<(), Error> { unsafe { wait_for_single_object(self.0.as_ptr(), None) } } pub fn lock_timeout(&self, timeout: Duration) -> Result<(), Error> { unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) } } } pub struct WaitPid(NonNull<c_void>); impl Drop for WaitPid { fn drop(&mut self) { unsafe { CloseHandle(self.0.as_ptr()); } } } impl WaitPid { pub fn new(pid: u32) -> Result<Self, Error> { unsafe { try_adjust_privilege() .map_err(|err| { log::trace!("Adjust Privilege PID({}) {:?}", pid, err); }) .ok(); let handle = OpenProcess(PROCESS_ALL_ACCESS, 0, pid); if handle.is_null() { return Err(last_error()); } Ok(Self(NonNull::new_unchecked(handle))) } } } impl WaitPid { pub fn wait(&self) -> Result<(), Error> { unsafe { wait_for_single_object(self.0.as_ptr(), None) } } pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> { unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) } } } pub struct SharedMemory(NonNull<c_void>, usize, NonNull<u8>); impl Drop for SharedMemory { fn drop(&mut self) { unsafe { UnmapViewOfFile(self.2.as_ptr() as _); CloseHandle(self.0.as_ptr()); } } } impl Deref for SharedMemory { type Target = [u8]; fn deref(&self) -> 
&Self::Target { unsafe { self.as_bytes() } } } impl DerefMut for SharedMemory { fn deref_mut(&mut self) -> &mut Self::Target { unsafe { self.as_bytes_mut() } } } impl SharedMemory { pub fn new<S: AsRef<str>>(key: S, size: usize) -> Result<Self, Error> { let name = CString::new(key.as_ref()).unwrap(); let handle = unsafe { OpenFileMappingA(FILE_MAP_READ | FILE_MAP_WRITE, 0, name.as_ptr()) }; let handle = if handle.is_null() { let high: u32 = ((size as u64 & 0xFFFF_FFFF_0000_0000_u64) >> 32) as u32; let low: u32 = (size as u64 & 0xFFFF_FFFF_u64) as u32; let handle = unsafe { CreateFileMappingA( INVALID_HANDLE_VALUE, NULL(), PAGE_READWRITE, high, low, name.as_ptr(), ) }; if handle.is_null() { return Err(last_error()); } handle } else { handle }; Self::with_handle(unsafe { NonNull::new_unchecked(handle) }, size) } pub fn with_handle(handle: NonNull<c_void>, size: usize) -> Result<Self, Error> { unsafe { let mmap = MapViewOfFile(handle.as_ptr(), FILE_MAP_READ | FILE_MAP_WRITE, 0, 0, size); if mmap.is_null() { CloseHandle(handle.as_ptr()); return Err(last_error()); } let mut info = MEMORY_BASIC_INFORMATION::default(); let bytes_written = VirtualQuery( mmap, &mut info, std::mem::size_of::<MEMORY_BASIC_INFORMATION>(), ); if (bytes_written as usize) < std::mem::size_of::<MEMORY_BASIC_INFORMATION>() { UnmapViewOfFile(mmap); CloseHandle(handle.as_ptr()); return Err(last_error()); } if info.RegionSize < size { return Err(Error::ErrorMappingSize(size, info.RegionSize)); } Ok(Self( handle, info.RegionSize, NonNull::new_unchecked(mmap as _), )) } } } impl SharedMemory { pub fn len(&self) -> usize { self.1 } pub fn as_ptr(&self) -> *const u8 { self.2.as_ptr() } pub fn as_mut_ptr(&self) -> *mut u8 { self.2.as_ptr() } pub unsafe fn as_bytes(&self) -> &[u8] { std::slice::from_raw_parts(self.2.as_ptr(), self.len()) } pub unsafe fn as_bytes_mut(&self) -> &mut [u8] { std::slice::from_raw_parts_mut(self.2.as_ptr(), self.len()) } } fn last_error() -> Error { Error::OsError(unsafe { 
GetLastError() }) } unsafe fn try_adjust_privilege() -> Result<(), Error> { let mut h_token = NULL(); let mut sedebugname_value = std::mem::MaybeUninit::uninit(); let mut tkp: MaybeUninit<TOKEN_PRIVILEGES> = std::mem::MaybeUninit::uninit(); let ret = OpenProcessToken( GetCurrentProcess(), TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY, &mut h_token, ); if ret != 1 { return Err(Error::Error(ret)); } let name = CString::new(SE_DEBUG_NAME).unwrap(); let ret = LookupPrivilegeValueA(NULL(), name.as_ptr(), sedebugname_value.as_mut_ptr()); if ret != 1 { CloseHandle(h_token); return Err(Error::Error(ret)); } tkp.assume_init_mut().PrivilegeCount = 1; tkp.assume_init_mut().Privileges[0].Luid = sedebugname_value.assume_init(); tkp.assume_init_mut().Privileges[0].Attributes = SE_PRIVILEGE_ENABLED; let ret = AdjustTokenPrivileges( h_token, 0, tkp.as_mut_ptr(), std::mem::size_of::<TOKEN_PRIVILEGES>() as u32, NULL(), NULL(), ); CloseHandle(h_token); if ret != 1 { return Err(Error::Error(ret)); } let errno = GetLastError(); if errno != ERROR_SUCCESS { return Err(Error::OsError(errno)); } Ok(()) } unsafe fn wait_for_single_object( handle: *mut c_void, timeout: Option<Duration>, ) -> Result<(), Error> { let timeout = if let Some(timeout) = timeout { timeout.as_millis() as u32 } else { INFINITE }; match WaitForSingleObject(handle, timeout) { 0x00000000 => Ok(()), 0x00000080 => Err(Error::WaitAbandoned), 0x00000102 => Err(Error::WaitTimeout), 0xFFFFFFFF => Err(last_error()), ret => Err(Error::OsError(ret)), } }
// ============================================================================
// Example 1 (separate program): inter-process atomic operations
// ============================================================================
use byteorder::{ByteOrder, LittleEndian};
use std::sync::atomic::{fence, AtomicU64, Ordering};
use xtalk_ipc::windows::SharedMemory;

fn main() {
    // The demo uses bytes 8..16 as its lock word, so request at least 16 bytes.
    let mut shm = SharedMemory::new("1111", 16).unwrap();
    let word = shm[8..16].as_mut_ptr();
    // SAFETY: the view is at least 16 bytes long and stays mapped until `shm` is dropped at the
    // end of `main`; the view base is page-aligned, so offset 8 is aligned for `AtomicU64`.
    let lock = unsafe { &*word.cast::<AtomicU64>() };
    println!("{:p} {:p}", shm.as_mut_ptr(), lock);

    let val = lock.load(Ordering::Relaxed);
    println!("value1 {}", val);

    // "Lock": 0 -> 1. A strong CAS is used because the attempt is not retried.
    lock.compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed)
        .ok();
    fence(Ordering::Acquire);
    let val = lock.load(Ordering::Relaxed);
    println!("value2 {} {}", val, LittleEndian::read_u64(&shm[8..16]));

    // "Unlock": 1 -> 0, with Release ordering as befits giving up a lock.
    lock.compare_exchange(1, 0, Ordering::Release, Ordering::Relaxed)
        .ok();
    fence(Ordering::Acquire);
    let val = lock.load(Ordering::Relaxed);
    println!("value2 {} {}", val, LittleEndian::read_u64(&shm[8..16]));
}

// ============================================================================
// Example 2 (separate program): read from shared memory
// ============================================================================
use std::thread;
use std::time::{Duration, Instant};
use xtalk_ipc::spinlock::Mutex;
use xtalk_ipc::windows::SharedMemory;

/// Byte length of the UTF-8 message published by the writer example (7 characters x 3 bytes).
const MESSAGE_LEN: usize = 21;

fn main() {
    let spin = Mutex::new("spinlock", SharedMemory::new("xxxxxxx", 4096).unwrap()).unwrap();
    loop {
        {
            // The guard is dropped at the end of this scope, before sleeping.
            let shm = spin.lock();
            println!(
                "{:?} 主线程{:?} 获得锁 {:?}",
                Instant::now(),
                thread::current().id(),
                String::from_utf8_lossy(&shm[..MESSAGE_LEN])
            );
        }
        thread::sleep(Duration::from_secs(1));
    }
}

// ============================================================================
// Example 3 (separate program): write to shared memory
// ============================================================================
use std::thread;
use std::time::Duration;
use xtalk_ipc::spinlock::Mutex;
use xtalk_ipc::windows::SharedMemory;

/// Message published to the reader example; 21 bytes of UTF-8.
const MESSAGE: &str = "萨芬卡拉圣诞节";

fn main() {
    let spin = Mutex::new("spinlock", SharedMemory::new("xxxxxxx", 4096).unwrap()).unwrap();
    loop {
        {
            let mut shm = spin.lock();
            // Copy the whole message. Formatting it into a fixed 12-byte window would cut it
            // off after four characters and return an error that nobody looks at.
            let bytes = MESSAGE.as_bytes();
            shm[..bytes.len()].copy_from_slice(bytes);
        }
        thread::sleep(Duration::from_secs(1));
    }
}
代码来源: https://github.com/gqf2008/Xtalk/tree/main/xtalk-ipc
可以参考下 dora 项目. https://github.com/dora-rs/dora
多谢大神, 请问有linux的方案吗?
评论区
写评论
shm库亲测可用。多谢各位大佬。 https://crates.io/crates/shm
试试dbus吧,https://docs.rs/zbus
用过C和C#的nng,感觉还不错,看到有rust的绑定,不过似乎停止维护了
原理和平台接口大同小异,如果对你的程序来说是非常关键的功能,那么最好自己能亲手实现并改进它。
--
👇
heliping: 多谢大神, 请问有linux的方案吗?
--
👇
gqf2008:
各个平台都有完整的方案,下面是windows平台的实现代码,供您参考
共享内存使用示例
代码来源: https://github.com/gqf2008/Xtalk/tree/main/xtalk-ipc
可以参考下 dora 项目. https://github.com/dora-rs/dora
多谢大神, 请问有linux的方案吗?
--
👇
gqf2008:
各个平台都有完整的方案,下面是windows平台的实现代码,供您参考
共享内存使用示例
代码来源: https://github.com/gqf2008/Xtalk/tree/main/xtalk-ipc
各个平台都有完整的方案,下面是windows平台的实现代码,供您参考
共享内存使用示例
代码来源: https://github.com/gqf2008/Xtalk/tree/main/xtalk-ipc