Skip to content

gsoc containing json serialization handler and dubbo codec #51

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions src/dubbo/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import threading
from typing import Any, Callable, Optional, Union

from typing import Any, Callable, Optional, Union,Type
from abc import ABC, abstractmethod
from dubbo.types import DeserializingFunction, RpcType, RpcTypes, SerializingFunction

__all__ = [
Expand Down Expand Up @@ -244,3 +245,21 @@ class ReadWriteStream(ReadStream, WriteStream, abc.ABC):
"""

pass


class Codec(ABC):
def __init__(self, model_type: Optional[Type[Any]] = None, **kwargs):
self.model_type = model_type

@abstractmethod
def encode(self, data: Any) -> bytes:
pass

@abstractmethod
def decode(self, data: bytes) -> Any:
pass

class CodecHelper:
@staticmethod
def get_class():
return Codec
183 changes: 147 additions & 36 deletions src/dubbo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
from typing import Optional
from typing import Optional, Callable, List, Type, Union, Any

from dubbo.bootstrap import Dubbo
from dubbo.classes import MethodDescriptor
Expand All @@ -31,6 +32,7 @@
SerializingFunction,
)
from dubbo.url import URL
from dubbo.codec import DubboTransportService

__all__ = ["Client"]

Expand Down Expand Up @@ -82,70 +84,179 @@ def _initialize(self):

self._initialized = True

def unary(
def _create_rpc_callable(
self,
method_name: str,
rpc_type: str,
interface: Optional[Callable] = None,
method_name: Optional[str] = None,
params_types: Optional[List[Type]] = None,
return_type: Optional[Type] = None,
codec: Optional[str] = None,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
default_method_name: str = "rpc_call",
) -> RpcCallable:
return self._callable(
MethodDescriptor(
method_name=method_name,
arg_serialization=(request_serializer, None),
return_serialization=(None, response_deserializer),
rpc_type=RpcTypes.UNARY.value,
"""
Create RPC callable with the specified type.
"""
# Validate
if interface is None and method_name is None:
raise ValueError("Either 'interface' or 'method_name' must be provided")

# Determine the actual method name to call
actual_method_name = method_name or (interface.__name__ if interface else default_method_name)

# Build method descriptor (automatic or manual)
if interface:
method_desc = DubboTransportService.create_method_descriptor(
func=interface,
method_name=actual_method_name,
parameter_types=params_types,
return_type=return_type,
interface=interface,
)
else:
# Manual mode fallback: use dummy function for descriptor creation
def dummy(): pass

method_desc = DubboTransportService.create_method_descriptor(
func=dummy,
method_name=actual_method_name,
parameter_types=params_types or [],
return_type=return_type or Any,
)

# Determine serializers if not provided
if request_serializer and response_deserializer:
final_request_serializer = request_serializer
final_response_deserializer = response_deserializer
else:
# Use DubboTransportService to generate serialization functions
final_request_serializer, final_response_deserializer = DubboTransportService.create_serialization_functions(codec, parameter_types=params_types, return_type=return_type)
print("final",codec, final_request_serializer, final_response_deserializer)

# Create the proper MethodDescriptor for the RPC call
rpc_method_descriptor = MethodDescriptor(
method_name=actual_method_name,
arg_serialization=(final_request_serializer, None), # (serializer, deserializer) for arguments
return_serialization=(None, final_response_deserializer), # (serializer, deserializer) for return value
rpc_type=rpc_type,
)

# Create and return the RpcCallable
return self._callable(rpc_method_descriptor)

def unary(
self,
interface: Optional[Callable] = None,
method_name: Optional[str] = None,
params_types: Optional[List[Type]] = None,
return_type: Optional[Type] = None,
codec: Optional[str] = None,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
) -> RpcCallable:
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also hope to see the implementation on the server side

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cnzakii
If you don't mind can you explain me about this

like most of the work of server can be handle by the rpc handler

or
Am i missing something ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can review the proposal I sent in Slack, which also contains the corresponding Server interface design. You should also implement the construction method descriptors on the Server side, select the serialization method, and so on.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok
Now i understand
Thx for clarification

Create unary RPC call.

Supports both automatic mode (via interface) and manual mode (via method_name + params_types + return_type + codec).
"""
return self._create_rpc_callable(
rpc_type=RpcTypes.UNARY.value,
interface=interface,
method_name=method_name,
params_types=params_types,
return_type=return_type,
codec=codec,
request_serializer=request_serializer,
response_deserializer=response_deserializer,
default_method_name="unary",
)

def client_stream(
self,
method_name: str,
interface: Optional[Callable] = None,
method_name: Optional[str] = None,
params_types: Optional[List[Type]] = None,
return_type: Optional[Type] = None,
codec: Optional[str] = None,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
) -> RpcCallable:
return self._callable(
MethodDescriptor(
method_name=method_name,
arg_serialization=(request_serializer, None),
return_serialization=(None, response_deserializer),
rpc_type=RpcTypes.CLIENT_STREAM.value,
)
"""
Create client streaming RPC call.

Supports both automatic mode (via interface) and manual mode (via method_name + params_types + return_type + codec).
"""
return self._create_rpc_callable(
rpc_type=RpcTypes.CLIENT_STREAM.value,
interface=interface,
method_name=method_name,
params_types=params_types,
return_type=return_type,
codec=codec,
request_serializer=request_serializer,
response_deserializer=response_deserializer,
default_method_name="client_stream",
)

def server_stream(
self,
method_name: str,
interface: Optional[Callable] = None,
method_name: Optional[str] = None,
params_types: Optional[List[Type]] = None,
return_type: Optional[Type] = None,
codec: Optional[str] = None,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
) -> RpcCallable:
return self._callable(
MethodDescriptor(
method_name=method_name,
arg_serialization=(request_serializer, None),
return_serialization=(None, response_deserializer),
rpc_type=RpcTypes.SERVER_STREAM.value,
)
"""
Create server streaming RPC call.

Supports both automatic mode (via interface) and manual mode (via method_name + params_types + return_type + codec).
"""
return self._create_rpc_callable(
rpc_type=RpcTypes.SERVER_STREAM.value,
interface=interface,
method_name=method_name,
params_types=params_types,
return_type=return_type,
codec=codec,
request_serializer=request_serializer,
response_deserializer=response_deserializer,
default_method_name="server_stream",
)

def bi_stream(
self,
method_name: str,
interface: Optional[Callable] = None,
method_name: Optional[str] = None,
params_types: Optional[List[Type]] = None,
return_type: Optional[Type] = None,
codec: Optional[str] = None,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
) -> RpcCallable:
# create method descriptor
return self._callable(
MethodDescriptor(
method_name=method_name,
arg_serialization=(request_serializer, None),
return_serialization=(None, response_deserializer),
rpc_type=RpcTypes.BI_STREAM.value,
)
"""
Create bidirectional streaming RPC call.

Supports both automatic mode (via interface) and manual mode (via method_name + params_types + return_type + codec).
"""
return self._create_rpc_callable(
rpc_type=RpcTypes.BI_STREAM.value,
interface=interface,
method_name=method_name,
params_types=params_types,
return_type=return_type,
codec=codec,
request_serializer=request_serializer,
response_deserializer=response_deserializer,
default_method_name="bi_stream",
)

def _callable(self, method_descriptor: MethodDescriptor) -> RpcCallable:
"""
Generate a proxy for the given method
Generate a proxy for the given method.
:param method_descriptor: The method descriptor.
:return: The proxy.
:rtype: RpcCallable
Expand All @@ -160,4 +271,4 @@ def _callable(self, method_descriptor: MethodDescriptor) -> RpcCallable:
url.attributes[common_constants.METHOD_DESCRIPTOR_KEY] = method_descriptor

# create proxy
return self._callable_factory.get_callable(self._invoker, url)
return self._callable_factory.get_callable(self._invoker, url)
19 changes: 19 additions & 0 deletions src/dubbo/codec/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .dubbo_codec import DubboTransportService

__all__ = ['DubboTransportService']
Loading