package org.apache.pekko.actor.typed.delivery.internal;

import java.io.Serializable;
import java.util.concurrent.TimeoutException;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorRef$;
import org.apache.pekko.actor.typed.ActorRef$ActorRefOps$;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.DispatcherSelector$;
import org.apache.pekko.actor.typed.delivery.ConsumerController;
import org.apache.pekko.actor.typed.delivery.DurableProducerQueue;
import org.apache.pekko.actor.typed.delivery.DurableProducerQueue$LoadState$;
import org.apache.pekko.actor.typed.delivery.DurableProducerQueue$MessageSent$;
import org.apache.pekko.actor.typed.delivery.DurableProducerQueue$State$;
import org.apache.pekko.actor.typed.delivery.WorkPullingProducerController;
import org.apache.pekko.actor.typed.delivery.WorkPullingProducerController$RequestNext$;
import org.apache.pekko.actor.typed.delivery.internal.WorkPullingProducerControllerImpl;
import org.apache.pekko.actor.typed.receptionist.Receptionist;
import org.apache.pekko.actor.typed.receptionist.Receptionist$Subscribe$;
import org.apache.pekko.actor.typed.receptionist.ServiceKey;
import org.apache.pekko.actor.typed.scaladsl.ActorContext;
import org.apache.pekko.actor.typed.scaladsl.Behaviors$;
import org.apache.pekko.actor.typed.scaladsl.StashBuffer;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.util.Timeout;
import org.apache.pekko.util.Timeout$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple5;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.ModuleSerializationProxy;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Success;

/* compiled from: WorkPullingProducerControllerImpl.scala */
/**
 * Singleton holder (exposed through {@link #MODULE$}) with the factory and start-up logic for the
 * work-pulling producer controller behavior.
 *
 * <p>{@link #apply} builds the initial behavior. It stays in a "waiting for start" phase until both
 * the producer has sent {@code Start} and (when a durable queue is configured) the durable queue
 * state has been loaded; after that it hands over to an active {@code WorkPullingProducerControllerImpl}.
 */
@InternalApi
/* loaded from: input_file:org/apache/pekko/actor/typed/delivery/internal/WorkPullingProducerControllerImpl$.class */
public final class WorkPullingProducerControllerImpl$ implements Serializable {
    // The constants below are declared as null in this decompiled view; the methods in this class
    // reach the corresponding companion singletons through their own MODULE$ fields instead.
    // NOTE(review): presumably placeholders emitted for nested Scala objects - confirm against the Scala source.
    public static final WorkPullingProducerControllerImpl$WorkerRequestNext$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$WorkerRequestNext = null;
    public static final WorkPullingProducerControllerImpl$Ack$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$Ack = null;
    public static final WorkPullingProducerControllerImpl$AskTimeout$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$AskTimeout = null;
    public static final WorkPullingProducerControllerImpl$RegisterConsumerDone$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$RegisterConsumerDone = null;
    private static final WorkPullingProducerControllerImpl$LoadStateReply$ LoadStateReply = null;
    private static final WorkPullingProducerControllerImpl$LoadStateFailed$ LoadStateFailed = null;
    private static final WorkPullingProducerControllerImpl$StoreMessageSentReply$ StoreMessageSentReply = null;
    public static final WorkPullingProducerControllerImpl$StoreMessageSentFailed$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$StoreMessageSentFailed = null;
    public static final WorkPullingProducerControllerImpl$StoreMessageSentCompleted$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$StoreMessageSentCompleted = null;
    public static final WorkPullingProducerControllerImpl$DurableQueueTerminated$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$DurableQueueTerminated = null;
    public static final WorkPullingProducerControllerImpl$OutState$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$OutState = null;
    public static final WorkPullingProducerControllerImpl$Unconfirmed$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$Unconfirmed = null;
    private static final WorkPullingProducerControllerImpl$State$ State = null;
    public static final WorkPullingProducerControllerImpl$PreselectedWorker$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$PreselectedWorker = null;
    public static final WorkPullingProducerControllerImpl$HandOver$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$HandOver = null;
    private static final WorkPullingProducerControllerImpl$CurrentWorkers$ CurrentWorkers = null;
    public static final WorkPullingProducerControllerImpl$Msg$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$Msg = null;
    public static final WorkPullingProducerControllerImpl$ResendDurableMsg$ org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$ResendDurableMsg = null;
    /** The single instance of this class. */
    public static final WorkPullingProducerControllerImpl$ MODULE$ = new WorkPullingProducerControllerImpl$();

    /** Not instantiable from outside; use {@link #MODULE$}. */
    private WorkPullingProducerControllerImpl$() {
    }

    /** Serializes this singleton as a {@link ModuleSerializationProxy} for this class. */
    private Object writeReplace() {
        return new ModuleSerializationProxy(WorkPullingProducerControllerImpl$.class);
    }

    /**
     * Creates the behavior of a work-pulling producer controller.
     *
     * <p>On setup the behavior sets its logger name, subscribes to the receptionist for
     * {@code serviceKey} (listings are adapted to {@code CurrentWorkers}), optionally spawns the
     * durable queue and asks it for its state, and then waits for start. The whole behavior is
     * wrapped with an MDC entry {@code producerId -> str}.
     *
     * @param str the producer id, used as MDC value and passed on to the controller
     * @param serviceKey service key under which worker consumer controllers are looked up
     * @param option optional behavior of a durable producer queue to spawn as a child
     * @param settings controller settings; {@code bufferSize()} is the stash capacity
     * @param classTag class tag of the message type, used for the message adapter
     * @return the behavior, narrowed to the public command type
     */
    public <A> Behavior<WorkPullingProducerController.Command<A>> apply(String str, ServiceKey<ConsumerController.Command<A>> serviceKey, Option<Behavior<DurableProducerQueue.Command<A>>> option, WorkPullingProducerController.Settings settings, ClassTag<A> classTag) {
        return Behaviors$.MODULE$.withStash(settings.bufferSize(), stashBuffer -> {
            return Behaviors$.MODULE$.setup(actorContext -> {
                Map<String, String> mdc = (Map) Predef$.MODULE$.Map().apply2(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("producerId"), str)}));
                actorContext.setLoggerName("org.apache.pekko.actor.typed.delivery.WorkPullingProducerController");
                // Subscribe to worker registrations; each Listing is converted to CurrentWorkers.
                ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(actorContext.system().receptionist()), Receptionist$Subscribe$.MODULE$.apply(serviceKey, actorContext.messageAdapter(listing -> {
                    return WorkPullingProducerControllerImpl$CurrentWorkers$.MODULE$.apply(listing.allServiceInstances(serviceKey));
                }, ClassTag$.MODULE$.apply(Receptionist.Listing.class))));
                Option durableQueue = MODULE$.askLoadState(actorContext, option, settings);
                // Without a durable queue the initial state is known right away (empty).
                return Behaviors$.MODULE$.withMdc(mdc, (Behavior) MODULE$.waitingForStart(str, actorContext, stashBuffer, durableQueue, settings, None$.MODULE$, MODULE$.createInitialState(durableQueue.nonEmpty()), classTag), ClassTag$.MODULE$.apply(WorkPullingProducerControllerImpl.InternalCommand.class));
            });
        }).narrow();
    }

    /**
     * Returns the durable queue state known at start-up.
     *
     * @param z {@code true} when a durable queue exists and its state still has to be loaded
     * @return {@code None} when {@code z} is true, otherwise {@code Some} of the empty state
     */
    private <A> Option<DurableProducerQueue.State<A>> createInitialState(boolean z) {
        return z ? None$.MODULE$ : Some$.MODULE$.apply(DurableProducerQueue$State$.MODULE$.empty());
    }

    /**
     * Behavior used until both the producer ({@code Start}) and the initial durable queue state are
     * known. Whichever arrives first is remembered in {@code option2} / {@code option3}; when the
     * second one arrives the controller becomes active. Other messages are stashed.
     *
     * @param option the durable queue actor, if any
     * @param option2 the producer, once {@code Start} has been received
     * @param option3 the durable queue state, once known
     * @throws TimeoutException (from the message handler) when loading the state failed and the
     *     attempt number has reached {@code durableQueueRetryAttempts()}
     * @throws IllegalStateException (from the message handler) when the durable queue terminated
     * @throws IllegalArgumentException (from the message handler) when the stash buffer is full
     */
    private <A> Behavior<WorkPullingProducerControllerImpl.InternalCommand> waitingForStart(String str, ActorContext<WorkPullingProducerControllerImpl.InternalCommand> actorContext, StashBuffer<WorkPullingProducerControllerImpl.InternalCommand> stashBuffer, Option<ActorRef<DurableProducerQueue.Command<A>>> option, WorkPullingProducerController.Settings settings, Option<ActorRef<WorkPullingProducerController.RequestNext<A>>> option2, Option<DurableProducerQueue.State<A>> option3, ClassTag<A> classTag) {
        return (Behavior) Behaviors$.MODULE$.receiveMessage(internalCommand -> {
            if (internalCommand instanceof WorkPullingProducerController.Start) {
                WorkPullingProducerController.Start start = (WorkPullingProducerController.Start) internalCommand;
                ProducerControllerImpl$.MODULE$.enforceLocalProducer(start.producer());
                if (option3 instanceof Some) {
                    // State already loaded: everything needed is available.
                    return becomeActive$1(actorContext, classTag, stashBuffer, str, option, settings, start.producer(), (DurableProducerQueue.State) ((Some) option3).value());
                }
                if (None$.MODULE$.equals(option3)) {
                    // Remember the producer and keep waiting for the state.
                    return waitingForStart(str, actorContext, stashBuffer, option, settings, Some$.MODULE$.apply(start.producer()), option3, classTag);
                }
                throw new MatchError(option3);
            }
            if (internalCommand instanceof WorkPullingProducerControllerImpl.LoadStateReply) {
                WorkPullingProducerControllerImpl.LoadStateReply loadStateReply = (WorkPullingProducerControllerImpl.LoadStateReply) internalCommand;
                if (option2 instanceof Some) {
                    // Producer already known: everything needed is available.
                    return becomeActive$1(actorContext, classTag, stashBuffer, str, option, settings, (ActorRef) ((Some) option2).value(), loadStateReply.state());
                }
                if (None$.MODULE$.equals(option2)) {
                    // Remember the state and keep waiting for the producer.
                    return waitingForStart(str, actorContext, stashBuffer, option, settings, option2, Some$.MODULE$.apply(loadStateReply.state()), classTag);
                }
                throw new MatchError(option2);
            }
            if (internalCommand instanceof WorkPullingProducerControllerImpl.LoadStateFailed) {
                int attempt = WorkPullingProducerControllerImpl$LoadStateFailed$.MODULE$.unapply((WorkPullingProducerControllerImpl.LoadStateFailed) internalCommand)._1();
                int maxAttempts = settings.producerControllerSettings().durableQueueRetryAttempts();
                if (attempt >= maxAttempts) {
                    String errorMessage = "LoadState failed after [" + attempt + "] attempts, giving up.";
                    actorContext.log().error(errorMessage);
                    throw new TimeoutException(errorMessage);
                }
                actorContext.log().warn("LoadState failed, attempt [{}] of [{}], retrying.", BoxesRunTime.boxToInteger(attempt), BoxesRunTime.boxToInteger(maxAttempts));
                askLoadState(actorContext, option, settings, attempt + 1);
                return Behaviors$.MODULE$.same();
            }
            if (WorkPullingProducerControllerImpl$DurableQueueTerminated$.MODULE$.equals(internalCommand)) {
                throw new IllegalStateException("DurableQueue was unexpectedly terminated.");
            }
            // Anything else is kept until the controller is active.
            org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$checkStashFull(stashBuffer);
            stashBuffer.stash(internalCommand);
            return Behaviors$.MODULE$.same();
        });
    }

    /**
     * Fails fast when the stash buffer cannot take another message.
     *
     * @param stashBuffer the buffer to check
     * @throws IllegalArgumentException if {@code stashBuffer.isFull()}; the message contains the
     *     current size
     */
    public <A> void org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$$checkStashFull(StashBuffer<WorkPullingProducerControllerImpl.InternalCommand> stashBuffer) {
        if (stashBuffer.isFull()) {
            throw new IllegalArgumentException("Buffer is full, size [" + stashBuffer.size() + "].");
        }
    }

    /**
     * Spawns the durable queue (if a behavior is given) as child {@code "durable"} on the parent's
     * dispatcher, watches it with {@code DurableQueueTerminated}, and sends the first state request
     * (attempt 1).
     *
     * @return the spawned durable queue actor, or {@code None} when {@code option} is empty
     */
    private <A> Option<ActorRef<DurableProducerQueue.Command<A>>> askLoadState(ActorContext<WorkPullingProducerControllerImpl.InternalCommand> actorContext, Option<Behavior<DurableProducerQueue.Command<A>>> option, WorkPullingProducerController.Settings settings) {
        return (Option<ActorRef<DurableProducerQueue.Command<A>>>) option.map(behavior -> {
            ActorRef durableQueue = actorContext.spawn(behavior, "durable", DispatcherSelector$.MODULE$.sameAsParent());
            actorContext.watchWith(durableQueue, WorkPullingProducerControllerImpl$DurableQueueTerminated$.MODULE$);
            MODULE$.askLoadState(actorContext, Some$.MODULE$.apply(durableQueue), settings, 1);
            return durableQueue;
        });
    }

    /**
     * Asks the durable queue (if present) for its state, using
     * {@code durableQueueRequestTimeout()} as the ask timeout. A successful reply is delivered to
     * this actor as {@code LoadStateReply}; any failure is delivered as {@code LoadStateFailed}
     * carrying the attempt number.
     *
     * @param option the durable queue actor; nothing happens when empty
     * @param i the attempt number reported back on failure
     */
    private <A> void askLoadState(ActorContext<WorkPullingProducerControllerImpl.InternalCommand> actorContext, Option<ActorRef<DurableProducerQueue.Command<A>>> option, WorkPullingProducerController.Settings settings, int i) {
        Timeout durationToTimeout = Timeout$.MODULE$.durationToTimeout(settings.producerControllerSettings().durableQueueRequestTimeout());
        option.foreach(durableQueue -> {
            // The lambda parameters need distinct names: Java does not allow a nested lambda
            // parameter to redeclare a parameter of the enclosing lambda.
            actorContext.ask(durableQueue, replyTo -> {
                return DurableProducerQueue$LoadState$.MODULE$.apply(replyTo);
            }, result -> {
                WorkPullingProducerControllerImpl.InternalCommand reply;
                if (result instanceof Success) {
                    reply = WorkPullingProducerControllerImpl$LoadStateReply$.MODULE$.apply((DurableProducerQueue.State) ((Success) result).value());
                } else if (result instanceof Failure) {
                    reply = WorkPullingProducerControllerImpl$LoadStateFailed$.MODULE$.apply(i);
                } else {
                    throw new MatchError(result);
                }
                return reply;
            }, durationToTimeout, ClassTag$.MODULE$.apply(DurableProducerQueue.State.class));
        });
    }

    /**
     * Builds the controller state used when becoming active: the given sequence number and
     * producer, empty collections for everything else, and a trailing {@code false} flag.
     *
     * @param j the current sequence number
     * @param actorRef the producer that receives {@code RequestNext}
     */
    private <A> WorkPullingProducerControllerImpl.State<A> createInitialState(long j, ActorRef<WorkPullingProducerController.RequestNext<A>> actorRef) {
        return WorkPullingProducerControllerImpl$State$.MODULE$.apply(j, Predef$.MODULE$.Set().empty2(), Predef$.MODULE$.Map().empty2(), Predef$.MODULE$.Map().empty2(), Predef$.MODULE$.Map().empty2(), Predef$.MODULE$.Map().empty2(), actorRef, false);
    }

    /**
     * Switches from the waiting phase to the active controller.
     *
     * <p>Every unconfirmed message of the loaded state is sent back to this actor as
     * {@code ResendDurableMsg}; then a {@code WorkPullingProducerControllerImpl} is created, its
     * active behavior is started with a fresh initial state, and all stashed messages are replayed
     * into it.
     *
     * @param actorRef the producer
     * @param state the durable queue state to start from
     * @return the behavior returned by {@code stashBuffer.unstashAll}
     */
    private final Behavior becomeActive$1(ActorContext actorContext, ClassTag classTag, StashBuffer stashBuffer, String str, Option option, WorkPullingProducerController.Settings settings, ActorRef actorRef, DurableProducerQueue.State state) {
        state.unconfirmed().foreach(messageSent -> {
            // NOTE(review): entries that are null or not matched by the extractor are skipped
            // silently here; this may be a decompilation artifact of a Scala MatchError - confirm.
            if (messageSent == null) {
                return;
            }
            Option<Tuple5<Object, Object, Object, String, Object>> extracted = DurableProducerQueue$MessageSent$.MODULE$.unapply(messageSent);
            if (extracted.isEmpty()) {
                return;
            }
            Tuple5<Object, Object, Object, String, Object> fields = extracted.get();
            long oldSeqNr = BoxesRunTime.unboxToLong(fields._1());
            Object message = fields._2();
            String oldConfirmationQualifier = fields._4();
            ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(actorContext.self()), WorkPullingProducerControllerImpl$ResendDurableMsg$.MODULE$.apply(message, oldConfirmationQualifier, oldSeqNr));
        });
        // Messages from the producer are wrapped into Msg(obj, false, None) before reaching this actor.
        ActorRef producerMessageAdapter = actorContext.messageAdapter(obj -> {
            return WorkPullingProducerControllerImpl$Msg$.MODULE$.apply(obj, false, None$.MODULE$);
        }, classTag);
        WorkPullingProducerControllerImpl controller = new WorkPullingProducerControllerImpl(actorContext, stashBuffer, str, WorkPullingProducerController$RequestNext$.MODULE$.apply(producerMessageAdapter, actorContext.self()), option, settings, classTag);
        return stashBuffer.unstashAll(controller.org$apache$pekko$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$active(createInitialState(state.currentSeqNr(), actorRef)));
    }
}
