| | |
| |
|
| | import logging |
| | from dataclasses import dataclass, field |
| | from typing import Dict, List, Optional, Set, Tuple |
| |
|
| | import numpy as np |
| |
|
| | from .parser import ( |
| | filter_area, |
| | filter_node, |
| | filter_way, |
| | match_to_group, |
| | parse_area, |
| | parse_node, |
| | parse_way, |
| | Patterns, |
| | ) |
| | from .reader import OSMData, OSMNode, OSMRelation, OSMWay |
| |
|
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
| | def glue(ways: List[OSMWay]) -> List[List[OSMNode]]: |
| | result: List[List[OSMNode]] = [] |
| | to_process: Set[Tuple[OSMNode]] = set() |
| |
|
| | for way in ways: |
| | if way.is_cycle(): |
| | result.append(way.nodes) |
| | else: |
| | to_process.add(tuple(way.nodes)) |
| |
|
| | while to_process: |
| | nodes: List[OSMNode] = list(to_process.pop()) |
| | glued: Optional[List[OSMNode]] = None |
| | other_nodes: Optional[Tuple[OSMNode]] = None |
| |
|
| | for other_nodes in to_process: |
| | glued = try_to_glue(nodes, list(other_nodes)) |
| | if glued is not None: |
| | break |
| |
|
| | if glued is not None: |
| | to_process.remove(other_nodes) |
| | if is_cycle(glued): |
| | result.append(glued) |
| | else: |
| | to_process.add(tuple(glued)) |
| | else: |
| | result.append(nodes) |
| |
|
| | return result |
| |
|
| |
|
| | def is_cycle(nodes: List[OSMNode]) -> bool: |
| | """Is way a cycle way or an area boundary.""" |
| | return nodes[0] == nodes[-1] |
| |
|
| |
|
| | def try_to_glue(nodes: List[OSMNode], other: List[OSMNode]) -> Optional[List[OSMNode]]: |
| | """Create new combined way if ways share endpoints.""" |
| | if nodes[0] == other[0]: |
| | return list(reversed(other[1:])) + nodes |
| | if nodes[0] == other[-1]: |
| | return other[:-1] + nodes |
| | if nodes[-1] == other[-1]: |
| | return nodes + list(reversed(other[:-1])) |
| | if nodes[-1] == other[0]: |
| | return nodes + other[1:] |
| | return None |
| |
|
| |
|
| | def multipolygon_from_relation(rel: OSMRelation, osm: OSMData): |
| | inner_ways = [] |
| | outer_ways = [] |
| | for member in rel.members: |
| | if member.type_ == "way": |
| | if member.role == "inner": |
| | if member.ref in osm.ways: |
| | inner_ways.append(osm.ways[member.ref]) |
| | elif member.role == "outer": |
| | if member.ref in osm.ways: |
| | outer_ways.append(osm.ways[member.ref]) |
| | else: |
| | logger.warning(f'Unknown member role "{member.role}".') |
| | if outer_ways: |
| | inners_path = glue(inner_ways) |
| | outers_path = glue(outer_ways) |
| | return inners_path, outers_path |
| |
|
| |
|
| | @dataclass |
| | class MapElement: |
| | id_: int |
| | label: str |
| | group: str |
| | tags: Optional[Dict[str, str]] |
| |
|
| |
|
| | @dataclass |
| | class MapNode(MapElement): |
| | xy: np.ndarray |
| |
|
| | @classmethod |
| | def from_osm(cls, node: OSMNode, label: str, group: str): |
| | return cls( |
| | node.id_, |
| | label, |
| | group, |
| | node.tags, |
| | xy=node.xy, |
| | ) |
| |
|
| |
|
| | @dataclass |
| | class MapLine(MapElement): |
| | xy: np.ndarray |
| |
|
| | @classmethod |
| | def from_osm(cls, way: OSMWay, label: str, group: str): |
| | xy = np.stack([n.xy for n in way.nodes]) |
| | return cls( |
| | way.id_, |
| | label, |
| | group, |
| | way.tags, |
| | xy=xy, |
| | ) |
| |
|
| |
|
| | @dataclass |
| | class MapArea(MapElement): |
| | outers: List[np.ndarray] |
| | inners: List[np.ndarray] = field(default_factory=list) |
| |
|
| | @classmethod |
| | def from_relation(cls, rel: OSMRelation, label: str, group: str, osm: OSMData): |
| | outers_inners = multipolygon_from_relation(rel, osm) |
| | if outers_inners is None: |
| | return None |
| | outers, inners = outers_inners |
| | outers = [np.stack([n.xy for n in way]) for way in outers] |
| | inners = [np.stack([n.xy for n in way]) for way in inners] |
| | return cls( |
| | rel.id_, |
| | label, |
| | group, |
| | rel.tags, |
| | outers=outers, |
| | inners=inners, |
| | ) |
| |
|
| | @classmethod |
| | def from_way(cls, way: OSMWay, label: str, group: str): |
| | xy = np.stack([n.xy for n in way.nodes]) |
| | return cls( |
| | way.id_, |
| | label, |
| | group, |
| | way.tags, |
| | outers=[xy], |
| | ) |
| |
|
| |
|
| | class MapData: |
| | def __init__(self): |
| | self.nodes: Dict[int, MapNode] = {} |
| | self.lines: Dict[int, MapLine] = {} |
| | self.areas: Dict[int, MapArea] = {} |
| |
|
| | @classmethod |
| | def from_osm(cls, osm: OSMData): |
| | self = cls() |
| |
|
| | for node in filter(filter_node, osm.nodes.values()): |
| | label = parse_node(node.tags) |
| | if label is None: |
| | continue |
| | group = match_to_group(label, Patterns.nodes) |
| | if group is None: |
| | group = match_to_group(label, Patterns.ways) |
| | if group is None: |
| | continue |
| | self.nodes[node.id_] = MapNode.from_osm(node, label, group) |
| |
|
| | for way in filter(filter_way, osm.ways.values()): |
| | label = parse_way(way.tags) |
| | if label is None: |
| | continue |
| | group = match_to_group(label, Patterns.ways) |
| | if group is None: |
| | group = match_to_group(label, Patterns.nodes) |
| | if group is None: |
| | continue |
| | self.lines[way.id_] = MapLine.from_osm(way, label, group) |
| |
|
| | for area in filter(filter_area, osm.ways.values()): |
| | label = parse_area(area.tags) |
| | if label is None: |
| | continue |
| | group = match_to_group(label, Patterns.areas) |
| | if group is None: |
| | group = match_to_group(label, Patterns.ways) |
| | if group is None: |
| | group = match_to_group(label, Patterns.nodes) |
| | if group is None: |
| | continue |
| | self.areas[area.id_] = MapArea.from_way(area, label, group) |
| |
|
| | for rel in osm.relations.values(): |
| | if rel.tags.get("type") != "multipolygon": |
| | continue |
| | label = parse_area(rel.tags) |
| | if label is None: |
| | continue |
| | group = match_to_group(label, Patterns.areas) |
| | if group is None: |
| | group = match_to_group(label, Patterns.ways) |
| | if group is None: |
| | group = match_to_group(label, Patterns.nodes) |
| | if group is None: |
| | continue |
| | area = MapArea.from_relation(rel, label, group, osm) |
| | assert rel.id_ not in self.areas |
| | if area is not None: |
| | self.areas[rel.id_] = area |
| |
|
| | return self |
| |
|