Содержание

понедельник, 26 сентября 2011 г.

Приемы в многопоточном программировании

В своей практике мне достаточно приходилось сталкиваться с различными многопоточными задачами, большинство из которых удавалось решить довольно успешно. Под успшностью я понимаю решение, которое работает правильно и быстро, а реализация проста и понятна другим программистам, помимо автора. В данном топике я перечислю несколько ситуаций и опишу как я поступал в данной ситуации.

Асинхронная обработка в батчах

Данный паттерн я применял в трех проектах, где требовалось сохранять большое количество сообщений в базу данных асинхронно. Основная задача состоит тут в том, чтобы эффективно вытаскивать сразу пачку значений из очереди в нескольких потоках параллельно. Здесь меня всегда спасал метод BlockingQueue.drainTo(Collection)
    private final BlockingQueue<E> queue;

    public void start(){

        for (int i = 0; i < NUMBER_OF_BATCH_PROCESSOR; i++){
            new Thread(){
                private final Collection<E> batch = new ArrayList<E>(MAX_BATCH_SIZE);
                @Override
                public void run() {
                    while(true){
                        try {
                            queue.drainTo(batch, MAX_BATCH_SIZE);
                            perform(batch);
                            batch.clear();
                        } catch (Throwable e) {
                            //rethrow OOM else log error and continue
                        }
                    }
                }
            }.start();
        }
    }

Эффективный и масштабируемый Кэш

Если какая-то операция выполняется значительное время, то, возможно, имеет смысл закэшировать её результат. Если же есть большая вероятность, что за результатом одной и той же операции приходит зразу несколько потоков практически одновременно, и тем более если эта операция внутри может блокироваться, то надо сделать так, чтобы операция выполнилась только один раз, и все потоки сразу получили этот результат. С виду задача очень простая, но чтобы сделать так, чтобы разные операции практически не блокировали друг друга есть один трюк. Первый раз я его увидел в моей любимой книжке Brian Goetz "Java Concurrency In Practice". А недавно встретил и в проекте на работе, когда делал ревью кода, оптимизирующего загрузку сервера после рестарта. Там по запросу пользователя приходилось вычислять получать некоторые данные, но при рестарте сразу много пользователей начинали запрашивать эти данные. Так с помощью приведенного ниже кэша удалось пофиксить данный bottleneck.

public interface Computable<A, V>

    V compute(A arg) throws InterruptedException;

}

public class BrianGoetzCache<A, V> implements Computable<A, V>

    private final ConcurrentHashMap<A, V> cache = new ConcurrentHashMap<A, V>
    private final Computable<A, V>

    public BrianGoetzCache(Computable<A, V> c) {
        this.c = c;
    }

    @Override
    public V compute(final A arg) throws InterruptedException{
        while (true){
            Future< V> f = cache.get(arg);
            if (f == null){
                Callable<V> eval = new Callable<V>() {
                    @Override
                    public V call() throws Exception {
                        return c.compute(arg);
                    }
                };
                FutureTask<V> ft = new FutureTask< V>(eval);
                f = cache.putIfAbsent(arg, f);
                if (f == null){ f = ft; ft.run();}
            }
            try{
                return f.get();
            }  catch (CancellationException e){
                cache.remove(arg, f);
            }  catch (ExecutionException e){
                throw new RuntimeException(e.getCause());
            }
        }
    }
}

"SELECT FOR UPDATE" в java

Иногда требуется получить эксклюзивный доступ к какой-нибудь сущности, чтобы проводя над ней операцию, быть уверенным, что в этот момент ни одна другая операция не проводиться над этой сущностью. Сложность возникает в том случае, если у вас нет в распоряжении объекта, над которым проводиться операция, а есть только его идентификатор. Например, я видел такую задачу при обработке сделки на бирже, или при изменении нескольких полей объекта, находящегося в кэше. Данную проблему нередко решают, блокировкой строчки в базе данных. Например, с помощью инструкции "SELECT FOR UPDATE". Мне кажется, это не самый удачный способ. Если вы можете себе позволить выполнение операции в текущем потоке (например при обработке http запроса), то тут особо ничего придумывать не стоит, ведь, можно просто хранить лок в мапе. Благо, сейчас в ней присутствует метод putIfAbsence, который позволяет написать данный код весьма компактно и эффективно.
    private final ConcurrentHashMap<Object, Lock> map = new ConcurrentHashMap<Object, Lock>();

    public void operation(Object id){
        Lock lock = map.putIfAbsent(id, new ReentrantLock(true));
        try {
            lock.lock();
            perform(id);
        } finally {
            lock.unlock();
        }
    }

    public void clear(Object id){
        map.remove(id);
    }
Прелесть данного подхода в том, что разные операции практически не мешают друг-другу. Но приходиться расплачиваться постоянным созданием новых локов, причем эффективно и просто удалить из мапы их не получиться, придется городить synchronized блоки. В моем случае, в бизнеслогике была явно последняя операция из которой я мог легко удалять данный лок методом clear.
Так же я видел еще более эффективную реализацию решения похожей задачи, где к тому же нет проблемы с генерированием и очисткой локов, но при не верном выборе размера массива или плохой реализации хешкода, разные операцию могут начать сильно мешать друг-другу, что может негативно сказать на производительности всей системы.
    private final Lock[] locks = new Lock[SIZE];
    {
        for (int i = 0 ; i < locks.length; i++){
            locks[i] = new ReentrantLock();
        }
    }

    private void operation(final Object id){
        Lock lock = locks[id.hashCode()%SIZE];
        try {
            lock.lock();
            perform(id);
        } catch (Exception e) {
            lock.unlock();
        }
    }

Распараллеливание потока на операции, часть которых требует блокировок

Допустим вы слушаете JMS очередь по которой приходит очень много сообщений, большинство из которых вы можете выполнять параллельно, однако сообщение относящиеся к одной сущности надо все же выполнять последовательно. В данном случае создание потока для каждого сообщения, которое вы будете класть в мапу, может разорить ресурсы вашего приложение. Если же иметь пул потоков и запоминать в мапе, для какого идентификатора какой поток используется, то в этой реализации получается немного хитрый код, который должен знать есть ли уже что-то на выполнение в очереди для идентификатора, чтобы не удалить его из мапы раньше времени. Так же тут нужна будет общая точка синхронизации для всех потоков. Поэтому для такого рода задач лучше создать массив, заполнить его потоками, и для сообщения выбирать поток по хешу идентификатора сущности, для которой приходит сообщение. Конечно этот подход далеко не идеален, ведь, балансировка нагрузки основана на хеше идентификатора, что не даст равномерного использования потоков, так что в некоторых из них могут скопиться очереди, тогда как другие будут простаивать без дела. Так же если один из потоков сломается или заблокируется, то это явно окажет влияние на будущие задачи, попадающие по хешу идентификатора в тот же поток. Но так как описанное решение очень просто и эффективно, то можно и пренебречь описанными минусами.
    private final Executor[] executors = new Executor[SIZE];
    {
        for (int i = 0 ; i < executors.length; i++){
            executors[i] = Executors.newSingleThreadExecutor();
        }
    }

    private void operation(final Object id){
        executors[id.hashCode()%SIZE].execute(new Runnable() {
            @Override
            public void run() {
                perform(id);
            }
        });

    }
В данной реализации необходимо мониторить очередь экзекютора и принимать меры, очередь начнет сильно увеличиваться.

Последовательноe выполнение операций в разных поток

Уже много лет мы используем громоздкие конструкции из wait в цикле и блок синхронизации совместно с notifyAll(). Нет, нет, никакой проблемы с ними конечно же нет, за исключением разве что с местом, занимаемым данным кодом на экране. Поэтому для такого рода задачи я предпочитаю использовать CountDownLatch, ведь кода для его написания требуется значительно меньше.
    //private final Object monitor = new Object();    
    //private volatile boolean  flag = false;
    private final CountDownLatch latch = new CountDownLatch(1);

    public void waitMethod() throws InterruptedException {
        //synchronized (monitor) {
        //    while (!flag) {
        //        monitor.wait();
        //    }
        //}
        latch.await();
    }

    public void notifyMethod(){
        //flag=true;
        //monitor.notify();
        latch.countDown();
    }