/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sysml.runtime.controlprogram.parfor.mqo;

import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.sysml.lops.runtime.RunMRJobs;
import org.apache.sysml.runtime.controlprogram.parfor.mqo.MergedMRJobInstruction;
import org.apache.sysml.runtime.controlprogram.parfor.mqo.PiggybackingWorker;
import org.apache.sysml.runtime.controlprogram.parfor.mqo.RuntimePiggybacking;
import org.apache.sysml.runtime.controlprogram.parfor.mqo.RuntimePiggybackingUtils;
import org.apache.sysml.runtime.instructions.MRJobInstruction;
import org.apache.sysml.runtime.matrix.JobReturn;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.Pair;
import org.apache.sysml.utils.Statistics;

/**
 * Piggybacking worker that merges pooled MR job instructions and submits the
 * merged jobs concurrently on a fixed-size thread pool.
 *
 * <p>The worker loop sleeps for the configured interval, and then only submits
 * work if the current cluster utilization is not above a threshold of
 * {@code 1 - utilDecay^k}, where {@code k} is the number of started 60-second
 * steps since the last submission (measured up to the timestamp taken just
 * before the sleep). With the default decay of 0.5 the threshold is 0.5 for
 * {@code k = 1}, 0.75 for {@code k = 2}, and so on, i.e. the longer nothing was
 * submitted, the higher the utilization that is still accepted.
 */
public class PiggybackingWorkerUtilDecayParallel
extends PiggybackingWorker {
    /** Default sleep time in milliseconds between two checks of the job pool. */
    private static final long MIN_MERGE_INTERVAL = 1000L;
    /** Default base of the utilization-threshold decay. */
    private static final double UTILIZATION_DECAY = 0.5;
    /** Length in milliseconds of one decay step of the utilization threshold. */
    private static final double DECAY_STEP_MILLIS = 60000.0;

    /** Thread pool on which merged jobs are submitted. */
    private final ExecutorService _parSubmit;
    /** Sleep time in milliseconds per loop iteration. */
    private final long _minTime;
    /** Base of the utilization-threshold decay. */
    private final double _utilDecay;
    /** Number of threads in the submission pool. */
    private final int _par;

    /**
     * Creates a worker with the default interval (1000 ms) and decay (0.5).
     *
     * @param par number of threads used for parallel job submission
     */
    public PiggybackingWorkerUtilDecayParallel(int par) {
        this(MIN_MERGE_INTERVAL, UTILIZATION_DECAY, par);
    }

    /**
     * Creates a worker and its fixed-size submission thread pool.
     *
     * @param minInterval sleep time in milliseconds per loop iteration
     * @param utilDecay base of the utilization-threshold decay
     * @param par number of threads used for parallel job submission
     * @throws IllegalArgumentException if {@code par <= 0} (raised by
     *         {@link Executors#newFixedThreadPool(int)})
     */
    public PiggybackingWorkerUtilDecayParallel(long minInterval, double utilDecay, int par) {
        this._minTime = minInterval;
        this._utilDecay = utilDecay;
        this._par = par;
        this._parSubmit = Executors.newFixedThreadPool(this._par);
    }

    /**
     * Stops the worker via the superclass and shuts down the submission pool.
     * {@code shutdown()} lets already submitted tasks finish but rejects new ones.
     */
    @Override
    public void setStopped() {
        super.setStopped();
        this._parSubmit.shutdown();
    }

    /**
     * Worker loop: sleep, check the pool and the utilization threshold, then
     * merge the maximum working set and hand every merged job to the pool.
     *
     * @throws RuntimeException wrapping any exception raised inside an iteration;
     *         if the cause is an {@link InterruptedException}, the interrupt
     *         status of the current thread is restored before rethrowing
     */
    @Override
    public void run() {
        long lastTime = System.currentTimeMillis();
        while (!this._stop) {
            try {
                // Taken before the sleep. The extra millisecond keeps the elapsed time
                // positive (unless the wall clock moved backwards), so the number of
                // decay steps computed below is at least 1.
                long currentTime = System.currentTimeMillis() + 1L;
                Thread.sleep(this._minTime);

                // Check the pool first so that an empty pool never triggers a
                // cluster utilization request.
                if (RuntimePiggybacking.isEmptyJobPool()) {
                    continue;
                }

                // Keep collecting jobs while the utilization exceeds the decayed threshold.
                double util = RuntimePiggybackingUtils.getCurrentClusterUtilization();
                double decaySteps = Math.ceil(((double)currentTime - (double)lastTime) / DECAY_STEP_MILLIS);
                double utilThreshold = 1.0 - Math.pow(this._utilDecay, decaySteps);
                if (util > utilThreshold) {
                    continue;
                }

                LinkedList<Pair<Long, MRJobInstruction>> workingSet = RuntimePiggybacking.getMaxWorkingSet();
                if (workingSet == null) {
                    continue;
                }

                LinkedList<MergedMRJobInstruction> mergedWorkingSet = this.mergeMRJobInstructions(workingSet);
                for (MergedMRJobInstruction minst : mergedWorkingSet) {
                    this._parSubmit.execute(new MRJobSubmitTask(minst));
                }
                lastTime = currentTime;
            }
            catch (InterruptedException ex) {
                // Thread.sleep cleared the interrupt status; restore it so that code
                // further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(ex);
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }
    }

    /**
     * Task that submits one merged MR job and publishes one job return per
     * original job id via {@code putJobResults} of the enclosing worker.
     */
    public class MRJobSubmitTask
    implements Runnable {
        /** Merged instruction to submit. */
        private final MergedMRJobInstruction _minst;

        /**
         * @param minst merged MR job instruction to submit
         */
        public MRJobSubmitTask(MergedMRJobInstruction minst) {
            this._minst = minst;
        }

        /**
         * Submits the merged job and splits its result into per-id job returns.
         * If anything in that path throws, an unsuccessful job return is
         * published for every id instead, so each id still receives a result.
         */
        @Override
        public void run() {
            try {
                JobReturn mret = RunMRJobs.submitJob(this._minst.inst);
                // Counted once for the merged job; decremented once per original id below.
                // NOTE(review): presumably the per-id decrement offsets counting done by
                // the code that consumes the individual results - confirm against callers.
                Statistics.incrementNoOfExecutedMRJobs();
                if (!mret.successful) {
                    PiggybackingWorker.LOG.error("Failed to run merged mr-job instruction:\n" + this._minst.inst.toString());
                }
                LinkedList<JobReturn> ret = new LinkedList<JobReturn>();
                for (Long id : this._minst.ids) {
                    ret.add(this._minst.constructJobReturn(id, mret));
                    Statistics.decrementNoOfExecutedMRJobs();
                }
                PiggybackingWorkerUtilDecayParallel.this.putJobResults(this._minst.ids, ret);
            }
            catch (Exception ex) {
                PiggybackingWorker.LOG.error("Failed to run merged mr-job instruction:\n" + this._minst.inst.toString(), ex);
                // Build an unsuccessful return per id, sized by that id's entry in outIxLens.
                LinkedList<JobReturn> ret = new LinkedList<JobReturn>();
                for (Long id : this._minst.ids) {
                    JobReturn fret = new JobReturn(new MatrixCharacteristics[this._minst.outIxLens.get(id).intValue()], false);
                    ret.add(this._minst.constructJobReturn(id, fret));
                    Statistics.decrementNoOfExecutedMRJobs();
                }
                PiggybackingWorkerUtilDecayParallel.this.putJobResults(this._minst.ids, ret);
            }
        }
    }
}

