[FLINK-36352][doc] Remove Scala DataStream API related doc
X-czh committed Feb 8, 2025
1 parent 7f8752b commit 0bd63f2
Showing 72 changed files with 38 additions and 6,109 deletions.
2 changes: 1 addition & 1 deletion docs/content.zh/docs/concepts/flink-architecture.md
@@ -35,7 +35,7 @@ The Flink runtime consists of two types of processes: a _JobManager_ and one or

{{< img src="/fig/processes.svg" alt="The processes involved in executing a Flink dataflow" class="offset" width="70%" >}}

The *Client* is not part of the runtime and program execution; it is used to prepare the dataflow and send it to the JobManager. After that, the client can disconnect (_detached mode_) or stay connected to receive progress reports (_attached mode_). The client runs either as part of the Java/Scala program that triggers the execution, or in the command-line process `./bin/flink run ...`.
The *Client* is not part of the runtime and program execution; it is used to prepare the dataflow and send it to the JobManager. After that, the client can disconnect (_detached mode_) or stay connected to receive progress reports (_attached mode_). The client runs either as part of the Java program that triggers the execution, or in the command-line process `./bin/flink run ...`.

The JobManager and TaskManagers can be started in several ways: directly on machines as a [standalone cluster]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}), in containers, or managed and launched by a resource framework such as [YARN]({{< ref "docs/deployment/resource-providers/yarn" >}}). TaskManagers connect to JobManagers, announce themselves as available, and are assigned work.
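To make the client's two submission modes concrete, here is a minimal Java sketch (an illustration only, assuming nothing beyond the standard DataStream API): `execute()` keeps the submitting process attached until the job finishes, while `executeAsync()` returns a `JobClient` immediately so the submitting process is free to disconnect.

```java
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SubmitExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The client only builds the dataflow; execution happens on the TaskManagers.
        env.fromElements("hello", "flink").print();

        // Attached mode: blocks until the job finishes and receives progress reports.
        // env.execute("attached-example");

        // Detached-style submission from code: returns immediately with a handle to the running job.
        JobClient jobClient = env.executeAsync("detached-example");
        System.out.println("Submitted job " + jobClient.getJobID());
    }
}
```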

215 changes: 0 additions & 215 deletions docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -341,32 +341,6 @@ final FileSink<String> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

import java.time.Duration

val input: DataStream[String] = ...

val sink: FileSink[String] = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(15))
.withInactivityInterval(Duration.ofMinutes(5))
.withMaxPartSize(MemorySize.ofMebiBytes(1024))
.build())
.build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< tab "Python" >}}
@@ -444,23 +418,6 @@ final FileSink<GenericRecord> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters
import org.apache.avro.Schema

val schema: Schema = ...
val input: DataStream[GenericRecord] = ...

val sink: FileSink[GenericRecord] = FileSink
.forBulkFormat(outputBasePath, AvroParquetWriters.forGenericRecord(schema))
.build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< tab "Python" >}}
@@ -497,22 +454,6 @@ final FileSink<ProtoRecord> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters

// ProtoRecord is a generated protobuf class
val input: DataStream[ProtoRecord] = ...

val sink: FileSink[ProtoRecord] = FileSink
.forBulkFormat(outputBasePath, ParquetProtoWriters.forType(classOf[ProtoRecord]))
.build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< /tabs >}}
@@ -567,23 +508,6 @@ final FileSink<GenericRecord> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.avro.AvroWriters
import org.apache.avro.Schema

val schema: Schema = ...
val input: DataStream[GenericRecord] = ...

val sink: FileSink[GenericRecord] = FileSink
.forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
.build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< tab "Python" >}}
@@ -624,26 +548,6 @@ stream.sinkTo(FileSink.forBulkFormat(
factory).build());
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
val factory = new AvroWriterFactory[Address](new AvroBuilder[Address]() {
override def createWriter(out: OutputStream): DataFileWriter[Address] = {
val schema = ReflectData.get.getSchema(classOf[Address])
val datumWriter = new ReflectDatumWriter[Address](schema)

val dataFileWriter = new DataFileWriter[Address](datumWriter)
dataFileWriter.setCodec(CodecFactory.snappyCodec)
dataFileWriter.create(schema, out)
dataFileWriter
}
})

val stream: DataStream[Address] = ...
stream.sinkTo(FileSink.forBulkFormat(
outputBasePath,
factory).build());
```
{{< /tab >}}
{{< /tabs >}}
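Only the tail of the Java variant of this example is visible in the hunk above; the following is a sketch of the equivalent Java usage, not copied from the remaining page (`Address`, `outputBasePath`, and the exact import paths are assumptions carried over from the removed Scala snippet).

```java
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.avro.AvroWriterFactory;

// Build an AvroWriterFactory from a custom builder that configures the Avro DataFileWriter.
AvroWriterFactory<Address> factory = new AvroWriterFactory<>(out -> {
    Schema schema = ReflectData.get().getSchema(Address.class);
    ReflectDatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);

    DataFileWriter<Address> dataFileWriter = new DataFileWriter<>(datumWriter);
    dataFileWriter.setCodec(CodecFactory.snappyCodec());
    dataFileWriter.create(schema, out);
    return dataFileWriter;
});

DataStream<Address> stream = ...;

stream.sinkTo(FileSink.forBulkFormat(
    outputBasePath,
    factory).build());
```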

<a name="orc-format"></a>
@@ -698,24 +602,6 @@ public class PersonVectorizer extends Vectorizer<Person> implements Serializable
}
}

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import java.nio.charset.StandardCharsets
import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, LongColumnVector}

class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {

override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
val nameColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
val ageColVector = batch.cols(1).asInstanceOf[LongColumnVector]
nameColVector.setVal(batch.size + 1, element.getName.getBytes(StandardCharsets.UTF_8))
ageColVector.vector(batch.size + 1) = element.getAge
}

}

```
{{< /tab >}}
{{< /tabs >}}
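Only the closing braces of the Java vectorizer are visible in the hunk above; the following is a sketch of what such a class looks like (assuming `Person` exposes `getName()`/`getAge()`, and reserving the row index by incrementing `batch.size`, which the removed Scala snippet left implicit).

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

import org.apache.flink.orc.vector.Vectorizer;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class PersonVectorizer extends Vectorizer<Person> implements Serializable {

    public PersonVectorizer(String schema) {
        super(schema);
    }

    @Override
    public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
        BytesColumnVector nameColVector = (BytesColumnVector) batch.cols[0];
        LongColumnVector ageColVector = (LongColumnVector) batch.cols[1];
        // Claim the next row in the batch and write one column value per field.
        int row = batch.size++;
        nameColVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
        ageColVector.vector[row] = element.getAge();
    }
}
```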
@@ -744,23 +630,6 @@ final FileSink<Person> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.orc.writer.OrcBulkWriterFactory

val schema: String = "struct<_col0:string,_col1:int>"
val input: DataStream[Person] = ...
val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema));

val sink: FileSink[Person] = FileSink
.forBulkFormat(outputBasePath, writerFactory)
.build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< /tabs >}}
@@ -782,19 +651,6 @@ final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
val schema: String = ...
val conf: Configuration = ...
val writerProperties: Properties = new Properties()

writerProperties.setProperty("orc.compress", "LZ4")
// Other ORC properties can be set in the same way

val writerFactory = new OrcBulkWriterFactory(
new PersonVectorizer(schema), writerProperties, conf)
```
{{< /tab >}}
{{< /tabs >}}

The full list of ORC writer properties can be found in [this documentation](https://orc.apache.org/docs/hive-config.html).
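The matching Java snippet is truncated to its first line in this hunk; below is a sketch of the same property handling (the import paths and the `Person`/`PersonVectorizer` types are assumptions carried over from the surrounding examples).

```java
import java.util.Properties;

import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.hadoop.conf.Configuration;

String schema = ...;
Configuration conf = ...;
Properties writerProperties = new Properties();

// ORC writer properties are passed through as-is to the ORC writer.
writerProperties.setProperty("orc.compress", "LZ4");
// Other ORC properties can be set in the same way.

final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(
        new PersonVectorizer(schema), writerProperties, conf);
```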
@@ -815,22 +671,6 @@ public class PersonVectorizer extends Vectorizer<Person> implements Serializable
}
}

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala

class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {

override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
...
val metadataKey: String = ...
val metadataValue: ByteBuffer = ...
addUserMetadata(metadataKey, metadataValue)
}

}

```
{{< /tab >}}
{{< /tabs >}}
@@ -888,27 +728,6 @@ final FileSink<Tuple2<LongWritable, Text>> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.configuration.GlobalConfiguration
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.Text;

val input: DataStream[(LongWritable, Text)] = ...
val hadoopConf: Configuration = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration())
val sink: FileSink[(LongWritable, Text)] = FileSink
.forBulkFormat(
outputBasePath,
new SequenceFileWriterFactory(hadoopConf, classOf[LongWritable], classOf[Text]))
.build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< /tabs >}}
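For comparison with the removed Scala block, here is a Java sketch of the same SequenceFile sink (the `HadoopUtils` helper, `outputBasePath`, and the import locations are assumed as in that snippet).

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.sequencefile.SequenceFileWriterFactory;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

DataStream<Tuple2<LongWritable, Text>> input = ...;

// Load the Hadoop configuration via Flink's configuration so the writer picks up cluster settings.
Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());

final FileSink<Tuple2<LongWritable, Text>> sink = FileSink
    .forBulkFormat(
        outputBasePath,
        new SequenceFileWriterFactory<>(hadoopConf, LongWritable.class, Text.class))
    .build();

input.sinkTo(sink);
```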
@@ -1050,24 +869,6 @@ FileSink<Tuple2<Integer, Integer>> sink = FileSink
.withOutputFileConfig(config)
.build();

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala

val config = OutputFileConfig
.builder()
.withPartPrefix("prefix")
.withPartSuffix(".ext")
.build()

val sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
.withBucketAssigner(new KeyBucketAssigner())
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withOutputFileConfig(config)
.build()

```
{{< /tab >}}
{{< tab "Python" >}}
@@ -1113,22 +914,6 @@ FileSink<Integer> fileSink=
new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
.build();

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala

val fileSink: FileSink[Integer] =
FileSink.forRowFormat(new Path(path), new SimpleStringEncoder[Integer]())
.enableCompact(
FileCompactStrategy.Builder.newBuilder()
.setNumCompactThreads(1024)
.enableCompactionOnCheckpoint(5)
.build(),
new RecordWiseFileCompactor(
new DecoderBasedReader.Factory(() => new SimpleStringDecoder)))
.build()

```
{{< /tab >}}
{{< tab "Python" >}}
20 changes: 0 additions & 20 deletions docs/content.zh/docs/connectors/datastream/formats/hadoop.md
@@ -63,9 +63,6 @@ under the License.

The following example shows how to use Hadoop's `TextInputFormat`.

{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}}
{{< tab "Java" >}}

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KeyValueTextInputFormat textInputFormat = new KeyValueTextInputFormat();
@@ -77,21 +74,4 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
[...]
```

{{< /tab >}}
{{< tab "Scala" >}}

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val textInputFormat = new KeyValueTextInputFormat
val input: DataStream[(Text, Text)] =
env.createInput(HadoopInputs.readHadoopFile(
textInputFormat, classOf[Text], classOf[Text], textPath))

// Do something with the data.
[...]
```

{{< /tab >}}
{{< /tabs >}}

{{< top >}}