feat support wss(beta)

This commit is contained in:
LanQian 2024-04-24 14:39:08 +08:00
parent f887656ba0
commit e92eddc8d5
4 changed files with 252 additions and 329 deletions

View File

@ -70,7 +70,7 @@ def split_tokens_from_content(content, max_tokens, model=None):
encoding = tiktoken.get_encoding("cl100k_base")
encoded_content = encoding.encode(content)
len_encoded_content = len(encoded_content)
if len_encoded_content > max_tokens:
if len_encoded_content >= max_tokens:
content = encoding.decode(encoded_content[:max_tokens])
return content, max_tokens, "length"
else:

6
app.py
View File

@ -37,11 +37,7 @@ async def send_conversation(request: Request, token=Depends(verify_token)):
raise HTTPException(status_code=400, detail={"error": "Invalid JSON body"})
chat_service = await async_retry(to_send_conversation, request_data, access_token)
chat_service.prepare_send_conversation(
parent_message_id=request_data.get('parent_message_id'),
conversation_id=request_data.get('conversation_id'),
history_disabled= request_data.get('history_disabled', True) # default is True
)
chat_service.prepare_send_conversation()
res = await chat_service.send_conversation()
if isinstance(res, types.AsyncGeneratorType):

View File

@ -1,293 +1,15 @@
import json
import random
import string
import time
import uuid
import websockets
import asyncio
import base64
from fastapi import HTTPException
from api.chat_completions import num_tokens_from_messages, model_system_fingerprint, model_proxy, num_tokens_from_content
from api.chat_completions import num_tokens_from_messages, model_proxy
from chatgpt.chatResponse import api_messages_to_chat, stream_response, wss_response_stream, format_not_stream_response
from chatgpt.proofofwork import calc_proof_token
from utils.Client import Client
from utils.Logger import Logger
from utils.config import proxy_url_list, chatgpt_base_url_list, arkose_token_url_list
moderation_message = "I'm sorry, I cannot provide or engage in any content related to pornography, violence, or any unethical material. If you have any other questions or need assistance, please feel free to let me know. I'll do my best to provide support and assistance."
async def stream_response(service, response, model, max_tokens):
chat_id = f"chatcmpl-{''.join(random.choice(string.ascii_letters + string.digits) for _ in range(29))}"
system_fingerprint_list = model_system_fingerprint.get(model, None)
system_fingerprint = random.choice(system_fingerprint_list) if system_fingerprint_list else None
created_time = int(time.time())
completion_tokens = -1
len_last_content = 0
last_content_type = None
last_recipient = None
end = False
message_id = None
current_message_id = None
conversation_id = None
all_text = ""
async for chunk in response:
chunk = chunk.decode("utf-8")
print(f"chunk:{chunk}")
if end:
yield "data: [DONE]\n\n"
break
try:
# to ignore afterwards \n\n for websocket message.
if chunk.startswith("data: [DONE]"):
yield "data: [DONE]\n\n"
elif not chunk.startswith("data: "):
continue
else:
chunk_old_data = json.loads(chunk[6:])
finish_reason = None
message = chunk_old_data.get("message", {})
role = message.get('author', {}).get('role')
if role == 'user':
continue
status = message.get("status")
content = message.get("content", {})
recipient = message.get("recipient", "")
current_message_id = message.get('id')
conversation_id = chunk_old_data.get('conversation_id')
if not message and chunk_old_data.get("type") == "moderation":
delta = {"role": "assistant", "content": moderation_message}
finish_reason = "stop"
end = True
elif status == "in_progress":
outer_content_type = content.get("content_type")
if outer_content_type == "text":
part = content.get("parts", [])[0]
if not part:
message_id = message.get("id")
new_text = ""
else:
# for wss message, first valid text, message_id is None
if message_id and message_id != message.get("id"):
continue
new_text = part[len_last_content:]
len_last_content = len(part)
else:
text = content.get("text", "")
if outer_content_type == "code" and last_content_type != "code":
new_text = "\n```" + recipient + "\n" + text[len_last_content:]
elif outer_content_type == "execution_output" and last_content_type != "execution_output":
new_text = "\n```" + "Output" + "\n" + text[len_last_content:]
else:
new_text = text[len_last_content:]
len_last_content = len(text)
if last_content_type == "code" and outer_content_type != "code":
new_text = "\n```\n" + new_text
elif last_content_type == "execution_output" and outer_content_type != "execution_output":
new_text = "\n```\n" + new_text
if recipient == "dalle.text2im" and last_recipient != "dalle.text2im":
new_text = "\n```" + "json" + "\n" + new_text
delta = {"content": new_text}
last_content_type = outer_content_type
last_recipient = recipient
if completion_tokens >= max_tokens:
delta = {}
finish_reason = "length"
end = True
elif status == "finished_successfully":
if content.get("content_type") == "multimodal_text":
parts = content.get("parts", [])
delta = {}
for part in parts:
inner_content_type = part.get('content_type')
if inner_content_type == "image_asset_pointer":
last_content_type = "image_asset_pointer"
asset_pointer = part.get('asset_pointer').replace('file-service://', '')
Logger.debug(f"asset_pointer: {asset_pointer}")
image_download_url = await service.get_image_download_url(asset_pointer)
Logger.debug(f"image_download_url: {image_download_url}")
if image_download_url:
delta = {"content": f"\n```\n![image]({image_download_url})\n"}
else:
delta = {"content": f"\n```\nFailed to load the image.\n"}
elif not message.get("end_turn") or not message.get("metadata").get("finish_details"):
message_id = None
len_last_content = 0
continue
else:
delta = {}
finish_reason = "stop"
end = True
else:
continue
if not delta.get("content"):
delta = {"role": "assistant", "content": ""}
chunk_new_data = {
"id": chat_id,
"message_id": current_message_id,
"conversation_id": conversation_id,
"object": "chat.completion.chunk",
"created": created_time,
"model": model,
"choices": [
{
"index": 0,
"delta": delta,
"logprobs": None,
"finish_reason": finish_reason
}
],
"system_fingerprint": system_fingerprint
}
completion_tokens += 1
print(f'chunk_new_data:{chunk_new_data}')
yield f"data: {json.dumps(chunk_new_data)}\n\n"
except Exception as e:
Logger.error(f"Error: {chunk}, error: {str(e)}")
continue
async def chat_response(service, response, prompt_tokens, model, max_tokens):
chat_id = f"chatcmpl-{''.join(random.choice(string.ascii_letters + string.digits) for _ in range(29))}"
system_fingerprint_list = model_system_fingerprint.get(model, None)
system_fingerprint = random.choice(system_fingerprint_list) if system_fingerprint_list else None
created_time = int(time.time())
finish_reason = "stop"
completion_tokens = -1
len_last_content = 0
last_content_type = None
last_recipient = None
end = False
message_id = None
all_text = ""
async for chunk in response.aiter_lines():
chunk = chunk.decode("utf-8")
if end:
break
try:
if chunk.startswith("data: [DONE]"): # ignore afterwards \n\n for websocket.
break
elif not chunk.startswith("data: "):
continue
else:
chunk_old_data = json.loads(chunk[6:])
finish_reason = None
message = chunk_old_data.get("message", {})
status = message.get("status")
content = message.get("content", {})
recipient = message.get("recipient", "")
if not message and chunk_old_data.get("type") == "moderation":
delta = {"role": "assistant", "content": moderation_message}
finish_reason = "stop"
end = True
elif status == "in_progress":
outer_content_type = content.get("content_type")
if outer_content_type == "text":
part = content.get("parts", [])[0]
if not part:
message_id = message.get("id")
new_text = ""
else:
if message_id != message.get("id"):
continue
new_text = part[len_last_content:]
len_last_content = len(part)
else:
text = content.get("text", "")
if outer_content_type == "code" and last_content_type != "code":
new_text = "\n```" + recipient + "\n" + text[len_last_content:]
elif outer_content_type == "execution_output" and last_content_type != "execution_output":
new_text = "\n```" + "Output" + "\n" + text[len_last_content:]
else:
new_text = text[len_last_content:]
len_last_content = len(text)
if last_content_type == "code" and outer_content_type != "code":
new_text = "\n```\n" + new_text
elif last_content_type == "execution_output" and outer_content_type != "execution_output":
new_text = "\n```\n" + new_text
if recipient == "dalle.text2im" and last_recipient != "dalle.text2im":
new_text = "\n```" + "json" + "\n" + new_text
delta = {"content": new_text}
last_content_type = outer_content_type
last_recipient = recipient
if completion_tokens >= max_tokens:
delta = {}
finish_reason = "length"
end = True
elif status == "finished_successfully":
if content.get("content_type") == "multimodal_text":
parts = content.get("parts", [])
delta = {}
for part in parts:
inner_content_type = part.get('content_type')
if inner_content_type == "image_asset_pointer":
last_content_type = "image_asset_pointer"
asset_pointer = part.get('asset_pointer').replace('file-service://', '')
Logger.debug(f"asset_pointer: {asset_pointer}")
image_download_url = await service.get_image_download_url(asset_pointer)
Logger.debug(f"image_download_url: {image_download_url}")
if image_download_url:
delta = {"content": f"\n```\n![image]({image_download_url})\n"}
else:
delta = {"content": f"\n```\nFailed to load the image.\n"}
elif not message.get("end_turn") or not message.get("metadata").get("finish_details"):
message_id = None
len_last_content = 0
continue
else:
delta = {}
finish_reason = "stop"
end = True
else:
continue
all_text += delta.get("content", "")
completion_tokens += 1
except Exception as e:
Logger.error(f"Error: {chunk}, error: {str(e)}")
continue
completion_tokens = num_tokens_from_content(all_text, model)
message = {
"role": "assistant",
"content": all_text,
}
usage = {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens
}
return {
"id": chat_id,
"object": "chat.completion",
"created": created_time,
"model": model,
"choices": [
{
"index": 0,
"message": message,
"logprobs": None,
"finish_reason": finish_reason
}
],
"usage": usage,
"system_fingerprint": system_fingerprint
}
def api_messages_to_chat(api_messages):
chat_messages = []
for api_message in api_messages:
role = api_message.get('role')
content = api_message.get('content')
chat_message = {
"id": f"{uuid.uuid4()}",
"author": {"role": role},
"content": {"content_type": "text", "parts": [content]},
"metadata": {}
}
chat_messages.append(chat_message)
return chat_messages
from utils.config import proxy_url_list, chatgpt_base_url_list, arkose_token_url_list, history_disabled
class ChatService:
@ -308,6 +30,10 @@ class ChatService:
self.arkose_token_url = random.choice(arkose_token_url_list) if arkose_token_url_list else None
self.proof_token = None
self.parent_message_id = data.get('parent_message_id')
self.conversation_id = data.get('conversation_id')
self.history_disabled = data.get('history_disabled', history_disabled)
self.data = data
self.model = self.data.get("model", "gpt-3.5-turbo-0125")
self.api_messages = self.data.get("messages", [])
@ -391,7 +117,7 @@ class ChatService:
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
def prepare_send_conversation(self, parent_message_id=None, conversation_id=None, history_disabled=True):
def prepare_send_conversation(self):
self.headers = {
'Accept': 'text/event-stream',
'Accept-Encoding': 'gzip, deflate, br',
@ -419,60 +145,27 @@ class ChatService:
model = "gpt-4"
else:
model = "text-davinci-002-render-sha"
parent_message_id = parent_message_id if parent_message_id else f"{uuid.uuid4()}"
print(f"input conversation_id: {conversation_id}")
websocket_request_id = f"{uuid.uuid4()}"
self.chat_request = {
"action": "next",
"messages": chat_messages,
"parent_message_id": parent_message_id,
"parent_message_id": self.parent_message_id if self.parent_message_id else f"{uuid.uuid4()}",
"model": model,
"timezone_offset_min": -480,
"suggestions": [],
# let user decide whether or not we need to keep conversation history.
"history_and_training_disabled": history_disabled,
"history_and_training_disabled": self.history_disabled,
"conversation_mode": {"kind": "primary_assistant"},
"force_paragen": False,
"force_paragen_model_slug": "",
"force_nulligen": False,
"force_rate_limit": False,
"websocket_request_id": websocket_request_id,
"websocket_request_id": f"{uuid.uuid4()}",
}
if conversation_id:
self.chat_request['conversation_id'] = conversation_id
print(f"chat_request:{self.chat_request}", flush=True)
if self.conversation_id:
self.chat_request['conversation_id'] = self.conversation_id
return self.chat_request
async def wss_response_stream(self, detail):
wss_url = detail.get('wss_url')
subprotocols = ["json.reliable.webpubsub.azure.v1"]
async with websockets.connect(wss_url, ping_interval=None, subprotocols=subprotocols) as websocket:
while True:
message = None
try:
message = await asyncio.wait_for(websocket.recv(), timeout=30)
if message:
# print(f'wss messsage:{message}')
resultObj = json.loads(message)
sequenceId = resultObj.get("sequenceId", None)
if not sequenceId:
continue
result = resultObj.get("data", {}).get("body", None)
# {"type": "http.response.body", "body": "ZGF0YTogeyJjb252ZXJzYXRpb25faWQiOiAiNDJhNjNiNWItNDVhZC00Nzg0LWIwNGMtZWNlMDIyNDZhY2Q4IiwgIm1lc3NhZ2VfaWQiOiAiZGNiMTUxM2ItZmU1NC00ZWM5LWJiY2MtZmRhYTJkZWRhMzI0IiwgImlzX2NvbXBsZXRpb24iOiB0cnVlLCAibW9kZXJhdGlvbl9yZXNwb25zZSI6IHsiZmxhZ2dlZCI6IGZhbHNlLCAiYmxvY2tlZCI6IGZhbHNlLCAibW9kZXJhdGlvbl9pZCI6ICJtb2RyLThublA5UGpZVEZLUHl6Z0hDRlFtNHZTOUNESTU1In19Cgo=", "more_body": true, "response_id": "84f29a22d9ac8c4b-EWR", "conversation_id": "42a63b5b-45ad-4784-b04c-ece02246acd8"}
decoded_bytes = base64.b64decode(result)
# result = decoded_bytes.decode("utf-8")
yield decoded_bytes
else:
return
# print(f"Message from server: {message}")
except asyncio.TimeoutError:
# Handle timeout, e.g., by breaking the loop or doing something else
print("Timeout! No message received within the specified time.")
return
async def send_conversation(self):
url = f'{self.base_url}/conversation'
# Check for model access or existence
if "gpt-4" in self.model and self.persona != "chatgpt-paid":
raise HTTPException(status_code=404, detail={
"message": f"The model `{self.model}` does not exist or you do not have access to it.",
@ -495,15 +188,16 @@ class ChatService:
content_type = r.headers.get("Content-Type", "")
if "text/event-stream" in content_type and stream:
return stream_response(self, r.aiter_lines(), model, self.max_tokens)
elif "text/event-stream" in content_type and not stream:
return await format_not_stream_response(stream_response(self, r.aiter_lines(), model, self.max_tokens), self.prompt_tokens, self.max_tokens, model)
elif "application/json" in content_type:
rtext = await r.atext()
detail = json.loads(rtext).get("detail", json.loads(rtext))
wss_r = wss_response_stream(detail)
if stream:
print(f"detail: {detail}", flush=True)
wss_r = self.wss_response_stream(detail)
return stream_response(self, wss_r, model, self.max_tokens)
else:
return chat_response(self, r, self.prompt_tokens, model, self.max_tokens)
return await format_not_stream_response(stream_response(self, wss_r, model, self.max_tokens), self.prompt_tokens, self.max_tokens, model)
else:
raise HTTPException(status_code=r.status_code, detail="Unsupported Content-Type")
except HTTPException as e:

233
chatgpt/chatResponse.py Normal file
View File

@ -0,0 +1,233 @@
import asyncio
import base64
import json
import random
import string
import time
import uuid
import websockets
from api.chat_completions import model_system_fingerprint, split_tokens_from_content
from utils.Logger import Logger
moderation_message = "I'm sorry, I cannot provide or engage in any content related to pornography, violence, or any unethical material. If you have any other questions or need assistance, please feel free to let me know. I'll do my best to provide support and assistance."
async def format_not_stream_response(response, prompt_tokens, max_tokens, model):
chat_id = f"chatcmpl-{''.join(random.choice(string.ascii_letters + string.digits) for _ in range(29))}"
system_fingerprint_list = model_system_fingerprint.get(model, None)
system_fingerprint = random.choice(system_fingerprint_list) if system_fingerprint_list else None
created_time = int(time.time())
all_text = ""
async for chunk in response:
try:
if chunk.startswith("data: [DONE]"):
break
elif not chunk.startswith("data: "):
continue
else:
chunk = json.loads(chunk[6:])
if not chunk["choices"][0].get("delta"):
continue
all_text += chunk["choices"][0]["delta"]["content"]
except Exception as e:
Logger.error(f"Error: {chunk}, error: {str(e)}")
continue
content, completion_tokens, finish_reason = split_tokens_from_content(all_text, max_tokens, model)
message = {
"role": "assistant",
"content": content,
}
usage = {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens
}
return {
"id": chat_id,
"object": "chat.completion",
"created": created_time,
"model": model,
"choices": [
{
"index": 0,
"message": message,
"logprobs": None,
"finish_reason": finish_reason
}
],
"usage": usage,
"system_fingerprint": system_fingerprint
}
async def wss_response_stream(detail):
wss_url = detail.get('wss_url')
subprotocols = ["json.reliable.webpubsub.azure.v1"]
async with websockets.connect(wss_url, ping_interval=None, subprotocols=subprotocols) as websocket:
while True:
try:
message = await asyncio.wait_for(websocket.recv(), timeout=15)
if message:
resultObj = json.loads(message)
sequenceId = resultObj.get("sequenceId", None)
if not sequenceId:
continue
result = resultObj.get("data", {}).get("body", None)
decoded_bytes = base64.b64decode(result)
# print(f"decoded_bytes:{decoded_bytes}")
yield decoded_bytes
else:
continue
except asyncio.TimeoutError:
Logger.error("Timeout! No message received within the specified time.")
return
async def stream_response(service, response, model, max_tokens):
chat_id = f"chatcmpl-{''.join(random.choice(string.ascii_letters + string.digits) for _ in range(29))}"
system_fingerprint_list = model_system_fingerprint.get(model, None)
system_fingerprint = random.choice(system_fingerprint_list) if system_fingerprint_list else None
created_time = int(time.time())
completion_tokens = -1
len_last_content = 0
last_content_type = None
last_recipient = None
end = False
message_id = None
async for chunk in response:
chunk = chunk.decode("utf-8")
if end:
yield "data: [DONE]\n\n"
break
try:
if chunk.startswith("data: [DONE]"):
yield "data: [DONE]\n\n"
elif not chunk.startswith("data: "):
continue
else:
chunk_old_data = json.loads(chunk[6:])
finish_reason = None
message = chunk_old_data.get("message", {})
role = message.get('author', {}).get('role')
if role == 'user':
continue
status = message.get("status")
content = message.get("content", {})
recipient = message.get("recipient", "")
current_message_id = message.get('id')
conversation_id = chunk_old_data.get('conversation_id')
if not message and chunk_old_data.get("type") == "moderation":
delta = {"role": "assistant", "content": moderation_message}
finish_reason = "stop"
end = True
elif status == "in_progress":
outer_content_type = content.get("content_type")
if outer_content_type == "text":
part = content.get("parts", [])[0]
if not part:
message_id = message.get("id")
new_text = ""
else:
# for wss message, first valid text, message_id is None
if message_id and message_id != message.get("id"):
continue
new_text = part[len_last_content:]
len_last_content = len(part)
else:
text = content.get("text", "")
if outer_content_type == "code" and last_content_type != "code":
new_text = "\n```" + recipient + "\n" + text[len_last_content:]
elif outer_content_type == "execution_output" and last_content_type != "execution_output":
new_text = "\n```" + "Output" + "\n" + text[len_last_content:]
else:
new_text = text[len_last_content:]
len_last_content = len(text)
if last_content_type == "code" and outer_content_type != "code":
new_text = "\n```\n" + new_text
elif last_content_type == "execution_output" and outer_content_type != "execution_output":
new_text = "\n```\n" + new_text
if recipient == "dalle.text2im" and last_recipient != "dalle.text2im":
new_text = "\n```" + "json" + "\n" + new_text
delta = {"content": new_text}
last_content_type = outer_content_type
last_recipient = recipient
if completion_tokens >= max_tokens:
delta = {}
finish_reason = "length"
end = True
elif status == "finished_successfully":
if content.get("content_type") == "multimodal_text":
parts = content.get("parts", [])
delta = {}
for part in parts:
inner_content_type = part.get('content_type')
if inner_content_type == "image_asset_pointer":
last_content_type = "image_asset_pointer"
asset_pointer = part.get('asset_pointer').replace('file-service://', '')
Logger.debug(f"asset_pointer: {asset_pointer}")
image_download_url = await service.get_image_download_url(asset_pointer)
Logger.debug(f"image_download_url: {image_download_url}")
if image_download_url:
delta = {"content": f"\n```\n![image]({image_download_url})\n"}
else:
delta = {"content": f"\n```\nFailed to load the image.\n"}
elif not message.get("end_turn") or not message.get("metadata").get("finish_details"):
message_id = None
len_last_content = 0
continue
else:
part = content.get("parts", [])[0]
new_text = part[len_last_content:]
if not new_text:
delta = {}
else:
delta = {"content": new_text}
finish_reason = "stop"
end = True
else:
continue
if not end and not delta.get("content"):
delta = {"role": "assistant", "content": ""}
chunk_new_data = {
"id": chat_id,
"object": "chat.completion.chunk",
"created": created_time,
"model": model,
"choices": [
{
"index": 0,
"delta": delta,
"logprobs": None,
"finish_reason": finish_reason
}
],
"system_fingerprint": system_fingerprint
}
if not service.history_disabled:
chunk_new_data.update({
"message_id": current_message_id,
"conversation_id": conversation_id,
})
completion_tokens += 1
# print(f'chunk_new_data:{json.dumps(chunk_new_data)}\n\n')
yield f"data: {json.dumps(chunk_new_data)}\n\n"
except Exception as e:
Logger.error(f"Error: {chunk}, details: {str(e)}")
continue
def api_messages_to_chat(api_messages):
chat_messages = []
for api_message in api_messages:
role = api_message.get('role')
content = api_message.get('content')
chat_message = {
"id": f"{uuid.uuid4()}",
"author": {"role": role},
"content": {"content_type": "text", "parts": [content]},
"metadata": {}
}
chat_messages.append(chat_message)
return chat_messages