[FLINK-36352][doc] Remove Scala DataStream API related doc
X-czh committed Jan 26, 2025
1 parent 7f8752b commit 2e7c0ea
Showing 33 changed files with 16 additions and 2,703 deletions.
docs/content/docs/connectors/datastream/filesystem.md (215 changes: 0 additions & 215 deletions)
@@ -339,32 +339,6 @@ final FileSink<String> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

import java.time.Duration

val input: DataStream[String] = ...

val sink: FileSink[String] = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(15))
.withInactivityInterval(Duration.ofMinutes(5))
.withMaxPartSize(MemorySize.ofMebiBytes(1024))
.build())
.build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< tab "Python" >}}
@@ -441,23 +415,6 @@ final FileSink<GenericRecord> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters
import org.apache.avro.Schema

val schema: Schema = ...
val input: DataStream[GenericRecord] = ...

val sink: FileSink[GenericRecord] = FileSink
.forBulkFormat(outputBasePath, AvroParquetWriters.forGenericRecord(schema))
.build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< tab "Python" >}}
@@ -495,22 +452,6 @@ final FileSink<ProtoRecord> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters

// ProtoRecord is a generated protobuf Message class.
val input: DataStream[ProtoRecord] = ...

val sink: FileSink[ProtoRecord] = FileSink
.forBulkFormat(outputBasePath, ParquetProtoWriters.forType(classOf[ProtoRecord]))
.build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< /tabs >}}
@@ -565,23 +506,6 @@ final FileSink<GenericRecord> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.avro.AvroWriters
import org.apache.avro.Schema

val schema: Schema = ...
val input: DataStream[GenericRecord] = ...

val sink: FileSink[GenericRecord] = FileSink
.forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
.build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< tab "Python" >}}
@@ -624,26 +548,6 @@ stream.sinkTo(FileSink.forBulkFormat(
factory).build());
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
val factory = new AvroWriterFactory[Address](new AvroBuilder[Address]() {
override def createWriter(out: OutputStream): DataFileWriter[Address] = {
val schema = ReflectData.get.getSchema(classOf[Address])
val datumWriter = new ReflectDatumWriter[Address](schema)

val dataFileWriter = new DataFileWriter[Address](datumWriter)
dataFileWriter.setCodec(CodecFactory.snappyCodec)
dataFileWriter.create(schema, out)
dataFileWriter
}
})

val stream: DataStream[Address] = ...
stream.sinkTo(FileSink.forBulkFormat(
outputBasePath,
factory).build());
```
{{< /tab >}}
{{< /tabs >}}

##### ORC Format
@@ -701,24 +605,6 @@ public class PersonVectorizer extends Vectorizer<Person> implements Serializable
}
}

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import java.nio.charset.StandardCharsets
import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, LongColumnVector}

class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {

override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
val nameColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
val ageColVector = batch.cols(1).asInstanceOf[LongColumnVector]
nameColVector.setVal(batch.size + 1, element.getName.getBytes(StandardCharsets.UTF_8))
ageColVector.vector(batch.size + 1) = element.getAge
}

}

```
{{< /tab >}}
{{< /tabs >}}
@@ -747,23 +633,6 @@ final FileSink<Person> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.orc.writer.OrcBulkWriterFactory

val schema: String = "struct<_col0:string,_col1:int>"
val input: DataStream[Person] = ...
val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema));

val sink: FileSink[Person] = FileSink
.forBulkFormat(outputBasePath, writerFactory)
.build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< /tabs >}}
@@ -786,19 +655,6 @@ final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
val schema: String = ...
val conf: Configuration = ...
val writerProperties: Properties = new Properties()

writerProperties.setProperty("orc.compress", "LZ4")
// Other ORC supported properties can also be set similarly.

val writerFactory = new OrcBulkWriterFactory(
new PersonVectorizer(schema), writerProperties, conf)
```
{{< /tab >}}
{{< /tabs >}}

The complete list of ORC writer properties can be found [here](https://orc.apache.org/docs/hive-config.html).
@@ -820,22 +676,6 @@ public class PersonVectorizer extends Vectorizer<Person> implements Serializable
}
}

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala

class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {

override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
...
val metadataKey: String = ...
val metadataValue: ByteBuffer = ...
addUserMetadata(metadataKey, metadataValue)
}

}

```
{{< /tab >}}
{{< /tabs >}}
@@ -891,27 +731,6 @@ final FileSink<Tuple2<LongWritable, Text>> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.configuration.GlobalConfiguration
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.Text;

val input: DataStream[(LongWritable, Text)] = ...
val hadoopConf: Configuration = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration())
val sink: FileSink[(LongWritable, Text)] = FileSink
.forBulkFormat(
outputBasePath,
new SequenceFileWriterFactory(hadoopConf, classOf[LongWritable], classOf[Text]))
.build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< /tabs >}}
@@ -1050,24 +869,6 @@ FileSink<Tuple2<Integer, Integer>> sink = FileSink
.withOutputFileConfig(config)
.build();

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala

val config = OutputFileConfig
.builder()
.withPartPrefix("prefix")
.withPartSuffix(".ext")
.build()

val sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
.withBucketAssigner(new KeyBucketAssigner())
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withOutputFileConfig(config)
.build()

```
{{< /tab >}}
{{< tab "Python" >}}
@@ -1112,22 +913,6 @@ FileSink<Integer> fileSink=
new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
.build();

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala

val fileSink: FileSink[Integer] =
FileSink.forRowFormat(new Path(path), new SimpleStringEncoder[Integer]())
.enableCompact(
FileCompactStrategy.Builder.newBuilder()
.setSizeThreshold(1024)
.enableCompactionOnCheckpoint(5)
.build(),
new RecordWiseFileCompactor(
new DecoderBasedReader.Factory(() => new SimpleStringDecoder)))
.build()

```
{{< /tab >}}
{{< tab "Python" >}}
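The Scala tabs removed from `filesystem.md` above all sit next to Java tabs that remain in the page (their first lines appear as hunk context, e.g. `final FileSink<String> sink = FileSink`). For reference, a minimal self-contained sketch of the row-encoded case in Java, using the same rolling policy as the removed Scala snippet, could look like the following; the class name, `outputPath` value, sample elements, and checkpoint interval are illustrative assumptions, not part of the commit.

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class RowFormatFileSinkExample {

    public static void main(String[] args) throws Exception {
        final String outputPath = "/tmp/file-sink-output"; // placeholder path

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // in STREAMING mode, part files are finalized on checkpoints

        // fromData was added in Flink 1.19; on older releases use fromElements
        DataStream<String> input = env.fromData("first", "second", "third");

        // Row-encoded sink: each String is written as one line, rolled by the given policy.
        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(15))
                                .withInactivityInterval(Duration.ofMinutes(5))
                                .withMaxPartSize(MemorySize.ofMebiBytes(1024))
                                .build())
                .build();

        input.sinkTo(sink);
        env.execute("row-format FileSink sketch");
    }
}
```

The rolling policy mirrors the one in the deleted Scala block: part files roll every 15 minutes, after 5 minutes of inactivity, or once they reach 1 GiB.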
docs/content/docs/connectors/datastream/formats/hadoop.md (20 changes: 0 additions & 20 deletions)
@@ -72,9 +72,6 @@ InputFormat.

The following example shows how to use Hadoop's `KeyValueTextInputFormat`.

{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}}
{{< tab "Java" >}}

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KeyValueTextInputFormat textInputFormat = new KeyValueTextInputFormat();
@@ -86,21 +83,4 @@ DataStream<Tuple2<Text, Text>> input = env.createInput(HadoopInputs.readHadoopFi
[...]
```

{{< /tab >}}
{{< tab "Scala" >}}

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val textInputFormat = new KeyValueTextInputFormat
val input: DataStream[(Text, Text)] =
env.createInput(HadoopInputs.readHadoopFile(
textInputFormat, classOf[Text], classOf[Text], textPath))

// Do something with the data.
[...]
```

{{< /tab >}}
{{< /tabs >}}

{{< top >}}
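The Java tab of `hadoop.md` is only partly visible in the hunk above, and its imports fall outside the diff context. A self-contained sketch of the same `KeyValueTextInputFormat` read could look like this; the `flink-hadoop-compatibility` dependency, the class name, and the `textPath` value are assumptions rather than content shown in the diff.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;

public class HadoopTextInputExample {

    public static void main(String[] args) throws Exception {
        final String textPath = "hdfs://namenode:9000/data/input"; // placeholder location

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KeyValueTextInputFormat textInputFormat = new KeyValueTextInputFormat();

        // Wrap the mapred InputFormat so Flink can read key/value Text pairs from it.
        DataStream<Tuple2<Text, Text>> input = env.createInput(
                HadoopInputs.readHadoopFile(textInputFormat, Text.class, Text.class, textPath));

        input.print();
        env.execute("Hadoop KeyValueTextInputFormat sketch");
    }
}
```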