Files
libpostal/scripts/geodata/address_formatting/formatter.py

405 lines
14 KiB
Python

# -*- coding: utf-8 -*-
import os
import pystache
import re
import six
import subprocess
import yaml
from geodata.address_formatting.aliases import Aliases
from geodata.text.tokenize import tokenize, tokenize_raw, token_types
from geodata.encoding import safe_decode
from collections import OrderedDict
from itertools import ifilter
# Git repository that holds the address templates; AddressFormatter.clone_repo()
# clones it into the formatter's scratch directory.
FORMATTER_GIT_REPO = 'https://github.com/OpenCageData/address-formatting'
class AddressFormatter(object):
    '''
    Approximate Python port of lokku's Geo::Address::Formatter

    Usage:
        address_formatter = AddressFormatter()
        components = {
            'house': u'Anticafé',
            'house_number': '2',
            'road': u'Calle de la Unión',
            'postcode': '28013',
            'city': u'Madrid',
        }
        address_formatter.format_address('es', components)

    Component keys are the canonical field names defined below (or one of
    the keys listed in ``aliases``). With the default ``minimal_only=True``,
    format_address() returns None unless the components satisfy one of the
    combinations in MINIMAL_COMPONENT_KEYS.
    '''

    # Rendered templates are split into components on runs of newlines
    # (plus any whitespace that follows them).
    whitespace_component_regex = re.compile(r'[\r\n]+[\s\r\n]*')

    # Default separator placed between formatted components
    splitter = ' | '

    # Tags attached to separator tokens when producing tagged output
    separator_tag = 'SEP'
    field_separator_tag = 'FSEP'

    # Canonical component names
    CATEGORY = 'category'
    NEAR = 'near'
    HOUSE = 'house'
    HOUSE_NUMBER = 'house_number'
    PO_BOX = 'po_box'
    CARE_OF = 'care_of'
    BLOCK = 'block'
    BUILDING = 'building'
    LEVEL = 'level'
    UNIT = 'unit'
    INTERSECTION = 'intersection'
    ROAD = 'road'
    SUBURB = 'suburb'
    CITY_DISTRICT = 'city_district'
    CITY = 'city'
    ISLAND = 'island'
    STATE = 'state'
    STATE_DISTRICT = 'state_district'
    POSTCODE = 'postcode'
    COUNTRY = 'country'

    address_formatter_fields = {
        CATEGORY,
        NEAR,
        HOUSE,
        HOUSE_NUMBER,
        PO_BOX,
        CARE_OF,
        BLOCK,
        BUILDING,
        LEVEL,
        UNIT,
        INTERSECTION,
        ROAD,
        SUBURB,
        CITY,
        CITY_DISTRICT,
        ISLAND,
        STATE,
        STATE_DISTRICT,
        POSTCODE,
        COUNTRY,
    }

    # Alternative key -> canonical component name. An OrderedDict is used so
    # the declaration order is preserved for the Aliases helper.
    aliases = Aliases(
        OrderedDict([
            ('street', ROAD),
            ('street_name', ROAD),
            ('hamlet', CITY),
            ('village', CITY),
            ('neighborhood', SUBURB),
            ('neighbourhood', SUBURB),
            ('city_district', CITY_DISTRICT),
            ('county', STATE_DISTRICT),
            ('state_code', STATE),
            ('country_name', COUNTRY),
            ('postal_code', POSTCODE),
            ('post_code', POSTCODE),
        ])
    )

    # Placeholders used by is_reverse() to decide whether a template lists
    # the admin parts before the street-level parts.
    template_address_parts = [HOUSE, HOUSE_NUMBER, ROAD]
    template_admin_parts = [CITY, STATE, COUNTRY]

    # Raw strings: the patterns match a literal "{key}" and contain regex
    # escapes (\{ and \}) that are not valid Python string escapes.
    template_address_parts_re = re.compile('|'.join([r'\{{{key}\}}'.format(key=key) for key in template_address_parts]))
    template_admin_parts_re = re.compile('|'.join([r'\{{{key}\}}'.format(key=key) for key in template_admin_parts]))

    # An address needs all keys of at least one of these combinations to be
    # considered formattable (see minimal_components()).
    MINIMAL_COMPONENT_KEYS = [
        (ROAD, HOUSE_NUMBER),
        (ROAD, HOUSE),
        (ROAD, POSTCODE)
    ]
def __init__(self, scratch_dir='/tmp', splitter=None):
    """
    Clone the address-formatting repository and load its templates.

    :param scratch_dir: directory under which the repository is checked
        out (as ``<scratch_dir>/address-formatting``)
    :param splitter: optional override for the class-level component
        separator; when None the class default is kept
    """
    repo_dir_name = 'address-formatting'

    # Only shadow the class attribute when the caller supplied a value
    if splitter is not None:
        self.splitter = splitter

    self.formatter_repo_path = os.path.join(scratch_dir, repo_dir_name)

    self.clone_repo()
    self.load_config()
def clone_repo(self):
    """
    Replace whatever is at ``self.formatter_repo_path`` with a fresh clone
    of FORMATTER_GIT_REPO.

    Raises subprocess.CalledProcessError if either command exits non-zero.
    """
    checkout_path = self.formatter_repo_path
    commands = (
        ['rm', '-rf', checkout_path],
        ['git', 'clone', FORMATTER_GIT_REPO, checkout_path],
    )
    # check_call raises on the first failure, so the clone is never
    # attempted if the removal failed
    for command in commands:
        subprocess.check_call(command)
def load_config(self):
    """
    Load ``conf/countries/worldwide.yaml`` from the cloned repository,
    post-process every template and store the result on ``self.config``.

    For mapping-valued entries, the ``address_template`` (if any) is run
    through add_postprocessing_tags() and ``$`` in each
    ``postformat_replace`` replacement is rewritten to a backslash (so
    ``$1`` becomes the ``\\1`` group reference understood by ``re.sub``).
    Any other entry is treated as a bare template and passed through
    add_postprocessing_tags() directly.
    """
    config_path = os.path.join(self.formatter_repo_path,
                               'conf/countries/worldwide.yaml')

    # The file comes from an external repository: use the safe loader
    # (plain YAML only) and make sure the handle is closed.
    with open(config_path) as f:
        config = yaml.safe_load(f)

    # Snapshot the items since entries are reassigned while looping
    for key, value in list(config.items()):
        if hasattr(value, 'items'):
            address_template = value.get('address_template')
            if address_template:
                value['address_template'] = self.add_postprocessing_tags(address_template)

            post_format_replacements = value.get('postformat_replace')
            if post_format_replacements:
                value['postformat_replace'] = [
                    [pattern, replacement.replace('$', '\\')]
                    for pattern, replacement in post_format_replacements
                ]
        else:
            config[key] = self.add_postprocessing_tags(value)

    self.config = config
def country_template(self, c):
    """
    Return the config entry stored under key ``c``, falling back to the
    ``'default'`` entry when ``c`` is not configured.

    The fallback is looked up unconditionally, so a KeyError is raised if
    the config has no ``'default'`` entry.
    """
    fallback = self.config['default']
    return self.config.get(c, fallback)
# Rows of (key, pre_keys, post_keys) consumed by add_postprocessing_tags():
# when ``key`` is absent from a template, its placeholder is added next to
# the first template line that mentions one of ``post_keys`` and none of
# ``pre_keys``.
postprocessing_tags = [
    (
        SUBURB,
        (ROAD,),
        (CITY_DISTRICT, CITY, ISLAND, STATE_DISTRICT, STATE, POSTCODE, COUNTRY),
    ),
    (
        CITY_DISTRICT,
        (ROAD, SUBURB),
        (CITY, ISLAND, STATE_DISTRICT, STATE),
    ),
    (
        STATE_DISTRICT,
        (SUBURB, CITY_DISTRICT, CITY, ISLAND),
        (STATE,),
    ),
    (
        STATE,
        (SUBURB, CITY_DISTRICT, CITY, ISLAND, STATE_DISTRICT),
        (COUNTRY,),
    ),
]

# (template tag, canonical component) pairs.
# NOTE(review): not referenced by any code visible in this part of the file.
template_tag_replacements = [
    ('county', STATE_DISTRICT),
]
def is_reverse(self, key, template):
    """
    Tell whether a template lists its admin parts before its address parts.

    :param key: identifier of the template, used only in error messages
    :param template: template text to inspect
    :return: True when the last city/state/country placeholder starts
        before the first house/house_number/road placeholder
    :raises ValueError: if the template has no address-part placeholder,
        or (checked second) no admin-part placeholder
    """
    first_address_part = self.template_address_parts_re.search(template)

    # Walk every admin match, keeping only the final one
    last_admin_part = None
    for last_admin_part in self.template_admin_parts_re.finditer(template):
        pass

    if not first_address_part:
        raise ValueError('Template for {} does not contain any address parts'.format(key))
    if last_admin_part is None:
        raise ValueError('Template for {} does not contain any admin parts'.format(key))

    return last_admin_part.start() < first_address_part.start()
def build_first_of_template(self, keys):
    """
    Build a ``{{#first}} ... {{/first}}`` section listing the given keys
    as ``||``-separated triple-brace placeholders, e.g. for
    ``['city', 'town']``::

        {{#first}} {{{city}}} || {{{town}}} {{/first}}
    """
    placeholder = '{{{{{{{key}}}}}}}'
    section = '{{{{#first}}}} {keys} {{{{/first}}}}'

    alternatives = ' || '.join([placeholder.format(key=key) for key in keys])
    return section.format(keys=alternatives)
def insert_component(self, template, tag, before=(), after=(), separate=True, is_reverse=False):
    """
    Return ``template`` with the ``{{{tag}}}`` placeholder positioned
    before the first of the ``before`` keys and/or after the first of the
    ``after`` keys.

    If the tag is already present and already sits on the requested side
    of the anchor, the template is returned unchanged. Otherwise the
    template is re-assembled element by element: an existing occurrence
    of the tag (and the literal text right after it) is dropped, and the
    tag is emitted once next to the first matching anchor.

    :param template: mustache template text
    :param tag: component name to insert
    :param before: keys the tag should precede
    :param after: keys the tag should follow
    :param separate: unused by this implementation
    :param is_reverse: unused by this implementation
    :return: the (possibly rewritten) template text
    """
    if not before and not after:
        # No anchor to position against: return the template untouched,
        # like the other no-op exits below (previously returned None).
        return template

    def key_pattern(keys):
        # Matches a literal "{key}" for any of the given keys
        return re.compile('|'.join([r'\{{{key}\}}'.format(key=key) for key in keys]))

    tag_match = key_pattern([tag]).search(template)

    if before:
        before_match = key_pattern(before).search(template)
        # Tag already precedes the anchor
        if before_match and tag_match and before_match.start() > tag_match.start():
            return template

    if after:
        after_match = key_pattern(after).search(template)
        # Tag already follows the anchor
        if after_match and tag_match and tag_match.start() > after_match.start():
            return template

    before = set(before)
    after = set(after)

    key_added = False
    skip_next_non_token = False
    new_components = []

    tag_token = '{{{{{{{key}}}}}}}'.format(key=tag)

    # NOTE: relies on pystache's private _parse_tree structure
    parsed = pystache.parse(safe_decode(template))
    num_tokens = len(parsed._parse_tree)

    for i, el in enumerate(parsed._parse_tree):
        if hasattr(el, 'parsed'):
            # Element wrapping a nested parse tree (presumably a section
            # such as {{#first}}...{{/first}}); collect the keys inside it
            keys = [e.key for e in el.parsed._parse_tree if hasattr(e, 'key')]

            if set(keys) & before and not key_added:
                # Reuse the preceding literal text as the separator when
                # there is one, otherwise separate with a newline
                token = new_components[-1] if new_components and '{' not in new_components[-1] else '\n'
                new_components.extend([tag_token, token])
                key_added = True

            # Drop alternatives that resolve to the tag being inserted
            keys = [k for k in keys if self.aliases.get(k, k) != tag]
            if keys:
                new_components.append(self.build_first_of_template(keys))
            else:
                # The whole section is gone: also remove the literal text
                # that led up to it
                while new_components and '{' not in new_components[-1]:
                    new_components.pop()
                continue

            if set(keys) & after and not key_added:
                token = '\n'
                # Prefer the literal text that follows as the separator
                if i < num_tokens - 1 and isinstance(parsed._parse_tree[i + 1], six.string_types):
                    token = parsed._parse_tree[i + 1]
                new_components.extend([token, tag_token])
                key_added = True

        elif hasattr(el, 'key'):
            if el.key == tag:
                # Remove the existing occurrence along with the literal
                # text right after it
                skip_next_non_token = True
                continue

            if el.key in before and not key_added:
                token = '\n'
                if new_components and '{' not in new_components[-1]:
                    token = new_components[-1]
                new_components.extend([tag_token, token])
                key_added = True

            new_components.append('{{{{{{{key}}}}}}}'.format(key=el.key))

            if el.key in after and not key_added:
                token = '\n'
                if i < num_tokens - 1 and isinstance(parsed._parse_tree[i + 1], six.string_types):
                    token = parsed._parse_tree[i + 1]
                new_components.extend([token, tag_token])
                key_added = True

        elif not skip_next_non_token:
            # Literal text between placeholders
            new_components.append(el)

        skip_next_non_token = False

    return ''.join(new_components)
def add_postprocessing_tags(self, template, country=None):
    """
    Add placeholders for the components listed in
    ``self.postprocessing_tags`` that the template does not mention.

    For each ``(key, pre_keys, post_keys)`` row whose key does not occur
    in the template text, the first line that mentions one of
    ``post_keys`` and none of ``pre_keys`` is located; ``{{{key}}}`` is
    added as a new line before it, or after it when the template is in
    reverse order (see is_reverse()).

    :param template: template text, one group of components per line
    :param country: optional identifier of the template; only forwarded
        to is_reverse(), which uses it in its error messages
    :return: the template text with the extra placeholders added
    :raises ValueError: propagated from is_reverse() when the template
        lacks address or admin parts
    """
    # is_reverse() takes (key, template); it was previously called with
    # the template alone, which raised a TypeError.
    is_reverse = self.is_reverse(country, template)

    for key, pre_keys, post_keys in self.postprocessing_tags:
        # NOTE: plain substring tests, so e.g. "state" is also considered
        # present when only "state_district" appears.
        if key in template:
            continue

        # Compile once per row instead of once per template line
        pre_key_re = re.compile('|'.join(pre_keys))
        post_key_re = re.compile('|'.join(post_keys))
        tag_token = u'{{{{{{{key}}}}}}}'.format(key=key)

        key_included = False
        new_components = []

        for line in template.split('\n'):
            insert_here = bool(not key_included and
                               post_key_re.search(line) and
                               not pre_key_re.search(line))

            if insert_here and not is_reverse:
                new_components.append(tag_token)
                key_included = True

            new_components.append(line.rstrip('\n'))

            if insert_here and is_reverse:
                new_components.append(tag_token)
                key_included = True

        template = u'\n'.join(new_components)

    return template
def render_template(self, template, components, tagged=False):
    """
    Render a mustache template with the given components and join the
    non-empty resulting lines with the splitter.

    :param template: mustache template text
    :param components: mapping of component name to value
    :param tagged: when True, components are cleaned up as tagged tokens
        and joined with ``' <splitter>/<field_separator_tag> '``
    :return: the formatted, single-line address string
    """
    def render_first(text):
        # Lambda for {{#first}} sections: render the section body and
        # keep the first non-empty "||"-separated alternative
        rendered = pystache.render(text, **components)
        for alternative in rendered.split('||'):
            alternative = alternative.strip()
            if alternative:
                return alternative
        return ''

    output = pystache.render(template, first=render_first,
                             **components).strip()

    raw_values = self.whitespace_component_regex.split(output)

    if tagged:
        splitter = ' {}/{} '.format(self.splitter.strip(), self.field_separator_tag)
    else:
        splitter = self.splitter

    cleaned = [self.strip_component(val, tagged=tagged) for val in raw_values]

    # Skip components that ended up empty after stripping
    non_empty = [val for val in cleaned if val.strip()]
    return splitter.join(non_empty)
def minimal_components(self, components):
    """
    Return True if ``components`` contains every key of at least one of
    the combinations in ``self.MINIMAL_COMPONENT_KEYS``, else False.
    """
    return any(
        all(name in components for name in required)
        for required in self.MINIMAL_COMPONENT_KEYS
    )
def apply_replacements(self, template, components):
    """
    Apply the template's ``replace`` rules to every component value.

    Each rule is a ``(regex, replacement)`` pair handed to ``re.sub``; the
    rules are applied in order. ``components`` is modified in place and
    nothing is returned. Templates without ``replace`` rules are a no-op.
    """
    if not template.get('replace'):
        return

    rules = template['replace']
    for name in list(components.keys()):
        updated = components[name]
        for regex, replacement in rules:
            updated = re.sub(regex, replacement, updated)
        components[name] = updated
def post_replacements(self, template, text):
    """
    Clean up a formatted address string.

    The text is split on ``self.splitter``; each piece is stripped and
    repeated pieces are dropped (first occurrence wins) before re-joining.
    The template's ``postformat_replace`` ``(regex, replacement)`` rules,
    if any, are then applied in order with ``re.sub``.

    :return: the cleaned-up text
    """
    already_seen = set()
    unique_pieces = []

    for piece in text.split(self.splitter):
        piece = piece.strip()
        if piece in already_seen:
            continue
        already_seen.add(piece)
        unique_pieces.append(piece)

    text = self.splitter.join(unique_pieces)

    for regex, replacement in template.get('postformat_replace') or ():
        text = re.sub(regex, replacement, text)

    return text
def tag_template_separators(self, template):
    """
    Tag the literal separators of a template with ``self.separator_tag``.

    A comma or hyphen directly after a closing brace, and a hyphen
    surrounded by spaces, are rewritten as standalone ``,/TAG`` or
    ``-/TAG`` tokens. The rewrites run in that order.
    """
    tag = self.separator_tag
    rewrites = (
        (r'},', '}} ,/{} '.format(tag)),
        (r'}-', '}} -/{} '.format(tag)),
        (r' - ', ' -/{} '.format(tag)),
    )
    for pattern, replacement in rewrites:
        template = re.sub(pattern, replacement, template)
    return template
def strip_component(self, value, tagged=False):
    """
    Remove leading and trailing separators from a rendered component.

    :param value: a single rendered component
    :param tagged: when False, ``value`` is plain text and leading or
        trailing comma/hyphen tokens (as classified by ``tokenize_raw``)
        are cut off; when True, ``value`` is a whitespace-separated list
        of ``token/TAG`` items and leading or trailing items tagged with
        ``self.separator_tag`` are dropped
    :return: the stripped component; empty when it consisted only of
        separators
    """
    if not tagged:
        separator_types = (token_types.COMMA.value, token_types.HYPHEN.value)

        # Token offsets refer to the stripped string, so slice that same
        # string (slicing the unstripped value shifted the result
        # whenever there was leading whitespace).
        value = value.strip()

        start = end = 0
        tokens = tokenize_raw(value)

        for token_start, token_length, token_type in tokens:
            start = token_start
            if token_type not in separator_types:
                break
            # Separator: the component starts after it at the earliest
            start = token_start + token_length

        for token_start, token_length, token_type in reversed(tokens):
            end = token_start + token_length
            if token_type not in separator_types:
                break
            # Separator: the component ends before it at the latest
            end = token_start

        return value[start:end]

    separator_tag = self.separator_tag
    tokens = value.split()

    def is_separator(token):
        # rpartition never raises: a token without "/" (e.g. literal
        # template text) simply is not a separator
        _, slash, token_tag = token.rpartition('/')
        return bool(slash) and token_tag == separator_tag

    num_tokens = len(tokens)

    start = 0
    while start < num_tokens and is_separator(tokens[start]):
        start += 1

    end = num_tokens
    while end > start and is_separator(tokens[end - 1]):
        end -= 1

    return u' '.join(tokens[start:end])
def format_address(self, country, components,
                   minimal_only=True, tag_components=True, replace_aliases=True,
                   template_replacements=False):
    """
    Format address components with the template configured for a country.

    :param country: country code; upper-cased before the config lookup
    :param components: dict of component name to value. NOTE: modified in
        place when ``replace_aliases`` or ``template_replacements`` is set
    :param minimal_only: return None unless minimal_components() accepts
        the components
    :param tag_components: tag template separators and emit every token
        of every value as ``token/component_name``
    :param replace_aliases: normalize aliased keys first
    :param template_replacements: apply the template's ``replace`` rules
        to the component values before rendering
    :return: the formatted address string, or None when the country has
        no template or the components are not sufficient
    """
    template = self.config.get(country.upper())
    if not template:
        return None

    template_text = template['address_template']

    if replace_aliases:
        # NOTE(review): replace_aliases() is not defined in the part of
        # the file visible here - confirm it exists on the class.
        self.replace_aliases(components)

    if minimal_only and not self.minimal_components(components):
        return None

    if template_replacements:
        self.apply_replacements(template, components)

    if tag_components:
        template_text = self.tag_template_separators(template_text)
        # .items() rather than the Python 2-only .iteritems(). Spaces are
        # removed from tokens and replaced in names so that each
        # "token/name" item stays a single whitespace-delimited unit.
        components = {
            k: u' '.join([u'{}/{}'.format(t.replace(' ', ''), k.replace(' ', '_'))
                          for t, c in tokenize(v)])
            for k, v in components.items()
        }

    text = self.render_template(template_text, components, tagged=tag_components)
    return self.post_replacements(template, text)