Implement ReadsHtsgetData source, refactor HtsgetReader #6662

Draft · wants to merge 26 commits into master from readsHtsgetDataSource
Changes from 7 commits

Commits (26)
a476ce7
Implement ReadsHtsgetData source, refactor HtsgetReader
andersleung Jun 16, 2020
ff50eec
Update HtsgetReader command line tests
andersleung Jun 16, 2020
aa4d48a
Commit missing files
andersleung Jun 16, 2020
ba2dd80
Add javadoc, further refactoring to allow easier testing, lazily stre…
andersleung Jun 17, 2020
0cb9530
Use MergingSamRecordIterator internally to ensure proper ordering of …
andersleung Jun 24, 2020
dce337f
Close out all iterators, request headers asynchronously, minor refact…
andersleung Jun 30, 2020
edda05c
Perform map insertion outside of future to avoid concurrent modification
andersleung Jun 30, 2020
3415528
Address PR comments
andersleung Jul 6, 2020
74f7d70
Fix test
andersleung Jul 6, 2020
890da67
WIP Start adding ReadsHtsgetDataSource tests
andersleung Jul 10, 2020
c0cc6ee
Merge branch 'master' into readsHtsgetDataSource
andersleung Jul 10, 2020
8afa04b
WIP fix broken tests
andersleung Jul 13, 2020
946bb4b
WIP Add tests for filtering duplicates, try 127.0.0.1 instead of loca…
andersleung Jul 13, 2020
100b0a4
WIP Try spawning sibling docker container for refserver
andersleung Jul 13, 2020
c6a36cf
WIP try 0.0.0.0:3000 instead of 127.0.0.1
andersleung Jul 13, 2020
1022cbe
WIP Specify 127.0.0.1 in port mapping when running docker
andersleung Jul 13, 2020
062d9ca
WIP configure refserver to listen on 0.0.0.0
andersleung Jul 13, 2020
f166712
WIP configure all IP addresses to 0.0.0.0:3000
andersleung Jul 14, 2020
37eb57d
WIP run test container with net=host
andersleung Jul 14, 2020
1e77370
Add comment to .travis.yml
andersleung Jul 14, 2020
f71ea84
Add end to end tests in PrintReads using htsget source
andersleung Jul 24, 2020
7967843
Merge branch 'master' into readsHtsgetDataSource
andersleung Jul 24, 2020
c437c29
WIP use htsjdk HtsgetBAMFileReader, refactor ReadsPathDataSource to u…
andersleung Aug 18, 2020
27e7dc7
Merge branch 'master' into readsHtsgetDataSource
andersleung Aug 18, 2020
a32a9e3
Add readme to htsgetScripts, move htsget_config.json
andersleung Aug 19, 2020
ea540d1
Remove ReadsHtsgetDataSource and GATK versions of htsget classes
andersleung Aug 19, 2020
@@ -26,6 +26,7 @@ public class GATKPath extends PathSpecifier implements TaggedArgument, Serializa
private static final long serialVersionUID = 1L;

public static final String HDFS_SCHEME = "hdfs";
public static final String HTSGET_SCHEME = "htsget";

private String tagName;
private Map<String, String> tagAttributes;
13 changes: 11 additions & 2 deletions src/main/java/org/broadinstitute/hellbender/engine/GATKTool.java
@@ -13,6 +13,7 @@
import java.nio.file.Path;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.barclay.argparser.ArgumentCollection;
@@ -455,8 +456,16 @@ else if (hasCramInput()) {
factory = factory.enable(SamReaderFactory.Option.CACHE_FILE_BASED_INDEXES);
}

reads = new ReadsPathDataSource(readArguments.getReadPaths(), readArguments.getReadIndexPaths(), factory, cloudPrefetchBuffer,
(cloudIndexPrefetchBuffer < 0 ? cloudPrefetchBuffer : cloudIndexPrefetchBuffer));
final Map<String, List<GATKPath>> pathsByScheme = readArguments.getReadPathSpecifiers().stream()
.collect(Collectors.groupingBy(path -> path.getURI().getScheme()));

if (pathsByScheme.get(GATKPath.HTSGET_SCHEME) != null) {
reads = new ReadsHtsgetDataSource(pathsByScheme.get(GATKPath.HTSGET_SCHEME), factory);
} else {
reads = new ReadsPathDataSource(readArguments.getReadPaths(), readArguments.getReadIndexPaths(), factory, cloudPrefetchBuffer,
(cloudIndexPrefetchBuffer < 0 ? cloudPrefetchBuffer : cloudIndexPrefetchBuffer));
}

}
else {
reads = null;
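As an aside on the dispatch above: the new GATKTool code groups the input read paths by URI scheme and hands any htsget:// inputs to ReadsHtsgetDataSource, falling back to ReadsPathDataSource otherwise. A minimal standalone sketch of the same grouping idea, using plain java.net.URI in place of GATKPath (the URIs below are made-up placeholders):

import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Standalone sketch of scheme-based dispatch, using java.net.URI instead of GATKPath.
public final class SchemeGroupingDemo {
    public static void main(final String[] args) {
        final List<URI> inputs = Arrays.asList(
                URI.create("htsget://localhost:3000/reads/sample1"), // hypothetical htsget endpoint
                URI.create("file:///data/sample2.bam"));             // hypothetical local input

        final Map<String, List<URI>> byScheme = inputs.stream()
                .collect(Collectors.groupingBy(URI::getScheme));

        // Prints something like:
        // {file=[file:///data/sample2.bam], htsget=[htsget://localhost:3000/reads/sample1]}
        System.out.println(byScheme);

        // Mirrors the branch in GATKTool: htsget inputs would go to the htsget
        // data source, everything else to the path-based data source.
        if (byScheme.get("htsget") != null) {
            System.out.println("htsget inputs: " + byScheme.get("htsget"));
        } else {
            System.out.println("no htsget inputs; use path-based source");
        }
    }
}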

Large diffs are not rendered by default.

@@ -1,15 +1,12 @@
package org.broadinstitute.hellbender.engine.filters;

import htsjdk.samtools.util.Locatable;
import htsjdk.samtools.util.OverlapDetector;
import org.broadinstitute.barclay.argparser.Advanced;
import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.barclay.help.DocumentedFeature;
import org.broadinstitute.hellbender.cmdline.ReadFilterArgumentDefinitions;
import org.broadinstitute.hellbender.utils.GenomeLocParser;
import org.broadinstitute.hellbender.utils.GenomeLocSortedSet;
import org.broadinstitute.hellbender.utils.IntervalMergingRule;
import org.broadinstitute.hellbender.utils.IntervalSetRule;
import org.broadinstitute.hellbender.utils.IntervalUtils;
import org.broadinstitute.hellbender.utils.*;
import org.broadinstitute.hellbender.utils.help.HelpConstants;
import org.broadinstitute.hellbender.utils.logging.OneShotLogger;
import org.broadinstitute.hellbender.utils.read.GATKRead;
208 changes: 49 additions & 159 deletions src/main/java/org/broadinstitute/hellbender/tools/HtsgetReader.java
@@ -6,29 +6,12 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Stream;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.broadinstitute.barclay.argparser.Advanced;
import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
@@ -37,13 +20,7 @@
import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
import org.broadinstitute.hellbender.cmdline.programgroups.ExampleProgramGroup;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.tools.htsgetreader.HtsgetClass;
import org.broadinstitute.hellbender.tools.htsgetreader.HtsgetErrorResponse;
import org.broadinstitute.hellbender.tools.htsgetreader.HtsgetFormat;
import org.broadinstitute.hellbender.tools.htsgetreader.HtsgetRequestBuilder;
import org.broadinstitute.hellbender.tools.htsgetreader.HtsgetRequestField;
import org.broadinstitute.hellbender.tools.htsgetreader.HtsgetResponse;
import org.broadinstitute.hellbender.utils.HttpUtils;
import org.broadinstitute.hellbender.tools.htsgetreader.*;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.Utils;

@@ -75,7 +52,7 @@ public class HtsgetReader extends CommandLineProgram {
public static final String FIELDS_LONG_NAME = "field";
public static final String TAGS_LONG_NAME = "tag";
public static final String NOTAGS_LONG_NAME = "notag";
public static final String NUM_THREADS_LONG_NAME = "reader-threads";
public static final String PARALLEL_DOWNLOAD_LONG_NAME = "parallel";
public static final String CHECK_MD5_LONG_NAME = "check-md5";

@Argument(doc = "Output file.",
@@ -130,161 +107,74 @@ public class HtsgetReader extends CommandLineProgram {
private List<String> notags;

@Advanced
@Argument(fullName = NUM_THREADS_LONG_NAME,
shortName = NUM_THREADS_LONG_NAME,
doc = "How many simultaneous threads to use when reading data from an htsget response;" +
"higher values may improve performance when network latency is an issue.",
optional = true,
minValue = 1)
private int readerThreads = 1;
@Argument(doc = "Whether to try to download blocks in parallel",
fullName = PARALLEL_DOWNLOAD_LONG_NAME,
shortName = PARALLEL_DOWNLOAD_LONG_NAME,
optional = true)
private final boolean parallelDownload = false;

@Argument(fullName = CHECK_MD5_LONG_NAME, shortName = CHECK_MD5_LONG_NAME, doc = "Boolean determining whether to calculate the md5 digest of the assembled file "
+ "and validate it against the provided md5 hash, if it exists.", optional = true)
private boolean checkMd5 = false;

private ExecutorService executorService;

private CloseableHttpClient client;

@Override
public void onStartup() {
if (this.readerThreads > 1) {
logger.info("Initializing with " + this.readerThreads + " threads");
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("htsgetReader-thread-%d")
.setDaemon(true).build();
this.executorService = Executors.newFixedThreadPool(readerThreads, threadFactory);
}
this.client = HttpUtils.getClient();
}

@Override
public void onShutdown() {
if (this.executorService != null) {
this.executorService.shutdownNow();
}
super.onShutdown();
}

/**
* Downloads data blocks provided by response to outputFile in serial
*/
private void getData(final HtsgetResponse response) {
try (final OutputStream ostream = new FileOutputStream(this.outputFile)) {
response.getBlocks().forEach(b -> {
try (final InputStream istream = b.getData()) {
IOUtils.copy(istream, ostream);
} catch (final IOException e) {
throw new UserException("Failed to copy data block to output file", e);
}
});
} catch (final IOException e) {
throw new UserException("Could not create output file: " + outputFile, e);
}
}

/**
* Downloads data blocks provided by response to outputFile in parallel, using
* the number of threads specified by user
*/
private void getDataParallel(final HtsgetResponse response) {
final List<Future<InputStream>> futures = new ArrayList<>(response.getBlocks().size());
response.getBlocks().forEach(b -> futures.add(this.executorService.submit(b::getData)));

try (final OutputStream ostream = new FileOutputStream(this.outputFile)) {
futures.forEach(f -> {
try (final InputStream istream = f.get()) {
IOUtils.copy(istream, ostream);
} catch (final IOException e) {
throw new UserException("Error while copying data block to output file", e);
} catch (final ExecutionException | InterruptedException e) {
throw new UserException("Error while waiting to download block", e);
}
});
} catch (final IOException e) {
throw new UserException("Could not create output file", e);
}
}

/**
* Checks md5 digest provided in response, if one exists, against calculated md5
* hash of downloaded file, warning user if they differ
*/
private void checkMd5(final HtsgetResponse resp) {
final String expectedMd5 = resp.getMd5();
if (expectedMd5 == null) {
logger.warn("No md5 digest provided by response");
} else {
try {
final String actualMd5 = Utils.calculateFileMD5(outputFile);
if (!actualMd5.equals(expectedMd5)) {
throw new UserException("Expected md5: " + expectedMd5 + " did not match actual md5: " + actualMd5);
}
} catch (final IOException e) {
throw new UserException("Unable to calculate md5 digest", e);
private void checkMd5(final String expectedMd5) {
try {
final String actualMd5 = Utils.calculateFileMD5(this.outputFile);
if (!actualMd5.equals(expectedMd5)) {
throw new UserException("Expected md5: " + expectedMd5 + " and actual md5: " + actualMd5 + " do not match");
}
} catch (final IOException e) {
throw new UserException("Could not calculate md5 checksum from downloaded file", e);
}
}

private ObjectMapper getObjectMapper() {
final ObjectMapper mapper = new ObjectMapper();
mapper.enable(DeserializationFeature.UNWRAP_ROOT_VALUE);
mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
return mapper;
}

@Override
public Object doWork() {
// construct request from command line args and convert to URI
final HtsgetRequestBuilder req = new HtsgetRequestBuilder(endpoint, id)
// Construct request from command line args and convert to URI
final HtsgetRequest req = new HtsgetRequest(endpoint, id)
.withFormat(format)
.withDataClass(dataClass)
.withInterval(interval)
.withFields(fields)
.withTags(tags)
.withNotags(notags);
final URI reqURI = req.toURI();

final HttpGet getReq = new HttpGet(reqURI);
try (final CloseableHttpResponse resp = this.client.execute(getReq)) {
// get content of response
final HttpEntity entity = resp.getEntity();
final Header encodingHeader = entity.getContentEncoding();
final Charset encoding = encodingHeader == null
? StandardCharsets.UTF_8
: Charsets.toCharset(encodingHeader.getValue());
final String jsonBody = EntityUtils.toString(entity, encoding);

final ObjectMapper mapper = this.getObjectMapper();

if (resp.getStatusLine() == null) {
throw new UserException("htsget server response did not contain status line");
}
final int statusCode = resp.getStatusLine().getStatusCode();
if (400 <= statusCode && statusCode < 500) {
final HtsgetErrorResponse err = mapper.readValue(jsonBody, HtsgetErrorResponse.class);
throw new UserException("Invalid request, received error code: " + statusCode + ", error type: "
+ err.getError() + ", message: " + err.getMessage());
} else if (statusCode == 200) {
final HtsgetResponse response = mapper.readValue(jsonBody, HtsgetResponse.class);

if (this.readerThreads > 1) {
this.getDataParallel(response);
} else {
this.getData(response);
}

logger.info("Successfully wrote to: " + outputFile);
final HtsgetResponse resp = req.getResponse();
if (resp.getMd5() == null) {
this.checkMd5 = false;
logger.info("No md5 checksum received");
}

if (checkMd5) {
this.checkMd5(response);
}
try (final OutputStream outputstream = new FileOutputStream(this.outputFile)) {
if (this.parallelDownload) {
resp.getBlocks().parallelStream().map(block -> {
Member:
This is very clean, but we might want to keep the ability to tune by number of threads.

Member:
Have you seen that Java actually runs the map operations in parallel and then rejoins them in the right order? I'm not 100% sure that it's guaranteed to do either.

Contributor Author (andersleung):
I'll revert the change with the threads. Whether the stream is ordered or unordered is separate from whether it's parallel or sequential, so the map operations might run in parallel and out of order, but forEachOrdered makes sure they wait in order for the final step (see the sketch after this diff).

final Path tempFile = org.broadinstitute.hellbender.utils.io.IOUtils.createTempPath("htsget-temp", "");
try (final OutputStream ostream = Files.newOutputStream(tempFile)) {
org.apache.commons.io.IOUtils.copy(block.getData(), ostream);
return Files.newInputStream(tempFile);
} catch (final IOException e) {
throw new UserException("Error while downloading htsget block", e);
}
}).forEachOrdered(inputStream -> {
try {
IOUtils.copy(inputStream, outputstream);
} catch (final IOException e) {
throw new UserException("IOException while writing output file", e);
}
});
} else {
throw new UserException("Unrecognized status code: " + statusCode);
try {
IOUtils.copy(resp.getDataStream(), outputstream);
} catch (final IOException e) {
throw new UserException("IOException while writing output file", e);
}
}
} catch (final IOException e) {
throw new UserException("IOException during htsget download", e);
}

if (this.checkMd5) this.checkMd5(resp.getMd5());

return null;
}
}
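On the ordering question raised in the review thread above: a small standalone sketch (not part of this PR) showing that the map stages of a parallel stream may run out of order on worker threads, while forEachOrdered still consumes the results in the stream's encounter order, which is what the block-download loop relies on.

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Standalone sketch: map stages of a parallel stream may complete out of order,
// but forEachOrdered consumes the mapped results in encounter order.
public final class ParallelOrderingDemo {
    public static void main(final String[] args) {
        final List<Integer> blocks = IntStream.range(0, 8).boxed().collect(Collectors.toList());

        final StringBuilder out = new StringBuilder();
        blocks.parallelStream()
                .map(i -> {
                    // Stand-in for downloading one htsget block; these lines print
                    // in a nondeterministic order across ForkJoinPool workers.
                    System.out.println("mapping block " + i + " on " + Thread.currentThread().getName());
                    return "block-" + i + " ";
                })
                .forEachOrdered(out::append); // appended strictly in encounter order

        // Always prints "block-0 block-1 ... block-7 ", regardless of which
        // map call finished first.
        System.out.println(out);
    }
}

Since parallelStream() runs on the common ForkJoinPool by default, the explicit thread-count tuning that the old --reader-threads executor provided is indeed lost, which is the first reviewer's point.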