/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pekko.stream.impl;

import org.apache.pekko.actor.Actor;
import org.apache.pekko.actor.ActorLogging;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Cancellable;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.stream.StreamSubscriptionTimeoutSettings;
import org.apache.pekko.stream.StreamSubscriptionTimeoutTerminationMode;
import org.apache.pekko.stream.StreamSubscriptionTimeoutTerminationMode$CancelTermination$;
import org.apache.pekko.stream.StreamSubscriptionTimeoutTerminationMode$NoopTermination$;
import org.apache.pekko.stream.StreamSubscriptionTimeoutTerminationMode$WarnTermination$;
import org.apache.pekko.stream.impl.StreamSubscriptionTimeoutSupport$NoopSubscriptionTimeout$;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import scala.MatchError;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxesRunTime;
import scala.util.control.NoStackTrace;

@InternalApi
public interface StreamSubscriptionTimeoutSupport {
    public StreamSubscriptionTimeoutSettings subscriptionTimeoutSettings();

    public static Cancellable scheduleSubscriptionTimeout$(StreamSubscriptionTimeoutSupport $this, ActorRef actor, Object message) {
        return $this.scheduleSubscriptionTimeout(actor, message);
    }

    default public Cancellable scheduleSubscriptionTimeout(ActorRef actor, Object message) {
        StreamSubscriptionTimeoutTerminationMode streamSubscriptionTimeoutTerminationMode = this.subscriptionTimeoutSettings().mode();
        if (StreamSubscriptionTimeoutTerminationMode$NoopTermination$.MODULE$.equals(streamSubscriptionTimeoutTerminationMode)) {
            return StreamSubscriptionTimeoutSupport$NoopSubscriptionTimeout$.MODULE$;
        }
        Cancellable cancellable = ((Actor)((Object)this)).context().system().scheduler().scheduleOnce(this.subscriptionTimeoutSettings().timeout(), actor, message, (ExecutionContext)((Actor)((Object)this)).context().dispatcher(), ((Actor)((Object)this)).self());
        return cancellable;
    }

    private void cancel(Publisher<?> target, FiniteDuration timeout) {
        long millis = timeout.toMillis();
        Publisher<?> publisher = target;
        if (publisher instanceof Processor) {
            Processor p = (Processor)publisher;
            ((ActorLogging)((Object)((Actor)((Object)this)))).log().debug("Cancelling {} Processor's publisher and subscriber sides (after {} ms)", p, BoxesRunTime.boxToLong(millis));
            this.handleSubscriptionTimeout(target, (Exception)((Object)new NoStackTrace(millis){
                {
                    NoStackTrace.$init$(this);
                }

                public Throwable scala$util$control$NoStackTrace$$super$fillInStackTrace() {
                    return super.fillInStackTrace();
                }
            }));
            return;
        }
        if (publisher != null) {
            Publisher<?> p = publisher;
            ((ActorLogging)((Object)((Actor)((Object)this)))).log().debug("Cancelling {} (after: {} ms)", p, BoxesRunTime.boxToLong(millis));
            this.handleSubscriptionTimeout(target, (Exception)((Object)new NoStackTrace(p){
                {
                    NoStackTrace.$init$(this);
                }

                public Throwable scala$util$control$NoStackTrace$$super$fillInStackTrace() {
                    return super.fillInStackTrace();
                }
            }));
            return;
        }
        throw new MatchError(publisher);
    }

    private void warn(Publisher<?> target, FiniteDuration timeout) {
        ((ActorLogging)((Object)((Actor)((Object)this)))).log().warning("Timed out {} detected (after {} ms)! You should investigate if you either cancel or consume all {} instances", target, (Object)BoxesRunTime.boxToLong(timeout.toMillis()), (Object)target.getClass().getCanonicalName());
    }

    public static void subscriptionTimedOut$(StreamSubscriptionTimeoutSupport $this, Publisher target) {
        $this.subscriptionTimedOut(target);
    }

    default public void subscriptionTimedOut(Publisher<?> target) {
        StreamSubscriptionTimeoutTerminationMode streamSubscriptionTimeoutTerminationMode = this.subscriptionTimeoutSettings().mode();
        if (StreamSubscriptionTimeoutTerminationMode$NoopTermination$.MODULE$.equals(streamSubscriptionTimeoutTerminationMode)) {
            return;
        }
        if (StreamSubscriptionTimeoutTerminationMode$WarnTermination$.MODULE$.equals(streamSubscriptionTimeoutTerminationMode)) {
            this.warn(target, this.subscriptionTimeoutSettings().timeout());
            return;
        }
        if (StreamSubscriptionTimeoutTerminationMode$CancelTermination$.MODULE$.equals(streamSubscriptionTimeoutTerminationMode)) {
            this.cancel(target, this.subscriptionTimeoutSettings().timeout());
            return;
        }
        throw new MatchError(streamSubscriptionTimeoutTerminationMode);
    }

    public void handleSubscriptionTimeout(Publisher<?> var1, Exception var2);
}

