/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pekko.stream.impl;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.stream.impl.ActorPublisher$;
import org.apache.pekko.stream.impl.CancelledSubscription$;
import org.apache.pekko.stream.impl.ReactiveStreamsCompliance;
import org.apache.pekko.stream.impl.ReactiveStreamsCompliance$;
import org.apache.pekko.stream.impl.SubscribePending$;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.package$;
import scala.util.control.NoStackTrace;

@InternalApi
public class ActorPublisher<T>
implements Publisher<T> {
    private final ActorRef impl;
    private final AtomicReference<Seq<Subscriber<? super T>>> pendingSubscribers;
    private final Object wakeUpMsg;
    private volatile Option<Throwable> shutdownReason;

    public static Throwable NormalShutdownReason() {
        return ActorPublisher$.MODULE$.NormalShutdownReason();
    }

    public static String NormalShutdownReasonMessage() {
        return ActorPublisher$.MODULE$.NormalShutdownReasonMessage();
    }

    public static Some<Throwable> SomeNormalShutdownReason() {
        return ActorPublisher$.MODULE$.SomeNormalShutdownReason();
    }

    public static <T> ActorPublisher<T> apply(ActorRef actorRef) {
        return ActorPublisher$.MODULE$.apply(actorRef);
    }

    public ActorPublisher(ActorRef impl) {
        this.impl = impl;
        this.pendingSubscribers = new AtomicReference<Nil$>(package$.MODULE$.Nil());
        this.wakeUpMsg = SubscribePending$.MODULE$;
        this.shutdownReason = None$.MODULE$;
    }

    public ActorRef impl() {
        return this.impl;
    }

    public Object wakeUpMsg() {
        return this.wakeUpMsg;
    }

    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        ReactiveStreamsCompliance$.MODULE$.requireNonNullSubscriber(subscriber);
        this.doSubscribe$1(subscriber);
    }

    public Seq<Subscriber<? super T>> takePendingSubscribers() {
        Seq pending = this.pendingSubscribers.getAndSet(package$.MODULE$.Nil());
        if (pending == null) {
            return package$.MODULE$.Nil();
        }
        return (Seq)pending.reverse();
    }

    public void shutdown(Option<Throwable> reason) {
        this.shutdownReason = reason;
        Seq seq = this.pendingSubscribers.getAndSet(null);
        if (seq == null) {
            return;
        }
        Seq pending = seq;
        pending.foreach(subscriber -> this.reportSubscribeFailure((Subscriber<? super T>)subscriber));
    }

    private void reportSubscribeFailure(Subscriber<? super T> subscriber) {
        block5: {
            try {
                Option<Throwable> option = this.shutdownReason;
                if (option instanceof Some) {
                    Throwable throwable = (Throwable)((Some)option).value();
                    if (!(throwable instanceof ReactiveStreamsCompliance.SpecViolation)) {
                        Throwable e = throwable;
                        ReactiveStreamsCompliance$.MODULE$.tryOnSubscribe(subscriber, CancelledSubscription$.MODULE$);
                        ReactiveStreamsCompliance$.MODULE$.tryOnError(subscriber, e);
                    }
                    break block5;
                }
                if (None$.MODULE$.equals(option)) {
                    ReactiveStreamsCompliance$.MODULE$.tryOnSubscribe(subscriber, CancelledSubscription$.MODULE$);
                    ReactiveStreamsCompliance$.MODULE$.tryOnComplete(subscriber);
                    break block5;
                }
                throw new MatchError(option);
            }
            catch (Throwable throwable) {
                Throwable throwable2 = throwable;
                if (throwable2 instanceof ReactiveStreamsCompliance.SpecViolation) break block5;
                throw throwable;
            }
        }
    }

    private final void doSubscribe$1(Subscriber subscriber$1) {
        Seq<Subscriber<T>> current;
        do {
            if ((current = this.pendingSubscribers.get()) != null) continue;
            this.reportSubscribeFailure(subscriber$1);
            return;
        } while (!this.pendingSubscribers.compareAndSet(current, (Seq<Subscriber<T>>)current.$plus$colon(subscriber$1)));
        this.impl().$bang(this.wakeUpMsg(), this.impl().$bang$default$2(this.wakeUpMsg()));
    }

    public static class NormalShutdownException
    extends IllegalStateException
    implements NoStackTrace {
        public NormalShutdownException() {
            super(ActorPublisher$.MODULE$.NormalShutdownReasonMessage());
            NoStackTrace.$init$(this);
        }

        @Override
        public Throwable scala$util$control$NoStackTrace$$super$fillInStackTrace() {
            return super.fillInStackTrace();
        }
    }
}

