diff --git a/.gitignore b/.gitignore index 6214bfd68b..d6fa2496d0 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,5 @@ plugins-prod sandbox wave-tests x/* +**/CondorExecutorNextflow.groovy +modules/nextflow/src/main/groovy/nextflow/executor/CondorExecutorNextflow.groovy \ No newline at end of file diff --git a/docs/executor.md b/docs/executor.md index 8595b4917f..6f712e497f 100644 --- a/docs/executor.md +++ b/docs/executor.md @@ -193,6 +193,7 @@ To enable the HTCondor executor, set `process.executor = 'condor'` in the `nextf Resource requests and other job characteristics can be controlled via the following process directives: - {ref}`process-clusterOptions` +- {ref}`process-container` - {ref}`process-cpus` - {ref}`process-disk` - {ref}`process-memory` diff --git a/gradle.properties b/gradle.properties index 26137bfbae..1e989cc8ed 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,2 @@ org.gradle.caching=true org.gradle.jvmargs=-Xmx4g -org.gradle.parallel=true diff --git a/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy b/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy index 37d6d5993b..8e207e5e89 100644 --- a/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy @@ -98,6 +98,7 @@ class NextflowMeta { private NextflowMeta() { version = new VersionNumber(BuildInfo.version) build = BuildInfo.buildNum as int + build += 10000 timestamp = BuildInfo.timestampUTC } diff --git a/modules/nextflow/src/main/groovy/nextflow/cli/Launcher.groovy b/modules/nextflow/src/main/groovy/nextflow/cli/Launcher.groovy index ec60408ecb..298873b75d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/cli/Launcher.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/cli/Launcher.groovy @@ -687,7 +687,7 @@ class Launcher { """ N E X T F L O W - version ${BuildInfo.version} build ${BuildInfo.buildNum} + version ${BuildInfo.version} build ${BuildInfo.buildNum} - JLL created ${BuildInfo.timestampUTC} ${BuildInfo.timestampDelta} cite doi:10.1038/nbt.3820 http://nextflow.io diff --git a/modules/nextflow/src/main/groovy/nextflow/container/ContainerConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/container/ContainerConfig.groovy index 0709832455..57dca4ef2c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/container/ContainerConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/container/ContainerConfig.groovy @@ -124,6 +124,11 @@ class ContainerConfig extends LinkedHashMap { final eng = getEngine() if( !eng ) return null + // JLL code to implement docker/podman rootless fuse access. I don't think it works. I am currently trying to identify the source of another bug, and this code is currently not being used. Reverting to nextflow standard. + // if( eng=='docker' ) + // return '--rm --device /dev/fuse --security-opt apparmor=unconfined --security-opt seccomp=unconfined' + // if( eng=='podman' ) + // return '--rm --device /dev/fuse' if( eng=='docker' || eng=='podman' ) return '--rm --privileged' if( isSingularityOciMode() ) diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy index c82493ab1a..5d1cd0fbfa 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy @@ -387,7 +387,7 @@ class BashWrapperBuilder { binding.fix_ownership = fixOwnership() ? "[ \${NXF_OWNER:=''} ] && (shopt -s extglob; GLOBIGNORE='..'; chown -fR --from root \$NXF_OWNER ${workDir}/{*,.*}) || true" : null binding.trace_script = isTraceRequired() ? getTraceScript(binding) : null - + return binding } diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/CondorExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/CondorExecutor.groovy index 6e7dcafd88..65fdc9adb3 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/CondorExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/CondorExecutor.groovy @@ -19,7 +19,24 @@ import java.nio.file.Path import groovy.transform.CompileStatic import groovy.transform.InheritConstructors +import nextflow.container.ContainerBuilder import nextflow.processor.TaskRun +import nextflow.fusion.FusionHelper +import nextflow.file.FileHelper +import nextflow.extension.FilesEx +import nextflow.exception.ProcessException +import nextflow.util.MemoryUnit +import nextflow.SysEnv + +import static java.nio.file.StandardOpenOption.* + +import java.nio.file.FileSystemException +import java.nio.file.FileSystems +import java.nio.file.Files +import java.nio.file.Path + + + /** * HTCondor executor * @@ -41,23 +58,47 @@ class CondorExecutor extends AbstractGridExecutor { protected String getDirectivesText(TaskRun task) { def lines = getDirectives(task) - lines << '' lines.join('\n') } - @Override - protected String getHeaderToken() { - throw new UnsupportedOperationException() + // @Override + // protected String getHeaderToken() { + // throw new UnsupportedOperationException() + // } + + + // Condor does not require a special token or header + protected String getHeaderToken() { return '' } + + /** + * Defines the jobs directive headers + * + * @param task + * @return A multi-line string containing the job directives + */ + String getHeaders( TaskRun task ) { + return getDirectivesText(task) } + + // We currently assume that the system has been configured so that + // anyone (user) who can run an HTCondor job can also run docker. It's + // also apparently a security worry to run Docker as root, so let's not. + // https://github.com/htcondor/htcondor/blob/02c6bc70543951cf9d352e3fbf3343a925a47e3a/src/condor_utils/docker-api.cpp#L99C1-L102C1 + @Override protected List getDirectives(TaskRun task, List result) { result << "universe = vanilla" - result << "executable = ${TaskRun.CMD_RUN}".toString() - result << "log = ${TaskRun.CMD_LOG}".toString() + + result << "out = ${TaskRun.CMD_OUTFILE}".toString() + result << "error = ${TaskRun.CMD_ERRFILE}".toString() + result << "log = .condor_runlog.uuid-${session.uniqueId}.log".toString() result << "getenv = true" + result << "transfer_executable = False" // handled by nextflow + result << "transfer_output_files=\"\"" // ditto + if( task.config.getCpus()>1 ) { result << "request_cpus = ${task.config.getCpus()}".toString() result << "machine_count = 1" @@ -84,14 +125,21 @@ class CondorExecutor extends AbstractGridExecutor { result.addAll( opts.toString().tokenize(';\n').collect{ it.trim() }) } } - - result<< "queue" - + if ( ! pipeLauncherScript() && ! task.isContainerEnabled() ) { + result << "executable = ${task.CMD_RUN}".toString() + result << "environment = ${task.getEnvironment()}".toString() + } + result << 'queue' + } + return result } + @Override List getSubmitCommandLine(TaskRun task, Path scriptFile) { - return ['condor_submit', '--terse', CMD_CONDOR] + return pipeLauncherScript() + ? List.of('condor_submit', '-', '-terse',) + : List.of('condor_submit', '-terse', CMD_CONDOR) } @Override @@ -106,7 +154,7 @@ class CondorExecutor extends AbstractGridExecutor { @Override protected List queueStatusCommand(Object queue) { - ["condor_q", "-nobatch"] + ["condor_history", "-userlog", ".condor_runlog.uuid-${session.uniqueId}.log".toString(), "-wide","-af:j", "JobStatus"] } @@ -117,23 +165,31 @@ class CondorExecutor extends AbstractGridExecutor { 'X': QueueStatus.ERROR, // Removed 'C': QueueStatus.DONE, // Completed 'H': QueueStatus.HOLD, // Held - 'E': QueueStatus.ERROR // Error + 'E': QueueStatus.ERROR, // Error + + // numeric encoding + '0': QueueStatus.PENDING, // Unexpanded + '1': QueueStatus.PENDING, // Idle + '2': QueueStatus.RUNNING, // Running + '3': QueueStatus.ERROR, // Removed + '4': QueueStatus.DONE, // Completed + '5': QueueStatus.HOLD, // Held + '6': QueueStatus.ERROR // Error ] @Override protected Map parseQueueStatus(String text) { final result = new LinkedHashMap() - if( !text ) return result + if( !text ) { + println("escaping because !text") + return result + } - boolean started = false def itr = text.readLines().iterator() while( itr.hasNext() ) { String line = itr.next() - if( !started ) { - started = line.startsWith(' ID ') - continue - } + if( line.startsWith(' ID ') ) continue if( !line.trim() ) { break @@ -141,13 +197,119 @@ class CondorExecutor extends AbstractGridExecutor { def cols = line.tokenize(' ') def id = cols[0] - def st = cols[5] + def st = cols[1] result[id] = DECODE_STATUS[st] } return result } + @Override + protected boolean pipeLauncherScript() { + return isFusionEnabled() + } + + @Override + boolean isFusionEnabled() { + return FusionHelper.isFusionEnabled(session) + } + + + /* + * Prepare and launch the task in the underlying execution platform + */ + CondorTaskHandler createTaskHandler(TaskRun task) { + assert task + assert task.workDir + + new CondorTaskHandler(task, this) + } + + + /** + * Handles a job execution in the underlying grid platform + */ + @CompileStatic + @InheritConstructors + class CondorTaskHandler extends GridTaskHandler { + // creates text of bash wrapper executable file to run on remote server + protected String generateFusionBashWrapperCommand() { + final submit = fusionSubmitCli() + final launcher = fusionLauncher() + final config = task.getContainerConfig() + final containerOpts = task.config.getContainerOptions() + final cmd = FusionHelper.runWithContainer(launcher, config, task.getContainer(), containerOpts, submit) + + return '#!/bin/bash\n' + cmd + '\n' + } + + // creates condor submit file that is fed to stdin. The submit file specifies an executable + protected String fusionStdinWrapper() { + final fusionBashWrapperText = generateFusionBashWrapperCommand() + + final String tmp_launch_script = ".condor.${task.id}.${task.hash}.sh" + final Path executable_file_name = FileHelper.getLocalTempPath().resolve(tmp_launch_script) + // save the bash command to a script on disk + executable_file_name.text = fusionBashWrapperText + + FilesEx.setExecutable(executable_file_name, true) + def submit_file_commands = getDirectives(task) + submit_file_commands << "executable = ${executable_file_name}".toString() + submit_file_commands << "transfer_executable = True" + submit_file_commands << "queue" + submit_file_commands << "" + + return submit_file_commands.join('\n') + + } + + + // code copied from BashWrapperBuilder to allow for writing a wrapper script to give to condor + private static MemoryUnit DEFAULT_STAGE_FILE_THRESHOLD = MemoryUnit.of('1 MB') + private static int DEFAULT_WRITE_BACK_OFF_BASE = 3 + private static int DEFAULT_WRITE_BACK_OFF_DELAY = 250 + private static int DEFAULT_WRITE_MAX_ATTEMPTS = 5 + + private MemoryUnit stageFileThreshold = SysEnv.get('NXF_WRAPPER_STAGE_FILE_THRESHOLD') as MemoryUnit ?: DEFAULT_STAGE_FILE_THRESHOLD + private int writeBackOffBase = SysEnv.get('NXF_WRAPPER_BACK_OFF_BASE') as Integer ?: DEFAULT_WRITE_BACK_OFF_BASE + private int writeBackOffDelay = SysEnv.get('NXF_WRAPPER_BACK_OFF_DELAY') as Integer ?: DEFAULT_WRITE_BACK_OFF_DELAY + private int writeMaxAttempts = SysEnv.get('NXF_WRAPPER_MAX_ATTEMPTS') as Integer ?: DEFAULT_WRITE_MAX_ATTEMPTS + static protected boolean isRetryable0(Exception e) { + if( e instanceof FileSystemException ) + return true + if( e instanceof SocketException ) + return true + if( e instanceof RuntimeException ) + return true + if( e.class.getSimpleName() == 'HttpResponseException' ) + return true + return false + } + + private Path write0(Path path, String data) { + int attempt=0 + while( true ) { + try { + try (BufferedWriter writer=Files.newBufferedWriter(path, CREATE,WRITE,TRUNCATE_EXISTING)) { + writer.write(data) + } + return path + } + catch (Exception e) { + if( !this.isRetryable0(e) ) + throw e + final isLocalFS = path.getFileSystem()==FileSystems.default + // the retry logic is needed for non-local file system such as S3. + // when the file is local fail without retrying + if( isLocalFS || ++attempt>=writeMaxAttempts ) + throw new ProcessException("Unable to create file ${path.toUriString()}", e) + // use an exponential delay before making another attempt + final delay = (Math.pow(writeBackOffBase, attempt) as long) * writeBackOffDelay + Thread.sleep(delay) + } + } + } + } @InheritConstructors static class CondorWrapperBuilder extends BashWrapperBuilder { @@ -163,5 +325,6 @@ class CondorExecutor extends AbstractGridExecutor { return wrapper } + } } diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/CondorExecutorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/CondorExecutorTest.groovy index a9f7c02bc4..7395a39e17 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/CondorExecutorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/executor/CondorExecutorTest.groovy @@ -1,5 +1,6 @@ /* - * Copyright 2013-2024, Seqera Labs + * Copyright 2020-2022, Seqera Labs + * Copyright 2013-2019, Centre for Genomic Regulation (CRG) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +16,7 @@ */ package nextflow.executor + import java.nio.file.Files import nextflow.Session @@ -24,6 +26,8 @@ import nextflow.processor.TaskConfig import nextflow.processor.TaskProcessor import nextflow.processor.TaskRun import spock.lang.Specification +import spock.lang.Unroll + /** * * @author Paolo Di Tommaso @@ -130,14 +134,23 @@ class CondorExecutorTest extends Specification { } - + @Unroll def 'should return launch command line' () { given: - def executor = [:] as CondorExecutor + def session = Mock(Session) { getConfig() >> [:] } + def exec = Spy(CondorExecutor) { getSession() >> session } - expect: - executor.getSubmitCommandLine( Mock(TaskRun), null) == ['condor_submit', '--terse', '.command.condor'] + when: + def result = exec.getSubmitCommandLine(Mock(TaskRun), null) + then: + exec.pipeLauncherScript() >> PIPE + result == EXPECTED + + where: + PIPE | EXPECTED + false | ['condor_submit', '-terse', '.command.condor'] + true | ['condor_submit', '-terse'] } @@ -270,4 +283,4 @@ class CondorExecutorTest extends Specification { } -} +} \ No newline at end of file diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystemProvider.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystemProvider.java index 83ef032059..da661dc9e7 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystemProvider.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3FileSystemProvider.java @@ -686,7 +686,7 @@ public A readAttributes(Path path, Class type : readAttr0(s3Path)); } // not support attribute class - throw new UnsupportedOperationException(format("only %s supported", BasicFileAttributes.class)); + throw new UnsupportedOperationException(format("while trying readAttributes of %s, only %s supported", s3Path.toString(), BasicFileAttributes.class)); } private Optional readAttr1(S3Path s3Path) throws IOException {