在现代多线程和异步编程中,CancellationToken是一个常见的工具,用于及时、安全地取消长时间运行的任务或操作。然而,在处理此类对象时,如果不慎操作,也可能导致悬空指针这一严重问题,进而引发程序崩溃或者难以预测的行为。本文将通过分析一个Rust环境下的实际案例,探讨由使用 tokio_util::sync::CancellationToken 所引起的悬空指针问题。
CancellationToken:程序员必读的悬空指针灾难案例
案情重现与分析
在多线程异步任务程序开发过程中,我们遭遇了一场由于 CancellationToken 的使用而导致的悬空指针问题。该问题的核心在于,当一个线程持有指向某个正在执行任务对象的指针,并通过CancellationToken请求取消该任务时,如果取消动作触发了对象的提前析构,而其他线程仍试图访问这个已被析构的对象,则会产生悬空指针异常。
use std::io::{Error, ErrorKind, Result};
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
mod memory;
pub(crate) async fn asyncify<F, T>(f: F) -> Result<T>
where
F: FnOnce() -> Result<T> + Send + 'static,
T: Send + 'static,
{
match tokio::task::spawn_blocking(f).await {
Ok(res) => res,
Err(e) => Err(Error::new(
ErrorKind::Other,
format!("background task failed: {:?}", e),
)),
}
}
fn check_err_size(e: libc::ssize_t) -> Result<usize> {
if e == -1_isize {
Err(Error::last_os_error())
} else {
Ok(e as usize)
}
}
pub fn libc_pread(raw_fd: usize, pos: u64, len: usize, ptr: u64) -> Result<usize> {
check_err_size(unsafe {
libc::pread(
raw_fd as std::os::fd::RawFd,
ptr as *mut _,
len as _,
pos as libc::off_t,
)
})
}
async fn read_file(file: Arc<std::fs::File>, pos: u64, data: &mut [u8]) -> Result<usize> {
let len = data.len();
let ptr = data.as_ptr() as u64;
let fd = file.as_ref().as_raw_fd() as usize;
let len = asyncify(move || libc_pread(fd, pos, len, ptr)).await?;
Ok(len)
// libc_pread(fd, pos, len, ptr)
}
async fn read_data_from_file(file: Arc<std::fs::File>) -> usize {
let mut buf: Vec<u8> = vec![0_u8; 2*1024*1024*1024];
println!("----- * before read");
let len = read_file(file, 0, &mut buf).await.unwrap();
println!("----- * after read");
len
}
async fn test_read() {
let cancel: CancellationToken = CancellationToken::new();
let file_path = PathBuf::from("./a_big_big_file");
let file = Arc::new(std::fs::File::open(file_path).unwrap());
println!("----- begin test ------------");
let can_tok = cancel.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = can_tok.cancelled() => {
println!("----- cancelled break loop");
break;
}
res = read_data_from_file(file.clone()) => {
println!("----- read data len: {}",res);
}
}
}
});
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
cancel.cancel();
println!(" ----- cancel.cancel()");
tokio::time::sleep(tokio::time::Duration::from_millis(5 * 1000)).await;
println!("----- test over");
}
// #[cfg(unix)]
// #[global_allocator]
// static A: memory::DebugMemoryAlloc = memory::DebugMemoryAlloc;
fn main() {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(4)
.thread_stack_size(4 * 1024 * 1024)
.build()
.unwrap();
rt.block_on(async move {
test_read().await;
});
std::thread::sleep(std::time::Duration::from_secs(10000));
}
在上述例子程序中启动一个异步任务读取文件,然后在主线程中等待100ms取消任务。请注意读取文件的方式采用:tokio::task::spawn_blocking。同时准备一个超过2G大小的文件,确保100ms内读取不会完成。
-
运行程序,在函数read_data_from_file()中会发生如下现象
async fn read_data_from_file(file: Arc<std::fs::File>) -> usize {
let mut buf: Vec<u8> = vec![0_u8; 2*1024*1024*1024];
println!("----- * before read"); // 会执行输出
let len = read_file(file, 0, &mut buf).await.unwrap();
println!("----- * after read"); // 不会执行输出
len
}
这说明异步任务被取消后,并没有等待read_file()执行完成后再取消,而是直接打断了read_data_from_file函数的执行。聪明的你有没有发现这将导致函数中局部变量buf的提前释放,但后面的异步读取文件还会继续使用这块内存,造成悬空指针问题。通过用文章后面附录提供的内存分配、释放跟踪可以证明这一点:异步任务被取消后局部内存立马释放。
-
我们可以尝试用阻塞式读取文件,而不是用tokio::task::spawn_blocking;也就是把读取相关代码改成如下方式,观察会发生什么现象。
async fn read_file(file: Arc<std::fs::File>, pos: u64, data: &mut [u8]) -> Result<usize> {
let len = data.len();
let ptr = data.as_ptr() as u64;
let fd = file.as_ref().as_raw_fd() as usize;
// let len = asyncify(move || libc_pread(fd, pos, len, ptr)).await?;
// Ok(len)
libc_pread(fd, pos, len, ptr)
}
采用此种方式异步任务被取消后,会等待read_file()执行完成,不会造成悬空指针问题。因为在一个阻塞方法中,只有执行完成后异步任务才会有机会被取消。
重返现场
在CnosDB长期稳定性测试环境中我们会经常利用coredump文件排查故障。分析coredump文件发现每次挂掉的位置都不同,而且还经常挂掉在不同的第三方库,更甚至是一些不可能发生问题的代码处。我们也是怀疑内存写坏导致的,但苦于找不到具体原因,我们同时通过以下方式进行了排查:
- 分析近期的代码变更、并测试可疑的提交
- 排查可疑的第三方库
- 不同的编译、打包环境尝试
- 使用Valgrind进行测试排查
- 针对不同程序模块进行排查测试
- 添加调试、测试代码等
- 通过asan进行测试排查
最终,我们通过asan发现如下现象,说明存在内存释放后继续使用的问题。
然后,再通过addr2line找到相应调用栈定位到错误使用内存之处,也就是下面函数中的data参数被释放了但是继续使用。
虽然,通过工具发现了内存释放后继续使用的问题;但是,一时并没明白这块内存到底是怎么被释放的;最终,把怀疑的目光放到了CancellationToken这一块,然后通过测试确认了这一点。下面是具体使用之处,series_iter_closer是一个CancellationToken类型的变量,iter.next()会最终调用上面的pread()方法。
至此,由于悬空指针导致的惨案已经分析完毕。
解决方案
我们此处短期的解决方案是修改pread函数为直接调用os::pread(fd, pos, len, ptr)变为一个阻塞调用;长期会修改pread函数改为内部分配buffer的方式,不再通过外部传入参数来避免悬空指针问题。
附录
自定义内存分配释放器
extern crate core;
use core::alloc::{GlobalAlloc, Layout};
use libc::{c_int, c_void};
use tikv_jemalloc_sys as ffi;
#[cfg(all(any(
target_arch = "arm",
target_arch = "mips",
target_arch = "mipsel",
target_arch = "powerpc"
)))]
const ALIGNOF_MAX_ALIGN_T: usize = 8;
#[cfg(all(any(
target_arch = "x86",
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "powerpc64",
target_arch = "powerpc64le",
target_arch = "mips64",
target_arch = "riscv64",
target_arch = "s390x",
target_arch = "sparc64"
)))]
const ALIGNOF_MAX_ALIGN_T: usize = 16;
fn layout_to_flags(align: usize, size: usize) -> c_int {
if align <= ALIGNOF_MAX_ALIGN_T && align <= size {
0
} else {
ffi::MALLOCX_ALIGN(align)
}
}
// Assumes a condition that always must hold.
macro_rules! assume {
($e:expr) => {
debug_assert!($e);
if !($e) {
core::hint::unreachable_unchecked();
}
};
}
#[derive(Copy, Clone, Default, Debug)]
pub struct DebugMemoryAlloc;
unsafe impl GlobalAlloc for DebugMemoryAlloc {
#[inline]
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
assume!(layout.size() != 0);
let flags = layout_to_flags(layout.align(), layout.size());
let ptr = if flags == 0 {
ffi::malloc(layout.size())
} else {
ffi::mallocx(layout.size(), flags)
};
ptr as *mut u8
}
#[inline]
unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
assume!(layout.size() != 0);
let flags = layout_to_flags(layout.align(), layout.size());
let ptr = if flags == 0 {
ffi::calloc(1, layout.size())
} else {
ffi::mallocx(layout.size(), flags | ffi::MALLOCX_ZERO)
};
ptr as *mut u8
}
#[inline]
unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
let new_layout = unsafe { Layout::from_size_align_unchecked(new_size, layout.align()) };
let new_ptr = unsafe { self.alloc(new_layout) };
unsafe {
let size = std::cmp::min(layout.size(), new_size);
std::ptr::copy_nonoverlapping(ptr, new_ptr, size);
self.dealloc(ptr, layout);
}
new_ptr
}
#[inline]
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
assume!(!ptr.is_null());
assume!(layout.size() != 0);
let flags = layout_to_flags(layout.align(), layout.size());
ffi::sdallocx(ptr as *mut c_void, layout.size(), flags);
if layout.size() >= 2*1024*1024*1024 {
panic!("-------- free big memory: {}", layout.size());
}
}
}
作者简介:
“Hi,我是允哥,一个内向且不善言谈的大厂老鸟,喜欢默默地在代码丛林中捉虫(Bug),目前是CnosDB的一名工程师。”
参与 CnosDB 社区交流群:
扫描下方二维码,加入 CC 进入 CnosDB 社区交流,CC 也会在群内分享直播链接哒