首页 » 博客 » CancellationToken:程序员必读的悬空指针灾难案例

CancellationToken:程序员必读的悬空指针灾难案例

在现代多线程和异步编程中,CancellationToken是一个常见的工具,用于及时、安全地取消长时间运行的任务或操作。然而,在处理此类对象时,如果不慎操作,也可能导致悬空指针这一严重问题,进而引发程序崩溃或者难以预测的行为。本文将通过分析一个Rust环境下的实际案例,探讨由使用 tokio_util::sync::CancellationToken 所引起的悬空指针问题。

CancellationToken:程序员必读的悬空指针灾难案例

 

案情重现与分析

在多线程异步任务程序开发过程中,我们遭遇了一场由于 CancellationToken 的使用而导致的悬空指针问题。该问题的核心在于,当一个线程持有指向某个正在执行任务对象的指针,并通过CancellationToken请求取消该任务时,如果取消动作触发了对象的提前析构,而其他线程仍试图访问这个已被析构的对象,则会产生悬空指针异常。

use std::io::{Error, ErrorKind, Result};
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::sync::Arc;

use tokio_util::sync::CancellationToken;

mod memory;

pub(crate) async fn asyncify<F, T>(f: F) -> Result<T>
where
    F: FnOnce() -> Result<T> + Send + 'static,
    T: Send + 'static,
{
    match tokio::task::spawn_blocking(f).await {
        Ok(res) => res,
        Err(e) => Err(Error::new(
            ErrorKind::Other,
            format!("background task failed: {:?}", e),
        )),
    }
}

fn check_err_size(e: libc::ssize_t) -> Result<usize> {
    if e == -1_isize {
        Err(Error::last_os_error())
    } else {
        Ok(e as usize)
    }
}

pub fn libc_pread(raw_fd: usize, pos: u64, len: usize, ptr: u64) -> Result<usize> {
    check_err_size(unsafe {
        libc::pread(
            raw_fd as std::os::fd::RawFd,
            ptr as *mut _,
            len as _,
            pos as libc::off_t,
        )
    })
}

async fn read_file(file: Arc<std::fs::File>, pos: u64, data: &mut [u8]) -> Result<usize> {
    let len = data.len();
    let ptr = data.as_ptr() as u64;
    let fd = file.as_ref().as_raw_fd() as usize;

    let len = asyncify(move || libc_pread(fd, pos, len, ptr)).await?;
    Ok(len)

    // libc_pread(fd, pos, len, ptr)
}

async fn read_data_from_file(file: Arc<std::fs::File>) -> usize {
    let mut buf: Vec<u8> = vec![0_u8; 2*1024*1024*1024];

    println!("----- * before read");
    let len = read_file(file, 0, &mut buf).await.unwrap();
    println!("----- * after read");

    len
}

async fn test_read() {
    let cancel: CancellationToken = CancellationToken::new();

    let file_path = PathBuf::from("./a_big_big_file");
    let file = Arc::new(std::fs::File::open(file_path).unwrap());

    println!("----- begin test ------------");
    let can_tok = cancel.clone();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = can_tok.cancelled() => {
                    println!("----- cancelled break loop");
                    break;
                }

                 res = read_data_from_file(file.clone()) => {
                    println!("----- read data len: {}",res);
                 }
            }
        }
    });

    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    cancel.cancel();
    println!(" ----- cancel.cancel()");

    tokio::time::sleep(tokio::time::Duration::from_millis(5 * 1000)).await;
    println!("----- test over");
}

// #[cfg(unix)]
// #[global_allocator]
// static A: memory::DebugMemoryAlloc = memory::DebugMemoryAlloc;

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(4)
        .thread_stack_size(4 * 1024 * 1024)
        .build()
        .unwrap();

    rt.block_on(async move {
        test_read().await;
    });

    std::thread::sleep(std::time::Duration::from_secs(10000));
}

在上述例子程序中启动一个异步任务读取文件,然后在主线程中等待100ms取消任务。请注意读取文件的方式采用:tokio::task::spawn_blocking。同时准备一个超过2G大小的文件,确保100ms内读取不会完成。

  1. 运行程序,在函数read_data_from_file()中会发生如下现象
async fn read_data_from_file(file: Arc<std::fs::File>) -> usize {
    let mut buf: Vec<u8> = vec![0_u8; 2*1024*1024*1024];

    println!("----- * before read"); // 会执行输出
    let len = read_file(file, 0, &mut buf).await.unwrap();
    println!("----- * after read"); // 不会执行输出

    len
}

这说明异步任务被取消后,并没有等待read_file()执行完成后再取消,而是直接打断了read_data_from_file函数的执行。聪明的你有没有发现这将导致函数中局部变量buf的提前释放,但后面的异步读取文件还会继续使用这块内存,造成悬空指针问题。通过用文章后面附录提供的内存分配、释放跟踪可以证明这一点:异步任务被取消后局部内存立马释放。

  1. 我们可以尝试用阻塞式读取文件,而不是用tokio::task::spawn_blocking;也就是把读取相关代码改成如下方式,观察会发生什么现象。
async fn read_file(file: Arc<std::fs::File>, pos: u64, data: &mut [u8]) -> Result<usize> {
    let len = data.len();
    let ptr = data.as_ptr() as u64;
    let fd = file.as_ref().as_raw_fd() as usize;

    // let len = asyncify(move || libc_pread(fd, pos, len, ptr)).await?;
    // Ok(len)

    libc_pread(fd, pos, len, ptr)
}

采用此种方式异步任务被取消后,会等待read_file()执行完成,不会造成悬空指针问题。因为在一个阻塞方法中,只有执行完成后异步任务才会有机会被取消。

重返现场

在CnosDB长期稳定性测试环境中我们会经常利用coredump文件排查故障。分析coredump文件发现每次挂掉的位置都不同,而且还经常挂掉在不同的第三方库,更甚至是一些不可能发生问题的代码处。我们也是怀疑内存写坏导致的,但苦于找不到具体原因,我们同时通过以下方式进行了排查:

  1. 分析近期的代码变更、并测试可疑的提交
  2. 排查可疑的第三方库
  3. 不同的编译、打包环境尝试
  4. 使用Valgrind进行测试排查
  5. 针对不同程序模块进行排查测试
  6. 添加调试、测试代码等
  7. 通过asan进行测试排查

最终,我们通过asan发现如下现象,说明存在内存释放后继续使用的问题。

然后,再通过addr2line找到相应调用栈定位到错误使用内存之处,也就是下面函数中的data参数被释放了但是继续使用。

虽然,通过工具发现了内存释放后继续使用的问题;但是,一时并没明白这块内存到底是怎么被释放的;最终,把怀疑的目光放到了CancellationToken这一块,然后通过测试确认了这一点。下面是具体使用之处,series_iter_closer是一个CancellationToken类型的变量,iter.next()会最终调用上面的pread()方法。

至此,由于悬空指针导致的惨案已经分析完毕。

解决方案

我们此处短期的解决方案是修改pread函数为直接调用os::pread(fd, pos, len, ptr)变为一个阻塞调用;长期会修改pread函数改为内部分配buffer的方式,不再通过外部传入参数来避免悬空指针问题。

 

附录

自定义内存分配释放器

extern crate core;
use core::alloc::{GlobalAlloc, Layout};

use libc::{c_int, c_void};
use tikv_jemalloc_sys as ffi;

#[cfg(all(any(
    target_arch = "arm",
    target_arch = "mips",
    target_arch = "mipsel",
    target_arch = "powerpc"
)))]
const ALIGNOF_MAX_ALIGN_T: usize = 8;
#[cfg(all(any(
    target_arch = "x86",
    target_arch = "x86_64",
    target_arch = "aarch64",
    target_arch = "powerpc64",
    target_arch = "powerpc64le",
    target_arch = "mips64",
    target_arch = "riscv64",
    target_arch = "s390x",
    target_arch = "sparc64"
)))]
const ALIGNOF_MAX_ALIGN_T: usize = 16;

fn layout_to_flags(align: usize, size: usize) -> c_int {
    if align <= ALIGNOF_MAX_ALIGN_T && align <= size {
        0
    } else {
        ffi::MALLOCX_ALIGN(align)
    }
}

// Assumes a condition that always must hold.
macro_rules! assume {
    ($e:expr) => {
        debug_assert!($e);
        if !($e) {
            core::hint::unreachable_unchecked();
        }
    };
}

#[derive(Copy, Clone, Default, Debug)]
pub struct DebugMemoryAlloc;

unsafe impl GlobalAlloc for DebugMemoryAlloc {
    #[inline]
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        assume!(layout.size() != 0);
        let flags = layout_to_flags(layout.align(), layout.size());
        let ptr = if flags == 0 {
            ffi::malloc(layout.size())
        } else {
            ffi::mallocx(layout.size(), flags)
        };

        ptr as *mut u8
    }

    #[inline]
    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        assume!(layout.size() != 0);
        let flags = layout_to_flags(layout.align(), layout.size());
        let ptr = if flags == 0 {
            ffi::calloc(1, layout.size())
        } else {
            ffi::mallocx(layout.size(), flags | ffi::MALLOCX_ZERO)
        };

        ptr as *mut u8
    }

    #[inline]
    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        let new_layout = unsafe { Layout::from_size_align_unchecked(new_size, layout.align()) };
        let new_ptr = unsafe { self.alloc(new_layout) };

        unsafe {
            let size = std::cmp::min(layout.size(), new_size);
            std::ptr::copy_nonoverlapping(ptr, new_ptr, size);
            self.dealloc(ptr, layout);
        }

        new_ptr
    }

    #[inline]
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        assume!(!ptr.is_null());
        assume!(layout.size() != 0);

        let flags = layout_to_flags(layout.align(), layout.size());
        ffi::sdallocx(ptr as *mut c_void, layout.size(), flags);

        if layout.size() >=  2*1024*1024*1024 {
            panic!("-------- free big memory: {}", layout.size());
        }
    }
}

 

作者简介:

“Hi,我是允哥,一个内向且不善言谈的大厂老鸟,喜欢默默地在代码丛林中捉虫(Bug),目前是CnosDB的一名工程师。”

 

参与 CnosDB 社区交流群:

 

扫描下方二维码,加入 CC 进入 CnosDB 社区交流,CC 也会在群内分享直播链接哒