O conteúdo de mf-sync.js:
#!/usr/bin/env node
process.title = 'mf-sync';
var path = require('path')
  , fs = require('fs')
  , _ = require('underscore');
// Parse command-line arguments.
// `--key=value` becomes args.key = 'value'. Only the first '=' separates the
// key from the value, so values may themselves contain '='. A bare `--flag`
// becomes args.flag = true.
var args = _.chain(process.argv).rest(2).map(function(arg) {
  var name = arg.replace(/^--/, '');
  var eq = name.indexOf('=');
  return eq === -1 ? [name, true] : [name.slice(0, eq), name.slice(eq + 1)];
}).object().value();
// Both identifiers must be given with a value; a bare `--mfi` / `--source`
// would otherwise yield `true` and slip through.
if (!args.mfi || args.mfi === true) throw new Error('MFI ID not specified');
if (!args.source || args.source === true) throw new Error('Source ID not specified');
// Output when using '--debug' flag
var debug = function() {
  if (_.has(args, 'debug')) console.info.apply(console, arguments);
};
// Simulation mode ('--simulate' flag), consulted before CouchDB writes
var simulate = _.has(args, 'simulate');
require('util').log('Synchronization for ' + ['mfi', args.source, args.mfi].join('/') + ' started');
if (simulate) console.warn('Simulation mode enabled. No changes will occur.');
debug(args);
// Load MySQL configuration.
// Source '101027' reads the `mfdb` profile of mysql.json; every other source
// reads the `gold` profile.
var my = require('mysql');
var myConfig = require(path.join(__dirname, 'mysql.json'));
var db = args.source == '101027' ? 'mfdb' : 'gold';
var mysql = my.createConnection(myConfig[db]);
debug('MySQL', myConfig[db].database);

// Load Salesforce configuration (credentials are used later at login time).
var sf = require('node-salesforce');
var sfConfig = require(path.join(__dirname, 'salesforce.json'));
var salesforce = new sf.Connection(sfConfig);
debug('Salesforce', sfConfig.username);

// Load CouchDB configuration and open the `mfdb` database handle.
var cradle = require('cradle');
var couchConfig = require(path.join(__dirname, 'couchdb.json'));
var couchServer = couchConfig.mfdb;
var couch = new cradle.Connection(couchServer.host, couchServer.port, couchServer.options)
  .database(couchServer.name);
debug('CouchDB', couchServer.name);
// Add missing function to Underscore.js.
// _.compactObject(obj) deletes, in place, every entry of `obj` whose value is
// null or a function, and returns that same object so it can be used inside
// an _.chain(...) pipeline.
var compactObject = function(target) {
  _.each(target, function(value, key) {
    var unwanted = _.isNull(value) || _.isFunction(value);
    if (unwanted) {
      delete target[key];
    }
  });
  return target;
};
_.mixin({compactObject: compactObject});
// Get MFI data from MySQL
// -----------------------

// Assign `val` at the nested location `path` (an array of keys) inside `obj`.
// Missing intermediate levels are created as `{}`. A non-object value found on
// the way is kept by wrapping it as `{value: <old>}`. If the final key already
// holds an object, `val` is stored under that object's `value` key instead of
// replacing it, so previously assigned sub-dimensions survive.
var deepAssign = function(obj, path, val) {
  var node = obj;
  for (var i = 0; i < path.length; i++) {
    var key = path[i];
    if (i === path.length - 1) {
      if (typeof node[key] === 'object') node[key].value = val;
      else node[key] = val;
    } else if (typeof node[key] !== 'object') {
      node[key] = node[key] === undefined ? {} : {value: node[key]};
    }
    node = node[key];
  }
};

/**
 * Load the master MFI row and all period data for one MFI from MySQL and build
 * one document per (currency, adjusted, period).
 *
 * Every query is queued on the single `mysql` connection. The callbacks rely on
 * the driver running queued queries in order. As a result, the period documents
 * and the scenario tree already exist when the dimension rows are merged in.
 * `mysql.end` is queued last.
 *
 * @param {Object} mfi - master MFI document; must carry `source_id` and
 *   `mfi_id`. Fields it does not already have are filled in from the `mfi` row.
 * @param {Function} callback - called as callback(mfi, docs) after the
 *   connection has ended; `docs` maps each document `_id` to its document.
 * @throws Rethrows any MySQL error from inside the driver callbacks.
 */
var getMySQLData = function(mfi, callback) {
  mysql.connect();

  // Get master MFI metadata
  debug("Getting master MFI metadata from 'mfi'.");
  mysql.query("SELECT * FROM mfi WHERE source_id = ? AND mfi_id = ?", [mfi.source_id, mfi.mfi_id], function(err, rows) {
    if (err) throw err;
    // NOTE(review): `parse`/`_typeCast` look like driver helpers reachable on
    // the row object; they are excluded so they do not land on `mfi` — confirm.
    _.defaults(mfi, _.chain(rows).first().omit(['parse', '_typeCast']).value());
  });

  // MFDB data tables for each "<currency>/<adjusted>" variant
  var tables = {
    'usd/false': ['balance_sheet_usd', 'calculation_usd', 'income_statement_usd', 'infrastructure', 'portfolio_report_usd', 'products_and_clients', 'social_performance'],
    'usd/true': ['balance_sheet_adjusted_usd', 'calculation_adjusted_usd', 'income_statement_adjusted_usd', 'infrastructure_adjusted', 'portfolio_report_adjusted_usd', 'products_and_clients_adjusted', 'social_performance'],
    'local/false': ['balance_sheet', 'calculation', 'income_statement', 'infrastructure', 'portfolio_report', 'products_and_clients', 'social_performance'],
    'local/true': ['balance_sheet_adjusted', 'calculation_adjusted', 'income_statement_adjusted', 'infrastructure_adjusted', 'portfolio_report_adjusted', 'products_and_clients_adjusted', 'social_performance']
  };

  // Remove table name variance, e.g. 'balance_sheet_adjusted_usd' -> 'balance_sheet'
  var baseTable = _.memoize(function(table) {
    return table.replace('_usd', '').replace('_adjusted', '');
  });

  // Document id shared by the table pass and the dimension pass, e.g.
  // 'mfi-period/<source>/<mfi>/usd/false/<fiscal_year>/<period_type>/<YYYY-MM-DD>'.
  // NOTE(review): toISOString() renders the date in UTC; if the driver returns
  // local-midnight Dates in a zone east of UTC this gives the previous day — confirm.
  var periodId = function(currency, adjusted, row) {
    return ['mfi-period', mfi.source_id, mfi.mfi_id, currency, adjusted, row.fiscal_year, row.period_type, row.as_of_date.toISOString().slice(0, 10)].join('/');
  };

  var docs = {};

  // Get all available MFDB data for the current 'mfi_vid'
  debug("Getting all available MFDB data for the current 'mfi_vid'.");
  _.each(_.keys(tables), function(key) {
    var variant = key.split('/'); // [currency, adjusted]
    _.each(tables[key], function(table) {
      debug('Querying', key, 'data from', table);
      mysql.query("SELECT t.* FROM ?? t INNER JOIN mfi ON t.source_id = mfi.source_id AND t.mfi_id = mfi.mfi_id AND t.mfi_vid = mfi.mfi_vid WHERE t.source_id = ? AND t.mfi_id = ? ORDER BY t.fiscal_year ASC, t.period_type DESC, t.as_of_date ASC", [table, mfi.source_id, mfi.mfi_id], function(err, rows) {
        if (err) throw err;
        _.each(rows, function(row) {
          var doc_id = periodId(variant[0], variant[1], row);
          debug('Processing', table, 'data for', doc_id);
          // Initialize the document on the first table that mentions this period
          if (!docs[doc_id]) docs[doc_id] = {
            _id: doc_id,
            type: 'mfi-period',
            currency: variant[0],
            adjusted: variant[1] === 'true',
            fiscal_year: row.fiscal_year,
            period_type: row.period_type,
            as_of_date: row.as_of_date
          };
          if (!docs[doc_id].currency_code && row.currency_code) docs[doc_id].currency_code = row.currency_code;
          // Store the remaining non-null columns under the table's base name
          debug('Adding', table, 'data to', doc_id);
          var data = _.chain(row).omit(['mfi_id', 'mfi_vid', 'source_id', 'period_type', 'as_of_date', 'fiscal_year', 'currency_code', 'currency_unit']).compactObject().value();
          if (!_.isEmpty(data)) docs[doc_id][baseTable(table)] = data;
        });
      });
    });
  });

  // Get all scenario data to create dimension hierarchy.
  // `tree` maps each top-level scenario to its children, recursively; a
  // scenario without children maps to null.
  var tree = {};
  mysql.query("SELECT * FROM scenarios", function(err, rows) {
    debug('Processing scenario data into hierarchical tree.');
    if (err) throw err;
    // Children of `parent` as an object keyed by scenario name (inserted in
    // `weight` order), or null when there are none. Rows that have been placed
    // are dropped from `rows` so later lookups scan fewer rows.
    var getChildren = function(parent) {
      var children = _.chain(rows).where({parent: parent}).sortBy('weight').pluck('scenarios').object({}).tap(function(scenarios) {
        _.each(_.keys(scenarios), function(scenario) {
          rows = _.without(rows, _.findWhere(rows, {scenarios: scenario}));
        });
      }).value();
      return _.isEmpty(children) ? null : children;
    };
    // Recursively fill in the dimension hierarchy below each node
    var getTree = function(hierarchy) {
      if (_.isEmpty(hierarchy)) return;
      _.each(_.keys(hierarchy), function(p) {
        hierarchy[p] = getChildren(p);
        if (!_.isEmpty(hierarchy[p])) getTree(hierarchy[p]);
      });
    };
    tree = getChildren('');
    getTree(tree);
  });

  // For each comma-separated scenario in `needles`, find the key path leading
  // to it inside `haystack`, and return all found paths concatenated into one
  // array. Memoized on `needles` alone; that is safe here because the only
  // caller always passes the same `tree`.
  var findPath = _.memoize(function(needles, haystack) {
    // Depth-first search for `needle`; returns its key path or false.
    function constructPath(node, needle, trail) {
      if (node === null || typeof node !== 'object') return false;
      for (var key in node) {
        var currentPath = trail.concat([key]);
        if (key === needle) return currentPath;
        var foundPath = constructPath(node[key], needle, currentPath);
        if (foundPath) return foundPath;
      }
      return false;
    }
    return _.chain(needles.split(',')).map(function(needle) {
      return constructPath(haystack, needle, []);
    }).flatten().compact().value();
  });

  // Sanitize dimension names: strip 'mix_', 'Dimension' and 'Member', then keep
  // only the second ':'-separated segment when a ':' is present.
  var sanitizeDimensions = _.memoize(function(dimensions) {
    return _.map(dimensions, function(dimension) {
      dimension = dimension.replace(/mix_/g, '').replace(/Dimension/g, '').replace(/Member/g, '');
      if (/:/.test(dimension)) return dimension.split(':')[1];
      return dimension;
    });
  });

  // Get dimension data for all available documents
  _.each(['usd', 'local'], function(currency) {
    var dimensions_table = currency === 'usd' ? 'dimensions_usd' : 'dimensions';
    debug('Querying', currency, 'data from', dimensions_table);
    mysql.query("SELECT d.fiscal_year, d.period_type, d.as_of_date, d.scenarios, d.line_item_value, t.db_table, t.db_field FROM ?? d INNER JOIN mfi ON d.source_id = mfi.source_id AND d.mfi_id = mfi.mfi_id AND d.mfi_vid = mfi.mfi_vid LEFT JOIN Taxonomy t ON d.element_id = t.Elementid WHERE d.line_item_value IS NOT NULL AND t.db_table IS NOT NULL AND t.db_field IS NOT NULL AND d.source_id = ? AND d.mfi_id = ?", [dimensions_table, mfi.source_id, mfi.mfi_id], function(err, rows) {
      debug('Processing all data from', dimensions_table);
      if (err) throw err;
      _.each(rows, function(row) {
        var dimension_path = findPath(row.scenarios, tree);
        if (_.isEmpty(dimension_path)) return console.warn('MISSING SCENARIO', row.scenarios);
        var keyPath = sanitizeDimensions([row.db_table, row.db_field].concat(dimension_path));
        var value = parseFloat(row.line_item_value);
        // The dimension query selects no adjusted flag, so each row is applied
        // to whichever of the two variants of its period already exist.
        _.each(['true', 'false'], function(adjusted) {
          var doc_id = periodId(currency, adjusted, row);
          if (docs[doc_id]) deepAssign(docs[doc_id], keyPath, value);
        });
      });
    });
  });

  // Queued behind every query above, so `docs` is complete when callback runs.
  mysql.end(function(err) {
    debug('Disconnected from MySQL', db);
    if (err) throw err;
    callback(mfi, docs);
  });
};
// Get MFI metadata from Salesforce
// --------------------------------

// Quote `value` as a SOQL string literal. Backslashes and single quotes are
// backslash-escaped so an identifier containing them cannot end the literal
// early and change the query.
var soqlString = function(value) {
  return "'" + String(value).replace(/\\/g, '\\\\').replace(/'/g, "\\'") + "'";
};

/**
 * Log into Salesforce and enrich `mfi` and `docs` with:
 * - the account metadata;
 * - whether an SP Profile exists;
 * - the network affiliations;
 * - the annual diamond scores.
 *
 * The four queries are issued back to back without waiting on each other.
 * `callback(mfi, docs)` runs once after the last one finishes, and then the
 * session is logged out.
 *
 * @param {Object} mfi - master MFI document; reads `mfi_id` and
 *   `organization_id` (the latter presumably filled in from MySQL — confirm).
 * @param {Object} docs - period documents keyed by id; 'ANN' periods receive
 *   `annual_diamonds`.
 * @param {Function} callback - callback(mfi, docs).
 * @throws Rethrows Salesforce errors; throws when the MFI account is not found.
 */
var getSalesforceData = function(mfi, docs, callback) {
  // Number of queries still outstanding
  var remaining = 4;
  var done = function() {
    if (--remaining !== 0) return;
    callback(mfi, docs);
    // Logout from Salesforce
    salesforce.logout(function(err) {
      debug('Logged out from Salesforce');
      if (err) throw err;
    });
  };
  // Login into Salesforce
  debug('Login into Salesforce');
  salesforce.login(sfConfig.username, sfConfig.password + sfConfig.security_token, function(err) {
    if (err) throw err;
    var mfiId = soqlString(mfi.mfi_id);
    var orgId = soqlString(mfi.organization_id);

    // Get main MFI Metadata
    debug('Getting MFI metadata from Salesforce');
    salesforce.query("SELECT Id, Name, Record_ID__c, mix_Diamonds__c, Date_Established__c, mix_Region__c, Country__c, Operations_Comprised_by_MF__c, Regulated__c, Current_Legal_Status__c, Profit_Status__c FROM Account WHERE Record_ID__c = " + mfiId, function(err, result) {
      if (err) throw err;
      if (result.totalSize === 0) throw new Error('MFI does not exist');
      // Copy the account fields onto `mfi` with lowercased names
      var record = {};
      _.chain(result.records).first().omit(['attributes', 'Id']).each(function(v, k) {
        record[k.toLowerCase()] = v;
      });
      _.extend(mfi, record);
      mfi.mfi_name = mfi.name;
      done();
    });

    // Determine whether MFI contains Social Performance Profile data
    debug('Determining whether MFI contains SP Profile data.');
    salesforce.query("SELECT Id, Record_ID__c FROM Account WHERE Record_ID__c = " + mfiId + " AND Id IN (SELECT Organization__c FROM SP_Profile__c)", function(err, result) {
      if (err) throw err;
      mfi.sp_profile = !_.isEmpty(result.records);
      done();
    });

    // Get list of MFI Network Affiliations
    debug('Getting list of MFI Network Affiliations');
    salesforce.query("SELECT Source_Organization__r.Name FROM Partnership__c WHERE Relationship__c = 'Network Affiliation' AND Status__c = 'Current' AND Target_Organization__r.Id = " + orgId, function(err, result) {
      if (err) throw err;
      mfi.networks = _.chain(result.records).pluck('Source_Organization__r').pluck('Name').value();
      done();
    });

    // Get annual diamonds
    debug('Getting annual diamonds.');
    salesforce.query("SELECT Period__c, Diamond_Score__c FROM Data_Campaign_Status__c WHERE Organization__c = " + orgId, function(err, result) {
      if (err) throw err;
      // Group diamonds by year: {Period__c: Diamond_Score__c}
      var diamonds = _.chain(result.records).map(function(period) {
        return _.chain(period).pick(['Period__c', 'Diamond_Score__c']).values().value();
      }).object().value();
      // Add diamonds to corresponding annual periods
      _.chain(docs).filter(function(doc) { return doc.period_type === 'ANN'; }).each(function(doc) {
        doc.annual_diamonds = diamonds[doc.fiscal_year];
      });
      done();
    });
  });
};
// Calculate Peer Group data
// -------------------------

/**
 * Assign peer groups (age, intermediation, market, outreach, scale,
 * sustainability) to every period document. A group that cannot be determined
 * from the available data is left out. A document receives a `peer_groups`
 * object only if at least one group was assigned.
 *
 * @param {Object} docs - period documents keyed by id (mutated in place).
 * @param {Function} callback - invoked synchronously with `docs`.
 */
var calculatePeerGroupData = function(docs, callback) {
  // Safely get data point value: obj[group][prop], or undefined when absent.
  // A data point that also received dimension breakdowns is stored as an object
  // carrying its own figure under `value`; that figure is returned instead.
  // The key is checked explicitly because `point.value || point` would return
  // the wrapper object for a legitimate value of 0.
  var getVal = function(obj, group, prop) {
    if (!_.has(obj, group) || !_.has(obj[group], prop)) return undefined;
    var point = obj[group][prop];
    return _.isObject(point) && _.has(point, 'value') ? point.value : point;
  };
  _.each(docs, function(doc) {
    var peer_groups = {};

    // Age: years between establishment and the period's as-of date
    debug('Calculating peer group age for', doc._id);
    if (_.has(doc, 'date_established__c')) {
      var age = Math.abs(new Date(doc.as_of_date).getTime() - Date.parse(doc.date_established__c)) / (86400000 * 365.242199);
      // NaN (unparseable date) and exactly 0 leave the age group unassigned
      if (age) {
        if (age < 4) peer_groups.age = 'New';
        else if (age <= 8) peer_groups.age = 'Young';
        else peer_groups.age = 'Mature';
      }
    }

    // Intermediation: deposits as a share of total assets
    debug('Calculating peer group intermediation for', doc._id);
    var deposits = getVal(doc, 'balance_sheet', 'deposits');
    var total_assets = getVal(doc, 'balance_sheet', 'total_assets');
    if (!_.isUndefined(deposits) && !_.isUndefined(total_assets) && total_assets > 0) {
      var ratio = deposits / total_assets;
      if (ratio === 0) peer_groups.intermediation = 'Non FI';
      else if (ratio < 0.2) peer_groups.intermediation = 'Low FI';
      else if (ratio >= 0.2) peer_groups.intermediation = 'High FI';
    } else if (total_assets === 0) {
      peer_groups.intermediation = 'Non FI';
    }

    // Market: depth (per-capita balance) and average loan size
    debug('Calculating peer group market for', doc._id);
    var depth = getVal(doc, 'calculation', 'average_balance_borrower_per_capita') || getVal(doc, 'calculation', 'average_outstanding_balance_per_capita');
    var average_loan_size = getVal(doc, 'calculation', 'average_balance_borrower') || getVal(doc, 'calculation', 'average_outstanding_balance');
    if (!_.isUndefined(depth) || !_.isUndefined(average_loan_size)) {
      if (depth < .2 || average_loan_size < 150) peer_groups.market = 'Low End';
      else if ((depth >= .2) && (depth < 1.5)) peer_groups.market = 'Broad';
      else if ((depth >= 1.5) && (depth < 2.5)) peer_groups.market = 'High End';
      else if (depth >= 2.5) peer_groups.market = 'Small Business';
    }

    // Outreach: number of borrowers
    debug('Calculating peer group outreach for', doc._id);
    var total_borrowers = getVal(doc, 'products_and_clients', 'total_borrowers');
    if (total_borrowers < 10000) peer_groups.outreach = 'Small';
    else if (total_borrowers < 30000) peer_groups.outreach = 'Medium';
    else if (total_borrowers >= 30000) peer_groups.outreach = 'Large';

    // Scale: gross loan portfolio, with higher thresholds for Latin America
    debug('Calculating peer group scale for', doc._id);
    if (_.has(doc, 'mix_region__c')) {
      var gross_loan_portfolio = getVal(doc, 'balance_sheet', 'gross_loan_portfolio');
      var latam = doc.mix_region__c === 'Latin America and The Caribbean';
      if (gross_loan_portfolio < 2000000 || (latam && gross_loan_portfolio < 4000000)) peer_groups.scale = 'Small';
      else if (gross_loan_portfolio < 8000000 || (latam && gross_loan_portfolio < 15000000)) peer_groups.scale = 'Medium';
      // `>=`: with `>` a non-LatAm portfolio of exactly 8,000,000 matched no branch
      else if (gross_loan_portfolio >= 8000000) peer_groups.scale = 'Large';
    }

    // Sustainability: OSS for unadjusted data, FSS for adjusted data
    debug('Calculating peer group sustainability for', doc._id);
    var operational_self_sufficiency = getVal(doc, 'calculation', 'operational_self_sufficiency');
    if (!_.isUndefined(operational_self_sufficiency)) {
      if (doc.adjusted) peer_groups.sustainability = operational_self_sufficiency < 1 ? 'Non-FSS' : 'FSS';
      else peer_groups.sustainability = operational_self_sufficiency < 1 ? 'Non-OSS' : 'OSS';
    }

    if (!_.isEmpty(peer_groups)) doc.peer_groups = peer_groups;
  });
  callback(docs);
};
// Send data to CouchDB
// --------------------

/**
 * Write the master MFI document and its period documents to CouchDB, and
 * delete period documents stored under this MFI's id prefix that are no
 * longer present in `docs`. In simulation mode every change is logged, but
 * no write or delete is sent.
 *
 * @param {Array<Object>} docs - master MFI document first, followed by the
 *   period documents. The master document is removed from the array (shift).
 * @param {Function} callback - called once all requests have been issued; it
 *   does not wait for the writes themselves to complete.
 */
var updateCouchDB = function(docs, callback) {
  var util = require('util');
  // CouchDB errors arrive as objects with an `error` field (see the
  // 'not_found' check below). `new Error(obj)` would turn those into the
  // message "[object Object]", so non-Error values are described with
  // util.inspect instead.
  var fail = function(err) {
    throw err instanceof Error ? err : new Error(util.inspect(err));
  };
  // Completion handler shared by save/remove requests
  var report = function(label) {
    return function(err, res) {
      debug(label, res);
      if (err) fail(err);
    };
  };

  // Update master MFI record
  debug('Updating master MFI record');
  var mfi = docs.shift();
  couch.get(mfi._id, function(err, doc) {
    if (err) {
      if (err.error !== 'not_found') fail(err);
      util.log('Inserting ' + mfi._id);
      if (!simulate) couch.save(mfi._id, mfi, report('Inserted'));
    } else if (doc._rev) {
      util.log('Updating ' + mfi._id);
      if (!simulate) couch.save(mfi._id, doc._rev, mfi, report('Updated'));
    }
  });

  // Get list of existing IDs in CouchDB
  debug('Getting list of existing IDs in CouchDB');
  var prefix = ['mfi-period', args.source, args.mfi];
  couch.all({startkey: prefix.join('/'), endkey: prefix.concat(['~']).join('/')}, function(err, ids) {
    if (err) fail(err);
    // Index existing revisions and wanted ids once, instead of scanning `ids`
    // again for every document.
    var revs = {};
    _.each(ids, function(row) { revs[row.id] = row.value.rev; });
    var wanted = {};
    _.each(docs, function(doc) { wanted[doc._id] = true; });

    // Remove outdated documents from CouchDB (skipped in simulation mode,
    // like every other write).
    _.each(ids, function(row) {
      if (_.has(wanted, row.id)) return;
      util.log('Removing ' + row.id);
      if (!simulate) couch.remove(row.id, row.value.rev, report('Removed'));
    });

    // Insert/update all documents for this MFI
    _.each(docs, function(doc) {
      if (_.has(revs, doc._id)) {
        util.log('Updating ' + doc._id);
        if (!simulate) couch.save(doc._id, revs[doc._id], doc, report('Updated'));
      } else {
        util.log('Inserting ' + doc._id);
        if (!simulate) couch.save(doc._id, doc, report('Inserted'));
      }
    });
    callback();
  });
};
// Initialize MFI document and run the pipeline:
// MySQL -> Salesforce -> peer groups -> CouchDB.
// The master record starts with identifiers only; getMySQLData and
// getSalesforceData add the remaining metadata to it.
var mfi = {
  _id: ['mfi', args.source, args.mfi].join('/'),
  type: 'mfi',
  source_id: args.source,
  mfi_id: args.mfi,
  updated: new Date()
};
getMySQLData(mfi, function(mfi, docs) {
  getSalesforceData(mfi, docs, function(mfi, docs) {
    // Every period document inherits the MFI metadata; the period's own
    // fields take precedence on conflicts.
    _.each(_.keys(docs), function(id) {
      var period = docs[id];
      docs[id] = _.extend(_.clone(mfi), period);
    });
    calculatePeerGroupData(docs, function(docs) {
      // Master record first, then every period document
      var batch = [mfi].concat(_.values(docs));
      updateCouchDB(batch, function() {
        var label = ['mfi', args.source, args.mfi].join('/');
        require('util').log('Synchronization for ' + label + ' finished');
      });
    });
  });
});