diff --git a/docs/content.zh/docs/concepts/flink-architecture.md b/docs/content.zh/docs/concepts/flink-architecture.md index e57b06fcd1308..ddbf6b374f4da 100644 --- a/docs/content.zh/docs/concepts/flink-architecture.md +++ b/docs/content.zh/docs/concepts/flink-architecture.md @@ -35,7 +35,7 @@ Flink 运行时由两种类型的进程组成:一个 _JobManager_ 和一个或 {{< img src="/fig/processes.svg" alt="The processes involved in executing a Flink dataflow" class="offset" width="70%" >}} -*Client* 不是运行时和程序执行的一部分,而是用于准备数据流并将其发送给 JobManager。之后,客户端可以断开连接(_分离模式_),或保持连接来接收进程报告(_附加模式_)。客户端可以作为触发执行 Java/Scala 程序的一部分运行,也可以在命令行进程`./bin/flink run ...`中运行。 +*Client* 不是运行时和程序执行的一部分,而是用于准备数据流并将其发送给 JobManager。之后,客户端可以断开连接(_分离模式_),或保持连接来接收进程报告(_附加模式_)。客户端可以作为触发执行 Java 程序的一部分运行,也可以在命令行进程`./bin/flink run ...`中运行。 可以通过多种方式启动 JobManager 和 TaskManager:直接在机器上作为[standalone 集群]({{< ref "docs/deployment/resource-providers/standalone/overview" >}})启动、在容器中启动、或者通过[YARN]({{< ref "docs/deployment/resource-providers/yarn" >}})等资源框架管理并启动。TaskManager 连接到 JobManagers,宣布自己可用,并被分配工作。 diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md b/docs/content.zh/docs/connectors/datastream/filesystem.md index 6bdb30f07952d..2ad4a272842a8 100644 --- a/docs/content.zh/docs/connectors/datastream/filesystem.md +++ b/docs/content.zh/docs/connectors/datastream/filesystem.md @@ -341,32 +341,6 @@ final FileSink sink = FileSink input.sinkTo(sink); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -import org.apache.flink.api.common.serialization.SimpleStringEncoder -import org.apache.flink.core.fs.Path -import org.apache.flink.configuration.MemorySize -import org.apache.flink.connector.file.sink.FileSink -import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy - -import java.time.Duration - -val input: DataStream[String] = ... - -val sink: FileSink[String] = FileSink - .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8")) - .withRollingPolicy( - DefaultRollingPolicy.builder() - .withRolloverInterval(Duration.ofMinutes(15)) - .withInactivityInterval(Duration.ofMinutes(5)) - .withMaxPartSize(MemorySize.ofMebiBytes(1024)) - .build()) - .build() - -input.sinkTo(sink) - ``` {{< /tab >}} {{< tab "Python" >}} @@ -444,23 +418,6 @@ final FileSink sink = FileSink input.sinkTo(sink); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -import org.apache.flink.connector.file.sink.FileSink; -import org.apache.flink.formats.parquet.avro.AvroParquetWriters -import org.apache.avro.Schema - -val schema: Schema = ... -val input: DataStream[GenericRecord] = ... - -val sink: FileSink[GenericRecord] = FileSink - .forBulkFormat(outputBasePath, AvroParquetWriters.forGenericRecord(schema)) - .build() - -input.sinkTo(sink) - ``` {{< /tab >}} {{< tab "Python" >}} @@ -497,22 +454,6 @@ final FileSink sink = FileSink input.sinkTo(sink); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -import org.apache.flink.connector.file.sink.FileSink; -import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters - -// ProtoRecord 是一个生成 protobuf 的类 -val input: DataStream[ProtoRecord] = ... 
- -val sink: FileSink[ProtoRecord] = FileSink - .forBulkFormat(outputBasePath, ParquetProtoWriters.forType(classOf[ProtoRecord])) - .build() - -input.sinkTo(sink) - ``` {{< /tab >}} {{< /tabs >}} @@ -567,23 +508,6 @@ final FileSink sink = FileSink input.sinkTo(sink); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -import org.apache.flink.connector.file.sink.FileSink; -import org.apache.flink.formats.avro.AvroWriters -import org.apache.avro.Schema - -val schema: Schema = ... -val input: DataStream[GenericRecord] = ... - -val sink: FileSink[GenericRecord] = FileSink - .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema)) - .build() - -input.sinkTo(sink) - ``` {{< /tab >}} {{< tab "Python" >}} @@ -624,26 +548,6 @@ stream.sinkTo(FileSink.forBulkFormat( factory).build()); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val factory = new AvroWriterFactory[Address](new AvroBuilder[Address]() { - override def createWriter(out: OutputStream): DataFileWriter[Address] = { - val schema = ReflectData.get.getSchema(classOf[Address]) - val datumWriter = new ReflectDatumWriter[Address](schema) - - val dataFileWriter = new DataFileWriter[Address](datumWriter) - dataFileWriter.setCodec(CodecFactory.snappyCodec) - dataFileWriter.create(schema, out) - dataFileWriter - } -}) - -val stream: DataStream[Address] = ... -stream.sinkTo(FileSink.forBulkFormat( - outputBasePath, - factory).build()); -``` -{{< /tab >}} {{< /tabs >}} @@ -698,24 +602,6 @@ public class PersonVectorizer extends Vectorizer implements Serializable } } -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -import java.nio.charset.StandardCharsets -import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, LongColumnVector} - -class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) { - - override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = { - val nameColVector = batch.cols(0).asInstanceOf[BytesColumnVector] - val ageColVector = batch.cols(1).asInstanceOf[LongColumnVector] - nameColVector.setVal(batch.size + 1, element.getName.getBytes(StandardCharsets.UTF_8)) - ageColVector.vector(batch.size + 1) = element.getAge - } - -} - ``` {{< /tab >}} {{< /tabs >}} @@ -744,23 +630,6 @@ final FileSink sink = FileSink input.sinkTo(sink); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -import org.apache.flink.connector.file.sink.FileSink; -import org.apache.flink.orc.writer.OrcBulkWriterFactory - -val schema: String = "struct<_col0:string,_col1:int>" -val input: DataStream[Person] = ... -val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema)); - -val sink: FileSink[Person] = FileSink - .forBulkFormat(outputBasePath, writerFactory) - .build() - -input.sinkTo(sink) - ``` {{< /tab >}} {{< /tabs >}} @@ -782,19 +651,6 @@ final OrcBulkWriterFactory writerFactory = new OrcBulkWriterFactory<>( ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val schema: String = ... -val conf: Configuration = ... 
-val writerProperties: Properties = new Properties() - -writerProperties.setProperty("orc.compress", "LZ4") -// 其他 ORC 属性也可以使用类似方式进行设置 - -val writerFactory = new OrcBulkWriterFactory( - new PersonVectorizer(schema), writerProperties, conf) -``` -{{< /tab >}} {{< /tabs >}} 完整的 ORC 输出属性列表可以参考 [此文档](https://orc.apache.org/docs/hive-config.html) 。 @@ -815,22 +671,6 @@ public class PersonVectorizer extends Vectorizer implements Serializable } } -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala - -class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) { - - override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = { - ... - val metadataKey: String = ... - val metadataValue: ByteBuffer = ... - addUserMetadata(metadataKey, metadataValue) - } - -} - ``` {{< /tab >}} {{< /tabs >}} @@ -888,27 +728,6 @@ final FileSink> sink = FileSink input.sinkTo(sink); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -import org.apache.flink.connector.file.sink.FileSink; -import org.apache.flink.configuration.GlobalConfiguration -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.LongWritable -import org.apache.hadoop.io.SequenceFile -import org.apache.hadoop.io.Text; - -val input: DataStream[(LongWritable, Text)] = ... -val hadoopConf: Configuration = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()) -val sink: FileSink[(LongWritable, Text)] = FileSink - .forBulkFormat( - outputBasePath, - new SequenceFileWriterFactory(hadoopConf, LongWritable.class, Text.class)) - .build() - -input.sinkTo(sink) - ``` {{< /tab >}} {{< /tabs >}} @@ -1050,24 +869,6 @@ FileSink> sink = FileSink .withOutputFileConfig(config) .build(); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala - -val config = OutputFileConfig - .builder() - .withPartPrefix("prefix") - .withPartSuffix(".ext") - .build() - -val sink = FileSink - .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8")) - .withBucketAssigner(new KeyBucketAssigner()) - .withRollingPolicy(OnCheckpointRollingPolicy.build()) - .withOutputFileConfig(config) - .build() - ``` {{< /tab >}} {{< tab "Python" >}} @@ -1113,22 +914,6 @@ FileSink fileSink= new DecoderBasedReader.Factory<>(SimpleStringDecoder::new))) .build(); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala - -val fileSink: FileSink[Integer] = - FileSink.forRowFormat(new Path(path), new SimpleStringEncoder[Integer]()) - .enableCompact( - FileCompactStrategy.Builder.newBuilder() - .setNumCompactThreads(1024) - .enableCompactionOnCheckpoint(5) - .build(), - new RecordWiseFileCompactor( - new DecoderBasedReader.Factory(() => new SimpleStringDecoder))) - .build() - ``` {{< /tab >}} {{< tab "Python" >}} diff --git a/docs/content.zh/docs/connectors/datastream/formats/hadoop.md b/docs/content.zh/docs/connectors/datastream/formats/hadoop.md index 164294618cda9..e7c1a6283f805 100644 --- a/docs/content.zh/docs/connectors/datastream/formats/hadoop.md +++ b/docs/content.zh/docs/connectors/datastream/formats/hadoop.md @@ -63,9 +63,6 @@ under the License. 下面的示例展示了如何使用 Hadoop 的 `TextInputFormat`。 -{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}} -{{< tab "Java" >}} - ```java StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); KeyValueTextInputFormat textInputFormat = new KeyValueTextInputFormat(); @@ -77,21 +74,4 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm [...] 
``` -{{< /tab >}} -{{< tab "Scala" >}} - -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment -val textInputFormat = new KeyValueTextInputFormat -val input: DataStream[(Text, Text)] = - env.createInput(HadoopInputs.readHadoopFile( - textInputFormat, classOf[Text], classOf[Text], textPath)) - - // Do something with the data. - [...] -``` - -{{< /tab >}} -{{< /tabs >}} - {{< top >}} diff --git a/docs/content.zh/docs/deployment/advanced/external_resources.md b/docs/content.zh/docs/deployment/advanced/external_resources.md index a145a92372aa0..8f230ee27d5e9 100644 --- a/docs/content.zh/docs/deployment/advanced/external_resources.md +++ b/docs/content.zh/docs/deployment/advanced/external_resources.md @@ -109,8 +109,6 @@ external-resource.fpga.yarn.config-key: yarn.io/fpga # 定义 FPGA 在 Yarn 中 算子可以通过 `getExternalResourceInfos(String resourceName)` 从 `RuntimeContext` 或 `FunctionContext` 中获取特定扩展资源的 `ExternalResourceInfo`。 此处的 `resourceName` 应与在扩展资源列表中定义的名称相同。具体用法如下: -{{< tabs "5e1a48c8-14ae-4836-b5fd-84879c4bf36d" >}} -{{< tab "Java" >}} ```java public class ExternalResourceMapFunction extends RichMapFunction { private static final String RESOURCE_NAME = "foo"; @@ -126,25 +124,6 @@ public class ExternalResourceMapFunction extends RichMapFunction } } ``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -class ExternalResourceMapFunction extends RichMapFunction[(String, String)] { - var RESOURCE_NAME = "foo" - - override def map(value: String): String = { - val externalResourceInfos = getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME) - val addresses = new util.ArrayList[String] - externalResourceInfos.asScala.foreach( - externalResourceInfo => addresses.add(externalResourceInfo.getProperty("address").get())) - - // map function with addresses. - // ... - } -} -``` -{{< /tab >}} -{{< /tabs >}} `ExternalResourceInfo` 中包含一个或多个键-值对,其键值表示资源的不同维度。你可以通过 `ExternalResourceInfo#getKeys` 获取所有的键。 @@ -166,8 +145,6 @@ class ExternalResourceMapFunction extends RichMapFunction[(String, String)] { 例如,要为名为“FPGA”的扩展资源实现插件,你首先需要实现 `FPGADriver` 和 `FPGADriverFactory`: -{{< tabs "b44c0b2c-52ef-4281-8a93-40ca3843c3b8" >}} -{{< tab "Java" >}} ```java public class FPGADriver implements ExternalResourceDriver { @Override @@ -196,34 +173,6 @@ public class FPGAInfo implements ExternalResourceInfo { } } ``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -class FPGADriver extends ExternalResourceDriver { - override def retrieveResourceInfo(amount: Long): Set[FPGAInfo] = { - // return the information set of "FPGA" - } -} - -class FPGADriverFactory extends ExternalResourceDriverFactory { - override def createExternalResourceDriver(config: Configuration): ExternalResourceDriver = { - new FPGADriver() - } -} - -// Also implement FPGAInfo which contains basic properties of "FPGA" resource. -class FPGAInfo extends ExternalResourceInfo { - override def getProperty(key: String): Option[String] = { - // return the property with the given key. - } - - override def getKeys(): util.Collection[String] = { - // return all property keys. - } -} -``` -{{< /tab >}} -{{< /tabs >}} 在 `META-INF/services/` 中创建名为 `org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory` 的文件,向其中写入工厂类名,如 `your.domain.FPGADriverFactory`。 diff --git a/docs/content.zh/docs/deployment/cli.md b/docs/content.zh/docs/deployment/cli.md index 5b5fac6cd564e..cd624ed437352 100644 --- a/docs/content.zh/docs/deployment/cli.md +++ b/docs/content.zh/docs/deployment/cli.md @@ -566,7 +566,7 @@ related options. 
Here's an overview of all the Python related options for the ac -pyclientexec,--pyClientExecutable The path of the Python interpreter used to launch the Python process when submitting - the Python jobs via \"flink run\" or compiling the Java/Scala jobs containing + the Python jobs via \"flink run\" or compiling the Java jobs containing Python UDFs. (e.g., --pyArchives file:///tmp/py37.zip --pyClientExecutable py37.zip/py37/python) diff --git a/docs/content.zh/docs/deployment/finegrained_resource.md b/docs/content.zh/docs/deployment/finegrained_resource.md index 258c6dad266ec..b7b0b464f04d1 100644 --- a/docs/content.zh/docs/deployment/finegrained_resource.md +++ b/docs/content.zh/docs/deployment/finegrained_resource.md @@ -113,26 +113,6 @@ someStream.filter(...).slotSharingGroup("a") // 设置Slot共享组的名字为 env.registerSlotSharingGroup(ssgA); // 注册共享组的资源 ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment - -val ssgA = SlotSharingGroup.newBuilder("a") - .setCpuCores(1.0) - .setTaskHeapMemoryMB(100) - .build() - -val ssgB = SlotSharingGroup.newBuilder("b") - .setCpuCores(0.5) - .setTaskHeapMemoryMB(100) - .build() - -someStream.filter(...).slotSharingGroup("a") // 设置Slot共享组的名字为‘a’ -.map(...).slotSharingGroup(ssgB) // 直接设置Slot共享组的名字和资源. - -env.registerSlotSharingGroup(ssgA) // 注册共享组的资源 -``` -{{< /tab >}} {{< tab "Python" >}} ```python env = StreamExecutionEnvironment.get_execution_environment() @@ -183,23 +163,6 @@ SlotSharingGroup ssgWithName = SlotSharingGroup.newBuilder("ssg").build(); env.registerSlotSharingGroup(ssgWithResource); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -// 通过指定资源直接构建一个 slot 共享组 -val ssgWithResource = - SlotSharingGroup.newBuilder("ssg") - .setCpuCores(1.0) // required - .setTaskHeapMemoryMB(100) // required - .setTaskOffHeapMemoryMB(50) - .setManagedMemory(MemorySize.ofMebiBytes(200)) - .setExternalResource("gpu", 1.0) - .build() - -// 构建一个 slot 共享组未指定资源,然后在 StreamExecutionEnvironment中注册资源 -val ssgWithName = SlotSharingGroup.newBuilder("ssg").build() -env.registerSlotSharingGroup(ssgWithResource) -``` -{{< /tab >}} {{< tab "Python" >}} ```python # Directly build a slot sharing group with specific resource diff --git a/docs/content.zh/docs/dev/configuration/advanced.md b/docs/content.zh/docs/dev/configuration/advanced.md index 80aa2ad349f9a..08e8ec45260a7 100644 --- a/docs/content.zh/docs/dev/configuration/advanced.md +++ b/docs/content.zh/docs/dev/configuration/advanced.md @@ -40,7 +40,7 @@ Flink 还在 `/opt` 文件夹下提供了额外的可选依赖项,可以通过 ## Scala 版本 -不同的 Scala 版本二进制不兼容,所有(传递地)依赖于 Scala 的 Flink 依赖项都以它们构建的 Scala 版本为后缀(如 `flink-streaming-scala_2.12`)。 +不同的 Scala 版本二进制不兼容,所有(传递地)依赖于 Scala 的 Flink 依赖项都以它们构建的 Scala 版本为后缀(如 `flink-table-api-scala-bridge_2.12`)。 如果您只使用 Flink 的 Java API,您可以使用任何 Scala 版本。如果您使用 Flink 的 Scala API,则需要选择与应用程序的 Scala 匹配的 Scala 版本。 diff --git a/docs/content.zh/docs/dev/configuration/maven.md b/docs/content.zh/docs/dev/configuration/maven.md index 761631d5d8db3..dd1bd066979f1 100644 --- a/docs/content.zh/docs/dev/configuration/maven.md +++ b/docs/content.zh/docs/dev/configuration/maven.md @@ -67,7 +67,7 @@ __注意:__ 如果您使用不同于 `DataStreamJob` 的类作为应用程序 然后在命令行执行 `mvn install`。 -当您在由 `Java Project Template`、`Scala Project Template` 或 Gradle 创建出来的项目里,运行 `mvn clean package` 会自动将应用程序依赖打包进应用程序 JAR。对于不是通过这些模板创建的项目,我们建议使用 Maven Shade 插件以将所有必需的依赖项打包进应用程序 jar。 +当您在由 `Java Project Template` 或 Gradle 创建出来的项目里,运行 `mvn clean package` 会自动将应用程序依赖打包进应用程序 JAR。对于不是通过这些模板创建的项目,我们建议使用 Maven Shade 插件以将所有必需的依赖项打包进应用程序 jar。 **重要提示:** 
请注意,应将所有这些(核心)依赖项的生效范围置为 [*provided*](https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#dependency-scope)。这意味着需要对它们进行编译,但不应将它们打包进项目生成的应用程序 JAR 文件中。如果不设置为 *provided*,最好的情况是生成的 JAR 变得过大,因为它还包含所有 Flink 核心依赖项。最坏的情况是添加到应用程序 JAR 文件中的 Flink 核心依赖项与您自己的一些依赖项的版本冲突(通常通过反向类加载来避免)。 diff --git a/docs/content.zh/docs/dev/configuration/overview.md b/docs/content.zh/docs/dev/configuration/overview.md index f715ec7b8c1c8..2327383d578ee 100644 --- a/docs/content.zh/docs/dev/configuration/overview.md +++ b/docs/content.zh/docs/dev/configuration/overview.md @@ -57,12 +57,6 @@ under the License. -{{< hint warning >}} -All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of either the DataStream and/or Table API. - -See FLIP-265 Deprecate and remove Scala API support -{{< /hint >}} - ### Maven 命令 ```bash $ mvn archetype:generate \ @@ -230,7 +224,7 @@ Flink提供了两大 API:[Datastream API]({{< ref "docs/dev/datastream/overvie 如果你想通过简单地执行主类来运行你的作业,你需要 classpath 里包含 `flink-clients`。对于 Table API 程序,你还需要在 classpath 中包含 `flink-table-runtime` 和 `flink-table-planner-loader`。 -根据经验,我们**建议**将应用程序代码及其所有必需的依赖项打包进一个 fat/uber JAR 中。这包括打包你作业用到的连接器、格式和第三方依赖项。此规则**不适用于** Java API、DataStream Scala API 以及前面提到的运行时模块,它们已经由 Flink 本身提供,**不应**包含在作业的 uber JAR 中。你可以把该作业 JAR 提交到已经运行的 Flink 集群,也可以轻松将其添加到 Flink 应用程序容器镜像中,而无需修改发行版。 +根据经验,我们**建议**将应用程序代码及其所有必需的依赖项打包进一个 fat/uber JAR 中。这包括打包你作业用到的连接器、格式和第三方依赖项。此规则**不适用于** Java API 以及前面提到的运行时模块,它们已经由 Flink 本身提供,**不应**包含在作业的 uber JAR 中。你可以把该作业 JAR 提交到已经运行的 Flink 集群,也可以轻松将其添加到 Flink 应用程序容器镜像中,而无需修改发行版。 diff --git a/docs/content.zh/docs/dev/datastream/event-time/built_in.md b/docs/content.zh/docs/dev/datastream/event-time/built_in.md index 792dfad43c73f..9b35e9e9c539c 100644 --- a/docs/content.zh/docs/dev/datastream/event-time/built_in.md +++ b/docs/content.zh/docs/dev/datastream/event-time/built_in.md @@ -45,11 +45,6 @@ under the License. 
WatermarkStrategy.forMonotonousTimestamps(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -WatermarkStrategy.forMonotonousTimestamps() -``` -{{< /tab >}} {{< tab "Python" >}} ```python WatermarkStrategy.for_monotonous_timestamps() @@ -69,12 +64,6 @@ WatermarkStrategy.for_monotonous_timestamps() WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -WatermarkStrategy - .forBoundedOutOfOrderness(Duration.ofSeconds(10)) -``` -{{< /tab >}} {{< tab "Python" >}} ```python WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(10)) diff --git a/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md index 1bdea7880e5ec..33f020508c827 100644 --- a/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md +++ b/docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md @@ -68,15 +68,6 @@ WatermarkStrategy .withTimestampAssigner((event, timestamp) -> event.f0); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -WatermarkStrategy - .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20)) - .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] { - override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = element._1 - }) -``` -{{< /tab >}} {{< tab "Python" >}} ```python class FirstElementTimestampAssigner(TimestampAssigner): @@ -130,25 +121,6 @@ withTimestampsAndWatermarks .addSink(...); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment - -val stream: DataStream[MyEvent] = env.readFile( - myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, - FilePathFilter.createDefaultFilter()) - -val withTimestampsAndWatermarks: DataStream[MyEvent] = stream - .filter( _.severity == WARNING ) - .assignTimestampsAndWatermarks() - -withTimestampsAndWatermarks - .keyBy( _.getGroup ) - .window(TumblingEventTimeWindows.of(Duration.ofSeconds(10))) - .reduce( (a, b) => a.add(b) ) - .addSink(...) 
-``` -{{< /tab >}} {{< tab "Python" >}} ```python env = StreamExecutionEnvironment.get_execution_environment() @@ -189,13 +161,6 @@ WatermarkStrategy .withIdleness(Duration.ofMinutes(1)); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -WatermarkStrategy - .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20)) - .withIdleness(Duration.ofMinutes(1)) -``` -{{< /tab >}} {{< tab "Python" >}} ```python WatermarkStrategy \ @@ -233,13 +198,6 @@ WatermarkStrategy .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1)); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -WatermarkStrategy - .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20)) - .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1)) -``` -{{< /tab >}} {{< tab "Python" >}} ```python WatermarkStrategy \ @@ -378,45 +336,6 @@ public class TimeLagWatermarkGenerator implements WatermarkGenerator { } ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -/** - * 该 watermark 生成器可以覆盖的场景是:数据源在一定程度上乱序。 - * 即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。 - */ -class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { - - val maxOutOfOrderness = 3500L // 3.5 秒 - - var currentMaxTimestamp: Long = _ - - override def onEvent(element: MyEvent, eventTimestamp: Long): Unit = { - currentMaxTimestamp = max(eventTimestamp, currentMaxTimestamp) - } - - override def onPeriodicEmit(): Unit = { - // 发出的 watermark = 当前最大时间戳 - 最大乱序时间 - output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1)) - } -} - -/** - * 该生成器生成的 watermark 滞后于处理时间固定量。它假定元素会在有限延迟后到达 Flink。 - */ -class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { - - val maxTimeLag = 5000L // 5 秒 - - override def onEvent(element: MyEvent, eventTimestamp: Long): Unit = { - // 处理时间场景下不需要实现 - } - - override def onPeriodicEmit(): Unit = { - output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag)) - } -} -``` -{{< /tab >}} {{< tab "Python" >}} ```python 目前在python中不支持该api @@ -451,22 +370,6 @@ public class PunctuatedAssigner implements WatermarkGenerator { } ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] { - - override def onEvent(element: MyEvent, eventTimestamp: Long): Unit = { - if (event.hasWatermarkMarker()) { - output.emitWatermark(new Watermark(event.getWatermarkTimestamp())) - } - } - - override def onPeriodicEmit(): Unit = { - // onEvent 中已经实现 - } -} -``` -{{< /tab >}} {{< tab "Python" >}} ```python Python API 中尚不支持该特性。 @@ -505,20 +408,6 @@ DataStream stream = env.fromSource( kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource"); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val kafkaSource: KafkaSource[String] = KafkaSource.builder[String]() - .setBootstrapServers("brokers") - .setTopics("my-topic") - .setGroupId("my-group") - .setStartingOffsets(OffsetsInitializer.earliest()) - .setValueOnlyDeserializer(new SimpleStringSchema) - .build() - -val stream = env.fromSource( - kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource") -``` -{{< /tab >}} {{< tab "Python" >}} ```python kafka_source = KafkaSource.builder() diff --git a/docs/content.zh/docs/dev/datastream/execution/execution_configuration.md b/docs/content.zh/docs/dev/datastream/execution/execution_configuration.md index 46ebc3bde1f37..9a45bbb65300c 100644 --- 
a/docs/content.zh/docs/dev/datastream/execution/execution_configuration.md +++ b/docs/content.zh/docs/dev/datastream/execution/execution_configuration.md @@ -37,12 +37,6 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm ExecutionConfig executionConfig = env.getConfig(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment -var executionConfig = env.getConfig -``` -{{< /tab >}} {{< tab "Python" >}} ```python env = StreamExecutionEnvironment.get_execution_environment() diff --git a/docs/content.zh/docs/dev/datastream/execution/parallel.md b/docs/content.zh/docs/dev/datastream/execution/parallel.md index 7d1232d7a0c0d..7b8754d43feca 100644 --- a/docs/content.zh/docs/dev/datastream/execution/parallel.md +++ b/docs/content.zh/docs/dev/datastream/execution/parallel.md @@ -58,21 +58,6 @@ wordCounts.print(); env.execute("Word Count Example"); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment - -val text = [...] -val wordCounts = text - .flatMap{ _.split(" ") map { (_, 1) } } - .keyBy(_._1) - .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) - .sum(1).setParallelism(5) -wordCounts.print() - -env.execute("Word Count Example") -``` -{{< /tab >}} {{< tab "Python" >}} ```python env = StreamExecutionEnvironment.get_execution_environment() @@ -112,22 +97,6 @@ wordCounts.print(); env.execute("Word Count Example"); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment -env.setParallelism(3) - -val text = [...] -val wordCounts = text - .flatMap{ _.split(" ") map { (_, 1) } } - .keyBy(_._1) - .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) - .sum(1) -wordCounts.print() - -env.execute("Word Count Example") -``` -{{< /tab >}} {{< tab "Python" >}} ```python env = StreamExecutionEnvironment.get_execution_environment() @@ -150,14 +119,14 @@ env.execute("Word Count Example") ### 客户端层次 -将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。 +将作业提交到 Flink 时可在客户端设定其并行度,Flink 的命令行接口(CLI)就是一种典型的客户端。 在 CLI 客户端中,可以通过 `-p` 参数指定并行度,例如: ./bin/flink run -p 10 ../examples/*WordCount-java*.jar -在 Java/Scala 程序中,可以通过如下方式指定并行度: +在客户端程序中,可以通过如下方式指定并行度: {{< tabs "59257013-dbf1-41d8-a719-72ace65f63ff" >}} {{< tab "Java" >}} @@ -177,23 +146,6 @@ try { e.printStackTrace(); } -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -try { - PackagedProgram program = new PackagedProgram(file, args) - InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123") - Configuration config = new Configuration() - - Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader()) - - // set the parallelism to 10 here - client.run(program, 10, true) - -} catch { - case e: Exception => e.printStackTrace -} ``` {{< /tab >}} {{< tab "Python" >}} diff --git a/docs/content.zh/docs/dev/datastream/experimental.md b/docs/content.zh/docs/dev/datastream/experimental.md index 2e5c86ef09ac3..66deddd5c5d85 100644 --- a/docs/content.zh/docs/dev/datastream/experimental.md +++ b/docs/content.zh/docs/dev/datastream/experimental.md @@ -58,8 +58,6 @@ the method creates a keyed stream from the base stream. 
Code example: -{{< tabs "0bf7bee6-23be-42b5-bf90-afab0a4f8dc2" >}} -{{< tab "Java" >}} ```java StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource source = ...; @@ -69,19 +67,5 @@ DataStreamUtils.reinterpretAsKeyedStream(source, (in) -> in, TypeInformation.of( .addSink(new DiscardingSink<>()); env.execute(); ``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment -env.setParallelism(1) -val source = ... -new DataStreamUtils(source).reinterpretAsKeyedStream((in) => in) - .window(TumblingEventTimeWindows.of(Duration.ofSeconds(1))) - .reduce((a, b) => a + b) - .addSink(new DiscardingSink[Int]) -env.execute() -``` -{{< /tab >}} -{{< /tabs >}} {{< top >}} diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md index 83ade4fbf6878..295212ee5d4d1 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -114,38 +114,6 @@ env.getCheckpointConfig().setExternalizedCheckpointRetention( env.getCheckpointConfig().enableUnalignedCheckpoints(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment() - -// 每 1000ms 开始一次 checkpoint -env.enableCheckpointing(1000) - -// 高级选项: - -// 设置模式为精确一次 (这是默认值) -env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) - -// 确认 checkpoints 之间的时间会进行 500 ms -env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) - -// Checkpoint 必须在一分钟内完成,否则就会被抛弃 -env.getCheckpointConfig.setCheckpointTimeout(60000) - -// 允许两个连续的 checkpoint 错误 -env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2) - -// 同一时间只允许一个 checkpoint 进行 -env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) - -// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留 -env.getCheckpointConfig().setExternalizedCheckpointRetention( - ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION) - -// 开启实验性的 unaligned checkpoints -env.getCheckpointConfig.enableUnalignedCheckpoints() -``` -{{< /tab >}} {{< tab "Python" >}} ```python env = StreamExecutionEnvironment.get_execution_environment() diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md index e4d8c3b9c13d6..cc10ae23b04aa 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md @@ -33,7 +33,7 @@ Processing]({{< ref "docs/concepts/stateful-stream-processing" >}})。 ## Keyed DataStream 如果你希望使用 keyed state,首先需要为`DataStream`指定 key(主键)。这个主键用于状态分区(也会给数据流中的记录本身分区)。 -你可以使用 `DataStream` 中 Java/Scala API 的 `keyBy(KeySelector)` 或者是 Python API 的 `key_by(KeySelector)` 来指定 key。 +你可以使用 `DataStream` 中 Java API 的 `keyBy(KeySelector)` 或者是 Python API 的 `key_by(KeySelector)` 来指定 key。 它将生成 `KeyedStream`,接下来允许使用 keyed state 操作。 Key selector 函数接收单条记录作为输入,返回这条记录的 key。该 key 可以为任何类型,但是它的计算产生方式**必须**是具备确定性的。 @@ -59,15 +59,6 @@ KeyedStream keyed = words ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -// some ordinary case class -case class WC(word: String, count: Int) -val words: DataStream[WC] = // [...] 
-val keyed = words.keyBy( _.word ) -``` -{{< /tab >}} - {{< tab "Python" >}} ```python words = # type: DataStream[Row] @@ -78,7 +69,7 @@ keyed = words.key_by(lambda row: row[0]) #### Tuple Keys 和 Expression Keys -Flink 也有两种不同定义 key 的方式:Java/Scala API(Python API 仍未支持) 的 Tuple key(通过字段索引指定的 key)和 Expression key(通过字段名称指定的 key)。 +Flink 也有两种不同定义 key 的方式:Java API(Python API 仍未支持) 的 Tuple key(通过字段索引指定的 key)和 Expression key(通过字段名称指定的 key)。 借此你可以通过 tuple 字段索引,或者是选取对象字段的表达式来指定 key。 如今我们不建议这样使用,但你可以参考 `DataStream` 的 Javadoc 来了解它们。 使用 KeySelector 函数显然是更好的。以几乎可以忽略的额外开销为代价,结合 Java Lambda 表达式,我们可以更方便得使用KeySelector。 @@ -88,7 +79,7 @@ Flink 也有两种不同定义 key 的方式:Java/Scala API(Python API 仍 ## 使用 Keyed State keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 `KeyedStream` -上使用,在Java/Scala API上可以通过 `stream.keyBy(...)` 得到 `KeyedStream`,在Python API上可以通过 `stream.key_by(...)` 得到 `KeyedStream`。 +上使用,在Java API上可以通过 `stream.keyBy(...)` 得到 `KeyedStream`,在Python API上可以通过 `stream.key_by(...)` 得到 `KeyedStream`。 接下来,我们会介绍不同类型的状态,然后介绍如何使用他们。所有支持的状态类型如下所示: @@ -179,64 +170,6 @@ env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2 // the printed output will be (1,4) and (1,5) ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] { - - private var sum: ValueState[(Long, Long)] = _ - - override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = { - - // access the state value - val tmpCurrentSum = sum.value - - // If it hasn't been used before, it will be null - val currentSum = if (tmpCurrentSum != null) { - tmpCurrentSum - } else { - (0L, 0L) - } - - // update the count - val newSum = (currentSum._1 + 1, currentSum._2 + input._2) - - // update the state - sum.update(newSum) - - // if the count reaches 2, emit the average and clear the state - if (newSum._1 >= 2) { - out.collect((input._1, newSum._2 / newSum._1)) - sum.clear() - } - } - - override def open(parameters: Configuration): Unit = { - sum = getRuntimeContext.getState( - new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]) - ) - } -} - - -object ExampleCountWindowAverage extends App { - val env = StreamExecutionEnvironment.getExecutionEnvironment - - env.fromCollection(List( - (1L, 3L), - (1L, 5L), - (1L, 7L), - (1L, 4L), - (1L, 2L) - )).keyBy(_._1) - .flatMap(new CountWindowAverage()) - .print() - // the printed output will be (1,4) and (1,5) - - env.execute("ExampleKeyedState") -} -``` -{{< /tab >}} - {{< tab "Python" >}} ```python from pyflink.common.typeinfo import Types @@ -314,38 +247,6 @@ ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("text stateDescriptor.enableTimeToLive(ttlConfig); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -import org.apache.flink.api.common.state.StateTtlConfig -import org.apache.flink.api.common.state.ValueStateDescriptor -import java.time.Duration - -val ttlConfig = StateTtlConfig - .newBuilder(Duration.ofSeconds(1)) - .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) - .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) - .build - -val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String]) -stateDescriptor.enableTimeToLive(ttlConfig) -``` -{{< /tab >}} -{{< tab "Python" >}} -```python -from pyflink.common.time import Time -from pyflink.common.typeinfo import Types -from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig - -ttl_config = StateTtlConfig \ - 
.new_builder(Duration.ofSeconds(1)) \ - .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \ - .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \ - .build() - -state_descriptor = ValueStateDescriptor("text state", Types.STRING()) -state_descriptor.enable_time_to_live(ttl_config) -``` -{{< /tab >}} {{< /tabs >}} TTL 配置有以下几个选项: @@ -404,16 +305,6 @@ StateTtlConfig ttlConfig = StateTtlConfig .build(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -import org.apache.flink.api.common.state.StateTtlConfig - -val ttlConfig = StateTtlConfig - .newBuilder(Duration.ofSeconds(1)) - .disableCleanupInBackground - .build -``` -{{< /tab >}} {{< tab "Python" >}} ```python from pyflink.common.time import Time @@ -446,17 +337,6 @@ StateTtlConfig ttlConfig = StateTtlConfig .build(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -import org.apache.flink.api.common.state.StateTtlConfig -import java.time.Duration - -val ttlConfig = StateTtlConfig - .newBuilder(Duration.ofSeconds(1)) - .cleanupFullSnapshot - .build -``` -{{< /tab >}} {{< tab "Python" >}} ```python from pyflink.common.time import Time @@ -492,15 +372,6 @@ import org.apache.flink.api.common.state.StateTtlConfig; .build(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -import org.apache.flink.api.common.state.StateTtlConfig -val ttlConfig = StateTtlConfig - .newBuilder(Duration.ofSeconds(1)) - .cleanupIncrementally(10, true) - .build -``` -{{< /tab >}} {{< tab "Python" >}} ```python from pyflink.common.time import Time @@ -542,16 +413,6 @@ StateTtlConfig ttlConfig = StateTtlConfig .build(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -import org.apache.flink.api.common.state.StateTtlConfig - -val ttlConfig = StateTtlConfig - .newBuilder(Duration.ofSeconds(1)) - .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1)) - .build -``` -{{< /tab >}} {{< tab "Python" >}} ```python from pyflink.common import Duration @@ -590,23 +451,6 @@ RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一 - 对已有的作业,这个清理方式可以在任何时候通过 `StateTtlConfig` 启用或禁用该特性,比如从 savepoint 重启后。 - 定期压缩功能只在 TTL 启用时生效。 -### DataStream 状态相关的 Scala API - -除了上面描述的接口之外,Scala API 还在 `KeyedStream` 上对 `map()` 和 `flatMap()` 访问 `ValueState` 提供了一个更便捷的接口。 -用户函数能够通过 `Option` 获取当前 `ValueState` 的值,并且返回即将保存到状态的值。 - -```scala -val stream: DataStream[(String, Int)] = ... 
- -val counts: DataStream[(String, Int)] = stream - .keyBy(_._1) - .mapWithState((in: (String, Int), count: Option[Int]) => - count match { - case Some(c) => ( (in._1, c), Some(c + in._2) ) - case None => ( (in._1, 0), Some(in._2) ) - }) -``` - ## 算子状态 (Operator State) *算子状态*(或者*非 keyed 状态*)是绑定到一个并行算子实例的状态。[Kafka Connector]({{< ref "docs/connectors/datastream/kafka" >}}) 是 Flink 中使用算子状态一个很具有启发性的例子。Kafka consumer 每个并行实例维护了 topic partitions 和偏移量的 map 作为它的算子状态。 @@ -709,49 +553,6 @@ public class BufferingSink } ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -class BufferingSink(threshold: Int = 0) - extends SinkFunction[(String, Int)] - with CheckpointedFunction { - - @transient - private var checkpointedState: ListState[(String, Int)] = _ - - private val bufferedElements = ListBuffer[(String, Int)]() - - override def invoke(value: (String, Int), context: Context): Unit = { - bufferedElements += value - if (bufferedElements.size >= threshold) { - for (element <- bufferedElements) { - // send it to the sink - } - bufferedElements.clear() - } - } - - override def snapshotState(context: FunctionSnapshotContext): Unit = { - checkpointedState.update(bufferedElements.asJava) - } - - override def initializeState(context: FunctionInitializationContext): Unit = { - val descriptor = new ListStateDescriptor[(String, Int)]( - "buffered-elements", - TypeInformation.of(new TypeHint[(String, Int)]() {}) - ) - - checkpointedState = context.getOperatorStateStore.getListState(descriptor) - - if(context.isRestored) { - for(element <- checkpointedState.get().asScala) { - bufferedElements += element - } - } - } - -} -``` -{{< /tab >}} {{< /tabs >}} `initializeState` 方法接收一个 `FunctionInitializationContext` 参数,会用来初始化 non-keyed state 的 "容器"。这些容器是一个 `ListState` @@ -771,18 +572,6 @@ ListStateDescriptor> descriptor = checkpointedState = context.getOperatorStateStore().getListState(descriptor); ``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala - -val descriptor = new ListStateDescriptor[(String, Long)]( - "buffered-elements", - TypeInformation.of(new TypeHint[(String, Long)]() {}) -) - -checkpointedState = context.getOperatorStateStore.getListState(descriptor) - -``` {{< /tab >}} {{< /tabs >}} @@ -853,48 +642,6 @@ public static class CounterSource } ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -class CounterSource - extends RichParallelSourceFunction[Long] - with CheckpointedFunction { - - @volatile - private var isRunning = true - - private var offset = 0L - private var state: ListState[Long] = _ - - override def run(ctx: SourceFunction.SourceContext[Long]): Unit = { - val lock = ctx.getCheckpointLock - - while (isRunning) { - // output and state update are atomic - lock.synchronized({ - ctx.collect(offset) - - offset += 1 - }) - } - } - - override def cancel(): Unit = isRunning = false - - override def initializeState(context: FunctionInitializationContext): Unit = { - state = context.getOperatorStateStore.getListState( - new ListStateDescriptor[Long]("state", classOf[Long])) - - for (l <- state.get().asScala) { - offset = l - } - } - - override def snapshotState(context: FunctionSnapshotContext): Unit = { - state.update(java.util.Collections.singletonList(offset)) - } -} -``` -{{< /tab >}} {{< /tabs >}} 希望订阅 checkpoint 成功消息的算子,可以参考 `org.apache.flink.api.common.state.CheckpointListener` 接口。 diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/state_backends.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/state_backends.md index 2fcbae4ec1a47..c8085d01291c3 100644 --- 
a/docs/content.zh/docs/dev/datastream/fault-tolerance/state_backends.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/state_backends.md @@ -44,12 +44,6 @@ config.set(StateBackendOptions.STATE_BACKEND, "rocksdb"); env.configure(config); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment() -env.setStateBackend(...) -``` -{{< /tab >}} {{< tab "Python" >}} ```python config = Configuration() diff --git a/docs/content.zh/docs/dev/datastream/operators/asyncio.md b/docs/content.zh/docs/dev/datastream/operators/asyncio.md index 3b0e3e4de0925..ebdb93a550a74 100644 --- a/docs/content.zh/docs/dev/datastream/operators/asyncio.md +++ b/docs/content.zh/docs/dev/datastream/operators/asyncio.md @@ -66,8 +66,6 @@ Flink 的异步 I/O API 允许用户在流处理中使用异步请求客户端 下面是基本的代码模板: -{{< tabs "b9213242-26c1-4416-95c2-076a23777eec" >}} -{{< tab "Java" >}} ```java // 这个例子使用 Java 8 的 Future 接口(与 Flink 的 Future 相同)实现了异步请求和回调。 @@ -133,55 +131,6 @@ AsyncRetryStrategy asyncRetryStrategy = DataStream> resultStream = AsyncDataStream.unorderedWaitWithRetry(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy); ``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -/** - * 实现 'AsyncFunction' 用于发送请求和设置回调。 - */ -class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] { - - /** 能够利用回调函数并发发送请求的数据库客户端 */ - lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials) - - /** 用于 future 回调的上下文环境 */ - implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor()) - - - override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = { - - // 发送异步请求,接收 future 结果 - val resultFutureRequested: Future[String] = client.query(str) - - // 设置客户端完成请求后要执行的回调函数 - // 回调函数只是简单地把结果发给 future - resultFutureRequested.onSuccess { - case result: String => resultFuture.complete(Iterable((str, result))) - } - } -} - -// 创建初始 DataStream -val stream: DataStream[String] = ... 
- -// 应用异步 I/O 转换操作,不启用重试 -val resultStream: DataStream[(String, String)] = - AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100) - -// 或 应用异步 I/O 转换操作并启用重试 -// 创建一个异步重试策略 -val asyncRetryStrategy: AsyncRetryStrategy[String] = - new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms - .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE) - .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE) - .build(); - -// 应用异步 I/O 转换操作并启用重试 -val resultStream: DataStream[(String, String)] = - AsyncDataStream.unorderedWaitWithRetry(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy) -``` -{{< /tab >}} -{{< /tabs >}} **重要提示**: 第一次调用 `ResultFuture.complete` 后 `ResultFuture` 就完成了。 后续的 `complete` 调用都将被忽略。 @@ -248,7 +197,7 @@ Flink 提供两种模式控制结果记录以何种顺序发出。 ### 实现提示 -在实现使用 *Executor*(或者 Scala 中的 *ExecutionContext*)和回调的 *Futures* 时,建议使用 `DirectExecutor`,因为通常回调的工作量很小,`DirectExecutor` 避免了额外的线程切换开销。回调通常只是把结果发送给 `ResultFuture`,也就是把它添加进输出缓冲。从这里开始,包括发送记录和与 chenkpoint 交互在内的繁重逻辑都将在专有的线程池中进行处理。 +在实现使用 *Executor* 和回调的 *Futures* 时,建议使用 `DirectExecutor`,因为通常回调的工作量很小,`DirectExecutor` 避免了额外的线程切换开销。回调通常只是把结果发送给 `ResultFuture`,也就是把它添加进输出缓冲。从这里开始,包括发送记录和与 chenkpoint 交互在内的繁重逻辑都将在专有的线程池中进行处理。 `DirectExecutor` 可以通过 `org.apache.flink.util.concurrent.Executors.directExecutor()` 或 `com.google.common.util.concurrent.MoreExecutors.directExecutor()` 获得。 diff --git a/docs/content.zh/docs/dev/datastream/operators/joining.md b/docs/content.zh/docs/dev/datastream/operators/joining.md index 3ab29579243eb..52eabf38c868f 100644 --- a/docs/content.zh/docs/dev/datastream/operators/joining.md +++ b/docs/content.zh/docs/dev/datastream/operators/joining.md @@ -54,8 +54,6 @@ stream.join(otherStream) 如图所示,我们定义了一个大小为 2 毫秒的滚动窗口,即形成了边界为 `[0,1], [2,3], ...` 的窗口。图中展示了如何将每个窗口中的元素组合成对,组合的结果将被传递给 `JoinFunction`。注意,滚动窗口 `[6,7]` 将不会输出任何数据,因为绿色流当中没有数据可以与橙色流的 ⑥ 和 ⑦ 配对。 -{{< tabs "a8e08868-40d6-4719-b554-e2cabf2e1f6f" >}} -{{< tab "Java" >}} ```java import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; @@ -77,27 +75,6 @@ orangeStream.join(greenStream) } }); ``` -{{< /tab >}} -{{< tab "Scala" >}} - -```scala -import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows -import java.time.Duration - -... - -val orangeStream: DataStream[Integer] = ... -val greenStream: DataStream[Integer] = ... - -orangeStream.join(greenStream) - .where(elem => /* select key */) - .equalTo(elem => /* select key */) - .window(TumblingEventTimeWindows.of(Duration.ofMillis(2))) - .apply { (e1, e2) => e1 + "," + e2 } -``` - -{{< /tab >}} -{{< /tabs >}} ### 滑动 Window Join @@ -108,9 +85,6 @@ orangeStream.join(greenStream) 本例中我们定义了长度为两毫秒,滑动距离为一毫秒的滑动窗口,生成的窗口实例区间为 `[-1, 0],[0,1],[1,2],[2,3], …`。 X 轴下方是每个滑动窗口中被 join 后传递给 `JoinFunction` 的元素。图中可以看到橙色 ② 与绿色 ③ 在窗口 `[2,3]` 中 join,但没有与窗口 `[1,2]` 中任何元素 join。 -{{< tabs "a3d3218b-dd25-4428-bfbb-d02522d95661" >}} -{{< tab "Java" >}} - ```java import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; @@ -132,26 +106,6 @@ orangeStream.join(greenStream) } }); ``` -{{< /tab >}} -{{< tab "Scala" >}} - -```scala -import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows -import java.time.Duration - -... - -val orangeStream: DataStream[Integer] = ... -val greenStream: DataStream[Integer] = ... 
- -orangeStream.join(greenStream) - .where(elem => /* select key */) - .equalTo(elem => /* select key */) - .window(SlidingEventTimeWindows.of(Duration.ofMillis(2) /* size */, Duration.ofMillis(1) /* slide */)) - .apply { (e1, e2) => e1 + "," + e2 } -``` -{{< /tab >}} -{{< /tabs >}} ### 会话 Window Join @@ -161,9 +115,6 @@ orangeStream.join(greenStream) 这里我们定义了一个间隔为至少一毫秒的会话窗口。图中总共有三个会话,前两者中两个流都有元素,它们被 join 并传递给 `JoinFunction`。而第三个会话中,绿流没有任何元素,所以 ⑧ 和 ⑨ 没有被 join! -{{< tabs "0e75f447-e1f7-4f38-b68c-de42ddd33512" >}} -{{< tab "Java" >}} - ```java import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; @@ -185,27 +136,6 @@ orangeStream.join(greenStream) } }); ``` -{{< /tab >}} -{{< tab "Scala" >}} - -```scala -import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows -import java.time.Duration - -... - -val orangeStream: DataStream[Integer] = ... -val greenStream: DataStream[Integer] = ... - -orangeStream.join(greenStream) - .where(elem => /* select key */) - .equalTo(elem => /* select key */) - .window(EventTimeSessionWindows.withGap(Duration.ofMillis(1))) - .apply { (e1, e2) => e1 + "," + e2 } -``` - -{{< /tab >}} -{{< /tabs >}} ## Interval Join @@ -234,9 +164,6 @@ Interval join 目前仅支持 event time。 `orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound` -{{< tabs "63cebeb2-5869-4d2e-998d-d77fb466e2e6" >}} -{{< tab "Java" >}} - ```java import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; @@ -260,31 +187,4 @@ orangeStream }); ``` -{{< /tab >}} -{{< tab "Scala" >}} - -```scala -import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction -import java.time.Duration - -... - -val orangeStream: DataStream[Integer] = ... -val greenStream: DataStream[Integer] = ... 
- -orangeStream - .keyBy(elem => /* select key */) - .intervalJoin(greenStream.keyBy(elem => /* select key */)) - .between(Duration.ofMillis(-2), Duration.ofMillis(1)) - .process(new ProcessJoinFunction[Integer, Integer, String] { - override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = { - out.collect(left + "," + right) - } - }) - -``` - -{{< /tab >}} -{{< /tabs >}} - {{< top >}} diff --git a/docs/content.zh/docs/dev/datastream/operators/overview.md b/docs/content.zh/docs/dev/datastream/operators/overview.md index d0a7f527b241c..c841a721dd5a1 100644 --- a/docs/content.zh/docs/dev/datastream/operators/overview.md +++ b/docs/content.zh/docs/dev/datastream/operators/overview.md @@ -49,11 +49,6 @@ dataStream.map(new MapFunction() { }); ``` {{< /tab >}} -{{< tab "Scala">}} -```scala -dataStream.map { x => x * 2 } -``` -{{< /tab >}} {{< tab "Python" >}} ```python data_stream = env.from_collection(collection=[1, 2, 3, 4, 5]) @@ -81,11 +76,6 @@ dataStream.flatMap(new FlatMapFunction() { }); ``` {{< /tab >}} -{{< tab "Scala">}} -```scala -dataStream.flatMap { str => str.split(" ") } -``` -{{< /tab >}} {{< tab "Python" >}} ```python data_stream = env.from_collection(collection=['hello apache flink', 'streaming compute']) @@ -110,11 +100,6 @@ dataStream.filter(new FilterFunction() { }); ``` {{< /tab >}} -{{< tab "Scala">}} -```scala -dataStream.filter { _ != 0 } -``` -{{< /tab >}} {{< tab "Python" >}} ```python data_stream = env.from_collection(collection=[0, 1, 2, 3, 4, 5]) @@ -135,12 +120,6 @@ dataStream.keyBy(value -> value.getSomeKey()); dataStream.keyBy(value -> value.f0); ``` {{< /tab >}} -{{< tab "Scala">}} -```scala -dataStream.keyBy(_.someKey) -dataStream.keyBy(_._1) -``` -{{< /tab >}} {{< tab "Python" >}} ```python data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'b')]) @@ -175,11 +154,6 @@ keyedStream.reduce(new ReduceFunction() { }); ``` {{< /tab >}} -{{< tab "Scala">}} -```scala -keyedStream.reduce { _ + _ } -``` -{{< /tab >}} {{< tab "Python" >}} ```python data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.TUPLE([Types.INT(), Types.STRING()])) @@ -201,13 +175,6 @@ dataStream .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))); ``` {{< /tab >}} -{{< tab "Scala">}} -```scala -dataStream - .keyBy(_._1) - .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) -``` -{{< /tab >}} {{< tab "Python" >}} ```python data_stream.key_by(lambda x: x[1]).window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) @@ -231,12 +198,6 @@ dataStream .windowAll(TumblingEventTimeWindows.of(Duration.ofSeconds(5))); ``` {{< /tab >}} -{{< tab "Scala">}} -```scala -dataStream - .windowAll(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) -``` -{{< /tab >}} {{< tab "Python" >}} ```python data_stream.window_all(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) @@ -284,14 +245,6 @@ allWindowedStream.apply (new AllWindowFunction, Integer, }); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -windowedStream.apply { WindowFunction } - -// 在 non-keyed 窗口流上应用 AllWindowFunction -allWindowedStream.apply { AllWindowFunction } -``` -{{< /tab >}} {{< tab "Python" >}} ```python class MyWindowFunction(WindowFunction[tuple, int, int, TimeWindow]): @@ -335,11 +288,6 @@ windowedStream.reduce (new ReduceFunction>() { }); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -windowedStream.reduce { _ + _ } -``` -{{< /tab >}} {{< tab "Python" >}} ```python 
class MyReduceFunction(ReduceFunction): @@ -364,11 +312,6 @@ windowed_stream.reduce(MyReduceFunction()) dataStream.union(otherStream1, otherStream2, ...); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -dataStream.union(otherStream1, otherStream2, ...) -``` -{{< /tab >}} {{< tab "Python" >}} ```python data_stream.union(otherStream1, otherStream2, ...) @@ -390,14 +333,6 @@ dataStream.join(otherStream) .apply (new JoinFunction () {...}); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -dataStream.join(otherStream) - .where().equalTo() - .window(TumblingEventTimeWindows.of(Duration.ofSeconds(3))) - .apply { ... } -``` -{{< /tab >}} {{< tab "Python" >}} Python 中尚不支持此特性。 {{< /tab >}} @@ -420,18 +355,6 @@ keyedStream.intervalJoin(otherKeyedStream) .process(new IntervalJoinFunction() {...}); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -// this will join the two streams so that -// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2 -keyedStream.intervalJoin(otherKeyedStream) - .between(Duration.ofMillis(-2), Duration.ofMillis(2)) - // lower and upper bound - .upperBoundExclusive(true) // optional - .lowerBoundExclusive(true) // optional - .process(new IntervalJoinFunction() {...}) -``` -{{< /tab >}} {{< tab "Python" >}} Python 中尚不支持此特性。 {{< /tab >}} @@ -451,14 +374,6 @@ dataStream.coGroup(otherStream) .apply (new CoGroupFunction () {...}); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -dataStream.coGroup(otherStream) - .where(0).equalTo(1) - .window(TumblingEventTimeWindows.of(Duration.ofSeconds(3))) - .apply {} -``` -{{< /tab >}} {{< tab "Python" >}} Python 中尚不支持此特性。 {{< /tab >}} @@ -478,14 +393,6 @@ DataStream otherStream = //... ConnectedStreams connectedStreams = someStream.connect(otherStream); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -someStream : DataStream[Int] = ... -otherStream : DataStream[String] = ... - -val connectedStreams = someStream.connect(otherStream) -``` -{{< /tab >}} {{< tab "Python" >}} ```python stream_1 = ... @@ -530,18 +437,6 @@ connectedStreams.flatMap(new CoFlatMapFunction() { }); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -connectedStreams.map( - (_ : Int) => true, - (_ : String) => false -) -connectedStreams.flatMap( - (_ : Int) => true, - (_ : String) => false -) -``` -{{< /tab >}} {{< tab "Python" >}} ```python class MyCoMapFunction(CoMapFunction): @@ -586,18 +481,6 @@ cachedDataStream.print(); // Consume cached result. env.execute(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val dataStream : DataStream[Int] = //... -val cachedDataStream = dataStream.cache() -cachedDataStream.print() // Do anything with the cachedDataStream -... -env.execute() // Execute and create cache. - -cachedDataStream.print() // Consume cached result. -env.execute() -``` -{{< /tab >}} {{< tab "Python" >}} ```python data_stream = ... 
# DataStream @@ -654,12 +537,6 @@ dataStream.partitionCustom(partitioner, "someKey"); dataStream.partitionCustom(partitioner, 0); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -dataStream.partitionCustom(partitioner, "someKey") -dataStream.partitionCustom(partitioner, 0) -``` -{{< /tab >}} {{< tab "Python" >}} ```python data_stream = env.from_collection(collection=[(2, 'a'), (2, 'a'), (3, 'b')]) @@ -679,11 +556,6 @@ data_stream.partition_custom(lambda key, num_partition: key % partition, lambda dataStream.shuffle(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -dataStream.shuffle() -``` -{{< /tab >}} {{< tab "Python" >}} ```python data_stream.shuffle() @@ -711,11 +583,6 @@ data_stream.shuffle() dataStream.rescale(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -dataStream.rescale() -``` -{{< /tab >}} {{< tab "Python" >}} ```python data_stream.rescale() @@ -734,11 +601,6 @@ data_stream.rescale() dataStream.broadcast(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -dataStream.broadcast() -``` -{{< /tab >}} {{< tab "Python" >}} ```python data_stream.broadcast() @@ -768,11 +630,6 @@ data_stream.broadcast() someStream.filter(...).map(...).startNewChain().map(...); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -someStream.filter(...).map(...).startNewChain().map(...) -``` -{{< /tab >}} {{< tab "Python" >}} ```python some_stream.filter(...).map(...).start_new_chain().map(...) @@ -790,11 +647,6 @@ some_stream.filter(...).map(...).start_new_chain().map(...) someStream.map(...).disableChaining(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -someStream.map(...).disableChaining() -``` -{{< /tab >}} {{< tab "Python" >}} ```python some_stream.map(...).disable_chaining() @@ -812,11 +664,6 @@ some_stream.map(...).disable_chaining() someStream.filter(...).slotSharingGroup("name"); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -someStream.filter(...).slotSharingGroup("name") -``` -{{< /tab >}} {{< tab "Python" >}} ```python some_stream.filter(...).slot_sharing_group("name") @@ -840,11 +687,6 @@ Flink里的算子和作业节点会有一个名字和一个描述。名字和描 someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1"); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1") -``` -{{< /tab >}} {{< tab "Python" >}} ```python some_stream.filter(...).name("filter").set_description("x in (1, 2, 3, 4) and y > 1") diff --git a/docs/content.zh/docs/dev/datastream/operators/process_function.md b/docs/content.zh/docs/dev/datastream/operators/process_function.md index dfa49a624cde0..57a3207825e49 100644 --- a/docs/content.zh/docs/dev/datastream/operators/process_function.md +++ b/docs/content.zh/docs/dev/datastream/operators/process_function.md @@ -174,71 +174,6 @@ public class CountWithTimeoutFunction } ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -import org.apache.flink.api.common.state.ValueState -import org.apache.flink.api.common.state.ValueStateDescriptor -import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.streaming.api.functions.KeyedProcessFunction -import org.apache.flink.util.Collector - -// 源数据流 -val stream: DataStream[Tuple2[String, String]] = ... 
- -// 使用 process function 来处理一个 Keyed Stream -val result: DataStream[Tuple2[String, Long]] = stream - .keyBy(_._1) - .process(new CountWithTimeoutFunction()) - -/** - * 存储在状态中的数据类型 - */ -case class CountWithTimestamp(key: String, count: Long, lastModified: Long) - -/** - * 该 ProcessFunction 的实现用于维护计数和超时 - */ -class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String), (String, Long)] { - - /** 由 process function 管理的状态 */ - lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext - .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp])) - - - override def processElement( - value: (String, String), - ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#Context, - out: Collector[(String, Long)]): Unit = { - - // 初始化或更新状态 - val current: CountWithTimestamp = state.value match { - case null => - CountWithTimestamp(value._1, 1, ctx.timestamp) - case CountWithTimestamp(key, count, lastModified) => - CountWithTimestamp(key, count + 1, ctx.timestamp) - } - - // 将更新后的状态写回 - state.update(current) - - // 注册一个 60s 之后的事件时间回调 - ctx.timerService.registerEventTimeTimer(current.lastModified + 60000) - } - - override def onTimer( - timestamp: Long, - ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#OnTimerContext, - out: Collector[(String, Long)]): Unit = { - - state.value match { - case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) => - out.collect((key, count)) - case _ => - } - } -} -``` -{{< /tab >}} {{< tab "Python" >}} ```python import datetime @@ -349,14 +284,6 @@ public void onTimer(long timestamp, OnTimerContext ctx, Collector out) thro // ... } -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = { - var key = ctx.getCurrentKey - // ... -} ``` {{< /tab >}} {{< tab "Python" >}} @@ -403,12 +330,6 @@ long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000; ctx.timerService().registerProcessingTimeTimer(coalescedTime); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000 -ctx.timerService.registerProcessingTimeTimer(coalescedTime) -``` -{{< /tab >}} {{< tab "Python" >}} ```python coalesced_time = ((ctx.timestamp() + timeout) // 1000) * 1000 @@ -426,12 +347,6 @@ long coalescedTime = ctx.timerService().currentWatermark() + 1; ctx.timerService().registerEventTimeTimer(coalescedTime); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val coalescedTime = ctx.timerService.currentWatermark + 1 -ctx.timerService.registerEventTimeTimer(coalescedTime) -``` -{{< /tab >}} {{< tab "Python" >}} ```python coalesced_time = ctx.timer_service().current_watermark() + 1 @@ -451,12 +366,6 @@ long timestampOfTimerToStop = ...; ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val timestampOfTimerToStop = ... -ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop) -``` -{{< /tab >}} {{< tab "Python" >}} ```python timestamp_of_timer_to_stop = ... @@ -474,12 +383,6 @@ long timestampOfTimerToStop = ...; ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val timestampOfTimerToStop = ... -ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop) -``` -{{< /tab >}} {{< tab "Python" >}} ```python timestamp_of_timer_to_stop = ... 
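
The hunks above drop the Scala snippets for timer coalescing and timer deletion; the Java tabs remain the reference. For readers who want both ideas in one place, here is a minimal, illustrative Java sketch (the class name `CoalescedTimeoutFunction` and the 60-second timeout are assumptions for illustration, not part of the documentation). It rounds the target timestamp down to a full second before registering a processing-time timer, relying on the fact that Flink keeps at most one timer per key and timestamp.

```java
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative only: class name and timeout value are assumptions, not taken from the docs.
public class CoalescedTimeoutFunction extends KeyedProcessFunction<String, String, String> {

    private static final long TIMEOUT_MS = 60_000L;

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // Round the target time down to a full second, as described above; all elements
        // arriving within the same second map to the same timestamp.
        long coalescedTime =
                ((ctx.timerService().currentProcessingTime() + TIMEOUT_MS) / 1000) * 1000;

        // Flink keeps at most one timer per key and timestamp, so re-registering the same
        // coalesced timestamp is a no-op; that is what makes the rounding effective.
        ctx.timerService().registerProcessingTimeTimer(coalescedTime);

        // A previously registered timer can be cancelled explicitly with
        // ctx.timerService().deleteProcessingTimeTimer(previouslyRegisteredTimestamp);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect(ctx.getCurrentKey() + " timed out at " + timestamp);
    }
}
```

Attached to a keyed stream with `stream.keyBy(...).process(new CoalescedTimeoutFunction())`, at most one timer per key fires in any given second, no matter how many elements arrive in that second.
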
diff --git a/docs/content.zh/docs/dev/datastream/operators/windows.md b/docs/content.zh/docs/dev/datastream/operators/windows.md index 2da573635275a..0c113811bce7d 100644 --- a/docs/content.zh/docs/dev/datastream/operators/windows.md +++ b/docs/content.zh/docs/dev/datastream/operators/windows.md @@ -37,7 +37,7 @@ under the License. {{< tabs "Keyed Windows" >}} -{{< tab "Java/Scala" >}} +{{< tab "Java" >}} stream .keyBy(...) <- 仅 keyed 窗口需要 .window(...) <- 必填项:"assigner" @@ -67,7 +67,7 @@ under the License. {{< tabs "Non-Keyed Windows" >}} -{{< tab "Java/Scala" >}} +{{< tab "Java" >}} stream .windowAll(...) <- 必填项:"assigner" [.trigger(...)] <- 可选项:"trigger" (else default trigger) @@ -192,29 +192,6 @@ input .(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val input: DataStream[T] = ... - -// 滚动 event-time 窗口 -input - .keyBy() - .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) - .() - -// 滚动 processing-time 窗口 -input - .keyBy() - .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5))) - .() - -// 长度为一天的滚动 event-time 窗口,偏移量为 -8 小时。 -input - .keyBy() - .window(TumblingEventTimeWindows.of(Duration.ofDays(1), Duration.ofHours(-8))) - .() -``` -{{< /tab >}} {{< tab "Python" >}} ```python input = ... # type: DataStream @@ -286,29 +263,6 @@ input .(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val input: DataStream[T] = ... - -// 滑动 event-time 窗口 -input - .keyBy() - .window(SlidingEventTimeWindows.of(Duration.ofSeconds(10), Duration.ofSeconds(5))) - .() - -// 滑动 processing-time 窗口 -input - .keyBy() - .window(SlidingProcessingTimeWindows.of(Duration.ofSeconds(10), Duration.ofSeconds(5))) - .() - -// 滑动 processing-time 窗口,偏移量为 -8 小时 -input - .keyBy() - .window(SlidingProcessingTimeWindows.of(Duration.ofHours(12), Duration.ofHours(1), Duration.ofHours(-8))) - .() -``` -{{< /tab >}} {{< tab "Python" >}} ```python input = ... # type: DataStream @@ -390,44 +344,6 @@ input .(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val input: DataStream[T] = ... - -// 设置了固定间隔的 event-time 会话窗口 -input - .keyBy() - .window(EventTimeSessionWindows.withGap(Duration.ofMinutes(10))) - .() - -// 设置了动态间隔的 event-time 会话窗口 -input - .keyBy() - .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] { - override def extract(element: String): Long = { - // 决定并返回会话间隔 - } - })) - .() - -// 设置了固定间隔的 processing-time 会话窗口 -input - .keyBy() - .window(ProcessingTimeSessionWindows.withGap(Duration.ofMinutes(10))) - .() - - -// 设置了动态间隔的 processing-time 会话窗口 -input - .keyBy() - .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] { - override def extract(element: String): Long = { - // 决定并返回会话间隔 - } - })) - .() -``` -{{< /tab >}} {{< tab "Python" >}} ```python input = ... # type: DataStream @@ -496,16 +412,6 @@ input .(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val input: DataStream[T] = ... - -input - .keyBy() - .window(GlobalWindows.create()) - .() -``` -{{< /tab >}} {{< tab "Python" >}} ```python input = ... # type: DataStream @@ -555,16 +461,6 @@ input }); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val input: DataStream[(String, Long)] = ... - -input - .keyBy() - .window() - .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) } -``` -{{< /tab >}} {{< tab "Python" >}} ```python input = ... # type: DataStream @@ -630,33 +526,6 @@ input .aggregate(new AverageAggregate()); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala - -/** - * The accumulator is used to keep a running sum and a count. 
The [getResult] method - * computes the average. - */ -class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] { - override def createAccumulator() = (0L, 0L) - - override def add(value: (String, Long), accumulator: (Long, Long)) = - (accumulator._1 + value._2, accumulator._2 + 1L) - - override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2 - - override def merge(a: (Long, Long), b: (Long, Long)) = - (a._1 + b._1, a._2 + b._2) -} - -val input: DataStream[(String, Long)] = ... - -input - .keyBy() - .window() - .aggregate(new AverageAggregate) -``` -{{< /tab >}} {{< tab "Python" >}} ```python class AverageAggregate(AggregateFunction): @@ -763,68 +632,6 @@ public abstract class ProcessWindowFunction impl public abstract void output(OutputTag outputTag, X value); } -} -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function { - - /** - * Evaluates the window and outputs none or several elements. - * - * @param key The key for which this window is evaluated. - * @param context The context in which the window is being evaluated. - * @param elements The elements in the window being evaluated. - * @param out A collector for emitting elements. - * @throws Exception The function may throw exceptions to fail the program and trigger recovery. - */ - def process( - key: KEY, - context: Context, - elements: Iterable[IN], - out: Collector[OUT]) - - /** - * Deletes any state in the [[Context]] when the Window expires - * (the watermark passes its `maxTimestamp` + `allowedLateness`). - * - * @param context The context to which the window is being evaluated - * @throws Exception The function may throw exceptions to fail the program and trigger recovery. - */ - @throws[Exception] - def clear(context: Context) {} - - /** - * The context holding window metadata - */ - abstract class Context { - /** - * Returns the window that is being evaluated. - */ - def window: W - - /** - * Returns the current processing time. - */ - def currentProcessingTime: Long - - /** - * Returns the current event-time watermark. - */ - def currentWatermark: Long - - /** - * State accessor for per-key and per-window state. - */ - def windowState: KeyedStateStore - - /** - * State accessor for per-key global state. - */ - def globalState: KeyedStateStore - } - } ``` {{< /tab >}} @@ -937,29 +744,6 @@ public class MyProcessWindowFunction } } -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -val input: DataStream[(String, Long)] = ... - -input - .keyBy(_._1) - .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5))) - .process(new MyProcessWindowFunction()) - -/* ... */ - -class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] { - - def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = { - var count = 0L - for (in <- input) { - count = count + 1 - } - out.collect(s"Window ${context.window} count: $count") - } -} ``` {{< /tab >}} {{< tab "Python" >}} @@ -1036,28 +820,6 @@ private static class MyProcessWindowFunction } } -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala - -val input: DataStream[SensorReading] = ... 
- -input - .keyBy() - .window() - .reduce( - (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 }, - ( key: String, - context: ProcessWindowFunction[_, _, _, TimeWindow]#Context, - minReadings: Iterable[SensorReading], - out: Collector[(Long, SensorReading)] ) => - { - val min = minReadings.iterator.next() - out.collect((context.window.getStart, min)) - } - ) - ``` {{< /tab >}} {{< tab "Python" >}} @@ -1138,44 +900,6 @@ private static class MyProcessWindowFunction } } -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala - -val input: DataStream[(String, Long)] = ... - -input - .keyBy() - .window() - .aggregate(new AverageAggregate(), new MyProcessWindowFunction()) - -// Function definitions - -/** - * The accumulator is used to keep a running sum and a count. The [getResult] method - * computes the average. - */ -class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] { - override def createAccumulator() = (0L, 0L) - - override def add(value: (String, Long), accumulator: (Long, Long)) = - (accumulator._1 + value._2, accumulator._2 + 1L) - - override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2 - - override def merge(a: (Long, Long), b: (Long, Long)) = - (a._1 + b._1, a._2 + b._2) -} - -class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] { - - def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]) = { - val average = averages.iterator.next() - out.collect((key, average)) - } -} - ``` {{< /tab >}} {{< tab "Python" >}} @@ -1273,23 +997,6 @@ public interface WindowFunction extends Function } ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable { - - /** - * Evaluates the window and outputs none or several elements. - * - * @param key The key for which this window is evaluated. - * @param window The window that is being evaluated. - * @param input The elements in the window being evaluated. - * @param out A collector for emitting elements. - * @throws Exception The function may throw exceptions to fail the program and trigger recovery. - */ - def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]) -} -``` -{{< /tab >}} {{< tab "Python" >}} ```python class WindowFunction(Function, Generic[IN, OUT, KEY, W]): @@ -1321,16 +1028,6 @@ input .apply(new MyWindowFunction()); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val input: DataStream[(String, Long)] = ... - -input - .keyBy() - .window() - .apply(new MyWindowFunction()) -``` -{{< /tab >}} {{< tab "Python" >}} ```python input = ... # type: DataStream @@ -1494,17 +1191,6 @@ input .(); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val input: DataStream[T] = ... - -input - .keyBy() - .window() - .allowedLateness(