from __future__ import annotations

import json
import uuid
import re
import random
from datetime import datetime, timezone, timedelta
import urllib.parse

from ...typing import AsyncResult, Messages, Cookies, ImagesType
from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin
from ..helper import format_prompt, format_image_prompt
from ...providers.response import JsonConversation, ImageResponse, Reasoning
from ...requests.aiohttp import StreamSession, StreamResponse, FormData
from ...requests.raise_for_status import raise_for_status
from ...image import to_bytes, is_accepted_format
from ...cookies import get_cookies
from ...errors import ResponseError
from ... import debug

class Janus_Pro_7B(AsyncGeneratorProvider, ProviderModelMixin):
    space = "deepseek-ai/Janus-Pro-7B"
    url = f"https://huggingface.co/spaces/{space}"
    api_url = "https://deepseek-ai-janus-pro-7b.hf.space"
    referer = f"{api_url}?__theme=light"

    working = True
    supports_stream = True
    supports_system_message = True
    supports_message_history = True

    default_model = "janus-pro-7b"
    default_image_model = "janus-pro-7b-image"
    default_vision_model = default_model
    models = [default_model, default_image_model]
    image_models = [default_image_model]

    @classmethod
    def run(cls, method: str, session: StreamSession, prompt: str, conversation: JsonConversation, image: dict = None, seed: int = 0):
        headers = {
            "content-type": "application/json",
            "x-zerogpu-token": conversation.zerogpu_token,
            "x-zerogpu-uuid": conversation.zerogpu_uuid,
            "referer": cls.referer,
        }
        if method == "post":
            # Multimodal chat request: join the Gradio queue with the image + prompt payload.
            return session.post(f"{cls.api_url}/gradio_api/queue/join?__theme=light", **{
                "headers": {k: v for k, v in headers.items() if v is not None},
                "json": {"data": [image, prompt, seed, 0.95, 0.1], "event_data": None, "fn_index": 2, "trigger_id": 10, "session_hash": conversation.session_hash},
            })
        elif method == "image":
            # Image generation request: different fn_index/trigger_id and payload shape.
            return session.post(f"{cls.api_url}/gradio_api/queue/join?__theme=light", **{
                "headers": {k: v for k, v in headers.items() if v is not None},
                "json": {"data": [prompt, seed, 5, 1], "event_data": None, "fn_index": 3, "trigger_id": 20, "session_hash": conversation.session_hash},
            })
        # Default ("get"): open the server-sent events stream for this session.
        return session.get(f"{cls.api_url}/gradio_api/queue/data?session_hash={conversation.session_hash}", **{
            "headers": {
                "accept": "text/event-stream",
                "content-type": "application/json",
                "referer": cls.referer,
            }
        })

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        images: ImagesType = None,
        prompt: str = None,
        proxy: str = None,
        cookies: Cookies = None,
        zerogpu_token: str = None,
        zerogpu_uuid: str = "[object Object]",
        return_conversation: bool = False,
        conversation: JsonConversation = None,
        seed: int = None,
        **kwargs
    ) -> AsyncResult:
        def generate_session_hash():
            """Generate a unique session hash."""
            return str(uuid.uuid4()).replace('-', '')[:12]

        method = "post"
        if model == cls.default_image_model or prompt is not None:
            method = "image"

        prompt = format_prompt(messages) if prompt is None and conversation is None else prompt
        prompt = format_image_prompt(messages, prompt)

        if seed is None:
            seed = random.randint(1000, 999999)

        session_hash = generate_session_hash() if conversation is None else getattr(conversation, "session_hash")
        async with StreamSession(proxy=proxy, impersonate="chrome") as session:
            if zerogpu_token is None:
                zerogpu_uuid, zerogpu_token = await get_zerogpu_token(cls.space, session, conversation, cookies)
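            # Flow against the Hugging Face Space, as implemented below:
            #   1. obtain a ZeroGPU token/uuid (sent as x-zerogpu-* headers),
            #   2. optionally upload images via /gradio_api/upload,
            #   3. join the Gradio queue (cls.run with "post" or "image"),
            #   4. read progress and results from the /gradio_api/queue/data SSE stream.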
            if conversation is None or not hasattr(conversation, "session_hash"):
                conversation = JsonConversation(session_hash=session_hash, zerogpu_token=zerogpu_token, zerogpu_uuid=zerogpu_uuid)
            conversation.zerogpu_token = zerogpu_token
            if return_conversation:
                yield conversation

            if images is not None:
                # Upload the raw image bytes to the Gradio file endpoint first,
                # then describe each uploaded file in the FileData format the API expects.
                data = FormData()
                for i in range(len(images)):
                    images[i] = (to_bytes(images[i][0]), images[i][1])
                for image, image_name in images:
                    data.add_field("files", image, filename=image_name)
                async with session.post(f"{cls.api_url}/gradio_api/upload", params={"upload_id": session_hash}, data=data) as response:
                    await raise_for_status(response)
                    image_files = await response.json()
                images = [{
                    "path": image_file,
                    "url": f"{cls.api_url}/gradio_api/file={image_file}",
                    "orig_name": images[i][1],
                    "size": len(images[i][0]),
                    "mime_type": is_accepted_format(images[i][0]),
                    "meta": {
                        "_type": "gradio.FileData"
                    }
                } for i, image_file in enumerate(image_files)]

            # Submit the job to the queue, then read its progress and result from the event stream.
            async with cls.run(method, session, prompt, conversation, None if images is None else images.pop(), seed) as response:
                await raise_for_status(response)

            async with cls.run("get", session, prompt, conversation, None, seed) as response:
                response: StreamResponse = response
                counter = 3
                async for line in response.iter_lines():
                    decoded_line = line.decode(errors="replace")
                    if decoded_line.startswith('data: '):
                        try:
                            json_data = json.loads(decoded_line[6:])
                            if json_data.get('msg') == 'log':
                                yield Reasoning(status=json_data["log"])
                            if json_data.get('msg') == 'progress':
                                if 'progress_data' in json_data:
                                    if json_data['progress_data']:
                                        progress = json_data['progress_data'][0]
                                        yield Reasoning(status=f"{progress['desc']} {progress['index']}/{progress['length']}")
                                    else:
                                        yield Reasoning(status="Generating")
                            elif json_data.get('msg') == 'heartbeat':
                                yield Reasoning(status=f"Generating{'.' * counter}")
                                counter += 1
                            elif json_data.get('msg') == 'process_completed':
                                if 'output' in json_data and 'error' in json_data['output']:
                                    json_data['output']['error'] = json_data['output']['error'].split("