[FLINK-36604][datastream] StreamingJobGraphGenerator::setOperatorConfig checks input serializer length #25576

Open
wants to merge 3 commits into
base: master

liuml07
Member

@liuml07 liuml07 commented Oct 26, 2024

https://issues.apache.org/jira/browse/FLINK-36604

What is the purpose of the change

Add a precondition check that reports a meaningful error message to aid debugging.

Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Oct 26, 2024

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@liuml07
Member Author

liuml07 commented Feb 8, 2025

@sujay-jain Could you help review this patch? Appreciate your help.

@sujay-jain sujay-jain left a comment

It's a very useful change, @liuml07. I have a small suggestion for improving the error message.

@@ -1191,6 +1191,12 @@ public static void setOperatorConfig(
                            ? 0 // single input operator
                            : inEdge.getTypeNumber() - 1; // in case of 2 or more inputs

            Preconditions.checkState(
                    inputIndex < inputSerializers.length,
                    "Input type serializer of vertex '%s' was null or undefined for inputIndex %s",

Consider making the error message more descriptive by adding some more context. For example, specifying the expected number of input serializers can help with debugging. A possible improvement could be:
"Invalid inputIndex %s for vertex '%s': Expected inputIndex to be less than %s (number of input serializers)."
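
To make the suggestion concrete, here is a minimal, self-contained sketch of how the proposed message might render. The `checkState` helper is a local stand-in written for this illustration (it only mimics the `%s`-template style of Flink's `Preconditions.checkState`), and the class name, the `checkInputIndex` helper, the vertex name, and the values are all hypothetical.

```java
/** Illustration only: a local stand-in, not Flink's Preconditions class. */
final class InputIndexCheckSketch {

    /** Throws IllegalStateException with a formatted message when the condition is false. */
    static void checkState(boolean condition, String template, Object... args) {
        if (!condition) {
            throw new IllegalStateException(String.format(template, args));
        }
    }

    /** Hypothetical helper: validates inputIndex against the number of input serializers. */
    static void checkInputIndex(int inputIndex, String vertexName, int serializerCount) {
        checkState(
                inputIndex < serializerCount,
                "Invalid inputIndex %s for vertex '%s': Expected inputIndex to be less than %s "
                        + "(number of input serializers).",
                inputIndex,
                vertexName,
                serializerCount);
    }

    public static void main(String[] args) {
        try {
            // Assumed scenario: a single-input vertex (index 0) with no serializers available.
            checkInputIndex(0, "Map", 0);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With this wording, the message carries both the offending index and the bound it was compared against, so the reader does not need to open the source to see why the check failed.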


Thank you very much, Sujay! That is very helpful feedback, and I have updated the patch to address the comments.

@sujay-jain sujay-jain left a comment

Looks good to me. Thank you for the contribution!

@liuml07
Member Author

liuml07 commented Feb 10, 2025

Could you help review this, @reswqa? Thank you.

@reswqa
Member

reswqa commented Feb 12, 2025

Could you share an example job that makes this check fail?

The inputIndex is a low-level concept for end users. I kind of think this error message is still confusing to them.

@liuml07
Member Author

liuml07 commented Feb 13, 2025

@reswqa Yes, agreed: the vertex is indeed a low-level concept. I think some information about the error is still more useful than the built-in ArrayIndexOutOfBoundsException if the user needs to debug. Any suggestions for the error message? I'm thinking of making it clearer by including the inEdge, as follows:

Preconditions.checkState(
        inputIndex < inputSerializers.length,
        "Could not find valid input serializers when creating job graph for edge: %s",
        inEdge);

Sample:

Could not find valid input serializers when creating job graph for edge: (Map-2 -> Sink: Print to Std. Out-3, typeNumber=0, outputPartitioner=FORWARD, exchangeMode=UNDEFINED, bufferTimeout=100, outputTag=null, uniqueId=0)
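
To illustrate why the explicit check can be easier to debug than the built-in exception, the sketch below contrasts the two failure modes. Everything in it is a stand-in: `checkState` is a local helper that mimics the style of Flink's `Preconditions.checkState`, a `String[]` replaces the real serializer array, the array is assumed to be empty, and the edge description is shortened from the sample above.

```java
/** Illustration only: contrasts an unchecked array lookup with a guarded one. */
final class SerializerLookupSketch {

    /** Local stand-in for a checkState-style precondition helper. */
    static void checkState(boolean condition, String template, Object... args) {
        if (!condition) {
            throw new IllegalStateException(String.format(template, args));
        }
    }

    /** Unchecked lookup: an out-of-range index surfaces as ArrayIndexOutOfBoundsException. */
    static String lookupUnchecked(String[] inputSerializers, int inputIndex) {
        return inputSerializers[inputIndex];
    }

    /** Guarded lookup: an out-of-range index surfaces with the edge named in the message. */
    static String lookupChecked(String[] inputSerializers, int inputIndex, String inEdge) {
        checkState(
                inputIndex < inputSerializers.length,
                "Could not find valid input serializers when creating job graph for edge: %s",
                inEdge);
        return inputSerializers[inputIndex];
    }

    public static void main(String[] args) {
        String[] noSerializers = new String[0]; // assumed: no usable serializer was produced
        String edge = "(Map-2 -> Sink: Print to Std. Out-3)"; // shortened, hypothetical

        try {
            lookupUnchecked(noSerializers, 0);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("unchecked: " + e.getClass().getSimpleName());
        }

        try {
            lookupChecked(noSerializers, 0, edge);
        } catch (IllegalStateException e) {
            System.out.println("checked: " + e.getMessage());
        }
    }
}
```

The unchecked path only reports an index, while the guarded path names the edge being processed, which points the user toward the operator whose type information needs attention.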

Regarding a failing example: the user job I ran into a few months ago used a custom TypeInformation that was not implemented correctly. More specifically, its TypeSerializer was null in a corner case. I can't share the real user code here (it has since been fixed), but the following dummy example shows the error:

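import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class MissingSerDeserInTypeInformation {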
    public static class MyObject {
        private final Integer value;
        MyObject(Integer value) {
            this.value = value;
        }
        @Override
        public String toString() {
            return "MyObject(" + value + ")";
        }
    }

    public static class MyObjectTypeInfo extends TypeInformation<MyObject> {
        public boolean isBasicType() {return false;}
        public boolean isTupleType() {return false;}
        public int getArity() {return 0;}
        public int getTotalFields() {return 0;}
        public Class<MyObject> getTypeClass() {return MyObject.class;}
        public boolean isKeyType() {return false;}
        public TypeSerializer<MyObject> createSerializer(ExecutionConfig executionConfig) {
            return null; // should always return a valid serializer
        }
        public String toString() {return "MyObjectTypeInfo";}
        public boolean equals(Object o) {return false;}
        public int hashCode() {return 0;}
        public boolean canEqual(Object o) {return false;}
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableGenericTypes();
        env.fromCollection(IntStream.range(1, 20).boxed().collect(Collectors.toList()))
                .map(MyObject::new)
                .returns(new MyObjectTypeInfo())
                .print();
        env.execute();
    }
}

@reswqa
Member

reswqa commented Feb 14, 2025

@flinkbot run azure
