From 6ce493d4dfc2884832ff5b5be4479a55818b2fe7 Mon Sep 17 00:00:00 2001 From: H Lohaus Date: Sat, 16 Nov 2024 13:19:51 +0100 Subject: Fix api streaming, fix AsyncClient (#2357) * Fix api streaming, fix AsyncClient, Improve Client class, Some providers fixes, Update models list, Fix some tests, Update model list in Airforce provid er, Add OpenAi image generation url to api, Fix reload and debug in api arguments, Fix websearch in gui * Fix Cloadflare and Pi and AmigoChat provider * Fix conversation support in DDG provider, Add cloudflare bypass with nodriver * Fix unittests without curl_cffi --- g4f/client/client.py | 541 --------------------------------------------------- 1 file changed, 541 deletions(-) delete mode 100644 g4f/client/client.py (limited to 'g4f/client/client.py') diff --git a/g4f/client/client.py b/g4f/client/client.py deleted file mode 100644 index 73d8fea3..00000000 --- a/g4f/client/client.py +++ /dev/null @@ -1,541 +0,0 @@ -from __future__ import annotations - -import os -import time -import random -import string -import threading -import asyncio -import base64 -import aiohttp -import queue -from typing import Union, AsyncIterator, Iterator - -from ..providers.base_provider import AsyncGeneratorProvider -from ..image import ImageResponse, to_image, to_data_uri -from ..typing import Messages, ImageType -from ..providers.types import BaseProvider, ProviderType, FinishReason -from ..providers.conversation import BaseConversation -from ..image import ImageResponse as ImageProviderResponse -from ..errors import NoImageResponseError -from .stubs import ChatCompletion, ChatCompletionChunk, Image, ImagesResponse -from .image_models import ImageModels -from .types import IterResponse, ImageProvider -from .types import Client as BaseClient -from .service import get_model_and_provider, get_last_provider -from .helper import find_stop, filter_json, filter_none -from ..models import ModelUtils -from ..Provider import IterListProvider - -# Helper function to convert an async generator to a synchronous iterator -def to_sync_iter(async_gen: AsyncIterator) -> Iterator: - q = queue.Queue() - loop = asyncio.new_event_loop() - done = object() - - def _run(): - asyncio.set_event_loop(loop) - - async def iterate(): - try: - async for item in async_gen: - q.put(item) - finally: - q.put(done) - - loop.run_until_complete(iterate()) - loop.close() - - threading.Thread(target=_run).start() - - while True: - item = q.get() - if item is done: - break - yield item - -# Helper function to convert a synchronous iterator to an async iterator -async def to_async_iterator(iterator): - for item in iterator: - yield item - -# Synchronous iter_response function -def iter_response( - response: Union[Iterator[str], AsyncIterator[str]], - stream: bool, - response_format: dict = None, - max_tokens: int = None, - stop: list = None -) -> Iterator[Union[ChatCompletion, ChatCompletionChunk]]: - content = "" - finish_reason = None - completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28)) - idx = 0 - - if hasattr(response, '__aiter__'): - # It's an async iterator, wrap it into a sync iterator - response = to_sync_iter(response) - - for chunk in response: - if isinstance(chunk, FinishReason): - finish_reason = chunk.reason - break - elif isinstance(chunk, BaseConversation): - yield chunk - continue - - content += str(chunk) - - if max_tokens is not None and idx + 1 >= max_tokens: - finish_reason = "length" - - first, content, chunk = find_stop(stop, content, chunk if stream else None) - - if first != -1: - finish_reason = "stop" - - if stream: - yield ChatCompletionChunk(chunk, None, completion_id, int(time.time())) - - if finish_reason is not None: - break - - idx += 1 - - finish_reason = "stop" if finish_reason is None else finish_reason - - if stream: - yield ChatCompletionChunk(None, finish_reason, completion_id, int(time.time())) - else: - if response_format is not None and "type" in response_format: - if response_format["type"] == "json_object": - content = filter_json(content) - yield ChatCompletion(content, finish_reason, completion_id, int(time.time())) - -# Synchronous iter_append_model_and_provider function -def iter_append_model_and_provider(response: Iterator) -> Iterator: - last_provider = None - - for chunk in response: - last_provider = get_last_provider(True) if last_provider is None else last_provider - chunk.model = last_provider.get("model") - chunk.provider = last_provider.get("name") - yield chunk - -class Client(BaseClient): - def __init__( - self, - provider: ProviderType = None, - image_provider: ImageProvider = None, - **kwargs - ) -> None: - super().__init__(**kwargs) - self.chat: Chat = Chat(self, provider) - self._images: Images = Images(self, image_provider) - - @property - def images(self) -> Images: - return self._images - - async def async_images(self) -> Images: - return self._images - -# For backwards compatibility and legacy purposes, use Client instead -class AsyncClient(Client): - """Legacy AsyncClient that redirects to the main Client class. - This class exists for backwards compatibility.""" - - def __init__(self, *args, **kwargs): - import warnings - warnings.warn( - "AsyncClient is deprecated and will be removed in future versions." - "Use Client instead, which now supports both sync and async operations.", - DeprecationWarning, - stacklevel=2 - ) - super().__init__(*args, **kwargs) - - async def async_create(self, *args, **kwargs): - """Asynchronous create method that calls the synchronous method.""" - return await super().async_create(*args, **kwargs) - - async def async_generate(self, *args, **kwargs): - """Asynchronous image generation method.""" - return await super().async_generate(*args, **kwargs) - - async def async_images(self) -> Images: - """Asynchronous access to images.""" - return await super().async_images() - - async def async_fetch_image(self, url: str) -> bytes: - """Asynchronous fetching of an image by URL.""" - return await self._fetch_image(url) - -class Completions: - def __init__(self, client: Client, provider: ProviderType = None): - self.client: Client = client - self.provider: ProviderType = provider - - def create( - self, - messages: Messages, - model: str, - provider: ProviderType = None, - stream: bool = False, - proxy: str = None, - response_format: dict = None, - max_tokens: int = None, - stop: Union[list[str], str] = None, - api_key: str = None, - ignored: list[str] = None, - ignore_working: bool = False, - ignore_stream: bool = False, - **kwargs - ) -> Union[ChatCompletion, Iterator[ChatCompletionChunk]]: - model, provider = get_model_and_provider( - model, - self.provider if provider is None else provider, - stream, - ignored, - ignore_working, - ignore_stream, - ) - - stop = [stop] if isinstance(stop, str) else stop - - if asyncio.iscoroutinefunction(provider.create_completion): - # Run the asynchronous function in an event loop - response = asyncio.run(provider.create_completion( - model, - messages, - stream=stream, - **filter_none( - proxy=self.client.get_proxy() if proxy is None else proxy, - max_tokens=max_tokens, - stop=stop, - api_key=self.client.api_key if api_key is None else api_key - ), - **kwargs - )) - else: - response = provider.create_completion( - model, - messages, - stream=stream, - **filter_none( - proxy=self.client.get_proxy() if proxy is None else proxy, - max_tokens=max_tokens, - stop=stop, - api_key=self.client.api_key if api_key is None else api_key - ), - **kwargs - ) - - if stream: - if hasattr(response, '__aiter__'): - # It's an async generator, wrap it into a sync iterator - response = to_sync_iter(response) - - # Now 'response' is an iterator - response = iter_response(response, stream, response_format, max_tokens, stop) - response = iter_append_model_and_provider(response) - return response - else: - if hasattr(response, '__aiter__'): - # If response is an async generator, collect it into a list - response = list(to_sync_iter(response)) - response = iter_response(response, stream, response_format, max_tokens, stop) - response = iter_append_model_and_provider(response) - return next(response) - - async def async_create( - self, - messages: Messages, - model: str, - provider: ProviderType = None, - stream: bool = False, - proxy: str = None, - response_format: dict = None, - max_tokens: int = None, - stop: Union[list[str], str] = None, - api_key: str = None, - ignored: list[str] = None, - ignore_working: bool = False, - ignore_stream: bool = False, - **kwargs - ) -> Union[ChatCompletion, AsyncIterator[ChatCompletionChunk]]: - model, provider = get_model_and_provider( - model, - self.provider if provider is None else provider, - stream, - ignored, - ignore_working, - ignore_stream, - ) - - stop = [stop] if isinstance(stop, str) else stop - - if asyncio.iscoroutinefunction(provider.create_completion): - response = await provider.create_completion( - model, - messages, - stream=stream, - **filter_none( - proxy=self.client.get_proxy() if proxy is None else proxy, - max_tokens=max_tokens, - stop=stop, - api_key=self.client.api_key if api_key is None else api_key - ), - **kwargs - ) - else: - response = provider.create_completion( - model, - messages, - stream=stream, - **filter_none( - proxy=self.client.get_proxy() if proxy is None else proxy, - max_tokens=max_tokens, - stop=stop, - api_key=self.client.api_key if api_key is None else api_key - ), - **kwargs - ) - - # Removed 'await' here since 'async_iter_response' returns an async generator - response = async_iter_response(response, stream, response_format, max_tokens, stop) - response = async_iter_append_model_and_provider(response) - - if stream: - return response - else: - async for result in response: - return result - -class Chat: - completions: Completions - - def __init__(self, client: Client, provider: ProviderType = None): - self.completions = Completions(client, provider) - -# Asynchronous versions of the helper functions -async def async_iter_response( - response: Union[AsyncIterator[str], Iterator[str]], - stream: bool, - response_format: dict = None, - max_tokens: int = None, - stop: list = None -) -> AsyncIterator[Union[ChatCompletion, ChatCompletionChunk]]: - content = "" - finish_reason = None - completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28)) - idx = 0 - - if not hasattr(response, '__aiter__'): - response = to_async_iterator(response) - - async for chunk in response: - if isinstance(chunk, FinishReason): - finish_reason = chunk.reason - break - elif isinstance(chunk, BaseConversation): - yield chunk - continue - - content += str(chunk) - - if max_tokens is not None and idx + 1 >= max_tokens: - finish_reason = "length" - - first, content, chunk = find_stop(stop, content, chunk if stream else None) - - if first != -1: - finish_reason = "stop" - - if stream: - yield ChatCompletionChunk(chunk, None, completion_id, int(time.time())) - - if finish_reason is not None: - break - - idx += 1 - - finish_reason = "stop" if finish_reason is None else finish_reason - - if stream: - yield ChatCompletionChunk(None, finish_reason, completion_id, int(time.time())) - else: - if response_format is not None and "type" in response_format: - if response_format["type"] == "json_object": - content = filter_json(content) - yield ChatCompletion(content, finish_reason, completion_id, int(time.time())) - -async def async_iter_append_model_and_provider(response: AsyncIterator) -> AsyncIterator: - last_provider = None - - if not hasattr(response, '__aiter__'): - response = to_async_iterator(response) - - async for chunk in response: - last_provider = get_last_provider(True) if last_provider is None else last_provider - chunk.model = last_provider.get("model") - chunk.provider = last_provider.get("name") - yield chunk - -async def iter_image_response(response: AsyncIterator) -> Union[ImagesResponse, None]: - response_list = [] - async for chunk in response: - if isinstance(chunk, ImageProviderResponse): - response_list.extend(chunk.get_list()) - elif isinstance(chunk, str): - response_list.append(chunk) - - if response_list: - return ImagesResponse([Image(image) for image in response_list]) - - return None - -async def create_image(client: Client, provider: ProviderType, prompt: str, model: str = "", **kwargs) -> AsyncIterator: - if isinstance(provider, type) and provider.__name__ == "You": - kwargs["chat_mode"] = "create" - else: - prompt = f"create an image with: {prompt}" - - if asyncio.iscoroutinefunction(provider.create_completion): - response = await provider.create_completion( - model, - [{"role": "user", "content": prompt}], - stream=True, - proxy=client.get_proxy(), - **kwargs - ) - else: - response = provider.create_completion( - model, - [{"role": "user", "content": prompt}], - stream=True, - proxy=client.get_proxy(), - **kwargs - ) - - # Wrap synchronous iterator into async iterator if necessary - if not hasattr(response, '__aiter__'): - response = to_async_iterator(response) - - return response - -class Image: - def __init__(self, url: str = None, b64_json: str = None): - self.url = url - self.b64_json = b64_json - - def __repr__(self): - return f"Image(url={self.url}, b64_json={'' if self.b64_json else None})" - -class ImagesResponse: - def __init__(self, data: list[Image]): - self.data = data - - def __repr__(self): - return f"ImagesResponse(data={self.data})" - -class Images: - def __init__(self, client: 'Client', provider: 'ImageProvider' = None): - self.client: 'Client' = client - self.provider: 'ImageProvider' = provider - self.models: ImageModels = ImageModels(client) - - def generate(self, prompt: str, model: str = None, response_format: str = "url", **kwargs) -> ImagesResponse: - """ - Synchronous generate method that runs the async_generate method in an event loop. - """ - return asyncio.run(self.async_generate(prompt, model, response_format=response_format, **kwargs)) - - async def async_generate(self, prompt: str, model: str = None, response_format: str = "url", **kwargs) -> ImagesResponse: - provider = self.models.get(model, self.provider) - if provider is None: - raise ValueError(f"Unknown model: {model}") - - if isinstance(provider, IterListProvider): - if provider.providers: - provider = provider.providers[0] - else: - raise ValueError(f"IterListProvider for model {model} has no providers") - - if isinstance(provider, type) and issubclass(provider, AsyncGeneratorProvider): - messages = [{"role": "user", "content": prompt}] - async for response in provider.create_async_generator(model, messages, **kwargs): - if isinstance(response, ImageResponse): - return await self._process_image_response(response, response_format) - elif isinstance(response, str): - image_response = ImageResponse([response], prompt) - return await self._process_image_response(image_response, response_format) - elif hasattr(provider, 'create'): - if asyncio.iscoroutinefunction(provider.create): - response = await provider.create(prompt) - else: - response = provider.create(prompt) - - if isinstance(response, ImageResponse): - return await self._process_image_response(response, response_format) - elif isinstance(response, str): - image_response = ImageResponse([response], prompt) - return await self._process_image_response(image_response, response_format) - else: - raise ValueError(f"Provider {provider} does not support image generation") - - raise NoImageResponseError(f"Unexpected response type: {type(response)}") - - async def _process_image_response(self, response: ImageResponse, response_format: str) -> ImagesResponse: - processed_images = [] - - for image_data in response.get_list(): - if image_data.startswith('http://') or image_data.startswith('https://'): - if response_format == "url": - processed_images.append(Image(url=image_data)) - elif response_format == "b64_json": - # Fetch the image data and convert it to base64 - image_content = await self._fetch_image(image_data) - b64_json = base64.b64encode(image_content).decode('utf-8') - processed_images.append(Image(b64_json=b64_json)) - else: - # Assume image_data is base64 data or binary - if response_format == "url": - if image_data.startswith('data:image'): - # Remove the data URL scheme and get the base64 data - header, base64_data = image_data.split(',', 1) - else: - base64_data = image_data - # Decode the base64 data - image_data_bytes = base64.b64decode(base64_data) - # Convert bytes to an image - image = to_image(image_data_bytes) - file_name = self._save_image(image) - processed_images.append(Image(url=file_name)) - elif response_format == "b64_json": - if isinstance(image_data, bytes): - b64_json = base64.b64encode(image_data).decode('utf-8') - else: - b64_json = image_data # If already base64-encoded string - processed_images.append(Image(b64_json=b64_json)) - - return ImagesResponse(processed_images) - - async def _fetch_image(self, url: str) -> bytes: - # Asynchronously fetch image data from the URL - async with aiohttp.ClientSession() as session: - async with session.get(url) as resp: - if resp.status == 200: - return await resp.read() - else: - raise Exception(f"Failed to fetch image from {url}, status code {resp.status}") - - def _save_image(self, image: 'PILImage') -> str: - os.makedirs('generated_images', exist_ok=True) - file_name = f"generated_images/image_{int(time.time())}_{random.randint(0, 10000)}.png" - image.save(file_name) - return file_name - - async def create_variation(self, image: Union[str, bytes], model: str = None, response_format: str = "url", **kwargs): - # Existing implementation, adjust if you want to support b64_json here as well - pass -- cgit v1.2.3