001// License: GPL. For details, see LICENSE file.
002package org.openstreetmap.josm.data.cache;
003
004import java.io.IOException;
005import java.net.URL;
006import java.util.Iterator;
007import java.util.Map;
008import java.util.concurrent.ConcurrentHashMap;
009import java.util.concurrent.LinkedBlockingDeque;
010import java.util.concurrent.Semaphore;
011import java.util.concurrent.TimeUnit;
012
013import org.openstreetmap.josm.tools.Logging;
014
015/**
016 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
017 * and it will set a runnable task with semaphore release, when job has finished.
018 * <p>
019 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
020 * guarantee that all threads will be busy, when there is work for them[2]. <br>
021 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus
022 *     tasks do not go through the Queue <br>
023 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
024 *     take the first available job and wait for semaphore. It might be the case, that semaphore was released
025 *     for some task further in queue, but this implementation doesn't try to detect such situation
026 *
027 * @author Wiktor Niesiobędzki
028 */
029public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
030    private static final long serialVersionUID = 1L;
031
032    private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
033    private final int hostLimit;
034
035    /**
036     * Creates an unbounded queue
037     * @param hostLimit how many parallel calls to host to allow
038     */
039    public HostLimitQueue(int hostLimit) {
040        super(); // create unbounded queue
041        this.hostLimit = hostLimit;
042    }
043
044    /**
045     * Creates bounded queue
046     * @param hostLimit how many parallel calls to host to allow
047     * @param queueLimit how deep the queue should be
048     */
049    public HostLimitQueue(int hostLimit, int queueLimit) {
050        super(queueLimit); // create bounded queue
051        this.hostLimit = hostLimit;
052    }
053
054    private JCSCachedTileLoaderJob<?, ?> findJob() {
055        for (Iterator<Runnable> it = iterator(); it.hasNext();) {
056            Runnable r = it.next();
057            if (r instanceof JCSCachedTileLoaderJob) {
058                JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
059                if (tryAcquireSemaphore(job)) {
060                    if (remove(job)) {
061                        return job;
062                    } else {
063                        // we have acquired the semaphore, but we didn't manage to remove job, as someone else did
064                        // release the semaphore and look for another candidate
065                        releaseSemaphore(job);
066                    }
067                } else {
068                    URL url = null;
069                    try {
070                        url = job.getUrl();
071                    } catch (IOException e) {
072                        Logging.debug(e);
073                    }
074                    Logging.debug("TMS - Skipping job {0} because host limit reached", url);
075                }
076            }
077        }
078        return null;
079    }
080
081    @Override
082    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
083        Runnable job = findJob();
084        if (job != null) {
085            return job;
086        }
087        job = pollFirst(timeout, unit);
088        if (job != null) {
089            try {
090                boolean gotLock = tryAcquireSemaphore(job, timeout, unit);
091                return gotLock ? job : null;
092            } catch (InterruptedException e) {
093                // acquire my got interrupted, first offer back what was taken
094                if (!offer(job)) {
095                    Logging.warn("Unable to offer back " + job);
096                }
097                throw e;
098            }
099        }
100        return job;
101    }
102
103    @Override
104    public Runnable take() throws InterruptedException {
105        Runnable job = findJob();
106        if (job != null) {
107            return job;
108        }
109        job = takeFirst();
110        try {
111            acquireSemaphore(job);
112        } catch (InterruptedException e) {
113            // acquire my got interrupted, first offer back what was taken
114            if (!offer(job)) {
115                Logging.warn("Unable to offer back " + job);
116            }
117            throw e;
118        }
119        return job;
120    }
121
122    private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
123        String host;
124        try {
125            host = job.getUrl().getHost();
126        } catch (IOException e) {
127            // do not pass me illegal URL's
128            throw new IllegalArgumentException(e);
129        }
130        Semaphore limit = hostSemaphores.get(host);
131        if (limit == null) {
132            synchronized (hostSemaphores) {
133                limit = hostSemaphores.computeIfAbsent(host, k -> new Semaphore(hostLimit));
134            }
135        }
136        return limit;
137    }
138
139    private void acquireSemaphore(Runnable job) throws InterruptedException {
140        if (job instanceof JCSCachedTileLoaderJob) {
141            final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
142            getSemaphore(jcsJob).acquire();
143            jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
144        }
145    }
146
147    private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
148        boolean ret = true;
149        Semaphore limit = getSemaphore(job);
150        if (limit != null) {
151            ret = limit.tryAcquire();
152            if (ret) {
153                job.setFinishedTask(() -> releaseSemaphore(job));
154            }
155        }
156        return ret;
157    }
158
159    private boolean tryAcquireSemaphore(Runnable job, long timeout, TimeUnit unit) throws InterruptedException {
160        boolean ret = true;
161        if (job instanceof JCSCachedTileLoaderJob) {
162            final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
163            Semaphore limit = getSemaphore(jcsJob);
164            if (limit != null) {
165                ret = limit.tryAcquire(timeout, unit);
166                if (ret) {
167                    jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
168                }
169            }
170        }
171        return ret;
172    }
173
174    private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
175        Semaphore limit = getSemaphore(job);
176        if (limit != null) {
177            limit.release();
178            if (limit.availablePermits() > hostLimit) {
179                Logging.warn("More permits than it should be");
180            }
181        }
182    }
183}