package weborb.messaging.v3;

import com.google.android.exoplayer.hls.HlsChunkSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import weborb.ORBConstants;
import weborb.util.ThreadContext;
import weborb.util.log.ILoggingConstants;
import weborb.util.log.Log;
import weborb.v3types.AckMessage;
import weborb.v3types.CommandMessage;
import weborb.v3types.V3Message;
import weborb.v3types.core.IDestination;
import weborb.writer.MessageWriter;
import weborb.writer.amf.AmfV3Formatter;

/* loaded from: classes.dex */
/**
 * Subscriber that pushes messages to a client over a long-lived HTTP response.
 *
 * <p>Producers hand messages to {@link #deliverMessage(Object)}, which filters them and
 * appends them to an in-memory queue. The thread that calls {@link #start()} owns the HTTP
 * response: it serializes each queued message with {@link AmfV3Formatter} and writes it as a
 * hand-framed chunk (hex length, CRLF, payload, CRLF). While the queue stays empty it writes a
 * single zero byte every {@value #DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS} ms as a heartbeat.
 *
 * <p>All access to the queue is guarded by the queue's own monitor; {@code running} is
 * volatile so that {@link #stop()} may be called from any thread.
 */
public final class StreamingSubscriber extends Subscriber implements IDataPush {
    private static final byte[] CRLF_BYTES = {13, 10};
    /** Maximum time the streaming loop waits for a message before emitting a heartbeat. */
    private static final int DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS = 5000;
    public static final String HEADER_NAME_CACHE_CONTROL = "Cache-Control";
    public static final String HEADER_NAME_EXPIRES = "Expires";
    public static final String HEADER_NAME_PRAGMA = "Pragma";
    /** Heartbeat byte written between chunks while no message is pending. */
    private static final byte NULL_BYTE = 0;
    /** ASCII '0', the length field written for an empty chunk. */
    private static final byte ZERO_BYTE = 48;
    // Stored by the constructor; not read anywhere else in this class.
    private final CommandMessage commandMessage;
    /** Pending messages; every access must hold this object's monitor. */
    private final LinkedList<V3Message> queue;
    private volatile boolean running;

    /**
     * Creates a streaming subscriber.
     *
     * @param str            forwarded unchanged to the {@code Subscriber} constructor
     * @param str2           forwarded unchanged to the {@code Subscriber} constructor
     * @param iDestination   forwarded unchanged to the {@code Subscriber} constructor
     * @param commandMessage command message retained by this subscriber
     * @throws Exception declared by the signature; presumably propagated from the superclass
     *                   constructor (its declaration is not visible here)
     */
    public StreamingSubscriber(String str, String str2, IDestination iDestination, CommandMessage commandMessage) throws Exception {
        super(str, str2, iDestination);
        this.queue = new LinkedList<>();
        this.commandMessage = commandMessage;
    }

    /**
     * Writes one hand-framed chunk and flushes the response.
     *
     * <p>A {@code null} or empty payload is written as {@code "0" CRLF}; any other payload as
     * {@code <hex length> CRLF <payload> CRLF}.
     */
    private void streamChunk(byte[] bArr, ServletOutputStream servletOutputStream, HttpServletResponse httpServletResponse) throws IOException {
        if (bArr == null || bArr.length == 0) {
            servletOutputStream.write(ZERO_BYTE);
            servletOutputStream.write(CRLF_BYTES);
        } else {
            servletOutputStream.write(Integer.toHexString(bArr.length).getBytes("ASCII"));
            servletOutputStream.write(CRLF_BYTES);
            servletOutputStream.write(bArr);
            servletOutputStream.write(CRLF_BYTES);
        }
        httpServletResponse.flushBuffer();
    }

    /** Serializes {@code obj} with a fresh {@link AmfV3Formatter} and streams the bytes as one chunk. */
    private void streamMessage(Object obj, ServletOutputStream servletOutputStream, HttpServletResponse httpServletResponse) throws IOException {
        AmfV3Formatter formatter = new AmfV3Formatter();
        MessageWriter.writeObject(obj, formatter);
        streamChunk(formatter.getBytes(), servletOutputStream, httpServletResponse);
    }

    /**
     * Queues a message for the streaming thread.
     *
     * <p>The message is passed through {@code filterMessages}; if it survives, its headers are
     * cleared and it is appended to the queue, waking the thread blocked in {@link #start()}.
     *
     * @param obj the message to deliver; cast to {@link V3Message}, so any other type results
     *            in a {@link ClassCastException}
     */
    @Override // weborb.messaging.v3.IDataPush
    public void deliverMessage(Object obj) {
        Log.log(ILoggingConstants.DEBUG, "Received message, adding it to queue");
        // Raw type kept on purpose: the parameter type of filterMessages is declared outside
        // this file, and a raw list is accepted regardless of how it is parameterized.
        ArrayList candidates = new ArrayList();
        candidates.add((V3Message) obj);
        ArrayList<V3Message> accepted = filterMessages(candidates);
        if (accepted.isEmpty()) {
            return;
        }
        V3Message message = accepted.get(0);
        message.headers.clear();
        synchronized (this.queue) {
            this.queue.add(message);
            this.queue.notifyAll();
        }
    }

    /**
     * Runs the streaming loop on the calling thread until {@link #stop()} is called, the thread
     * is interrupted, or writing to the response fails.
     *
     * <p>The response obtained from {@link ThreadContext} is reset, given the AMF content type
     * and a {@code Transfer-Encoding: chunked} header, and an {@link AckMessage} with correlation
     * id {@code "open"} is streamed first. After that, queued messages are streamed as they
     * arrive and a heartbeat byte is written whenever a wait times out with nothing queued.
     *
     * @throws IOException if writing to the HTTP response fails
     */
    public void start() throws IOException {
        this.running = true;
        try {
            this.clientIds.clear();
            HttpServletResponse httpResponse = ThreadContext.getHttpResponse();
            // NOTE(review): the result is unused; the call is kept only in case the accessor has
            // side effects that are not visible from this file.
            ThreadContext.getHttpRequest();
            httpResponse.reset();
            httpResponse.setContentType(ORBConstants.AMF_CONTENTTYPE);
            httpResponse.setHeader("Transfer-Encoding", "chunked");
            httpResponse.flushBuffer();
            ServletOutputStream outputStream = httpResponse.getOutputStream();

            AckMessage openAck = new AckMessage();
            openAck.correlationId = "open";
            streamMessage(openAck, outputStream, httpResponse);

            while (this.running) {
                if (awaitMessages()) {
                    // Take messages one at a time under the lock, but write them outside it so
                    // producers are never blocked behind network I/O.
                    V3Message next;
                    while ((next = pollMessage()) != null) {
                        streamMessage(next, outputStream, httpResponse);
                    }
                } else if (this.running) {
                    // Nothing arrived and nobody asked us to stop: emit the heartbeat byte.
                    outputStream.write(NULL_BYTE);
                    outputStream.flush();
                }
            }
        } finally {
            // Do not leave the flag set when the loop is abandoned because of an I/O failure.
            this.running = false;
        }
    }

    /**
     * Blocks for at most one heartbeat interval until a message is queued or the subscriber is
     * stopped. An interrupt stops the subscriber and restores the thread's interrupt status.
     *
     * @return {@code true} if at least one message is queued when the wait ends
     */
    private boolean awaitMessages() {
        synchronized (this.queue) {
            // The running check and the wait happen under the same monitor that stop() must
            // acquire to notify, so a stop request cannot slip in between them unnoticed.
            if (this.running && this.queue.isEmpty()) {
                try {
                    this.queue.wait(DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS);
                } catch (InterruptedException e) {
                    Log.log(ILoggingConstants.EXCEPTION, (Throwable) e);
                    this.running = false;
                    Thread.currentThread().interrupt();
                }
            }
            return !this.queue.isEmpty();
        }
    }

    /** Removes and returns the oldest queued message, or {@code null} if the queue is empty. */
    private V3Message pollMessage() {
        synchronized (this.queue) {
            return this.queue.poll();
        }
    }

    /**
     * Requests the streaming loop to end and wakes it if it is waiting for messages. Messages
     * already queued are still streamed before {@link #start()} returns.
     */
    public void stop() {
        this.running = false;
        synchronized (this.queue) {
            this.queue.notifyAll();
        }
    }
}
