package com.lifx.core.transport.rx;

import com.lifx.core.entity.LUID;
import com.lifx.core.structle.Message;
import com.lifx.core.structle.Protocol;
import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import kotlin.Pair;
import kotlin.jvm.internal.Intrinsics;

/* loaded from: classes.dex */
public final class MessageExtensionsKt {
    public static final Single<SourcedMessage> awaitAcknowledgement(Flowable<SourcedMessage> receiver, final Message message, long j) {
        Intrinsics.b(receiver, "$receiver");
        Intrinsics.b(message, "message");
        Single<SourcedMessage> a = receiver.a(new Predicate<SourcedMessage>() { // from class: com.lifx.core.transport.rx.MessageExtensionsKt$awaitAcknowledgement$1
            @Override // io.reactivex.functions.Predicate
            public final boolean test(SourcedMessage it) {
                Intrinsics.b(it, "it");
                Protocol.Header header = it.getMessage().getHeader();
                Intrinsics.a((Object) header, "it.message.header");
                long source = header.getSource();
                Protocol.Header header2 = Message.this.getHeader();
                Intrinsics.a((Object) header2, "message.header");
                if (source == header2.getSource() && it.getMessage().getHeader().sequence == Message.this.getHeader().sequence && it.getMessage().getType() == Protocol.MessageType.DEVICE_ACKNOWLEDGEMENT) {
                    LUID source2 = it.getMessage().getSource();
                    Intrinsics.a((Object) source2, "it.message.source");
                    byte[] target = source2.getTarget();
                    LUID source3 = Message.this.getSource();
                    Intrinsics.a((Object) source3, "message.source");
                    if (Arrays.equals(target, source3.getTarget())) {
                        return true;
                    }
                }
                return false;
            }
        }).e().a(j, TimeUnit.MILLISECONDS);
        Intrinsics.a((Object) a, "this.filter {\n          …s, TimeUnit.MILLISECONDS)");
        return a;
    }

    public static final <T> Single<T> awaitResponse(Flowable<SourcedMessage> receiver, final Message message, final Protocol.MessageType resultType, final Class<T> resultClass, long j) {
        Intrinsics.b(receiver, "$receiver");
        Intrinsics.b(message, "message");
        Intrinsics.b(resultType, "resultType");
        Intrinsics.b(resultClass, "resultClass");
        Single<T> a = receiver.a(new Predicate<SourcedMessage>() { // from class: com.lifx.core.transport.rx.MessageExtensionsKt$awaitResponse$1
            @Override // io.reactivex.functions.Predicate
            public final boolean test(SourcedMessage it) {
                Intrinsics.b(it, "it");
                Protocol.Header header = it.getMessage().getHeader();
                Intrinsics.a((Object) header, "it.message.header");
                long source = header.getSource();
                Protocol.Header header2 = Message.this.getHeader();
                Intrinsics.a((Object) header2, "message.header");
                if (source == header2.getSource() && it.getMessage().getHeader().sequence == Message.this.getHeader().sequence && it.getMessage().getType() == resultType) {
                    LUID source2 = it.getMessage().getSource();
                    Intrinsics.a((Object) source2, "it.message.source");
                    byte[] target = source2.getTarget();
                    LUID source3 = Message.this.getSource();
                    Intrinsics.a((Object) source3, "message.source");
                    if (Arrays.equals(target, source3.getTarget())) {
                        return true;
                    }
                }
                return false;
            }
        }).d((Function<? super SourcedMessage, ? extends R>) new Function<T, R>() { // from class: com.lifx.core.transport.rx.MessageExtensionsKt$awaitResponse$2
            @Override // io.reactivex.functions.Function
            public final T apply(SourcedMessage it) {
                Intrinsics.b(it, "it");
                return (T) resultClass.cast(it.getMessage().getPayload());
            }
        }).e().a(j, TimeUnit.MILLISECONDS);
        Intrinsics.a((Object) a, "this.filter {\n          …s, TimeUnit.MILLISECONDS)");
        return a;
    }

    public static final <TR> Single<ObservableResult> observeResponse(final Message receiver, final Flowable<SourcedMessage> messages, final Protocol.MessageType resultType, final Class<TR> resultClass, final boolean z, final long j) {
        Intrinsics.b(receiver, "$receiver");
        Intrinsics.b(messages, "messages");
        Intrinsics.b(resultType, "resultType");
        Intrinsics.b(resultClass, "resultClass");
        Single<ObservableResult> a = Single.a(new SingleOnSubscribe<T>() { // from class: com.lifx.core.transport.rx.MessageExtensionsKt$observeResponse$1
            @Override // io.reactivex.SingleOnSubscribe
            public final void subscribe(final SingleEmitter<ObservableResult> subscriber) {
                Intrinsics.b(subscriber, "subscriber");
                Protocol.Header header = Message.this.getHeader();
                Intrinsics.a((Object) header, "header");
                if (header.getResponseRequired()) {
                    MessageExtensionsKt.awaitResponse(messages, Message.this, resultType, resultClass, j).a((Consumer) new Consumer<TR>() { // from class: com.lifx.core.transport.rx.MessageExtensionsKt$observeResponse$1.1
                        @Override // io.reactivex.functions.Consumer
                        public final void accept(TR tr) {
                            SingleEmitter.this.a((SingleEmitter) new ObservableResult(ObservableResult.Companion.getStatusSuccess(), tr));
                        }
                    }, new Consumer<Throwable>() { // from class: com.lifx.core.transport.rx.MessageExtensionsKt$observeResponse$1.2
                        @Override // io.reactivex.functions.Consumer
                        public final void accept(Throwable th) {
                            SingleEmitter.this.a(th);
                        }
                    });
                    return;
                }
                Protocol.Header header2 = Message.this.getHeader();
                Intrinsics.a((Object) header2, "header");
                if (header2.getAcknowledgementRequired() && z) {
                    MessageExtensionsKt.awaitAcknowledgement(messages, Message.this, j).a(new Consumer<SourcedMessage>() { // from class: com.lifx.core.transport.rx.MessageExtensionsKt$observeResponse$1.3
                        @Override // io.reactivex.functions.Consumer
                        public final void accept(SourcedMessage sourcedMessage) {
                            SingleEmitter.this.a((SingleEmitter) new ObservableResult(ObservableResult.Companion.getStatusAcknowledged(), null));
                        }
                    }, new Consumer<Throwable>() { // from class: com.lifx.core.transport.rx.MessageExtensionsKt$observeResponse$1.4
                        @Override // io.reactivex.functions.Consumer
                        public final void accept(Throwable th) {
                            SingleEmitter.this.a(th);
                        }
                    });
                } else {
                    subscriber.a((SingleEmitter<ObservableResult>) new ObservableResult(ObservableResult.Companion.getStatusSuccess(), null));
                }
            }
        });
        Intrinsics.a((Object) a, "Single.create { subscrib…s, null))\n        }\n    }");
        return a;
    }

    public static final Completable sendMessage(final Message receiver, final boolean z, final ITransportAndAddressResolver transportAndAddressResolver) {
        Intrinsics.b(receiver, "$receiver");
        Intrinsics.b(transportAndAddressResolver, "transportAndAddressResolver");
        Completable a = Completable.a(new CompletableOnSubscribe() { // from class: com.lifx.core.transport.rx.MessageExtensionsKt$sendMessage$1
            @Override // io.reactivex.CompletableOnSubscribe
            public final void subscribe(CompletableEmitter subscriber) {
                Intrinsics.b(subscriber, "subscriber");
                ITransportAndAddressResolver iTransportAndAddressResolver = transportAndAddressResolver;
                boolean z2 = z;
                LUID destination = Message.this.getDestination();
                Intrinsics.a((Object) destination, "this.destination");
                Pair<IOutgoingTransport, InetAddress> mostSignificantTransportAndAddress = iTransportAndAddressResolver.mostSignificantTransportAndAddress(z2, destination);
                if (mostSignificantTransportAndAddress == null) {
                    subscriber.a(new NoTransportAvailableException());
                } else if (mostSignificantTransportAndAddress.a().sendMessage(new TargetedMessage(Message.this, mostSignificantTransportAndAddress.b()))) {
                    subscriber.r_();
                } else {
                    subscriber.a(new SentMessageFailedException());
                }
            }
        });
        Intrinsics.a((Object) a, "Completable.create { sub…eption())\n        }\n    }");
        return a;
    }

    public static /* synthetic */ Completable sendMessage$default(Message message, boolean z, ITransportAndAddressResolver iTransportAndAddressResolver, int i, Object obj) {
        if ((i & 1) != 0) {
            z = false;
        }
        return sendMessage(message, z, iTransportAndAddressResolver);
    }
}
