/*
 * Decompiled with CFR 0.152.
 */
package com.ibm.ws.rtcomm.internal;

import com.ibm.json.java.JSONObject;
import com.ibm.websphere.ras.Tr;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.ws.rtcomm.internal.RTCommUtils;
import com.ibm.wsspi.kernel.service.utils.FrameworkState;
import com.ibm.wsspi.rtcomm.RTCommException;
import com.ibm.wsspi.rtcomm.RTCommProvider;
import com.ibm.wsspi.rtcomm.RTCommProviderListener;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;
import javax.net.SocketFactory;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class RTCommProviderImpl
extends RTCommProvider {
    // Trace component through which this class emits its debug, info and error output.
    private static final TraceComponent tc = Tr.register(RTCommProviderImpl.class, (String)"Rtcomm", (String)"com.ibm.ws.rtcomm.resources.Rtcomm");
    // Charset name passed to String.getBytes / new String wherever this class converts text to or from bytes.
    public static final String ENCODING_UTF_8 = "UTF-8";
    // Presumably the maximum MQTT client ID length; generateMQTTClientID truncates IDs to 23 characters.
    private static final int MAX_CLIENTID_LENGTH = 23;
    // Presumably the delay/period of the reconnect task; registerListener schedules it with 60000 ms.
    private static final int CONNECTION_RETRY_TIMEOUT = 60000;
    // Presumably the default connection timeout; connectionTimeout below starts with the same value.
    private static final int CONNECTION_TIMEOUT = 5;
    // Daemon timer (isDaemon = true) shared by all instances; registerListener schedules the retry task on it.
    private static Timer timer = new Timer(true);
    // Client IDs already handed out by the unique-ID path of generateMQTTClientID; consulted to avoid duplicates.
    private static Vector<String> mqttClientIDs = new Vector();
    // Set through setServerID; combined with the shared topic name in generateMQTTClientID.
    private static String serverID = null;
    // Set through setConnectionTimeout; passed to the MQTT connect options and used to derive wait limits.
    private static int connectionTimeout = 5;
    // Servers for which a successful connection (CWRTC0001I) has already been reported.
    private static Vector<String> successfulConnections = new Vector();
    // Servers for which a failed or lost connection (CWRTC0002E / CWRTC0003E) has already been reported.
    private static Vector<String> unsuccessfulConnections = new Vector();
    // Optional socket factory; when non-null, connectToServer sets it on the MQTT connect options.
    private static SocketFactory socketFactory = null;
    // Server URL that connectToServer tries first.
    private String primaryMessagingServerURL = null;
    // Server URL that connectToServer tries when the primary attempt leaves the client unconnected.
    private String secondaryMessagingServerURL = null;
    // The registered listener; MyCallback.messageArrived forwards inbound messages to it.
    private RTCommProviderListener rtCommListener = null;
    // Client ID produced by generateMQTTClientID during the most recent connectToServer call.
    private String mqttClientID = null;
    // The current MQTT client; null until connectToServer has created one.
    private IMqttAsyncClient mqttClient = null;
    // Holds the shared, server and sphere topic names supplied to registerListener.
    private RTCommUtils utils = null;
    // Task scheduled on the shared timer that re-attempts the connection while the client is not connected.
    private ConnectionRetryTimer connectionRetryTimer = new ConnectionRetryTimer();
    // URL of the server most recently connected to; reset to null when connect/subscribe fails with an exception.
    private String connectedServer = null;
    // Last-will topic recorded by setWill; when non-null, connectToServer configures a will on the connect options.
    private String clientLWTTopicName = null;
    // Last-will payload object recorded by setWill; serialized in connectToServer.
    private JSONObject clientLWTObject = null;

    /**
     * Sets the connection timeout shared by every provider instance.
     *
     * The value is handed to the MQTT connect options on the next connect
     * attempt, and the value plus one, multiplied by 1000, is used as the
     * wait limit for connect, subscribe and unsubscribe completion.
     *
     * @param localConnectionTimeout the new timeout value
     */
    public static void setConnectionTimeout(int localConnectionTimeout) {
        connectionTimeout = localConnectionTimeout;
    }

    /**
     * Sets the server ID shared by every provider instance.
     *
     * generateMQTTClientID reads this value together with the shared topic
     * name when it derives an MQTT client ID.
     *
     * @param tempServerID the server ID, or null to clear it
     */
    public static void setServerID(String tempServerID) {
        serverID = tempServerID;
    }

    /**
     * Sets the socket factory shared by every provider instance.
     *
     * When non-null, connectToServer installs it on the MQTT connect options
     * before connecting.
     *
     * @param localFactory the socket factory to use, or null for none
     */
    public static void setSocketFactory(SocketFactory localFactory) {
        socketFactory = localFactory;
    }

    /**
     * Creates a provider for the given pair of messaging server URLs.
     *
     * Only stores the URLs; no connection is attempted here. Connecting
     * happens in registerListener.
     *
     * @param primaryMessageServerURL   URL tried first when connecting
     * @param secondaryMessageServerURL URL tried when the primary attempt fails
     */
    public RTCommProviderImpl(String primaryMessageServerURL, String secondaryMessageServerURL) {
        this.primaryMessagingServerURL = primaryMessageServerURL;
        this.secondaryMessagingServerURL = secondaryMessageServerURL;
    }

    /**
     * Registers the single listener of this provider, records the topic names,
     * schedules the periodic reconnect task and makes the first connection
     * attempt.
     *
     * A fresh retry task is created for every registration because a
     * java.util.TimerTask cannot be scheduled again once it has been scheduled
     * or cancelled; reusing the old instance made a second registration fail
     * with IllegalStateException.
     *
     * @param listener        receiver of inbound messages
     * @param sharedTopicName base name of the shared topic, or null for none
     * @param serverTopicName base name of the server topic, or null for none
     * @param sphereTopicName base name of the sphere topic, or null for none
     * @throws RTCommException if a listener is already registered, or if the
     *                         initial connect / subscribe fails with an exception
     */
    @Override
    public void registerListener(RTCommProviderListener listener, String sharedTopicName, String serverTopicName, String sphereTopicName) throws RTCommException {
        if (this.rtCommListener != null) {
            throw new RTCommException("RTCommListener already exist!");
        }
        this.rtCommListener = listener;

        // Topic names are stored before anything that reads them can run.
        this.utils = new RTCommUtils();
        this.utils.setSharedTopicName(sharedTopicName);
        this.utils.setServerTopicName(serverTopicName);
        this.utils.setSphereTopicName(sphereTopicName);

        // Retire any previous task (cancel is harmless on an unscheduled or
        // already cancelled task) and schedule a brand-new one.
        if (this.connectionRetryTimer != null) {
            this.connectionRetryTimer.cancel();
        }
        this.connectionRetryTimer = new ConnectionRetryTimer();
        // First retry after 60 seconds, then every 60 seconds.
        timer.schedule(this.connectionRetryTimer, 60000L, 60000L);

        this.connectToServer();
    }

    /**
     * Unregisters the given listener if it is the one currently registered.
     *
     * The reconnect task is cancelled first, so that no failure further down
     * can leave it running. If the client is connected, the three topic trees
     * are unsubscribed and the client is disconnected; a failed unsubscribe is
     * traced and does not prevent the disconnect. Calls for a different
     * listener, or before any registration, are ignored.
     *
     * @param listener the listener to remove
     * @throws RTCommException declared by the overridden method; not thrown by
     *                         this implementation
     */
    @Override
    public void unregisterListener(RTCommProviderListener listener) throws RTCommException {
        // utils is only null when registerListener has never run, in which
        // case there is nothing to cancel, unsubscribe or disconnect.
        if (this.rtCommListener != listener || this.utils == null) {
            return;
        }

        // Stop reconnect attempts before touching the connection.
        if (this.connectionRetryTimer != null) {
            this.connectionRetryTimer.cancel();
        }

        if (this.mqttClient != null && this.mqttClient.isConnected()) {
            try {
                String[] baseTopics = new String[]{
                    this.utils.getSharedTopicName(),
                    this.utils.getServerTopicName(),
                    this.utils.getSphereTopicName()
                };
                for (String baseTopic : baseTopics) {
                    if (baseTopic == null) {
                        continue;
                    }
                    String wildcardTopic = baseTopic + "/#";
                    if (tc.isDebugEnabled()) {
                        Tr.debug(tc, "unsubscribing to " + wildcardTopic);
                    }
                    this.unsubscribe(wildcardTopic);
                }
            }
            catch (MqttException e) {
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "unregisterListener: MqttException", new Object[]{e.getLocalizedMessage(), e.getStackTrace()});
                }
            }

            // Disconnect in its own try block so an unsubscribe failure cannot skip it.
            try {
                if (this.mqttClient.isConnected()) {
                    if (tc.isDebugEnabled()) {
                        Tr.debug(tc, "disconnecting from Server, clientID = " + this.mqttClient.getClientId());
                    }
                    this.mqttClient.disconnect();
                }
            }
            catch (MqttException e) {
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "unregisterListener: MqttException", new Object[]{e.getLocalizedMessage(), e.getStackTrace()});
                }
            }
        }

        this.rtCommListener = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Publishes a JSON message with QoS 0 on the current MQTT client.
     *
     * The topic is destTopic, extended with "/" + fromEndpointID when an
     * endpoint ID is given. A null message is published with no payload set.
     * Failures while publishing are traced at debug level and not propagated;
     * only a serialization failure is reported to the caller.
     *
     * @param message        the object to serialize and send, may be null
     * @param destTopic      the destination topic
     * @param fromEndpointID appended to the topic when non-null
     * @param retain         value for the MQTT retained flag
     * @throws RTCommException if serializing the message fails
     */
    @Override
    public void sendMessage(JSONObject message, String destTopic, String fromEndpointID, boolean retain) throws RTCommException {
        final String targetTopic = fromEndpointID == null ? destTopic : destTopic + "/" + fromEndpointID;

        String serialized = null;
        if (message != null) {
            try {
                serialized = message.serialize();
            }
            catch (Exception serializeFailure) {
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "IOException occured converting message = " + serializeFailure);
                }
                throw new RTCommException(serializeFailure.getMessage());
            }
        }

        if (tc.isDebugEnabled()) {
            Tr.debug(tc, "publishMessage: toTopic = " + targetTopic);
            Tr.debug(tc, "publishMessage: retain = " + retain + "    outputObject = " + serialized);
        }

        try {
            MqttMessage outbound = new MqttMessage();
            if (serialized != null) {
                outbound.setPayload(serialized.getBytes(ENCODING_UTF_8));
            }
            outbound.setQos(0);
            outbound.setRetained(retain);
            // Publishing is serialized on the client object itself.
            IMqttAsyncClient lock = this.mqttClient;
            synchronized (lock) {
                this.mqttClient.publish(targetTopic, outbound);
            }
        }
        catch (Exception publishFailure) {
            // Best effort: a failed publish is only traced.
            if (tc.isDebugEnabled()) {
                Tr.debug(tc, "Exception occurred publishing msg = " + publishFailure);
            }
        }
    }

    /**
     * Returns the MQTT client ID generated by the most recent connectToServer
     * call.
     *
     * @return the client ID, or null if no connect attempt has been made yet
     */
    @Override
    public String getClientID() {
        return this.mqttClientID;
    }

    /**
     * Records the last-will topic and payload for later connect attempts.
     *
     * The topic is stored as topicName + "/" + fromEndpointID. Nothing is sent
     * here; connectToServer applies the stored values to the MQTT connect
     * options the next time it runs.
     *
     * @param topicName      base name of the will topic
     * @param fromEndpointID endpoint ID appended to the topic name
     * @param object         the will payload object
     */
    @Override
    public void setWill(String topicName, String fromEndpointID, JSONObject object) {
        this.clientLWTTopicName = topicName + "/" + fromEndpointID;
        this.clientLWTObject = object;
    }

    /**
     * Connects to the primary messaging server, falling back to the secondary
     * one, and subscribes to the shared, server and sphere topic trees.
     *
     * A new client ID and a new MQTT client are created on every call. If the
     * primary attempt leaves the client unconnected, the failure is reported;
     * the method then returns quietly when no distinct secondary URL exists or
     * when the secondary attempt fails with an MqttException. On success the
     * connection is reported and each non-null topic name is subscribed with a
     * trailing "/#".
     *
     * @throws RTCommException if an MqttException, IOException or
     *                         IllegalArgumentException escapes the connect /
     *                         subscribe sequence; connectedServer is reset to
     *                         null in that case
     */
    private void connectToServer() throws RTCommException {
        try {
            this.mqttClientID = RTCommProviderImpl.generateMQTTClientID(this.utils);
            if (tc.isDebugEnabled()) {
                Tr.debug(tc, "connectToServer: mqttClientID = " + this.mqttClientID + "    connectionTimeout = " + connectionTimeout);
            }

            MemoryPersistence persistence = new MemoryPersistence();
            MqttConnectOptions connectOptions = new MqttConnectOptions();
            connectOptions.setMqttVersion(3);
            connectOptions.setConnectionTimeout(connectionTimeout);
            if (this.clientLWTTopicName != null) {
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "connectToServer: Setting Will:  topic = " + this.clientLWTTopicName + "   msg = " + this.clientLWTObject);
                }
                // A will without a payload object is sent with an empty payload,
                // matching how sendMessage treats a null message.
                byte[] willPayload = this.clientLWTObject == null
                    ? new byte[0]
                    : this.clientLWTObject.serialize().getBytes(ENCODING_UTF_8);
                connectOptions.setWill(this.clientLWTTopicName, willPayload, 0, false);
            }
            if (socketFactory != null) {
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "connectToServer: Setting socket factory");
                }
                connectOptions.setSocketFactory(socketFactory);
            }

            // Computed in long arithmetic so a large timeout cannot overflow.
            long waitMillis = (connectionTimeout + 1) * 1000L;

            try {
                this.mqttClient = new MqttAsyncClient(this.primaryMessagingServerURL, this.mqttClientID, (MqttClientPersistence)persistence);
                this.mqttClient.setCallback((MqttCallback)new MyCallback());
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "Connecting to Primary Server at URL " + this.primaryMessagingServerURL);
                }
                this.mqttClient.connect(connectOptions).waitForCompletion(waitMillis);
                this.connectedServer = this.primaryMessagingServerURL;
            }
            catch (Exception ex) {
                // Any primary failure is only traced; the secondary server is tried next.
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "RTCommProviderImpl: exception occurred during connect to Primary Server URL", new Object[]{ex.getLocalizedMessage(), ex.getStackTrace()});
                }
            }

            // mqttClient is still null when the very first client construction failed.
            if (this.mqttClient == null || !this.mqttClient.isConnected()) {
                this.reportUnsuccessfulConnection(this.primaryMessagingServerURL);
                if (this.secondaryMessagingServerURL == null) {
                    if (tc.isDebugEnabled()) {
                        Tr.debug(tc, "Unable to connect to primary message server URL.    No secondary message server URL is configured");
                    }
                    return;
                }
                if (this.secondaryMessagingServerURL.equals(this.primaryMessagingServerURL)) {
                    if (tc.isDebugEnabled()) {
                        Tr.debug(tc, "Unable to connect to primary message server URL.    The secondary message server URL is the same as the primary");
                    }
                    return;
                }
                try {
                    this.mqttClient = new MqttAsyncClient(this.secondaryMessagingServerURL, this.mqttClientID, (MqttClientPersistence)persistence);
                    this.mqttClient.setCallback((MqttCallback)new MyCallback());
                    if (tc.isDebugEnabled()) {
                        Tr.debug(tc, "Connecting to Secondary Server at address " + this.secondaryMessagingServerURL);
                    }
                    this.mqttClient.connect(connectOptions).waitForCompletion(waitMillis);
                    this.connectedServer = this.secondaryMessagingServerURL;
                }
                catch (MqttException ex) {
                    this.reportUnsuccessfulConnection(this.secondaryMessagingServerURL);
                    if (tc.isDebugEnabled()) {
                        Tr.debug(tc, "exception occurred during connect to Secondary Server address", new Object[]{ex.getLocalizedMessage(), ex.getStackTrace()});
                    }
                    return;
                }
            }

            this.reportSuccessfulConnection(this.connectedServer);

            String[] baseTopics = new String[]{
                this.utils.getSharedTopicName(),
                this.utils.getServerTopicName(),
                this.utils.getSphereTopicName()
            };
            for (String baseTopic : baseTopics) {
                if (baseTopic == null) {
                    continue;
                }
                String wildcardTopic = baseTopic + "/#";
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "subscribing to " + wildcardTopic);
                }
                this.subscribe(wildcardTopic);
            }
        }
        catch (MqttException ex) {
            throw this.connectOrSubscribeFailed(ex);
        }
        catch (IOException ex) {
            throw this.connectOrSubscribeFailed(ex);
        }
        catch (IllegalArgumentException ex) {
            throw this.connectOrSubscribeFailed(ex);
        }
    }

    /**
     * Traces a connect / subscribe failure, clears connectedServer and builds
     * the exception for connectToServer to throw.
     *
     * The trace uses the stored client ID rather than the client object,
     * because the client may not have been created when the failure occurred.
     *
     * @param cause the exception that aborted the sequence
     * @return an RTCommException carrying the message of the cause
     */
    private RTCommException connectOrSubscribeFailed(Exception cause) {
        if (tc.isDebugEnabled()) {
            Tr.debug(tc, "clientID (" + this.mqttClientID + ")  exception occurred during MQTT connect / subscribe = " + cause);
        }
        this.connectedServer = null;
        return new RTCommException(cause.getMessage());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records a successful connection and logs CWRTC0001I the first time a
     * given server is seen as connected.
     *
     * The server is also removed from the unsuccessful list. All bookkeeping
     * is done while holding the monitor of successfulConnections.
     *
     * NOTE(review): the lists are only updated while info-level tracing is
     * enabled; confirm that this coupling is intended.
     *
     * @param tempServerName URL of the server that was connected to
     */
    private void reportSuccessfulConnection(String tempServerName) {
        if (tc.isDebugEnabled()) {
            Tr.debug(tc, "successful connection to -   " + tempServerName);
        }
        synchronized (successfulConnections) {
            if (!tc.isInfoEnabled()) {
                return;
            }
            boolean alreadyReported = successfulConnections.contains(tempServerName);
            if (!alreadyReported) {
                successfulConnections.add(tempServerName);
                Tr.info(tc, "CWRTC0001I", (Object[])new String[]{tempServerName});
            }
            // Removing an absent element is a no-op, so no contains() check is needed.
            unsuccessfulConnections.remove(tempServerName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records a failed connection attempt and logs CWRTC0002E the first time
     * a given server is seen as failing.
     *
     * The server is also removed from the successful list. All bookkeeping is
     * done while holding the monitor of successfulConnections.
     *
     * NOTE(review): the lists are only updated while error-level tracing is
     * enabled; confirm that this coupling is intended.
     *
     * @param tempServerName URL of the server that could not be reached
     */
    private void reportUnsuccessfulConnection(String tempServerName) {
        if (tc.isDebugEnabled()) {
            Tr.debug(tc, "unsuccessful connection to -   " + tempServerName);
        }
        synchronized (successfulConnections) {
            if (!tc.isErrorEnabled()) {
                return;
            }
            boolean alreadyReported = unsuccessfulConnections.contains(tempServerName);
            if (!alreadyReported) {
                unsuccessfulConnections.add(tempServerName);
                Tr.error(tc, "CWRTC0002E", (Object[])new String[]{tempServerName});
            }
            // Removing an absent element is a no-op, so no contains() check is needed.
            successfulConnections.remove(tempServerName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records a lost connection and logs CWRTC0003E, unless the server is
     * already listed as unsuccessful or the framework is stopping.
     *
     * The server is removed from the successful list in either case. All
     * bookkeeping is done while holding the monitor of successfulConnections.
     *
     * NOTE(review): the lists are only updated while error-level tracing is
     * enabled; confirm that this coupling is intended.
     *
     * @param tempServerName URL of the server whose connection was lost
     */
    private void reportLostConnection(String tempServerName) {
        if (tc.isDebugEnabled()) {
            Tr.debug(tc, "lost connection to -   " + tempServerName);
        }
        synchronized (successfulConnections) {
            if (!tc.isErrorEnabled()) {
                return;
            }
            // The framework state is only queried when the server is not yet listed.
            boolean shouldReport = !unsuccessfulConnections.contains(tempServerName) && !FrameworkState.isStopping();
            if (shouldReport) {
                unsuccessfulConnections.add(tempServerName);
                Tr.error(tc, "CWRTC0003E", (Object[])new String[]{tempServerName});
            }
            // Removing an absent element is a no-op, so no contains() check is needed.
            successfulConnections.remove(tempServerName);
        }
    }

    /**
     * Returns the URL recorded by the most recent successful connect.
     *
     * The value is reset to null when connectToServer fails with an exception.
     * Nothing in this class clears it when the connection is later lost, so it
     * may name a server that is no longer connected.
     *
     * @return the recorded server URL, or null
     */
    @Override
    public String getConnectedMessageServer() {
        return this.connectedServer;
    }

    /**
     * Tells whether an MQTT client exists and reports itself as connected.
     *
     * @return true only when a client has been created and is connected
     */
    public boolean isConnected() {
        return this.mqttClient != null && this.mqttClient.isConnected();
    }

    /**
     * Cancels this provider's reconnect task.
     *
     * The MQTT client itself is not disconnected or closed here.
     */
    public void destroy() {
        this.connectionRetryTimer.cancel();
    }

    /**
     * Subscribes to a topic with QoS 0 and waits for the broker's answer.
     *
     * The wait is limited to (connectionTimeout + 1) * 1000. A single granted
     * QoS value of 128 is turned into an MqttException with reason code 128
     * (presumably the MQTT "subscription failed" return code).
     *
     * @param topicName the topic filter to subscribe to
     * @throws MqttException if subscribing or waiting fails, or the single
     *                       granted QoS is 128
     */
    private void subscribe(String topicName) throws MqttException {
        IMqttToken subscribeToken = this.mqttClient.subscribe(topicName, 0);
        long waitLimit = (long)((connectionTimeout + 1) * 1000);
        subscribeToken.waitForCompletion(waitLimit);

        int[] granted = subscribeToken.getGrantedQos();
        boolean refused = granted.length == 1 && granted[0] == 128;
        if (refused) {
            throw new MqttException(128);
        }
    }

    /**
     * Unsubscribes from a topic and waits for completion.
     *
     * The wait is limited to (connectionTimeout + 1) * 1000.
     *
     * @param topicName the topic filter to unsubscribe from
     * @throws MqttException if unsubscribing or waiting for completion fails
     */
    private void unsubscribe(String topicName) throws MqttException {
        this.mqttClient.unsubscribe(topicName).waitForCompletion((long)((connectionTimeout + 1) * 1000));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Produces the MQTT client ID for a connect attempt.
     *
     * When both the server ID and the shared topic name are known, the ID is
     * the upper-case hex MD5 digest of their concatenation, cut to the first
     * 23 characters, so the same pair always yields the same ID. Otherwise, or
     * if the digest cannot be computed, a unique ID is generated and
     * registered in mqttClientIDs.
     *
     * The two strategies are mutually exclusive. Previously the unique ID was
     * always overwritten by a digest of a string containing "null", and
     * collision retries were built from a null base ID.
     *
     * @param utils source of the shared topic name
     * @return a client ID of at most 23 characters, never null
     */
    private static String generateMQTTClientID(RTCommUtils utils) {
        final int maxClientIdLength = 23;
        final String localServerID = serverID;
        final String sharedTopicName = utils.getSharedTopicName();

        if (localServerID != null && sharedTopicName != null) {
            try {
                String localUniqueString = localServerID + sharedTopicName;
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "generateMQTTClientID:   Generating ID based on serverID = " + localServerID);
                    Tr.debug(tc, "sharedTopicName = " + sharedTopicName);
                    Tr.debug(tc, "unique string = " + localUniqueString);
                }
                MessageDigest messageDigest = MessageDigest.getInstance("MD5");
                byte[] digest = messageDigest.digest(localUniqueString.getBytes(ENCODING_UTF_8));
                StringBuilder hex = new StringBuilder(digest.length * 2);
                for (byte b : digest) {
                    hex.append(String.format("%02X", b));
                }
                String hashedID = hex.toString();
                if (hashedID.length() > maxClientIdLength) {
                    hashedID = hashedID.substring(0, maxClientIdLength);
                }
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "generateMQTTClientID:  localID truncated string = " + hashedID);
                }
                return hashedID;
            }
            catch (NoSuchAlgorithmException ex) {
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "generateMQTTClientID:  ERROR:    Unable to create MD5 MessageDigest", new Object[]{ex.getLocalizedMessage(), ex.getStackTrace()});
                }
            }
            catch (UnsupportedEncodingException ex2) {
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "generateMQTTClientID:  ERROR:    Unsuppored encoding exception", new Object[]{ex2.getLocalizedMessage(), ex2.getStackTrace()});
                }
            }
            // The digest could not be computed: fall back to a unique ID below
            // instead of returning null.
        }

        // The base ID is captured after generation so collision retries extend
        // a real ID.
        String originalID = MqttClient.generateClientId();
        if (tc.isDebugEnabled()) {
            Tr.debug(tc, "generateMQTTClientID:   Generating UNIQUE ID");
            Tr.debug(tc, "generateMQTTClientID:   originalID = " + originalID);
        }
        String localID = originalID.length() > maxClientIdLength
            ? originalID.substring(originalID.length() - maxClientIdLength)
            : originalID;
        synchronized (mqttClientIDs) {
            int counter = 0;
            while (mqttClientIDs.contains(localID) && counter < 10000) {
                String candidate = originalID + counter++;
                // Keep the tail so the distinguishing counter survives truncation.
                localID = candidate.length() > maxClientIdLength
                    ? candidate.substring(candidate.length() - maxClientIdLength)
                    : candidate;
            }
            if (!mqttClientIDs.contains(localID)) {
                mqttClientIDs.add(localID);
            }
        }
        return localID;
    }

    /**
     * MQTT callback installed on every client created by connectToServer.
     *
     * It reports lost connections, forwards inbound messages to the
     * registered listener and traces delivery failures.
     */
    public class MyCallback
    implements MqttCallback {
        /**
         * Reports the lost connection for the currently recorded server and
         * closes the provider's current client. A failure to close is only
         * traced.
         *
         * @param cause the reason the connection was lost
         */
        public void connectionLost(Throwable cause) {
            final RTCommProviderImpl owner = RTCommProviderImpl.this;
            if (tc.isDebugEnabled()) {
                Tr.debug(tc, "connectionLost:  clientID(" + owner.mqttClient.getClientId() + ") cause = " + cause);
            }
            owner.reportLostConnection(owner.connectedServer);
            try {
                owner.mqttClient.close();
            }
            catch (MqttException closeFailure) {
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "connectionLost:  Exception caught while closing connection", new Object[]{closeFailure.getLocalizedMessage(), closeFailure.getStackTrace()});
                }
            }
        }

        /**
         * Parses the payload as JSON (an empty payload yields a null object)
         * and hands it to the registered listener. Every exception raised
         * while doing so is traced and swallowed.
         *
         * @param topic   the topic the message arrived on
         * @param message the received MQTT message
         */
        public void messageArrived(String topic, MqttMessage message) {
            if (tc.isDebugEnabled()) {
                Tr.debug(tc, "messageArrived:   topic = " + topic);
                Tr.debug(tc, "messageArrived:   length = " + message.getPayload().length + "    message = " + message);
            }
            try {
                byte[] body = message.getPayload();
                JSONObject parsed = null;
                if (body.length != 0) {
                    parsed = JSONObject.parse((String)new String(message.getPayload(), RTCommProviderImpl.ENCODING_UTF_8));
                }
                RTCommProviderImpl.this.rtCommListener.messageReceived(topic, parsed);
            }
            catch (RuntimeException unchecked) {
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "RuntimeException occurred while processing inbound msg", new Object[]{unchecked.getLocalizedMessage(), unchecked.getStackTrace()});
                }
            }
            catch (Exception checked) {
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "Exception occurred processing inbound msg", new Object[]{checked.getLocalizedMessage(), checked.getStackTrace()});
                }
            }
        }

        /**
         * Traces the exception attached to a delivery token, if there is one
         * and debug tracing is enabled.
         *
         * @param token the token of the completed delivery
         */
        public void deliveryComplete(IMqttDeliveryToken token) {
            if (!tc.isDebugEnabled()) {
                return;
            }
            if (token.getException() == null) {
                return;
            }
            Tr.debug(tc, "deliveryComplete:   ex = " + token.getException());
        }
    }

    /**
     * Periodic task that re-attempts the connection while the provider has no
     * connected MQTT client and the framework is not stopping.
     */
    public class ConnectionRetryTimer
    extends TimerTask {
        /**
         * Runs one reconnect check.
         *
         * A missing client is treated as "not connected". Every exception is
         * traced and swallowed: an exception escaping a TimerTask terminates
         * the timer's thread, and the timer here is static and shared by all
         * provider instances.
         */
        @Override
        public void run() {
            try {
                boolean connected = RTCommProviderImpl.this.mqttClient != null
                    && RTCommProviderImpl.this.mqttClient.isConnected();
                if (!connected && !FrameworkState.isStopping()) {
                    RTCommProviderImpl.this.connectToServer();
                }
            }
            catch (RTCommException ex) {
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "Exception while attempting to reconnect.", new Object[]{ex.getLocalizedMessage(), ex.getStackTrace()});
                }
            }
            catch (RuntimeException ex) {
                // Must not escape: it would stop the shared timer for every provider.
                if (tc.isDebugEnabled()) {
                    Tr.debug(tc, "Exception while attempting to reconnect.", new Object[]{ex.getLocalizedMessage(), ex.getStackTrace()});
                }
            }
        }
    }
}

