added support of reading and writing directly to AWS using the nio-spi-s3 tool. Currently only works locally.
Danish Intizar committed Feb 1, 2024
1 parent dd73036 commit c9696d8
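
What the commit message describes: with the bundled nio-spi-for-s3 jar on the Java classpath, java.nio can resolve s3:// URIs, so GATK tools can read and write S3 objects directly. A minimal sketch of the intended launcher invocation follows; the tool name and bucket paths are hypothetical, and per the message this only works for local, non-Spark runs so far.

import subprocess

# Hypothetical invocation: the tool and the s3:// paths are illustrative.
# "--s3" is consumed by the launcher itself and never forwarded to GATK.
subprocess.run([
    "./gatk", "PrintReads",
    "-I", "s3://my-bucket/input.bam",   # read directly from S3
    "-O", "s3://my-bucket/output.bam",  # write directly back to S3
    "--s3",
], check=True)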
Showing 2 changed files with 18 additions and 8 deletions.
26 changes: 18 additions & 8 deletions gatk
@@ -32,6 +32,7 @@ GATK_RUN_SCRIPT = BUILD_LOCATION + projectName
 GATK_LOCAL_JAR_ENV_VARIABLE = "GATK_LOCAL_JAR"
 GATK_SPARK_JAR_ENV_VARIABLE = "GATK_SPARK_JAR"
 BIN_PATH = script + "/build/libs"
+NIO_SPI_S3_JAR = "src/main/resources/org/broadinstitute/hellbender/tools/aws/nio-spi-for-s3-2.0.0-dev-all.jar"

 EXTRA_JAVA_OPTIONS_SPARK= "-DGATK_STACKTRACE_ON_USER_EXCEPTION=true " \
                           "-Dsamjdk.use_async_io_read_samtools=false " \
@@ -121,6 +122,7 @@ def main(args):
         print(" --debug-suspend sets the Java VM debug agent up so that the run gets immediately suspended")
         print(" waiting for a debugger to connect. By default the port number is 5005 but")
         print(" can be customized using --debug-port")
+        print(" --s3 (IN DEV) sets up GATK to use Amazon S3 as the default storage backend")
         sys.exit(0)

     if len(args) == 1 and args[0] == "--list":
@@ -131,6 +133,10 @@ def main(args):
         dryRun = True
         args.remove("--dry-run")

+    aws_s3 = "--s3" in args
+    if aws_s3:
+        aws_s3 = True
+        args.remove("--s3")

     debugPort = getValueForArgument(args, "--debug-port")
     if debugPort is not None:
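
The new flag parsing follows the same strip-before-forwarding idiom as --dry-run just above it, though the inner aws_s3 = True is redundant, since the in test already yields a boolean. A standalone sketch of the tightened idiom, with an illustrative argument list:

# Illustrative argv; only "--s3" matters here.
args = ["HaplotypeCaller", "--s3", "-I", "s3://bucket/in.bam"]

aws_s3 = "--s3" in args   # already a boolean, no reassignment needed
if aws_s3:
    args.remove("--s3")   # strip the flag so it is not passed to the GATK jar

print(aws_s3)  # True
print(args)    # ['HaplotypeCaller', '-I', 's3://bucket/in.bam']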
@@ -174,7 +180,7 @@ def main(args):
                 del sparkArgs[i] #and its parameter
             gatkArgs += ["--spark-master", sparkMaster]

-        runGATK(sparkRunner, sparkSubmitCommand, dryRun, gatkArgs, sparkArgs, javaOptions, debugPort, debugSuspend)
+        runGATK(sparkRunner, sparkSubmitCommand, dryRun, gatkArgs, sparkArgs, javaOptions, debugPort, debugSuspend, aws_s3)

     except GATKLaunchException as e:
         sys.stderr.write(str(e)+"\n")
@@ -202,7 +208,7 @@ def getGCloudSubmitCommand(gcloudCmd):
     else:
         return gcloudCmd

-def getLocalGatkRunCommand(javaOptions, debugPort, debugSuspend):
+def getLocalGatkRunCommand(javaOptions, debugPort, debugSuspend, aws_s3):
     localJarFromEnv = getJarFromEnv(GATK_LOCAL_JAR_ENV_VARIABLE)

     # Add java options to our packaged local jar options
@@ -217,7 +223,7 @@ def getLocalGatkRunCommand(javaOptions, debugPort, debugSuspend):


     if localJarFromEnv is not None:
-        return formatLocalJarCommand(localJarFromEnv)
+        return formatLocalJarCommand(localJarFromEnv, aws_s3)

     wrapperScript = getGatkWrapperScript(throwIfNotFound=False)
     if wrapperScript is not None:
@@ -233,11 +239,15 @@ def getLocalGatkRunCommand(javaOptions, debugPort, debugSuspend):

         return [wrapperScript]

-    return formatLocalJarCommand(getLocalJar()) # will throw if local jar not found
+    return formatLocalJarCommand(getLocalJar(), aws_s3) # will throw if local jar not found


-def formatLocalJarCommand(localJar):
-    return ["java"] + PACKAGED_LOCAL_JAR_OPTIONS + ["-jar", localJar]
+def formatLocalJarCommand(localJar, aws_s3):
+    if aws_s3:
+        classpath = "{aws_jar}:{local_jar}".format(aws_jar=NIO_SPI_S3_JAR, local_jar=localJar)
+        return ["java"] + PACKAGED_LOCAL_JAR_OPTIONS + ["-classpath", classpath, "org.broadinstitute.hellbender.Main"]
+    else:
+        return ["java"] + PACKAGED_LOCAL_JAR_OPTIONS + ["-jar", localJar]

 def getGatkWrapperScript(throwIfNotFound=True):
     if not os.path.exists(GATK_RUN_SCRIPT):
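
The -classpath branch above is the crux of the change: java -jar ignores any -classpath or CLASSPATH setting, so the only way to prepend the S3 NIO provider to GATK's classpath is to list both jars explicitly and name the main class (org.broadinstitute.hellbender.Main) by hand. A small sketch of the two command shapes this function produces, with stand-in jar paths and an emptied options list:

# Sketch only: PACKAGED_LOCAL_JAR_OPTIONS and the jar paths are stand-ins.
PACKAGED_LOCAL_JAR_OPTIONS = []
NIO_SPI_S3_JAR = "nio-spi-for-s3-2.0.0-dev-all.jar"

def formatLocalJarCommand(localJar, aws_s3):
    if aws_s3:
        classpath = "{aws_jar}:{local_jar}".format(aws_jar=NIO_SPI_S3_JAR, local_jar=localJar)
        return ["java"] + PACKAGED_LOCAL_JAR_OPTIONS + ["-classpath", classpath, "org.broadinstitute.hellbender.Main"]
    else:
        return ["java"] + PACKAGED_LOCAL_JAR_OPTIONS + ["-jar", localJar]

print(formatLocalJarCommand("gatk-local.jar", False))
# ['java', '-jar', 'gatk-local.jar']
print(formatLocalJarCommand("gatk-local.jar", True))
# ['java', '-classpath', 'nio-spi-for-s3-2.0.0-dev-all.jar:gatk-local.jar',
#  'org.broadinstitute.hellbender.Main']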
@@ -354,9 +364,9 @@ def cacheJarOnGCS(jar, dryRun):
     return jar


-def runGATK(sparkRunner, suppliedSparkSubmitCommand, dryrun, gatkArgs, sparkArgs, javaOptions, debugPort, debugSuspend):
+def runGATK(sparkRunner, suppliedSparkSubmitCommand, dryrun, gatkArgs, sparkArgs, javaOptions, debugPort, debugSuspend, aws_s3):
     if sparkRunner is None or sparkRunner == "LOCAL":
-        cmd = getLocalGatkRunCommand(javaOptions, debugPort, debugSuspend) + gatkArgs + sparkArgs
+        cmd = getLocalGatkRunCommand(javaOptions, debugPort, debugSuspend, aws_s3) + gatkArgs + sparkArgs
         runCommand(cmd, dryrun)
     elif sparkRunner == "SPARK":
         sparkSubmitCmd = getSparkSubmitCommand(suppliedSparkSubmitCommand)
Binary file src/main/resources/org/broadinstitute/hellbender/tools/aws/nio-spi-for-s3-2.0.0-dev-all.jar not shown.
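
One portability note on the new classpath join in formatLocalJarCommand: the ':' separator is POSIX-specific, while Java on Windows expects ';'. A hedged, portable variant would delegate the choice to the standard library, as in this sketch with illustrative paths:

import os

# os.pathsep is ':' on POSIX and ';' on Windows, matching what java expects
# for -classpath; the jar paths here are illustrative.
NIO_SPI_S3_JAR = "nio-spi-for-s3-2.0.0-dev-all.jar"
localJar = "gatk-local.jar"
classpath = os.pathsep.join([NIO_SPI_S3_JAR, localJar])
print(classpath)  # nio-spi-for-s3-2.0.0-dev-all.jar:gatk-local.jar on POSIX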
