/*
 * Decompiled with CFR 0.152.
 */
package com.cognos.xqe.resultsets;

import com.cognos.xqe.config.ServiceEnumeration;
import com.cognos.xqe.data.types.IDataType;
import com.cognos.xqe.data.values.DataValueFactory;
import com.cognos.xqe.data.values.IRow;
import com.cognos.xqe.data.values.RowValue;
import com.cognos.xqe.exception.XQERuntimeException;
import com.cognos.xqe.resultset.interfaces.IIterator;
import com.cognos.xqe.runtree.XDataContext;
import com.cognos.xqe.runtree.XTabularIterator;
import com.cognos.xqe.trace.LogLevel;
import com.cognos.xqe.trace.XQEDebugLog;
import com.cognos.xqe.trace.XQELog;
import com.cognos.xqe.trace.XQELogger;
import com.cognos.xqe.util.concurrent.ThreadPool;
import com.cognos.xqe.util.context.ExecutionEnvironmentContext;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

public class ExchangeIterator
extends XTabularIterator {
    private final Thread mainThread = Thread.currentThread();
    private static final IRow POISON = DataValueFactory.createRowValue(new IDataType[0]);
    private BlockingQueue<IRow> queue;
    private AtomicReference<XQERuntimeException> exception = new AtomicReference();
    private IIterator tabIt;
    private static final int DEFAULT_CAPACITY = 50;
    private String name;
    private static XQELogger mErrorLogger = XQELog.getLogger(ServiceEnumeration.XQE, "XQE", "Exception", LogLevel.ERROR);

    public ExchangeIterator(XDataContext context, IIterator uIterator) {
        this(context, uIterator, 50);
    }

    public ExchangeIterator(XDataContext context, IIterator uIterator, int capacity) {
        this(context, new LinkedBlockingQueue<IRow>(capacity));
        this.tabIt = uIterator;
        ThreadPool.getInstance().submit(new Producer());
    }

    public ExchangeIterator(XDataContext context, LinkedBlockingQueue<IRow> theQueue) {
        super(context, context.getNodeId());
        this.queue = theQueue;
        this.name = String.format("ExchangeIterator[%s]", this.queue.hashCode());
    }

    @Override
    public Object nextImpl() {
        IRow row = null;
        try {
            row = this.queue.take();
            if (row.getNumColumns() == 0) {
                XQEDebugLog.out.println(String.format("%s: read poison after consuming %d rows", this.name, this.nRows));
                return null;
            }
            ++this.nRows;
        }
        catch (InterruptedException e) {
            mErrorLogger.log(e);
            if (this.exception.get() != null) {
                throw this.exception.get();
            }
            row = null;
        }
        return row;
    }

    public boolean hasData() {
        return !this.queue.isEmpty();
    }

    @Override
    public void release() {
        super.release();
        if (this.tabIt != null) {
            this.tabIt.release();
        }
        XQEDebugLog.out.println(String.format("%s: consumed %d rows", this.name, this.nRows));
    }

    private class Producer
    implements Runnable {
        Producer() {
        }

        @Override
        public void run() {
            ExecutionEnvironmentContext executionEnvironmentContext = ExecutionEnvironmentContext.enter(ExchangeIterator.this.context.getEnvironment());
            try {
                while (true) {
                    IRow row;
                    if ((row = (IRow)ExchangeIterator.this.tabIt.next()) == null) {
                        ExchangeIterator.this.queue.put(POISON);
                        break;
                    }
                    if (row instanceof RowValue) {
                        ExchangeIterator.this.queue.put((IRow)((RowValue)row).copy(false));
                        continue;
                    }
                    ExchangeIterator.this.queue.put((IRow)row.copy());
                }
            }
            catch (InterruptedException e) {
                mErrorLogger.log(e);
            }
            catch (XQERuntimeException e) {
                ExchangeIterator.this.exception.set(e);
                ExchangeIterator.this.mainThread.interrupt();
            }
            finally {
                executionEnvironmentContext.exit();
            }
        }
    }
}

