Skip to content

Commit

Permalink
Merge pull request #490 from zkaoudi/main
Browse files Browse the repository at this point in the history
Cleanup
  • Loading branch information
2pk03 authored Jan 16, 2025
2 parents 576583f + cdb7f70 commit acd7570
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ public static void main(String[] args){

/* Get a plan builder */
WayangContext wayangContext = new WayangContext(new Configuration())
// .withPlugin(Java.basicPlugin())
// .withPlugin(Spark.basicPlugin());
.withPlugin(Flink.basicPlugin());
.withPlugin(Java.basicPlugin())
.withPlugin(Spark.basicPlugin());
// .withPlugin(Flink.basicPlugin());

JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
.withJobName("WordCount")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ public KafkaTopicSink(String topicName, Class<T> typeClass) {
)
)
);
System.out.println("### 11 ... ");

}


Expand All @@ -92,8 +90,6 @@ public KafkaTopicSink(String topicName,
topicName,
new TransformationDescriptor<>(formattingFunction, typeClass, String.class)
);
System.out.println("### 12 ... ");

}

/**
Expand All @@ -106,7 +102,6 @@ public KafkaTopicSink(String topicName, TransformationDescriptor<T, String> form
super(DataSetType.createDefault(formattingDescriptor.getInputType()));
this.topicName = topicName;
this.formattingDescriptor = formattingDescriptor;
System.out.println("### 13 ... ");
}

/**
Expand All @@ -118,7 +113,6 @@ public KafkaTopicSink(KafkaTopicSink<T> that) {
super(that);
this.topicName = that.topicName;
this.formattingDescriptor = that.formattingDescriptor;
System.out.println("### 14 ... ");
}

boolean isInitialized = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,12 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval

logger.info("---> WRITE TO KAFKA SINK...");

logger.info("### 9 ... ");

JavaChannelInstance input = (JavaChannelInstance) inputs[0];

initProducer( (KafkaTopicSink<T>) this );

final Function<T, String> formatter = javaExecutor.getCompiler().compile(this.formattingDescriptor);

logger.info("### 10 ... ");

try ( KafkaProducer<String,String> producer = getProducer() ) {
input.<T>provideStream().forEach(
dataQuantum -> {
Expand Down Expand Up @@ -121,8 +117,6 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
throw new WayangException("Writing to Kafka topic failed.", e);
}

logger.info("### 11 ... ");

return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ProtocolException;
import java.net.URL;
import java.net.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -45,7 +44,6 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.stream.Stream;

Expand Down Expand Up @@ -82,26 +80,15 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
assert inputs.length == this.getNumInputs();
assert outputs.length == this.getNumOutputs();


String urlStr = this.getInputUrl().trim();
URL sourceUrl = null;

try {

FileSystem fs = FileSystems.getFileSystem(urlStr).get(); //.orElseThrow(
//() -> new WayangException(String.format("FileSystems.getFileSystem( urlStr ).get() => Cannot access file system of %s. ", urlStr))
//);

final InputStream inputStream = fs.open(urlStr);
Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream)).lines();
((StreamChannel.Instance) outputs[0]).accept(lines);

}
catch (Exception e) {

try {

URL url = new URL(urlStr);

HttpURLConnection connection2 = (HttpURLConnection) url.openConnection();
sourceUrl = new URL(urlStr);
String protocol = sourceUrl.getProtocol();
if ( protocol.startsWith("https") || protocol.startsWith("http") ) {
HttpURLConnection connection2 = (HttpURLConnection) sourceUrl.openConnection();
connection2.setRequestMethod("GET");

// Check if the response code indicates success (HTTP status code 200)
Expand All @@ -112,12 +99,21 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
((StreamChannel.Instance) outputs[0]).accept(lines2);
}
}
catch (IOException ioException) {
ioException.printStackTrace();
throw new WayangException(String.format("Reading from URL: %s failed.", urlStr), ioException);
else {
FileSystem fs = FileSystems.getFileSystem(urlStr).orElseThrow(
() -> new WayangException(String.format("Cannot access file system of %s.", urlStr))
);

final InputStream inputStream = fs.open(urlStr);
Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream)).lines();
((StreamChannel.Instance) outputs[0]).accept(lines);
}

// connection2.disconnect();
} catch (MalformedURLException e) {
throw new RuntimeException(e);
} catch (ProtocolException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new WayangException(String.format("Reading %s failed.", urlStr), e);
}

ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.java.Java;
import org.apache.wayang.tensorflow.Tensorflow;
import org.junit.Ignore;
import org.junit.Test;

import java.util.ArrayList;
Expand All @@ -38,6 +39,7 @@

/**
* Test the Tensorflow integration with Wayang.
* Note: this test fails on M1 Macs because of Tensorflow-Java incompatibility.
*/
public class TensorflowIntegrationIT {

Expand Down Expand Up @@ -66,7 +68,7 @@ public class TensorflowIntegrationIT {

public static String[] LABELS = new String[]{"Iris-setosa", "Iris-versicolor", "Iris-virginica"};

@Test
@Ignore
public void test() {
/* training features */
CollectionSource<float[]> trainXSource = new CollectionSource<>(trainX, float[].class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.java.Java;
import org.apache.wayang.tensorflow.Tensorflow;
import org.junit.Ignore;
import org.junit.Test;

import java.net.URI;
Expand All @@ -42,6 +43,7 @@

/**
* Test the Tensorflow integration with Wayang.
* Note: this test fails on M1 Macs because of Tensorflow-Java incompatibility.
*/
public class TensorflowIrisIT {

Expand All @@ -54,7 +56,7 @@ public class TensorflowIrisIT {
"Iris-virginica", 2
);

@Test
@Ignore
public void test() {
final Tuple<Operator, Operator> trainSource = fileOperation(TRAIN_PATH, true);
final Tuple<Operator, Operator> testSource = fileOperation(TEST_PATH, false);
Expand Down

0 comments on commit acd7570

Please sign in to comment.