Skip to content

Commit

Permalink
Restore map_plugin.py to version from commit: 35d3f80
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremiah-k committed Dec 24, 2024
1 parent e030217 commit b012f71
Showing 1 changed file with 70 additions and 102 deletions.
172 changes: 70 additions & 102 deletions plugins/map_plugin.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
import io
import staticmaps
import s2sphere
import math
import os
import random
import io
import re

import s2sphere
import staticmaps
from PIL import Image
from nio import AsyncClient, UploadResponse
from PIL import Image, ImageFont

from log_utils import get_logger
from plugins.base_plugin import BasePlugin

logger = get_logger(__name__)


class TextLabel(staticmaps.Object):
def __init__(self, latlng: s2sphere.LatLng, text: str, fontSize: int = 12) -> None:
Expand Down Expand Up @@ -41,41 +35,7 @@ def render_pillow(self, renderer: staticmaps.PillowRenderer) -> None:
x, y = renderer.transformer().ll2pixel(self.latlng())
x = x + renderer.offset_x()

# Attempt to load a font that supports emojis
font_paths = [
"/usr/share/fonts/truetype/noto/NotoColorEmoji.ttf", # Common on Linux
"/usr/share/fonts/truetype/noto/NotoEmoji-Regular.ttf",
"/usr/share/fonts/truetype/noto/NotoEmoji-Bold.ttf",
"/System/Library/Fonts/Apple Color Emoji.ttf", # macOS
"C:\\Windows\\Fonts\\seguiemj.ttf", # Windows Segoe UI Emoji
]

font = None
for path in font_paths:
if os.path.isfile(path):
try:
font = ImageFont.truetype(path, self._font_size)
break
except OSError:
logger.warning(f"Failed to load font from {path}")
pass

if not font:
# If emoji font not found, use default font
font = ImageFont.load_default()
self._text = self._text.encode("ascii", "ignore").decode(
"ascii"
) # Remove non-ASCII characters

# Get the size of the text using textsize or textbbox
try:
bbox = renderer.draw().textbbox((0, 0), self._text, font=font)
tw = bbox[2] - bbox[0]
th = bbox[3] - bbox[1]
except Exception:
# Fallback in case of an error
tw, th = renderer.draw().textsize(self._text, font=font)

tw, th = renderer.draw().textsize(self._text)
w = max(self._arrow, tw + 2 * self._margin)
h = th + 2 * self._margin

Expand All @@ -95,17 +55,57 @@ def render_pillow(self, renderer: staticmaps.PillowRenderer) -> None:
(x - tw / 2, y - self._arrow - h / 2 - th / 2),
self._text,
fill=(0, 0, 0, 255),
font=font,
)

def render_cairo(self, renderer: staticmaps.CairoRenderer) -> None:
# Since Cairo is not being used, we can leave this method empty
pass
x, y = renderer.transformer().ll2pixel(self.latlng())

ctx = renderer.context()
ctx.select_font_face("Sans", cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_NORMAL)

ctx.set_font_size(self._font_size)
x_bearing, y_bearing, tw, th, _, _ = ctx.text_extents(self._text)

w = max(self._arrow, tw + 2 * self._margin)
h = th + 2 * self._margin

path = [
(x, y),
(x + self._arrow / 2, y - self._arrow),
(x + w / 2, y - self._arrow),
(x + w / 2, y - self._arrow - h),
(x - w / 2, y - self._arrow - h),
(x - w / 2, y - self._arrow),
(x - self._arrow / 2, y - self._arrow),
]

ctx.set_source_rgb(1, 1, 1)
ctx.new_path()
for p in path:
ctx.line_to(*p)
ctx.close_path()
ctx.fill()

ctx.set_source_rgb(1, 0, 0)
ctx.set_line_width(1)
ctx.new_path()
for p in path:
ctx.line_to(*p)
ctx.close_path()
ctx.stroke()

ctx.set_source_rgb(0, 0, 0)
ctx.set_line_width(1)
ctx.move_to(
x - tw / 2 - x_bearing, y - self._arrow - h / 2 - y_bearing - th / 2
)
ctx.show_text(self._text)
ctx.stroke()

def render_svg(self, renderer: staticmaps.SvgRenderer) -> None:
x, y = renderer.transformer().ll2pixel(self.latlng())

# Guess text extents
# guess text extents
tw = len(self._text) * self._font_size * 0.5
th = self._font_size * 1.2

Expand Down Expand Up @@ -143,11 +143,9 @@ def render_svg(self, renderer: staticmaps.SvgRenderer) -> None:

def anonymize_location(lat, lon, radius=1000):
# Generate random offsets for latitude and longitude
# Convert latitude to radians for math.cos()
lat_rad = math.radians(lat)
lat_offset = random.uniform(-radius / 111320, radius / 111320)
lon_offset = random.uniform(
-radius / (111320 * math.cos(lat_rad)), radius / (111320 * math.cos(lat_rad))
-radius / (111320 * math.cos(lat)), radius / (111320 * math.cos(lat))
)

# Apply the offsets to the location coordinates
Expand Down Expand Up @@ -179,67 +177,41 @@ def get_map(locations, zoom=None, image_size=None, anonymize=True, radius=10000)
)
context.add_object(TextLabel(radio, location["label"], fontSize=50))

# Render non-anti-aliased PNG
# render non-anti-aliased png
if image_size:
return context.render_pillow(image_size[0], image_size[1])
else:
return context.render_pillow(1000, 1000)


async def upload_image(client: AsyncClient, image: Image.Image):
async def upload_image(client: AsyncClient, image: Image.Image) -> UploadResponse:
buffer = io.BytesIO()
image.save(buffer, format="PNG")
image_data = buffer.getvalue()
buffer.seek(0) # Reset buffer to the beginning

response, maybe_keys = await client.upload(
buffer,
io.BytesIO(image_data),
content_type="image/png",
filename="location.png",
filesize=len(image_data),
)

# Get image dimensions
width, height = image.size

return response, len(image_data), width, height
return response


async def send_room_image(
client: AsyncClient,
room_id: str,
upload_response: UploadResponse,
image_size: int,
width: int,
height: int,
client: AsyncClient, room_id: str, upload_response: UploadResponse
):
await client.room_send(
response = await client.room_send(
room_id=room_id,
message_type="m.room.message",
content={
"msgtype": "m.image",
"body": "location.png",
"url": upload_response.content_uri,
"info": {
"mimetype": "image/png",
"size": image_size,
"w": width,
"h": height,
},
},
content={"msgtype": "m.image", "url": upload_response.content_uri, "body": ""},
)


async def send_image(client: AsyncClient, room_id: str, image: Image.Image):
response, image_size, width, height = await upload_image(client=client, image=image)
await send_room_image(
client,
room_id,
upload_response=response,
image_size=image_size,
width=width,
height=height,
)
response = await upload_image(client=client, image=image)
await send_room_image(client, room_id, upload_response=response)


class Plugin(BasePlugin):
Expand All @@ -248,7 +220,7 @@ class Plugin(BasePlugin):
@property
def description(self):
return (
"Map of mesh radio nodes. Supports `zoom` and `size` options to customize"
f"Map of mesh radio nodes. Supports `zoom` and `size` options to customize"
)

async def handle_meshtastic_message(
Expand All @@ -263,8 +235,8 @@ def get_mesh_commands(self):
return []

async def handle_room_message(self, room, event, full_message):
# Pass the event to matches()
if not self.matches(event):
full_message = full_message.strip()
if not self.matches(full_message):
return False

from matrix_utils import connect_matrix
Expand All @@ -285,25 +257,25 @@ async def handle_room_message(self, room, event, full_message):

try:
zoom = int(zoom)
except ValueError:
zoom = self.config.get("zoom", 13)
except:
zoom = self.config["zoom"] if "zoom" in self.config else 8

if zoom < 0 or zoom > 30:
zoom = 8

try:
image_size = (int(image_size[0]), int(image_size[1]))
except (ValueError, TypeError):
except:
image_size = (
self.config.get("image_width", 1000),
self.config.get("image_height", 1000),
self.config["image_width"] if "image_width" in self.config else 1000,
self.config["image_height"] if "image_height" in self.config else 1000,
)

if image_size[0] > 1000 or image_size[1] > 1000:
image_size = (1000, 1000)

locations = []
for _node, info in meshtastic_client.nodes.items():
for node, info in meshtastic_client.nodes.items():
if "position" in info and "latitude" in info["position"]:
locations.append(
{
Expand All @@ -313,12 +285,8 @@ async def handle_room_message(self, room, event, full_message):
}
)

if not locations:
await self.send_matrix_message(room.room_id, "No node locations available.")
return True

anonymize = self.config.get("anonymize", True)
radius = self.config.get("radius", 1000)
anonymize = self.config["anonymize"] if "anonymize" in self.config else True
radius = self.config["radius"] if "radius" in self.config else 1000

pillow_image = get_map(
locations=locations,
Expand Down

0 comments on commit b012f71

Please sign in to comment.