[FLINK-36352][doc] Remove Scala DataStream API related doc #26082

Merged · 1 commit · Feb 25, 2025
2 changes: 1 addition & 1 deletion docs/content.zh/docs/concepts/flink-architecture.md
@@ -35,7 +35,7 @@ The Flink runtime consists of two types of processes: a _JobManager_ and one or

{{< img src="/fig/processes.svg" alt="The processes involved in executing a Flink dataflow" class="offset" width="70%" >}}

The *Client* is not part of the runtime and program execution; it is used to prepare the dataflow and send it to the JobManager. After that, the client can disconnect (_detached mode_) or stay connected to receive progress reports (_attached mode_). The client runs either as part of the Java/Scala program that triggers the execution, or in the command-line process `./bin/flink run ...`.
The *Client* is not part of the runtime and program execution; it is used to prepare the dataflow and send it to the JobManager. After that, the client can disconnect (_detached mode_) or stay connected to receive progress reports (_attached mode_). The client runs either as part of the Java program that triggers the execution, or in the command-line process `./bin/flink run ...`.

The JobManager and TaskManagers can be started in different ways: directly on the machines as a [standalone cluster]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}), in containers, or managed by resource frameworks such as [YARN]({{< ref "docs/deployment/resource-providers/yarn" >}}). TaskManagers connect to JobManagers, announce themselves as available, and are assigned work.
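
For reference, a minimal sketch of such a Java program (the class name, job name, and sample data are illustrative only, not taken from this page):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClientSubmissionExample {

    public static void main(String[] args) throws Exception {
        // The client side of this program only assembles the dataflow graph ...
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> words = env.fromElements("flink", "jobmanager", "taskmanager");
        words.print();

        // ... and hands it to the JobManager when execute() is called; afterwards the
        // client either detaches or stays attached to receive progress reports.
        env.execute("client-submission-example");
    }
}
```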

215 changes: 0 additions & 215 deletions docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -341,32 +341,6 @@ final FileSink<String> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

import java.time.Duration

val input: DataStream[String] = ...

val sink: FileSink[String] = FileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(1024))
            .build())
    .build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< tab "Python" >}}
@@ -444,23 +418,6 @@ final FileSink<GenericRecord> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters
import org.apache.avro.Schema

val schema: Schema = ...
val input: DataStream[GenericRecord] = ...

val sink: FileSink[GenericRecord] = FileSink
    .forBulkFormat(outputBasePath, AvroParquetWriters.forGenericRecord(schema))
    .build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< tab "Python" >}}
@@ -497,22 +454,6 @@ final FileSink<ProtoRecord> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters

// ProtoRecord is a generated protobuf class
val input: DataStream[ProtoRecord] = ...

val sink: FileSink[ProtoRecord] = FileSink
    .forBulkFormat(outputBasePath, ParquetProtoWriters.forType(classOf[ProtoRecord]))
    .build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< /tabs >}}
@@ -567,23 +508,6 @@ final FileSink<GenericRecord> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.avro.AvroWriters
import org.apache.avro.Schema

val schema: Schema = ...
val input: DataStream[GenericRecord] = ...

val sink: FileSink[GenericRecord] = FileSink
    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
    .build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< tab "Python" >}}
@@ -624,26 +548,6 @@ stream.sinkTo(FileSink.forBulkFormat(
factory).build());
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
val factory = new AvroWriterFactory[Address](new AvroBuilder[Address]() {
    override def createWriter(out: OutputStream): DataFileWriter[Address] = {
        val schema = ReflectData.get.getSchema(classOf[Address])
        val datumWriter = new ReflectDatumWriter[Address](schema)

        val dataFileWriter = new DataFileWriter[Address](datumWriter)
        dataFileWriter.setCodec(CodecFactory.snappyCodec)
        dataFileWriter.create(schema, out)
        dataFileWriter
    }
})

val stream: DataStream[Address] = ...
stream.sinkTo(FileSink.forBulkFormat(
    outputBasePath,
    factory).build())
```
{{< /tab >}}
{{< /tabs >}}

<a name="orc-format"></a>
@@ -698,24 +602,6 @@ public class PersonVectorizer extends Vectorizer<Person> implements Serializable
}
}

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import java.nio.charset.StandardCharsets
import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, LongColumnVector}

class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {

  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
    val nameColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
    val ageColVector = batch.cols(1).asInstanceOf[LongColumnVector]
    nameColVector.setVal(batch.size + 1, element.getName.getBytes(StandardCharsets.UTF_8))
    ageColVector.vector(batch.size + 1) = element.getAge
  }

}

```
{{< /tab >}}
{{< /tabs >}}
@@ -744,23 +630,6 @@ final FileSink<Person> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.orc.writer.OrcBulkWriterFactory

val schema: String = "struct<_col0:string,_col1:int>"
val input: DataStream[Person] = ...
val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema))

val sink: FileSink[Person] = FileSink
    .forBulkFormat(outputBasePath, writerFactory)
    .build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< /tabs >}}
@@ -782,19 +651,6 @@ final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
val schema: String = ...
val conf: Configuration = ...
val writerProperties: Properties = new Properties()

writerProperties.setProperty("orc.compress", "LZ4")
// Other ORC properties can be set in a similar way

val writerFactory = new OrcBulkWriterFactory(
    new PersonVectorizer(schema), writerProperties, conf)
```
{{< /tab >}}
{{< /tabs >}}

The complete list of ORC writer properties can be found in [this documentation](https://orc.apache.org/docs/hive-config.html).
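
For comparison, a minimal Java sketch of the same property-based setup, assuming the `PersonVectorizer` class and the `struct<_col0:string,_col1:int>` schema from the ORC examples earlier on this page:

```java
import java.util.Properties;

import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.hadoop.conf.Configuration;

String schema = "struct<_col0:string,_col1:int>";
Configuration conf = new Configuration();

Properties writerProperties = new Properties();
writerProperties.setProperty("orc.compress", "LZ4");
// Other ORC properties can be set the same way.

OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(
        new PersonVectorizer(schema), writerProperties, conf);
```
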
@@ -815,22 +671,6 @@ public class PersonVectorizer extends Vectorizer<Person> implements Serializable
}
}

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala

class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {

  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
    ...
    val metadataKey: String = ...
    val metadataValue: ByteBuffer = ...
    addUserMetadata(metadataKey, metadataValue)
  }

}

```
{{< /tab >}}
{{< /tabs >}}
@@ -888,27 +728,6 @@ final FileSink<Tuple2<LongWritable, Text>> sink = FileSink

input.sinkTo(sink);

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.configuration.GlobalConfiguration
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.Text;

val input: DataStream[(LongWritable, Text)] = ...
val hadoopConf: Configuration = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration())
val sink: FileSink[(LongWritable, Text)] = FileSink
    .forBulkFormat(
        outputBasePath,
        new SequenceFileWriterFactory(hadoopConf, classOf[LongWritable], classOf[Text]))
    .build()

input.sinkTo(sink)

```
{{< /tab >}}
{{< /tabs >}}
@@ -1050,24 +869,6 @@ FileSink<Tuple2<Integer, Integer>> sink = FileSink
.withOutputFileConfig(config)
.build();

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala

val config = OutputFileConfig
    .builder()
    .withPartPrefix("prefix")
    .withPartSuffix(".ext")
    .build()

val sink = FileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
    .withBucketAssigner(new KeyBucketAssigner())
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .withOutputFileConfig(config)
    .build()

```
{{< /tab >}}
{{< tab "Python" >}}
@@ -1113,22 +914,6 @@ FileSink<Integer> fileSink=
new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
.build();

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala

val fileSink: FileSink[Integer] =
    FileSink.forRowFormat(new Path(path), new SimpleStringEncoder[Integer]())
        .enableCompact(
            FileCompactStrategy.Builder.newBuilder()
                .setNumCompactThreads(1024)
                .enableCompactionOnCheckpoint(5)
                .build(),
            new RecordWiseFileCompactor(
                new DecoderBasedReader.Factory(() => new SimpleStringDecoder)))
        .build()

```
{{< /tab >}}
{{< tab "Python" >}}
20 changes: 0 additions & 20 deletions docs/content.zh/docs/connectors/datastream/formats/hadoop.md
@@ -63,9 +63,6 @@ under the License.

The following example shows how to use Hadoop's `TextInputFormat`.

{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}}
{{< tab "Java" >}}

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KeyValueTextInputFormat textInputFormat = new KeyValueTextInputFormat();
@@ -77,21 +74,4 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
[...]
```

{{< /tab >}}
{{< tab "Scala" >}}

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val textInputFormat = new KeyValueTextInputFormat
val input: DataStream[(Text, Text)] =
    env.createInput(HadoopInputs.readHadoopFile(
        textInputFormat, classOf[Text], classOf[Text], textPath))

// Do something with the data.
[...]
```

{{< /tab >}}
{{< /tabs >}}

{{< top >}}