/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sysml.runtime.matrix;

import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.instructions.MRInstructionParser;
import org.apache.sysml.runtime.instructions.MRJobInstruction;
import org.apache.sysml.runtime.instructions.mr.AggregateBinaryInstruction;
import org.apache.sysml.runtime.matrix.JobReturn;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixCell;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes;
import org.apache.sysml.runtime.matrix.mapred.MMCJMRMapper;
import org.apache.sysml.runtime.matrix.mapred.MMCJMRReducerWithAggregator;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.yarn.DMLAppMasterUtils;
import org.apache.sysml.yarn.ropt.YarnClusterAnalyzer;

/**
 * Driver for the "MMCJ-MR" MapReduce job.
 *
 * <p>{@link #runJob} builds a Hadoop {@link JobConf} around a single aggregate-binary
 * instruction, submits it through {@link JobClient#runJob(JobConf)}, and returns the output
 * matrix characteristics (including the non-zero count read from the job counters).
 *
 * <p>This is a static utility class and cannot be instantiated.
 */
public class MMCJMR {
    // NOTE(review): not referenced anywhere in this class as shown; presumably a compile-time
    // switch that was inlined around the determineNumReducers call - confirm before removing.
    private static final boolean AUTOMATIC_CONFIG_NUM_REDUCERS = true;
    private static final Log LOG = LogFactory.getLog(MMCJMR.class);

    /** 1024 * 1024; divisor used to scale sizes down when sizing the reducer count. */
    private static final long BYTES_PER_MB = 0x100000L;

    private MMCJMR() {
    }

    /**
     * Configures, submits and waits for the MMCJ-MR job.
     *
     * @param inst job instruction; only used for trace-level logging
     * @param inputs input paths, one per input matrix
     * @param inputInfos input formats, parallel to {@code inputs}
     * @param rlens row counts, parallel to {@code inputs}
     * @param clens column counts, parallel to {@code inputs}
     * @param brlens rows per block, parallel to {@code inputs}
     * @param bclens columns per block, parallel to {@code inputs}
     * @param instructionsInMapper instructions string passed to the mapper configuration
     * @param aggInstructionsInReducer aggregate instructions string passed to the job configuration
     * @param aggBinInstrction the single aggregate-binary instruction of this job
     * @param numReducers requested number of reducers; must be positive
     * @param replication value stored under {@code dfs.replication}
     * @param output output path
     * @param outputinfo output format
     * @return the characteristics of the output matrix, the output format and the job's success flag
     * @throws Exception if {@code numReducers <= 0} or if configuring/running the job fails
     */
    public static JobReturn runJob(MRJobInstruction inst, String[] inputs, InputInfo[] inputInfos, long[] rlens, long[] clens, int[] brlens, int[] bclens, String instructionsInMapper, String aggInstructionsInReducer, String aggBinInstrction, int numReducers, int replication, String output, OutputInfo outputinfo) throws Exception {
        JobConf job = new JobConf(MMCJMR.class);
        boolean inBlockRepresentation = MRJobConfiguration.deriveRepresentation(inputInfos);

        // First pass: configure the job assuming the result dimensions are known (flag 0).
        MatrixCharacteristics[] stats = MMCJMR.commonSetup(job, inBlockRepresentation, inputs, inputInfos, rlens, clens, brlens, bclens, instructionsInMapper, aggInstructionsInReducer, aggBinInstrction, numReducers, replication, (byte) 0, output, outputinfo);
        if (LOG.isTraceEnabled()) {
            inst.printCompleteMRJobInstruction(stats);
        }

        byte outputMatrixID = MMCJMR.parseOutputIndex(aggBinInstrction);

        // If either output dimension came back as -1, redo the output setup with the
        // "dimensions unknown" flag set to 1.
        if (stats[0].getRows() == -1L || stats[0].getCols() == -1L) {
            MRJobConfiguration.setUpMultipleOutputs(job, new byte[]{outputMatrixID}, new byte[]{1}, new String[]{output}, new OutputInfo[]{outputinfo}, inBlockRepresentation);
        }

        AggregateBinaryInstruction ins = (AggregateBinaryInstruction)MRInstructionParser.parseSingleInstruction(aggBinInstrction);
        MatrixCharacteristics dim1 = MRJobConfiguration.getMatrixCharactristicsForBinAgg(job, ins.input1);
        MatrixCharacteristics dim2 = MRJobConfiguration.getMatrixCharactristicsForBinAgg(job, ins.input2);
        MMCJMR.capBlockSizeAtDimensions(dim1);
        MMCJMR.capBlockSizeAtDimensions(dim2);

        long blockSize1 = MMCJMR.estimateBlockSize(dim1.getRowsPerBlock(), dim1.getColsPerBlock());
        long blockSize2 = MMCJMR.estimateBlockSize(dim2.getRowsPerBlock(), dim2.getColsPerBlock());
        long blockSizeResult = MMCJMR.estimateBlockSize(dim1.getRowsPerBlock(), dim2.getColsPerBlock());

        // Size the cache from whichever side is smaller: block rows of the first input when it
        // has fewer rows than the second input has columns, block columns of the second otherwise.
        long cacheSize;
        if (dim1.getRows() < dim2.getCols()) {
            long numBlocks = (long)Math.ceil((double)dim1.getRows() / (double)dim1.getRowsPerBlock());
            cacheSize = numBlocks * (20L + blockSize1) + 32L;
        } else {
            long numBlocks = (long)Math.ceil((double)dim2.getCols() / (double)dim2.getColsPerBlock());
            cacheSize = numBlocks * (20L + blockSize2) + 32L;
        }
        cacheSize += 2L * Math.max(blockSize1, blockSize2) + blockSizeResult + (long)MRJobConfiguration.getMiscMemRequired(job);

        // The setter takes an int: clamp instead of letting a plain cast wrap a large
        // estimate into an arbitrary (possibly negative) value.
        MRJobConfiguration.setMMCJCacheSize(job, (int)Math.min(cacheSize, (long)Integer.MAX_VALUE));
        MRJobConfiguration.setUniqueWorkingDir(job);

        RunningJob runjob = JobClient.runJob(job);

        // The non-zero count of the output is read from the "nonzeros" counter group,
        // keyed by the output matrix index.
        Counters.Group group = runjob.getCounters().getGroup("nonzeros");
        stats[0].setNonZeros(group.getCounter(Byte.toString(outputMatrixID)));
        return new JobReturn(stats[0], outputinfo, runjob.isSuccessful());
    }

    /**
     * Applies the job configuration shared by every run: inputs, dimensions, instructions,
     * outputs, mapper/reducer classes and the number of reduce tasks.
     *
     * @return the matrix characteristics computed by
     *     {@code MRJobConfiguration.computeMatrixCharacteristics}
     * @throws Exception if {@code numReducers <= 0} or if any configuration step fails
     */
    private static MatrixCharacteristics[] commonSetup(JobConf job, boolean inBlockRepresentation, String[] inputs, InputInfo[] inputInfos, long[] rlens, long[] clens, int[] brlens, int[] bclens, String instructionsInMapper, String aggInstructionsInReducer, String aggBinInstrction, int numReducers, int replication, byte resultDimsUnknown, String output, OutputInfo outputinfo) throws Exception {
        job.setJobName("MMCJ-MR");
        if (numReducers <= 0) {
            throw new Exception("MMCJ-MR has to have at least one reduce task!");
        }
        MRJobConfiguration.setMatrixValueClass(job, inBlockRepresentation);

        // Inputs are identified by their position, stored as a byte; more inputs than a
        // non-negative byte can index would silently wrap, so reject them up front.
        if (inputs.length > Byte.MAX_VALUE + 1) {
            throw new IllegalArgumentException("MMCJ-MR supports at most " + (Byte.MAX_VALUE + 1) + " inputs, got " + inputs.length);
        }
        byte[] realIndexes = new byte[inputs.length];
        for (int i = 0; i < realIndexes.length; ++i) {
            realIndexes[i] = (byte)i;
        }

        MRJobConfiguration.setUpMultipleInputs(job, realIndexes, inputs, inputInfos, brlens, bclens, true, inBlockRepresentation ? MRJobConfiguration.ConvertTarget.BLOCK : MRJobConfiguration.ConvertTarget.CELL);
        MRJobConfiguration.setMatricesDimensions(job, realIndexes, rlens, clens);
        MRJobConfiguration.setBlocksSizes(job, realIndexes, brlens, bclens);
        MRJobConfiguration.setInstructionsInMapper(job, instructionsInMapper);
        MRJobConfiguration.setAggregateInstructions(job, aggInstructionsInReducer);
        MRJobConfiguration.setAggregateBinaryInstructions(job, aggBinInstrction);
        job.setInt("dfs.replication", replication);
        MRJobConfiguration.addBinaryBlockSerializationFramework(job);

        DMLConfig config = ConfigurationManager.getDMLConfig();
        DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, config);
        MRJobConfiguration.setupCustomMRConfigurations(job, config);

        byte[] resultIndexes = new byte[]{MMCJMR.parseOutputIndex(aggBinInstrction)};
        byte[] resultDimsUnknown_Array = new byte[]{resultDimsUnknown};
        HashSet<Byte> mapoutputIndexes = MRJobConfiguration.setUpOutputIndexesForMapper(job, realIndexes, instructionsInMapper, aggInstructionsInReducer, aggBinInstrction, resultIndexes);
        MRJobConfiguration.setUpMultipleOutputs(job, resultIndexes, resultDimsUnknown_Array, new String[]{output}, new OutputInfo[]{outputinfo}, inBlockRepresentation);

        job.setMapperClass(MMCJMRMapper.class);
        job.setMapOutputKeyClass(TaggedFirstSecondIndexes.class);
        if (inBlockRepresentation) {
            job.setMapOutputValueClass(MatrixBlock.class);
        } else {
            job.setMapOutputValueClass(MatrixCell.class);
        }
        job.setOutputKeyComparatorClass(TaggedFirstSecondIndexes.Comparator.class);
        job.setPartitionerClass(TaggedFirstSecondIndexes.FirstIndexPartitioner.class);

        MRJobConfiguration.MatrixChar_N_ReducerGroups ret = MRJobConfiguration.computeMatrixCharacteristics(job, realIndexes, instructionsInMapper, aggInstructionsInReducer, aggBinInstrction, null, resultIndexes, mapoutputIndexes, true);
        int numRed = MMCJMR.determineNumReducers(rlens, clens, numReducers, ret.numReducerGroups);
        job.setNumReduceTasks(numRed);
        job.setReducerClass(MMCJMRReducerWithAggregator.class);
        return ret.stats;
    }

    /**
     * Picks the number of reduce tasks for the job.
     *
     * <p>The result is the larger of {@code defaultNumRed} and a size-based estimate (largest
     * estimated input size divided by the HDFS block size, capped by the available reduce
     * parallelism), then capped by {@code numRedGroups} and floored at 1.
     *
     * @param rlen row counts of the inputs
     * @param clen column counts of the inputs, parallel to {@code rlen}
     * @param defaultNumRed requested number of reducers
     * @param numRedGroups number of reducer groups; upper bound for the result
     * @return the number of reduce tasks, always at least 1
     */
    protected static int determineNumReducers(long[] rlen, long[] clen, int defaultNumRed, long numRedGroups) {
        long maxNumRed = InfrastructureAnalyzer.getRemoteParallelReduceTasks();

        // Floor at 1 so that a block size below 1 MB cannot cause a division by zero below.
        long blockSize = Math.max(InfrastructureAnalyzer.getHDFSBlockSize() / BYTES_PER_MB, 1L);

        // Largest estimated on-disk size over all inputs, each sized as if fully dense
        // (non-zero count passed as rows * cols).
        long maxSize = -1L;
        for (int i = 0; i < rlen.length; ++i) {
            long tmp = MatrixBlock.estimateSizeOnDisk(rlen[i], clen[i], rlen[i] * clen[i]) / BYTES_PER_MB;
            maxSize = Math.max(maxSize, tmp);
        }

        if (InfrastructureAnalyzer.isYarnEnabled()) {
            maxNumRed = Math.max(maxNumRed, YarnClusterAnalyzer.getNumCores() / 2L);
        }

        int ret = (int)Math.max((long)defaultNumRed, Math.min(maxSize / blockSize, maxNumRed));
        ret = (int)Math.min((long)ret, numRedGroups);
        return Math.max(ret, 1);
    }

    /** Returns the output index of the given single instruction string. */
    private static byte parseOutputIndex(String instruction) throws Exception {
        return MRInstructionParser.parseSingleInstruction((String)instruction).output;
    }

    /** Shrinks the rows/columns per block to the matrix dimensions when a block would exceed them. */
    private static void capBlockSizeAtDimensions(MatrixCharacteristics dim) {
        if ((long)dim.getRowsPerBlock() > dim.getRows()) {
            dim.setRowsPerBlock((int)dim.getRows());
        }
        if ((long)dim.getColsPerBlock() > dim.getCols()) {
            dim.setColsPerBlock((int)dim.getCols());
        }
    }

    /**
     * Size estimate for one block of the given shape: {@code 77 + 8 * rows * cols}.
     * Computed in {@code long} so that large blocks do not overflow {@code int} arithmetic.
     */
    private static long estimateBlockSize(int rowsPerBlock, int colsPerBlock) {
        return 77L + 8L * rowsPerBlock * colsPerBlock;
    }
}

