From c893e50f5f11b2e01219d9c4b2b75476912558bf Mon Sep 17 00:00:00 2001 From: Varun Ratnakar Date: Thu, 6 Nov 2014 17:14:39 +0000 Subject: [PATCH] Compatibility fixes with OODT-0.8 --- oodt-adapter/pom.xml | 4 +- .../api/impl/oodt/CurationServiceAPI.java | 32 +- .../data/api/impl/oodt/DataCreationFM.java | 6 + .../api/impl/oodt/DataCreationFM_Simple.java | 6 + .../api/impl/oodt/OODTExecutionEngine.java | 431 ++++++++++-------- portal/pom.xml | 2 +- 6 files changed, 260 insertions(+), 221 deletions(-) diff --git a/oodt-adapter/pom.xml b/oodt-adapter/pom.xml index 125e7102..d268696e 100644 --- a/oodt-adapter/pom.xml +++ b/oodt-adapter/pom.xml @@ -13,12 +13,12 @@ edu.isi.wings wings-core - 3.0-SNAPSHOT + 4.0-SNAPSHOT ../core/pom.xml - 0.7-SNAPSHOT + 0.8-SNAPSHOT UTF-8 UTF-8 diff --git a/oodt-adapter/src/main/java/edu/isi/wings/catalog/data/api/impl/oodt/CurationServiceAPI.java b/oodt-adapter/src/main/java/edu/isi/wings/catalog/data/api/impl/oodt/CurationServiceAPI.java index 72abaa3a..2c76f1d4 100644 --- a/oodt-adapter/src/main/java/edu/isi/wings/catalog/data/api/impl/oodt/CurationServiceAPI.java +++ b/oodt-adapter/src/main/java/edu/isi/wings/catalog/data/api/impl/oodt/CurationServiceAPI.java @@ -36,25 +36,23 @@ private String query(String method, String op, Object... args) { + URLEncoder.encode(args[i+1].toString(), "UTF-8"); } - if("GET".equals(method)) { - URL urlobj = new URL(url + "?" + params); - return IOUtils.toString(urlobj); - } - else { - URL urlobj = new URL(url); - HttpURLConnection con = (HttpURLConnection) urlobj.openConnection(); - con.setRequestMethod(method); - con.setDoOutput(true); - DataOutputStream out = new DataOutputStream(con.getOutputStream()); - out.writeBytes(params); - out.flush(); - out.close(); - - String result = IOUtils.toString(con.getInputStream()); - con.disconnect(); - return result; + URL urlobj = new URL(url); + if("GET".equals(method)) + urlobj = new URL(url + "?" + params); + HttpURLConnection con = (HttpURLConnection) urlobj.openConnection(); + con.setRequestMethod(method); + if(!"GET".equals(method)) { + con.setDoOutput(true); + DataOutputStream out = new DataOutputStream(con.getOutputStream()); + out.writeBytes(params); + out.flush(); + out.close(); } + String result = IOUtils.toString(con.getInputStream()); + con.disconnect(); + return result; + } catch (Exception e) { e.printStackTrace(); } diff --git a/oodt-adapter/src/main/java/edu/isi/wings/catalog/data/api/impl/oodt/DataCreationFM.java b/oodt-adapter/src/main/java/edu/isi/wings/catalog/data/api/impl/oodt/DataCreationFM.java index 36fe457c..de3f293e 100644 --- a/oodt-adapter/src/main/java/edu/isi/wings/catalog/data/api/impl/oodt/DataCreationFM.java +++ b/oodt-adapter/src/main/java/edu/isi/wings/catalog/data/api/impl/oodt/DataCreationFM.java @@ -336,6 +336,12 @@ public boolean moveDatatypeParent(String dtypeid, String fromtypeid, String toty // Not currently supported return false; } + + @Override + public boolean moveDataParent(String arg0, String arg1, String arg2) { + // Not currently supported + return false; + } @Override public boolean addData(String dataid, String dtypeid) { diff --git a/oodt-adapter/src/main/java/edu/isi/wings/catalog/data/api/impl/oodt/DataCreationFM_Simple.java b/oodt-adapter/src/main/java/edu/isi/wings/catalog/data/api/impl/oodt/DataCreationFM_Simple.java index 74f98b81..2510abd1 100644 --- a/oodt-adapter/src/main/java/edu/isi/wings/catalog/data/api/impl/oodt/DataCreationFM_Simple.java +++ b/oodt-adapter/src/main/java/edu/isi/wings/catalog/data/api/impl/oodt/DataCreationFM_Simple.java @@ -262,6 +262,12 @@ public boolean moveDatatypeParent(String dtypeid, String fromtypeid, String toty // Not currently supported return false; } + + @Override + public boolean moveDataParent(String arg0, String arg1, String arg2) { + // Not currently supported + return false; + } @Override public boolean addData(String dataid, String dtypeid) { diff --git a/oodt-adapter/src/main/java/edu/isi/wings/execution/engine/api/impl/oodt/OODTExecutionEngine.java b/oodt-adapter/src/main/java/edu/isi/wings/execution/engine/api/impl/oodt/OODTExecutionEngine.java index 5a551161..d5c8502d 100644 --- a/oodt-adapter/src/main/java/edu/isi/wings/execution/engine/api/impl/oodt/OODTExecutionEngine.java +++ b/oodt-adapter/src/main/java/edu/isi/wings/execution/engine/api/impl/oodt/OODTExecutionEngine.java @@ -14,210 +14,239 @@ import edu.isi.wings.execution.engine.classes.RuntimeInfo; import edu.isi.wings.execution.engine.classes.RuntimePlan; import edu.isi.wings.execution.engine.classes.RuntimeStep; -import edu.isi.wings.execution.logger.api.ExecutionLoggerAPI; +import edu.isi.wings.execution.tools.api.ExecutionLoggerAPI; +import edu.isi.wings.execution.tools.api.ExecutionMonitorAPI; +import edu.isi.wings.execution.tools.api.ExecutionResourceAPI; public class OODTExecutionEngine implements PlanExecutionEngine, StepExecutionEngine{ - Properties props; - protected int maxParallel = 4; - - StepExecutionEngine stepEngine; - PlanExecutionEngine planEngine; - - ExecutionLoggerAPI monitor; - OODTWorkflowAdapter adapter; - Thread monitoringThread; - - String jobdir; - String wlogfile; - - public OODTExecutionEngine(Properties props) { - this.props = props; - this.stepEngine = this; - this.planEngine = this; - } - - @Override - public void execute(RuntimeStep exe, RuntimePlan plan) { - // Execute Step: Do nothing - } - - @Override - public void abort(RuntimeStep exe) { - // Abort Step: Do nothing - } - - @Override - public void setPlanExecutionEngine(PlanExecutionEngine engine) { - this.planEngine = engine; - } - - @Override - public PlanExecutionEngine getPlanExecutionEngine() { - return this.planEngine; - } - - @Override - public void execute(RuntimePlan exe) { - String wmurl = props.getProperty("oodt.wmurl"); - String fmurl = props.getProperty("oodt.fmurl"); - String wmsurl = props.getProperty("oodt.wmsurl"); - String libns = props.getProperty("lib.domain.data.url") + "#"; - - String codedir = props.getProperty("lib.domain.code.storage") + File.separator; - String datadir = props.getProperty("lib.domain.data.storage") + File.separator; - try { - File f = File.createTempFile("oodt-run-", ""); - if (f.delete() && f.mkdirs()) { - this.jobdir = f.getAbsolutePath() + File.separator; - this.wlogfile = "workflow.log"; - this.adapter = new OODTWorkflowAdapter(wmurl, wmsurl, - fmurl, libns, - codedir, datadir, - jobdir, wlogfile); - this.adapter.runWorkflow(exe); - - // Start Monitoring thread - this.monitoringThread = new Thread( - new ExecutionMonitoringThread(exe, this.monitor, this.jobdir, this.wlogfile)); - this.monitoringThread.start(); - } - } catch (Exception e) { - exe.onEnd(this.monitor, RuntimeInfo.Status.FAILURE, e.getMessage()); - //e.printStackTrace(); - } - } - - @Override - public void onStepEnd(RuntimePlan exe) { - // Do nothing - } - - @Override - public void abort(RuntimePlan exe) { - // Abort plan - if(this.monitoringThread != null && - this.monitoringThread.isAlive()) - this.monitoringThread.interrupt(); - - this.adapter.stopWorkflow(exe); - } - - @Override - public int getMaxParallelSteps() { - return 0; - } - - @Override - public void setMaxParallelSteps(int num) { - // Do nothing - } - - @Override - public void setStepExecutionEngine(StepExecutionEngine engine) { - this.stepEngine = engine; - } - - @Override - public StepExecutionEngine getStepExecutionEngine() { - return this.stepEngine; - } - - @Override - public void setExecutionLogger(ExecutionLoggerAPI monitor) { - this.monitor = monitor; - } - - @Override - public ExecutionLoggerAPI getExecutionLogger() { - return this.monitor; - - } - - class ExecutionMonitoringThread implements Runnable { - RuntimePlan planexe; - String jobdir; - String wlogfile; - PlanExecutionEngine planEngine; - ExecutionLoggerAPI logger; - - public ExecutionMonitoringThread(RuntimePlan planexe, - ExecutionLoggerAPI monitor, String jobdir, String wlogfile) { - this.planexe = planexe; - this.logger = monitor; - this.jobdir = jobdir; - this.wlogfile = wlogfile; - } - - @Override - public void run() { - planexe.onStart(this.logger); - try { - HashMap jobstatus = - new HashMap(); - - int osleeptime = 1000; - int maxsleeptime = 4*osleeptime; - int sleeptime = osleeptime; - - Pattern pattern = Pattern.compile("^(Job\\d+)\\s+\\((.+)\\)\\s*:\\s+(.+)$"); - while(true) { - for(String line : FileUtils.readLines(new File(this.jobdir + this.wlogfile))) { - Matcher mat = pattern.matcher(line); - if(mat.find()) { - String jobname = mat.group(2); - RuntimeInfo.Status status = RuntimeInfo.Status.valueOf(mat.group(3)); - jobstatus.put(jobname, status); - } - } - - ArrayList steps = new ArrayList(); - steps.addAll(planexe.getQueue().getStepsReadyToExecute()); - steps.addAll(planexe.getQueue().getRunningSteps()); - - boolean shortsleep = false; - for(RuntimeStep stepexe : steps) { - String jobname = stepexe.getStep().getName(); - if(jobstatus.containsKey(jobname)) { - if(stepexe.getRuntimeInfo().getStatus() == - RuntimeInfo.Status.QUEUED) { - stepexe.setRuntimePlan(planexe); - stepexe.onStart(logger); - shortsleep = true; - } - - RuntimeInfo.Status status = jobstatus.get(jobname); - if(status == RuntimeInfo.Status.SUCCESS || - status == RuntimeInfo.Status.FAILURE) { - stepexe.onEnd(logger, status, - FileUtils.readFileToString( - new File(this.jobdir + jobname + ".log"))); - shortsleep = true; - } - } - } - - steps = planexe.getQueue().getStepsReadyToExecute(); - if(steps.size() == 0) { - // Nothing to execute. Check if finished - if(planexe.getQueue().getRunningSteps().size() == 0) { - RuntimeInfo.Status status = RuntimeInfo.Status.FAILURE; - if(planexe.getQueue().getFinishedSteps().size() == - planexe.getQueue().getAllSteps().size()) - status = RuntimeInfo.Status.SUCCESS; - planexe.onEnd(this.logger, status, "Finished"); - break; - } - } - - sleeptime = shortsleep ? osleeptime : - (sleeptime >= maxsleeptime ? maxsleeptime : sleeptime*2); - Thread.sleep(sleeptime); - } - } - catch (Exception e) { - this.planexe.onEnd(this.logger, RuntimeInfo.Status.FAILURE, e.getMessage()); - } + Properties props; + protected int maxParallel = 4; + + StepExecutionEngine stepEngine; + PlanExecutionEngine planEngine; + + OODTWorkflowAdapter adapter; + Thread monitoringThread; + + protected ExecutionLoggerAPI logger; + protected ExecutionMonitorAPI monitor; + protected ExecutionResourceAPI resource; + + String jobdir; + String wlogfile; + + public OODTExecutionEngine(Properties props) { + System.out.println(props); + this.props = props; + this.stepEngine = this; + this.planEngine = this; + } + + @Override + public void execute(RuntimeStep exe, RuntimePlan plan) { + // Execute Step: Do nothing + } + + @Override + public void abort(RuntimeStep exe) { + // Abort Step: Do nothing + } + + @Override + public void setPlanExecutionEngine(PlanExecutionEngine engine) { + this.planEngine = engine; + } + + @Override + public PlanExecutionEngine getPlanExecutionEngine() { + return this.planEngine; + } + + @Override + public void execute(RuntimePlan exe) { + String wmurl = props.getProperty("oodt.wmurl"); + String fmurl = props.getProperty("oodt.fmurl"); + String wmsurl = props.getProperty("oodt.wmsurl"); + String libns = props.getProperty("lib.domain.data.url") + "#"; + + String codedir = props.getProperty("lib.domain.code.storage") + File.separator; + String datadir = props.getProperty("lib.domain.data.storage") + File.separator; + try { + File f = File.createTempFile("oodt-run-", ""); + if (f.delete() && f.mkdirs()) { + this.jobdir = f.getAbsolutePath() + File.separator; + this.wlogfile = "workflow.log"; + this.adapter = new OODTWorkflowAdapter(wmurl, wmsurl, + fmurl, libns, + codedir, datadir, + jobdir, wlogfile); + this.adapter.runWorkflow(exe); + + // Start Monitoring thread + this.monitoringThread = new Thread( + new ExecutionMonitoringThread(exe, this.logger, this.jobdir, this.wlogfile)); + this.monitoringThread.start(); + } + } catch (Exception e) { + exe.onEnd(this.logger, RuntimeInfo.Status.FAILURE, e.getMessage()); + //e.printStackTrace(); + } + } + + @Override + public void onStepEnd(RuntimePlan exe) { + // Do nothing + } + + @Override + public void abort(RuntimePlan exe) { + // Abort plan + if(this.monitoringThread != null && + this.monitoringThread.isAlive()) + this.monitoringThread.interrupt(); + + this.adapter.stopWorkflow(exe); + } + + @Override + public int getMaxParallelSteps() { + return 0; + } + + @Override + public void setMaxParallelSteps(int num) { + // Do nothing + } + + @Override + public void setStepExecutionEngine(StepExecutionEngine engine) { + this.stepEngine = engine; + } + + @Override + public StepExecutionEngine getStepExecutionEngine() { + return this.stepEngine; + } + + @Override + public void setExecutionLogger(ExecutionLoggerAPI logger) { + this.logger = logger; + } + + @Override + public ExecutionLoggerAPI getExecutionLogger() { + return this.logger; + } + + class ExecutionMonitoringThread implements Runnable { + RuntimePlan planexe; + String jobdir; + String wlogfile; + PlanExecutionEngine planEngine; + ExecutionLoggerAPI logger; + + public ExecutionMonitoringThread(RuntimePlan planexe, + ExecutionLoggerAPI monitor, String jobdir, String wlogfile) { + this.planexe = planexe; + this.logger = monitor; + this.jobdir = jobdir; + this.wlogfile = wlogfile; + } + + @Override + public void run() { + planexe.onStart(this.logger); + try { + HashMap jobstatus = + new HashMap(); + + int osleeptime = 1000; + int maxsleeptime = 4*osleeptime; + int sleeptime = osleeptime; + + Pattern pattern = Pattern.compile("^(Job\\d+)\\s+\\((.+)\\)\\s*:\\s+(.+)$"); + while(true) { + for(String line : FileUtils.readLines(new File(this.jobdir + this.wlogfile))) { + Matcher mat = pattern.matcher(line); + if(mat.find()) { + String jobname = mat.group(2); + RuntimeInfo.Status status = RuntimeInfo.Status.valueOf(mat.group(3)); + jobstatus.put(jobname, status); + } + } + + ArrayList steps = new ArrayList(); + steps.addAll(planexe.getQueue().getStepsReadyToExecute()); + steps.addAll(planexe.getQueue().getRunningSteps()); + + boolean shortsleep = false; + for(RuntimeStep stepexe : steps) { + String jobname = stepexe.getStep().getName(); + if(jobstatus.containsKey(jobname)) { + if(stepexe.getRuntimeInfo().getStatus() == + RuntimeInfo.Status.QUEUED) { + stepexe.setRuntimePlan(planexe); + stepexe.onStart(logger); + shortsleep = true; + } + + RuntimeInfo.Status status = jobstatus.get(jobname); + if(status == RuntimeInfo.Status.SUCCESS || + status == RuntimeInfo.Status.FAILURE) { + stepexe.onEnd(logger, status, + FileUtils.readFileToString( + new File(this.jobdir + jobname + ".log"))); + shortsleep = true; + } + } + } + + steps = planexe.getQueue().getStepsReadyToExecute(); + if(steps.size() == 0) { + // Nothing to execute. Check if finished + if(planexe.getQueue().getRunningSteps().size() == 0) { + RuntimeInfo.Status status = RuntimeInfo.Status.FAILURE; + if(planexe.getQueue().getFinishedSteps().size() == + planexe.getQueue().getAllSteps().size()) + status = RuntimeInfo.Status.SUCCESS; + planexe.onEnd(this.logger, status, "Finished"); + break; + } + } + + sleeptime = shortsleep ? osleeptime : + (sleeptime >= maxsleeptime ? maxsleeptime : sleeptime*2); + Thread.sleep(sleeptime); } + } + catch (Exception e) { + this.planexe.onEnd(this.logger, RuntimeInfo.Status.FAILURE, e.getMessage()); + } } + } + + @Override + public void setExecutionMonitor(ExecutionMonitorAPI monitor) { + this.monitor = monitor; + if (this.stepEngine != this) + this.stepEngine.setExecutionMonitor(monitor); + } + + @Override + public ExecutionMonitorAPI getExecutionMonitor() { + return this.monitor; + } + + @Override + public void setExecutionResource(ExecutionResourceAPI resource) { + this.resource = resource; + if(this.stepEngine != this) + this.stepEngine.setExecutionResource(resource); + } + + @Override + public ExecutionResourceAPI getExecutionResource() { + return this.resource; + } } diff --git a/portal/pom.xml b/portal/pom.xml index f71f9e3b..1f7b823b 100644 --- a/portal/pom.xml +++ b/portal/pom.xml @@ -39,7 +39,7 @@ org.apache.oodt cas-wm-services - 0.7-SNAPSHOT + 0.8-SNAPSHOT war