package org.apache.pekko.io;

import java.io.Serializable;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.PortUnreachableException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import org.apache.pekko.actor.Actor;
import org.apache.pekko.actor.ActorContext;
import org.apache.pekko.actor.ActorLogging;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.dispatch.RequiresMessageQueue;
import org.apache.pekko.dispatch.UnboundedMessageQueueSemantics;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.io.UdpConnected;
import org.apache.pekko.io.dns.DnsProtocol;
import org.apache.pekko.io.dns.DnsProtocol$Resolve$;
import org.apache.pekko.util.ByteString$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Some;
import scala.Tuple2;
import scala.Tuple2$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Statics;
import scala.util.control.NonFatal$;

/* compiled from: UdpConnection.scala */
/* loaded from: input_file:org/apache/pekko/io/UdpConnection.class */
public class UdpConnection implements Actor, ActorLogging, RequiresMessageQueue<UnboundedMessageQueueSemantics> {
    // Actor context; assigned through org$apache$pekko$actor$Actor$_setter_$context_$eq.
    private ActorContext context;
    // This actor's own reference; assigned through org$apache$pekko$actor$Actor$_setter_$self_$eq.
    private ActorRef self;
    // Backing field for the ActorLogging accessor pair (org$apache$pekko$actor$ActorLogging$$_log / _log_$eq).
    private LoggingAdapter org$apache$pekko$actor$ActorLogging$$_log;
    // Extension handle; this class reads bufferPool() and settings() from it.
    public final UdpConnectedExt org$apache$pekko$io$UdpConnection$$udpConn;
    // Registry that doConnect registers the opened DatagramChannel with.
    private final ChannelRegistry channelRegistry;
    // Receives CommandFailed(connect) when connecting fails (see reportConnectFailure).
    public final ActorRef org$apache$pekko$io$UdpConnection$$commander;
    // The Connect command this actor was created for (remote/local address, options, handler).
    public final UdpConnected.Connect org$apache$pekko$io$UdpConnection$$connect;
    // Write currently in flight as a (Send, ActorRef) pair; null means no write is pending.
    private Tuple2 pendingSend;
    // Null until doConnect opens the channel; closed again in postStop.
    private DatagramChannel channel;

    public UdpConnection(UdpConnectedExt udpConnectedExt, ChannelRegistry channelRegistry, ActorRef actorRef, UdpConnected.Connect connect) {
        this.org$apache$pekko$io$UdpConnection$$udpConn = udpConnectedExt;
        this.channelRegistry = channelRegistry;
        this.org$apache$pekko$io$UdpConnection$$commander = actorRef;
        this.org$apache$pekko$io$UdpConnection$$connect = connect;
        Actor.$init$(this);
        ActorLogging.$init$(this);
        this.pendingSend = null;
        context().watch(connect.handler());
        this.channel = null;
        if (connect.remoteAddress().isUnresolved()) {
            Option<DnsProtocol.Resolved> resolve = Dns$.MODULE$.resolve(DnsProtocol$Resolve$.MODULE$.apply(connect.remoteAddress().getHostName()), context().system(), self());
            if (resolve instanceof Some) {
                DnsProtocol.Resolved resolved = (DnsProtocol.Resolved) ((Some) resolve).value();
                org$apache$pekko$io$UdpConnection$$reportConnectFailure(() -> {
                    $init$$$anonfun$1(resolved, connect);
                    return BoxedUnit.UNIT;
                });
            } else {
                if (!None$.MODULE$.equals(resolve)) {
                    throw new MatchError(resolve);
                }
                context().become(resolving());
            }
        } else {
            org$apache$pekko$io$UdpConnection$$reportConnectFailure(() -> {
                $init$$$anonfun$2(connect);
                return BoxedUnit.UNIT;
            });
        }
        Statics.releaseFence();
    }

    /** Returns the actor context stored by {@code org$apache$pekko$actor$Actor$_setter_$context_$eq}. */
    @Override // org.apache.pekko.actor.Actor
    public ActorContext context() {
        return context;
    }

    /** Returns this actor's own reference, as stored by {@code org$apache$pekko$actor$Actor$_setter_$self_$eq}. */
    @Override // org.apache.pekko.actor.Actor
    public final ActorRef self() {
        return self;
    }

    /**
     * Stores the actor context returned by {@link #context()}.
     *
     * @param actorContext the context to store
     */
    @Override // org.apache.pekko.actor.Actor
    public void org$apache$pekko$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        context = actorContext;
    }

    /**
     * Stores the self reference returned by {@link #self()}.
     *
     * @param actorRef the reference to store
     */
    @Override // org.apache.pekko.actor.Actor
    public void org$apache$pekko$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        self = actorRef;
    }

    /**
     * Mixin forwarder for {@code Actor.sender()}.
     *
     * <p>The call is qualified with {@code Actor.super} so that it reaches the interface's own
     * implementation; an unqualified {@code sender()} resolves to this override and recurses
     * until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code Actor} declares {@code sender()} as a default method — confirm.
     *
     * @return whatever {@code Actor.sender()} returns
     */
    @Override // org.apache.pekko.actor.Actor
    public /* bridge */ /* synthetic */ ActorRef sender() {
        return Actor.super.sender();
    }

    /**
     * Mixin forwarder for {@code Actor.aroundReceive}.
     *
     * <p>The call is qualified with {@code Actor.super}; an unqualified call resolves to this
     * override and recurses until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code Actor} declares {@code aroundReceive} as a default method — confirm.
     *
     * @param partialFunction behavior passed through to {@code Actor.aroundReceive}
     * @param obj message passed through to {@code Actor.aroundReceive}
     */
    @Override // org.apache.pekko.actor.Actor
    @InternalApi
    public /* bridge */ /* synthetic */ void aroundReceive(PartialFunction partialFunction, Object obj) {
        Actor.super.aroundReceive(partialFunction, obj);
    }

    /**
     * Mixin forwarder for {@code Actor.aroundPreStart()}.
     *
     * <p>The call is qualified with {@code Actor.super}; an unqualified call resolves to this
     * override and recurses until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code Actor} declares {@code aroundPreStart()} as a default method — confirm.
     */
    @Override // org.apache.pekko.actor.Actor
    @InternalApi
    public /* bridge */ /* synthetic */ void aroundPreStart() {
        Actor.super.aroundPreStart();
    }

    /**
     * Mixin forwarder for {@code Actor.aroundPostStop()}.
     *
     * <p>The call is qualified with {@code Actor.super}; an unqualified call resolves to this
     * override and recurses until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code Actor} declares {@code aroundPostStop()} as a default method — confirm.
     */
    @Override // org.apache.pekko.actor.Actor
    @InternalApi
    public /* bridge */ /* synthetic */ void aroundPostStop() {
        Actor.super.aroundPostStop();
    }

    /**
     * Mixin forwarder for {@code Actor.aroundPreRestart}.
     *
     * <p>The call is qualified with {@code Actor.super}; an unqualified call resolves to this
     * override and recurses until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code Actor} declares {@code aroundPreRestart} as a default method — confirm.
     *
     * @param th failure passed through to {@code Actor.aroundPreRestart}
     * @param option optional value passed through to {@code Actor.aroundPreRestart}
     */
    @Override // org.apache.pekko.actor.Actor
    @InternalApi
    public /* bridge */ /* synthetic */ void aroundPreRestart(Throwable th, Option option) {
        Actor.super.aroundPreRestart(th, option);
    }

    /**
     * Mixin forwarder for {@code Actor.aroundPostRestart}.
     *
     * <p>The call is qualified with {@code Actor.super}; an unqualified call resolves to this
     * override and recurses until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code Actor} declares {@code aroundPostRestart} as a default method — confirm.
     *
     * @param th failure passed through to {@code Actor.aroundPostRestart}
     */
    @Override // org.apache.pekko.actor.Actor
    @InternalApi
    public /* bridge */ /* synthetic */ void aroundPostRestart(Throwable th) {
        Actor.super.aroundPostRestart(th);
    }

    /**
     * Mixin forwarder for {@code Actor.supervisorStrategy()}.
     *
     * <p>The call is qualified with {@code Actor.super}; an unqualified call resolves to this
     * override and recurses until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code Actor} declares {@code supervisorStrategy()} as a default method — confirm.
     *
     * @return whatever {@code Actor.supervisorStrategy()} returns
     */
    @Override // org.apache.pekko.actor.Actor
    public /* bridge */ /* synthetic */ SupervisorStrategy supervisorStrategy() {
        return Actor.super.supervisorStrategy();
    }

    /**
     * Mixin forwarder for {@code Actor.preStart()}.
     *
     * <p>The call is qualified with {@code Actor.super}; an unqualified call resolves to this
     * override and recurses until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code Actor} declares {@code preStart()} as a default method — confirm.
     *
     * @throws Exception if {@code Actor.preStart()} throws
     */
    @Override // org.apache.pekko.actor.Actor
    public /* bridge */ /* synthetic */ void preStart() throws Exception {
        Actor.super.preStart();
    }

    /**
     * Mixin forwarder for {@code Actor.preRestart}.
     *
     * <p>The call is qualified with {@code Actor.super}; an unqualified call resolves to this
     * override and recurses until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code Actor} declares {@code preRestart} as a default method — confirm.
     *
     * @param th failure passed through to {@code Actor.preRestart}
     * @param option optional value passed through to {@code Actor.preRestart}
     * @throws Exception if {@code Actor.preRestart} throws
     */
    @Override // org.apache.pekko.actor.Actor
    public /* bridge */ /* synthetic */ void preRestart(Throwable th, Option option) throws Exception {
        Actor.super.preRestart(th, option);
    }

    /**
     * Mixin forwarder for {@code Actor.postRestart}.
     *
     * <p>The call is qualified with {@code Actor.super}; an unqualified call resolves to this
     * override and recurses until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code Actor} declares {@code postRestart} as a default method — confirm.
     *
     * @param th failure passed through to {@code Actor.postRestart}
     * @throws Exception if {@code Actor.postRestart} throws
     */
    @Override // org.apache.pekko.actor.Actor
    public /* bridge */ /* synthetic */ void postRestart(Throwable th) throws Exception {
        Actor.super.postRestart(th);
    }

    /**
     * Mixin forwarder for {@code Actor.unhandled}.
     *
     * <p>The call is qualified with {@code Actor.super}; an unqualified call resolves to this
     * override and recurses until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code Actor} declares {@code unhandled} as a default method — confirm.
     *
     * @param obj message passed through to {@code Actor.unhandled}
     */
    @Override // org.apache.pekko.actor.Actor
    public /* bridge */ /* synthetic */ void unhandled(Object obj) {
        Actor.super.unhandled(obj);
    }

    /** Returns the logging adapter stored by {@code org$apache$pekko$actor$ActorLogging$$_log_$eq}; may be null if none was stored. */
    @Override // org.apache.pekko.actor.ActorLogging
    public LoggingAdapter org$apache$pekko$actor$ActorLogging$$_log() {
        return org$apache$pekko$actor$ActorLogging$$_log;
    }

    /**
     * Stores the logging adapter returned by {@code org$apache$pekko$actor$ActorLogging$$_log()}.
     *
     * @param loggingAdapter the adapter to store
     */
    @Override // org.apache.pekko.actor.ActorLogging
    public void org$apache$pekko$actor$ActorLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        org$apache$pekko$actor$ActorLogging$$_log = loggingAdapter;
    }

    /**
     * Mixin forwarder for {@code ActorLogging.log()}.
     *
     * <p>The call is qualified with {@code ActorLogging.super} so that it reaches the interface's
     * own implementation; an unqualified {@code log()} resolves to this override and recurses
     * until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code ActorLogging} declares {@code log()} as a default method — confirm.
     *
     * @return whatever {@code ActorLogging.log()} returns
     */
    @Override // org.apache.pekko.actor.ActorLogging
    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        return ActorLogging.super.log();
    }

    /**
     * Returns the write currently in flight.
     *
     * @return the pending (Send, ActorRef) pair, or null when nothing is pending
     */
    public Tuple2<UdpConnected.Send, ActorRef> pendingSend() {
        return pendingSend;
    }

    /**
     * Replaces the write currently in flight.
     *
     * @param tuple2 the new pending (Send, ActorRef) pair, or null to mark that nothing is pending
     */
    public void pendingSend_$eq(Tuple2<UdpConnected.Send, ActorRef> tuple2) {
        pendingSend = tuple2;
    }

    /**
     * Tells whether a write is waiting to be performed.
     *
     * @return true when {@link #pendingSend()} is non-null
     */
    public boolean writePending() {
        if (pendingSend() == null) {
            return false;
        }
        return true;
    }

    /**
     * Returns the datagram channel of this connection.
     *
     * @return the channel, or null if none has been stored yet
     */
    public DatagramChannel channel() {
        return channel;
    }

    /**
     * Stores the datagram channel returned by {@link #channel()}.
     *
     * @param datagramChannel the channel to store
     */
    public void channel_$eq(DatagramChannel datagramChannel) {
        channel = datagramChannel;
    }

    /**
     * Creates the behavior that the constructor installs when the DNS lookup for an unresolved
     * remote address has no immediate answer.
     *
     * @return a new {@code UdpConnection$$anon$1} bound to this actor
     */
    public PartialFunction<Object, BoxedUnit> resolving() {
        final PartialFunction<Object, BoxedUnit> behavior = new UdpConnection$$anon$1(this);
        return behavior;
    }

    /**
     * Opens a non-blocking {@link DatagramChannel}, applies the socket options and the optional
     * local bind from the Connect command, connects the channel and registers it with the channel
     * registry.
     *
     * <p>The channel is connected to {@code inetSocketAddress} rather than to the address stored in
     * the Connect command. The constructor passes a freshly resolved address exactly when the
     * configured one is unresolved, and {@link DatagramChannel#connect} rejects unresolved
     * addresses.
     *
     * @param inetSocketAddress address to connect to; when null the remote address of the Connect
     *     command is used instead
     */
    public void doConnect(InetSocketAddress inetSocketAddress) {
        // Null fallback keeps callers working that relied on the argument being ignored.
        final InetSocketAddress target = inetSocketAddress != null
                ? inetSocketAddress
                : this.org$apache$pekko$io$UdpConnection$$connect.remoteAddress();
        channel_$eq(DatagramChannel.open());
        channel().configureBlocking(false);
        DatagramSocket socket = channel().socket();
        // Options are applied before the optional bind below.
        this.org$apache$pekko$io$UdpConnection$$connect.options().foreach(socketOption -> {
            socketOption.beforeDatagramBind(socket);
        });
        this.org$apache$pekko$io$UdpConnection$$connect.localAddress().foreach(socketAddress -> {
            socket.bind(socketAddress);
        });
        channel().connect(target);
        // 1 equals SelectionKey.OP_READ; presumably the initial interest set — confirm against ChannelRegistry.
        this.channelRegistry.register(channel(), 1, self());
        log().debug("Successfully connected to [{}]", this.org$apache$pekko$io$UdpConnection$$connect.remoteAddress());
    }

    /**
     * Creates this actor's {@code receive} behavior.
     *
     * @return a new {@code UdpConnection$$anon$2} bound to this actor
     */
    @Override // org.apache.pekko.actor.Actor
    public PartialFunction<Object, BoxedUnit> receive() {
        final PartialFunction<Object, BoxedUnit> behavior = new UdpConnection$$anon$2(this);
        return behavior;
    }

    /**
     * Creates a behavior bound to the given channel registration.
     *
     * @param channelRegistration registration handed to the behavior
     * @return a new {@code UdpConnection$$anon$3} for the registration and this actor
     */
    public PartialFunction<Object, BoxedUnit> connected(ChannelRegistration channelRegistration) {
        final PartialFunction<Object, BoxedUnit> behavior = new UdpConnection$$anon$3(channelRegistration, this);
        return behavior;
    }

    /**
     * Reads datagrams from the channel into a pooled buffer and forwards them to the given actor.
     *
     * <p>A {@link PortUnreachableException} raised while reading is ignored, and logged at debug
     * level only when trace logging is enabled. Whatever happens, interest {@code 1} is re-enabled
     * on the registration and the buffer is returned to the pool.
     *
     * @param channelRegistration registration on which interest is re-enabled afterwards
     * @param actorRef actor that receives the datagrams that were read
     */
    public void doRead(ChannelRegistration channelRegistration, ActorRef actorRef) {
        final UdpConnectedExt ext = this.org$apache$pekko$io$UdpConnection$$udpConn;
        final ByteBuffer buffer = ext.bufferPool().acquire();
        try {
            innerRead$1(actorRef, ext.settings().BatchReceiveLimit(), buffer);
        } catch (PortUnreachableException ignored) {
            // Deliberately ignored; only surfaced in the log when tracing is on.
            if (ext.settings().TraceLogging()) {
                log().debug("Ignoring PortUnreachableException in doRead");
            }
        } finally {
            channelRegistration.enableInterest(1);
            ext.bufferPool().release(buffer);
        }
    }

    /**
     * Writes the payload of the pending send to the channel through a pooled buffer.
     *
     * <p>When zero bytes were written, the actor stored with the send is told
     * {@code CommandFailed(send)}. Otherwise it is told the ack, provided the send wants one. The
     * buffer is released and the pending send is cleared in every case.
     *
     * @throws MatchError if there is no pending send
     */
    public final void doWrite() {
        final UdpConnectedExt ext = this.org$apache$pekko$io$UdpConnection$$udpConn;
        final ByteBuffer buffer = ext.bufferPool().acquire();
        try {
            final Tuple2<UdpConnected.Send, ActorRef> pending = pendingSend();
            if (pending == null) {
                throw new MatchError(pending);
            }
            final UdpConnected.Send send = (UdpConnected.Send) pending.mo4945_1();
            final ActorRef replyTo = (ActorRef) pending.mo4944_2();
            buffer.clear();
            send.payload().copyToBuffer(buffer);
            buffer.flip();
            final int written = channel().write(buffer);
            if (ext.settings().TraceLogging()) {
                log().debug("Wrote [{}] bytes to channel", BoxesRunTime.boxToInteger(written));
            }
            if (written != 0) {
                if (send.wantsAck()) {
                    replyTo.$bang(send.ack(), self());
                }
            } else {
                // Nothing went out: report the send as failed.
                replyTo.$bang(UdpConnected$CommandFailed$.MODULE$.apply(send), self());
            }
        } finally {
            ext.bufferPool().release(buffer);
            pendingSend_$eq(null);
        }
    }

    /**
     * Closes the datagram channel if one exists and is still open.
     *
     * <p>A non-fatal error while closing is logged at debug level and swallowed. Anything that
     * {@code NonFatal} does not match is rethrown.
     */
    @Override // org.apache.pekko.actor.Actor
    public void postStop() {
        if (channel() != null && channel().isOpen()) {
            log().debug("Closing DatagramChannel after being stopped");
            try {
                channel().close();
            } catch (Throwable failure) {
                final Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(failure);
                if (nonFatal.isEmpty()) {
                    throw failure;
                }
                log().debug("Error closing DatagramChannel: {}", nonFatal.get());
            }
        }
    }

    /**
     * Runs the given thunk and turns a non-fatal failure into a failed connect.
     *
     * <p>On a non-fatal failure the error is logged at debug level, the commander is told
     * {@code CommandFailed(connect)} and this actor is stopped. Anything that {@code NonFatal}
     * does not match is rethrown.
     *
     * @param function0 the connection attempt to run
     */
    public void org$apache$pekko$io$UdpConnection$$reportConnectFailure(Function0<BoxedUnit> function0) {
        try {
            function0.apply$mcV$sp();
        } catch (Throwable failure) {
            final Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(failure);
            if (nonFatal.isEmpty()) {
                throw failure;
            }
            final UdpConnected.Connect command = this.org$apache$pekko$io$UdpConnection$$connect;
            log().debug("Failure while connecting UDP channel to remote address [{}] local address [{}]: {}", command.remoteAddress(), command.localAddress().getOrElse(UdpConnection::reportConnectFailure$$anonfun$1), nonFatal.get());
            this.org$apache$pekko$io$UdpConnection$$commander.$bang(UdpConnected$CommandFailed$.MODULE$.apply(command), self());
            context().stop(self());
        }
    }

    /**
     * Connects to the resolved address, keeping the port of the configured remote address.
     *
     * @param resolved DNS answer supplying the address
     * @param connect Connect command supplying the port
     */
    private final void $init$$$anonfun$1(DnsProtocol.Resolved resolved, UdpConnected.Connect connect) {
        final InetSocketAddress target = new InetSocketAddress(resolved.address(), connect.remoteAddress().getPort());
        doConnect(target);
    }

    /**
     * Connects to the remote address of the Connect command as given.
     *
     * @param connect Connect command supplying the remote address
     */
    private final void $init$$$anonfun$2(UdpConnected.Connect connect) {
        final InetSocketAddress target = connect.remoteAddress();
        doConnect(target);
    }

    /**
     * Always throws: wraps the given throwable in a {@link RuntimeException}.
     *
     * @param th the cause to wrap
     * @throws RuntimeException always, with {@code th} as its cause
     */
    private static final void applyOrElse$$anonfun$2(Throwable th) {
        final RuntimeException wrapped = new RuntimeException(th);
        throw wrapped;
    }

    /**
     * Object-returning adapter around {@code applyOrElse$$anonfun$2}.
     *
     * @param th the throwable handed to {@code applyOrElse$$anonfun$2}
     * @return {@code BoxedUnit.UNIT} if the wrapped call returns normally
     */
    public static /* bridge */ /* synthetic */ Object org$apache$pekko$io$UdpConnection$$anon$1$$_$applyOrElse$$anonfun$adapted$2(Throwable th) {
        applyOrElse$$anonfun$2(th);
        final Object unit = BoxedUnit.UNIT;
        return unit;
    }

    /**
     * Reads datagrams from the channel and forwards each one to the given actor as a
     * {@code Received} message, stopping when a read returns no bytes or the batch limit is
     * reached.
     *
     * <p>The limit is enforced so that one call cannot keep reading for as long as datagrams keep
     * arriving. At least one read is always attempted, even for a non-positive limit, so every
     * call can make progress. {@code doRead} re-enables interest on the registration after this
     * method returns; presumably that triggers another read for datagrams left behind — confirm.
     *
     * @param actorRef actor that receives the datagrams
     * @param i maximum number of datagrams to deliver in this call; values below 1 are treated as 1
     * @param byteBuffer scratch buffer that is reused for every read
     */
    private final void innerRead$1(ActorRef actorRef, int i, ByteBuffer byteBuffer) {
        for (int readsLeft = Math.max(i, 1); readsLeft > 0; readsLeft--) {
            byteBuffer.clear();
            byteBuffer.limit(this.org$apache$pekko$io$UdpConnection$$udpConn.settings().DirectBufferSize());
            if (channel().read(byteBuffer) <= 0) {
                // Nothing (more) was read.
                return;
            }
            byteBuffer.flip();
            actorRef.$bang(UdpConnected$Received$.MODULE$.apply(ByteString$.MODULE$.apply(byteBuffer)), self());
        }
    }

    /**
     * Supplies the placeholder that is logged when the Connect command has no local address.
     *
     * @return the string {@code "undefined"}
     */
    private static final Serializable reportConnectFailure$$anonfun$1() {
        final String placeholder = "undefined";
        return placeholder;
    }
}
