Parsing moved to the download step. Matching is being tweaked.

This commit is contained in:
Konstantin Mamalakis 2018-03-15 20:36:34 +02:00 committed by Josh Harvey
parent 793db0f449
commit b72f5549a5
10 changed files with 456 additions and 276 deletions
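The split this commit introduces: lib/ofac/update.js downloads the OFAC XML sources (skipping unchanged ones via ETags) and parses them once into line-delimited JSON under <ofacDataDir>/sources, so the new lib/ofac/loading.js only has to read pre-parsed ndjson at startup. A rough sketch of the intended wiring, illustrative rather than part of the commit, assuming lib/ofac exports load() and lib/ofac/update exports update() as the diffs below suggest:

const ofac = require('./lib/ofac')           // assumed to export load() and match()
const updater = require('./lib/ofac/update') // new module added in this commit

updater.update()           // HEAD-check ETags, download changed XML, write ndjson
  .then(() => ofac.load()) // read the pre-parsed ndjson into in-memory structs
  .catch(err => console.error(err))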

lib/ofac/index.js

@@ -1,7 +1,7 @@
 const fs = require('fs')
 const path = require('path')
 const util = require('util')
-const parser = require('./parsing')
+const loader = require('./loading')
 const matcher = require('./matching')
 const nameUtils = require('./name-utils')
 const options = require('../options')
@@ -9,7 +9,7 @@ const _ = require('lodash/fp')
 const debug_log = require('../pp')(__filename) // KOSTIS TODO: remove
-const OFAC_DATA_DIR = options.ofacDataDir
+const OFAC_SOURCES_DIR = path.join(options.ofacDataDir, 'sources')
 let structs = null
@@ -18,15 +18,15 @@ const readdir = util.promisify(fs.readdir)
 function load () {
   // NOTE: Not sure how you push code updates to existing clients. This problem
   // might pop up if new code is pushed, without re-doing setup.
-  if (!OFAC_DATA_DIR) {
+  if (!OFAC_SOURCES_DIR) {
     const message = 'The ofacDataDir option has not been set in lamassu.json'
     return Promise.reject(new Error(message))
   }
-  return readdir(OFAC_DATA_DIR)
+  return readdir(OFAC_SOURCES_DIR)
     .then(_.flow(
-      _.map(file => path.join(OFAC_DATA_DIR, file)),
-      parser.parse
+      _.map(file => path.join(OFAC_SOURCES_DIR, file)),
+      loader.load
     ))
     .then(result => {
       return (structs = result)
@@ -42,7 +42,8 @@ function makeCompatible (nameParts) {
   return _.map(_.zipObject(['partName', 'value']), props)
 }
-function match (nameParts, birthDateString, threshold) {
+function match (nameParts, birthDateString, options) {
+  const {debug} = options
   if (!structs) {
     const message = 'The OFAC data sources have not been loaded yet.'
     return Promise.reject(new Error(message))
@@ -68,10 +69,10 @@ function match (nameParts, birthDateString, threshold) {
   ])(birthDateString)
   const candidate = {parts, fullName, words, birthDate}
-  // debug_log(candidate)
+  debug && debug_log(candidate)
-  const result = matcher.match(structs, candidate, threshold)
-  // debug_log(result)
+  const result = matcher.match(structs, candidate, options)
+  debug && debug_log(result)
   return result
 }
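match() now takes an options object instead of a bare threshold. A hedged example of a call under the new signature; the option names come from the destructuring in matching.js below, while the values and the nameParts shape (pairs, per makeCompatible's zipObject) are illustrative:

const ofac = require('./lib/ofac') // assumes index.js exports match()
const nameParts = [['firstName', 'John'], ['lastName', 'Doe']] // pair shape assumed from makeCompatible
ofac.match(nameParts, '1985-05-12', {
  threshold: 0.85, // minimum score to keep an alias (value illustrative)
  ratio: 0.1,      // phonetic weight; 0.1 is the default set in matching.js
  debug: false,    // gates the debug_log calls
  verboseFor: []   // word values to trace in the matcher's inner loop
}).then(matches => console.log(matches))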

lib/ofac/loading.js (new file, +103)

@@ -0,0 +1,103 @@
const fs = require('fs')
const ndjson = require('ndjson')
const _ = require('lodash/fp')

const mapAliases = _.curry((iteratee, individuals) => {
  const mapIndividual = individual => {
    const {id, aliases} = individual
    return _.map(alias => iteratee(id, alias), aliases)
  }
  return _.flatMap(mapIndividual, individuals)
})

const getPhoneticEntries = (individualId, alias) => {
  const pairPhoneticsWithValues = word => {
    const {value, phonetics} = word
    const makeEntry = phonetic => ({value, phonetic, aliasId: alias.id})
    return _.map(makeEntry, phonetics)
  }
  return _.flatMap(pairPhoneticsWithValues, alias.words)
}

const producePhoneticMap = _.flow(
  mapAliases(getPhoneticEntries),
  _.flatten,
  _.groupBy(_.get('phonetic')),
  _.mapValues(_.flow(
    _.map(_.get('aliasId')),
    _.uniq
  )),
  _.toPairs,
  entries => new Map(entries)
)

const getWords = (individualId, alias) => {
  const pairWordsWithIds = word => ({value: word.value, aliasId: alias.id})
  return _.map(pairWordsWithIds, alias.words)
}

const produceWordList = _.flow(
  mapAliases(getWords),
  _.flatten,
  _.groupBy(_.get('value')),
  _.mapValues(_.map(_.get('aliasId'))),
  _.toPairs,
  _.map(_.zipObject(['value', 'aliasIds']))
)

const parseSource = source => {
  const individuals = []
  const readStream = fs.createReadStream(source)
  const jsonStream = readStream.pipe(ndjson.parse())

  jsonStream.on('data', individual => {
    individuals.push(individual)
  })

  return new Promise((resolve, reject) => {
    jsonStream.on('error', reject)
    jsonStream.on('end', () => {
      resolve(individuals)
    })
  })
}

const load = sources => Promise.all(_.map(parseSource, sources))
  .then(_.flow(
    _.flatten,
    _.compact,
    _.uniqBy(_.get('id')),
    individuals => {
      const individualsMap = _.flow(
        _.groupBy(_.get('id')),
        _.mapValues(_.first),
        _.toPairs,
        entries => new Map(entries)
      )(individuals)

      const makeEntries = (individualId, alias) => [alias.id, alias]
      const aliasesMap = new Map(mapAliases(makeEntries, individuals))

      const getIdPairs = (individualId, alias) => [alias.id, individualId]
      const idPairs = mapAliases(getIdPairs, individuals)
      const aliasToIndividual = new Map(idPairs)

      const phoneticMap = producePhoneticMap(individuals)
      const wordList = produceWordList(individuals)

      return {
        individuals,
        individualsMap,
        aliasesMap,
        aliasToIndividual,
        phoneticMap,
        wordList
      }
    }
  ))

module.exports = {load}
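load() expects each source file to be ndjson, one JSON-encoded individual per line, as produced by parseToJson in update.js below. A sketch of its use; the file path and record fields are inferred from what the loaders read (id, aliases[].id, aliases[].words[].value and .phonetics) and otherwise hypothetical:

const loader = require('./lib/ofac/loading')

// A source line might look like (phonetic codes made up):
// {"id":"9346","aliases":[{"id":"9346-1","words":[{"value":"john","phonetics":["JN","AN"]}]}]}
loader.load(['/var/lib/ofac/sources/sdn_advanced.json']) // path illustrative
  .then(structs => {
    // structs.individualsMap:    Map of individual id -> individual record
    // structs.aliasesMap:        Map of alias id -> alias record
    // structs.aliasToIndividual: Map of alias id -> individual id
    // structs.phoneticMap:       Map of phonetic code -> unique alias ids
    // structs.wordList:          [{value, aliasIds}, ...] for string matching
    console.log(structs.individuals.length)
  })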

lib/ofac/matching.js

@@ -28,7 +28,8 @@ const isBornTooLongSince = _.curry((days, dateObject, individual) => {
 // algorithm
-function match (structs, candidate, threshold) {
+function match (structs, candidate, options) {
+  const {threshold, ratio = 0.1, debug, verboseFor} = options
   const {fullName, words, birthDate} = candidate
   // Accept aliases whose full name matches.
@@ -44,42 +45,57 @@
   )(aliases)
-  const aliasIds = []
-  const phoneticWeight = 0.17
+  const aliasIdCounts = new Map()
+  const phoneticWeight = ratio
   const stringWeight = 1 - phoneticWeight
   for (const word of words) {
     const getPhonetic = phonetic => structs.phoneticMap.get(phonetic)
     const phoneticMatches = new Set(_.flatMap(getPhonetic, word.phonetics))
+    const aliasIds = new Set()
     for (const wordEntry of structs.wordList) {
       const stringScore = stringSimilarity(word.value, wordEntry.value)
-      if (stringWeight * stringScore + phoneticWeight < threshold) continue
+      const verbose = _.includes(wordEntry.value, verboseFor)
+      if (!verbose && stringWeight * stringScore + phoneticWeight < threshold) continue
       for (const aliasId of wordEntry.aliasIds) {
-        const phoneticScore = phoneticMatches.has(aliasId) ? 1 : 0
-        const finalScore = stringWeight * stringScore + phoneticWeight * phoneticScore
+        const phoneticScore = phoneticMatches.has(aliasId) ? 1 : -1
+        // const finalScore = stringWeight * stringScore + phoneticWeight * phoneticScore
+        const finalScore = stringScore + phoneticWeight * phoneticScore
+        verbose && console.log(finalScore.toFixed(2), stringScore.toFixed(2), phoneticScore.toFixed(2), word.value, wordEntry.value)
         if (finalScore >= threshold) {
-          aliasIds.push(aliasId)
+          aliasIds.add(aliasId)
        }
      }
    }
+    verboseFor && console.log(aliasIds)
+    for (const aliasId of aliasIds.values()) {
+      const count = aliasIdCounts.get(aliasId) || 0
+      aliasIdCounts.set(aliasId, count + 1)
+    }
  }
-  const aliasIdsFromNamePart = _.flow(
-    _.countBy(_.identity),
-    _.toPairs,
-    _.reject(_.flow(
-      _.last,
-      _.gt(2)
-    )),
-    _.map(_.first)
-  )(aliasIds)
-  // debug_log(aliasIdsFromFullName)
-  // debug_log(aliasIdsFromNamePart)
+  verboseFor && console.log(aliasIdCounts)
+  const aliasIdsFromNamePart = []
+  for (const [aliasId, count] of aliasIdCounts) {
+    const {length} = structs.aliasesMap.get(aliasId).words
+    if (count >= _.min([2, words.length, length])) {
+      aliasIdsFromNamePart.push(aliasId)
+    }
+  }
+  debug && debug_log(aliasIdsFromFullName)
+  debug && debug_log(aliasIdsFromNamePart)
   // Get the full record for each matched id
   const getIndividual = aliasId => {
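The tweak replaces the convex combination with stringScore + ratio * phoneticScore, where phoneticScore is now 1 or -1, so a phonetic mismatch actively penalizes instead of contributing nothing; an alias then survives only if at least min(2, candidate word count, alias word count) of the candidate's words matched it. Worked numbers, all values illustrative:

const ratio = 0.1        // phonetic weight, the new default
const threshold = 0.85   // illustrative
const stringScore = 0.86 // pretend stringSimilarity('jon', 'john') returned this

console.log(stringScore + ratio * 1 >= threshold)  // true:  0.96, phonetic match kept
console.log(stringScore + ratio * -1 >= threshold) // false: 0.76, phonetic mismatch dropped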

lib/ofac/parsing.js

@@ -139,111 +139,29 @@ function processProfile (profileNode) {
   return individual
 }
-function promiseParseDocument (source) {
-  return new Promise((resolve, reject) => {
-    const stream = fs.createReadStream(source)
-    const xml = new XmlStream(stream)
-    xml.on('error', err => {
-      xml.pause()
-      const message = `Error while parsing OFAC data source file (${source}): ${err.message}`
-      reject(new Error(message))
-    })
-    xml.collect('Alias')
-    xml.collect('DocumentedName')
-    xml.collect('DocumentedNamePart')
-    xml.collect('Feature')
-    xml.collect('MasterNamePartGroup')
-    const individuals = []
-    const collectResult = result => result && individuals.push(result)
-    xml.on('updateElement: Profile', _.flow(processProfile, collectResult))
-    xml.on('end', _.wrap(resolve, individuals))
-  })
-}
+const parse = (source, callback) => {
+  const stream = fs.createReadStream(source)
+  const xml = new XmlStream(stream)
+  xml.on('error', err => {
+    xml.pause()
+    const message = `Error while parsing OFAC data source file (${source}): ${err.message}`
+    callback(new Error(message))
+  })
+  xml.collect('Alias')
+  xml.collect('DocumentedName')
+  xml.collect('DocumentedNamePart')
+  xml.collect('Feature')
+  xml.collect('MasterNamePartGroup')
+  const forwardProfile = profile => profile && callback(null, profile)
+  xml.on('updateElement: Profile', _.flow(processProfile, forwardProfile))
+  xml.on('end', () => {
+    callback(null, null)
+  })
+}
-const mapAliases = _.curry((iteratee, individuals) => {
-  const mapIndividual = individual => {
-    const {id, aliases} = individual
-    return _.map(alias => iteratee(id, alias), aliases)
-  }
-  return _.flatMap(mapIndividual, individuals)
-})
-const getPhoneticEntries = (individualId, alias) => {
-  const pairPhoneticsWithValues = word => {
-    const {value, phonetics} = word
-    const makeEntry = phonetic => ({value, phonetic, aliasId: alias.id})
-    return _.map(makeEntry, phonetics)
-  }
-  return _.flatMap(pairPhoneticsWithValues, alias.words)
-}
-const producePhoneticMap = _.flow(
-  mapAliases(getPhoneticEntries),
-  _.flatten,
-  _.groupBy(_.get('phonetic')),
-  _.mapValues(_.flow(
-    _.map(_.get('aliasId')),
-    _.uniq
-  )),
-  _.toPairs,
-  entries => new Map(entries)
-)
-const getWords = (individualId, alias) => {
-  const pairWordsWithIds = word => ({value: word.value, aliasId: alias.id})
-  return _.map(pairWordsWithIds, alias.words)
-}
-const produceWordList = _.flow(
-  mapAliases(getWords),
-  _.flatten,
-  _.groupBy(_.get('value')),
-  _.mapValues(_.map(_.get('aliasId'))),
-  _.toPairs,
-  _.map(_.zipObject(['value', 'aliasIds']))
-)
-function parse (sources) {
-  return Promise.all(_.map(promiseParseDocument, sources))
-    .then(_.flow(
-      _.flatten,
-      _.compact,
-      _.uniqBy(_.get('id')),
-      individuals => {
-        const individualsMap = _.flow(
-          _.groupBy(_.get('id')),
-          _.mapValues(_.first),
-          _.toPairs,
-          entries => new Map(entries)
-        )(individuals)
-        const makeEntries = (individualId, alias) => [alias.id, alias]
-        const aliasesMap = new Map(mapAliases(makeEntries, individuals))
-        const getIdPairs = (individualId, alias) => [alias.id, individualId]
-        const idPairs = mapAliases(getIdPairs, individuals)
-        const aliasToIndividual = new Map(idPairs)
-        const phoneticMap = producePhoneticMap(individuals)
-        const wordList = produceWordList(individuals)
-        return {
-          individuals,
-          individualsMap,
-          aliasesMap,
-          aliasToIndividual,
-          phoneticMap,
-          wordList
-        }
-      }
-    ))
-}
 module.exports = {parse}
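parse() is now callback-based and streaming: the callback fires once per extracted profile and once with (null, null) when the stream ends, which is what lets update.js write ndjson incrementally instead of buffering every individual. A minimal consumer sketch, file path illustrative:

const parser = require('./lib/ofac/parsing')

parser.parse('/tmp/sdn_advanced.xml', (err, profile) => {
  if (err) return console.error(err)
  if (!profile) return console.log('end of stream')
  console.log(profile.id)
})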

lib/ofac/update.js (new file, +155)

@@ -0,0 +1,155 @@
const parser = require('./parsing')
const https = require('https')
const url = require('url')
const fs = require('fs')
const path = require('path')
const util = require('util')
const options = require('../options')
const _ = require('lodash/fp')

const OFAC_DATA_DIR = options.ofacDataDir
const OFAC_SOURCES_DIR = path.join(OFAC_DATA_DIR, 'sources')
const OFAC_SOURCES_FILE = path.join(OFAC_DATA_DIR, 'sources.json')
const OFAC_ETAGS_FILE = path.join(OFAC_DATA_DIR, 'etags.json')
const DOWNLOAD_DIR = path.resolve('/tmp')

const readFile = util.promisify(fs.readFile)
const writeFile = util.promisify(fs.writeFile)
const rename = util.promisify(fs.rename)
const unlink = util.promisify(fs.unlink)

const remove = file => {
  console.log("remove", file)
  return unlink(file)
}

const promiseGetEtag = (source) => {
  return new Promise((resolve, reject) => {
    const {url: sourceUrl} = source
    const parsed = url.parse(sourceUrl)

    const requestOptions = {
      hostname: parsed.hostname,
      path: parsed.path,
      method: 'HEAD'
    }

    const request = https.request(requestOptions, _.flow(
      _.get(['headers', 'etag']),
      resolve
    ))

    request.on('error', reject)
    request.end()
  })
}

const download = _.curry((dstDir, source) => {
  console.log("download", source)
  const {url: sourceUrl} = source
  const fileName = path.basename(sourceUrl)
  const dstFile = path.join(dstDir, fileName)
  const file = fs.createWriteStream(dstFile)

  return new Promise((resolve, reject) => {
    const request = https.get(sourceUrl, response => {
      response.pipe(file)
      file.on('finish', () => file.close(() => resolve(dstFile)))
    })
    request.on('error', reject)
  })
})

const parseToJson = srcFile => {
  console.log("parseToJson", srcFile)
  const dstFile = srcFile.replace(/\.xml$/, '.json')
  const writeStream = fs.createWriteStream(dstFile)

  return new Promise((resolve, reject) => {
    parser.parse(srcFile, (err, profile) => {
      console.log("callback", err, profile)
      if (err) {
        reject(err)
        return
      }

      if (!profile) {
        writeStream.end()
        return
      }

      const json = JSON.stringify(profile)
      writeStream.write(json + '\n', 'utf-8')
    })

    writeStream.on('error', reject)
    writeStream.on('finish', () => resolve(dstFile))
  })
}

const moveToSourcesDir = srcFile => {
  console.log("moveToSourcesDir", srcFile)
  const name = path.basename(srcFile)
  const dstFile = path.join(OFAC_SOURCES_DIR, name)
  return rename(srcFile, dstFile)
}

function update () {
  const promiseOldEtags = readFile(OFAC_ETAGS_FILE, {encoding: 'utf-8'})
    .then(json => JSON.parse(json) || {})

  const promiseNewEtags = readFile(OFAC_SOURCES_FILE, {encoding: 'utf-8'})
    .then(json => {
      const obj = JSON.parse(json)
      return obj ? obj.sources : []
    })
    .then(sources => Promise.all(_.map(promiseGetEtag, sources))
      .then(etags => _.map(
        ([source, etag]) => ({...source, etag}),
        _.zip(sources, etags)
      ))
    )

  return Promise.all([promiseOldEtags, promiseNewEtags])
    .then(([oldEtags, newEtags]) => {
      console.log("OLD", JSON.stringify(oldEtags, null, 4))
      console.log("NEW", JSON.stringify(newEtags, null, 4))

      const hasNotChanged = ({name, etag}) => oldEtags[name] === etag

      const downloads = _.flow(
        _.reject(hasNotChanged),
        _.map(file => download(DOWNLOAD_DIR, file).then(parseToJson))
      )(newEtags)

      const oldFileNames = _.keys(oldEtags)
      const newFileNames = _.map(_.get('name'), newEtags)
      const missingFileNames = _.difference(oldFileNames, newFileNames)

      const resolve = name => path.join(OFAC_SOURCES_DIR, name + '.json')
      const missing = _.map(resolve, missingFileNames)

      const etagsJson = _.flow(
        _.map(source => [source.name, source.etag]),
        _.fromPairs,
        obj => JSON.stringify(obj, null, 4)
      )(newEtags)

      return Promise.all(downloads)
        .then(parsed => {
          console.log("finished", parsed)
          const moves = _.map(moveToSourcesDir, parsed)
          const deletions = _.map(remove, missing)
          const updateEtags = writeFile(OFAC_ETAGS_FILE, etagsJson)
          return Promise.all([updateEtags, ...moves, ...deletions])
        })
    })
}

module.exports = {update}
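update() drives everything from two JSON files in ofacDataDir. The shapes below are inferred from the destructuring in this file ({url} in promiseGetEtag, {name, etag} in hasNotChanged) and are otherwise hypothetical; note that name should match the basename of url minus the extension, since stale files are resolved as name + '.json':

// sources.json -- update() reads obj.sources:
// {"sources": [{"name": "sdn_advanced", "url": "https://example.com/ofac/sdn_advanced.xml"}]}
//
// etags.json -- written back by update() as name -> etag, to skip unchanged downloads:
// {"sdn_advanced": "\"5e0d-567a1c\""}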