Part of [[Distillery Master]]

The `distillery_master.py` script integrates with the RunPod API to send requests to the [[Distillery Worker]] for image generation. The main function responsible for this integration is `call_runpod`.

```python
import asyncio
import json
import sys

import aiohttp
import better_exceptions
from runpod import AsyncioEndpoint, AsyncioJob

# AWSManager, MAX_RUNPOD_ATTEMPTS, RUNPOD_TIMEOUT and INSTANCE_IDENTIFIER
# are defined elsewhere in distillery_master.py.


async def call_runpod(request_id, payload, command_args):
    try:
        for attempt in range(MAX_RUNPOD_ATTEMPTS):
            try:
                aws_manager = await AWSManager.get_instance()
                # Accept the payload either as a dict or as a JSON string.
                if isinstance(payload, str):
                    payload = json.loads(payload)
                async with aiohttp.ClientSession() as session:
                    endpoint = AsyncioEndpoint(command_args['ENDPOINT_ID'], session)
                    job: AsyncioJob = await endpoint.run(payload)
                    aws_manager.print_log(request_id, INSTANCE_IDENTIFIER, "Call_Runpod: Runpod called successfully.", level='INFO')
                    # Give the worker a moment before waiting on the job output.
                    await asyncio.sleep(2)
                    output = await job.output(timeout=RUNPOD_TIMEOUT)
                    # A string output is an error message from the worker. A bare
                    # string cannot be raised, so it is wrapped in an exception.
                    if isinstance(output, str):
                        raise RuntimeError(output)
                    aws_manager.print_log(request_id, INSTANCE_IDENTIFIER, f"Call_Runpod: Runpod output received. Output: {output}", level='INFO')
                    return output
            except Exception as e:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                line_no = exc_traceback.tb_lineno
                formatted_exception = better_exceptions.format_exception(*sys.exc_info())
                formatted_traceback = ''.join(formatted_exception)
                error_message = f'Unhandled error at line {line_no} (attempt {attempt+1}): {str(e)}. Formatted traceback: {formatted_traceback}'
                # attempt is zero-based, so the last attempt is MAX_RUNPOD_ATTEMPTS - 1.
                if attempt < MAX_RUNPOD_ATTEMPTS - 1:
                    aws_manager.print_log(request_id, INSTANCE_IDENTIFIER, error_message, level='WARNING')
                    await asyncio.sleep(0.25)
                else:
                    aws_manager.print_log(request_id, INSTANCE_IDENTIFIER, formatted_traceback, level='ERROR')
                    return []
    except Exception:
        raise
```

### Functionality

`call_runpod` is an asynchronous function that sends a request to the [[RunPod]] API and returns the generated image list. Here's how it works:

1. The function takes three parameters:
   - `request_id`: The unique identifier for the request, used to tag log entries.
   - `payload`: The request details, either as a dictionary or as a [[JSON]] string.
   - `command_args`: Additional command arguments for the request. Only `ENDPOINT_ID` is read here.
2. Inside the function, a loop attempts to call the [[RunPod]] API up to `MAX_RUNPOD_ATTEMPTS` times.
3. Within each attempt, the function does the following:
   - Retrieves an instance of the [[AWSManager]] class.
   - Checks whether `payload` is a string and, if so, parses it as [[JSON]].
   - Creates an asynchronous HTTP client session using `aiohttp.ClientSession()`.
   - Creates an `AsyncioEndpoint` object with the specified `ENDPOINT_ID` and the client session.
   - Calls the `run` method of the `AsyncioEndpoint` object with the `payload` to send the request to the [[Distillery Worker]].
   - Logs a success message using `aws_manager.print_log`.
   - Waits two seconds using `asyncio.sleep(2)`.
   - Retrieves the output from the [[Distillery Worker]] using `job.output`, with `RUNPOD_TIMEOUT` as the timeout.
   - Treats a string output as an error message and raises it as an exception, which triggers the retry path.
   - Logs the received output using `aws_manager.print_log`.
   - Returns the output.
4. If an exception occurs during the attempt:
   - The exception details are captured using `sys.exc_info()`.
   - The exception is formatted using `better_exceptions.format_exception` to provide a more readable traceback.
   - If attempts remain, the function logs a warning message and waits 0.25 seconds before retrying.
   - If the current attempt is the last one, the function logs the formatted traceback as an error and returns an empty list.
5. If an exception occurs outside the attempt loop, or inside the exception handler itself, it is re-raised to the caller.

In short, `call_runpod` sends requests to the [[Distillery Worker]] through the RunPod API and retrieves the generated image list. Failed attempts are retried up to `MAX_RUNPOD_ATTEMPTS` times, and callers receive an empty list once every attempt has failed.
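
The retry behavior described above can be tried in isolation with a minimal sketch that swaps the RunPod call for a stub. The names `call_with_retries`, `make_flaky_sender`, and `MAX_ATTEMPTS` are hypothetical and are not part of `distillery_master.py`; logging is reduced to `print`. The last-attempt check compares against `max_attempts - 1`, so that the final failure takes the error branch and returns an empty list, as the description intends.

```python
import asyncio

MAX_ATTEMPTS = 3  # hypothetical stand-in for MAX_RUNPOD_ATTEMPTS


async def call_with_retries(send, payload, max_attempts=MAX_ATTEMPTS, delay=0.0):
    """Await send(payload) up to max_attempts times; return [] if all attempts fail."""
    for attempt in range(max_attempts):
        try:
            output = await send(payload)
            # As in call_runpod, a string output is treated as an error message.
            if isinstance(output, str):
                raise RuntimeError(output)
            return output
        except Exception as exc:
            if attempt < max_attempts - 1:
                # Attempts remain: log a warning and retry after a short pause.
                print(f"WARNING attempt {attempt + 1} failed: {exc}")
                await asyncio.sleep(delay)
            else:
                # Last attempt: log an error and give up with an empty list.
                print(f"ERROR attempt {attempt + 1} failed: {exc}")
                return []
    return []  # only reached when max_attempts <= 0


def make_flaky_sender(failures):
    """Hypothetical worker stub: fails `failures` times, then returns an image list."""
    calls = {"count": 0}

    async def send(payload):
        calls["count"] += 1
        if calls["count"] <= failures:
            return "worker error"  # a string output signals failure
        return [f"image-for-{payload['prompt']}.png"]

    return send, calls


sender, call_log = make_flaky_sender(failures=2)
images = asyncio.run(call_with_retries(sender, {"prompt": "cat"}))
```

With two simulated failures and three allowed attempts, the third call succeeds and `images` holds the stub's image list. If the stub fails more often than `max_attempts`, the caller receives `[]` and has to treat that as "no images generated".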