diff --git a/src/promptflow-core/promptflow/core/_serving/app.py b/src/promptflow-core/promptflow/core/_serving/app.py index 3e020388fd0..aa905a99b77 100644 --- a/src/promptflow-core/promptflow/core/_serving/app.py +++ b/src/promptflow-core/promptflow/core/_serving/app.py @@ -36,6 +36,7 @@ def create_app(**kwargs): from promptflow.core._serving.v2.app import PromptFlowServingAppV2 logger = LoggerFactory.get_logger("pfserving-app-v2", target_stdout=True) + # TODO: support specify flow file path in fastapi app app = PromptFlowServingAppV2(docs_url=None, redoc_url=None, logger=logger, **kwargs) # type: ignore # enable auto-instrumentation if customer installed opentelemetry-instrumentation-fastapi try: diff --git a/src/promptflow-core/promptflow/core/_serving/app_base.py b/src/promptflow-core/promptflow/core/_serving/app_base.py index 17848ae1caf..7f2a4aa3fcd 100644 --- a/src/promptflow-core/promptflow/core/_serving/app_base.py +++ b/src/promptflow-core/promptflow/core/_serving/app_base.py @@ -5,6 +5,7 @@ import mimetypes import os from abc import ABC, abstractmethod +from pathlib import Path from typing import Dict from promptflow._utils.flow_utils import resolve_flow_path @@ -38,8 +39,16 @@ def init_app(self, **kwargs): # parse promptflow project path self.project_path = self.extension.get_flow_project_path() logger.info(f"Project path: {self.project_path}") - flow_dir, flow_file_name = resolve_flow_path(self.project_path, allow_prompty_dir=True) - self.flow = init_executable(flow_path=flow_dir / flow_file_name) + + flow_file_path = kwargs.get("flow_file_path", None) + if flow_file_path: + self.flow_file_path = Path(flow_file_path) + else: + flow_dir, flow_file_name = resolve_flow_path(self.project_path, allow_prompty_dir=True) + # project path is also the current working directory + self.flow_file_path = flow_dir / flow_file_name + + self.flow = init_executable(flow_path=self.flow_file_path, working_dir=Path(self.project_path)) # enable environment_variables environment_variables = kwargs.get("environment_variables", {}) @@ -97,9 +106,8 @@ def init_invoker_if_not_exist(self): if self.flow_invoker: return self.logger.info("Promptflow executor starts initializing...") - flow_dir, flow_file_name = resolve_flow_path(self.project_path, allow_prompty_dir=True) self.flow_invoker = AsyncFlowInvoker( - flow=Flow.load(source=flow_dir / flow_file_name), + flow=Flow.load(source=self.flow_file_path), connection_provider=self.connection_provider, streaming=self.streaming_response_required, raise_ex=False, diff --git a/src/promptflow-devkit/promptflow/_cli/_pf/_flow.py b/src/promptflow-devkit/promptflow/_cli/_pf/_flow.py index 1046b681e40..c80072407e9 100644 --- a/src/promptflow-devkit/promptflow/_cli/_pf/_flow.py +++ b/src/promptflow-devkit/promptflow/_cli/_pf/_flow.py @@ -41,7 +41,6 @@ from promptflow._sdk._constants import DEFAULT_SERVE_ENGINE, PROMPT_FLOW_DIR_NAME from promptflow._sdk._pf_client import PFClient from promptflow._sdk._utilities.chat_utils import start_chat_ui_service_monitor -from promptflow._sdk._utilities.general_utils import generate_yaml_entry_without_delete from promptflow._sdk._utilities.serve_utils import find_available_port, start_flow_service from promptflow._utils.flow_utils import is_flex_flow from promptflow._utils.logger_utils import get_cli_sdk_logger @@ -525,11 +524,9 @@ def _test_flow_multi_modal(args, pf_client, environment_variables): pfs_port = _invoke_pf_svc() serve_app_port = args.port or find_available_port() - flow = generate_yaml_entry_without_delete(entry=args.flow) - # flex flow without yaml file doesn't support /eval in chat window - enable_internal_features = Configuration.get_instance().is_internal_features_enabled() and flow == args.flow + enable_internal_features = Configuration.get_instance().is_internal_features_enabled() start_chat_ui_service_monitor( - flow=flow, + flow=args.flow, serve_app_port=serve_app_port, pfs_port=pfs_port, url_params=list_of_dict_to_dict(args.url_params), diff --git a/src/promptflow-devkit/promptflow/_sdk/_utilities/chat_utils.py b/src/promptflow-devkit/promptflow/_sdk/_utilities/chat_utils.py index cdb6a222793..9a3df5ee105 100644 --- a/src/promptflow-devkit/promptflow/_sdk/_utilities/chat_utils.py +++ b/src/promptflow-devkit/promptflow/_sdk/_utilities/chat_utils.py @@ -1,5 +1,5 @@ import json -import webbrowser +import shutil from pathlib import Path from typing import Any, Dict from urllib.parse import urlencode, urlunparse @@ -7,9 +7,14 @@ from promptflow._constants import FlowLanguage from promptflow._sdk._constants import DEFAULT_ENCODING, PROMPT_FLOW_DIR_NAME, UX_INPUTS_INIT_KEY, UX_INPUTS_JSON from promptflow._sdk._service.utils.utils import encrypt_flow_path -from promptflow._sdk._utilities.general_utils import resolve_flow_language +from promptflow._sdk._utilities.general_utils import ( + _get_additional_includes, + generate_yaml_entry_without_delete, + resolve_flow_language, +) from promptflow._sdk._utilities.monitor_utils import ( DirectoryModificationMonitorTarget, + FileModificationMonitorTarget, JsonContentMonitorTarget, Monitor, ) @@ -37,49 +42,57 @@ def construct_chat_page_url(flow_path, port, url_params): def _try_restart_service( *, last_result: ServeAppHelper, - flow_file_name: str, + flow_file_path: Path, flow_dir: Path, serve_app_port: int, ux_input_path: Path, environment_variables: Dict[str, str], + chat_page_url: str, + skip_open_browser: bool, ): if last_result is not None: + last_helper, skip_open_browser = last_result print_log("Changes detected, stopping current serve app...") - last_result.terminate() + last_helper.terminate() # init must be always loaded from ux_inputs.json if not ux_input_path.is_file(): init = {} else: ux_inputs = json.loads(ux_input_path.read_text(encoding=DEFAULT_ENCODING)) - init = ux_inputs.get(UX_INPUTS_INIT_KEY, {}).get(flow_file_name, {}) + init = ux_inputs.get(UX_INPUTS_INIT_KEY, {}).get(flow_file_path.name, {}) - language = resolve_flow_language(flow_path=flow_file_name, working_dir=flow_dir) + language = resolve_flow_language(flow_path=flow_file_path, working_dir=flow_dir) if language == FlowLanguage.Python: # additional includes will always be called by the helper. # This is expected as user will change files in original locations only helper = PythonServeAppHelper( - flow_file_name=flow_file_name, + flow_file_path=flow_file_path, flow_dir=flow_dir, init=init, port=serve_app_port, environment_variables=environment_variables, + chat_page_url=chat_page_url, ) else: helper = CSharpServeAppHelper( - flow_file_name=flow_file_name, + flow_file_path=flow_file_path, flow_dir=flow_dir, init=init, port=serve_app_port, environment_variables=environment_variables, + chat_page_url=chat_page_url, ) print_log("Starting serve app...") try: - helper.start() + helper.start(skip_open_browser=skip_open_browser) + # only open on first successful start + skip_open_browser = True except Exception: print_log("Failed to start serve app, please check the error message above.") - return helper + finally: + return helper, skip_open_browser def update_init_in_ux_inputs(*, ux_input_path: Path, flow_file_name: str, init: Dict[str, Any]): @@ -119,7 +132,7 @@ def touch_local_pfs(): def start_chat_ui_service_monitor( - flow, + flow: str, *, serve_app_port: str, pfs_port: str, @@ -129,42 +142,82 @@ def start_chat_ui_service_monitor( skip_open_browser: bool = False, environment_variables: Dict[str, str] = None, ): - flow_dir, flow_file_name = resolve_flow_path(flow, allow_prompty_dir=True) + # if flow is an entry, generate yaml entry without delete; if flow is a path, use it directly + flow_file_path = generate_yaml_entry_without_delete(entry=flow) + use_entry_as_flow = flow_file_path != flow + if use_entry_as_flow: + flow_dir = Path(".") + flow_file_name = flow_file_path.name + # when using entry as flow, chat UI will now access ux_inputs.json and images in temp directory + ux_input_path = flow_file_path.parent / PROMPT_FLOW_DIR_NAME / UX_INPUTS_JSON + + # so we need to copy the original ux_inputs.json to temp directory + origin_prompt_flow_dir = flow_dir / PROMPT_FLOW_DIR_NAME + if origin_prompt_flow_dir.is_dir(): + # TODO: skip files that no need to copy, like logs for flow run + shutil.copytree(origin_prompt_flow_dir, flow_file_path.parent / PROMPT_FLOW_DIR_NAME) + else: + flow_dir, flow_file_name = resolve_flow_path(flow, allow_prompty_dir=True) + flow_file_path = flow_dir / flow_file_name + ux_input_path = flow_dir / PROMPT_FLOW_DIR_NAME / UX_INPUTS_JSON + origin_prompt_flow_dir = None - ux_input_path = flow_dir / PROMPT_FLOW_DIR_NAME / UX_INPUTS_JSON update_init_in_ux_inputs(ux_input_path=ux_input_path, flow_file_name=flow_file_name, init=init) # show url for chat UI url_params["serve_app_port"] = serve_app_port if "enable_internal_features" not in url_params: - url_params["enable_internal_features"] = "true" if enable_internal_features else "false" + # /eval is not supported in chat window when using entry as a flow for now + url_params["enable_internal_features"] = ( + "true" if enable_internal_features and not use_entry_as_flow else "false" + ) chat_page_url = construct_chat_page_url( - str(flow_dir / flow_file_name), + # Chat UI now doesn't support as_posix in windows + str(flow_file_path), pfs_port, url_params=url_params, ) - print_log(f"You can begin chat flow on {chat_page_url}") - if not skip_open_browser: - webbrowser.open(chat_page_url) + + targets = [ + DirectoryModificationMonitorTarget( + target=flow_dir, + relative_root_ignores=[PROMPT_FLOW_DIR_NAME, "__pycache__"], + ), + JsonContentMonitorTarget( + target=ux_input_path, + node_path=[UX_INPUTS_INIT_KEY, flow_file_name], + ), + ] + + for additional_includes in _get_additional_includes(flow_file_path): + target = Path(additional_includes) + if target.is_file(): + targets.append( + FileModificationMonitorTarget( + target=target, + ) + ) + elif target.is_dir(): + targets.append( + DirectoryModificationMonitorTarget( + target=target, + relative_root_ignores=["__pycache__"], + ) + ) + + print_log(f"Chat page URL will be available after service is started: {chat_page_url}") monitor = Monitor( - targets=[ - DirectoryModificationMonitorTarget( - target=flow_dir, - relative_root_ignores=[PROMPT_FLOW_DIR_NAME, "__pycache__"], - ), - JsonContentMonitorTarget( - target=ux_input_path, - node_path=[UX_INPUTS_INIT_KEY, flow_file_name], - ), - ], + targets=targets, target_callback=_try_restart_service, target_callback_kwargs={ - "flow_file_name": flow_file_name, + "flow_file_path": flow_file_path, "flow_dir": flow_dir, "serve_app_port": int(serve_app_port), "ux_input_path": ux_input_path, "environment_variables": environment_variables, + "chat_page_url": chat_page_url, + "skip_open_browser": skip_open_browser, }, inject_last_callback_result=True, extra_logic_in_loop=touch_local_pfs, @@ -174,7 +227,10 @@ def start_chat_ui_service_monitor( monitor.start_monitor() except KeyboardInterrupt: print_log("Stopping monitor and attached serve app...") - serve_app_helper = monitor.last_callback_result - if serve_app_helper is not None: + if monitor.last_callback_result is not None: + serve_app_helper, _ = monitor.last_callback_result serve_app_helper.terminate() print_log("Stopped monitor and attached serve app.") + finally: + if use_entry_as_flow: + shutil.copytree(flow_file_path.parent / PROMPT_FLOW_DIR_NAME, origin_prompt_flow_dir, dirs_exist_ok=True) diff --git a/src/promptflow-devkit/promptflow/_sdk/_utilities/general_utils.py b/src/promptflow-devkit/promptflow/_sdk/_utilities/general_utils.py index 964c0ce4e0f..d5699a171a4 100644 --- a/src/promptflow-devkit/promptflow/_sdk/_utilities/general_utils.py +++ b/src/promptflow-devkit/promptflow/_sdk/_utilities/general_utils.py @@ -981,7 +981,7 @@ def resolve_entry_and_code(entry: Union[str, PathLike, Callable], code: Path = N return entry, code -def create_flex_flow_yaml_in_target(entry: Union[str, PathLike, Callable], target_dir: str, code: Path = None): +def create_flex_flow_yaml_in_target(entry: Union[str, PathLike, Callable], target_dir: str, code: Path = None) -> Path: """ Generate a flex flow yaml in target folder. The code field in the yaml points to the original flex yaml flow folder. """ diff --git a/src/promptflow-devkit/promptflow/_sdk/_utilities/serve_utils.py b/src/promptflow-devkit/promptflow/_sdk/_utilities/serve_utils.py index a751bc2c455..2aea36055a0 100644 --- a/src/promptflow-devkit/promptflow/_sdk/_utilities/serve_utils.py +++ b/src/promptflow-devkit/promptflow/_sdk/_utilities/serve_utils.py @@ -33,13 +33,22 @@ class ServeAppHelper(abc.ABC): """ def __init__( - self, *, flow_file_name: str, flow_dir: Path, init: Dict[str, Any], port: int, host: str = "localhost", **kwargs + self, + *, + flow_file_path: Path, + flow_dir: Path, + init: Dict[str, Any], + port: int, + host: str = "localhost", + chat_page_url: str = None, + **kwargs, ): - self._flow_file_name = flow_file_name + self._flow_file_path = flow_file_path self._flow_dir = flow_dir self._init = init or {} self._port = port self._host = host + self._chat_page_url = chat_page_url or f"http://{host}:{port}" @abc.abstractmethod def start_in_main(self, skip_open_browser: bool = False): @@ -47,7 +56,7 @@ def start_in_main(self, skip_open_browser: bool = False): pass @abc.abstractmethod - def start(self): + def start(self, skip_open_browser: bool = True): """Start the serve app in a subprocess.""" pass @@ -59,14 +68,30 @@ def terminate(self): class PythonServeAppHelper(ServeAppHelper): def __init__( - self, *, flow_file_name: str, flow_dir: Path, init: Dict[str, Any], port: int, host: str = "localhost", **kwargs + self, + *, + flow_file_path: Path, + flow_dir: Path, + init: Dict[str, Any], + port: int, + host: str = "localhost", + chat_page_url: str = None, + **kwargs, ): self._static_folder: Optional[str] = kwargs.get("static_folder", None) self._config = kwargs.get("config", {}) or {} self._environment_variables = kwargs.get("environment_variables", {}) or {} self._engine = kwargs.get("engine", DEFAULT_SERVE_ENGINE) - super().__init__(flow_file_name=flow_file_name, flow_dir=flow_dir, init=init, port=port, host=host, **kwargs) + super().__init__( + flow_file_path=flow_file_path, + flow_dir=flow_dir, + init=init, + port=port, + host=host, + chat_page_url=chat_page_url, + **kwargs, + ) self._process: Optional[multiprocessing.Process] = None @@ -75,28 +100,8 @@ def _run(self, skip_open_browser: bool = False, enable_trace: bool = False): # trace must be started within the same process as the app start_trace() - error_msg = None - try: - flow_dir, flow_file_name = resolve_flow_path(self._flow_dir, allow_prompty_dir=True) - if flow_file_name != self._flow_file_name: - # this may happen when a prompty is specified while there is already a default flow in the flow dir - error_msg = ( - f"Default definition {flow_file_name} is found and will be picked in flow directory " - f"while {self._flow_file_name} is specified. " - f"Please remove {flow_file_name} from flow directory as a workaround." - ) - except (UserErrorException,) as e: - error_msg = e.message - - if error_msg is not None: - raise UserErrorException( - message_format="Service have some limitations on flow directory for now:\n{msg}", - msg=error_msg, - privacy_info=[self._flow_dir.absolute().as_posix()], - ) - serve_python_flow( - flow_file_name=self._flow_file_name, + flow_file_path=self._flow_file_path, flow_dir=self._flow_dir, port=self._port, host=self._host, @@ -106,16 +111,17 @@ def _run(self, skip_open_browser: bool = False, enable_trace: bool = False): init=self._init, skip_open_browser=skip_open_browser, engine=self._engine, + chat_page_url=self._chat_page_url, ) def start_in_main(self, skip_open_browser: bool = False): self._run(skip_open_browser=skip_open_browser) - def start(self): + def start(self, skip_open_browser: bool = True): self._process = multiprocessing.Process( target=self._run, # no need to open browser if the serve app is started in a subprocess - kwargs={"skip_open_browser": True, "enable_trace": True}, + kwargs={"skip_open_browser": skip_open_browser, "enable_trace": True}, ) self._process.start() @@ -127,9 +133,27 @@ def terminate(self): class CSharpServeAppHelper(ServeAppHelper): def __init__( - self, *, flow_file_name: str, flow_dir: Path, init: Dict[str, Any], port: int, host: str = "localhost", **kwargs + self, + *, + flow_file_path: Path, + flow_dir: Path, + init: Dict[str, Any], + port: int, + host: str = "localhost", + chat_page_url=None, + **kwargs, ): - super().__init__(flow_file_name=flow_file_name, flow_dir=flow_dir, init=init, port=port, host=host, **kwargs) + self._chat_on_serve = chat_page_url is None + + super().__init__( + flow_file_path=flow_file_path, + flow_dir=flow_dir, + init=init, + port=port, + host=host, + chat_page_url=chat_page_url, + **kwargs, + ) self._process: Optional[subprocess.Popen] = None @@ -141,7 +165,7 @@ def _construct_start_up_command(self) -> Generator[str, None, None]: "--port", str(self._port), "--yaml_path", - self._flow_file_name, + self._flow_file_path, "--assembly_folder", ".", "--connection_provider_url", @@ -171,7 +195,13 @@ def start_in_main(self, skip_open_browser: bool = False): except KeyboardInterrupt: pass - def start(self): + def start(self, skip_open_browser: bool = True): + # chat_page_url will be pointed to serve app url if not provided + # however, it's not supported in CSharp service for now + # so we skip opening browser if so; but keep the logic to open browser for `pf flow test --ui` + if not skip_open_browser and not self._chat_on_serve: + logger.info(f"Opening browser {self._chat_page_url}...") + webbrowser.open(self._chat_page_url) with self._construct_start_up_command() as command: self._process = subprocess.Popen(command, cwd=self._flow_dir, stdout=sys.stdout, stderr=sys.stderr) @@ -190,13 +220,12 @@ def find_available_port() -> str: return str(port) -def _resolve_python_flow_additional_includes(flow_file_name: str, flow_dir: Path) -> Path: +def _resolve_python_flow_additional_includes(flow_file_path: Path, flow_dir: Path) -> Path: # Resolve flow additional includes from promptflow._sdk.operations import FlowOperations - flow_path = Path(flow_dir) / flow_file_name - with FlowOperations._resolve_additional_includes(flow_path) as resolved_flow_path: - if resolved_flow_path == flow_path: + with FlowOperations._resolve_additional_includes(flow_file_path) as resolved_flow_path: + if resolved_flow_path == flow_file_path: return flow_dir # Copy resolved flow to temp folder if additional includes exists # Note: DO NOT use resolved flow path directly, as when inner logic raise exception, @@ -225,6 +254,7 @@ def start_flow_service( ) flow_dir, flow_file_name = resolve_flow_path(source, allow_prompty_dir=True) + flow_file_path = flow_dir / flow_file_name # prompty dir works for resolve_flow_path, but not for resolve_flow_language, # so infer language after resolve_flow_path language = resolve_flow_language(flow_path=flow_dir / flow_file_name) @@ -242,7 +272,7 @@ def start_flow_service( ) helper = PythonServeAppHelper( - flow_file_name=flow_file_name, + flow_file_path=flow_file_path, flow_dir=flow_dir, init=init, port=port, @@ -254,7 +284,7 @@ def start_flow_service( ) else: helper = CSharpServeAppHelper( - flow_file_name=flow_file_name, + flow_file_path=flow_file_path, flow_dir=flow_dir, init=init or {}, port=port, @@ -265,7 +295,7 @@ def start_flow_service( def serve_python_flow( *, - flow_file_name, + flow_file_path, flow_dir, port, host, @@ -275,6 +305,7 @@ def serve_python_flow( init, skip_open_browser: bool, engine, + chat_page_url, ): # we should consider moving below logic to PythonServeAppHelper._run but keep it here for now as it's not related to # the helper itself @@ -282,7 +313,8 @@ def serve_python_flow( from promptflow.core._serving.app import create_app # if no additional includes, flow_dir keeps the same; if additional includes, flow_dir is a temp dir - flow_dir = _resolve_python_flow_additional_includes(flow_file_name, flow_dir) + # there won't be additional includes if flow_file_path points to a generated temp flow file + flow_dir = _resolve_python_flow_additional_includes(flow_file_path, flow_dir) pf_config = Configuration(overrides=config) logger.info(f"Promptflow config: {pf_config}") @@ -296,11 +328,11 @@ def serve_python_flow( connection_provider=connection_provider, init=init, engine=engine, + flow_file_path=flow_file_path, ) if not skip_open_browser: - target = f"http://{host}:{port}" - logger.info(f"Opening browser {target}...") - webbrowser.open(target) + logger.info(f"Opening browser {chat_page_url}...") + webbrowser.open(chat_page_url) # Debug is not supported for now as debug will rerun command, and we changed working directory. if engine == "flask": app.run(port=port, host=host) diff --git a/src/promptflow-devkit/tests/sdk_cli_test/unittests/test_flow_serve.py b/src/promptflow-devkit/tests/sdk_cli_test/unittests/test_flow_serve.py index f16589c127c..f72b2ba1f71 100644 --- a/src/promptflow-devkit/tests/sdk_cli_test/unittests/test_flow_serve.py +++ b/src/promptflow-devkit/tests/sdk_cli_test/unittests/test_flow_serve.py @@ -9,18 +9,15 @@ @pytest.mark.unittest def test_flow_serve_resolve_additional_includes(): # Assert flow path not changed if no additional includes - flow_path = (PROMPTFLOW_ROOT / "tests/test_configs/flows/web_classification").resolve().absolute().as_posix() - resolved_flow_path = _resolve_python_flow_additional_includes("flow.dag.yaml", flow_path) + flow_path = (PROMPTFLOW_ROOT / "tests/test_configs/flows/web_classification").resolve().absolute() + resolved_flow_path = _resolve_python_flow_additional_includes(flow_path / "flow.dag.yaml", flow_path) assert flow_path == resolved_flow_path # Assert additional includes are resolved correctly flow_path = ( - (PROMPTFLOW_ROOT / "tests/test_configs/flows/web_classification_with_additional_include") - .resolve() - .absolute() - .as_posix() + (PROMPTFLOW_ROOT / "tests/test_configs/flows/web_classification_with_additional_include").resolve().absolute() ) - resolved_flow_path = _resolve_python_flow_additional_includes("flow.dag.yaml", flow_path) + resolved_flow_path = _resolve_python_flow_additional_includes(flow_path / "flow.dag.yaml", flow_path) assert (Path(resolved_flow_path) / "convert_to_dict.py").exists() assert (Path(resolved_flow_path) / "fetch_text_content_from_url.py").exists()