-
Notifications
You must be signed in to change notification settings - Fork 190
Multi model deployment #208
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 41 commits
4eac006
c68e999
5ce1a92
fa10e19
f9cbd74
8970f4e
517bea8
58dd2b2
bb0d551
e2bb9d5
3823534
6f9b4ad
499b9ad
5419ef6
a70b6de
eea658b
20f0878
c21c31b
f525329
3c0937f
156ac83
38e270e
4d4e0d8
01c8e59
f801b36
fd4e2ed
0a3b7e5
6523c04
06b40f5
96d0dcb
ab41d24
8673a9a
8d09b37
7a136d6
2c6ec08
0cb88a9
7c0ee12
7a956d5
f8cfe28
079807d
ea1e47e
98b6129
daab5e6
84073f9
3ee3410
b4edc2b
6346194
94b6699
710c20b
c2636b7
189e75c
adee843
a145be5
082c05e
3ce77d2
b40ecbd
72dd95c
a4e3d56
4b5bb47
30d2b03
c5d5996
3ae1781
4b8f02f
c51ce37
43479db
e1b6d23
1675bd8
8684a61
56a7fce
fb70c3d
e2cfe8a
1312738
b0f0da4
b78068e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| # DeepSpeed Team | ||
|
|
||
| import mii | ||
| import time | ||
|
|
||
| generator = mii.mii_query_handle("first_test") | ||
| result = generator.query( | ||
| {"query": ["DeepSpeed is", | ||
| "Seattle is"]}, | ||
| "bloom560m_deployment", | ||
| do_sample=True, | ||
| max_new_tokens=30, | ||
| ) | ||
| print(result) | ||
|
|
||
| time.sleep(5) | ||
| result = generator.query({'query': "DeepSpeed is the greatest"}, | ||
| "microsoft/DialogRPT-human-vs-rand_deployment") | ||
| print(result) | ||
|
|
||
| time.sleep(5) | ||
|
|
||
| result = generator.query( | ||
| { | ||
| 'text': "DeepSpeed is the greatest", | ||
| 'conversation_id': 3, | ||
| 'past_user_inputs': [], | ||
| 'generated_responses': [] | ||
| }, | ||
| "microsoft/DialoGPT-large_deployment") | ||
| print(result) | ||
|
|
||
| results = generator.query( | ||
| { | ||
| 'question': "What is the greatest?", | ||
| 'context': "DeepSpeed is the greatest" | ||
| }, | ||
| "deepset/roberta-large-squad2" + "-qa-deployment") | ||
| print(results) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| # DeepSpeed Team | ||
| import mii | ||
|
|
||
| mii.terminate("first_test") | ||
|
TosinSeg marked this conversation as resolved.
Outdated
|
||
|
TosinSeg marked this conversation as resolved.
Outdated
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| # DeepSpeed Team | ||
| import mii | ||
|
|
||
| gpu_index_map1 = {'master': [0]} | ||
| gpu_index_map2 = {'master': [1]} | ||
| gpu_index_map3 = {'master': [0, 1]} | ||
|
|
||
| deployments = [] | ||
| mii_configs1 = {"tensor_parallel": 2, "dtype": "fp16"} | ||
| deployments.append( | ||
| mii.Deployment(task='text-generation', | ||
| model="bigscience/bloom-560m", | ||
| deployment_name="bloom560m_deployment", | ||
| GPU_index_map=gpu_index_map3, | ||
| mii_config=mii.config.MIIConfig(**mii_configs1))) | ||
|
|
||
| # gpt2 | ||
| name = "microsoft/DialogRPT-human-vs-rand" | ||
| deployments.append( | ||
| mii.Deployment(task='text-classification', | ||
| model=name, | ||
| deployment_name=name + "_deployment", | ||
| GPU_index_map=gpu_index_map2)) | ||
|
|
||
| mii_configs2 = {"tensor_parallel": 1} | ||
|
|
||
| name = "microsoft/DialoGPT-large" | ||
|
|
||
| deployments.append( | ||
| mii.Deployment(task='conversational', | ||
| model=name, | ||
| deployment_name=name + "_deployment", | ||
| GPU_index_map=gpu_index_map1, | ||
| mii_config=mii.config.MIIConfig(**mii_configs2))) | ||
|
|
||
| name = "deepset/roberta-large-squad2" | ||
| deployments.append( | ||
| mii.Deployment(task="question-answering", | ||
| model=name, | ||
| deployment_name=name + "-qa-deployment", | ||
| GPU_index_map=gpu_index_map2)) | ||
|
|
||
| mii.deploy(deployment_tag="first_test", deployments=deployments) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,17 +12,27 @@ | |
| from mii.method_table import GRPC_METHOD_TABLE | ||
|
|
||
|
|
||
| def _get_deployment_info(deployment_name): | ||
| configs = mii.utils.import_score_file(deployment_name).configs | ||
| task = configs[mii.constants.TASK_NAME_KEY] | ||
| mii_configs_dict = configs[mii.constants.MII_CONFIGS_KEY] | ||
| def _get_deployment_info(deployment_tag): | ||
| deployments = [] | ||
|
TosinSeg marked this conversation as resolved.
Outdated
|
||
| configs = mii.utils.import_score_file(deployment_tag).configs | ||
| for deployment in configs: | ||
| if not isinstance(configs[deployment], dict): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there a reason that these would not be a `dict`?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When the data is written to the score file it stores the dictionaries of all the deployments, along with the load balancer, model_path, and deployment_tag. The 'deployment' on line 18 in the for loop looks at all of them. So I determine if it is a model by checking if it is a dictionary.
||
| continue | ||
| deployments.append(configs[deployment]) | ||
| mii_configs_dict = configs[deployment][mii.constants.MII_CONFIGS_KEY] | ||
| mii_configs = mii.config.MIIConfig(**mii_configs_dict) | ||
|
TosinSeg marked this conversation as resolved.
Outdated
|
||
| return deployments | ||
| """ | ||
|
TosinSeg marked this conversation as resolved.
Outdated
|
||
| task = configs[deployment_name][mii.constants.TASK_NAME_KEY] | ||
| mii_configs_dict = configs[deployment_name][mii.constants.MII_CONFIGS_KEY] | ||
| mii_configs = mii.config.MIIConfig(**mii_configs_dict) | ||
|
|
||
| assert task is not None, "The task name should be set before calling init" | ||
| return task, mii_configs | ||
| """ | ||
|
|
||
|
|
||
| def mii_query_handle(deployment_name): | ||
| def mii_query_handle(deployment_tag): | ||
| """Get a query handle for a local deployment: | ||
|
|
||
| mii/examples/local/gpt2-query-example.py | ||
|
|
@@ -35,12 +45,14 @@ def mii_query_handle(deployment_name): | |
| query_handle: A query handle with a single method `.query(request_dictionary)` using which queries can be sent to the model. | ||
| """ | ||
|
|
||
| if deployment_name in mii.non_persistent_models: | ||
| inference_pipeline, task = mii.non_persistent_models[deployment_name] | ||
| return MIINonPersistentClient(task, deployment_name) | ||
| if deployment_tag in mii.non_persistent_models: | ||
| inference_pipeline, task = mii.non_persistent_models[deployment_tag] | ||
| return MIINonPersistentClient(task, deployment_tag) | ||
|
|
||
| task_name, mii_configs = _get_deployment_info(deployment_name) | ||
| return MIIClient(task_name, "localhost", mii_configs.port_number) | ||
| deployments = _get_deployment_info(deployment_tag) | ||
| mii_configs_dict = deployments[0][mii.constants.MII_CONFIGS_KEY] | ||
| mii_configs = mii.config.MIIConfig(**mii_configs_dict) | ||
| return MIIClient(deployments, "localhost", mii_configs.port_number) | ||
|
|
||
|
|
||
| def create_channel(host, port): | ||
|
|
@@ -55,24 +67,37 @@ class MIIClient(): | |
| """ | ||
| Client to send queries to a single endpoint. | ||
| """ | ||
| def __init__(self, task_name, host, port): | ||
| def __init__(self, deployments, host, port): | ||
| self.asyncio_loop = asyncio.get_event_loop() | ||
| channel = create_channel(host, port) | ||
| self.stub = modelresponse_pb2_grpc.ModelResponseStub(channel) | ||
| self.task = get_task(task_name) | ||
| #self.task = get_task(task_name) | ||
| self.deployments = deployments | ||
|
|
||
| async def _request_async_response(self, request_dict, **query_kwargs): | ||
| if self.task not in GRPC_METHOD_TABLE: | ||
| raise ValueError(f"unknown task: {self.task}") | ||
| async def _request_async_response(self, request_dict, task, **query_kwargs): | ||
| if task not in GRPC_METHOD_TABLE: | ||
| raise ValueError(f"unknown task: {task}") | ||
|
|
||
| task_methods = GRPC_METHOD_TABLE[self.task] | ||
| task_methods = GRPC_METHOD_TABLE[task] | ||
| proto_request = task_methods.pack_request_to_proto(request_dict, **query_kwargs) | ||
| proto_response = await getattr(self.stub, task_methods.method)(proto_request) | ||
| return task_methods.unpack_response_from_proto(proto_response) | ||
|
|
||
| def query(self, request_dict, **query_kwargs): | ||
| def query(self, request_dict, deployment_name=None, **query_kwargs): | ||
| task = None | ||
| if deployment_name is None: #mii.terminate() or single model | ||
| #assert len(self.deployments) == 1, "Must pass deployment_name to query when using multiple deployments" | ||
| deployment_name = self.deployments[0]['deployment_name'] | ||
| task = get_task(self.deployments[0]['task_name']) | ||
|
TosinSeg marked this conversation as resolved.
Outdated
|
||
| else: | ||
| for deployment in self.deployments: | ||
| if deployment[mii.constants.DEPLOYMENT_NAME_KEY] == deployment_name: | ||
| task = get_task(deployment[mii.constants.TASK_NAME_KEY]) | ||
| break | ||
|
TosinSeg marked this conversation as resolved.
Outdated
|
||
| query_kwargs['deployment_name'] = deployment_name | ||
|
TosinSeg marked this conversation as resolved.
Outdated
|
||
| return self.asyncio_loop.run_until_complete( | ||
| self._request_async_response(request_dict, | ||
| task, | ||
| **query_kwargs)) | ||
|
|
||
| async def terminate_async(self): | ||
|
|
@@ -86,17 +111,35 @@ async def create_session_async(self, session_id): | |
| return await self.stub.CreateSession( | ||
| modelresponse_pb2.SessionID(session_id=session_id)) | ||
|
|
||
| def create_session(self, session_id): | ||
| assert self.task == Tasks.TEXT_GENERATION, f"Session creation only available for task '{Tasks.TEXT_GENERATION}'." | ||
| def create_session(self, session_id, deployment_name=None): | ||
| task = None | ||
| if deployment_name is None: #mii.terminate() or single model | ||
| deployment_name = self.deployments[0][mii.constants.DEPLOYMENT_NAME_KEY] | ||
| task = get_task(self.deployments[0][mii.constants.TASK_NAME_KEY]) | ||
| else: | ||
| for deployment in self.deployments: | ||
| if deployment[mii.constants.DEPLOYMENT_NAME_KEY] == deployment_name: | ||
| task = get_task(deployment[mii.constants.TASK_NAME_KEY]) | ||
| break | ||
|
TosinSeg marked this conversation as resolved.
Outdated
|
||
| assert task == Tasks.TEXT_GENERATION, f"Session creation only available for task '{Tasks.TEXT_GENERATION}'." | ||
| return self.asyncio_loop.run_until_complete( | ||
| self.create_session_async(session_id)) | ||
|
|
||
| async def destroy_session_async(self, session_id): | ||
| await self.stub.DestroySession(modelresponse_pb2.SessionID(session_id=session_id) | ||
| ) | ||
|
|
||
| def destroy_session(self, session_id): | ||
| assert self.task == Tasks.TEXT_GENERATION, f"Session deletion only available for task '{Tasks.TEXT_GENERATION}'." | ||
| def destroy_session(self, session_id, deployment_name=None): | ||
| task = None | ||
| if deployment_name is None: #mii.terminate() or single model | ||
| deployment_name = self.deployments[0][mii.constants.DEPLOYMENT_NAME_KEY] | ||
| task = get_task(self.deployments[0][mii.constants.TASK_NAME_KEY]) | ||
| else: | ||
| for deployment in self.deployments: | ||
| if deployment[mii.constants.DEPLOYMENT_NAME_KEY] == deployment_name: | ||
| task = get_task(deployment[mii.constants.TASK_NAME_KEY]) | ||
| break | ||
|
TosinSeg marked this conversation as resolved.
Outdated
|
||
| assert task == Tasks.TEXT_GENERATION, f"Session deletion only available for task '{Tasks.TEXT_GENERATION}'." | ||
| self.asyncio_loop.run_until_complete(self.destroy_session_async(session_id)) | ||
|
|
||
|
|
||
|
|
@@ -157,7 +200,7 @@ def destroy_session(self, session_id): | |
|
|
||
| class MIINonPersistentClient(): | ||
| def __init__(self, task, deployment_name): | ||
| self.task = task | ||
| self.task = get_task(task) | ||
| self.deployment_name = deployment_name | ||
|
|
||
| def query(self, request_dict, **query_kwargs): | ||
|
|
@@ -188,7 +231,10 @@ def terminate(self): | |
| del mii.non_persistent_models[self.deployment_name] | ||
|
|
||
|
|
||
| def terminate_restful_gateway(deployment_name): | ||
| _, mii_configs = _get_deployment_info(deployment_name) | ||
| if mii_configs.enable_restful_api: | ||
| requests.get(f"http://localhost:{mii_configs.restful_api_port}/terminate") | ||
| def terminate_restful_gateway(deployment_tag): | ||
| deployments = _get_deployment_info(deployment_tag) | ||
| for deployment in deployments: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we support having separate restful APIs for each deployment in a multi-model deployment? I think it is tied to the load balancer, not the inference servers.
||
| mii_configs_dict = deployment[mii.constants.MII_CONFIGS_KEY] | ||
| mii_configs = mii.config.MIIConfig(**mii_configs_dict) | ||
| if mii_configs.enable_restful_api: | ||
| requests.get(f"http://localhost:{mii_configs.restful_api_port}/terminate") | ||
Uh oh!
There was an error while loading. Please reload this page.