349 lines
12 KiB
Python
349 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
import os
|
|
import pystache
|
|
import re
|
|
import subprocess
|
|
import yaml
|
|
|
|
from postal.text.tokenize import tokenize, tokenize_raw, token_types
|
|
from collections import OrderedDict, defaultdict
|
|
from itertools import ifilter
|
|
|
|
FORMATTER_GIT_REPO = 'https://github.com/OpenCageData/address-formatting'
|
|
|
|
|
|
class AddressFormatter(object):
|
|
'''
|
|
Approximate Python port of lokku's Geo::Address::Formatter
|
|
|
|
Usage:
|
|
address_formatter = AddressFormatter()
|
|
components = {
|
|
'house': u'Anticafé',
|
|
'addr:housenumber': '2',
|
|
'addr:street': u'Calle de la Unión',
|
|
'addr:postcode': '28013',
|
|
'addr:city': u'Madrid',
|
|
}
|
|
address_formatter.format_address('es', components)
|
|
'''
|
|
|
|
whitespace_component_regex = re.compile('[\r\n]+[\s\r\n]*')
|
|
|
|
splitter = ' | '
|
|
|
|
separator_tag = 'SEP'
|
|
field_separator_tag = 'FSEP'
|
|
|
|
HOUSE = 'house'
|
|
HOUSE_NUMBER = 'house_number'
|
|
ROAD = 'road'
|
|
SUBURB = 'suburb'
|
|
CITY_DISTRICT = 'city_district'
|
|
CITY = 'city'
|
|
STATE = 'state'
|
|
STATE_DISTRICT = 'state_district'
|
|
POSTCODE = 'postcode'
|
|
COUNTRY = 'country'
|
|
|
|
address_formatter_fields = set([
|
|
HOUSE,
|
|
HOUSE_NUMBER,
|
|
ROAD,
|
|
SUBURB,
|
|
CITY,
|
|
CITY_DISTRICT,
|
|
STATE,
|
|
STATE_DISTRICT,
|
|
POSTCODE,
|
|
COUNTRY,
|
|
])
|
|
|
|
aliases = OrderedDict([
|
|
('name', HOUSE),
|
|
('addr:housename', HOUSE),
|
|
('addr:housenumber', HOUSE_NUMBER),
|
|
('addr:house_number', HOUSE_NUMBER),
|
|
('addr:street', ROAD),
|
|
('addr:city', CITY),
|
|
('is_in:city', CITY),
|
|
('addr:locality', CITY),
|
|
('is_in:locality', CITY),
|
|
('addr:municipality', CITY),
|
|
('is_in:municipality', CITY),
|
|
('addr:hamlet', CITY),
|
|
('is_in:hamlet', CITY),
|
|
('addr:suburb', SUBURB),
|
|
('is_in:suburb', SUBURB),
|
|
('addr:neighbourhood', SUBURB),
|
|
('is_in:neighbourhood', SUBURB),
|
|
('addr:neighborhood', SUBURB),
|
|
('is_in:neighborhood', SUBURB),
|
|
('addr:district', STATE_DISTRICT),
|
|
('is_in:district', STATE_DISTRICT),
|
|
('addr:state', STATE),
|
|
('is_in:state', STATE),
|
|
('addr:province', STATE),
|
|
('is_in:province', STATE),
|
|
('addr:region', STATE),
|
|
('is_in:region', STATE),
|
|
('addr:postal_code', POSTCODE),
|
|
('addr:postcode', POSTCODE),
|
|
('addr:country', COUNTRY),
|
|
('addr:country_code', COUNTRY),
|
|
('country_code', COUNTRY),
|
|
('is_in:country_code', COUNTRY),
|
|
('is_in:country', COUNTRY),
|
|
('street', ROAD),
|
|
('street_name', ROAD),
|
|
('residential', ROAD),
|
|
('hamlet', CITY),
|
|
('neighborhood', SUBURB),
|
|
('neighbourhood', SUBURB),
|
|
('city_district', CITY_DISTRICT),
|
|
('county', STATE_DISTRICT),
|
|
('state_code', STATE),
|
|
('country_name', COUNTRY),
|
|
('postal_code', POSTCODE),
|
|
('post_code', POSTCODE),
|
|
])
|
|
|
|
prioritized_aliases = {k: i for i, k in enumerate(aliases)}
|
|
|
|
MINIMAL_COMPONENT_KEYS = [
|
|
(ROAD, HOUSE_NUMBER),
|
|
(ROAD, HOUSE),
|
|
(ROAD, POSTCODE)
|
|
]
|
|
|
|
def __init__(self, scratch_dir='/tmp', splitter=None):
|
|
if splitter is not None:
|
|
self.splitter = splitter
|
|
|
|
self.formatter_repo_path = os.path.join(scratch_dir, 'address-formatting')
|
|
self.clone_repo()
|
|
self.load_config()
|
|
|
|
def clone_repo(self):
|
|
subprocess.check_call(['rm', '-rf', self.formatter_repo_path])
|
|
subprocess.check_call(['git', 'clone', FORMATTER_GIT_REPO, self.formatter_repo_path])
|
|
|
|
def load_config(self):
|
|
config = yaml.load(open(os.path.join(self.formatter_repo_path,
|
|
'conf/countries/worldwide.yaml')))
|
|
for key, value in config.items():
|
|
if hasattr(value, 'items'):
|
|
address_template = value.get('address_template')
|
|
if address_template:
|
|
value['address_template'] = self.add_postprocessing_tags(address_template)
|
|
|
|
post_format_replacements = value.get('postformat_replace')
|
|
if post_format_replacements:
|
|
value['postformat_replace'] = [[pattern, replacement.replace('$', '\\')] for pattern, replacement in post_format_replacements]
|
|
else:
|
|
address_template = value
|
|
config[key] = self.add_postprocessing_tags(value)
|
|
self.config = config
|
|
|
|
def component_aliases(self):
|
|
self.aliases = OrderedDict()
|
|
self.aliases.update(self.osm_aliases)
|
|
components = yaml.load_all(open(os.path.join(self.formatter_repo_path,
|
|
'conf', 'components.yaml')))
|
|
for c in components:
|
|
name = c['name']
|
|
for a in c.get('aliases', []):
|
|
self.aliases[a] = name
|
|
|
|
def key_priority(self, key):
|
|
return self.prioritized_aliases.get(key, len(self.prioritized_aliases))
|
|
|
|
def replace_aliases(self, components):
|
|
replacements = defaultdict(list)
|
|
values = {}
|
|
for k in components.keys():
|
|
new_key = self.aliases.get(k)
|
|
if new_key and new_key not in components:
|
|
value = components.pop(k)
|
|
values[k] = value
|
|
replacements[new_key].append(k)
|
|
for key, source_keys in replacements.iteritems():
|
|
source_keys.sort(key=self.key_priority)
|
|
value = values[source_keys[0]]
|
|
components[key] = value
|
|
|
|
def country_template(self, c):
|
|
return self.config.get(c, self.config['default'])
|
|
|
|
postprocessing_tags = [
|
|
(SUBURB, (ROAD,), (CITY_DISTRICT, CITY, STATE_DISTRICT, STATE, POSTCODE, COUNTRY)),
|
|
(CITY_DISTRICT, (ROAD, SUBURB), (CITY, STATE_DISTRICT, STATE)),
|
|
(STATE_DISTRICT, (SUBURB, CITY_DISTRICT, CITY), (STATE,))
|
|
]
|
|
|
|
template_tag_replacements = [
|
|
('county', STATE_DISTRICT),
|
|
]
|
|
|
|
def add_postprocessing_tags(self, template):
|
|
is_reverse = False
|
|
if self.COUNTRY in template and self.ROAD in template:
|
|
is_reverse = template.index(self.COUNTRY) < template.index(self.ROAD)
|
|
elif self.STATE in template and self.ROAD in template:
|
|
is_reverse = template.index(self.STATE) < template.index(self.ROAD)
|
|
else:
|
|
raise ValueError('Template did not contain road and {state, country}')
|
|
|
|
for key, pre_keys, post_keys in self.postprocessing_tags:
|
|
key_included = key in template
|
|
new_components = []
|
|
if key_included:
|
|
continue
|
|
|
|
for line in template.split('\n'):
|
|
pre_key = re.compile('|'.join(pre_keys)).search(line)
|
|
post_key = re.compile('|'.join(post_keys)).search(line)
|
|
if post_key and not pre_key and not key_included:
|
|
if not is_reverse:
|
|
new_components.append(u'{{{{{{{key}}}}}}}'.format(key=key))
|
|
key_included = True
|
|
new_components.append(line.rstrip('\n'))
|
|
if post_key and not pre_key and not key_included and is_reverse:
|
|
new_components.append(u'{{{{{{{key}}}}}}}'.format(key=key))
|
|
key_included = True
|
|
template = u'\n'.join(new_components)
|
|
return template
|
|
|
|
def render_template(self, template, components, tagged=False):
|
|
def render_first(text):
|
|
text = pystache.render(text, **components)
|
|
splits = (e.strip() for e in text.split('||'))
|
|
selected = next(ifilter(bool, splits), '')
|
|
return selected
|
|
|
|
output = pystache.render(template, first=render_first,
|
|
**components).strip()
|
|
|
|
values = self.whitespace_component_regex.split(output)
|
|
|
|
splitter = self.splitter if not tagged else ' {}/{} '.format(self.splitter.strip(), self.field_separator_tag)
|
|
|
|
values = [self.strip_component(val, tagged=tagged) for val in values]
|
|
|
|
output = splitter.join([
|
|
val for val in values if val.strip()
|
|
])
|
|
|
|
return output
|
|
|
|
def minimal_components(self, components):
|
|
for component_list in self.MINIMAL_COMPONENT_KEYS:
|
|
if all((c in components for c in component_list)):
|
|
return True
|
|
return False
|
|
|
|
def apply_replacements(self, template, components):
|
|
if not template.get('replace'):
|
|
return
|
|
for key in components.keys():
|
|
value = components[key]
|
|
for regex, replacement in template['replace']:
|
|
value = re.sub(regex, replacement, value)
|
|
components[key] = value
|
|
|
|
def post_replacements(self, template, text):
|
|
components = []
|
|
seen = set()
|
|
for component in text.split(self.splitter):
|
|
component = component.strip()
|
|
if component not in seen:
|
|
components.append(component)
|
|
seen.add(component)
|
|
text = self.splitter.join(components)
|
|
post_format_replacements = template.get('postformat_replace')
|
|
if post_format_replacements:
|
|
for regex, replacement in post_format_replacements:
|
|
text = re.sub(regex, replacement, text)
|
|
return text
|
|
|
|
def tag_template_separators(self, template):
|
|
template = re.sub(r'},', '}} ,/{} '.format(self.separator_tag), template)
|
|
template = re.sub(r'}-', '}} -/{} '.format(self.separator_tag), template)
|
|
template = re.sub(r' - ', ' -/{} '.format(self.separator_tag), template)
|
|
return template
|
|
|
|
def strip_component(self, value, tagged=False):
|
|
if not tagged:
|
|
comma = token_types.COMMA.value
|
|
hyphen = token_types.HYPHEN.value
|
|
|
|
start = end = 0
|
|
tokens = tokenize_raw(value.strip())
|
|
for token_start, token_length, token_type in tokens:
|
|
start = token_start
|
|
if token_type not in (comma, hyphen):
|
|
break
|
|
else:
|
|
start = token_start + token_length
|
|
|
|
for token_start, token_length, token_type in reversed(tokens):
|
|
end = token_start + token_length
|
|
if token_type not in (comma, hyphen):
|
|
break
|
|
else:
|
|
end = token_start
|
|
|
|
return value[start:end]
|
|
else:
|
|
start = end = 0
|
|
tokens = value.split()
|
|
|
|
separator_tag = self.separator_tag
|
|
|
|
for i, t in enumerate(tokens):
|
|
t, c = t.rsplit('/', 1)
|
|
start = i
|
|
if c != separator_tag:
|
|
break
|
|
else:
|
|
start = i + 1
|
|
|
|
num_tokens = len(tokens)
|
|
|
|
for j, t in enumerate(reversed(tokens)):
|
|
t, c = t.rsplit('/', 1)
|
|
end = num_tokens - j
|
|
if c != separator_tag:
|
|
break
|
|
else:
|
|
end = num_tokens - j - 1
|
|
|
|
return u' '.join(tokens[start:end])
|
|
|
|
def format_address(self, country, components,
|
|
minimal_only=True, tag_components=True, replace_aliases=True,
|
|
template_replacements=False):
|
|
template = self.config.get(country.upper())
|
|
if not template:
|
|
return None
|
|
template_text = template['address_template']
|
|
if replace_aliases:
|
|
self.replace_aliases(components)
|
|
|
|
if minimal_only and not self.minimal_components(components):
|
|
return None
|
|
|
|
if template_replacements:
|
|
self.apply_replacements(template, components)
|
|
|
|
if tag_components:
|
|
template_text = self.tag_template_separators(template_text)
|
|
components = {k: u' '.join([u'{}/{}'.format(t.replace(' ', ''), k.replace(' ', '_'))
|
|
for t, c in tokenize(v)])
|
|
for k, v in components.iteritems()}
|
|
|
|
text = self.render_template(template_text, components, tagged=tag_components)
|
|
|
|
text = self.post_replacements(template, text)
|
|
return text
|