package com.didi.comlab.horcrux.chat.manager;

import com.didi.comlab.dim.ability.logger.DIMLogger;
import com.didi.comlab.horcrux.chat.message.MessageActivity;
import com.didi.comlab.horcrux.core.TeamContext;
import com.didi.comlab.horcrux.core.data.RealmWriteExecutorKt;
import com.didi.comlab.horcrux.core.data.helper.ConversationHelper;
import com.didi.comlab.horcrux.core.data.helper.MessageHelper;
import com.didi.comlab.horcrux.core.data.personal.model.Conversation;
import com.didi.comlab.horcrux.core.data.personal.model.Message;
import com.didi.comlab.horcrux.core.exception.DIMException;
import com.didi.comlab.horcrux.core.network.model.request.QueryMessagesRequestBody;
import com.didi.comlab.horcrux.core.network.model.response.QueryMessageResponseBody;
import com.didi.comlab.horcrux.core.network.snitch.ResponseToResult;
import com.didichuxing.ep.im.tracelog.trace.Trace;
import com.didichuxing.ep.im.tracelog.trace.child.TraceChild;
import com.didichuxing.ep.im.tracelog.trace.child.TraceHTTPChild;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.realm.Realm;
import io.realm.RealmResults;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import kotlin.Unit;
import kotlin.a.a;
import kotlin.collections.aa;
import kotlin.collections.m;
import kotlin.h;
import kotlin.io.b;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.functions.Function2;
import kotlin.ranges.IntRange;
import kotlin.text.k;

/* compiled from: DIMMessageManager.kt */
@h
/* loaded from: classes2.dex */
public final class DIMMessageManager {
    public static final DIMMessageManager INSTANCE;
    private static final int MESSAGE_LIMIT = 20;
    private static final DIMLogger mLogger;

    static {
        DIMMessageManager dIMMessageManager = new DIMMessageManager();
        INSTANCE = dIMMessageManager;
        mLogger = DIMLogger.Companion.getLogger(dIMMessageManager.getClass());
    }

    private DIMMessageManager() {
    }

    private final Observable<List<Message>> fetchLatestMessagesAndClear(String str) {
        Message latestMessage;
        TeamContext current = TeamContext.Companion.current();
        if (current == null) {
            Observable<List<Message>> a2 = Observable.a((Throwable) new DIMException(1));
            kotlin.jvm.internal.h.a((Object) a2, "Observable.error(DIMExce….FIELD_REQUIRED_IS_NULL))");
            return a2;
        }
        String str2 = null;
        Realm personalRealm$default = TeamContext.personalRealm$default(current, false, 1, null);
        Throwable th = (Throwable) null;
        try {
            try {
                Conversation fetchByVid = ConversationHelper.INSTANCE.fetchByVid(personalRealm$default, str);
                if (fetchByVid != null && (latestMessage = fetchByVid.getLatestMessage()) != null) {
                    str2 = latestMessage.getKey();
                }
                b.a(personalRealm$default, th);
                Observable b2 = fetchMessagesFromServer(current, str, QueryMessagesRequestBody.Companion.createLatestQuery(20), false).b(RealmWriteExecutorKt.runRealmWriteTask(current, new DIMMessageManager$fetchLatestMessagesAndClear$1(str, str2)));
                kotlin.jvm.internal.h.a((Object) b2, "fetchMessagesFromServer(…         }\n            })");
                return b2;
            } finally {
            }
        } catch (Throwable th2) {
            b.a(personalRealm$default, th);
            throw th2;
        }
    }

    private final Observable<List<Message>> fetchMessagesFromServer(TeamContext teamContext, final String str, final QueryMessagesRequestBody queryMessagesRequestBody, final boolean z) {
        final TraceHTTPChild deriveHTTP = Trace.Companion.in$default(Trace.Companion, null, null, null, null, 15, null).deriveHTTP(teamContext.getBaseUrl() + "/api/vchannels/" + str + "/messages/query", queryMessagesRequestBody);
        Observable<List<Message>> b2 = teamContext.conversationApi().queryMessages(str, queryMessagesRequestBody, deriveHTTP.traceId(), deriveHTTP.cspanId()).c(new Consumer<Disposable>() { // from class: com.didi.comlab.horcrux.chat.manager.DIMMessageManager$fetchMessagesFromServer$observable$1
            @Override // io.reactivex.functions.Consumer
            public final void accept(Disposable disposable) {
                DIMLogger dIMLogger;
                DIMMessageManager dIMMessageManager = DIMMessageManager.INSTANCE;
                dIMLogger = DIMMessageManager.mLogger;
                dIMLogger.i("MessageFlow[" + str + "] start fetch messages with params:" + queryMessagesRequestBody + ", saveToRealm:" + z);
            }
        }).d(new ResponseToResult()).a(new Consumer<Throwable>() { // from class: com.didi.comlab.horcrux.chat.manager.DIMMessageManager$fetchMessagesFromServer$observable$2
            @Override // io.reactivex.functions.Consumer
            public final void accept(Throwable th) {
                TraceChild.out$default(TraceHTTPChild.failure$default(TraceHTTPChild.this, th, null, 2, null), null, null, null, 7, null);
            }
        }).b(new Consumer<QueryMessageResponseBody>() { // from class: com.didi.comlab.horcrux.chat.manager.DIMMessageManager$fetchMessagesFromServer$observable$3
            @Override // io.reactivex.functions.Consumer
            public final void accept(QueryMessageResponseBody queryMessageResponseBody) {
                TraceChild.out$default(TraceHTTPChild.success$default(TraceHTTPChild.this, queryMessageResponseBody, null, 2, null), null, null, null, 7, null);
            }
        }).d(new Function<T, R>() { // from class: com.didi.comlab.horcrux.chat.manager.DIMMessageManager$fetchMessagesFromServer$observable$4
            @Override // io.reactivex.functions.Function
            public final List<Message> apply(QueryMessageResponseBody queryMessageResponseBody) {
                kotlin.jvm.internal.h.b(queryMessageResponseBody, "it");
                return m.a((Iterable) queryMessageResponseBody.getMessages(), (Comparator) new Comparator<T>() { // from class: com.didi.comlab.horcrux.chat.manager.DIMMessageManager$fetchMessagesFromServer$observable$4$$special$$inlined$sortedBy$1
                    /* JADX WARN: Multi-variable type inference failed */
                    @Override // java.util.Comparator
                    public final int compare(T t, T t2) {
                        return a.a(Long.valueOf(((Message) t).getCreatedTs()), Long.valueOf(((Message) t2).getCreatedTs()));
                    }
                });
            }
        }).b(new Consumer<List<? extends Message>>() { // from class: com.didi.comlab.horcrux.chat.manager.DIMMessageManager$fetchMessagesFromServer$observable$5
            @Override // io.reactivex.functions.Consumer
            public final void accept(List<? extends Message> list) {
                DIMLogger dIMLogger;
                kotlin.jvm.internal.h.a((Object) list, "it");
                List<? extends Message> list2 = list;
                ArrayList arrayList = new ArrayList(m.a(list2, 10));
                Iterator<T> it2 = list2.iterator();
                while (it2.hasNext()) {
                    arrayList.add(k.c(((Message) it2.next()).getKey(), 6));
                }
                DIMMessageManager dIMMessageManager = DIMMessageManager.INSTANCE;
                dIMLogger = DIMMessageManager.mLogger;
                dIMLogger.i("MessageFlow[" + str + "] fetched " + list.size() + " messages: " + arrayList);
            }
        });
        if (!z) {
            kotlin.jvm.internal.h.a((Object) b2, "observable");
            return b2;
        }
        Observable b3 = b2.b(RealmWriteExecutorKt.runRealmWriteTask(teamContext, new Function2<Realm, List<? extends Message>, Unit>() { // from class: com.didi.comlab.horcrux.chat.manager.DIMMessageManager$fetchMessagesFromServer$1
            @Override // kotlin.jvm.functions.Function2
            public /* bridge */ /* synthetic */ Unit invoke(Realm realm, List<? extends Message> list) {
                invoke2(realm, list);
                return Unit.f16169a;
            }

            /* renamed from: invoke, reason: avoid collision after fix types in other method */
            public final void invoke2(Realm realm, List<? extends Message> list) {
                kotlin.jvm.internal.h.b(realm, "realm");
                kotlin.jvm.internal.h.b(list, "messages");
                MessageHelper.INSTANCE.createOrUpdateMessagesFromServer(realm, list);
            }
        }));
        kotlin.jvm.internal.h.a((Object) b3, "observable.flatMap(runRe… messages)\n            })");
        return b3;
    }

    private final Observable<List<Message>> fetchRangeMessages(String str, String str2) {
        TeamContext current = TeamContext.Companion.current();
        if (current != null) {
            return fetchMessagesFromServer(current, str, QueryMessagesRequestBody.Companion.createSinceRangeForKeyQuery(str2, 20, 20, true), false);
        }
        Observable<List<Message>> a2 = Observable.a((Throwable) new DIMException(1));
        kotlin.jvm.internal.h.a((Object) a2, "Observable.error(DIMExce….FIELD_REQUIRED_IS_NULL))");
        return a2;
    }

    public final Observable<List<Message>> fetchBackwardMessagesFromServer(String str, String str2) {
        kotlin.jvm.internal.h.b(str, "vchannelId");
        kotlin.jvm.internal.h.b(str2, MessageActivity.KEY_MESSAGE_KEY);
        TeamContext current = TeamContext.Companion.current();
        if (current != null) {
            return fetchMessagesFromServer(current, str, QueryMessagesRequestBody.Companion.createSinceBackwardForKeyQuery(str2, 20, false), true);
        }
        Observable<List<Message>> a2 = Observable.a((Throwable) new DIMException(1));
        kotlin.jvm.internal.h.a((Object) a2, "Observable.error(DIMExce….FIELD_REQUIRED_IS_NULL))");
        return a2;
    }

    public final Observable<List<Message>> fetchForwardMessagesFromServer(String str, String str2) {
        kotlin.jvm.internal.h.b(str, "vchannelId");
        kotlin.jvm.internal.h.b(str2, MessageActivity.KEY_MESSAGE_KEY);
        TeamContext current = TeamContext.Companion.current();
        if (current != null) {
            return fetchMessagesFromServer(current, str, QueryMessagesRequestBody.Companion.createSinceForwardForKeyQuery(str2, 20, false), true);
        }
        Observable<List<Message>> a2 = Observable.a((Throwable) new DIMException(1));
        kotlin.jvm.internal.h.a((Object) a2, "Observable.error(DIMExce….FIELD_REQUIRED_IS_NULL))");
        return a2;
    }

    public final Observable<List<Message>> fetchLatestMessagesFromServer(String str) {
        kotlin.jvm.internal.h.b(str, "vchannelId");
        TeamContext current = TeamContext.Companion.current();
        if (current != null) {
            return fetchMessagesFromServer(current, str, QueryMessagesRequestBody.Companion.createLatestQuery(20), true);
        }
        Observable<List<Message>> a2 = Observable.a((Throwable) new DIMException(1));
        kotlin.jvm.internal.h.a((Object) a2, "Observable.error(DIMExce….FIELD_REQUIRED_IS_NULL))");
        return a2;
    }

    public final Observable<RealmResults<Message>> fetchMessages(final Realm realm, final String str, String str2) {
        Observable observable;
        Observable observable2;
        Observable observable3;
        kotlin.jvm.internal.h.b(realm, "realm");
        kotlin.jvm.internal.h.b(str, "vchannelId");
        Conversation fetchByVid = ConversationHelper.INSTANCE.fetchByVid(realm, str);
        if (fetchByVid == null) {
            Observable<RealmResults<Message>> b2 = Observable.b();
            kotlin.jvm.internal.h.a((Object) b2, "Observable.empty()");
            return b2;
        }
        if (str2 != null) {
            if (!(str2.length() == 0)) {
                if (MessageHelper.INSTANCE.fetchByKey(realm, str2) == null || !fetchByVid.isContinuous()) {
                    observable3 = fetchRangeMessages(str, str2).a(io.reactivex.a.b.a.a()).b((Function<? super List<Message>, ? extends ObservableSource<? extends R>>) new Function<T, ObservableSource<? extends R>>() { // from class: com.didi.comlab.horcrux.chat.manager.DIMMessageManager$fetchMessages$2
                        @Override // io.reactivex.functions.Function
                        public final Observable<RealmResults<Message>> apply(final List<? extends Message> list) {
                            RealmResults<Message> fetchRangeByVid;
                            kotlin.jvm.internal.h.b(list, "messages");
                            Message message = (Message) m.f((List) list);
                            Message message2 = (Message) m.h((List) list);
                            Realm.this.refresh();
                            if (message == null || message2 == null || kotlin.jvm.internal.h.a((Object) message.getKey(), (Object) message2.getKey())) {
                                return Observable.a(MessageHelper.fetchAllByVid$default(MessageHelper.INSTANCE, Realm.this, str, null, 4, null));
                            }
                            final Conversation fetchByVid2 = ConversationHelper.INSTANCE.fetchByVid(Realm.this, str);
                            if (fetchByVid2 == null) {
                                throw new RuntimeException("MessageFlow[" + str + "] after fetch range messages, conversation is null!");
                            }
                            if (MessageHelper.INSTANCE.isSaveInRealm(Realm.this, message2.getKey()) && fetchByVid2.isContinuous()) {
                                final Realm realm2 = Realm.this;
                                if (realm2.isInTransaction()) {
                                    MessageHelper.INSTANCE.createOrUpdateMessagesFromServer(realm2, list);
                                } else {
                                    realm2.executeTransaction(new Realm.Transaction() { // from class: com.didi.comlab.horcrux.chat.manager.DIMMessageManager$fetchMessages$2$$special$$inlined$execSafeTransaction$1
                                        @Override // io.realm.Realm.Transaction
                                        public final void execute(Realm realm3) {
                                            Realm realm4 = Realm.this;
                                            MessageHelper messageHelper = MessageHelper.INSTANCE;
                                            List<? extends Message> list2 = list;
                                            kotlin.jvm.internal.h.a((Object) list2, "messages");
                                            messageHelper.createOrUpdateMessagesFromServer(realm4, list2);
                                        }
                                    });
                                }
                                fetchRangeByVid = MessageHelper.fetchAllByVid$default(MessageHelper.INSTANCE, Realm.this, str, null, 4, null);
                            } else {
                                final Realm realm3 = Realm.this;
                                if (realm3.isInTransaction()) {
                                    fetchByVid2.setContinuous(false);
                                    MessageHelper.INSTANCE.createOrUpdateMessagesFromServer(realm3, list);
                                } else {
                                    realm3.executeTransaction(new Realm.Transaction() { // from class: com.didi.comlab.horcrux.chat.manager.DIMMessageManager$fetchMessages$2$$special$$inlined$execSafeTransaction$2
                                        @Override // io.realm.Realm.Transaction
                                        public final void execute(Realm realm4) {
                                            Realm realm5 = Realm.this;
                                            fetchByVid2.setContinuous(false);
                                            MessageHelper messageHelper = MessageHelper.INSTANCE;
                                            List<? extends Message> list2 = list;
                                            kotlin.jvm.internal.h.a((Object) list2, "messages");
                                            messageHelper.createOrUpdateMessagesFromServer(realm5, list2);
                                        }
                                    });
                                }
                                fetchRangeByVid = MessageHelper.INSTANCE.fetchRangeByVid(Realm.this, str, message, message2, true);
                            }
                            return Observable.a(fetchRangeByVid);
                        }
                    });
                } else {
                    RealmResults fetchAllByVid$default = MessageHelper.fetchAllByVid$default(MessageHelper.INSTANCE, realm, str, null, 4, null);
                    mLogger.i("MessageFlow[" + str + "] fetched " + fetchAllByVid$default.size() + " continuous local messages for " + str2);
                    observable3 = Observable.a(fetchAllByVid$default);
                }
                kotlin.jvm.internal.h.a((Object) observable3, "if (message != null && c…          }\n            }");
                observable2 = observable3;
                return observable2;
            }
        }
        if (fetchByVid.isContinuous()) {
            RealmResults fetchAllByVid$default2 = MessageHelper.fetchAllByVid$default(MessageHelper.INSTANCE, realm, str, null, 4, null);
            mLogger.i("MessageFlow[" + str + "] fetched " + fetchAllByVid$default2.size() + " continuous local messages");
            observable = Observable.a(fetchAllByVid$default2);
        } else {
            observable = fetchLatestMessagesAndClear(str).a(io.reactivex.a.b.a.a()).b((Function<? super List<Message>, ? extends ObservableSource<? extends R>>) new Function<T, ObservableSource<? extends R>>() { // from class: com.didi.comlab.horcrux.chat.manager.DIMMessageManager$fetchMessages$1
                @Override // io.reactivex.functions.Function
                public final Observable<RealmResults<Message>> apply(List<? extends Message> list) {
                    kotlin.jvm.internal.h.b(list, "it");
                    Realm.this.refresh();
                    return Observable.a(MessageHelper.fetchAllByVid$default(MessageHelper.INSTANCE, Realm.this, str, null, 4, null));
                }
            });
        }
        kotlin.jvm.internal.h.a((Object) observable, "if (conversation.isConti…          }\n            }");
        observable2 = observable;
        return observable2;
    }

    public final Completable fetchMessagesFromServerBulkByKey(final String str, List<String> list, int i) {
        kotlin.jvm.internal.h.b(str, "vchannelId");
        kotlin.jvm.internal.h.b(list, "messageKeys");
        final TeamContext current = TeamContext.Companion.current();
        if (current == null) {
            Completable a2 = Completable.a(new DIMException(1));
            kotlin.jvm.internal.h.a((Object) a2, "Completable.error(DIMExc….FIELD_REQUIRED_IS_NULL))");
            return a2;
        }
        int size = list.size() % i == 0 ? list.size() / i : (list.size() / i) + 1;
        IntRange b2 = kotlin.ranges.k.b(0, size);
        ArrayList arrayList = new ArrayList(m.a(b2, 10));
        Iterator<Integer> it2 = b2.iterator();
        while (it2.hasNext()) {
            int b3 = ((aa) it2).b();
            int i2 = b3 * i;
            arrayList.add(m.a(m.a((List) list, new IntRange(i2, b3 < size + (-1) ? (i - 1) + i2 : list.size() - 1)), null, null, null, 0, null, new Function1<String, String>() { // from class: com.didi.comlab.horcrux.chat.manager.DIMMessageManager$fetchMessagesFromServerBulkByKey$messageKeysList$1$1
                @Override // kotlin.jvm.functions.Function1
                public final String invoke(String str2) {
                    kotlin.jvm.internal.h.b(str2, "it");
                    return str2;
                }
            }, 31, null));
        }
        Completable b4 = Observable.a((Iterable) arrayList).b(new Function<T, ObservableSource<? extends R>>() { // from class: com.didi.comlab.horcrux.chat.manager.DIMMessageManager$fetchMessagesFromServerBulkByKey$1
            @Override // io.reactivex.functions.Function
            public final Observable<Map<String, Message>> apply(String str2) {
                kotlin.jvm.internal.h.b(str2, "keys");
                return TeamContext.this.conversationApi().queryMessagesBulkByKey(str, str2).d(new ResponseToResult()).b(RealmWriteExecutorKt.runRealmWriteTask(TeamContext.this, new Function2<Realm, Map<String, ? extends Message>, Unit>() { // from class: com.didi.comlab.horcrux.chat.manager.DIMMessageManager$fetchMessagesFromServerBulkByKey$1.1
                    @Override // kotlin.jvm.functions.Function2
                    public /* bridge */ /* synthetic */ Unit invoke(Realm realm, Map<String, ? extends Message> map) {
                        invoke2(realm, map);
                        return Unit.f16169a;
                    }

                    /* renamed from: invoke, reason: avoid collision after fix types in other method */
                    public final void invoke2(Realm realm, Map<String, ? extends Message> map) {
                        kotlin.jvm.internal.h.b(realm, "realm");
                        kotlin.jvm.internal.h.b(map, "input");
                        MessageHelper.INSTANCE.createOrUpdateMessagesFromServer(realm, m.h(map.values()));
                    }
                }));
            }
        }).e().b();
        kotlin.jvm.internal.h.a((Object) b4, "Observable.fromIterable(…       .onErrorComplete()");
        return b4;
    }
}
