Subprocessos Node.js travam em script bash executado com nohup

1

Executo com nohup scripts que contêm uma lista de algumas centenas a alguns milhares de comandos Node.js. Esses subprocessos Node.js sincronizam dados do MySQL e do Salesforce para o CouchDB.

$ nohup ./mf-sync.staging-mfdb.sh 2>&1 > mf-sync.staging-mfdb.log &
$ mf-sync.staging-mfdb.sh

O script:

#!/bin/bash
# Runs mf-sync.js once per line, strictly one after another: bash waits for
# each node process to exit before starting the next, so a single node process
# that never exits stalls every remaining line of the list.
echo "Starting..."
# $$ expands to this script's own PID.
echo "pid $$"
node /opt/node/mix-sync/mf-sync.js --mfi=100017 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100026 --source=101619
node /opt/node/mix-sync/mf-sync.js --mfi=100027 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100036 --source=101619
node /opt/node/mix-sync/mf-sync.js --mfi=100063 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100075 --source=101160
etc....

Em um terminal, observo que os subprocessos ficam travados:

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6333 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6333 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ kill 6333

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6423 ?        00:00:00 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6449 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

NOTA: 30097 é o PID do processo iniciado pelo nohup (o script bash).

Examinando o log antes e depois de matar o subprocesso, vejo que o próximo comando Node.js da lista é executado em seguida. Tentei executá-los com o sinalizador --debug para obter saída detalhada, mas não vejo nada incomum.

Notas adicionais

  • O Node.js tem um limite de memória de 1 GB.
  • Por padrão, o CouchDB aceita no máximo 2048 conexões.
  • O conteúdo de mf-sync.js:

    #!/usr/bin/env node
    process.title = 'mf-sync';

    var path = require('path');
    var fs = require('fs');
    var _ = require('underscore');

    // Command-line arguments: '--key=value' becomes args.key = 'value' and a bare
    // '--flag' becomes args.flag = true. Only the first '--' is stripped, and only
    // the text up to a second '=' (if any) is kept as the value.
    var args = {};
    _.each(_.rest(process.argv, 2), function(token) {
        var pieces = token.replace('--', '').split('=');
        args[pieces[0]] = pieces.length === 1 ? true : pieces[1];
    });

    if (!args.mfi) throw new Error('MFI ID not specified');
    if (!args.source) throw new Error('Source ID not specified');

    // Verbose logging, active only when the '--debug' flag was given
    var debug = function() {
        if (!_.has(args, 'debug')) return;
        console.info.apply(this, arguments);
    };

    // Simulation mode ('--simulate' flag)
    var simulate = _.has(args, 'simulate');

    require('util').log('Synchronization for ' + ['mfi', args.source, args.mfi].join('/') + ' started');
    if (simulate) console.warn('Simulation mode enabled. No changes will occurr.');
    debug(args);
    
    // MySQL: source 101027 uses the 'mfdb' entry of mysql.json, every other source 'gold'
    var my = require('mysql');
    var myConfig = require(path.join(__dirname, 'mysql.json'));
    var db = args.source == '101027' ? 'mfdb' : 'gold';
    var mysql = my.createConnection(myConfig[db]);
    debug('MySQL', myConfig[db].database);

    // Salesforce: connection object only; login/logout happen in getSalesforceData
    var sf = require('node-salesforce');
    var sfConfig = require(path.join(__dirname, 'salesforce.json'));
    var salesforce = new sf.Connection(sfConfig);
    debug('Salesforce', sfConfig.username);

    // CouchDB: handle to the database described by the 'mfdb' section of couchdb.json
    var cradle = require('cradle');
    var couchConfig = require(path.join(__dirname, 'couchdb.json'));
    var couch = new cradle.Connection(
        couchConfig.mfdb.host,
        couchConfig.mfdb.port,
        couchConfig.mfdb.options
    ).database(couchConfig.mfdb.name);
    debug('CouchDB', couchConfig.mfdb.name);
    
    // Underscore has no helper for this: strip keys whose value is null or a
    // function from an object, in place, and hand the same object back.
    _.mixin({
        compactObject: function(target) {
            _.each(target, function(value, key) {
                var droppable = _.isNull(value) || _.isFunction(value);
                if (droppable) delete target[key];
            });
            return target;
        }
    });
    
    // Get MFI data from MySQL
    // -----------------------
    /**
     * Load the master MFI record and every period row for one MFI from MySQL.
     *
     * All queries are queued on the shared `mysql` connection before any result
     * arrives. The code relies on the driver answering them in issue order
     * (NOTE(review): confirm for the mysql driver version in use): the scenario
     * tree must be built before the dimension rows are processed, and
     * `mysql.end` must fire only after every result has been merged into `docs`.
     *
     * @param {Object} mfi - MFI document carrying `source_id` and `mfi_id`.
     *   Columns of the matching `mfi` row are added to it without overwriting
     *   keys it already has.
     * @param {Function} callback - called as callback(mfi, docs) once the
     *   connection has been closed; `docs` maps 'mfi-period/...' IDs to documents.
     * @throws {Error} from inside query callbacks on any MySQL error.
     */
    var getMySQLData = function(mfi, callback) {
        mysql.connect();

        // Get master MFI metadata
        debug("Getting master MFI metadata from 'mfi'.");
        mysql.query("SELECT * FROM mfi WHERE source_id = ? AND mfi_id = ?", [mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
            if (err) throw new Error(err);
            // 'parse' and '_typeCast' are excluded from the copy (presumably helpers
            // the driver exposes on row objects, which omit() would otherwise copy)
            _.defaults(mfi, _.chain(rows).first().omit(['parse', '_typeCast']).value());
        });

        // MFDB data tables for each '<currency>/<adjusted>' variant
        var tables = {
            'usd/false': ['balance_sheet_usd', 'calculation_usd', 'income_statement_usd', 'infrastructure', 'portfolio_report_usd', 'products_and_clients', 'social_performance'],
            'usd/true': ['balance_sheet_adjusted_usd', 'calculation_adjusted_usd', 'income_statement_adjusted_usd', 'infrastructure_adjusted', 'portfolio_report_adjusted_usd', 'products_and_clients_adjusted', 'social_performance'],
            'local/false': ['balance_sheet', 'calculation', 'income_statement', 'infrastructure', 'portfolio_report', 'products_and_clients', 'social_performance'],
            'local/true': ['balance_sheet_adjusted', 'calculation_adjusted', 'income_statement_adjusted', 'infrastructure_adjusted', 'portfolio_report_adjusted', 'products_and_clients_adjusted', 'social_performance']
        };
        // Remove table name variance: drop the first '_usd' and '_adjusted' so all
        // variants of a table share one property name inside the document
        var baseTable = _.memoize(function(table) {
            return table.replace('_usd', '').replace('_adjusted', '');
        });

        var docs = {};
        // Get all available MFDB data for the current 'mfi_vid'
        debug("Getting all available MFDB data for the current 'mfi_vid'.");
        _.each(_.keys(tables), function(key) {
            _.each(tables[key], function(table) {
                debug('Querying', key, 'data from', table);
                mysql.query("SELECT t.* FROM ?? t INNER JOIN mfi ON t.source_id = mfi.source_id AND t.mfi_id = mfi.mfi_id AND t.mfi_vid = mfi.mfi_vid WHERE t.source_id = ? AND t.mfi_id = ? ORDER BY t.fiscal_year ASC, t.period_type DESC, t.as_of_date ASC", [table, mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
                    if (err) throw new Error(err);

                    // Create full document data
                    _.each(rows, function(row) {
                        // doc._id = 'mfi-period/<source>/<mfi>/<currency>/<adjusted>/<fiscal_year>/<period_type>/<YYYY-MM-DD>'
                        var doc_id = ['mfi-period', mfi.source_id, mfi.mfi_id, key, row.fiscal_year, row.period_type, row.as_of_date.toISOString().substr(0, 10)].join('/');
                        debug('Processing', table, 'data for', doc_id);

                        // Initialize the document the first time any table yields this period
                        if (!docs[doc_id]) docs[doc_id] = {
                            _id: doc_id,
                            type: 'mfi-period',
                            currency: key.split('/')[0],
                            adjusted: key.split('/')[1] === 'true',
                            fiscal_year: row.fiscal_year,
                            period_type: row.period_type,
                            as_of_date: row.as_of_date
                        };
                        if (!docs[doc_id].currency_code && row.currency_code) docs[doc_id].currency_code = row.currency_code;

                        // Extend MFDB data into document: the identifying columns already live on
                        // the document itself, and null / function values are dropped
                        debug('Adding', table, 'data to', doc_id);
                        row = _.chain(row).omit(['mfi_id', 'mfi_vid', 'source_id', 'period_type', 'as_of_date', 'fiscal_year', 'currency_code', 'currency_unit']).compactObject().value();
                        if (!_.isEmpty(row)) docs[doc_id][baseTable(table)] = row;
                    });
                });
            });
        });

        // Build the scenario hierarchy: each scenario name is a key whose value is an
        // object of its child scenarios (ordered by weight), or null for a leaf
        var tree = {};
        mysql.query("SELECT * FROM scenarios", function(err, rows) {
            debug('Processing scenario data into hierarchical tree.');
            if (err) throw new Error(err);

            // Get all children scenarios for any given parent, as {name: undefined}
            var getChildren = function(parent) {
                var children = _.chain(rows).where({parent: parent}).sortBy('weight').pluck('scenarios').object({}).tap(function(scenarios) {
                    // Remove used scenarios from master list to decrease stack size
                    _.each(_.keys(scenarios), function(scenario) {
                        rows = _.without(rows, _.findWhere(rows, {scenarios: scenario}));
                    });
                }).value();
                if (_.isEmpty(children)) return null;
                return children;
            };

            // Recursively replace every key's value with its own children
            var getTree = function(hierarchy) {
                if (_.isEmpty(hierarchy)) return;
                _.each(_.keys(hierarchy), function(p) {
                    hierarchy[p] = getChildren(p);
                    if (!_.isEmpty(hierarchy[p])) getTree(hierarchy[p]);
                });
            };

            // Root scenarios are the ones whose parent is ''
            tree = getChildren('');
            getTree(tree);
        });

        // Find path to nested object property. `needles` may hold several
        // comma-separated scenario names; the key paths found for each are
        // concatenated and names that are not found contribute nothing.
        // Cached by _.memoize on `needles` alone, which is safe because this is only
        // called after `tree` has been fully built.
        var findPath = _.memoize(function(needles, haystack) {
            function constructPath(haystack, needle, path) {
                if (!_.isObject(haystack)) return false;
                if (typeof haystack !== 'object') return false;
                for (var key in haystack) {
                    var value = haystack[key];
                    var currentPath = _.extend([], path);
                    currentPath.push(key);
                    if (key === needle) return currentPath;
                    var foundPath = constructPath(value, needle, currentPath);
                    if (foundPath) return foundPath;
                }
            }
            // Handle comma-separated nested hierarchies
            return _.chain(needles.split(',')).map(function(needle) {
                return constructPath(haystack, needle, []);
            }).flatten().compact().value();
        });
        // Assign `val` at the nested location `path` inside `obj`, creating objects
        // along the way. A scalar already sitting on the path is kept as the `value`
        // property of the object that replaces it; if the final slot is already an
        // object, `val` is stored as its `value`.
        var deepAssign = function(obj, path, val) {
            var last = path.length - 1;
            for (var i = 0; i < path.length; i++) {
                var key = path[i];
                if (i === last) {
                    if (typeof obj[key] === 'object') obj[key].value = val;
                    else obj[key] = val;
                } else if (typeof obj[key] !== 'object') {
                    obj[key] = _.isUndefined(obj[key]) ? {} : {value: obj[key]};
                }
                obj = obj[key];
            }
        };
        // Sanitize dimension names: strip the 'mix_', 'Dimension' and 'Member'
        // fragments, and keep only the part after ':' for prefixed names
        var sanitizeDimensions = _.memoize(function(dimensions) {
            return _.map(dimensions, function(dimension) {
                dimension = dimension.replace(/mix_/g, '').replace(/Dimension/g, '').replace(/Member/g, '');
                if (/:/.test(dimension)) return dimension.split(':')[1];
                else return dimension;
            });
        });

        // Get dimension data for all available documents; each value is written into
        // both the adjusted and unadjusted document of its period, if they exist
        _.each(['usd', 'local'], function(currency) {
            var dimensions_table = currency === 'usd' ? 'dimensions_usd' : 'dimensions';
            debug('Querying', currency, 'data from', dimensions_table);
            mysql.query("SELECT d.fiscal_year, d.period_type, d.as_of_date, d.scenarios, d.line_item_value, t.db_table, t.db_field FROM ?? d INNER JOIN mfi ON d.source_id = mfi.source_id AND d.mfi_id = mfi.mfi_id AND d.mfi_vid = mfi.mfi_vid LEFT JOIN Taxonomy t ON d.element_id = t.Elementid WHERE d.line_item_value IS NOT NULL AND t.db_table IS NOT NULL AND t.db_field IS NOT NULL AND d.source_id = ? AND d.mfi_id = ?", [dimensions_table, mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
                debug('Processing all data from', dimensions_table);
                if (err) throw new Error(err);
                _.each(rows, function(row) {
                    var dimension_path = findPath(row.scenarios, tree);
                    if (_.isEmpty(dimension_path)) return console.warn('MISSING SCENARIO', row.scenarios);
                    _.each(['true', 'false'], function(adjusted) {
                        var doc_id = ['mfi-period', mfi.source_id, mfi.mfi_id, currency, adjusted, row.fiscal_year, row.period_type, row.as_of_date.toISOString().substr(0, 10)].join('/');
                        var path = sanitizeDimensions([row.db_table, row.db_field].concat(dimension_path));
                        docs[doc_id] && deepAssign(docs[doc_id], path, parseFloat(row.line_item_value));
                    });
                });
            });
        });

        // Queued behind every query above; hands the collected data on once closed
        mysql.end(function(err) {
            debug('Disconnected from MySQL', db);
            if (err) throw new Error(err);
            callback(mfi, docs);
        });
    };
    
    // Get MFI metadata from Salesforce
    // --------------------------------
    /**
     * Enrich `mfi` with Salesforce metadata and the annual period documents with
     * their diamond scores.
     *
     * Four queries run in parallel after login; `callback` fires once all four
     * have answered, after which the session is logged out.
     *
     * @param {Object} mfi - MFI document; reads `mfi_id` and `organization_id`,
     *   receives the lowercased Account fields plus `mfi_name`, `sp_profile`
     *   and `networks`.
     * @param {Object} docs - period documents keyed by ID; documents with
     *   period_type 'ANN' receive `annual_diamonds`.
     * @param {Function} callback - called as callback(mfi, docs).
     * @throws {Error} from inside query callbacks on login/query errors, or when
     *   no Account matches `mfi.mfi_id`.
     */
    var getSalesforceData = function(mfi, docs, callback) {
        // Quote a value as a SOQL string literal. Backslashes and single quotes are
        // escaped so an ID containing them cannot break out of the literal.
        var soqlString = function(value) {
            return "'" + String(value).replace(/\\/g, '\\\\').replace(/'/g, "\\'") + "'";
        };

        var remaining = 4;
        var done = function(mfi, docs) {
            if (--remaining !== 0) return;
            callback(mfi, docs);

            // Logout from Salesforce. A failure is only reported: the callback has
            // already started writing to CouchDB, and throwing here would kill the
            // process while those writes are still in flight.
            salesforce.logout(function(err) {
                if (err) return console.warn('Salesforce logout failed:', err);
                debug('Logged out from Salesforce');
            });
        };

        // Login into Salesforce
        debug('Login into Salesforce');
        salesforce.login(sfConfig.username, sfConfig.password + sfConfig.security_token, function(err, userInfo) {
            if (err) throw new Error(err);

            // Get main MFI Metadata
            debug('Getting MFI metadata from Salesforce');
            salesforce.query("SELECT Id, Name, Record_ID__c, mix_Diamonds__c, Date_Established__c, mix_Region__c, Country__c, Operations_Comprised_by_MF__c, Regulated__c, Current_Legal_Status__c, Profit_Status__c FROM Account WHERE Record_ID__c = " + soqlString(mfi.mfi_id), function(err, result) {
                if (err) throw new Error(err);
                if (result.totalSize === 0) throw new Error('MFI does not exist');
                var record = {};
                _.chain(result.records).first().omit(['attributes', 'Id']).each(function(v, k) {
                    // Make attributes lowercase
                    record[k.toLowerCase()] = v;
                });
                _.extend(mfi, record);
                mfi.mfi_name = mfi.name;
                done(mfi, docs);
            });

            // Determine whether MFI contains Social Performance Profile data
            debug('Determining whether MFI contains SP Profile data.');
            salesforce.query("SELECT Id, Record_ID__c FROM Account WHERE Record_ID__c = " + soqlString(mfi.mfi_id) + " AND Id IN (SELECT Organization__c FROM SP_Profile__c)", function(err, result) {
                if (err) throw new Error(err);
                mfi.sp_profile = !_.isEmpty(result.records);
                done(mfi, docs);
            });

            // Get list of MFI Network Affiliations
            debug('Getting list of MFI Network Affiliations');
            salesforce.query("SELECT Source_Organization__r.Name FROM Partnership__c WHERE Relationship__c = 'Network Affiliation' AND Status__c = 'Current' AND Target_Organization__r.Id = " + soqlString(mfi.organization_id), function(err, result) {
                if (err) throw new Error(err);
                mfi.networks = _.chain(result.records).pluck('Source_Organization__r').pluck('Name').value();
                done(mfi, docs);
            });

            // Get annual diamonds
            debug('Getting annual diamonds.');
            salesforce.query("SELECT Period__c, Diamond_Score__c FROM Data_Campaign_Status__c WHERE Organization__c = " + soqlString(mfi.organization_id), function(err, result) {
                if (err) throw new Error(err);
                // Group diamonds by year: {Period__c: Diamond_Score__c}
                var diamonds = _.chain(result.records).map(function(period) {
                    return _.chain(period).pick(['Period__c', 'Diamond_Score__c']).values().value();
                }).object().value();
                // Add diamonds to corresponding annual periods
                _.chain(docs).filter(function(doc) { return doc.period_type === 'ANN'; }).each(function(doc) {
                    doc.annual_diamonds = diamonds[doc.fiscal_year];
                });
                done(mfi, docs);
            });
        });
    };
    
    // Calculate Peer Group data
    // -------------------------
    /**
     * Attach a `peer_groups` object (age, intermediation, market, outreach,
     * scale, sustainability) to each period document that qualifies for at
     * least one group. Documents are modified in place.
     *
     * @param {Object} docs - period documents keyed by document ID.
     * @param {Function} callback - called synchronously as callback(docs).
     */
    var calculatePeerGroupData = function(docs, callback) {
        // Safely get a data point value. A data point that also carries dimension
        // breakdowns is stored as an object holding the figure under `value`; that
        // figure is returned even when it is 0 (a `value || object` fallback would
        // return the whole object and turn every comparison on it into NaN).
        var getVal = function(obj, group, prop) {
            if (!_.has(obj, group) || !_.has(obj[group], prop)) return undefined;
            var point = obj[group][prop];
            if (_.isObject(point) && _.has(point, 'value')) return point.value;
            return point;
        };

        // Use `alt` only when `primary` is missing (undefined) or NaN; a real 0 is kept
        var preferDefined = function(primary, alt) {
            return (_.isUndefined(primary) || _.isNaN(primary)) ? alt : primary;
        };

        _.each(docs, function(doc, id) {
            var peer_groups = {};

            // Age, in years between the founding date and the period date
            debug('Calculating peer group age for', doc._id);
            if (_.has(doc, 'date_established__c')) {
                var age = Math.abs(Date.parse(doc.as_of_date) - Date.parse(doc.date_established__c)) / (86400000 * 365.242199);
                if (age) {
                    if (age < 4) peer_groups['age'] = 'New';
                    else if (age <= 8) peer_groups['age'] = 'Young';
                    else if (age > 8) peer_groups['age'] = 'Mature';
                }
            }

            // Intermediation: deposits as a share of total assets
            debug('Calculating peer group intermediation for', doc._id);
            var deposits = getVal(doc, 'balance_sheet', 'deposits');
            var total_assets = getVal(doc, 'balance_sheet', 'total_assets');
            if (!_.isUndefined(deposits) && !_.isUndefined(total_assets) && total_assets > 0) {
                var ratio = deposits / total_assets;
                if (ratio === 0) peer_groups['intermediation'] = 'Non FI';
                else if (ratio < 0.2) peer_groups['intermediation'] = 'Low FI';
                else if (ratio >= 0.2) peer_groups['intermediation'] = 'High FI';
            }
            else if (total_assets === 0) {
                peer_groups['intermediation'] = 'Non FI';
            }

            // Market: depth (per-capita balance) and average loan size
            debug('Calculating peer group market for', doc._id);
            var depth = preferDefined(getVal(doc, 'calculation', 'average_balance_borrower_per_capita'), getVal(doc, 'calculation', 'average_outstanding_balance_per_capita'));
            var average_loan_size = preferDefined(getVal(doc, 'calculation', 'average_balance_borrower'), getVal(doc, 'calculation', 'average_outstanding_balance'));
            if (!_.isUndefined(depth) || !_.isUndefined(average_loan_size)) {
                if (depth < .2 || average_loan_size < 150) peer_groups['market'] = 'Low End';
                else if ((depth >= .2) && (depth < 1.5)) peer_groups['market'] = 'Broad';
                else if ((depth >= 1.5)  && (depth < 2.5)) peer_groups['market'] = 'High End';
                else if ((depth >= 2.5)) peer_groups['market'] = 'Small Business';
            }

            // Outreach: number of borrowers (undefined fails every comparison)
            debug('Calculating peer group outreach for', doc._id);
            var total_borrowers = getVal(doc, 'products_and_clients', 'total_borrowers');
            if (total_borrowers < 10000) peer_groups['outreach'] = 'Small';
            else if (total_borrowers < 30000) peer_groups['outreach'] = 'Medium';
            else if (total_borrowers >= 30000) peer_groups['outreach'] = 'Large';

            // Scale: gross loan portfolio, with higher thresholds for Latin America
            debug('Calculating peer group scale for', doc._id);
            if (_.has(doc, 'mix_region__c')) {
                var gross_loan_portfolio = getVal(doc, 'balance_sheet', 'gross_loan_portfolio');
                var latin_america = doc.mix_region__c == 'Latin America and The Caribbean';
                if (gross_loan_portfolio < 2000000 || (gross_loan_portfolio < 4000000 && latin_america)) peer_groups['scale'] = 'Small';
                else if (gross_loan_portfolio < 8000000 || (gross_loan_portfolio < 15000000 && latin_america)) peer_groups['scale'] = 'Medium';
                // '>=' rather than '>' so a portfolio of exactly 8,000,000 is not left unclassified
                else if (gross_loan_portfolio >= 8000000) peer_groups['scale'] = 'Large';
            }

            // Sustainability: FSS/OSS depending on whether the figures are adjusted
            debug('Calculating peer group sustainability for', doc._id);
            var operational_self_sufficiency = getVal(doc, 'calculation', 'operational_self_sufficiency');
            if (!_.isUndefined(operational_self_sufficiency)) {
                if (doc.adjusted) peer_groups['sustainability'] = operational_self_sufficiency < 1 ? 'Non-FSS' : 'FSS';
                else peer_groups['sustainability'] = operational_self_sufficiency < 1 ? 'Non-OSS' : 'OSS';
            }

            if (!_.isEmpty(peer_groups)) docs[id].peer_groups = peer_groups;
        });

        callback(docs);
    };
    
    // Send data to CouchDB
    // --------------------
    /**
     * Write the master MFI document and its period documents to CouchDB.
     *
     * Period documents already stored in this MFI's ID range but absent from
     * `docs` are removed. In simulation mode ('--simulate') no save or remove
     * is issued; the reads (get / all) still run.
     *
     * @param {Array<Object>} docs - master MFI document first, then the period
     *   documents. The array itself is not modified.
     * @param {Function} callback - called with no arguments once every CouchDB
     *   request issued here has answered.
     * @throws {Error} from inside request callbacks on CouchDB errors (a
     *   'not_found' for the master record triggers an insert instead).
     */
    var updateCouchDB = function(docs, callback) {
        var mfi = docs[0];
        var periods = docs.slice(1);

        // Count outstanding requests so `callback` fires only after the last one
        // has answered. Follow-up requests are registered inside a handler, i.e.
        // before that handler's own request is counted as finished.
        var pending = 0;
        var track = function(handler) {
            pending++;
            return function(err, res) {
                handler(err, res);
                if (--pending === 0) callback();
            };
        };

        // Update master MFI record
        debug('Updating master MFI record');
        couch.get(mfi._id, track(function(err, doc) {
            if (err) {
                if (err.error !== 'not_found') throw new Error(err);
                require('util').log('Inserting ' + mfi._id);
                if (!simulate) couch.save(mfi._id, mfi, track(function(err, res) {
                    if (err) throw new Error(err);
                    debug('Inserted', res);
                }));
            } else if (doc._rev) {
                require('util').log('Updating ' + mfi._id);
                if (!simulate) couch.save(mfi._id, doc._rev, mfi, track(function(err, res) {
                    if (err) throw new Error(err);
                    debug('Updated', res);
                }));
            }
        }));

        // Get list of existing IDs in CouchDB ('~' closes the key range for this MFI)
        debug('Getting list of existing IDs in CouchDB');
        couch.all({startkey: ['mfi-period', args.source, args.mfi].join('/'), endkey: ['mfi-period', args.source, args.mfi, '~'].join('/')}, track(function(err, ids) {
            if (err) throw new Error(err);

            // Remove outdated documents from CouchDB (skipped in simulation mode, like the saves)
            _.chain(ids).pluck('id').difference(_.pluck(periods, '_id')).map(function(id) {
                return _.findWhere(ids, {id: id});
            }).each(function(doc) {
                require('util').log('Removing ' + doc.id);
                if (!simulate) couch.remove(doc.id, doc.value.rev, track(function(err, res) {
                    if (err) throw new Error(err);
                    debug('Removed', res);
                }));
            });

            // Insert/update all documents for this MFI; existing ones need their current rev
            _.each(periods, function(doc) {
                var update = _.findWhere(ids, {id: doc._id});
                if (update) {
                    require('util').log('Updating ' + doc._id);
                    if (!simulate) couch.save(doc._id, update.value.rev, doc, track(function(err, res) {
                        if (err) throw new Error(err);
                        debug('Updated', res);
                    }));
                } else {
                    require('util').log('Inserting ' + doc._id);
                    if (!simulate) couch.save(doc._id, doc, track(function(err, res) {
                        if (err) throw new Error(err);
                        debug('Inserted', res);
                    }));
                }
            });
        }));
    };
    
    // Initialize MFI document
    var mfi = {
        _id: 'mfi/' + args.source + '/' + args.mfi,
        type: 'mfi',
        source_id: args.source,
        mfi_id: args.mfi,
        updated: new Date()
    };

    // Optional watchdog ('--timeout=SECONDS'): a run still alive after that many
    // seconds is reported and terminated with exit code 1, so one hung sync cannot
    // stall the shell script that launched it. unref() keeps the timer itself from
    // holding the process open once all real work has finished.
    var timeoutSeconds = parseInt(args.timeout, 10);
    if (timeoutSeconds > 0) {
        setTimeout(function() {
            console.error('Synchronization for ' + ['mfi', args.source, args.mfi].join('/') + ' timed out after ' + timeoutSeconds + 's');
            process.exit(1);
        }, timeoutSeconds * 1000).unref();
    }

    // Pipeline: MySQL -> Salesforce -> peer groups -> CouchDB
    getMySQLData(mfi, function(mfi, docs) {
        getSalesforceData(mfi, docs, function(mfi, docs) {
            // Merge MFI metadata into each period (period values win on key clashes)
            _.each(docs, function(doc, id) {
                docs[id] = _.extend(_.clone(mfi), doc);
            });
            calculatePeerGroupData(docs, function(docs) {
                // Convert to array for bulk updating, master MFI document first
                docs = _.union([mfi], _.values(docs));
                updateCouchDB(docs, function() {
                    require('util').log('Synchronization for ' + ['mfi', args.source, args.mfi].join('/') + ' finished');
                });
            });
        });
    });
    

Perguntas

Eu gostaria de saber:

  1. Por que esses subprocessos parecem congelar? (Não encontro nenhuma evidência de que os que congelam sejam diferentes dos que executam e terminam normalmente.)
  2. Como posso interromper automaticamente um subprocesso que fica congelado por vários minutos, para não precisar matá-lo manualmente?
por decibel.places 25.05.2014 / 17:12

1 resposta

1

Meu palpite é que você está ficando sem algum tipo de recurso (embora você provavelmente já suspeite disso). Talvez o número máximo de arquivos abertos, ou conexões do MySQL, ou do Salesforce. Não sei ao certo.

Uma forma de talvez resolver isso: em vez de um script bash com uma tonelada de chamadas ao mf-sync, transforme o mf-sync em um módulo e use um script Node de controle com uma fila, que execute as sincronizações em lotes controlados. Tente algo como link

Do jeito que você está fazendo parece um pouco exagerado. Mas, se o problema for o limite de arquivos abertos, talvez você consiga aumentá-lo. link

Ou, se você realmente quiser fazer isso no shell, talvez possa usar o comando batch para gerenciar a fila. link

Ou talvez algo assim seja melhor: link

    
por 25.05.2014 / 18:14