# Gazetteer-driven dictionary phrase filtering and language disambiguation
# for address text.
import os
import sys

from collections import defaultdict, OrderedDict

from marisa_trie import BytesTrie

this_dir = os.path.realpath(os.path.dirname(__file__))

# NOTE(review): these joins are relative to the current working directory,
# not this_dir — presumably intended to include this_dir; verify callers.
sys.path.append(os.path.realpath(os.path.join(os.pardir, os.pardir)))
sys.path.append(os.path.realpath(os.path.join(os.pardir, os.pardir, os.pardir, 'python')))

from geodata.encoding import safe_decode
from geodata.i18n.unicode_paths import DATA_DIR
from geodata.i18n.normalize import strip_accents
from geodata.i18n.unicode_properties import get_chars_by_script, get_script_languages

from address_normalizer.text.normalize import PhraseFilter
from address_normalizer.text.tokenize import *
# Languages with substantial dictionary coverage in this data set.
WELL_REPRESENTED_LANGUAGES = {'en', 'fr', 'it', 'de', 'nl', 'es'}
# Root directory of the dictionary files; contains one subdirectory per
# language code (see DictionaryPhraseFilter.configure).
DICTIONARIES_DIR = os.path.join(DATA_DIR, 'dictionaries')
# Control characters used to namespace trie keys so that prefix/suffix
# entries cannot collide with ordinary phrases: prefix phrases are stored
# as PREFIX_KEY + phrase, suffix phrases as SUFFIX_KEY + reversed phrase.
PREFIX_KEY = u'\x02'
SUFFIX_KEY = u'\x03'
# Lowercase tokens that could be Roman numerals; these are skipped when
# loading dictionaries (see DictionaryPhraseFilter.configure).
POSSIBLE_ROMAN_NUMERALS = {
    # ones
    'i', 'ii', 'iii', 'iv', 'v', 'vi', 'vii', 'viii', 'ix',
    # teens
    'x', 'xi', 'xii', 'xiii', 'xiv', 'xv', 'xvi', 'xvii', 'xviii', 'xix',
    # tens
    'xx', 'xxx', 'xl', 'l', 'lx', 'lxx', 'lxxx', 'xc',
    # hundreds
    'c', 'cc', 'ccc', 'cd', 'd', 'dc', 'dcc', 'dccc', 'cm',
    # thousands
    'm', 'mm', 'mmm', 'mmmm',
}
class DictionaryPhraseFilter(PhraseFilter):
    """Phrase filter backed by per-language dictionary files.

    configure() builds a marisa BytesTrie mapping phrases to
    'lang|is_canonical|is_stopword' records.  Suffix dictionaries are keyed
    as SUFFIX_KEY + reversed phrase and prefix dictionaries as
    PREFIX_KEY + phrase so they occupy separate key spaces in the trie.
    filter() then upgrades matching tokens to PHRASE in a token stream.
    """

    def __init__(self, *dictionaries):
        # Dictionary filenames, looked up under each language directory.
        self.dictionaries = dictionaries
        # Fix: initialize the flag configure() sets, so it can be checked
        # before configure() has run.
        self.configured = False

    def serialize(self, s):
        # Values are stored/returned as raw byte strings; no transformation.
        return s

    def deserialize(self, s):
        return s

    def configure(self, base_dir=DICTIONARIES_DIR):
        """Load every dictionary under base_dir and build self.trie.

        Fix: the original ignored the base_dir parameter and always read
        from DICTIONARIES_DIR; base_dir is now honored (default unchanged).
        """
        # phrase -> OrderedDict(lang -> (is_canonical, is_stopword))
        kvs = defaultdict(OrderedDict)
        for lang in os.listdir(base_dir):
            for filename in self.dictionaries:
                is_suffix_dictionary = 'suffixes' in filename
                is_prefix_dictionary = 'prefixes' in filename
                is_street_types_dictionary = 'street_types' in filename
                is_stopword_dictionary = 'stopwords' in filename

                path = os.path.join(base_dir, lang, filename)
                if not os.path.exists(path):
                    continue

                # Fix: use a context manager so the file handle is closed
                # (the original leaked an open file per dictionary).
                with open(path) as f:
                    # Each line lists pipe-separated variants; the first
                    # entry is treated as the canonical form.
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue

                        phrases = safe_decode(line).split(u'|')
                        if not phrases:
                            continue
                        canonical = strip_accents(phrases[0])

                        for phrase in phrases:
                            # Skip tokens that could be Roman numerals —
                            # too ambiguous to key on.
                            if phrase in POSSIBLE_ROMAN_NUMERALS:
                                continue

                            is_canonical = strip_accents(phrase) == canonical

                            if is_suffix_dictionary:
                                # Reverse suffixes so they can be matched
                                # with a prefix search on the trie.
                                phrase = SUFFIX_KEY + phrase[::-1]
                            elif is_prefix_dictionary:
                                phrase = PREFIX_KEY + phrase

                            if is_canonical or is_street_types_dictionary or is_prefix_dictionary or is_suffix_dictionary:
                                kvs[phrase][lang] = (is_canonical, is_stopword_dictionary)

        # Flatten to (phrase, 'lang|is_canonical|is_stopword') pairs; a
        # phrase may appear once per language (BytesTrie allows this).
        kvs = [(k, '|'.join([v, str(int(c)), str(int(s))])) for k, vals in kvs.iteritems() for v, (c, s) in vals.iteritems()]

        self.trie = BytesTrie(kvs)
        self.configured = True

    def search_substring(self, s):
        """Return (values, i) where s[:i] is the longest prefix of s that
        is still a key prefix in the trie, or (None, 0) if none matches.

        Note: values may be empty if s[:i] is a key prefix but not itself
        a key; callers also verify the match with trie.get().
        """
        if len(s) == 0:
            return None, 0

        for i in xrange(len(s) + 1):
            if not self.trie.has_keys_with_prefix(s[:i]):
                # Back off to the last prefix that still had keys.
                i -= 1
                break
        if i > 0:
            return (self.trie.get(s[:i]), i)
        else:
            return None, 0

    def basic_filter(self, tokens):
        """Run the superclass phrase filter unchanged."""
        return super(DictionaryPhraseFilter, self).filter(tokens)

    def filter(self, tokens):
        """Yield (token_class, token(s), data) triples, additionally
        upgrading non-phrase tokens whose suffix or prefix matches a
        dictionary entry to PHRASE.
        """
        for c, t, data in self.basic_filter(tokens):
            if c != token_types.PHRASE:
                # t is a token tuple; t[1] is its text — per tokenize()
                # (TODO confirm against address_normalizer.text.tokenize).
                token = t[1]
                token_len = len(token)
                # Suffixes are stored reversed under SUFFIX_KEY.
                suffix_search, suffix_len = self.search_substring(SUFFIX_KEY + token[::-1])

                if suffix_search and self.trie.get(token[token_len - (suffix_len - len(SUFFIX_KEY)):].rstrip('.')):
                    yield (token_types.PHRASE, [(c,) + t], suffix_search)
                    continue
                prefix_search, prefix_len = self.search_substring(PREFIX_KEY + token)
                if prefix_search and self.trie.get(token[:(prefix_len - len(PREFIX_KEY))]):
                    yield (token_types.PHRASE, [(c,) + t], prefix_search)
                    continue
            yield c, t, data
# Module-level gazetteer over the street-related dictionaries; used by
# disambiguate_language below.
_STREET_DICTIONARY_FILES = (
    'street_types.txt',
    'directionals.txt',
    'concatenated_suffixes_separable.txt',
    'concatenated_suffixes_inseparable.txt',
    'concatenated_prefixes_separable.txt',
    'stopwords.txt',
)

street_types_gazetteer = DictionaryPhraseFilter(*_STREET_DICTIONARY_FILES)
# Indexable by codepoint ordinal: char_scripts[ord(c)] -> script name
# (see get_string_script below).
char_scripts = get_chars_by_script()
# script name -> set of language codes associated with that script.
script_languages = {script: set(langs) for script, langs in get_script_languages().iteritems()}
# Script value for codepoints with no known script.
UNKNOWN_SCRIPT = 'Unknown'
# Script value for characters shared across scripts (Unicode "Common").
COMMON_SCRIPT = 'Common'
# Highest ASCII codepoint; used to track whether a run is pure ASCII.
MAX_ASCII = 127
def get_string_script(s):
    """Return (script, length, is_ascii) for the leading single-script
    run of s.

    Walks characters until the script genuinely changes; Common-script
    characters inherit the current run's script rather than ending it.
    is_ascii is True iff every consumed character is <= MAX_ASCII.
    """
    s = safe_decode(s)
    run_script = UNKNOWN_SCRIPT
    ascii_only = True
    num_chars = 0

    for ch in s:
        char_script = char_scripts[ord(ch)]

        # Common characters (shared across scripts) continue the run.
        if char_script == COMMON_SCRIPT and run_script != UNKNOWN_SCRIPT:
            char_script = run_script

        # A real script change ends the run before consuming ch.
        if run_script not in (UNKNOWN_SCRIPT, COMMON_SCRIPT) and char_script != run_script:
            break

        ascii_only = ascii_only and ord(ch) <= MAX_ASCII
        num_chars += 1

        if char_script != UNKNOWN_SCRIPT:
            run_script = char_script

    return (run_script, num_chars, ascii_only)
LATIN_SCRIPT = 'Latin'
# Sentinel return values of disambiguate_language.
UNKNOWN_LANGUAGE = 'unk'
AMBIGUOUS_LANGUAGE = 'xxx'
def disambiguate_language(text, languages):
    """Guess the language of *text* from dictionary-phrase matches.

    languages is an iterable of (lang_code, is_default) pairs; it is used
    both as the candidate set and (via is_default) to break ties.
    Returns a language code, AMBIGUOUS_LANGUAGE ('xxx') when conflicting
    evidence is found, or UNKNOWN_LANGUAGE ('unk') when there is none.
    """
    valid_languages = OrderedDict(languages)
    # Non-Latin script -> candidate languages that use that script; used
    # below to veto a guess inconsistent with the text's scripts.
    script_langs = {}
    read_len = 0
    while read_len < len(text):
        script, script_len, is_ascii = get_string_script(text[read_len:])
        if script != LATIN_SCRIPT:
            script_langs[script] = set([l for l, d in languages if l in script_languages[script]])
        read_len += script_len

    num_defaults = sum((1 for lang, default in valid_languages.iteritems() if default))
    # Lowercase, split hyphens, and strip trailing periods before matching.
    tokens = [(c, t.rstrip('.')) for c, t in tokenize(safe_decode(text).replace(u'-', u' ').lower())]

    current_lang = None
    possible_lang = None

    seen_languages = set()

    for c, t, data in street_types_gazetteer.filter(tokens):
        if c == token_types.PHRASE:
            valid = []
            # Each record is 'lang|is_canonical|is_stopword' (see
            # DictionaryPhraseFilter.configure).
            data = [d.split('|') for d in data]
            potentials = [l for l, c, s in data if l in valid_languages]

            for lang, canonical, stopword in data:
                canonical = int(canonical)
                stopword = int(stopword)
                # Stopwords only count when they uniquely identify a language.
                if lang not in valid_languages or (stopword and len(potentials) > 1):
                    continue
                is_default = valid_languages[lang]

                # A non-default language must have been seen already to count.
                lang_valid = is_default or not seen_languages or lang in seen_languages

                if lang_valid and ((canonical and not stopword) or (is_default and len(potentials) == 1)):
                    valid.append(lang)
                elif is_default and num_defaults > 1 and current_lang is not None and current_lang != lang:
                    # Conflicting evidence between two default languages.
                    return AMBIGUOUS_LANGUAGE
                elif stopword and canonical and not is_default and lang in seen_languages:
                    valid.append(lang)
                elif not seen_languages and len(potentials) == 1 and len(t[0][1]) > 1:
                    # Weak, tentative guess; cleared if a later phrase
                    # suggests a different language.
                    possible_lang = lang if possible_lang is None or possible_lang == lang else None

            # New evidence that contradicts everything seen so far.
            if seen_languages and valid and not any((l in seen_languages for l in valid)) and \
                    (not any((valid_languages.get(l) for l in valid)) or any((valid_languages.get(l) for l in seen_languages))):
                return AMBIGUOUS_LANGUAGE

            if len(valid) == 1:
                current_lang = valid[0]
            else:
                valid_default = [l for l in valid if valid_languages.get(l)]
                if len(valid_default) == 1 and current_lang is not None and valid_default[0] != current_lang:
                    return AMBIGUOUS_LANGUAGE
                elif len(valid_default) == 1:
                    current_lang = valid_default[0]

            # Veto a guess inconsistent with any non-Latin script present.
            if any((current_lang not in langs for script, langs in script_langs.iteritems())):
                return AMBIGUOUS_LANGUAGE

            seen_languages.update(valid)

    if current_lang is not None:
        return current_lang
    elif possible_lang is not None:
        # Accept the tentative guess only if it fits every script seen.
        if not any((possible_lang not in langs for script, langs in script_langs.iteritems())):
            return possible_lang
        else:
            return AMBIGUOUS_LANGUAGE
    return UNKNOWN_LANGUAGE