[go: nahoru, domu]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Implement ReadsHtsgetData source, refactor HtsgetReader #6662

Draft
wants to merge 26 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a476ce7
Implement ReadsHtsgetData source, refactor HtsgetReader
andersleung Jun 16, 2020
ff50eec
Update HtsgetReader command line tests
andersleung Jun 16, 2020
aa4d48a
Commit missing files
andersleung Jun 16, 2020
ba2dd80
Add javadoc, further refactoring to allow easier testing, lazily stre…
andersleung Jun 17, 2020
0cb9530
Use MergingSamRecordIterator internally to ensure proper ordering of …
andersleung Jun 24, 2020
dce337f
Close out all iterators, request headers asynchronously, minor refact…
andersleung Jun 30, 2020
edda05c
Perform map insertion outside of future to avoid concurrent modification
andersleung Jun 30, 2020
3415528
Address PR comments
andersleung Jul 6, 2020
74f7d70
Fix test
andersleung Jul 6, 2020
890da67
WIP Start adding ReadsHtsgetDataSource tests
andersleung Jul 10, 2020
c0cc6ee
Merge branch 'master' into readsHtsgetDataSource
andersleung Jul 10, 2020
8afa04b
WIP fix broken tests
andersleung Jul 13, 2020
946bb4b
WIP Add tests for filtering duplicates, try 127.0.0.1 instead of loca…
andersleung Jul 13, 2020
100b0a4
WIP Try spawning sibling docker container for refserver
andersleung Jul 13, 2020
c6a36cf
WIP try 0.0.0.0:3000 instead of 127.0.0.1
andersleung Jul 13, 2020
1022cbe
WIP Specify 127.0.0.1 in port mapping when running docker
andersleung Jul 13, 2020
062d9ca
WIP configure refserver to listen on 0.0.0.0
andersleung Jul 13, 2020
f166712
WIP configure all IP addresses to 0.0.0.0:3000
andersleung Jul 14, 2020
37eb57d
WIP run test container with net=host
andersleung Jul 14, 2020
1e77370
Add comment to .travis.yml
andersleung Jul 14, 2020
f71ea84
Add end to end tests in PrintReads using htsget source
andersleung Jul 24, 2020
7967843
Merge branch 'master' into readsHtsgetDataSource
andersleung Jul 24, 2020
c437c29
WIP use htsjdk HtsgetBAMFileReader, refactor ReadsPathDataSource to u…
andersleung Aug 18, 2020
27e7dc7
Merge branch 'master' into readsHtsgetDataSource
andersleung Aug 18, 2020
a32a9e3
Add readme to htsgetScripts, move htsget_config.json
andersleung Aug 19, 2020
ea540d1
Remove ReadsHtsgetDataSource and GATK versions of htsget classes
andersleung Aug 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,11 @@ else if (hasCramInput()) {
final Map<String, List<GATKPath>> pathsByScheme = readArguments.getReadPathSpecifiers().stream()
.collect(Collectors.groupingBy(path -> path.getURI().getScheme()));

if (pathsByScheme.get(GATKPath.HTSGET_SCHEME) != null) {
final List<GATKPath> htsgetPaths = pathsByScheme.get(GATKPath.HTSGET_SCHEME);
if (htsgetPaths != null) {
if (htsgetPaths.size() != readArguments.getReadPathSpecifiers().size()) {
throw new UserException.UnimplementedFeature("A combination of htsget sources and other sources is currently not supported.");
}
reads = new ReadsHtsgetDataSource(pathsByScheme.get(GATKPath.HTSGET_SCHEME), factory);
} else {
andersleung marked this conversation as resolved.
Show resolved Hide resolved
reads = new ReadsPathDataSource(readArguments.getReadPaths(), readArguments.getReadIndexPaths(), factory, cloudPrefetchBuffer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* Manages traversals and queries over sources of reads which are accessible via {@link GATKPath}s pointing to a file
* behind an htsget server
* (for now, SAM/BAM/CRAM files only).
* (for now, BAM/CRAM files only).
*
* Two basic operations are available:
*
Expand All @@ -38,6 +38,8 @@
public final class ReadsHtsgetDataSource implements ReadsDataSource {
private static final Logger logger = LogManager.getLogger(ReadsHtsgetDataSource.class);

private static final int numThreads = 8;
andersleung marked this conversation as resolved.
Show resolved Hide resolved

/**
* The sources provided to this data source
*/
Expand Down Expand Up @@ -127,49 +129,69 @@ public ReadsHtsgetDataSource(final List<GATKPath> sources, final SamReaderFactor
Utils.nonNull(sources);
Utils.nonEmpty(sources, "ReadsHtsgetDataSource cannot be created from empty source list");
andersleung marked this conversation as resolved.
Show resolved Hide resolved

final String nonHtsgetSources = sources.stream()
.filter(source -> !source.getScheme().equals(GATKPath.HTSGET_SCHEME))
.map(GATKPath::toString)
.collect(Collectors.joining(", "));

if (!nonHtsgetSources.isEmpty()) {
throw new UserException("This source can only be instantiated from htsget paths: " + nonHtsgetSources);
}

this.readerFactory =
customSamReaderFactory == null
? SamReaderFactory.makeDefault().validationStringency(ReadConstants.DEFAULT_READ_VALIDATION_STRINGENCY)
: customSamReaderFactory;

this.sources = sources;
this.headers = new HashMap<>(sources.size());
this.headers = new LinkedHashMap<>(sources.size());
this.iterators = new ArrayList<>();

this.executorService = Executors.newFixedThreadPool(8, new ThreadFactoryBuilder()
.setNameFormat("reads-htsget-data-source-thread-%d")
this.executorService = Executors.newFixedThreadPool(ReadsHtsgetDataSource.numThreads, new ThreadFactoryBuilder()
.setNameFormat("htsget-reader-thread-%d")
.setDaemon(true)
.build());

andersleung marked this conversation as resolved.
Show resolved Hide resolved
final List<Future<AbstractMap.SimpleImmutableEntry<GATKPath, SAMFileHeader>>> futures = new ArrayList<>(sources.size());
final List<Future<Map.Entry<GATKPath, SAMFileHeader>>> futures = new ArrayList<>(sources.size());

logger.info("Downloading headers from htsget.");

for (final GATKPath source : sources) {
futures.add(this.executorService.submit(() -> {
try {
// Ensure each source can be obtained from htsget server
final HtsgetRequest req = new HtsgetRequest(source).withDataClass(HtsgetClass.header);
// Request only the headers and use them to construct SAMFileHeader for each source
final InputStream headerStream = req.getResponse().getDataStream();
return new AbstractMap.SimpleImmutableEntry<>(source, readerFactory.open(SamInputResource.of(headerStream)).getFileHeader());
try (final InputStream headerStream = req.getResponse().getDataStream()) {
final SAMFileHeader header = readerFactory.open(SamInputResource.of(headerStream)).getFileHeader();
return new AbstractMap.SimpleImmutableEntry<>(source, header);
}
} catch (final UserException e) {
throw new UserException(source.toString(), e);
throw new UserException("Failed to load header from htsget source " + source.toString(), e);
}
}));
}

try {
for (final Future<AbstractMap.SimpleImmutableEntry<GATKPath, SAMFileHeader>> future : futures) {
final AbstractMap.SimpleImmutableEntry<GATKPath, SAMFileHeader> entry = future.get();
for (final Future<Map.Entry<GATKPath, SAMFileHeader>> future : futures) {
final Map.Entry<GATKPath, SAMFileHeader> entry = future.get();
this.headers.put(entry.getKey(), entry.getValue());
}
} catch (final ExecutionException | InterruptedException e) {
throw new UserException("Interrupted while initializing iterator", e);
}

logger.info("Finished loading headers from htsget.");

if (sources.size() > 1) {
this.header = createHeaderMerger(this.headers.values()).getMergedHeader();
} else {
this.header = this.headers.values().iterator().next();
}

if (this.header.getSortOrder() != SAMFileHeader.SortOrder.coordinate) {
logger.warn("Files not in coordinate sorted order");
}
}

/**
Expand Down Expand Up @@ -234,14 +256,18 @@ public Iterator<GATKRead> query(final SimpleInterval interval) {

/**
* @return An iterator over just the unmapped reads with no assigned position. This operation is not affected
* by prior calls to {@link #setTraversalBounds}. The underlying file must be indexed.
* by prior calls to {@link #setTraversalBounds}.
*/
@Override
public Iterator<GATKRead> queryUnmapped() {
return this.prepareIteratorsForTraversal(null, true);
}

private Iterator<GATKRead> prepareIteratorsForTraversal(final List<SimpleInterval> intervals, final boolean traverseUnmapped) {
if (this.executorService.isShutdown()) {
throw new UserException("This data source has already been shut down.");
}

final Map<SamReader, CloseableIterator<SAMRecord>> mapping = intervals == null
? this.getReaders(traverseUnmapped)
: this.getReadersWithIntervals(intervals, traverseUnmapped);
Expand All @@ -253,35 +279,41 @@ private Iterator<GATKRead> prepareIteratorsForTraversal(final List<SimpleInterva
if (mapping.size() == 1) {
samRecordIterator = mapping.values().iterator().next();
} else {
final Set<SAMFileHeader> headers = mapping.keySet().stream().map(SamReader::getFileHeader).collect(Collectors.toSet());
samRecordIterator = new MergingSamRecordIterator(this.createHeaderMerger(headers), mapping, true);
final Set<SAMFileHeader> headers = mapping.keySet().stream()
.map(SamReader::getFileHeader)
.collect(Collectors.toCollection(LinkedHashSet::new));
samRecordIterator = new MergingSamRecordIterator(createHeaderMerger(headers), mapping, true);
}
this.iterators.add(samRecordIterator);
return new SAMRecordToReadIterator(samRecordIterator);
}

private Map<SamReader, CloseableIterator<SAMRecord>> getReaders(final boolean traverseUnmapped) {
final Map<SamReader, CloseableIterator<SAMRecord>> mapping = new ConcurrentHashMap<>(this.sources.size());
final List<Future<?>> futures = new ArrayList<>(this.sources.size());
this.sources.parallelStream()
.forEach(source -> futures.add(this.executorService.submit(() -> {
final HtsgetRequest req = new HtsgetRequest(source);
if (traverseUnmapped) req.setInterval(HtsgetRequest.UNMAPPED_UNPLACED_INTERVAL);
final SamReader reader = this.readerFactory.open(SamInputResource.of(req.getResponse().getDataStream()));
mapping.put(reader, this.wrapIteratorWithClose(reader.iterator(), reader));
})));
private Map<SamReader, CloseableIterator<SAMRecord>> getReaders(final boolean onlyUnplacedUnmapped) {
final Map<SamReader, CloseableIterator<SAMRecord>> mapping = new LinkedHashMap<>(this.sources.size());
final List<Future<Map.Entry<SamReader, CloseableIterator<SAMRecord>>>> futures = new ArrayList<>(this.sources.size());
this.sources.forEach(source -> futures.add(this.executorService.submit(() -> {
final HtsgetRequest req = new HtsgetRequest(source);
if (onlyUnplacedUnmapped) {
req.setInterval(HtsgetRequest.UNMAPPED_UNPLACED_INTERVAL);
}
final SamReader reader = this.readerFactory.open(SamInputResource.of(req.getResponse().getDataStream()));
return new AbstractMap.SimpleImmutableEntry<>(reader, wrapIteratorWithClose(reader.iterator(), reader));
})));
try {
for (final Future<?> future : futures) future.get();
for (final Future<Map.Entry<SamReader, CloseableIterator<SAMRecord>>> future : futures) {
final Map.Entry<SamReader, CloseableIterator<SAMRecord>> entry = future.get();
mapping.put(entry.getKey(), entry.getValue());
}
} catch (final ExecutionException | InterruptedException e) {
throw new UserException("Interrupted while initializing iterator", e);
}
return mapping;
}

private Map<SamReader, CloseableIterator<SAMRecord>> getReadersWithIntervals(final List<SimpleInterval> intervals, final boolean traverseUnmapped) {
final Map<SamReader, CloseableIterator<SAMRecord>> mapping = new ConcurrentHashMap<>(this.sources.size() * intervals.size());
private Map<SamReader, CloseableIterator<SAMRecord>> getReadersWithIntervals(final List<SimpleInterval> intervals, final boolean includeUnplacedUnmapped) {
final Map<SamReader, CloseableIterator<SAMRecord>> mapping = new LinkedHashMap<>(this.sources.size() * intervals.size());
SimpleInterval prevInterval = null;
final List<Future<?>> futures = new ArrayList<>(this.sources.size() * intervals.size());
final List<Future<Map.Entry<SamReader, CloseableIterator<SAMRecord>>>> futures = new ArrayList<>(this.sources.size() * intervals.size());
for (final SimpleInterval interval : intervals) {
final SimpleInterval finalPrevInterval = prevInterval;
this.sources.parallelStream()
Expand All @@ -291,24 +323,27 @@ private Map<SamReader, CloseableIterator<SAMRecord>> getReadersWithIntervals(fin
.forEach(source -> futures.add(this.executorService.submit(() -> {
final HtsgetRequest req = new HtsgetRequest(source).withInterval(interval);
final SamReader reader = this.readerFactory.open(SamInputResource.of(req.getResponse().getDataStream()));
mapping.put(reader, this.getIterWithInterval(reader, interval, finalPrevInterval));
return new AbstractMap.SimpleEntry<>(reader, getIterWithInterval(reader, interval, finalPrevInterval));
})));
prevInterval = interval;
}
try {
for (final Future<?> future : futures) future.get();
for (final Future<Map.Entry<SamReader, CloseableIterator<SAMRecord>>> future : futures) {
final Map.Entry<SamReader, CloseableIterator<SAMRecord>> entry = future.get();
mapping.put(entry.getKey(), entry.getValue());
}
} catch (final ExecutionException | InterruptedException e) {
throw new UserException("Interrupted while initializing iterator", e);
}
if (traverseUnmapped) {
if (includeUnplacedUnmapped) {
mapping.putAll(this.getReaders(true));
}
return mapping;
}

@Override
public boolean supportsSerialIteration() {
andersleung marked this conversation as resolved.
Show resolved Hide resolved
return false;
return true;
}

/**
Expand All @@ -331,7 +366,7 @@ public SAMFileHeader getHeader() {
return this.header;
}

private CloseableIterator<SAMRecord> getIterWithInterval(final SamReader samReader, final SimpleInterval currInterval, final SimpleInterval prevInterval) {
private static CloseableIterator<SAMRecord> getIterWithInterval(final SamReader samReader, final SimpleInterval currInterval, final SimpleInterval prevInterval) {
/*
To remove reads duplicated across two subsequent intervals, we take any read which is in
the current interval but NOT in the previous interval, unless the current interval is the first,
Expand All @@ -348,7 +383,7 @@ public boolean filterOut(final SAMRecord first, final SAMRecord second) {
throw new UnsupportedOperationException();
}
});
return this.wrapIteratorWithClose(filteredSamRecords, samReader);
return wrapIteratorWithClose(filteredSamRecords, samReader);
}

/**
Expand All @@ -357,7 +392,7 @@ public boolean filterOut(final SAMRecord first, final SAMRecord second) {
* @param samReader the SamReader to close once the iterator has been used up
* @return a wrapped CloseableIterator
*/
private CloseableIterator<SAMRecord> wrapIteratorWithClose(final CloseableIterator<SAMRecord> iterator, final SamReader samReader) {
private static CloseableIterator<SAMRecord> wrapIteratorWithClose(final CloseableIterator<SAMRecord> iterator, final SamReader samReader) {
return new DelegatingIterator<SAMRecord>(iterator) {
@Override
public void close() {
Expand All @@ -371,7 +406,8 @@ public void close() {
};
}

private SamFileHeaderMerger createHeaderMerger(final Collection<SAMFileHeader> headers) {
// TODO: Push this and following method down into htsjdk
private static SamFileHeaderMerger createHeaderMerger(final Collection<SAMFileHeader> headers) {
return new SamFileHeaderMerger(identifySortOrder(headers), headers, true);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package org.broadinstitute.hellbender.engine.filters;

import htsjdk.samtools.util.Locatable;
import htsjdk.samtools.util.OverlapDetector;
import org.broadinstitute.barclay.argparser.Advanced;
import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.barclay.help.DocumentedFeature;
import org.broadinstitute.hellbender.cmdline.ReadFilterArgumentDefinitions;
import org.broadinstitute.hellbender.utils.*;
import org.broadinstitute.hellbender.utils.GenomeLocParser;
import org.broadinstitute.hellbender.utils.GenomeLocSortedSet;
import org.broadinstitute.hellbender.utils.IntervalMergingRule;
import org.broadinstitute.hellbender.utils.IntervalSetRule;
import org.broadinstitute.hellbender.utils.IntervalUtils;
import org.broadinstitute.hellbender.utils.help.HelpConstants;
import org.broadinstitute.hellbender.utils.logging.OneShotLogger;
import org.broadinstitute.hellbender.utils.read.GATKRead;
Expand Down
Loading