/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pekko.actor.typed.delivery.internal;

import java.io.Serializable;
import java.util.concurrent.TimeoutException;
import org.apache.pekko.Done;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorRef$;
import org.apache.pekko.actor.typed.ActorRef$ActorRefOps$;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.DispatcherSelector$;
import org.apache.pekko.actor.typed.delivery.ConsumerController;
import org.apache.pekko.actor.typed.delivery.DurableProducerQueue;
import org.apache.pekko.actor.typed.delivery.DurableProducerQueue$LoadState$;
import org.apache.pekko.actor.typed.delivery.DurableProducerQueue$MessageSent$;
import org.apache.pekko.actor.typed.delivery.DurableProducerQueue$State$;
import org.apache.pekko.actor.typed.delivery.WorkPullingProducerController;
import org.apache.pekko.actor.typed.delivery.WorkPullingProducerController$RequestNext$;
import org.apache.pekko.actor.typed.delivery.internal.ProducerControllerImpl$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$Ack$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$AskTimeout$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$CurrentWorkers$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$DurableQueueTerminated$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$HandOver$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$LoadStateFailed$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$LoadStateReply$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$Msg$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$OutState$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$PreselectedWorker$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$RegisterConsumerDone$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$ResendDurableMsg$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$State$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$StoreMessageSentCompleted$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$StoreMessageSentFailed$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$StoreMessageSentReply$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$Unconfirmed$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl$WorkerRequestNext$;
import org.apache.pekko.actor.typed.receptionist.Receptionist;
import org.apache.pekko.actor.typed.receptionist.Receptionist$Subscribe$;
import org.apache.pekko.actor.typed.receptionist.ServiceKey;
import org.apache.pekko.actor.typed.scaladsl.ActorContext;
import org.apache.pekko.actor.typed.scaladsl.Behaviors$;
import org.apache.pekko.actor.typed.scaladsl.StashBuffer;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.util.Timeout;
import org.apache.pekko.util.Timeout$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Product;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple5;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.ModuleSerializationProxy;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

@InternalApi
public final class WorkPullingProducerControllerImpl$
implements Serializable {
    // Static slots typed by the `$`-suffixed module classes of the nested message and
    // state types, plus the MODULE$ singleton reference at the end.
    // Slots named `org$apache$...$$$Name` are public; the short-named ones are private.
    // NOTE(review): within this file only MODULE$ is ever assigned (in the static
    // initializer); the other slots have no visible initializer -- presumably a
    // decompiler artifact, confirm against the original source before relying on them.
    public static final WorkPullingProducerControllerImpl$WorkerRequestNext$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$WorkerRequestNext;
    public static final WorkPullingProducerControllerImpl$Ack$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$Ack;
    public static final WorkPullingProducerControllerImpl$AskTimeout$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$AskTimeout;
    public static final WorkPullingProducerControllerImpl$RegisterConsumerDone$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$RegisterConsumerDone;
    private static final WorkPullingProducerControllerImpl$LoadStateReply$ LoadStateReply;
    private static final WorkPullingProducerControllerImpl$LoadStateFailed$ LoadStateFailed;
    private static final WorkPullingProducerControllerImpl$StoreMessageSentReply$ StoreMessageSentReply;
    public static final WorkPullingProducerControllerImpl$StoreMessageSentFailed$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$StoreMessageSentFailed;
    public static final WorkPullingProducerControllerImpl$StoreMessageSentCompleted$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$StoreMessageSentCompleted;
    public static final WorkPullingProducerControllerImpl$DurableQueueTerminated$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$DurableQueueTerminated;
    public static final WorkPullingProducerControllerImpl$OutState$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$OutState;
    public static final WorkPullingProducerControllerImpl$Unconfirmed$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$Unconfirmed;
    private static final WorkPullingProducerControllerImpl$State$ State;
    public static final WorkPullingProducerControllerImpl$PreselectedWorker$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$PreselectedWorker;
    public static final WorkPullingProducerControllerImpl$HandOver$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$HandOver;
    private static final WorkPullingProducerControllerImpl$CurrentWorkers$ CurrentWorkers;
    public static final WorkPullingProducerControllerImpl$Msg$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$Msg;
    public static final WorkPullingProducerControllerImpl$ResendDurableMsg$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$ResendDurableMsg;
    // The single instance of this class; assigned once in the static initializer.
    public static final WorkPullingProducerControllerImpl$ MODULE$;

    /**
     * Private no-op constructor. The only call site in this file is the static
     * initializer, which stores the resulting instance in {@code MODULE$}.
     */
    private WorkPullingProducerControllerImpl$() {
    }

    // Creates the one instance of this class at class-initialization time and
    // publishes it through the MODULE$ field.
    static {
        MODULE$ = new WorkPullingProducerControllerImpl$();
    }

    /**
     * Java serialization hook: instead of writing this instance, a
     * {@link ModuleSerializationProxy} built from this class's {@code Class} object
     * is written in its place.
     *
     * @return a new proxy keyed by {@code WorkPullingProducerControllerImpl$.class}
     */
    private Object writeReplace() {
        Class<WorkPullingProducerControllerImpl$> moduleClass = WorkPullingProducerControllerImpl$.class;
        return new ModuleSerializationProxy(moduleClass);
    }

    /**
     * Builds the start-up behavior of the work-pulling producer controller.
     *
     * <p>Inside a stash of {@code settings.bufferSize()} capacity and a {@code setup} block it:
     * <ol>
     *   <li>prepares a one-entry MDC map {@code "producerId" -> producerId};</li>
     *   <li>sets the logger name of the actor context;</li>
     *   <li>registers a message adapter that turns each {@code Receptionist.Listing} into a
     *       {@code CurrentWorkers} command holding all service instances for {@code workerServiceKey},
     *       and subscribes that adapter at the system receptionist;</li>
     *   <li>starts the optional durable queue via {@code askLoadState};</li>
     *   <li>returns {@code waitingForStart(...)} wrapped in the MDC map, with no producer yet and an
     *       initial state chosen by {@code createInitialState(durableQueue.nonEmpty())}.</li>
     * </ol>
     * The resulting behavior is narrowed to the public command type.
     *
     * @param producerId           identifier used as the MDC value and passed on to {@code waitingForStart}
     * @param workerServiceKey     service key used for the receptionist subscription
     * @param durableQueueBehavior optional behavior of a durable queue to spawn
     * @param settings             controller settings (buffer size, durable-queue timeouts/retries)
     * @param evidence$1           class tag of the message type {@code A}
     * @return the narrowed initial behavior
     */
    public <A> Behavior<WorkPullingProducerController.Command<A>> apply(String producerId, ServiceKey<ConsumerController.Command<A>> workerServiceKey, Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, WorkPullingProducerController.Settings settings, ClassTag<A> evidence$1) {
        return Behaviors$.MODULE$.withStash(settings.bufferSize(), (Function1<StashBuffer, Behavior> & Serializable)stashBuffer -> Behaviors$.MODULE$.setup((Function1<ActorContext, Behavior> & Serializable)context -> {
            // Static MDC: a single "producerId" -> producerId pair.
            String mdcKey = Predef$.MODULE$.ArrowAssoc("producerId");
            Tuple2[] mdcEntries = new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(mdcKey, producerId)};
            Map staticMdc = (Map)Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(mdcEntries));

            context.setLoggerName("org.apache.pekko.actor.typed.delivery.WorkPullingProducerController");

            // Receptionist listings are translated into CurrentWorkers commands for this actor.
            ActorRef<Receptionist.Listing> workerListings = context.messageAdapter((Function1<Receptionist.Listing, WorkPullingProducerControllerImpl.InternalCommand> & Serializable)listing -> WorkPullingProducerControllerImpl$CurrentWorkers$.MODULE$.apply(listing.allServiceInstances(workerServiceKey)), ClassTag$.MODULE$.apply(Receptionist.Listing.class));
            ActorRef receptionist = ActorRef$.MODULE$.ActorRefOps(context.system().receptionist());
            ActorRef$ActorRefOps$.MODULE$.$bang$extension(receptionist, Receptionist$Subscribe$.MODULE$.apply(workerServiceKey, workerListings));

            // Spawns the durable queue (if a behavior was supplied) and issues the first LoadState ask.
            Option durableQueue = MODULE$.askLoadState((ActorContext<WorkPullingProducerControllerImpl.InternalCommand>)context, durableQueueBehavior, settings);

            Behavior waiting = MODULE$.waitingForStart(producerId, (ActorContext<WorkPullingProducerControllerImpl.InternalCommand>)context, (StashBuffer<WorkPullingProducerControllerImpl.InternalCommand>)stashBuffer, durableQueue, settings, None$.MODULE$, MODULE$.createInitialState(durableQueue.nonEmpty()), evidence$1);
            return Behaviors$.MODULE$.withMdc(staticMdc, waiting, ClassTag$.MODULE$.apply(WorkPullingProducerControllerImpl.InternalCommand.class));
        })).narrow();
    }

    /**
     * Chooses the durable-queue state to start with.
     *
     * @param hasDurableQueue whether a durable queue is in use
     * @return {@code Some} of the empty state when there is no durable queue; {@code None}
     *         when there is one
     */
    private <A> Option<DurableProducerQueue.State<A>> createInitialState(boolean hasDurableQueue) {
        if (!hasDurableQueue) {
            return Some$.MODULE$.apply(DurableProducerQueue$State$.MODULE$.empty());
        }
        return None$.MODULE$;
    }

    /**
     * Behavior used until both prerequisites for going active are present: the producer
     * reference (delivered by a {@code Start} message) and a durable-queue state (either
     * passed in as {@code initialState} or delivered by a {@code LoadStateReply}).
     *
     * <p>Whichever of the two arrives first is remembered by re-entering this method with the
     * corresponding {@code Option} filled in; when the second arrives, {@code becomeActive$1}
     * is invoked. Any message not handled explicitly is stashed.
     *
     * @param producerId   producer identifier, passed through unchanged
     * @param context      actor context used for logging and forwarded to helpers
     * @param stashBuffer  buffer receiving every message that is not handled here
     * @param durableQueue optional durable queue reference, used for LoadState retries
     * @param settings     source of {@code durableQueueRetryAttempts}
     * @param producer     producer reference if {@code Start} has already been seen
     * @param initialState durable-queue state if it is already known
     * @param evidence$1   class tag of {@code A}, passed through unchanged
     * @return the receive-message behavior described above
     * @throws TimeoutException         (from the handler) when a LoadState failure arrives with an
     *                                  attempt number that has reached the configured retry limit
     * @throws IllegalStateException    (from the handler) on {@code DurableQueueTerminated}
     * @throws IllegalArgumentException (from the handler, via checkStashFull) when the stash is full
     */
    private <A> Behavior<WorkPullingProducerControllerImpl.InternalCommand> waitingForStart(String producerId, ActorContext<WorkPullingProducerControllerImpl.InternalCommand> context, StashBuffer<WorkPullingProducerControllerImpl.InternalCommand> stashBuffer, Option<ActorRef<DurableProducerQueue.Command<A>>> durableQueue, WorkPullingProducerController.Settings settings, Option<ActorRef<WorkPullingProducerController.RequestNext<A>>> producer, Option<DurableProducerQueue.State<A>> initialState, ClassTag<A> evidence$1) {
        return (Behavior)((Object)Behaviors$.MODULE$.receiveMessage((Function1<WorkPullingProducerControllerImpl.InternalCommand, Behavior> & Serializable)x$1 -> {
            WorkPullingProducerControllerImpl.InternalCommand internalCommand = x$1;
            // Case 1: Start -- the producer is now known.
            if (internalCommand instanceof WorkPullingProducerController.Start) {
                WorkPullingProducerController.Start start = (WorkPullingProducerController.Start)internalCommand;
                // Validation happens before any state transition.
                ProducerControllerImpl$.MODULE$.enforceLocalProducer(start.producer());
                Option option = initialState;
                if (option instanceof Some) {
                    // State already available: both prerequisites met, go active.
                    DurableProducerQueue.State s2 = (DurableProducerQueue.State)((Some)option).value();
                    return this.becomeActive$1(context, evidence$1, stashBuffer, producerId, durableQueue, settings, start.producer(), s2);
                }
                if (None$.MODULE$.equals(option)) {
                    // State still missing: remember the producer and keep waiting.
                    return this.waitingForStart(producerId, context, stashBuffer, durableQueue, settings, Some$.MODULE$.apply(start.producer()), initialState, evidence$1);
                }
                throw new MatchError(option);
            }
            // Case 2: LoadStateReply -- the durable-queue state is now known.
            if (internalCommand instanceof WorkPullingProducerControllerImpl.LoadStateReply) {
                WorkPullingProducerControllerImpl.LoadStateReply load = (WorkPullingProducerControllerImpl.LoadStateReply)internalCommand;
                Option option = producer;
                if (option instanceof Some) {
                    // Producer already registered: go active with the loaded state.
                    ActorRef p = (ActorRef)((Some)option).value();
                    return this.becomeActive$1(context, evidence$1, stashBuffer, producerId, durableQueue, settings, p, load.state());
                }
                if (None$.MODULE$.equals(option)) {
                    // Producer still missing: remember the loaded state and keep waiting.
                    return this.waitingForStart(producerId, context, stashBuffer, durableQueue, settings, producer, Some$.MODULE$.apply(load.state()), evidence$1);
                }
                throw new MatchError(option);
            }
            // Case 3: LoadStateFailed -- retry until the configured number of attempts is reached.
            if (internalCommand instanceof WorkPullingProducerControllerImpl.LoadStateFailed) {
                WorkPullingProducerControllerImpl.LoadStateFailed loadStateFailed = WorkPullingProducerControllerImpl$LoadStateFailed$.MODULE$.unapply((WorkPullingProducerControllerImpl.LoadStateFailed)internalCommand);
                int n = loadStateFailed._1();
                int attempt = n;
                if (attempt >= settings.producerControllerSettings().durableQueueRetryAttempts()) {
                    // Out of retries: log, then fail with the same message.
                    String errorMessage = new StringBuilder(46).append("LoadState failed after [").append(attempt).append("] attempts, giving up.").toString();
                    context.log().error(errorMessage);
                    throw new TimeoutException(errorMessage);
                }
                context.log().warn("LoadState failed, attempt [{}] of [{}], retrying.", (Object)BoxesRunTime.boxToInteger(attempt), (Object)BoxesRunTime.boxToInteger(settings.producerControllerSettings().durableQueueRetryAttempts()));
                // Re-ask with the attempt counter incremented; behavior stays unchanged.
                this.askLoadState(context, durableQueue, settings, attempt + 1);
                return Behaviors$.MODULE$.same();
            }
            // Case 4: termination of the durable queue is treated as fatal here.
            if (WorkPullingProducerControllerImpl$DurableQueueTerminated$.MODULE$.equals(internalCommand)) {
                throw new IllegalStateException("DurableQueue was unexpectedly terminated.");
            }
            // Default: stash anything else; the full-check runs before stashing.
            WorkPullingProducerControllerImpl.InternalCommand other = internalCommand;
            this.org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$checkStashFull(stashBuffer);
            stashBuffer.stash(other);
            return Behaviors$.MODULE$.same();
        }));
    }

    /**
     * Fails fast when the given stash buffer has no room left.
     *
     * @param stashBuffer the buffer to inspect
     * @throws IllegalArgumentException if {@code stashBuffer.isFull()}; the message reports
     *                                  the buffer's current size
     */
    public <A> void org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$checkStashFull(StashBuffer<WorkPullingProducerControllerImpl.InternalCommand> stashBuffer) {
        if (!stashBuffer.isFull()) {
            return;
        }
        String message = "Buffer is full, size [" + stashBuffer.size() + "].";
        throw new IllegalArgumentException(message);
    }

    /**
     * Starts the durable queue, if a behavior for it was supplied, and requests its state.
     *
     * <p>For a defined {@code durableQueueBehavior} this spawns a child named {@code "durable"}
     * with the same-as-parent dispatcher selector, watches it so that its termination is
     * delivered as {@code DurableQueueTerminated}, and issues the first load-state ask
     * (attempt {@code 1}).
     *
     * @param context              actor context used to spawn, watch and ask
     * @param durableQueueBehavior optional behavior of the durable queue
     * @param settings             settings forwarded to the load-state ask
     * @return the spawned queue reference wrapped in the option, or an empty option when no
     *         behavior was supplied
     */
    private <A> Option<ActorRef<DurableProducerQueue.Command<A>>> askLoadState(ActorContext<WorkPullingProducerControllerImpl.InternalCommand> context, Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, WorkPullingProducerController.Settings settings) {
        Function1<Behavior, ActorRef> startQueue = (Function1<Behavior, ActorRef> & Serializable)queueBehavior -> {
            ActorRef queueRef = context.spawn(queueBehavior, "durable", DispatcherSelector$.MODULE$.sameAsParent());
            context.watchWith(queueRef, WorkPullingProducerControllerImpl$DurableQueueTerminated$.MODULE$);
            // First attempt of the load-state request against the freshly spawned queue.
            MODULE$.askLoadState(context, Some$.MODULE$.apply(queueRef), settings, 1);
            return queueRef;
        };
        return durableQueueBehavior.map(startQueue);
    }

    /**
     * Asks the durable queue (when present) for its state and maps the outcome to an
     * internal command for this actor.
     *
     * <p>The ask uses {@code durableQueueRequestTimeout} from the producer controller settings.
     * A successful reply becomes {@code LoadStateReply(state)}; a failed ask becomes
     * {@code LoadStateFailed(attempt)} carrying the attempt number given here.
     *
     * @param context      actor context performing the ask
     * @param durableQueue optional queue reference; nothing is asked when it is empty
     * @param settings     source of the request timeout
     * @param attempt      attempt number reported back on failure
     */
    private <A> void askLoadState(ActorContext<WorkPullingProducerControllerImpl.InternalCommand> context, Option<ActorRef<DurableProducerQueue.Command<A>>> durableQueue, WorkPullingProducerController.Settings settings, int attempt) {
        Timeout loadTimeout = Timeout$.MODULE$.durationToTimeout(settings.producerControllerSettings().durableQueueRequestTimeout());
        durableQueue.foreach(queueRef -> context.ask(queueRef, (Function1<ActorRef, DurableProducerQueue.LoadState> & Serializable)replyTo -> DurableProducerQueue$LoadState$.MODULE$.apply(replyTo), (Function1<Try, WorkPullingProducerControllerImpl.InternalCommand> & Serializable)outcome -> {
            if (outcome instanceof Success) {
                DurableProducerQueue.State loaded = (DurableProducerQueue.State)((Success)outcome).value();
                return WorkPullingProducerControllerImpl$LoadStateReply$.MODULE$.apply(loaded);
            }
            if (outcome instanceof Failure) {
                return WorkPullingProducerControllerImpl$LoadStateFailed$.MODULE$.apply(attempt);
            }
            // Neither Success nor Failure: same failure mode as the other matches in this file.
            throw new MatchError(outcome);
        }, loadTimeout, ClassTag$.MODULE$.apply(DurableProducerQueue.State.class)));
    }

    /**
     * Builds the controller state used when the behavior becomes active.
     *
     * <p>The state is created from the given sequence number, one empty {@code Set}, four empty
     * {@code Map}s, the given producer reference and a trailing {@code false} flag, in that
     * argument order.
     * NOTE(review): the meaning of each positional argument is defined by
     * {@code WorkPullingProducerControllerImpl.State}, which is not visible in this file.
     *
     * @param currentSeqNr sequence number to start from
     * @param producer     producer reference stored in the state
     * @return the freshly created state
     */
    private <A> WorkPullingProducerControllerImpl.State<A> createInitialState(long currentSeqNr, ActorRef<WorkPullingProducerController.RequestNext<A>> producer) {
        return WorkPullingProducerControllerImpl$State$.MODULE$.apply(currentSeqNr, Predef$.MODULE$.Set().empty(), Predef$.MODULE$.Map().empty(), (Map<Object, WorkPullingProducerControllerImpl.PreselectedWorker>)Predef$.MODULE$.Map().empty(), (Map<Object, ActorRef<Done>>)Predef$.MODULE$.Map().empty(), (Map<Object, WorkPullingProducerControllerImpl.HandOver>)Predef$.MODULE$.Map().empty(), producer, false);
    }

    /**
     * Switches from the waiting phase to the active behavior.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>every entry of {@code s2.unconfirmed()} is destructured and sent to this actor itself
     *       as a {@code ResendDurableMsg(msg, confirmationQualifier, seqNr)};</li>
     *   <li>a message adapter is registered that wraps each incoming message into
     *       {@code Msg(msg, false, None)};</li>
     *   <li>a {@code RequestNext} is built from that adapter and this actor's self reference;</li>
     *   <li>a {@code WorkPullingProducerControllerImpl} is constructed and its active behavior is
     *       obtained with the state from {@code createInitialState(s2.currentSeqNr(), p)};</li>
     *   <li>all stashed messages are unstashed into that behavior.</li>
     * </ol>
     *
     * @param context$1      actor context (self reference, message adapter)
     * @param evidence$1$3   class tag of the message type
     * @param stashBuffer$2  stash whose content is replayed into the active behavior
     * @param producerId$3   producer identifier handed to the controller instance
     * @param durableQueue$1 optional durable queue reference handed to the controller instance
     * @param settings$3     settings handed to the controller instance
     * @param p              producer reference stored in the initial state
     * @param s2             loaded (or empty) durable-queue state
     * @return the behavior returned by {@code stashBuffer$2.unstashAll}
     * @throws MatchError if an unconfirmed entry is {@code null} or cannot be destructured
     */
    private final Behavior becomeActive$1(ActorContext context$1, ClassTag evidence$1$3, StashBuffer stashBuffer$2, String producerId$3, Option durableQueue$1, WorkPullingProducerController.Settings settings$3, ActorRef p, DurableProducerQueue.State s2) {
        s2.unconfirmed().foreach(x$1 -> {
            Option<Tuple5<Object, Object, Object, String, Object>> option;
            DurableProducerQueue.MessageSent messageSent = x$1;
            if (messageSent != null && !(option = DurableProducerQueue$MessageSent$.MODULE$.unapply(messageSent)).isEmpty()) {
                Tuple5<Object, Object, Object, String, Object> tuple5 = option.get();
                // Only slots 1, 2 and 4 of the tuple are needed for the resend.
                long oldSeqNr = BoxesRunTime.unboxToLong(tuple5._1());
                Object msg = tuple5._2();
                String oldConfirmationQualifier = tuple5._4();
                ActorRef actorRef = ActorRef$.MODULE$.ActorRefOps(context$1.self());
                ActorRef$ActorRefOps$.MODULE$.$bang$extension(actorRef, WorkPullingProducerControllerImpl$ResendDurableMsg$.MODULE$.apply(msg, oldConfirmationQualifier, oldSeqNr));
                return;
            }
            // An entry that does not match must not be dropped silently: an unconfirmed
            // message that is skipped here would never be resent. Fail like every other
            // pattern match in this class does on an unmatched value.
            throw new MatchError(messageSent);
        });
        ActorRef msgAdapter = context$1.messageAdapter((Function1<Object, WorkPullingProducerControllerImpl.InternalCommand> & Serializable)msg -> WorkPullingProducerControllerImpl$Msg$.MODULE$.apply(msg, false, None$.MODULE$), evidence$1$3);
        WorkPullingProducerController.RequestNext requestNext = WorkPullingProducerController$RequestNext$.MODULE$.apply(msgAdapter, context$1.self());
        Behavior<WorkPullingProducerControllerImpl.InternalCommand> b = new WorkPullingProducerControllerImpl(context$1, stashBuffer$2, producerId$3, requestNext, durableQueue$1, settings$3, evidence$1$3).org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$active(this.createInitialState(s2.currentSeqNr(), p));
        // Replay everything that was stashed while waiting into the now-active behavior.
        return stashBuffer$2.unstashAll(b);
    }
}

