[go: nahoru, domu]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Static types for process inputs/outputs #4553

Draft
wants to merge 38 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
e974900
Refactor ast xform classes
bentsherman Nov 19, 2023
4612de3
Move process and workflow DSLs into separate classes
bentsherman Nov 20, 2023
5ad9813
Add ProcessFn annotation
bentsherman Nov 30, 2023
01ef1db
Rename ProcessDsl -> ProcessBuilder, add separate builder for process…
bentsherman Nov 30, 2023
c465000
Add WorkflowFn annotation
bentsherman Nov 30, 2023
8f2c090
Add support for native processes, use reflection to invoke workflows
bentsherman Dec 1, 2023
48fdfc2
Separate process input channel logic from task processor
bentsherman Dec 1, 2023
041e10a
Remove params from WorkflowFn
bentsherman Dec 1, 2023
a52a829
Simplify ProcessFn param names
bentsherman Dec 2, 2023
570892c
Separate `InParam`s from task config
bentsherman Dec 2, 2023
cf0e4b2
Fix process input channel logic
bentsherman Dec 2, 2023
0c490e8
Fix bugs
bentsherman Dec 6, 2023
8733ba6
Refactor process inputs and outputs
bentsherman Dec 8, 2023
d2268b2
Refactor process inputs/outputs DSL
bentsherman Dec 9, 2023
bca231b
Move ProcessBuilder#applyConfig() into subclass
bentsherman Dec 9, 2023
872a3e2
Add CombineManyOp to combine process input channels
bentsherman Dec 9, 2023
72b54f6
Save variable refs in ProcessFn
bentsherman Dec 9, 2023
dfd5aea
Fix bugs
bentsherman Dec 9, 2023
1e77a22
Fix task hash (resume still not working)
bentsherman Dec 10, 2023
c00ee3f
Update tests
bentsherman Dec 13, 2023
f7b3fa8
Move annotation API to separate branch
bentsherman Dec 13, 2023
c300f00
Minor edits
bentsherman Dec 13, 2023
ce2de32
Minor edits
bentsherman Dec 13, 2023
47a85be
Fix storeDir warning and task context caching
bentsherman Dec 17, 2023
cc2c08e
Merge upstream changes
bentsherman Dec 17, 2023
8b5fbb6
Merge branch 'master' into ben-programmatic-api
bentsherman Dec 17, 2023
48423e4
Fix failing integration tests
bentsherman Dec 18, 2023
36510f4
Fix failing integration tests, minor changes
bentsherman Dec 18, 2023
353493e
Update tests
bentsherman Dec 18, 2023
177120c
Add comments
bentsherman Dec 18, 2023
8441989
Fix stdout evaluation
bentsherman Dec 18, 2023
700ea34
Merge branch 'master' into ben-programmatic-api
bentsherman Mar 28, 2024
1f6705b
Move LazyHelper to script package, update copyright
bentsherman Mar 28, 2024
ecdaaa4
cleanup
bentsherman Mar 28, 2024
4509b28
Infer staging of file inputs from input types
bentsherman Mar 29, 2024
8efcfc0
Update docs
bentsherman Mar 29, 2024
8a3a827
Fix error with legacy syntax
bentsherman Mar 29, 2024
1cd6fce
Rename CombineManyOp -> MergeWithEachOp
bentsherman Mar 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor process inputs and outputs
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
  • Loading branch information
bentsherman committed Dec 8, 2023
commit 8733ba6700fd96e0cf602d4914c10f17c709cf74
Original file line number Diff line number Diff line change
Expand Up @@ -83,28 +83,20 @@ class ProcessFnXform extends ClassCodeVisitorSupport {
fixDirectiveWithNegativeValue(stmt)
}

// fix outputs
final outputs = annotation.getMember('outputs')
if( outputs != null && outputs instanceof ClosureExpression ) {
final block = (BlockStatement)outputs.getCode()
for( Statement stmt : block.getStatements() )
fixOutputMethod((ExpressionStatement)stmt)
}

// insert `task` method parameter
final params = method.getParameters() as List<Parameter>
params.push(new Parameter(new ClassNode(TaskConfig), 'task'))
method.setParameters(params as Parameter[])

// TODO: append stub source

// append method params
final params = method.getParameters() as List<Parameter>
annotation.addMember( 'params', new ListExpression(
params.collect(p -> (Expression)constX(p.getName()))
) )

// append script source
annotation.addMember( 'source', constX( getSource(method.getCode()) ) )

// prepend `task` method parameter
params.push(new Parameter(new ClassNode(TaskConfig), 'task'))
method.setParameters(params as Parameter[])
}

/**
Expand Down Expand Up @@ -138,24 +130,6 @@ class ProcessFnXform extends ClassCodeVisitorSupport {
) )
}

private static final VALID_OUTPUT_METHODS = ['val','env','file','path','stdout','tuple']

/**
* Fix output method calls.
*
* @param stmt
*/
/**
 * Rewrite an output declaration (e.g. {@code path 'x'}) into a call to the
 * corresponding internal {@code _out_<name>} method.
 *
 * A syntax error is reported when the statement is not a method call, or when
 * the method name is not listed in {@code VALID_OUTPUT_METHODS}.
 *
 * @param stmt an expression statement taken from the {@code outputs} closure
 */
protected void fixOutputMethod(ExpressionStatement stmt) {
    final expr = stmt.getExpression()
    // a non-call statement (e.g. an assignment) would otherwise fail with a ClassCastException
    if( !(expr instanceof MethodCallExpression) ) {
        syntaxError(stmt, "Invalid output statement")
        return
    }
    final methodCall = (MethodCallExpression)expr
    final name = methodCall.getMethodAsString()

    if( name !in VALID_OUTPUT_METHODS )
        syntaxError(stmt, "Invalid output method '${name}'")

    methodCall.setMethod( constX('_out_' + name) )
}

private String getSource(ASTNode node) {
final buffer = new StringBuilder()
final colx = node.getColumnNumber()
Expand Down
38 changes: 10 additions & 28 deletions modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,8 @@ import nextflow.NF
import nextflow.extension.CH
import nextflow.extension.DataflowHelper
import nextflow.processor.TaskProcessor
import nextflow.script.params.DefaultOutParam
import nextflow.script.params.EachInParam
import nextflow.script.params.InParam
import nextflow.script.params.InputsList
import nextflow.script.params.OutParam
import nextflow.script.params.OutputsList
import nextflow.script.params.TupleInParam
import nextflow.script.params.TupleOutParam
import nextflow.script.ProcessInputs
import nextflow.script.ProcessOutputs

import java.util.concurrent.atomic.AtomicLong

Expand Down Expand Up @@ -95,7 +89,7 @@ class DAG {
* @param inputs The list of inputs entering in the process
* @param outputs the list of outputs leaving the process
*/
void addProcessNode( String label, InputsList inputs, OutputsList outputs, TaskProcessor process=null ) {
void addProcessNode( String label, ProcessInputs inputs, ProcessOutputs outputs, TaskProcessor process=null ) {
assert label
assert inputs!=null
assert outputs
Expand Down Expand Up @@ -234,30 +228,18 @@ class DAG {

}

private List<ChannelHandler> normalizeInputs( InputsList inputs ) {
private List<ChannelHandler> normalizeInputs( ProcessInputs inputs ) {

inputs.collect { InParam p -> new ChannelHandler(channel: p.rawChannel, label: inputName0(p)) }

}

private String inputName0(InParam param) {
if( param instanceof TupleInParam ) return null
if( param instanceof EachInParam ) return null
return param.name
inputs.collect { p ->
new ChannelHandler(channel: p.getChannel(), label: p.getName())
}
}

private List<ChannelHandler> normalizeOutputs( OutputsList outputs ) {
private List<ChannelHandler> normalizeOutputs( ProcessOutputs outputs ) {

def result = []
for(OutParam p :outputs) {
if( p instanceof DefaultOutParam )
break
final it = p.getOutChannel()
if( it!=null )
result << new ChannelHandler(channel: it, label: p instanceof TupleOutParam ? null : p.name)
outputs.collect { p ->
new ChannelHandler(channel: p.getChannel(), label: p.getName())
}

return result
}

private List<ChannelHandler> normalizeChannels( entry ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import groovyx.gpars.dataflow.operator.DataflowProcessor
import nextflow.Global
import nextflow.Session
import nextflow.processor.TaskProcessor
import nextflow.script.params.InputsList
import nextflow.script.params.OutputsList
import nextflow.script.ProcessInputs
import nextflow.script.ProcessOutputs
/**
* Helper class to mark DAG node with the proper labels
*
Expand All @@ -46,7 +46,7 @@ class NodeMarker {
* @param inputs The list of inputs entering in the process
* @param outputs the list of outputs leaving the process
*/
static void addProcessNode( TaskProcessor process, InputsList inputs, OutputsList outputs ) {
static void addProcessNode( TaskProcessor process, ProcessInputs inputs, ProcessOutputs outputs ) {
if( session && session.dag && !session.aborted )
session.dag.addProcessNode( process.name, inputs, outputs, process )
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,12 @@ class PublishDir {
/**
* Apply the publishing process to the specified {@link TaskRun} instance
*
* @param files Set of output files
* @param task The task whose output need to be published
*/
@CompileStatic
void apply( Set<Path> files, TaskRun task ) {
void apply( TaskRun task ) {

if( !files || !enabled )
if( !task.outputFiles || !enabled )
return

if( !path )
Expand All @@ -283,7 +282,7 @@ class PublishDir {
this.stageInMode = task.config.stageInMode
this.taskName = task.name

apply0(files)
apply0(task.outputFiles)
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.processor

import java.nio.file.Path

import groovy.transform.CompileStatic
/**
 * Reads back the environment variables captured in a task work directory.
 *
 * The file read is {@code TaskRun.CMD_ENV}, resolved against the work directory
 * given to the constructor. Each line of that file is parsed on its own.
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 * @author Ben Sherman <bentshermann@gmail.com>
 */
@CompileStatic
class TaskEnvCollector {

    /** Work directory of the task whose environment file is parsed. */
    private Path workDir

    TaskEnvCollector(Path workDir) {
        this.workDir = workDir
    }

    /**
     * Parse the environment file into a name-to-value map.
     *
     * Every line is split at its first {@code =}. A line with no {@code =}
     * maps to an empty value. Lines whose name part is empty are skipped.
     * When a name occurs more than once, the last occurrence wins.
     *
     * @return a mutable map of variable names to their string values
     */
    Map collect() {
        final envFile = workDir.resolve(TaskRun.CMD_ENV)
        final content = envFile.text
        final Map collected = new HashMap(50)
        for( String entry : content.readLines() ) {
            final pair = tokenize0(entry)
            final name = pair[0]
            if( name )
                collected.put(name, pair[1])
        }
        return collected
    }

    /**
     * Split a line into a two-element list {@code [name, value]} at the first {@code =}.
     * Without a separator the whole line is the name and the value is {@code ''}.
     */
    private List<String> tokenize0(String line) {
        final sep = line.indexOf('=')
        if( sep < 0 )
            return List.of(line, '')
        return List.of(line.substring(0, sep), line.substring(sep + 1))
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.processor

import java.nio.file.LinkOption
import java.nio.file.Path
import java.nio.file.NoSuchFileException

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.exception.IllegalArityException
import nextflow.exception.MissingFileException
import nextflow.file.FileHelper
import nextflow.file.FilePatternSplitter
import nextflow.script.ProcessFileOutput
/**
 * Implements the collection of files from the work directory
 * of a task execution.
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 * @author Ben Sherman <bentshermann@gmail.com>
 */
@Slf4j
@CompileStatic
class TaskFileCollecter {

    /** The file output declaration whose patterns are resolved. */
    private ProcessFileOutput param

    /** The task whose work directory is searched. */
    private TaskRun task

    /**
     * @param param the file output declaration to resolve
     * @param task the task whose work directory holds the outputs
     */
    TaskFileCollecter(ProcessFileOutput param, TaskRun task) {
        this.param = param
        this.task = task
    }

    /**
     * Collect the files declared by {@code param} from the task work directory.
     *
     * Each pattern from {@code param.getFilePatterns(...)} is resolved in one of two ways:
     * <ul>
     *   <li>As a glob via {@link #fetchResultFiles}, when {@code param.glob} is set and the
     *       pattern contains wildcards. Staged input files are removed from the matches
     *       unless {@code param.includeInputs} is set.</li>
     *   <li>Otherwise as a single path resolved against the work directory. When glob is
     *       enabled, the pattern is first normalized with {@code splitter.strip(...)}.</li>
     * </ul>
     *
     * @return a single {@link Path} when exactly one file was found and {@code param.isSingle()},
     *   otherwise the {@code List<Path>} of all collected files
     * @throws MissingFileException if a pattern matches nothing, the output is not optional,
     *   and its arity (when declared) requires at least one file
     * @throws IllegalArityException if the total number of collected files does not satisfy
     *   {@code param.isValidArity(...)}
     */
    Object collect() {
        final List<Path> allFiles = []
        final filePatterns = param.getFilePatterns(task.context, task.workDir)
        // set when removing staged inputs left a glob with no matches,
        // so that the missing-file error can explain why
        boolean inputsExcluded = false

        for( String filePattern : filePatterns ) {
            List<Path> result = null

            final splitter = param.glob ? FilePatternSplitter.glob().parse(filePattern) : null
            if( splitter?.isPattern() ) {
                result = fetchResultFiles(filePattern, task.workDir)
                if( result && !param.includeInputs ) {
                    result = excludeStagedInputs(task, result)
                    log.trace "Process ${task.lazyName()} > after removing staged inputs: ${result}"
                    inputsExcluded |= (result.size()==0)
                }
            }
            else {
                // `splitter` is non-null here whenever `param.glob` is true
                final path = param.glob ? splitter.strip(filePattern) : filePattern
                final file = task.workDir.resolve(path)
                final exists = checkFileExists(file)
                if( exists )
                    result = List.of(file)
                else
                    log.debug "Process `${task.lazyName()}` is unable to find [${file.class.simpleName}]: `$file` (pattern: `$filePattern`)"
            }

            if( result )
                allFiles.addAll(result)

            else if( !param.optional && (!param.arity || param.arity.min > 0) ) {
                def msg = "Missing output file(s) `$filePattern` expected by process `${task.lazyName()}`"
                if( inputsExcluded )
                    msg += " (note: input files are not included in the default matching set)"
                throw new MissingFileException(msg)
            }
        }

        if( !param.isValidArity(allFiles.size()) )
            throw new IllegalArityException("Incorrect number of output files for process `${task.lazyName()}` -- expected ${param.arity}, found ${allFiles.size()}")

        return allFiles.size()==1 && param.isSingle() ? allFiles[0] : allFiles
    }

    /**
     * Collect the file(s) matching the specified name or glob pattern
     * in the given task work directory.
     *
     * Hidden files are included when {@code param.hidden} is set or the pattern starts
     * with {@code '.'}. Unless {@code param.type} is given, only files are matched for
     * patterns containing {@code '**'}, and any entry type otherwise.
     *
     * @param pattern the glob pattern to match
     * @param workDir the directory to search
     * @return the matching paths, sorted
     * @throws MissingFileException if the work directory cannot be accessed
     */
    protected List<Path> fetchResultFiles(String pattern, Path workDir) {
        final opts = [
            relative: false,
            hidden: param.hidden ?: pattern.startsWith('.'),
            followLinks: param.followLinks,
            maxDepth: param.maxDepth,
            type: param.type ? param.type : ( pattern.contains('**') ? 'file' : 'any' )
        ]

        List<Path> files = []
        try {
            FileHelper.visitFiles(opts, workDir, pattern) { Path it -> files.add(it) }
        }
        catch( NoSuchFileException e ) {
            throw new MissingFileException("Cannot access directory: '$workDir'", e)
        }

        return files.sort()
    }

    /**
     * Remove each path in the given list whose name matches the name of
     * an input file for the specified {@code TaskRun}.
     *
     * Paths are compared by their location relative to the task work directory.
     * The original order of the remaining paths is preserved.
     *
     * @param task the task whose staged inputs are excluded
     * @param collectedFiles the candidate output paths
     * @return a new list containing only the paths that are not staged inputs
     */
    protected List<Path> excludeStagedInputs(TaskRun task, List<Path> collectedFiles) {
        // hash the staged names once so each lookup is O(1) instead of a linear List scan
        final List<String> stagedInputs = task.getStagedInputs()
        final Set<String> stagedNames = stagedInputs ? new HashSet<String>(stagedInputs) : new HashSet<String>()
        final List<Path> result = new ArrayList<>(collectedFiles.size())

        for( Path file : collectedFiles ) {
            final relativeName = task.workDir.relativize(file).toString()
            if( !stagedNames.contains(relativeName) )
                result.add(file)
        }

        return result
    }

    /**
     * Check whether a file exists. Symbolic links are followed only when
     * {@code param.followLinks} is set.
     */
    protected boolean checkFileExists(Path file) {
        param.followLinks ? file.exists() : file.exists(LinkOption.NOFOLLOW_LINKS)
    }
}
Loading