< 返回版块

heliping 发表于 2023-06-13 17:00

需要类似c++的进程间内存共享。 使用场景为车载系统的同一主机下的不同终端的进程间通信。 由于处理视频流,需要高速通信。

评论区

写评论
作者 heliping 2023-06-16 10:08

shm库亲测可用。多谢各位大佬。 https://crates.io/crates/shm

Bai-Jinlin 2023-06-14 21:37

试试dbus吧,https://docs.rs/zbus

asuper 2023-06-14 17:27

用过C和C#的nng,感觉还不错,看到有rust的绑定,不过似乎停止维护了

gqf2008 2023-06-14 13:03

原理和平台接口大同小异,如果对你的程序来说是非常关键的功能,那么最好自己能亲手实现并改进它。

--
👇
heliping: 多谢大神, 请问有linux的方案吗?

--
👇
gqf2008:

各个平台都有完整的方案,下面是windows平台的实现代码,供您参考

/// 进程间自旋锁
#[cfg(target_os = "windows")]
use super::windows::{Error, SharedMemory};
use core::hint;
use std::cell::UnsafeCell;
use std::intrinsics::*;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};

pub struct Mutex<T>(SharedMemory, UnsafeCell<T>);

unsafe impl<T> Send for Mutex<T> where T: Send {}
unsafe impl<T> Sync for Mutex<T> where T: Sync {}

pub struct MutexGuard<'a, T>(&'a Mutex<T>, PhantomData<T>);

impl<'a, T> Drop for MutexGuard<'a, T> {
   fn drop(&mut self) {
       let _ = self.0.unlock();
   }
}

impl<'a, T> Deref for MutexGuard<'a, T> {
   type Target = T;
   fn deref(&self) -> &T {
       unsafe { &*self.0 .1.get() }
   }
}

impl<'a, T> DerefMut for MutexGuard<'a, T> {
   fn deref_mut(&mut self) -> &mut T {
       unsafe { &mut *self.0 .1.get() }
   }
}

impl<T> Mutex<T> {
   pub fn new<S: AsRef<str>>(key: S, resource: T) -> Result<Self, Error> {
       Ok(Self(SharedMemory::new(key, 8)?, UnsafeCell::new(resource)))
   }
}

impl<T> Mutex<T> {
   pub fn lock(&self) -> MutexGuard<'_, T> {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       loop {
           if compare_exchange_weak(shm, 0, 1) {
               return MutexGuard(self, PhantomData);
           }
           if num_cpus::get() > 1 {
               for n in (0..2048).map(|i| i << 1).into_iter() {
                   (0..n).for_each(|_| hint::spin_loop());
                   if compare_exchange_weak(shm, 0, 1) {
                       return MutexGuard(self, PhantomData);
                   }
               }
           }
           std::thread::yield_now();
       }
   }

   pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, ()> {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       if compare_exchange_weak(shm, 0, 1) {
           Ok(MutexGuard(self, PhantomData))
       } else {
           Err(())
       }
   }

   pub fn is_locked(&self) -> bool {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       load_relaxed(shm) == 1
   }

   fn unlock(&self) {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       compare_exchange(shm, 1, 0);
   }

   fn try_unlock(&self) -> Result<(), ()> {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       if compare_exchange(shm, 1, 0) {
           Ok(())
       } else {
           Err(())
       }
   }

   pub fn force_unlock(&self) {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       store_seqcst(shm, 0)
   }
}

pub struct RwLock<T: ?Sized> {
   lock: SharedMemory,
   data: T,
}

const READER: usize = 1 << 2;
const UPGRADED: usize = 1 << 1;
const WRITER: usize = 1;

pub struct RwLockReadGuard<'a, T: 'a + ?Sized> {
   lock: &'a SharedMemory,
   data: &'a T,
}

pub struct RwLockWriteGuard<'a, T: 'a + ?Sized> {
   inner: &'a RwLock<T>,
   data: &'a mut T,
}

pub struct RwLockUpgradableGuard<'a, T: 'a + ?Sized> {
   inner: &'a RwLock<T>,
   data: &'a T,
}

unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}

#[inline]
fn compare_exchange_weak<T: Copy>(dst: *mut T, old: T, new: T) -> bool {
   let (_val, ok) = unsafe { atomic_cxchgweak_acquire_relaxed(dst, old, new) };
   ok
}
#[inline]
fn compare_exchange<T: Copy>(dst: *mut T, old: T, new: T) -> bool {
   let (_val, ok) = unsafe { atomic_cxchg_acquire_relaxed(dst, old, new) };
   ok
}
#[inline]
fn store_seqcst<T: Copy>(dst: *mut T, val: T) {
   unsafe { atomic_store_seqcst(dst, val) }
}

#[inline]
fn load_relaxed<T: Copy>(dst: *mut T) -> T {
   unsafe { atomic_load_relaxed(dst) }
}


use std::ffi::c_void;
use std::ffi::CString;
use std::mem::MaybeUninit;
use std::ops::{Deref, DerefMut};
use std::ptr::null_mut as NULL;
use std::ptr::NonNull;
use std::time::Duration;
use thiserror::Error;
use winapi::shared::winerror::ERROR_SUCCESS;
use winapi::um::errhandlingapi::GetLastError;
use winapi::um::handleapi::CloseHandle;
use winapi::um::handleapi::INVALID_HANDLE_VALUE;
use winapi::um::memoryapi::*;
use winapi::um::processthreadsapi::*;
use winapi::um::securitybaseapi::AdjustTokenPrivileges;
use winapi::um::synchapi::*;
use winapi::um::winbase::*;
use winapi::um::winnt::*;

#[derive(Debug, Error)]
pub enum Error {
   #[error("OsWaitAbandoned")]
   WaitAbandoned,
   #[error("OsWaitTimeout")]
   WaitTimeout,
   #[error("OsError {0}")]
   OsError(u32),
   #[error("OsCallError {0}")]
   Error(i32),
   #[error("OsErrorMappingSize {0}/{1}")]
   ErrorMappingSize(usize, usize),
}

pub struct Semaphore(NonNull<c_void>);

impl Drop for Semaphore {
   fn drop(&mut self) {
       unsafe {
           CloseHandle(self.0.as_ptr());
       }
   }
}

impl Semaphore {
   pub fn new<S: AsRef<str>>(key: S, init: i32, max: i32) -> Result<Self, Error> {
       let key = CString::new(key.as_ref()).unwrap();
       let handle = unsafe { CreateSemaphoreA(NULL(), init, max, key.as_ptr()) };
       if handle.is_null() {
           return Err(last_error());
       }
       Ok(Self(unsafe { NonNull::new_unchecked(handle) }))
   }
}

impl Semaphore {
   pub fn release(&self, n: i32) -> Result<(), Error> {
       if unsafe { ReleaseSemaphore(self.0.as_ptr(), n, NULL()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn wait(&self) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), None) }
   }

   pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) }
   }
}

pub struct Event(NonNull<c_void>);

impl Drop for Event {
   fn drop(&mut self) {
       unsafe {
           CloseHandle(self.0.as_ptr());
       }
   }
}

impl Event {
   pub fn new<S: AsRef<str>>(key: S, manual_reset: bool, init_state: bool) -> Result<Self, Error> {
       let key = CString::new(key.as_ref()).unwrap();
       let handle = unsafe {
           CreateEventA(
               NULL(),
               if manual_reset { 1 } else { 0 },
               if init_state { 1 } else { 0 },
               key.as_ptr(),
           )
       };
       if handle.is_null() {
           return Err(last_error());
       }
       Ok(Self(unsafe { NonNull::new_unchecked(handle) }))
   }
}
// 自动模式下的WaitForSingleObject + SetEvent,一次唤醒一个线程
// 手段模式下的WaitForSingleObject + SetEvent + ResetEvent,一次唤醒一批线程
// 手段模式下的WaitForSingleObject + PulseEvent ,一次唤醒所有线程
impl Event {
   pub fn set(&self) -> Result<(), Error> {
       if unsafe { SetEvent(self.0.as_ptr()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn reset(&self) -> Result<(), Error> {
       if unsafe { ResetEvent(self.0.as_ptr()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn pulse(&self) -> Result<(), Error> {
       if unsafe { PulseEvent(self.0.as_ptr()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn wait(&self) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), None) }
   }

   pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) }
   }
}

pub struct Mutex(NonNull<c_void>);

impl Drop for Mutex {
   fn drop(&mut self) {
       unsafe {
           CloseHandle(self.0.as_ptr());
       }
   }
}

impl Mutex {
   pub fn new<S: AsRef<str>>(key: S, owner: bool) -> Result<Self, Error> {
       let key = CString::new(key.as_ref()).unwrap();
       let handle = unsafe { CreateMutexA(NULL(), if owner { 1 } else { 0 }, key.as_ptr()) };
       if handle.is_null() {
           return Err(last_error());
       }
       Ok(Self(unsafe { NonNull::new_unchecked(handle) }))
   }
}

impl Mutex {
   pub fn unlock(&self) -> Result<(), Error> {
       if unsafe { ReleaseMutex(self.0.as_ptr()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn lock(&self) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), None) }
   }

   pub fn lock_timeout(&self, timeout: Duration) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) }
   }
}

pub struct WaitPid(NonNull<c_void>);

impl Drop for WaitPid {
   fn drop(&mut self) {
       unsafe {
           CloseHandle(self.0.as_ptr());
       }
   }
}
impl WaitPid {
   pub fn new(pid: u32) -> Result<Self, Error> {
       unsafe {
           try_adjust_privilege()
               .map_err(|err| {
                   log::trace!("Adjust Privilege PID({}) {:?}", pid, err);
               })
               .ok();

           let handle = OpenProcess(PROCESS_ALL_ACCESS, 0, pid);
           if handle.is_null() {
               return Err(last_error());
           }
           Ok(Self(NonNull::new_unchecked(handle)))
       }
   }
}

impl WaitPid {
   pub fn wait(&self) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), None) }
   }

   pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) }
   }
}

pub struct SharedMemory(NonNull<c_void>, usize, NonNull<u8>);

impl Drop for SharedMemory {
   fn drop(&mut self) {
       unsafe {
           UnmapViewOfFile(self.2.as_ptr() as _);
           CloseHandle(self.0.as_ptr());
       }
   }
}

impl Deref for SharedMemory {
   type Target = [u8];

   fn deref(&self) -> &Self::Target {
       unsafe { self.as_bytes() }
   }
}

impl DerefMut for SharedMemory {
   fn deref_mut(&mut self) -> &mut Self::Target {
       unsafe { self.as_bytes_mut() }
   }
}

impl SharedMemory {
   pub fn new<S: AsRef<str>>(key: S, size: usize) -> Result<Self, Error> {
       let name = CString::new(key.as_ref()).unwrap();
       let handle = unsafe { OpenFileMappingA(FILE_MAP_READ | FILE_MAP_WRITE, 0, name.as_ptr()) };
       let handle = if handle.is_null() {
           let high: u32 = ((size as u64 & 0xFFFF_FFFF_0000_0000_u64) >> 32) as u32;
           let low: u32 = (size as u64 & 0xFFFF_FFFF_u64) as u32;
           let handle = unsafe {
               CreateFileMappingA(
                   INVALID_HANDLE_VALUE,
                   NULL(),
                   PAGE_READWRITE,
                   high,
                   low,
                   name.as_ptr(),
               )
           };
           if handle.is_null() {
               return Err(last_error());
           }
           handle
       } else {
           handle
       };
       Self::with_handle(unsafe { NonNull::new_unchecked(handle) }, size)
   }

   pub fn with_handle(handle: NonNull<c_void>, size: usize) -> Result<Self, Error> {
       unsafe {
           let mmap = MapViewOfFile(handle.as_ptr(), FILE_MAP_READ | FILE_MAP_WRITE, 0, 0, size);
           if mmap.is_null() {
               CloseHandle(handle.as_ptr());
               return Err(last_error());
           }
           let mut info = MEMORY_BASIC_INFORMATION::default();
           let bytes_written = VirtualQuery(
               mmap,
               &mut info,
               std::mem::size_of::<MEMORY_BASIC_INFORMATION>(),
           );
           if (bytes_written as usize) < std::mem::size_of::<MEMORY_BASIC_INFORMATION>() {
               UnmapViewOfFile(mmap);
               CloseHandle(handle.as_ptr());
               return Err(last_error());
           }
           if info.RegionSize < size {
               return Err(Error::ErrorMappingSize(size, info.RegionSize));
           }
           Ok(Self(
               handle,
               info.RegionSize,
               NonNull::new_unchecked(mmap as _),
           ))
       }
   }
}

impl SharedMemory {
   pub fn len(&self) -> usize {
       self.1
   }

   pub fn as_ptr(&self) -> *const u8 {
       self.2.as_ptr()
   }

   pub fn as_mut_ptr(&self) -> *mut u8 {
       self.2.as_ptr()
   }

   pub unsafe fn as_bytes(&self) -> &[u8] {
       std::slice::from_raw_parts(self.2.as_ptr(), self.len())
   }

   pub unsafe fn as_bytes_mut(&self) -> &mut [u8] {
       std::slice::from_raw_parts_mut(self.2.as_ptr(), self.len())
   }
}

fn last_error() -> Error {
   Error::OsError(unsafe { GetLastError() })
}

unsafe fn try_adjust_privilege() -> Result<(), Error> {
   let mut h_token = NULL();
   let mut sedebugname_value = std::mem::MaybeUninit::uninit();
   let mut tkp: MaybeUninit<TOKEN_PRIVILEGES> = std::mem::MaybeUninit::uninit();

   let ret = OpenProcessToken(
       GetCurrentProcess(),
       TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY,
       &mut h_token,
   );
   if ret != 1 {
       return Err(Error::Error(ret));
   }
   let name = CString::new(SE_DEBUG_NAME).unwrap();
   let ret = LookupPrivilegeValueA(NULL(), name.as_ptr(), sedebugname_value.as_mut_ptr());
   if ret != 1 {
       CloseHandle(h_token);
       return Err(Error::Error(ret));
   }
   tkp.assume_init_mut().PrivilegeCount = 1;
   tkp.assume_init_mut().Privileges[0].Luid = sedebugname_value.assume_init();
   tkp.assume_init_mut().Privileges[0].Attributes = SE_PRIVILEGE_ENABLED;
   let ret = AdjustTokenPrivileges(
       h_token,
       0,
       tkp.as_mut_ptr(),
       std::mem::size_of::<TOKEN_PRIVILEGES>() as u32,
       NULL(),
       NULL(),
   );
   CloseHandle(h_token);
   if ret != 1 {
       return Err(Error::Error(ret));
   }
   let errno = GetLastError();
   if errno != ERROR_SUCCESS {
       return Err(Error::OsError(errno));
   }
   Ok(())
}

unsafe fn wait_for_single_object(
   handle: *mut c_void,
   timeout: Option<Duration>,
) -> Result<(), Error> {
   let timeout = if let Some(timeout) = timeout {
       timeout.as_millis() as u32
   } else {
       INFINITE
   };
   match WaitForSingleObject(handle, timeout) {
       0x00000000 => Ok(()),
       0x00000080 => Err(Error::WaitAbandoned),
       0x00000102 => Err(Error::WaitTimeout),
       0xFFFFFFFF => Err(last_error()),
       ret => Err(Error::OsError(ret)),
   }
}

共享内存使用示例


//进程间原子操作
#![feature(atomic_from_mut)]

use byteorder::{BigEndian, ByteOrder, LittleEndian};
use std::io::Write;
use std::sync::atomic::{fence, AtomicU64, Ordering};
use std::thread;
use std::{sync::Arc, time::Duration};
use xtalk_ipc::windows::SharedMemory;

fn main() {
   {
       let mut shm = SharedMemory::new("1111", 8).unwrap();
       let ptr = &mut shm[8..16];
       let ptr = unsafe { &mut *(ptr.as_mut_ptr() as *mut u64) };
       println!("{:p} {:p}", shm.as_mut_ptr(), ptr);

       let lock = AtomicU64::from_mut(ptr);
       let val = lock.load(Ordering::Relaxed);
       println!("value1 {}", val);

       lock.compare_exchange_weak(0, 1, Ordering::Acquire, Ordering::Relaxed)
           .ok();
       fence(Ordering::Acquire);
       let val = lock.load(Ordering::Relaxed);
       println!("value2 {} {}", val, LittleEndian::read_u64(&shm[8..16]));

       lock.compare_exchange_weak(1, 0, Ordering::Acquire, Ordering::Relaxed)
           .ok();
       fence(Ordering::Acquire);
       let val = lock.load(Ordering::Relaxed);
       println!("value2 {} {}", val, LittleEndian::read_u64(&shm[8..16]));
   }
}

// 读共享内存

use std::io::Write;
use std::thread;
use std::time::{Duration, Instant};
use xtalk_ipc::spinlock::Mutex;
use xtalk_ipc::windows::SharedMemory;

fn main() {
   let spin = Mutex::new("spinlock", SharedMemory::new("xxxxxxx", 4096).unwrap()).unwrap();
   loop {
       {
           let shm = spin.lock();
           println!(
               "{:?} 主线程{:?} 获得锁 {:?}",
               Instant::now(),
               thread::current().id(),
               String::from_utf8_lossy(&shm[0..12])
           );
       }
       thread::sleep(Duration::from_secs(1));
   }
}


// 写共享内存

use std::io::Write;
use std::thread;
use std::time::{Duration, Instant};
use xtalk_ipc::spinlock::Mutex;
use xtalk_ipc::windows::SharedMemory;

fn main() {
   let spin = Mutex::new("spinlock", SharedMemory::new("xxxxxxx", 4096).unwrap()).unwrap();
   loop {
       {
           let mut shm = spin.lock();
           write!(&mut shm[0..12], "萨芬卡拉圣诞节");
       }
       thread::sleep(Duration::from_secs(1));
   }
}

代码来源: https://github.com/gqf2008/Xtalk/tree/main/xtalk-ipc

sanri 2023-06-14 10:55

可以参考下 dora 项目. https://github.com/dora-rs/dora

作者 heliping 2023-06-14 09:24

多谢大神, 请问有linux的方案吗?

--
👇
gqf2008:

各个平台都有完整的方案,下面是windows平台的实现代码,供您参考

/// 进程间自旋锁
#[cfg(target_os = "windows")]
use super::windows::{Error, SharedMemory};
use core::hint;
use std::cell::UnsafeCell;
use std::intrinsics::*;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};

pub struct Mutex<T>(SharedMemory, UnsafeCell<T>);

unsafe impl<T> Send for Mutex<T> where T: Send {}
unsafe impl<T> Sync for Mutex<T> where T: Sync {}

pub struct MutexGuard<'a, T>(&'a Mutex<T>, PhantomData<T>);

impl<'a, T> Drop for MutexGuard<'a, T> {
   fn drop(&mut self) {
       let _ = self.0.unlock();
   }
}

impl<'a, T> Deref for MutexGuard<'a, T> {
   type Target = T;
   fn deref(&self) -> &T {
       unsafe { &*self.0 .1.get() }
   }
}

impl<'a, T> DerefMut for MutexGuard<'a, T> {
   fn deref_mut(&mut self) -> &mut T {
       unsafe { &mut *self.0 .1.get() }
   }
}

impl<T> Mutex<T> {
   pub fn new<S: AsRef<str>>(key: S, resource: T) -> Result<Self, Error> {
       Ok(Self(SharedMemory::new(key, 8)?, UnsafeCell::new(resource)))
   }
}

impl<T> Mutex<T> {
   pub fn lock(&self) -> MutexGuard<'_, T> {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       loop {
           if compare_exchange_weak(shm, 0, 1) {
               return MutexGuard(self, PhantomData);
           }
           if num_cpus::get() > 1 {
               for n in (0..2048).map(|i| i << 1).into_iter() {
                   (0..n).for_each(|_| hint::spin_loop());
                   if compare_exchange_weak(shm, 0, 1) {
                       return MutexGuard(self, PhantomData);
                   }
               }
           }
           std::thread::yield_now();
       }
   }

   pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, ()> {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       if compare_exchange_weak(shm, 0, 1) {
           Ok(MutexGuard(self, PhantomData))
       } else {
           Err(())
       }
   }

   pub fn is_locked(&self) -> bool {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       load_relaxed(shm) == 1
   }

   fn unlock(&self) {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       compare_exchange(shm, 1, 0);
   }

   fn try_unlock(&self) -> Result<(), ()> {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       if compare_exchange(shm, 1, 0) {
           Ok(())
       } else {
           Err(())
       }
   }

   pub fn force_unlock(&self) {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       store_seqcst(shm, 0)
   }
}

pub struct RwLock<T: ?Sized> {
   lock: SharedMemory,
   data: T,
}

const READER: usize = 1 << 2;
const UPGRADED: usize = 1 << 1;
const WRITER: usize = 1;

pub struct RwLockReadGuard<'a, T: 'a + ?Sized> {
   lock: &'a SharedMemory,
   data: &'a T,
}

pub struct RwLockWriteGuard<'a, T: 'a + ?Sized> {
   inner: &'a RwLock<T>,
   data: &'a mut T,
}

pub struct RwLockUpgradableGuard<'a, T: 'a + ?Sized> {
   inner: &'a RwLock<T>,
   data: &'a T,
}

unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}

#[inline]
fn compare_exchange_weak<T: Copy>(dst: *mut T, old: T, new: T) -> bool {
   let (_val, ok) = unsafe { atomic_cxchgweak_acquire_relaxed(dst, old, new) };
   ok
}
#[inline]
fn compare_exchange<T: Copy>(dst: *mut T, old: T, new: T) -> bool {
   let (_val, ok) = unsafe { atomic_cxchg_acquire_relaxed(dst, old, new) };
   ok
}
#[inline]
fn store_seqcst<T: Copy>(dst: *mut T, val: T) {
   unsafe { atomic_store_seqcst(dst, val) }
}

#[inline]
fn load_relaxed<T: Copy>(dst: *mut T) -> T {
   unsafe { atomic_load_relaxed(dst) }
}


use std::ffi::c_void;
use std::ffi::CString;
use std::mem::MaybeUninit;
use std::ops::{Deref, DerefMut};
use std::ptr::null_mut as NULL;
use std::ptr::NonNull;
use std::time::Duration;
use thiserror::Error;
use winapi::shared::winerror::ERROR_SUCCESS;
use winapi::um::errhandlingapi::GetLastError;
use winapi::um::handleapi::CloseHandle;
use winapi::um::handleapi::INVALID_HANDLE_VALUE;
use winapi::um::memoryapi::*;
use winapi::um::processthreadsapi::*;
use winapi::um::securitybaseapi::AdjustTokenPrivileges;
use winapi::um::synchapi::*;
use winapi::um::winbase::*;
use winapi::um::winnt::*;

#[derive(Debug, Error)]
pub enum Error {
   #[error("OsWaitAbandoned")]
   WaitAbandoned,
   #[error("OsWaitTimeout")]
   WaitTimeout,
   #[error("OsError {0}")]
   OsError(u32),
   #[error("OsCallError {0}")]
   Error(i32),
   #[error("OsErrorMappingSize {0}/{1}")]
   ErrorMappingSize(usize, usize),
}

pub struct Semaphore(NonNull<c_void>);

impl Drop for Semaphore {
   fn drop(&mut self) {
       unsafe {
           CloseHandle(self.0.as_ptr());
       }
   }
}

impl Semaphore {
   pub fn new<S: AsRef<str>>(key: S, init: i32, max: i32) -> Result<Self, Error> {
       let key = CString::new(key.as_ref()).unwrap();
       let handle = unsafe { CreateSemaphoreA(NULL(), init, max, key.as_ptr()) };
       if handle.is_null() {
           return Err(last_error());
       }
       Ok(Self(unsafe { NonNull::new_unchecked(handle) }))
   }
}

impl Semaphore {
   pub fn release(&self, n: i32) -> Result<(), Error> {
       if unsafe { ReleaseSemaphore(self.0.as_ptr(), n, NULL()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn wait(&self) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), None) }
   }

   pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) }
   }
}

pub struct Event(NonNull<c_void>);

impl Drop for Event {
   fn drop(&mut self) {
       unsafe {
           CloseHandle(self.0.as_ptr());
       }
   }
}

impl Event {
   pub fn new<S: AsRef<str>>(key: S, manual_reset: bool, init_state: bool) -> Result<Self, Error> {
       let key = CString::new(key.as_ref()).unwrap();
       let handle = unsafe {
           CreateEventA(
               NULL(),
               if manual_reset { 1 } else { 0 },
               if init_state { 1 } else { 0 },
               key.as_ptr(),
           )
       };
       if handle.is_null() {
           return Err(last_error());
       }
       Ok(Self(unsafe { NonNull::new_unchecked(handle) }))
   }
}
// 自动模式下的WaitForSingleObject + SetEvent,一次唤醒一个线程
// 手段模式下的WaitForSingleObject + SetEvent + ResetEvent,一次唤醒一批线程
// 手段模式下的WaitForSingleObject + PulseEvent ,一次唤醒所有线程
impl Event {
   pub fn set(&self) -> Result<(), Error> {
       if unsafe { SetEvent(self.0.as_ptr()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn reset(&self) -> Result<(), Error> {
       if unsafe { ResetEvent(self.0.as_ptr()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn pulse(&self) -> Result<(), Error> {
       if unsafe { PulseEvent(self.0.as_ptr()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn wait(&self) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), None) }
   }

   pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) }
   }
}

pub struct Mutex(NonNull<c_void>);

impl Drop for Mutex {
   fn drop(&mut self) {
       unsafe {
           CloseHandle(self.0.as_ptr());
       }
   }
}

impl Mutex {
   pub fn new<S: AsRef<str>>(key: S, owner: bool) -> Result<Self, Error> {
       let key = CString::new(key.as_ref()).unwrap();
       let handle = unsafe { CreateMutexA(NULL(), if owner { 1 } else { 0 }, key.as_ptr()) };
       if handle.is_null() {
           return Err(last_error());
       }
       Ok(Self(unsafe { NonNull::new_unchecked(handle) }))
   }
}

impl Mutex {
   pub fn unlock(&self) -> Result<(), Error> {
       if unsafe { ReleaseMutex(self.0.as_ptr()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn lock(&self) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), None) }
   }

   pub fn lock_timeout(&self, timeout: Duration) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) }
   }
}

pub struct WaitPid(NonNull<c_void>);

impl Drop for WaitPid {
   fn drop(&mut self) {
       unsafe {
           CloseHandle(self.0.as_ptr());
       }
   }
}
impl WaitPid {
   pub fn new(pid: u32) -> Result<Self, Error> {
       unsafe {
           try_adjust_privilege()
               .map_err(|err| {
                   log::trace!("Adjust Privilege PID({}) {:?}", pid, err);
               })
               .ok();

           let handle = OpenProcess(PROCESS_ALL_ACCESS, 0, pid);
           if handle.is_null() {
               return Err(last_error());
           }
           Ok(Self(NonNull::new_unchecked(handle)))
       }
   }
}

impl WaitPid {
   pub fn wait(&self) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), None) }
   }

   pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) }
   }
}

pub struct SharedMemory(NonNull<c_void>, usize, NonNull<u8>);

impl Drop for SharedMemory {
   fn drop(&mut self) {
       unsafe {
           UnmapViewOfFile(self.2.as_ptr() as _);
           CloseHandle(self.0.as_ptr());
       }
   }
}

impl Deref for SharedMemory {
   type Target = [u8];

   fn deref(&self) -> &Self::Target {
       unsafe { self.as_bytes() }
   }
}

impl DerefMut for SharedMemory {
   fn deref_mut(&mut self) -> &mut Self::Target {
       unsafe { self.as_bytes_mut() }
   }
}

impl SharedMemory {
   pub fn new<S: AsRef<str>>(key: S, size: usize) -> Result<Self, Error> {
       let name = CString::new(key.as_ref()).unwrap();
       let handle = unsafe { OpenFileMappingA(FILE_MAP_READ | FILE_MAP_WRITE, 0, name.as_ptr()) };
       let handle = if handle.is_null() {
           let high: u32 = ((size as u64 & 0xFFFF_FFFF_0000_0000_u64) >> 32) as u32;
           let low: u32 = (size as u64 & 0xFFFF_FFFF_u64) as u32;
           let handle = unsafe {
               CreateFileMappingA(
                   INVALID_HANDLE_VALUE,
                   NULL(),
                   PAGE_READWRITE,
                   high,
                   low,
                   name.as_ptr(),
               )
           };
           if handle.is_null() {
               return Err(last_error());
           }
           handle
       } else {
           handle
       };
       Self::with_handle(unsafe { NonNull::new_unchecked(handle) }, size)
   }

   pub fn with_handle(handle: NonNull<c_void>, size: usize) -> Result<Self, Error> {
       unsafe {
           let mmap = MapViewOfFile(handle.as_ptr(), FILE_MAP_READ | FILE_MAP_WRITE, 0, 0, size);
           if mmap.is_null() {
               CloseHandle(handle.as_ptr());
               return Err(last_error());
           }
           let mut info = MEMORY_BASIC_INFORMATION::default();
           let bytes_written = VirtualQuery(
               mmap,
               &mut info,
               std::mem::size_of::<MEMORY_BASIC_INFORMATION>(),
           );
           if (bytes_written as usize) < std::mem::size_of::<MEMORY_BASIC_INFORMATION>() {
               UnmapViewOfFile(mmap);
               CloseHandle(handle.as_ptr());
               return Err(last_error());
           }
           if info.RegionSize < size {
               return Err(Error::ErrorMappingSize(size, info.RegionSize));
           }
           Ok(Self(
               handle,
               info.RegionSize,
               NonNull::new_unchecked(mmap as _),
           ))
       }
   }
}

impl SharedMemory {
   pub fn len(&self) -> usize {
       self.1
   }

   pub fn as_ptr(&self) -> *const u8 {
       self.2.as_ptr()
   }

   pub fn as_mut_ptr(&self) -> *mut u8 {
       self.2.as_ptr()
   }

   pub unsafe fn as_bytes(&self) -> &[u8] {
       std::slice::from_raw_parts(self.2.as_ptr(), self.len())
   }

   pub unsafe fn as_bytes_mut(&self) -> &mut [u8] {
       std::slice::from_raw_parts_mut(self.2.as_ptr(), self.len())
   }
}

fn last_error() -> Error {
   Error::OsError(unsafe { GetLastError() })
}

unsafe fn try_adjust_privilege() -> Result<(), Error> {
   let mut h_token = NULL();
   let mut sedebugname_value = std::mem::MaybeUninit::uninit();
   let mut tkp: MaybeUninit<TOKEN_PRIVILEGES> = std::mem::MaybeUninit::uninit();

   let ret = OpenProcessToken(
       GetCurrentProcess(),
       TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY,
       &mut h_token,
   );
   if ret != 1 {
       return Err(Error::Error(ret));
   }
   let name = CString::new(SE_DEBUG_NAME).unwrap();
   let ret = LookupPrivilegeValueA(NULL(), name.as_ptr(), sedebugname_value.as_mut_ptr());
   if ret != 1 {
       CloseHandle(h_token);
       return Err(Error::Error(ret));
   }
   tkp.assume_init_mut().PrivilegeCount = 1;
   tkp.assume_init_mut().Privileges[0].Luid = sedebugname_value.assume_init();
   tkp.assume_init_mut().Privileges[0].Attributes = SE_PRIVILEGE_ENABLED;
   let ret = AdjustTokenPrivileges(
       h_token,
       0,
       tkp.as_mut_ptr(),
       std::mem::size_of::<TOKEN_PRIVILEGES>() as u32,
       NULL(),
       NULL(),
   );
   CloseHandle(h_token);
   if ret != 1 {
       return Err(Error::Error(ret));
   }
   let errno = GetLastError();
   if errno != ERROR_SUCCESS {
       return Err(Error::OsError(errno));
   }
   Ok(())
}

unsafe fn wait_for_single_object(
   handle: *mut c_void,
   timeout: Option<Duration>,
) -> Result<(), Error> {
   let timeout = if let Some(timeout) = timeout {
       timeout.as_millis() as u32
   } else {
       INFINITE
   };
   match WaitForSingleObject(handle, timeout) {
       0x00000000 => Ok(()),
       0x00000080 => Err(Error::WaitAbandoned),
       0x00000102 => Err(Error::WaitTimeout),
       0xFFFFFFFF => Err(last_error()),
       ret => Err(Error::OsError(ret)),
   }
}

共享内存使用示例


//进程间原子操作
#![feature(atomic_from_mut)]

use byteorder::{BigEndian, ByteOrder, LittleEndian};
use std::io::Write;
use std::sync::atomic::{fence, AtomicU64, Ordering};
use std::thread;
use std::{sync::Arc, time::Duration};
use xtalk_ipc::windows::SharedMemory;

fn main() {
   {
       let mut shm = SharedMemory::new("1111", 8).unwrap();
       let ptr = &mut shm[8..16];
       let ptr = unsafe { &mut *(ptr.as_mut_ptr() as *mut u64) };
       println!("{:p} {:p}", shm.as_mut_ptr(), ptr);

       let lock = AtomicU64::from_mut(ptr);
       let val = lock.load(Ordering::Relaxed);
       println!("value1 {}", val);

       lock.compare_exchange_weak(0, 1, Ordering::Acquire, Ordering::Relaxed)
           .ok();
       fence(Ordering::Acquire);
       let val = lock.load(Ordering::Relaxed);
       println!("value2 {} {}", val, LittleEndian::read_u64(&shm[8..16]));

       lock.compare_exchange_weak(1, 0, Ordering::Acquire, Ordering::Relaxed)
           .ok();
       fence(Ordering::Acquire);
       let val = lock.load(Ordering::Relaxed);
       println!("value2 {} {}", val, LittleEndian::read_u64(&shm[8..16]));
   }
}

// 读共享内存

use std::io::Write;
use std::thread;
use std::time::{Duration, Instant};
use xtalk_ipc::spinlock::Mutex;
use xtalk_ipc::windows::SharedMemory;

fn main() {
   let spin = Mutex::new("spinlock", SharedMemory::new("xxxxxxx", 4096).unwrap()).unwrap();
   loop {
       {
           let shm = spin.lock();
           println!(
               "{:?} 主线程{:?} 获得锁 {:?}",
               Instant::now(),
               thread::current().id(),
               String::from_utf8_lossy(&shm[0..12])
           );
       }
       thread::sleep(Duration::from_secs(1));
   }
}


// 写共享内存

use std::io::Write;
use std::thread;
use std::time::{Duration, Instant};
use xtalk_ipc::spinlock::Mutex;
use xtalk_ipc::windows::SharedMemory;

fn main() {
   let spin = Mutex::new("spinlock", SharedMemory::new("xxxxxxx", 4096).unwrap()).unwrap();
   loop {
       {
           let mut shm = spin.lock();
           write!(&mut shm[0..12], "萨芬卡拉圣诞节");
       }
       thread::sleep(Duration::from_secs(1));
   }
}

代码来源: https://github.com/gqf2008/Xtalk/tree/main/xtalk-ipc

gqf2008 2023-06-14 08:37

各个平台都有完整的方案,下面是windows平台的实现代码,供您参考

/// 进程间自旋锁
#[cfg(target_os = "windows")]
use super::windows::{Error, SharedMemory};
use core::hint;
use std::cell::UnsafeCell;
use std::intrinsics::*;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};

pub struct Mutex<T>(SharedMemory, UnsafeCell<T>);

unsafe impl<T> Send for Mutex<T> where T: Send {}
unsafe impl<T> Sync for Mutex<T> where T: Sync {}

pub struct MutexGuard<'a, T>(&'a Mutex<T>, PhantomData<T>);

impl<'a, T> Drop for MutexGuard<'a, T> {
   fn drop(&mut self) {
       let _ = self.0.unlock();
   }
}

impl<'a, T> Deref for MutexGuard<'a, T> {
   type Target = T;
   fn deref(&self) -> &T {
       unsafe { &*self.0 .1.get() }
   }
}

impl<'a, T> DerefMut for MutexGuard<'a, T> {
   fn deref_mut(&mut self) -> &mut T {
       unsafe { &mut *self.0 .1.get() }
   }
}

impl<T> Mutex<T> {
   pub fn new<S: AsRef<str>>(key: S, resource: T) -> Result<Self, Error> {
       Ok(Self(SharedMemory::new(key, 8)?, UnsafeCell::new(resource)))
   }
}

impl<T> Mutex<T> {
   pub fn lock(&self) -> MutexGuard<'_, T> {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       loop {
           if compare_exchange_weak(shm, 0, 1) {
               return MutexGuard(self, PhantomData);
           }
           if num_cpus::get() > 1 {
               for n in (0..2048).map(|i| i << 1).into_iter() {
                   (0..n).for_each(|_| hint::spin_loop());
                   if compare_exchange_weak(shm, 0, 1) {
                       return MutexGuard(self, PhantomData);
                   }
               }
           }
           std::thread::yield_now();
       }
   }

   pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, ()> {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       if compare_exchange_weak(shm, 0, 1) {
           Ok(MutexGuard(self, PhantomData))
       } else {
           Err(())
       }
   }

   pub fn is_locked(&self) -> bool {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       load_relaxed(shm) == 1
   }

   fn unlock(&self) {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       compare_exchange(shm, 1, 0);
   }

   fn try_unlock(&self) -> Result<(), ()> {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       if compare_exchange(shm, 1, 0) {
           Ok(())
       } else {
           Err(())
       }
   }

   pub fn force_unlock(&self) {
       let shm = unsafe { &mut *(self.0.as_mut_ptr() as *mut u64) };
       store_seqcst(shm, 0)
   }
}

pub struct RwLock<T: ?Sized> {
   lock: SharedMemory,
   data: T,
}

const READER: usize = 1 << 2;
const UPGRADED: usize = 1 << 1;
const WRITER: usize = 1;

pub struct RwLockReadGuard<'a, T: 'a + ?Sized> {
   lock: &'a SharedMemory,
   data: &'a T,
}

pub struct RwLockWriteGuard<'a, T: 'a + ?Sized> {
   inner: &'a RwLock<T>,
   data: &'a mut T,
}

pub struct RwLockUpgradableGuard<'a, T: 'a + ?Sized> {
   inner: &'a RwLock<T>,
   data: &'a T,
}

unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}

#[inline]
fn compare_exchange_weak<T: Copy>(dst: *mut T, old: T, new: T) -> bool {
   let (_val, ok) = unsafe { atomic_cxchgweak_acquire_relaxed(dst, old, new) };
   ok
}
#[inline]
fn compare_exchange<T: Copy>(dst: *mut T, old: T, new: T) -> bool {
   let (_val, ok) = unsafe { atomic_cxchg_acquire_relaxed(dst, old, new) };
   ok
}
#[inline]
fn store_seqcst<T: Copy>(dst: *mut T, val: T) {
   unsafe { atomic_store_seqcst(dst, val) }
}

#[inline]
fn load_relaxed<T: Copy>(dst: *mut T) -> T {
   unsafe { atomic_load_relaxed(dst) }
}


use std::ffi::c_void;
use std::ffi::CString;
use std::mem::MaybeUninit;
use std::ops::{Deref, DerefMut};
use std::ptr::null_mut as NULL;
use std::ptr::NonNull;
use std::time::Duration;
use thiserror::Error;
use winapi::shared::winerror::ERROR_SUCCESS;
use winapi::um::errhandlingapi::GetLastError;
use winapi::um::handleapi::CloseHandle;
use winapi::um::handleapi::INVALID_HANDLE_VALUE;
use winapi::um::memoryapi::*;
use winapi::um::processthreadsapi::*;
use winapi::um::securitybaseapi::AdjustTokenPrivileges;
use winapi::um::synchapi::*;
use winapi::um::winbase::*;
use winapi::um::winnt::*;

#[derive(Debug, Error)]
pub enum Error {
   #[error("OsWaitAbandoned")]
   WaitAbandoned,
   #[error("OsWaitTimeout")]
   WaitTimeout,
   #[error("OsError {0}")]
   OsError(u32),
   #[error("OsCallError {0}")]
   Error(i32),
   #[error("OsErrorMappingSize {0}/{1}")]
   ErrorMappingSize(usize, usize),
}

pub struct Semaphore(NonNull<c_void>);

impl Drop for Semaphore {
   fn drop(&mut self) {
       unsafe {
           CloseHandle(self.0.as_ptr());
       }
   }
}

impl Semaphore {
   pub fn new<S: AsRef<str>>(key: S, init: i32, max: i32) -> Result<Self, Error> {
       let key = CString::new(key.as_ref()).unwrap();
       let handle = unsafe { CreateSemaphoreA(NULL(), init, max, key.as_ptr()) };
       if handle.is_null() {
           return Err(last_error());
       }
       Ok(Self(unsafe { NonNull::new_unchecked(handle) }))
   }
}

impl Semaphore {
   pub fn release(&self, n: i32) -> Result<(), Error> {
       if unsafe { ReleaseSemaphore(self.0.as_ptr(), n, NULL()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn wait(&self) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), None) }
   }

   pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) }
   }
}

pub struct Event(NonNull<c_void>);

impl Drop for Event {
   fn drop(&mut self) {
       unsafe {
           CloseHandle(self.0.as_ptr());
       }
   }
}

impl Event {
   pub fn new<S: AsRef<str>>(key: S, manual_reset: bool, init_state: bool) -> Result<Self, Error> {
       let key = CString::new(key.as_ref()).unwrap();
       let handle = unsafe {
           CreateEventA(
               NULL(),
               if manual_reset { 1 } else { 0 },
               if init_state { 1 } else { 0 },
               key.as_ptr(),
           )
       };
       if handle.is_null() {
           return Err(last_error());
       }
       Ok(Self(unsafe { NonNull::new_unchecked(handle) }))
   }
}
// 自动模式下的WaitForSingleObject + SetEvent,一次唤醒一个线程
// 手段模式下的WaitForSingleObject + SetEvent + ResetEvent,一次唤醒一批线程
// 手段模式下的WaitForSingleObject + PulseEvent ,一次唤醒所有线程
impl Event {
   pub fn set(&self) -> Result<(), Error> {
       if unsafe { SetEvent(self.0.as_ptr()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn reset(&self) -> Result<(), Error> {
       if unsafe { ResetEvent(self.0.as_ptr()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn pulse(&self) -> Result<(), Error> {
       if unsafe { PulseEvent(self.0.as_ptr()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn wait(&self) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), None) }
   }

   pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) }
   }
}

pub struct Mutex(NonNull<c_void>);

impl Drop for Mutex {
   fn drop(&mut self) {
       unsafe {
           CloseHandle(self.0.as_ptr());
       }
   }
}

impl Mutex {
   pub fn new<S: AsRef<str>>(key: S, owner: bool) -> Result<Self, Error> {
       let key = CString::new(key.as_ref()).unwrap();
       let handle = unsafe { CreateMutexA(NULL(), if owner { 1 } else { 0 }, key.as_ptr()) };
       if handle.is_null() {
           return Err(last_error());
       }
       Ok(Self(unsafe { NonNull::new_unchecked(handle) }))
   }
}

impl Mutex {
   pub fn unlock(&self) -> Result<(), Error> {
       if unsafe { ReleaseMutex(self.0.as_ptr()) } == 0 {
           return Err(last_error());
       }
       Ok(())
   }

   pub fn lock(&self) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), None) }
   }

   pub fn lock_timeout(&self, timeout: Duration) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) }
   }
}

pub struct WaitPid(NonNull<c_void>);

impl Drop for WaitPid {
   fn drop(&mut self) {
       unsafe {
           CloseHandle(self.0.as_ptr());
       }
   }
}
impl WaitPid {
   pub fn new(pid: u32) -> Result<Self, Error> {
       unsafe {
           try_adjust_privilege()
               .map_err(|err| {
                   log::trace!("Adjust Privilege PID({}) {:?}", pid, err);
               })
               .ok();

           let handle = OpenProcess(PROCESS_ALL_ACCESS, 0, pid);
           if handle.is_null() {
               return Err(last_error());
           }
           Ok(Self(NonNull::new_unchecked(handle)))
       }
   }
}

impl WaitPid {
   pub fn wait(&self) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), None) }
   }

   pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
       unsafe { wait_for_single_object(self.0.as_ptr(), Some(timeout)) }
   }
}

pub struct SharedMemory(NonNull<c_void>, usize, NonNull<u8>);

impl Drop for SharedMemory {
   fn drop(&mut self) {
       unsafe {
           UnmapViewOfFile(self.2.as_ptr() as _);
           CloseHandle(self.0.as_ptr());
       }
   }
}

impl Deref for SharedMemory {
   type Target = [u8];

   fn deref(&self) -> &Self::Target {
       unsafe { self.as_bytes() }
   }
}

impl DerefMut for SharedMemory {
   fn deref_mut(&mut self) -> &mut Self::Target {
       unsafe { self.as_bytes_mut() }
   }
}

impl SharedMemory {
   pub fn new<S: AsRef<str>>(key: S, size: usize) -> Result<Self, Error> {
       let name = CString::new(key.as_ref()).unwrap();
       let handle = unsafe { OpenFileMappingA(FILE_MAP_READ | FILE_MAP_WRITE, 0, name.as_ptr()) };
       let handle = if handle.is_null() {
           let high: u32 = ((size as u64 & 0xFFFF_FFFF_0000_0000_u64) >> 32) as u32;
           let low: u32 = (size as u64 & 0xFFFF_FFFF_u64) as u32;
           let handle = unsafe {
               CreateFileMappingA(
                   INVALID_HANDLE_VALUE,
                   NULL(),
                   PAGE_READWRITE,
                   high,
                   low,
                   name.as_ptr(),
               )
           };
           if handle.is_null() {
               return Err(last_error());
           }
           handle
       } else {
           handle
       };
       Self::with_handle(unsafe { NonNull::new_unchecked(handle) }, size)
   }

   pub fn with_handle(handle: NonNull<c_void>, size: usize) -> Result<Self, Error> {
       unsafe {
           let mmap = MapViewOfFile(handle.as_ptr(), FILE_MAP_READ | FILE_MAP_WRITE, 0, 0, size);
           if mmap.is_null() {
               CloseHandle(handle.as_ptr());
               return Err(last_error());
           }
           let mut info = MEMORY_BASIC_INFORMATION::default();
           let bytes_written = VirtualQuery(
               mmap,
               &mut info,
               std::mem::size_of::<MEMORY_BASIC_INFORMATION>(),
           );
           if (bytes_written as usize) < std::mem::size_of::<MEMORY_BASIC_INFORMATION>() {
               UnmapViewOfFile(mmap);
               CloseHandle(handle.as_ptr());
               return Err(last_error());
           }
           if info.RegionSize < size {
               return Err(Error::ErrorMappingSize(size, info.RegionSize));
           }
           Ok(Self(
               handle,
               info.RegionSize,
               NonNull::new_unchecked(mmap as _),
           ))
       }
   }
}

impl SharedMemory {
   pub fn len(&self) -> usize {
       self.1
   }

   pub fn as_ptr(&self) -> *const u8 {
       self.2.as_ptr()
   }

   pub fn as_mut_ptr(&self) -> *mut u8 {
       self.2.as_ptr()
   }

   pub unsafe fn as_bytes(&self) -> &[u8] {
       std::slice::from_raw_parts(self.2.as_ptr(), self.len())
   }

   pub unsafe fn as_bytes_mut(&self) -> &mut [u8] {
       std::slice::from_raw_parts_mut(self.2.as_ptr(), self.len())
   }
}

fn last_error() -> Error {
   Error::OsError(unsafe { GetLastError() })
}

unsafe fn try_adjust_privilege() -> Result<(), Error> {
   let mut h_token = NULL();
   let mut sedebugname_value = std::mem::MaybeUninit::uninit();
   let mut tkp: MaybeUninit<TOKEN_PRIVILEGES> = std::mem::MaybeUninit::uninit();

   let ret = OpenProcessToken(
       GetCurrentProcess(),
       TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY,
       &mut h_token,
   );
   if ret != 1 {
       return Err(Error::Error(ret));
   }
   let name = CString::new(SE_DEBUG_NAME).unwrap();
   let ret = LookupPrivilegeValueA(NULL(), name.as_ptr(), sedebugname_value.as_mut_ptr());
   if ret != 1 {
       CloseHandle(h_token);
       return Err(Error::Error(ret));
   }
   tkp.assume_init_mut().PrivilegeCount = 1;
   tkp.assume_init_mut().Privileges[0].Luid = sedebugname_value.assume_init();
   tkp.assume_init_mut().Privileges[0].Attributes = SE_PRIVILEGE_ENABLED;
   let ret = AdjustTokenPrivileges(
       h_token,
       0,
       tkp.as_mut_ptr(),
       std::mem::size_of::<TOKEN_PRIVILEGES>() as u32,
       NULL(),
       NULL(),
   );
   CloseHandle(h_token);
   if ret != 1 {
       return Err(Error::Error(ret));
   }
   let errno = GetLastError();
   if errno != ERROR_SUCCESS {
       return Err(Error::OsError(errno));
   }
   Ok(())
}

unsafe fn wait_for_single_object(
   handle: *mut c_void,
   timeout: Option<Duration>,
) -> Result<(), Error> {
   let timeout = if let Some(timeout) = timeout {
       timeout.as_millis() as u32
   } else {
       INFINITE
   };
   match WaitForSingleObject(handle, timeout) {
       0x00000000 => Ok(()),
       0x00000080 => Err(Error::WaitAbandoned),
       0x00000102 => Err(Error::WaitTimeout),
       0xFFFFFFFF => Err(last_error()),
       ret => Err(Error::OsError(ret)),
   }
}

共享内存使用示例


//进程间原子操作
#![feature(atomic_from_mut)]

use byteorder::{BigEndian, ByteOrder, LittleEndian};
use std::io::Write;
use std::sync::atomic::{fence, AtomicU64, Ordering};
use std::thread;
use std::{sync::Arc, time::Duration};
use xtalk_ipc::windows::SharedMemory;

fn main() {
   {
       let mut shm = SharedMemory::new("1111", 8).unwrap();
       let ptr = &mut shm[8..16];
       let ptr = unsafe { &mut *(ptr.as_mut_ptr() as *mut u64) };
       println!("{:p} {:p}", shm.as_mut_ptr(), ptr);

       let lock = AtomicU64::from_mut(ptr);
       let val = lock.load(Ordering::Relaxed);
       println!("value1 {}", val);

       lock.compare_exchange_weak(0, 1, Ordering::Acquire, Ordering::Relaxed)
           .ok();
       fence(Ordering::Acquire);
       let val = lock.load(Ordering::Relaxed);
       println!("value2 {} {}", val, LittleEndian::read_u64(&shm[8..16]));

       lock.compare_exchange_weak(1, 0, Ordering::Acquire, Ordering::Relaxed)
           .ok();
       fence(Ordering::Acquire);
       let val = lock.load(Ordering::Relaxed);
       println!("value2 {} {}", val, LittleEndian::read_u64(&shm[8..16]));
   }
}

// 读共享内存

use std::io::Write;
use std::thread;
use std::time::{Duration, Instant};
use xtalk_ipc::spinlock::Mutex;
use xtalk_ipc::windows::SharedMemory;

fn main() {
   let spin = Mutex::new("spinlock", SharedMemory::new("xxxxxxx", 4096).unwrap()).unwrap();
   loop {
       {
           let shm = spin.lock();
           println!(
               "{:?} 主线程{:?} 获得锁 {:?}",
               Instant::now(),
               thread::current().id(),
               String::from_utf8_lossy(&shm[0..12])
           );
       }
       thread::sleep(Duration::from_secs(1));
   }
}


// 写共享内存

use std::io::Write;
use std::thread;
use std::time::{Duration, Instant};
use xtalk_ipc::spinlock::Mutex;
use xtalk_ipc::windows::SharedMemory;

fn main() {
   let spin = Mutex::new("spinlock", SharedMemory::new("xxxxxxx", 4096).unwrap()).unwrap();
   loop {
       {
           let mut shm = spin.lock();
           write!(&mut shm[0..12], "萨芬卡拉圣诞节");
       }
       thread::sleep(Duration::from_secs(1));
   }
}

代码来源: https://github.com/gqf2008/Xtalk/tree/main/xtalk-ipc

1 共 7 条评论, 1 页