package DAN;

import CSMAPI.CSMAPI;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

/* loaded from: classes.dex */
public class DAN {
    public static final String CONTROL_CHANNEL = "Control_channel";
    private static final String DEFAULT_EC_HOST = "http://openmtc.darkgerm.com:9999";
    public static final int EC_BROADCAST_PORT = 17000;
    private static String d_id = null;
    private static final String dan_log_tag = "DAN";
    private static boolean initialized = false;
    private static String log_tag = "DAN";
    private static JSONObject profile = null;
    private static long request_interval = 150;
    public static final String version = "20160509";
    private static final Long HEART_BEAT_DEAD_MILLISECOND = 3000L;
    private static final Set<Subscriber> event_subscribers = Collections.synchronizedSet(new HashSet());
    private static final ConcurrentHashMap<String, UpStreamThread> upstream_thread_pool = new ConcurrentHashMap<>();
    private static final ConcurrentHashMap<String, DownStreamThread> downstream_thread_pool = new ConcurrentHashMap<>();
    private static final Map<String, Long> detected_ec_heartbeat = Collections.synchronizedMap(new LinkedHashMap());

    /* loaded from: classes.dex */
    private static class DownStreamThread extends Thread {
        String data_timestamp;
        String feature;
        Subscriber subscriber;
        long timestamp = 0;
        boolean working_permission;

        public DownStreamThread(String str, Subscriber subscriber) {
            this.feature = str;
            this.subscriber = subscriber;
        }

        private void deliver_data(JSONArray jSONArray) throws JSONException {
            DAN.logging("DownStreamThread(%s).deliver_data(): %s", this.feature, jSONArray.toString());
            if (jSONArray.length() == 0) {
                DAN.logging("DownStreamThread(%s).deliver_data(): No any data", this.feature);
                return;
            }
            JSONArray jSONArray2 = jSONArray.getJSONArray(0);
            String string = jSONArray2.getString(0);
            JSONArray jSONArray3 = jSONArray2.getJSONArray(1);
            if (string.equals(this.data_timestamp)) {
                DAN.logging("DownStreamThread(%s).deliver_data(): No new data", this.feature);
            } else {
                this.data_timestamp = string;
                this.subscriber.odf_handler(this.feature, new ODFObject(string, jSONArray3));
            }
        }

        public boolean has_subscriber(Subscriber subscriber) {
            return this.subscriber.equals(subscriber);
        }

        public void kill() {
            this.working_permission = false;
            interrupt();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            DAN.logging("DownStreamThread(%s) starts", this.feature);
            try {
                if (!DAN.json_array_has_string(DAN.profile.getJSONArray("df_list"), this.feature)) {
                    DAN.logging("DownStreamThread(%s).run(): feature not exists, exit", this.feature);
                    return;
                }
                this.working_permission = true;
                this.data_timestamp = "";
                while (this.working_permission) {
                    try {
                        long currentTimeMillis = System.currentTimeMillis();
                        if (currentTimeMillis - this.timestamp < DAN.request_interval) {
                            Thread.sleep(DAN.request_interval - (currentTimeMillis - this.timestamp));
                        }
                        this.timestamp = System.currentTimeMillis();
                        if (SessionThread.status()) {
                            DAN.logging("DownStreamThread(%s).run(): pull", this.feature);
                            deliver_data(CSMAPI.pull(DAN.d_id, this.feature));
                        } else {
                            DAN.logging("DownStreamThread(%s).run(): skip. (ec_status == false)", this.feature);
                        }
                    } catch (CSMAPI.CSMError unused) {
                        DAN.logging("DownStreamThread(%s).run(): CSMError", this.feature);
                        DAN.broadcast_control_message(Event.PULL_FAILED, this.feature);
                    } catch (InterruptedException unused2) {
                        DAN.logging("DownStreamThread(%s).run(): InterruptedException", this.feature);
                    } catch (JSONException unused3) {
                        DAN.logging("DownStreamThread(%s).run(): JSONException", this.feature);
                    }
                }
                DAN.logging("DownStreamThread(%s) ends", this.feature);
            } catch (JSONException unused4) {
                DAN.logging("DownStreamThread(%s).run(): JSONException", this.feature);
            }
        }
    }

    /* loaded from: classes.dex */
    public enum Event {
        FOUND_NEW_EC,
        REGISTER_FAILED,
        REGISTER_SUCCEED,
        PUSH_FAILED,
        PUSH_SUCCEED,
        PULL_FAILED,
        DEREGISTER_FAILED,
        DEREGISTER_SUCCEED;

        /* renamed from: values, reason: to resolve conflict with enum method */
        public static Event[] valuesCustom() {
            Event[] valuesCustom = values();
            int length = valuesCustom.length;
            Event[] eventArr = new Event[length];
            System.arraycopy(valuesCustom, 0, eventArr, 0, length);
            return eventArr;
        }
    }

    /* loaded from: classes.dex */
    public static class ODFObject {
        public JSONArray data;
        public Event event;
        public String message;
        public String timestamp;

        public ODFObject(Event event, String str) {
            this.timestamp = null;
            this.data = null;
            this.event = event;
            this.message = str;
        }

        public ODFObject(String str, JSONArray jSONArray) {
            this.timestamp = str;
            this.data = jSONArray;
            this.event = null;
            this.message = null;
        }
    }

    /* loaded from: classes.dex */
    public static abstract class Reducer {
        public static final Reducer LAST = new Reducer() { // from class: DAN.DAN.Reducer.1
            @Override // DAN.DAN.Reducer
            public JSONArray reduce(JSONArray jSONArray, JSONArray jSONArray2, int i, int i2) {
                return jSONArray2;
            }
        };

        public abstract JSONArray reduce(JSONArray jSONArray, JSONArray jSONArray2, int i, int i2);
    }

    /* loaded from: classes.dex */
    private static class SearchECThread extends Thread {
        private static final Semaphore instance_lock = new Semaphore(1);
        private static SearchECThread self;
        private DatagramSocket socket;

        private SearchECThread() {
        }

        public static SearchECThread instance() {
            try {
                instance_lock.acquire();
                if (self == null) {
                    DAN.logging("SearchECThread.instance(): create instance");
                    self = new SearchECThread();
                }
                instance_lock.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return self;
        }

        public void kill() {
            DAN.logging("SearchECThread.kill()");
            try {
                instance_lock.acquire();
                if (self == null) {
                    DAN.logging("SearchECThread.kill(): not running, skip");
                    return;
                }
                self.socket.close();
                try {
                    self.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                self = null;
                instance_lock.release();
                DAN.logging("SearchECThread.kill(): singleton cleaned");
            } catch (InterruptedException unused) {
                DAN.logging("SearchECThread.kill(): InterruptedException");
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.socket = new DatagramSocket((SocketAddress) null);
                this.socket.setReuseAddress(true);
                this.socket.bind(new InetSocketAddress("0.0.0.0", 17000));
                byte[] bArr = new byte[20];
                DatagramPacket datagramPacket = new DatagramPacket(bArr, bArr.length);
                while (true) {
                    this.socket.receive(datagramPacket);
                    if (new String(bArr, 0, datagramPacket.getLength()).equals("easyconnect")) {
                        String str = "http://" + datagramPacket.getAddress().getHostAddress() + ":9999";
                        synchronized (DAN.detected_ec_heartbeat) {
                            if (!DAN.detected_ec_heartbeat.containsKey(str)) {
                                DAN.logging("FOUND_NEW_EC: %s", str);
                                DAN.broadcast_control_message(Event.FOUND_NEW_EC, str);
                            }
                            DAN.detected_ec_heartbeat.put(str, Long.valueOf(System.currentTimeMillis()));
                        }
                    }
                }
            } catch (IOException unused) {
                DAN.logging("SearchECThread: IOException");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes.dex */
    public static class SessionThread extends Thread {
        private static /* synthetic */ int[] $SWITCH_TABLE$DAN$DAN$SessionThread$CommandOpCode = null;
        private static final int RETRY_COUNT = 3;
        private static final int RETRY_INTERVAL = 2000;
        private static final Semaphore instance_lock = new Semaphore(1);
        private static SessionThread self;
        private final LinkedBlockingQueue<SessionCommand> command_channel = new LinkedBlockingQueue<>();
        private final LinkedBlockingQueue<Integer> response_channel = new LinkedBlockingQueue<>();
        private boolean session_status;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: classes.dex */
        public enum CommandOpCode {
            REGISTER,
            DEREGISTER;

            /* renamed from: values, reason: to resolve conflict with enum method */
            public static CommandOpCode[] valuesCustom() {
                CommandOpCode[] valuesCustom = values();
                int length = valuesCustom.length;
                CommandOpCode[] commandOpCodeArr = new CommandOpCode[length];
                System.arraycopy(valuesCustom, 0, commandOpCodeArr, 0, length);
                return commandOpCodeArr;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: classes.dex */
        public class SessionCommand {
            public String ec_endpoint;
            public CommandOpCode op_code;

            public SessionCommand(CommandOpCode commandOpCode, String str) {
                this.op_code = commandOpCode;
                this.ec_endpoint = str;
            }
        }

        static /* synthetic */ int[] $SWITCH_TABLE$DAN$DAN$SessionThread$CommandOpCode() {
            int[] iArr = $SWITCH_TABLE$DAN$DAN$SessionThread$CommandOpCode;
            if (iArr != null) {
                return iArr;
            }
            int[] iArr2 = new int[CommandOpCode.valuesCustom().length];
            try {
                iArr2[CommandOpCode.DEREGISTER.ordinal()] = 2;
            } catch (NoSuchFieldError unused) {
            }
            try {
                iArr2[CommandOpCode.REGISTER.ordinal()] = 1;
            } catch (NoSuchFieldError unused2) {
            }
            $SWITCH_TABLE$DAN$DAN$SessionThread$CommandOpCode = iArr2;
            return iArr2;
        }

        private SessionThread() {
        }

        public static SessionThread instance() {
            try {
                instance_lock.acquire();
                if (self == null) {
                    DAN.logging("SessionThread.instance(): create instance");
                    self = new SessionThread();
                }
                instance_lock.release();
            } catch (InterruptedException unused) {
                DAN.logging("SessionThread.instance(): InterruptedException");
            }
            return self;
        }

        public static boolean status() {
            SessionThread sessionThread = self;
            if (sessionThread != null) {
                return sessionThread.session_status;
            }
            return false;
        }

        public void deregister() {
            DAN.logging("SessionThread.disconnect()");
            try {
                this.command_channel.add(new SessionCommand(CommandOpCode.DEREGISTER, ""));
                this.response_channel.take();
            } catch (IllegalStateException unused) {
                DAN.logging("SessionThread.connect(): IllegalStateException, command channel is full");
            } catch (InterruptedException unused2) {
                DAN.logging("SessionThread.disconnect(): InterruptedException, response_channel is interrupted");
            }
        }

        public void kill() {
            DAN.logging("SessionThread.kill()");
            try {
                instance_lock.acquire();
                if (self == null) {
                    DAN.logging("SessionThread.kill(): not running, skip");
                    return;
                }
                if (status()) {
                    deregister();
                }
                self.interrupt();
                try {
                    self.join();
                } catch (InterruptedException unused) {
                    DAN.logging("SessionThread.kill(): InterruptedException");
                }
                self = null;
                DAN.logging("SessionThread.kill(): singleton cleaned");
                instance_lock.release();
            } catch (InterruptedException unused2) {
                DAN.logging("SessionThread.kill(): InterruptedException");
            }
        }

        public void register(String str) {
            DAN.logging("SessionThread.connect(%s)", str);
            try {
                this.command_channel.add(new SessionCommand(CommandOpCode.REGISTER, str));
            } catch (IllegalStateException unused) {
                DAN.logging("SessionThread.connect(): IllegalStateException, command channel is full");
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    SessionCommand take = this.command_channel.take();
                    int i = $SWITCH_TABLE$DAN$DAN$SessionThread$CommandOpCode()[take.op_code.ordinal()];
                    if (i == 1) {
                        DAN.logging("SessionThread.run(): REGISTER: %s", take.ec_endpoint);
                        if (!this.session_status || CSMAPI.ENDPOINT.equals(take.ec_endpoint)) {
                            CSMAPI.ENDPOINT = take.ec_endpoint;
                            for (int i2 = 0; i2 < 3; i2++) {
                                try {
                                    this.session_status = CSMAPI.register(DAN.d_id, DAN.profile);
                                    DAN.logging("SessionThread.run(): REGISTER: %s: %b", CSMAPI.ENDPOINT, Boolean.valueOf(this.session_status));
                                    if (this.session_status) {
                                        break;
                                    }
                                } catch (CSMAPI.CSMError unused) {
                                    DAN.logging("SessionThread.run(): REGISTER: CSMError");
                                }
                                DAN.logging("SessionThread.run(): REGISTER: Wait %d milliseconds before retry", 2000);
                                Thread.sleep(2000L);
                            }
                            if (this.session_status) {
                                DAN.broadcast_control_message(Event.REGISTER_SUCCEED, CSMAPI.ENDPOINT);
                            } else {
                                DAN.logging("SessionThread.run(): REGISTER: Give up");
                                DAN.broadcast_control_message(Event.REGISTER_FAILED, CSMAPI.ENDPOINT);
                            }
                        } else {
                            DAN.logging("SessionThread.run(): REGISTER: Already registered to another EC");
                        }
                    } else if (i == 2) {
                        DAN.logging("SessionThread.run(): DEREGISTER: %s", CSMAPI.ENDPOINT);
                        if (this.session_status) {
                            boolean z = false;
                            for (int i3 = 0; i3 < 3; i3++) {
                                try {
                                    z = CSMAPI.deregister(DAN.d_id);
                                    DAN.logging("SessionThread.run(): DEREGISTER: %s: %b", CSMAPI.ENDPOINT, Boolean.valueOf(z));
                                } catch (CSMAPI.CSMError unused2) {
                                    DAN.logging("SessionThread.run(): DEREGISTER: CSMError");
                                }
                                if (z) {
                                    break;
                                }
                                DAN.logging("SessionThread.run(): DEREGISTER: Wait %d milliseconds before retry", 2000);
                                Thread.sleep(2000L);
                            }
                            if (z) {
                                DAN.broadcast_control_message(Event.DEREGISTER_SUCCEED, CSMAPI.ENDPOINT);
                            } else {
                                DAN.logging("SessionThread.run(): DEREGISTER: Give up");
                                DAN.broadcast_control_message(Event.DEREGISTER_FAILED, CSMAPI.ENDPOINT);
                            }
                            this.session_status = false;
                        } else {
                            DAN.logging("SessionThread.run(): DEREGISTER: Not registered to any EC, abort");
                        }
                        this.response_channel.add(0);
                    }
                } catch (InterruptedException unused3) {
                    DAN.logging("SessionThread.run(): InterruptedException");
                    return;
                }
            }
        }
    }

    /* loaded from: classes.dex */
    public static abstract class Subscriber {
        public abstract void odf_handler(String str, ODFObject oDFObject);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes.dex */
    public static class UpStreamThread extends Thread {
        String feature;
        boolean working_permission;
        final LinkedBlockingQueue<JSONArray> queue = new LinkedBlockingQueue<>();
        long timestamp = 0;
        Reducer reducer = Reducer.LAST;

        public UpStreamThread(String str) {
            this.feature = str;
        }

        public void enqueue(JSONArray jSONArray) {
            try {
                this.queue.put(jSONArray);
            } catch (InterruptedException unused) {
                DAN.logging("UpStreamThread(%s).enqueue(): InterruptedException", this.feature);
            }
        }

        public void enqueue(JSONArray jSONArray, Reducer reducer) {
            enqueue(jSONArray);
            this.reducer = reducer;
        }

        public void kill() {
            this.working_permission = false;
            interrupt();
        }

        /* JADX WARN: Code restructure failed: missing block: B:28:0x0070, code lost:
        
            r1 = new org.json.JSONObject();
            r1.put(com.google.android.gms.common.data.DataBufferSafeParcelable.DATA_FIELD, r4);
         */
        /* JADX WARN: Code restructure failed: missing block: B:29:0x007e, code lost:
        
            if (DAN.DAN.SessionThread.status() == false) goto L41;
         */
        /* JADX WARN: Code restructure failed: missing block: B:31:0x00b7, code lost:
        
            DAN.DAN.logging("UpStreamThread(%s).run(): skip. (ec_status == false)", r9.feature);
         */
        /* JADX WARN: Code restructure failed: missing block: B:35:0x0080, code lost:
        
            DAN.DAN.logging("UpStreamThread(%s).run(): push %s", r9.feature, r1.toString());
         */
        /* JADX WARN: Code restructure failed: missing block: B:37:0x0092, code lost:
        
            CSMAPI.CSMAPI.push(DAN.DAN.d_id, r9.feature, r1);
            DAN.DAN.broadcast_control_message(DAN.DAN.Event.PUSH_SUCCEED, r9.feature);
         */
        /* JADX WARN: Code restructure failed: missing block: B:40:0x00a3, code lost:
        
            DAN.DAN.logging("UpStreamThread(%s).run(): CSMError", r9.feature);
            DAN.DAN.broadcast_control_message(DAN.DAN.Event.PUSH_FAILED, r9.feature);
         */
        @Override // java.lang.Thread, java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 254
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: DAN.DAN.UpStreamThread.run():void");
        }
    }

    public static String[] available_ec() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(DEFAULT_EC_HOST);
        synchronized (detected_ec_heartbeat) {
            for (Map.Entry<String, Long> entry : detected_ec_heartbeat.entrySet()) {
                if (System.currentTimeMillis() - entry.getValue().longValue() < HEART_BEAT_DEAD_MILLISECOND.longValue()) {
                    arrayList.add(entry.getKey());
                } else {
                    detected_ec_heartbeat.remove(entry);
                }
            }
        }
        return (String[]) arrayList.toArray(new String[0]);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void broadcast_control_message(Event event, String str) {
        logging("broadcast_control_message()");
        synchronized (event_subscribers) {
            Iterator<Subscriber> it = event_subscribers.iterator();
            while (it.hasNext()) {
                it.next().odf_handler(CONTROL_CHANNEL, new ODFObject(event, str));
            }
        }
    }

    public static void deregister() {
        SessionThread.instance().deregister();
    }

    public static String ec_endpoint() {
        return CSMAPI.ENDPOINT;
    }

    public static String get_clean_mac_addr(String str) {
        return str.replace(":", "");
    }

    public static String get_d_id(String str) {
        return get_clean_mac_addr(str);
    }

    public static String get_d_name() {
        try {
            return profile == null ? "Error" : profile.getString("d_name");
        } catch (JSONException unused) {
            logging("get_d_name(): JSONException");
            return "Error";
        }
    }

    public static long get_request_interval() {
        return request_interval;
    }

    public static void init(Subscriber subscriber) {
        logging("init()");
        if (initialized) {
            logging("init(): Already initialized");
            return;
        }
        SearchECThread.instance().start();
        SessionThread.instance().start();
        CSMAPI.ENDPOINT = DEFAULT_EC_HOST;
        set_request_interval(150L);
        synchronized (event_subscribers) {
            event_subscribers.clear();
            event_subscribers.add(subscriber);
        }
        upstream_thread_pool.clear();
        downstream_thread_pool.clear();
        synchronized (detected_ec_heartbeat) {
            detected_ec_heartbeat.clear();
        }
        initialized = true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean json_array_has_string(JSONArray jSONArray, String str) {
        for (int i = 0; i < jSONArray.length(); i++) {
            try {
            } catch (JSONException unused) {
                logging("json_array_has_string(): JSONException");
            }
            if (jSONArray.getString(i).equals(str)) {
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void logging(String str) {
        System.out.printf("[%s][%s] %s%n", log_tag, dan_log_tag, str);
    }

    public static void logging(String str, Object... objArr) {
        logging(String.format(str, objArr));
    }

    public static void push(String str, JSONArray jSONArray) {
        push(str, jSONArray, Reducer.LAST);
    }

    public static void push(String str, JSONArray jSONArray, Reducer reducer) {
        if (!upstream_thread_pool.containsKey(str)) {
            UpStreamThread upStreamThread = new UpStreamThread(str);
            upstream_thread_pool.put(str, upStreamThread);
            upStreamThread.start();
        }
        upstream_thread_pool.get(str).enqueue(jSONArray, reducer);
    }

    public static void push(String str, double[] dArr) {
        push(str, dArr, Reducer.LAST);
    }

    public static void push(String str, double[] dArr, Reducer reducer) {
        JSONArray jSONArray = new JSONArray();
        for (double d : dArr) {
            jSONArray.put(d);
        }
        push(str, jSONArray, reducer);
    }

    public static void push(String str, float[] fArr) {
        push(str, fArr, Reducer.LAST);
    }

    public static void push(String str, float[] fArr, Reducer reducer) {
        JSONArray jSONArray = new JSONArray();
        for (float f : fArr) {
            jSONArray.put(f);
        }
        push(str, jSONArray, reducer);
    }

    public static void push(String str, int[] iArr) {
        push(str, iArr, Reducer.LAST);
    }

    public static void push(String str, int[] iArr, Reducer reducer) {
        JSONArray jSONArray = new JSONArray();
        for (int i : iArr) {
            jSONArray.put(i);
        }
        push(str, jSONArray, reducer);
    }

    public static void register(String str, String str2, JSONObject jSONObject) {
        d_id = str2;
        profile = jSONObject;
        if (!profile.has("is_sim")) {
            try {
                profile.put("is_sim", false);
            } catch (JSONException unused) {
                logging("register(): JSONException");
            }
        }
        SessionThread.instance().register(str);
    }

    public static void register(String str, JSONObject jSONObject) {
        register(CSMAPI.ENDPOINT, str, jSONObject);
    }

    public static void reregister(String str) {
        logging("reregister(%s)", str);
        SessionThread.instance().deregister();
        SessionThread.instance().register(str);
    }

    public static boolean session_status() {
        return SessionThread.status();
    }

    public static void set_log_tag(String str) {
        log_tag = str;
        CSMAPI.set_log_tag(str);
    }

    public static void set_request_interval(long j) {
        if (j > 0) {
            logging("set_request_interval(%d)", Long.valueOf(j));
            request_interval = j;
        }
    }

    public static void shutdown() {
        logging("shutdown()");
        if (!initialized) {
            logging("shutdown(): Already shutdown");
            return;
        }
        Iterator<Map.Entry<String, UpStreamThread>> it = upstream_thread_pool.entrySet().iterator();
        while (it.hasNext()) {
            UpStreamThread value = it.next().getValue();
            value.kill();
            try {
                value.join();
            } catch (InterruptedException unused) {
                logging("shutdown(): UpStreamThread: InterruptedException");
            }
        }
        upstream_thread_pool.clear();
        Iterator<Map.Entry<String, DownStreamThread>> it2 = downstream_thread_pool.entrySet().iterator();
        while (it2.hasNext()) {
            DownStreamThread value2 = it2.next().getValue();
            value2.kill();
            try {
                value2.join();
            } catch (InterruptedException unused2) {
                logging("shutdown(): DownStreamThread: InterruptedException");
            }
        }
        downstream_thread_pool.clear();
        SearchECThread.instance().kill();
        SessionThread.instance().kill();
        initialized = false;
    }

    public static void subscribe(String str, Subscriber subscriber) {
        if (str.equals(CONTROL_CHANNEL)) {
            synchronized (event_subscribers) {
                event_subscribers.add(subscriber);
            }
        } else {
            if (downstream_thread_pool.containsKey(str)) {
                return;
            }
            DownStreamThread downStreamThread = new DownStreamThread(str, subscriber);
            downstream_thread_pool.put(str, downStreamThread);
            downStreamThread.start();
        }
    }

    public static void unsubscribe(Subscriber subscriber) {
        synchronized (event_subscribers) {
            event_subscribers.remove(subscriber);
        }
        for (Map.Entry<String, DownStreamThread> entry : downstream_thread_pool.entrySet()) {
            String key = entry.getKey();
            DownStreamThread value = entry.getValue();
            if (value.has_subscriber(subscriber)) {
                value.kill();
                try {
                    value.join();
                } catch (InterruptedException unused) {
                    logging("unsubscribe(): DownStreamThread: InterruptedException");
                }
                downstream_thread_pool.remove(key);
                return;
            }
        }
    }

    public static void unsubscribe(String str) {
        if (str.equals(CONTROL_CHANNEL)) {
            synchronized (event_subscribers) {
                event_subscribers.clear();
            }
            return;
        }
        DownStreamThread downStreamThread = downstream_thread_pool.get(str);
        if (downStreamThread == null) {
            return;
        }
        downStreamThread.kill();
        try {
            downStreamThread.join();
        } catch (InterruptedException unused) {
            logging("unsubscribe(): DownStreamThread: InterruptedException");
        }
        downstream_thread_pool.remove(str);
    }
}
