用简单的脚本实现超大数据量的期初导入?附带Map/Reduce源
User Case:我上传的文件,每个在17000行左右,感觉没有一个会成功,这是怎么回事?如果将文件内容做成十几二十行,就可以成功上传。
User Case:6700行刚导了一个,成功了,17000行根本导不动,全部失败。
脚本简单思路:文件传到File cabinet,Map/Reduce读取文件分解成行给Map,Map按照SKU分配给Reduce,Reduce把具体的IA做出来,收工~
直接贴源代码出来,这个是基于亚马逊库存日志做的,读取文件是一样的道理。
有问题请直接发问,谢谢大家的捧场!
/**
* @NApiVersion 2.x
* @NScriptType MapReduceScript
* @NModuleScope SameAccount
*/
define(['N/error', 'N/email', 'N/runtime', 'N/record', 'N/search', './Helper/Amazon', './Helper/Moment', './Helper/Summarization'],
function(error, email, runtime, record, search, amazon, moment, summarization) {
const INVENTORY_ADJUSTMENT_ACCOUNT_ID = 4232;
const LOT_NUMBER = '20180719001';
/**
* Marks the beginning of the Map/Reduce process and generates input data.
*
* @typedef {Object} ObjectRef
* @property {number} id - Internal ID of the record instance
* @property {string} type - Record type id
*
* @return {Array|Object|Search|RecordRef} inputSummary
* @since 2015.1
*/
function getInputData() {
var input = [];
search.create({
type: 'customrecord_mws_inventory',
filters: [
{name: 'custrecordwmi_snapshot_date', operator: 'startswith', values: '2018-07-19'}
],
columns: [
{name: 'internalid', join: 'custrecordi_account'},
{name: 'custrecord_amz_subsidiary', join: 'custrecordwmi_account'},
{name: 'custrecord_amz_dept', join: 'custrecordwmi_account'},
{name: 'custrecord_amz_fba_warehouse', join: 'custrecordwmi_account'},
{name: 'custrecord_amz_fba_bin', join: 'custrecordwmi_account'},
{name: 'name', join: 'custrecordwmi_account'},
{name: 'custrecordwmi_seller_sku'},
{name: 'custrecordwmi_fulfillment_channel_sku'},
{name: 'custrecordwmi_condition_type'},
{name: 'custrecordwmi_quantity'}
]
}).run().each(function(rec) {
input.push({
account_id: rec.getValue(rec.columns[0]),
subsidiary: rec.getValue(rec.columns[1]),
dept: rec.getValue(rec.columns[2]),
location: rec.getValue(rec.columns[3]),
bin: rec.getValue(rec.columns[4]),
name: rec.getValue(rec.columns[5]),
sku: rec.getValue(rec.columns[6]),
fnsku: rec.getValue(rec.columns[7]),
condition: rec.getValue(rec.columns[8]),
qty: rec.getValue(rec.columns[9]),
});
return true;
});
return input;
}
/**
* Executes when the map entry point is triggered and applies to each key/value pair.
*
* @param {MapSummary} context - Data collection containing the key/value pairs to process through the map stage
* @since 2015.1
*/
function map(context) {
// log.debug('map', JSON.parse(context.value));
var f = JSON.parse(context.value);
context.write([
f.subsidiary,
f.dept,
f.location
].join('|'), f);
}
/**
* Executes when the reduce entry point is triggered and applies to each group.
*
* @param {ReduceSummary} context - Data collection containing the groups to process through the reduce stage
* @since 2015.1
*/
function reduce(context) {
[subsidiary, dept, location] = context.key.split('|');
var info = {
subsidiary: subsidiary,
dept: dept,
location: location,
items: []
}
var ia = record.create({
type: 'inventoryadjustment',
isDynamic: true
});
ia.setValue({fieldId: 'subsidiary', value: subsidiary});
ia.setValue({fieldId: 'department', value: dept});
ia.setValue({fieldId: 'adjlocation', value: location});
ia.setValue({fieldId: 'account', value: INVENTORY_ADJUSTMENT_ACCOUNT_ID});
ia.setValue({fieldId: 'memo', value: 'FIST INITIALIZATION'});
context.values.map(function(v) {
var f = JSON.parse(v);
info.items.push({
item: f.sku,
condition: f.condition,
qty: f.qty,
lot: LOT_NUMBER
});
search.create({
type: 'item',
filters: [
{name: 'itemid', operator: 'is', values: f.sku}
]
}).run().each(function(rec) {
var item_id = rec.id;
ia.selectNewLine({sublistId: 'inventory'});
ia.setCurrentSublistValue({sublistId: 'inventory', fieldId: 'item', value: item_id});
ia.setCurrentSublistValue({sublistId: 'inventory', fieldId: 'location', value: f.location});
ia.setCurrentSublistValue({sublistId: 'inventory', fieldId: 'adjustqtyby', value: f.qty});
ia.setCurrentSublistValue({sublistId: 'inventory', fieldId: 'department', value: f.dept});
var inventorydetail = ia.getCurrentSublistSubrecord({sublistId: 'inventory', fieldId: 'inventorydetail'});
inventorydetail.selectNewLine({sublistId: 'inventoryassignment'});
// 库存减少用issueinventorynumber,增加用receiptinventorynumber
// inventorydetail.setCurrentSublistValue({sublistId: 'inventoryassignment', fieldId: 'issueinventorynumber', value: LOT_NUMBER});
inventorydetail.setCurrentSublistValue({sublistId: 'inventoryassignment', fieldId: 'receiptinventorynumber', value: LOT_NUMBER});
inventorydetail.setCurrentSublistValue({sublistId: 'inventoryassignment', fieldId: 'binnumber', value: f.bin});
inventorydetail.setCurrentSublistValue({sublistId: 'inventoryassignment', fieldId: 'quantity', value: f.qty});
inventorydetail.commitLine({sublistId: 'inventoryassignment'});
ia.commitLine({sublistId: 'inventory'});
});
});
if (ia.getLineCount({sublistId: 'inventory'})) {
ia.save();
log.debug('info', info);
} else {
log.error('找不到ITEM', info);
}
}
/**
* Executes when the summarize entry point is triggered and applies to the result set.
*
* @param {Summary} summary - Holds statistics regarding the execution of a map/reduce script
* @since 2015.1
*/
function summarize(summary) {
summarization.summarize(summary);
}
return {
getInputData: getInputData,
map: map,
reduce: reduce,
summarize: summarize
};
});