package org.apache.pekko.stream.impl;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.stream.impl.ReactiveStreamsCompliance;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.immutable.Seq;
import scala.util.control.NoStackTrace;

/* compiled from: ActorPublisher.scala */
/**
 * {@link Publisher} whose subscriptions are served by an actor.
 *
 * <p>{@link #subscribe(Subscriber)} does not talk to the subscriber directly. It queues the
 * subscriber in {@link #pendingSubscribers} and sends {@link #wakeUpMsg()} to {@link #impl()};
 * the receiver is expected to collect the queue through {@link #takePendingSubscribers()}.
 * After {@link #shutdown(Option)} the queue reference is {@code null} and new subscribers are
 * immediately signalled with the recorded shutdown reason.
 *
 * @param <T> element type published to subscribers
 */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/impl/ActorPublisher.class */
public class ActorPublisher<T> implements Publisher<T> {
    /** Actor that receives {@link #wakeUpMsg()} whenever a subscriber has been queued. */
    private final ActorRef impl;

    /**
     * Subscribers waiting to be picked up, most recently added first (they are prepended).
     * A {@code null} value is the marker written by {@link #shutdown(Option)}.
     */
    private final AtomicReference<Seq<Subscriber<? super T>>> pendingSubscribers = new AtomicReference<>(scala.package$.MODULE$.Nil());

    /** Message sent to {@link #impl} after a subscriber was queued. */
    private final Object wakeUpMsg = SubscribePending$.MODULE$;

    /**
     * Reason recorded by {@link #shutdown(Option)}: {@code Some(cause)} is reported through
     * {@code onError}, {@code None} through {@code onComplete}. Volatile because it is read from
     * whichever thread calls {@link #subscribe(Subscriber)}.
     */
    private volatile Option<Throwable> shutdownReason = None$.MODULE$;

    /* compiled from: ActorPublisher.scala */
    /* loaded from: input_file:org/apache/pekko/stream/impl/ActorPublisher$NormalShutdownException.class */
    /**
     * Exception carrying the companion object's {@code NormalShutdownReasonMessage}; mixes in
     * {@link NoStackTrace}, so stack trace collection is governed by that trait.
     */
    public static class NormalShutdownException extends IllegalStateException implements NoStackTrace {
        public NormalShutdownException() {
            super(ActorPublisher$.MODULE$.NormalShutdownReasonMessage());
            NoStackTrace.$init$(this);
        }

        /**
         * Forwards to the {@link NoStackTrace} implementation.
         *
         * <p>The decompiled body called {@code fillInStackTrace()} on itself, which recurses
         * without bound (and {@link Throwable}'s constructor invokes this method). The explicit
         * interface-super call selects the trait's default method instead.
         */
        @Override // java.lang.Throwable, scala.util.control.NoStackTrace
        public Throwable fillInStackTrace() {
            return NoStackTrace.super.fillInStackTrace();
        }

        /** Accessor the trait uses to reach {@link Throwable#fillInStackTrace()} of the superclass. */
        @Override // scala.util.control.NoStackTrace
        public Throwable scala$util$control$NoStackTrace$$super$fillInStackTrace() {
            return super.fillInStackTrace();
        }
    }

    /** Static forwarder to the companion object's {@code NormalShutdownReason}. */
    public static Throwable NormalShutdownReason() {
        return ActorPublisher$.MODULE$.NormalShutdownReason();
    }

    /** Static forwarder to the companion object's {@code NormalShutdownReasonMessage}. */
    public static String NormalShutdownReasonMessage() {
        return ActorPublisher$.MODULE$.NormalShutdownReasonMessage();
    }

    /** Static forwarder to the companion object's {@code SomeNormalShutdownReason}. */
    public static Some<Throwable> SomeNormalShutdownReason() {
        return ActorPublisher$.MODULE$.SomeNormalShutdownReason();
    }

    /** Static forwarder to the companion object's {@code apply} factory. */
    public static <T> ActorPublisher<T> apply(ActorRef actorRef) {
        return ActorPublisher$.MODULE$.apply(actorRef);
    }

    /**
     * @param actorRef actor that will be notified about queued subscribers
     */
    public ActorPublisher(ActorRef actorRef) {
        this.impl = actorRef;
    }

    /** @return the actor notified about queued subscribers */
    public ActorRef impl() {
        return this.impl;
    }

    /** @return the message sent to {@link #impl()} after a subscriber was queued */
    public Object wakeUpMsg() {
        return this.wakeUpMsg;
    }

    /**
     * Queues {@code subscriber} and wakes up the backing actor, or signals the subscriber
     * immediately if this publisher has been shut down.
     *
     * @param subscriber the subscriber to register; validated by
     *     {@code ReactiveStreamsCompliance.requireNonNullSubscriber}
     */
    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super T> subscriber) {
        ReactiveStreamsCompliance$.MODULE$.requireNonNullSubscriber(subscriber);
        doSubscribe$1(subscriber);
    }

    /**
     * Atomically replaces the pending queue with an empty one and returns the removed subscribers
     * in the order they subscribed.
     *
     * <p>Note that the queue is reset to empty even when the previous value was the {@code null}
     * shutdown marker.
     *
     * @return the drained subscribers, oldest first; empty if the previous value was {@code null}
     */
    public Seq<Subscriber<? super T>> takePendingSubscribers() {
        Seq<Subscriber<? super T>> drained = this.pendingSubscribers.getAndSet(scala.package$.MODULE$.Nil());
        if (drained == null) {
            return scala.package$.MODULE$.Nil();
        }
        // The queue is built by prepending, so reverse to restore subscription order.
        return (Seq) drained.reverse();
    }

    /**
     * Records the shutdown reason, marks the queue as closed and signals every subscriber that
     * was still pending.
     *
     * @param option {@code Some(cause)} for a failure, {@code None} for normal completion
     */
    public void shutdown(Option<Throwable> option) {
        // Publish the reason before closing the queue so that a subscriber observing the null
        // marker also observes the reason.
        this.shutdownReason = option;
        Seq<Subscriber<? super T>> pending = this.pendingSubscribers.getAndSet(null);
        if (pending == null) {
            // The marker was already in place; nothing left to notify.
            return;
        }
        pending.foreach(subscriber -> {
            reportSubscribeFailure(subscriber);
            // Scala's Function1 must return a value; Unit is represented by BoxedUnit.
            return scala.runtime.BoxedUnit.UNIT;
        });
    }

    /**
     * Signals {@code subscriber} according to {@link #shutdownReason}: {@code onSubscribe} with a
     * cancelled subscription followed by {@code onError} or {@code onComplete}. When the recorded
     * reason is itself a {@code SpecViolation}, nothing is signalled.
     */
    private void reportSubscribeFailure(Subscriber<? super T> subscriber) {
        try {
            Option<Throwable> reason = this.shutdownReason;
            if (reason instanceof Some) {
                Throwable cause = (Throwable) ((Some) reason).value();
                if (!(cause instanceof ReactiveStreamsCompliance.SpecViolation)) {
                    ReactiveStreamsCompliance$.MODULE$.tryOnSubscribe(subscriber, CancelledSubscription$.MODULE$);
                    ReactiveStreamsCompliance$.MODULE$.tryOnError(subscriber, cause);
                }
            } else if (None$.MODULE$.equals(reason)) {
                ReactiveStreamsCompliance$.MODULE$.tryOnSubscribe(subscriber, CancelledSubscription$.MODULE$);
                ReactiveStreamsCompliance$.MODULE$.tryOnComplete(subscriber);
            } else {
                throw new MatchError(reason);
            }
        } catch (Throwable t) {
            // NOTE(review): the decompiler reduced this handler to an empty `finally` guarded by
            // an unassigned flag. It is restored here as "ignore SpecViolation, rethrow the
            // rest", presumably matching the Scala original -- confirm against upstream.
            // instanceof is used because SpecViolation cannot be named in a catch clause if it
            // is an interface (Scala trait).
            if (!(t instanceof ReactiveStreamsCompliance.SpecViolation)) {
                throw t;
            }
        }
    }

    /**
     * Lock-free enqueue: retries the compare-and-set until the subscriber has been prepended to
     * the current queue, then wakes up the actor. If the queue holds the {@code null} shutdown
     * marker the subscriber is signalled right away instead.
     */
    private final void doSubscribe$1(Subscriber<? super T> subscriber) {
        while (true) {
            Seq<Subscriber<? super T>> current = this.pendingSubscribers.get();
            if (current == null) {
                reportSubscribeFailure(subscriber);
                return;
            }
            // Raw cast mirrors the one on reverse(): the Scala collection signature does not
            // surface a usable Java return type here.
            if (this.pendingSubscribers.compareAndSet(current, (Seq) current.$plus$colon(subscriber))) {
                break;
            }
        }
        impl().$bang(wakeUpMsg(), impl().$bang$default$2(wakeUpMsg()));
    }
}
