/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.core.observation;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.jackrabbit.core.observation.DispatchAction;
import org.apache.jackrabbit.core.observation.EventConsumer;
import org.apache.jackrabbit.core.observation.EventDispatcher;
import org.apache.jackrabbit.core.observation.EventStateCollection;
import org.apache.jackrabbit.core.observation.SynchronousEventListener;
import org.apache.jackrabbit.core.state.ChangeLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ObservationDispatcher
extends EventDispatcher
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(ObservationDispatcher.class);
    private static final DispatchAction DISPOSE_MARKER = new DispatchAction(null, null);
    private static final int MAX_QUEUED_EVENTS = Integer.parseInt(System.getProperty("jackrabbit.maxQueuedEvents", "200000"));
    private Set<EventConsumer> activeConsumers = new HashSet<EventConsumer>();
    private Set<EventConsumer> synchronousConsumers = new HashSet<EventConsumer>();
    private Set<EventConsumer> readOnlyConsumers;
    private Set<EventConsumer> synchronousReadOnlyConsumers;
    private Object consumerChange = new Object();
    private BlockingQueue<DispatchAction> eventQueue = new LinkedBlockingQueue<DispatchAction>();
    private AtomicInteger eventQueueSize = new AtomicInteger();
    private Thread notificationThread = new Thread((Runnable)this, "ObservationManager");
    private long lastError;

    public ObservationDispatcher() {
        this.notificationThread.setDaemon(true);
        this.notificationThread.start();
    }

    public void dispose() {
        this.eventQueue.add(DISPOSE_MARKER);
        try {
            this.notificationThread.join();
        }
        catch (InterruptedException e) {
            log.debug("while joining notificationThread", e);
        }
        log.info("Notification of EventListeners stopped.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Set<EventConsumer> getAsynchronousConsumers() {
        Object object = this.consumerChange;
        synchronized (object) {
            if (this.readOnlyConsumers == null) {
                this.readOnlyConsumers = Collections.unmodifiableSet(new HashSet<EventConsumer>(this.activeConsumers));
            }
            return this.readOnlyConsumers;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Set<EventConsumer> getSynchronousConsumers() {
        Object object = this.consumerChange;
        synchronized (object) {
            if (this.synchronousReadOnlyConsumers == null) {
                this.synchronousReadOnlyConsumers = Collections.unmodifiableSet(new HashSet<EventConsumer>(this.synchronousConsumers));
            }
            return this.synchronousReadOnlyConsumers;
        }
    }

    @Override
    public void run() {
        boolean done = false;
        do {
            try {
                DispatchAction action = this.eventQueue.take();
                boolean bl = done = action == DISPOSE_MARKER;
                if (done) continue;
                this.eventQueueSize.getAndAdd(-action.getEventStates().size());
                log.debug("got EventStateCollection");
                log.debug("event delivery to " + action.getEventConsumers().size() + " consumers started...");
                for (EventConsumer c : action.getEventConsumers()) {
                    try {
                        c.consumeEvents(action.getEventStates());
                    }
                    catch (Throwable t) {
                        log.warn("EventConsumer " + c.getEventListener().getClass().getName() + " threw exception", t);
                    }
                }
            }
            catch (InterruptedException ex) {
                log.debug("event delivery interrupted", ex);
            }
        } while (!done);
        log.debug("event delivery finished.");
    }

    @Override
    void prepareEvents(EventStateCollection events) {
        HashSet<EventConsumer> consumers = new HashSet<EventConsumer>();
        consumers.addAll(this.getSynchronousConsumers());
        consumers.addAll(this.getAsynchronousConsumers());
        for (EventConsumer c : consumers) {
            c.prepareEvents(events);
        }
    }

    @Override
    void prepareDeleted(EventStateCollection events, ChangeLog changes) {
        HashSet<EventConsumer> consumers = new HashSet<EventConsumer>();
        consumers.addAll(this.getSynchronousConsumers());
        consumers.addAll(this.getAsynchronousConsumers());
        for (EventConsumer c : consumers) {
            c.prepareDeleted(events, changes.deletedStates());
        }
    }

    @Override
    void dispatchEvents(EventStateCollection events) {
        if (Thread.currentThread() == this.notificationThread) {
            log.warn("Save call with event notification thread detected. This may lead to a growing event queue. Enable debug log to see the stack trace with the class calling save().");
            if (log.isDebugEnabled()) {
                log.debug("Stack trace:", new Exception());
            }
        }
        Set<EventConsumer> synchronous = this.getSynchronousConsumers();
        if (log.isDebugEnabled()) {
            log.debug("notifying " + synchronous.size() + " synchronous listeners.");
        }
        for (EventConsumer c : synchronous) {
            try {
                c.consumeEvents(events);
            }
            catch (Throwable t) {
                log.error("Synchronous EventConsumer threw exception.", t);
            }
        }
        this.eventQueue.add(new DispatchAction(events, this.getAsynchronousConsumers()));
        this.eventQueueSize.addAndGet(events.size());
    }

    public void delayIfEventQueueOverloaded() {
        if (this.eventQueueSize.get() > MAX_QUEUED_EVENTS) {
            boolean logWarning = false;
            long now = System.currentTimeMillis();
            if (this.lastError == 0L || now > this.lastError + 5000L) {
                logWarning = true;
                log.warn("More than " + MAX_QUEUED_EVENTS + " events in the queue", new Exception("Stack Trace"));
                this.lastError = now;
            }
            if (Thread.currentThread() == this.notificationThread) {
                if (logWarning) {
                    log.warn("Recursive notification?");
                }
            } else {
                if (logWarning) {
                    log.warn("Waiting");
                }
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e) {
                    log.warn("Interrupted while rate-limiting writes", e);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void addConsumer(EventConsumer consumer) {
        Object object = this.consumerChange;
        synchronized (object) {
            if (consumer.getEventListener() instanceof SynchronousEventListener) {
                this.synchronousConsumers.remove(consumer);
                this.synchronousConsumers.add(consumer);
                this.synchronousReadOnlyConsumers = null;
            } else {
                this.activeConsumers.remove(consumer);
                this.activeConsumers.add(consumer);
                this.readOnlyConsumers = null;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void removeConsumer(EventConsumer consumer) {
        Object object = this.consumerChange;
        synchronized (object) {
            if (consumer.getEventListener() instanceof SynchronousEventListener) {
                this.synchronousConsumers.remove(consumer);
                this.synchronousReadOnlyConsumers = null;
            } else {
                this.activeConsumers.remove(consumer);
                this.readOnlyConsumers = null;
            }
        }
    }
}

