Skip to content

fix: Workaround Non-Threadsafe Use of Rc in bam::record::Record #345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions src/bam/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

use std::collections::{vec_deque, VecDeque};
use std::mem;
use std::rc::Rc;
use std::str;

use crate::bam;
Expand All @@ -20,15 +19,14 @@ use crate::errors::{Error, Result};
#[derive(Debug)]
pub struct RecordBuffer {
reader: bam::IndexedReader,
inner: VecDeque<Rc<bam::Record>>,
overflow: Option<Rc<bam::Record>>,
inner: VecDeque<bam::Record>,
overflow: Option<bam::Record>,
cache_cigar: bool,
min_refetch_distance: u64,
buffer_record: Rc<bam::Record>,
buffer_record: bam::Record,
}

unsafe impl Sync for RecordBuffer {}
unsafe impl Send for RecordBuffer {}

impl RecordBuffer {
/// Create a new `RecordBuffer`.
Expand All @@ -44,7 +42,7 @@ impl RecordBuffer {
overflow: None,
cache_cigar,
min_refetch_distance: 1,
buffer_record: Rc::new(bam::Record::new()),
buffer_record: bam::Record::new(),
}
}

Expand Down Expand Up @@ -78,9 +76,9 @@ impl RecordBuffer {
pub fn fetch(&mut self, chrom: &[u8], start: u64, end: u64) -> Result<(usize, usize)> {
let mut added = 0;
// move overflow from last fetch into ringbuffer
if self.overflow.is_some() {
if let Some(overflow) = self.overflow.take() {
added += 1;
self.inner.push_back(self.overflow.take().unwrap());
self.inner.push_back(overflow);
}

if let Some(tid) = self.reader.header.tid(chrom) {
Expand Down Expand Up @@ -110,10 +108,7 @@ impl RecordBuffer {

// extend to the right
loop {
match self
.reader
.read(Rc::get_mut(&mut self.buffer_record).unwrap())
{
match self.reader.read(&mut self.buffer_record) {
None => break,
Some(res) => res?,
}
Expand All @@ -131,10 +126,10 @@ impl RecordBuffer {

// Record is kept, do not reuse it for next iteration
// and thus create a new one.
let mut record = mem::replace(&mut self.buffer_record, Rc::new(bam::Record::new()));
let mut record = mem::take(&mut self.buffer_record);

if self.cache_cigar {
Rc::get_mut(&mut record).unwrap().cache_cigar();
record.cache_cigar();
}

if pos >= end as i64 {
Expand All @@ -155,12 +150,12 @@ impl RecordBuffer {
}

/// Iterate over records that have been fetched with `fetch`.
pub fn iter(&self) -> vec_deque::Iter<Rc<bam::Record>> {
pub fn iter(&self) -> vec_deque::Iter<bam::Record> {
self.inner.iter()
}

/// Iterate over mutable references to records that have been fetched with `fetch`.
pub fn iter_mut(&mut self) -> vec_deque::IterMut<Rc<bam::Record>> {
pub fn iter_mut(&mut self) -> vec_deque::IterMut<bam::Record> {
self.inner.iter_mut()
}

Expand Down
36 changes: 11 additions & 25 deletions src/bam/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,10 @@ pub trait Read: Sized {
#[derive(Debug)]
pub struct Reader {
htsfile: *mut htslib::htsFile,
header: Rc<HeaderView>,
header: HeaderView,
tpool: Option<ThreadPool>,
}

unsafe impl Send for Reader {}

impl Reader {
/// Create a new Reader from path.
///
Expand Down Expand Up @@ -271,7 +269,7 @@ impl Reader {

Ok(Reader {
htsfile,
header: Rc::new(HeaderView::new(header)),
header: HeaderView::new(header),
tpool: None,
})
}
Expand Down Expand Up @@ -355,11 +353,7 @@ impl Read for Reader {
-1 => None,
-2 => Some(Err(Error::BamTruncatedRecord)),
-4 => Some(Err(Error::BamInvalidRecord)),
_ => {
record.set_header(Rc::clone(&self.header));

Some(Ok(()))
}
_ => Some(Ok(())),
}
}

Expand Down Expand Up @@ -562,14 +556,12 @@ impl<'a, T: AsRef<[u8]>, X: Into<FetchCoordinate>, Y: Into<FetchCoordinate>> Fro
#[derive(Debug)]
pub struct IndexedReader {
htsfile: *mut htslib::htsFile,
header: Rc<HeaderView>,
header: HeaderView,
idx: *mut htslib::hts_idx_t,
itr: Option<*mut htslib::hts_itr_t>,
tpool: Option<ThreadPool>,
}

unsafe impl Send for IndexedReader {}

impl IndexedReader {
/// Create a new Reader from path.
///
Expand Down Expand Up @@ -608,7 +600,7 @@ impl IndexedReader {
} else {
Ok(IndexedReader {
htsfile,
header: Rc::new(HeaderView::new(header)),
header: HeaderView::new(header),
idx,
itr: None,
tpool: None,
Expand Down Expand Up @@ -636,7 +628,7 @@ impl IndexedReader {
} else {
Ok(IndexedReader {
htsfile,
header: Rc::new(HeaderView::new(header)),
header: HeaderView::new(header),
idx,
itr: None,
tpool: None,
Expand Down Expand Up @@ -793,11 +785,7 @@ impl Read for IndexedReader {
-1 => None,
-2 => Some(Err(Error::BamTruncatedRecord)),
-4 => Some(Err(Error::BamInvalidRecord)),
_ => {
record.set_header(Rc::clone(&self.header));

Some(Ok(()))
}
_ => Some(Ok(())),
}
}
None => None,
Expand Down Expand Up @@ -878,12 +866,10 @@ impl Format {
#[derive(Debug)]
pub struct Writer {
f: *mut htslib::htsFile,
header: Rc<HeaderView>,
header: HeaderView,
tpool: Option<ThreadPool>,
}

unsafe impl Send for Writer {}

impl Writer {
/// Create a new SAM/BAM/CRAM file.
///
Expand Down Expand Up @@ -955,7 +941,7 @@ impl Writer {

Ok(Writer {
f,
header: Rc::new(HeaderView::new(header_record)),
header: HeaderView::new(header_record),
tpool: None,
})
}
Expand Down Expand Up @@ -1097,7 +1083,7 @@ impl<'a, R: Read> Iterator for RcRecords<'a, R> {
type Item = Result<Rc<record::Record>>;

fn next(&mut self) -> Option<Self::Item> {
let mut record = match Rc::get_mut(&mut self.record) {
let record = match Rc::get_mut(&mut self.record) {
// Deliberately `Rc::get_mut` rather than `Rc::make_mut`: we don't need a clone
Some(x) => x,
None => {
Expand All @@ -1106,7 +1092,7 @@ impl<'a, R: Read> Iterator for RcRecords<'a, R> {
}
};

match self.reader.read(&mut record) {
match self.reader.read(record) {
None => None,
Some(Ok(_)) => Some(Ok(Rc::clone(&self.record))),
Some(Err(err)) => Some(Err(err)),
Expand Down
57 changes: 38 additions & 19 deletions src/bam/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::marker::PhantomData;
use std::mem::{size_of, MaybeUninit};
use std::ops;
use std::os::raw::c_char;
use std::rc::Rc;
use std::slice;
use std::str;
use std::u32;
Expand Down Expand Up @@ -53,7 +52,6 @@ pub struct Record {
pub inner: htslib::bam1_t,
own: bool,
cigar: Option<CigarStringView>,
header: Option<Rc<HeaderView>>,
}

unsafe impl Send for Record {}
Expand Down Expand Up @@ -117,7 +115,6 @@ impl Record {
inner: unsafe { MaybeUninit::zeroed().assume_init() },
own: true,
cigar: None,
header: None,
}
}

Expand All @@ -137,7 +134,6 @@ impl Record {
},
own: false,
cigar: None,
header: None,
}
}

Expand Down Expand Up @@ -172,8 +168,29 @@ impl Record {
}
}

pub fn set_header(&mut self, header: Rc<HeaderView>) {
self.header = Some(header);
/// Pair this record with a [`HeaderView`] by wrapping references to both in a
/// [`HeaderRecord`].
///
/// The returned wrapper borrows `self` and `header`; nothing is copied.
///
/// # Example
///
/// ```
/// use std::path::Path;
///
/// use bio_types::genome::AbstractInterval;
/// use rust_htslib::bam::{self, Read};
///
/// let mut bam_reader = bam::Reader::from_path(&Path::new("test/test.bam")).expect("Error opening file.");
/// let header = bam_reader.header().clone();
/// for record in bam_reader.records() {
/// let mut rec = record.expect("Expected valid record");
/// assert_eq!("CHROMOSOME_I", rec.supply_header(&header).contig());
/// }
/// ```
pub fn supply_header<'record, 'header>(
&'record self,
header: &'header HeaderView,
) -> HeaderRecord<'record, 'header> {
HeaderRecord {
record: self,
header,
}
}

pub(super) fn data(&self) -> &[u8] {
Expand Down Expand Up @@ -1133,36 +1150,38 @@ impl SequenceRead for Record {
}
}

impl genome::AbstractInterval for Record {
/// Struct that holds references to a [`Record`] and a [`HeaderView`] in order to
/// be able to impl [`genome::AbstractInterval`] for it.
/// Can be constructed via [`Record::supply_header()`].
pub struct HeaderRecord<'record, 'header> {
/// Borrowed alignment record; source of the tid, position and cached CIGAR.
record: &'record Record,
/// Borrowed header; used to translate the record's tid into a contig name.
header: &'header HeaderView,
}

impl genome::AbstractInterval for HeaderRecord<'_, '_> {
/// Return contig name. Panics if the record's `tid` is negative or if the contig name is not valid UTF-8.
fn contig(&self) -> &str {
let tid = self.tid();
let tid = self.record.tid();
if tid < 0 {
panic!("invalid tid, must be at least zero");
}
str::from_utf8(
self.header
.as_ref()
.expect(
"header must be set (this is the case if the record has been read from a file)",
)
.tid2name(tid as u32),
)
.expect("unable to interpret contig name as UTF-8")
str::from_utf8(self.header.tid2name(tid as u32))
.expect("unable to interpret contig name as UTF-8")
}

/// Return genomic range covered by alignment. Panics if `Record::cache_cigar()` has not been called first or `Record::pos()` is less than zero.
fn range(&self) -> ops::Range<genome::Position> {
let end_pos = self
.record
.cigar_cached()
.expect("cigar has not been cached yet, call cache_cigar() first")
.end_pos() as u64;

if self.pos() < 0 {
if self.record.pos() < 0 {
panic!("invalid position, must be positive")
}

self.pos() as u64..end_pos
self.record.pos() as u64..end_pos
}
}

Expand Down