from __future__ import annotations import json import uuid import re import time from datetime import datetime, timezone, timedelta import urllib.parse from ...typing import AsyncResult, Messages, Cookies, ImagesType from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin from ..helper import format_prompt, format_image_prompt from ...providers.response import JsonConversation, ImageResponse, Reasoning from ...requests.aiohttp import StreamSession, StreamResponse, FormData from ...requests.raise_for_status import raise_for_status from ...image import to_bytes, is_accepted_format from ...cookies import get_cookies from ...errors import ResponseError from ... import debug class Janus_Pro_7B(AsyncGeneratorProvider, ProviderModelMixin): space = "deepseek-ai/Janus-Pro-7B" url = f"https://huggingface.co/spaces/{space}" api_url = "https://deepseek-ai-janus-pro-7b.hf.space" referer = f"{api_url}?__theme=light" working = True supports_stream = True supports_system_message = True supports_message_history = True default_model = "janus-pro-7b" default_image_model = "janus-pro-7b-image" default_vision_model = default_model models = [default_model, default_image_model] image_models = [default_image_model] @classmethod def run(cls, method: str, session: StreamSession, prompt: str, conversation: JsonConversation, image: dict = None, seed: int = 0): headers = { "content-type": "application/json", "x-zerogpu-token": conversation.zerogpu_token, "x-zerogpu-uuid": conversation.zerogpu_uuid, "referer": cls.referer, } if method == "post": return session.post(f"{cls.api_url}/gradio_api/queue/join?__theme=light", **{ "headers": {k: v for k, v in headers.items() if v is not None}, "json": {"data":[image,prompt,seed,0.95,0.1],"event_data":None,"fn_index":2,"trigger_id":10,"session_hash":conversation.session_hash}, }) elif method == "image": return session.post(f"{cls.api_url}/gradio_api/queue/join?__theme=light", **{ "headers": {k: v for k, v in headers.items() if v is not None}, "json": {"data":[prompt,seed,5,1],"event_data":None,"fn_index":3,"trigger_id":20,"session_hash":conversation.session_hash}, }) return session.get(f"{cls.api_url}/gradio_api/queue/data?session_hash={conversation.session_hash}", **{ "headers": { "accept": "text/event-stream", "content-type": "application/json", "referer": cls.referer, } }) @classmethod async def create_async_generator( cls, model: str, messages: Messages, images: ImagesType = None, prompt: str = None, proxy: str = None, cookies: Cookies = None, return_conversation: bool = False, conversation: JsonConversation = None, seed: int = None, **kwargs ) -> AsyncResult: def generate_session_hash(): """Generate a unique session hash.""" return str(uuid.uuid4()).replace('-', '')[:12] method = "post" if model == cls.default_image_model or prompt is not None: method = "image" prompt = format_prompt(messages) if prompt is None and conversation is None else prompt prompt = format_image_prompt(messages, prompt) if seed is None: seed = int(time.time()) session_hash = generate_session_hash() if conversation is None else getattr(conversation, "session_hash") async with StreamSession(proxy=proxy, impersonate="chrome") as session: session_hash = generate_session_hash() if conversation is None else getattr(conversation, "session_hash") zerogpu_uuid, zerogpu_token = await get_zerogpu_token(cls.space, session, conversation, cookies) if conversation is None or not hasattr(conversation, "session_hash"): conversation = JsonConversation(session_hash=session_hash, zerogpu_token=zerogpu_token, zerogpu_uuid=zerogpu_uuid) conversation.zerogpu_token = zerogpu_token if return_conversation: yield conversation if images is not None: data = FormData() for i in range(len(images)): images[i] = (to_bytes(images[i][0]), images[i][1]) for image, image_name in images: data.add_field(f"files", image, filename=image_name) async with session.post(f"{cls.api_url}/gradio_api/upload", params={"upload_id": session_hash}, data=data) as response: await raise_for_status(response) image_files = await response.json() images = [{ "path": image_file, "url": f"{cls.api_url}/gradio_api/file={image_file}", "orig_name": images[i][1], "size": len(images[i][0]), "mime_type": is_accepted_format(images[i][0]), "meta": { "_type": "gradio.FileData" } } for i, image_file in enumerate(image_files)] async with cls.run(method, session, prompt, conversation, None if images is None else images.pop(), seed) as response: await raise_for_status(response) async with cls.run("get", session, prompt, conversation, None, seed) as response: response: StreamResponse = response async for line in response.iter_lines(): decoded_line = line.decode(errors="replace") if decoded_line.startswith('data: '): try: json_data = json.loads(decoded_line[6:]) if json_data.get('msg') == 'log': yield Reasoning(status=json_data["log"]) if json_data.get('msg') == 'progress': if 'progress_data' in json_data and json_data['progress_data']: progress = json_data['progress_data'][0] yield Reasoning(status=f"{progress['desc']} {progress['index']}/{progress['length']}") if json_data.get('msg') == 'process_completed': if 'output' in json_data and 'error' in json_data['output']: raise ResponseError("Missing image input" if json_data['output']['error'] and "AttributeError" in json_data['output']['error'] else json_data['output']['error']) if 'output' in json_data and 'data' in json_data['output']: yield Reasoning(status="Finished") if "image" in json_data['output']['data'][0][0]: yield ImageResponse([image["image"]["url"] for image in json_data['output']['data'][0]], prompt) else: yield json_data['output']['data'][0] break except json.JSONDecodeError: debug.log("Could not parse JSON:", decoded_line) async def get_zerogpu_token(space: str, session: StreamSession, conversation: JsonConversation, cookies: Cookies = None): zerogpu_uuid = None if conversation is None else getattr(conversation, "zerogpu_uuid", None) zerogpu_token = "[object Object]" cookies = get_cookies("huggingface.co", raise_requirements_error=False) if cookies is None else cookies if zerogpu_uuid is None: async with session.get(f"https://huggingface.co/spaces/{space}", cookies=cookies) as response: match = re.search(r""token":"([^&]+?)"", await response.text()) if match: zerogpu_token = match.group(1) match = re.search(r""sessionUuid":"([^&]+?)"", await response.text()) if match: zerogpu_uuid = match.group(1) if cookies: # Get current UTC time + 10 minutes dt = (datetime.now(timezone.utc) + timedelta(minutes=10)).isoformat(timespec='milliseconds') encoded_dt = urllib.parse.quote(dt) async with session.get(f"https://huggingface.co/api/spaces/{space}/jwt?expiration={encoded_dt}&include_pro_status=true", cookies=cookies) as response: response_data = (await response.json()) if "token" in response_data: zerogpu_token = response_data["token"] return zerogpu_uuid, zerogpu_token