package com.taobao.api.internal.stream;

import com.omesoft.firstaid.util.Constants;
import com.taobao.api.internal.stream.connect.ConnectionLifeCycleListener;
import com.taobao.api.internal.stream.connect.HttpClient;
import com.taobao.api.internal.stream.message.MessageStreamImpl;
import com.taobao.api.internal.stream.message.StreamMsgConsumeFactory;
import com.taobao.api.internal.stream.message.TopCometMessageListener;
import com.taobao.api.internal.util.RequestParametersHolder;
import com.taobao.api.internal.util.StringUtils;
import com.taobao.api.internal.util.TaobaoHashMap;
import com.taobao.api.internal.util.TaobaoUtils;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;

/* loaded from: classes.dex */
public class TopCometStreamImpl implements TopCometStream {
    private static final Logger logger = Logger.getLogger(TopCometStreamImpl.class);
    private TopCometMessageListener cometMessageListener;
    private Configuration conf;
    private ConnectionLifeCycleListener connectionListener;
    private StreamImplementation currentStreamImpl;
    private HttpClient httpClient;
    private String serverRespCode;
    private StreamMsgConsumeFactory msgConsumeFactory = null;
    private boolean stop = false;
    private ReentrantLock lock = new ReentrantLock();
    private Condition controlCondition = this.lock.newCondition();
    private int startConsumeThreadTimes = 0;
    private long lastStartConsumeThread = System.currentTimeMillis();

    /* loaded from: classes.dex */
    class ControlThread implements Runnable {
        private static final String threadName = "top-stream-consume-thread";
        private TopCometStreamConsume currentStreamConsume;
        private long lastCheckTime = 0;
        private boolean isClientReConnect = false;
        private boolean isStarted = false;

        ControlThread() {
        }

        private void startConsumeThread() {
            this.currentStreamConsume = new TopCometStreamConsume();
            new Thread(this.currentStreamConsume, threadName).start();
            this.lastCheckTime = TopCometStreamImpl.this.lastStartConsumeThread = System.currentTimeMillis();
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!TopCometStreamImpl.this.stop) {
                try {
                    if (!StreamConstants.SERVER_DEPLOY.equals(TopCometStreamImpl.this.serverRespCode)) {
                        if (!StreamConstants.SERVER_REHASH.equals(TopCometStreamImpl.this.serverRespCode)) {
                            if ((StreamConstants.CLIENT_KICKOFF.equals(TopCometStreamImpl.this.serverRespCode) && !this.isClientReConnect) || StreamConstants.SERVER_KICKOFF.equals(TopCometStreamImpl.this.serverRespCode)) {
                                break;
                            }
                            if (StreamConstants.CONNECT_REACH_MAX_TIME.equals(TopCometStreamImpl.this.serverRespCode)) {
                                startConsumeThread();
                            } else if (StreamConstants.RECONNECT.equals(TopCometStreamImpl.this.serverRespCode)) {
                                startConsumeThread();
                            } else if (!this.isStarted) {
                                startConsumeThread();
                                this.isStarted = true;
                            }
                        } else {
                            startConsumeThread();
                        }
                    } else {
                        if (TopCometStreamImpl.logger.isDebugEnabled()) {
                            TopCometStreamImpl.logger.debug("Server is upgrade sleep " + TopCometStreamImpl.this.conf.getSleepTimeOfServerInUpgrade() + " seconds");
                        }
                        try {
                            Thread.sleep(TopCometStreamImpl.this.conf.getSleepTimeOfServerInUpgrade() * Constants.POISEARCH);
                        } catch (InterruptedException e) {
                        }
                        startConsumeThread();
                    }
                    if (System.currentTimeMillis() - this.lastCheckTime < (TopCometStreamImpl.this.conf.getHttpReconnectInterval() - 300) * Constants.POISEARCH) {
                        try {
                            try {
                                TopCometStreamImpl.this.lock.lock();
                                TopCometStreamImpl.this.serverRespCode = null;
                                TopCometStreamImpl.this.controlCondition.await(5L, TimeUnit.MINUTES);
                            } catch (Exception e2) {
                                TopCometStreamImpl.this.lock.unlock();
                            }
                        } catch (Throwable th) {
                            throw th;
                            break;
                        }
                    } else {
                        TopCometStreamImpl.this.serverRespCode = StreamConstants.RECONNECT;
                        this.currentStreamConsume.notify.set(false);
                        this.isClientReConnect = true;
                        if (TopCometStreamImpl.this.connectionListener != null) {
                            TopCometStreamImpl.this.connectionListener.onReconnect();
                        }
                    }
                } finally {
                    try {
                    } catch (Exception e3) {
                    } finally {
                    }
                }
            }
            if (TopCometStreamImpl.this.currentStreamImpl != null) {
                try {
                    TopCometStreamImpl.this.currentStreamImpl.close();
                } catch (IOException e4) {
                }
            }
            TopCometStreamImpl.this.msgConsumeFactory.shutdown();
            TopCometStreamImpl.logger.info("Stop stream consume");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes.dex */
    public class TopCometStreamConsume implements Runnable {
        private boolean closed = false;
        protected AtomicBoolean notify = new AtomicBoolean(true);
        private StreamImplementation stream;

        TopCometStreamConsume() {
        }

        StreamImplementation getStream() throws TopCometSysErrorException {
            return TopCometStreamImpl.this.getMsgStreamImpl();
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z = false;
            while (!this.closed) {
                try {
                    if (!this.closed && this.stream == null) {
                        if (TopCometStreamImpl.logger.isDebugEnabled()) {
                            TopCometStreamImpl.logger.debug("Establishing connection.");
                        }
                        this.stream = getStream();
                        z = true;
                        if (TopCometStreamImpl.logger.isDebugEnabled()) {
                            TopCometStreamImpl.logger.info("Connection established.");
                        }
                        if (TopCometStreamImpl.this.connectionListener != null) {
                            try {
                                TopCometStreamImpl.this.connectionListener.onConnect();
                            } catch (Exception e) {
                                TopCometStreamImpl.logger.warn(e.getMessage());
                            }
                        }
                        if (TopCometStreamImpl.logger.isDebugEnabled()) {
                            TopCometStreamImpl.logger.debug("Receiving message stream.");
                        }
                        while (!this.closed) {
                            try {
                                this.stream.nextMsg();
                            } catch (IOException e2) {
                                this.stream.onException(e2);
                                this.closed = true;
                                if (TopCometStreamImpl.this.connectionListener != null) {
                                    TopCometStreamImpl.this.connectionListener.onReadTimeout();
                                }
                                throw e2;
                                break;
                            }
                        }
                    }
                } catch (Exception e3) {
                    TopCometStreamImpl.logger.error(e3.getMessage(), e3);
                    if (this.stream != null) {
                        try {
                            this.stream.close();
                        } catch (IOException e4) {
                            TopCometStreamImpl.logger.error(e4, e4);
                        }
                    }
                    this.stream = null;
                    z = false;
                    this.closed = true;
                    if (e3 instanceof TopCometSysErrorException) {
                        TopCometStreamImpl.this.stop = true;
                        if (TopCometStreamImpl.this.connectionListener != null) {
                            TopCometStreamImpl.this.connectionListener.onSysErrorException(e3);
                        }
                    } else {
                        if (TopCometStreamImpl.this.connectionListener != null) {
                            TopCometStreamImpl.this.connectionListener.onException(e3);
                        }
                        if (System.currentTimeMillis() - TopCometStreamImpl.this.lastStartConsumeThread < 1800000) {
                            TopCometStreamImpl.access$1008(TopCometStreamImpl.this);
                            if (TopCometStreamImpl.this.startConsumeThreadTimes >= 10) {
                                TopCometStreamImpl.this.stop = true;
                                if (TopCometStreamImpl.this.connectionListener != null) {
                                    TopCometStreamImpl.this.connectionListener.onMaxReadTimeoutException();
                                }
                                TopCometStreamImpl.logger.error("Occure too many exception,stop the system,please check");
                            }
                        } else {
                            TopCometStreamImpl.this.startConsumeThreadTimes = 0;
                        }
                        TopCometStreamImpl.this.serverRespCode = StreamConstants.RECONNECT;
                    }
                    if (this.notify.get()) {
                        try {
                            TopCometStreamImpl.this.lock.lock();
                            TopCometStreamImpl.this.controlCondition.signalAll();
                        } catch (Exception e5) {
                        } finally {
                            TopCometStreamImpl.this.lock.unlock();
                        }
                    }
                    if (TopCometStreamImpl.this.connectionListener != null) {
                        try {
                            TopCometStreamImpl.this.connectionListener.onDisconnect();
                        } catch (Exception e6) {
                            TopCometStreamImpl.logger.warn(e6.getMessage());
                        }
                    }
                }
            }
            if (this.stream != null) {
                try {
                    if (z) {
                        try {
                            this.stream.close();
                            if (TopCometStreamImpl.this.connectionListener != null) {
                                try {
                                    TopCometStreamImpl.this.connectionListener.onDisconnect();
                                } catch (Exception e7) {
                                    TopCometStreamImpl.logger.warn(e7.getMessage());
                                }
                            }
                        } catch (Exception e8) {
                            TopCometStreamImpl.logger.warn(e8.getMessage(), e8);
                            if (TopCometStreamImpl.this.connectionListener != null) {
                                try {
                                    TopCometStreamImpl.this.connectionListener.onDisconnect();
                                } catch (Exception e9) {
                                    TopCometStreamImpl.logger.warn(e9.getMessage());
                                }
                            }
                        }
                    }
                } catch (Throwable th) {
                    if (TopCometStreamImpl.this.connectionListener != null) {
                        try {
                            TopCometStreamImpl.this.connectionListener.onDisconnect();
                        } catch (Exception e10) {
                            TopCometStreamImpl.logger.warn(e10.getMessage());
                        }
                    }
                    throw th;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public TopCometStreamImpl(Configuration configuration) {
        this.conf = configuration;
    }

    static /* synthetic */ int access$1008(TopCometStreamImpl topCometStreamImpl) {
        int i = topCometStreamImpl.startConsumeThreadTimes;
        topCometStreamImpl.startConsumeThreadTimes = i + 1;
        return i;
    }

    public Condition getControlCondition() {
        return this.controlCondition;
    }

    public ReentrantLock getLock() {
        return this.lock;
    }

    public StreamImplementation getMsgStreamImpl() throws TopCometSysErrorException {
        try {
            MessageStreamImpl messageStreamImpl = new MessageStreamImpl(this.msgConsumeFactory, this.httpClient.post(), this.cometMessageListener, this);
            this.currentStreamImpl = messageStreamImpl;
            return messageStreamImpl;
        } catch (TopCometSysErrorException e) {
            if (this.connectionListener != null) {
                this.connectionListener.onConnectError(e);
            }
            throw e;
        }
    }

    @Override // com.taobao.api.internal.stream.TopCometStream
    public void setConnectionListener(ConnectionLifeCycleListener connectionLifeCycleListener) {
        this.connectionListener = connectionLifeCycleListener;
    }

    @Override // com.taobao.api.internal.stream.TopCometStream
    public void setMessageListener(TopCometMessageListener topCometMessageListener) {
        this.cometMessageListener = topCometMessageListener;
    }

    public void setServerRespCode(String str) {
        this.serverRespCode = str;
    }

    @Override // com.taobao.api.internal.stream.TopCometStream
    public void start() {
        if (this.cometMessageListener == null) {
            throw new RuntimeException("Comet message listener must not null");
        }
        TaobaoHashMap taobaoHashMap = new TaobaoHashMap();
        taobaoHashMap.put(StreamConstants.PARAM_APPKEY, this.conf.getAppkey());
        if (!StringUtils.isEmpty(this.conf.getUserid())) {
            taobaoHashMap.put(StreamConstants.PARAM_USERID, this.conf.getUserid());
        }
        taobaoHashMap.put(StreamConstants.PARAM_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
        RequestParametersHolder requestParametersHolder = new RequestParametersHolder();
        requestParametersHolder.setProtocalMustParams(taobaoHashMap);
        try {
            String signTopRequestNew = TaobaoUtils.signTopRequestNew(requestParametersHolder, this.conf.getSecret(), false);
            if (StringUtils.isEmpty(signTopRequestNew)) {
                throw new RuntimeException("Get sign error");
            }
            taobaoHashMap.put("sign", signTopRequestNew);
            this.httpClient = new HttpClient(this.conf, taobaoHashMap);
            this.msgConsumeFactory = new StreamMsgConsumeFactory(this.conf.getMinThreads(), this.conf.getMaxThreads(), this.conf.getQueueSize());
            new Thread(new ControlThread(), "stream-control-thread").start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // com.taobao.api.internal.stream.TopCometStream
    public void stop() {
        this.stop = true;
        try {
            this.lock.lock();
            this.controlCondition.signalAll();
        } catch (Exception e) {
        } finally {
            this.lock.unlock();
        }
    }
}
