|
| 1 | +import base64 |
| 2 | +import math |
| 3 | +import re |
| 4 | +from io import BytesIO # pylint: disable=no-name-in-module |
| 5 | +from typing import Optional, Tuple |
| 6 | + |
| 7 | +import aiohttp |
| 8 | +import markdown |
| 9 | +from aiocache import cached |
| 10 | +from bs4 import BeautifulSoup |
| 11 | +from PIL import Image |
| 12 | + |
| 13 | + |
| 14 | +class MarkdownImageEncoder: # pylint: disable=too-few-public-methods |
| 15 | + def __init__(self, markdown_data: str, resource_url: Optional[str] = None) -> None: |
| 16 | + self.markdown_data = markdown_data |
| 17 | + self.resource_url = resource_url + "/" if resource_url else None |
| 18 | + |
| 19 | + @cached(ttl=3600, namespace="image_encoder") |
| 20 | + async def _fetch_resource(self, url: str) -> Tuple[bytes, str]: |
| 21 | + async with aiohttp.ClientSession() as session: |
| 22 | + async with session.get(url) as resp: |
| 23 | + resp.raise_for_status() |
| 24 | + return await resp.read(), resp.headers["Content-Type"] |
| 25 | + |
| 26 | + @cached(ttl=3600, namespace="image_encoder") |
| 27 | + async def _fetch_resource_and_compress(self, url: str) -> Tuple[bytes, str]: |
| 28 | + data, mime_type = await self._fetch_resource(url) |
| 29 | + |
| 30 | + max_size_bytes = 200 * 1024 |
| 31 | + if len(data) <= max_size_bytes: |
| 32 | + return data, mime_type |
| 33 | + |
| 34 | + if mime_type not in ["image/jpeg", "image/png", "image/webp"]: |
| 35 | + raise ValueError("Data is to big and not compressible") |
| 36 | + |
| 37 | + image = Image.open(BytesIO(data)) |
| 38 | + output = BytesIO() |
| 39 | + |
| 40 | + def get_scaling_factor(kilo_bytes: int) -> float: |
| 41 | + return math.sqrt(kilo_bytes / 150) |
| 42 | + |
| 43 | + # Resize the image keeping the aspect ratio and based on how big the image is |
| 44 | + scaling = get_scaling_factor(int(len(data) / 1024)) |
| 45 | + image.thumbnail((int(image.width / scaling), int(image.height / scaling))) |
| 46 | + |
| 47 | + if image.format == "JPEG": |
| 48 | + image.save(output, format=image.format, quality=40, optimize=True) |
| 49 | + else: |
| 50 | + image.save(output, format=image.format, optimize=True) |
| 51 | + |
| 52 | + return output.getvalue(), mime_type |
| 53 | + |
| 54 | + async def _convert_image_to_base64(self, url: str) -> str: |
| 55 | + try: |
| 56 | + data, mime_type = await self._fetch_resource_and_compress(url) |
| 57 | + |
| 58 | + if not mime_type.startswith("image/"): |
| 59 | + return url |
| 60 | + |
| 61 | + image_data = base64.b64encode(data).decode("utf-8") |
| 62 | + return f"data:{mime_type};base64,{image_data}" |
| 63 | + except Exception: # pylint: disable=broad-except |
| 64 | + return url |
| 65 | + |
| 66 | + async def _process_html_images(self, html: str) -> str: |
| 67 | + soup = BeautifulSoup(html, "html.parser") |
| 68 | + for img in soup.find_all("img"): |
| 69 | + img_url = img["src"] |
| 70 | + base64_image = await self._convert_image_to_base64( |
| 71 | + img_url if img_url.startswith(("http://", "https://")) else self.resource_url + img_url |
| 72 | + ) |
| 73 | + if base64_image: |
| 74 | + img["src"] = base64_image |
| 75 | + return str(soup) |
| 76 | + |
| 77 | + async def _process_markdown_images(self, markdown_text: str) -> str: |
| 78 | + markdown_image_pattern = r"!\[.*?\]\((.*?)\)" |
| 79 | + matches = re.findall(markdown_image_pattern, markdown_text) |
| 80 | + for match in matches: |
| 81 | + base64_image = await self._convert_image_to_base64( |
| 82 | + match if match.startswith(("http://", "https://")) else self.resource_url + match |
| 83 | + ) |
| 84 | + if base64_image: |
| 85 | + markdown_text = markdown_text.replace(match, base64_image) |
| 86 | + return str(markdown_text) |
| 87 | + |
| 88 | + async def get_processed_markdown(self) -> str: |
| 89 | + html = markdown.markdown(self.markdown_data) |
| 90 | + |
| 91 | + return str(await self._process_markdown_images(await self._process_html_images(html))) |
0 commit comments