Сергей Деревяго

Хеш-таблица с транзакциями



1. Введение

Любой мужчина в возрасте внезапно для себя осознает, что ему уже не хватает синхронизированной хеш-таблицы...

Все конечно неплохо. Удобно и быстро. Но все чаще и чаще на ровном месте, вспыхивают претензии, и приятное некогда действие превращается в нудную тягомотину.

Прогресса нет! Потому что залочен mutex, и все ждут завершения.

Конечно, немного поизвращавшись, мы так-то разрулим очередную истерику. Но за ней неизбежно придет другая... И раз за разом ком прошлых глупостей будет только расти, пока не придет Катастрофа!

Знакомая ситуация?

Да, угадали: проблему решают транзакции! Их ACID прелести, не отходя от кассы, приносят свежесть и простоту.

Но увы, переход к Базе Данных с Транзакциями:

А бывают ли быстрые хеш-таблицы с транзакциями? Знамо дело!

У меня есть Решение, и оно называется ders::mem_db.

С уважением, Сергей Деревяго.


2. Краеугольное

2.1. Транзакции

Транзакции -- это, гмм... Наверное, все-таки сложно!

Сферические транзакции в вакууме, их теоретические ACID проперти и практические особенности реализации... Тут есть о чем поговорить, но мы не станем.

Потому что хорошие новости в том, что mem_db предельно проста и удобна!

А для теоретического старта вполне достаточно указать, что в ней реализованы snapshot isolation и optimistic locking.

Что-что говорите, букварь прикурить? Не вопрос:

Да, вижу, что легче! Но не так оптимистично, как могло б.

А гораздо, ГОРАЗДО лучше! Но не будем пока забегать...

Т.к. нужно еще кое-что нам для старта практического. А именно: загляните еще раз в мою предыдущую статью и обратите внимание на Библиотеку derslib, т.к. она нам сейчас понадобится.

Итак, допустим, что нам нужно сохранить настройки нового пользователя:

bool insertUser(mem_pool* mp, mem_db* mdb, strsz userId, strsz userJson)
{
    un_ptr<mem_db::transaction> tr=mdb->start_tr(mp);  // начинаем транзакцию

    tr->set(userId, userJson);  // вставляем нового пользователя
    tr->depend_del(userId);     // он не должен был существовать

    un_ptr<mem_db::error> err=tr->commit();            // коммитим транзакцию
    if (err) {
        // обрабатываем ошибку
        return false;
    }

    return true;
}

Все просто? Everything is as simple as a rake.

Единственный мутный момент -- вызов depend_del(userId). Он сообщает транзакции, что у нас есть зависимость: ключ userId не должен существовать в момент коммита!

А если все-таки он существует, то commit() завершится ошибкой, и придется начать все заново (т.е. другую транзакцию).

ОК, ясно. А если не вызвать depend_del(userId)?

В этом случае наша функция превращается в изменение настроек пользователя: были, не были -- всем плевать!

void updateUser(mem_pool* mp, mem_db* mdb, strsz userId, strsz userJson)
{
    auto tr=mdb->start_tr(mp);  // начинаем транзакцию
    tr->set(userId, userJson);  // изменяем настройки
    tr->commit();               // коммитим транзакцию
}

И нам не нужно даже проверять ошибку: нет зависимостей -- нет ошибок. Точка.

Ну да, неплохо. Вот только целая Транзакция ради единственного set(userId, userJson) -- не многовато ли?

ОК, вот более сложный пример: сохранение файла с его метаданными.

un_ptr<blob> readMeta(mem_pool* mp, strsz fileName);  
un_text readFile(mem_pool* mp, strsz fileName);       

bool insertFile(mem_pool* mp, mem_db* mdb, strsz fileName)
{
    auto keyMeta=new_blob(mp, strsz("meta!"), fileName);  // ключ "meta!"+fileName
    auto meta=readMeta(mp, fileName);                     // читаем метаданные

    auto keyCont=new_blob(mp, strsz("cont!"), fileName);  // ключ "cont!"+fileName
    auto cont=readFile(mp, fileName);                     // читаем содержимое

    auto tr=mdb->start_tr(mp);

    tr->depend_del(keyMeta->ptr());
    tr->set(keyMeta->ptr(), meta->ptr());

    tr->depend_del(keyCont->ptr());
    tr->set(keyCont->ptr(), cont->str());

    auto err=tr->commit();
    if (err) {
        // обрабатываем ошибку
        return false;
    }

    return true;
}

Теперь точно все честно: или есть и файл, и метадата, или нет ни одного! Атомарность транзакции в действии.

А что насчет optimistic locking?

Он тоже прекрасно работает: вероятность того, что кто-то вставит тот же самый файл одновременно с нами пренебрежительно мала! А если он вдруг умудрился, то depend_del() нам не даст закоммитить.

Конечно, хорошо быть оптимистом, но давайте реально смотреть на вещи! Например, заодно подсчитаем и общий размер:

long getSize(const blob* meta);

bool insertFileTotal(mem_pool* mp, mem_db* mdb, strsz fileName)
{
    auto keyMeta=new_blob(mp, strsz("meta!"), fileName);
    auto meta=readMeta(mp, fileName);

    long fileSize=getSize(get(meta));

    auto keyCont=new_blob(mp, strsz("cont!"), fileName);
    auto cont=readFile(mp, fileName);

    auto keyTotal=strsz("total!");  // ключ "total!"
    ptrsz valTotal;

    auto tr=mdb->start_tr(mp);

    tr->depend_del(keyMeta->ptr());
    tr->set(keyMeta->ptr(), meta->ptr());

    tr->depend_del(keyCont->ptr());
    tr->set(keyCont->ptr(), cont->str());

    if (!tr->get(keyTotal, &valTotal)) {
        // не найден "total!"
        return false;
    }
    assert(valTotal.size()==sizeof(long));

    long prev;
    memcpy(&prev, valTotal.ptr(), sizeof(long));   // копируем байты в long
    prev+=fileSize;

    tr->depend_val(keyTotal, valTotal);             // он не должен измениться!
    tr->set(keyTotal, ptrsz(&prev, sizeof(long)));  // обновляем

    auto err=tr->commit();
    if (err) {
        // обрабатываем ошибку
        return false;
    }

    return true;
}

Опа! Беда пришла в наш кишлак...

Нет, технически все корректно, но каждый вызов insertFileTotal() пытается изменить общий ключ "total!", а это значит, что практически ВСЕ транзакции завершатся ошибкой!! Optimistic locking уже не вывозит, т.к. мы изменяем те же самые данные.

Что делать?!

Вернуться от транзакций к хеш-таблице? Это медленно и печально. Сериализовать вызовы insertFileTotal() ручками? Годное извращение!

А можно ли как бы ли... Поначалу транзакция, а потом хеш-таблица? Все лучшее сразу из Двух Миров??

Ну вы даете! Конечно МОЖНО:

void updateTotal(mem_db::writer* wr, void* arg)
{
    auto psize=(long*)arg;          // указатель на размер

    auto keyTotal=strsz("total!");  // ключ "total!"
    ptrsz valTotal;

    long prev=0;
    if (wr->get(keyTotal, &valTotal)) {
        assert(valTotal.size()==sizeof(long));
        memcpy(&prev, valTotal.ptr(), sizeof(long));  // копируем байты в long
    }

    prev+=*psize;
    wr->set(keyTotal, ptrsz(&prev, sizeof(long)));    // и обновляем
}

bool insertFileTotal2(mem_pool* mp, mem_db* mdb, strsz fileName)
{
    auto keyMeta=new_blob(mp, strsz("meta!"), fileName);
    auto meta=readMeta(mp, fileName);

    long fileSize=getSize(get(meta));

    auto keyCont=new_blob(mp, strsz("cont!"), fileName);
    auto cont=readFile(mp, fileName);

    auto tr=mdb->start_tr(mp);

    tr->depend_del(keyMeta->ptr());
    tr->set(keyMeta->ptr(), meta->ptr());

    tr->depend_del(keyCont->ptr());
    tr->set(keyCont->ptr(), cont->str());

    auto err=tr->commit(updateTotal, &fileSize);  // call_back и аргумент
    if (err) {
        // обрабатываем ошибку
        return false;
    }

    return true;
}

Смех смехом, но commit() принимает параметр void call_back(writer*, void*). Т.е. вы можете передать в commit() функцию, которая будет вызвана сразу же после мержа всех изменений транзакции. Эта функция видит реальное состояние mem_db прямо в момент коммита и может прочитать/изменить что угодно!

Например, апдейтнуть значение ключа "total!" так, что никто не вмешается. Без шума и пыли, по вновь утвержденному плану!

Так, погодите... А можно ли сразу, ну, типа, колбэк без транзакции? Пришел, увидел, изменил??

Ну вы даете! А кто запретит?

Да, можно сразу же вызвать start_tr(mp) и commit(call_back, arg), но для этого есть специальная функция! И мы ее скоро покажем.


2.2. Что такое хорошо

Хорошо бы сразу понять, что mem_db оперирует сырыми байтами, а не объектами. Это уровень memcmp()/memcpy(), т.е. фрагменты памяти (не объекты!) быстро сравниваются и копируются по любому удобному адресу.

Никто не парится по поводу конструкторов/деструкторов. Как будто мы пишем в файл. Да, годная аналогия!

А вместо указателя мелькает ptrsz:

derslib\inc\ders\strsz.hpp
class ptrsz {
 public:
    ptrsz() : ptr_(0), sz_(0) {}
    ptrsz(const void* ptr, int sz) : ptr_(ptr), sz_(sz) { invariant(); }
    ptrsz(strsz s) : ptr_(s.str()), sz_(s.size()) { invariant(); }

    const void* ptr() const { return ptr_; }
    int size() const { return sz_; }

 private:
    const void* ptr_;
    int sz_;

    void invariant() { assert((ptr_ && sz_>=0) || (!ptr_ && !sz_)); }
};

А вот и основные интерфейсы:

derslib\inc\ders\mem_db.hpp
struct reader {
    virtual bool get(ptrsz key, ptrsz* pval) const=0;
    virtual void all(std::vector<ptrsz>* keys, std::vector<ptrsz>* vals) const=0;
};

struct writer : reader {
    virtual void set(ptrsz key, ptrsz val)=0;
    virtual void del(ptrsz key)=0;
};

struct transaction : writer {
    virtual void depend_val(ptrsz key, ptrsz val)=0;
    virtual void depend_del(ptrsz key)=0;
};

Тривиальные функции get(), set() и del(). Да еще depend_val() с depend_del() для зависимостей.

Все достаточно очевидно, а perf6\main.cpp с test_mem_db\main.cpp дают и примеры использования. В общем, смотрите исходный код при малейших сомнениях. Смотрите два раза при маловерии.


2.3. И что такое плохо

Как вы уже видели, mem_db щеголяет простым интерфейсом. Но с дуру можно сломать и его!

И прямо сейчас мы покажем, как НЕ надо делать. Да объясним почему.

  1. Не забывайте про инвалидацию указателей!

    Точно так же, как вызов vector::push_back() потенциально инвалидирует все итераторы, вызов set() инвалидирует полученные с помощью get() и all() указатели:

        ptrsz val;
        if (!wr->get(key, &val)) { /* ... */ }
    
        wr->set(key2, val2);  // вызов set() потенциально инвалидирует val.ptr()
        useVal(val);          // не используйте val после set()!
    а делайте копию:
        ptrsz val;
        if (!wr->get(key, &val)) { /* ... */ }
        auto myVal=new_blob(mp, val);  // копируем данные
    
        wr->set(key2, val2);
        useVal(myVal->ptr());          // используем копию
  2. Не подразумевайте alignment ключей и значений!
        long prev=*(long*)valTotal.ptr();             // Bus error!
    
        long prev;
        memcpy(&prev, valTotal.ptr(), sizeof(long));  // правильно
    А кто не слушается -- приедет чужой автобус!
  3. Не изменяйте значения прямо по месту!
        long prev=0;
    
        tr->get(keyTotal, &valTotal);
        memcpy((void*)valTotal.ptr(), &prev, sizeof(long));  // race condition!
    
        tr->set(keyTotal, ptrsz(&prev, sizeof(long)));       // правильно
    Ведь кто-то их читает вместе с вами!

    Когда чужой мои читает базы, заглядывая мне через плечо...


3. Оптимизации

Хозяйке на заметку. Шуруп, забитый молотком, держится крепче, чем гвоздь, закрученный отверткой.

3.1. Только для чтения

Как известно, в поликлинике всегда стоит две очереди: и по записи, и кому только спросить...

Что ж, используйте start_rd(), если вам лишь спросить:

un_ptr<blob> readUser(mem_pool* mp, mem_db* mdb, ptrsz userId)
{
    un_ptr<mem_db::reader> rd=mdb->start_rd(mp);  // нам только спросить

    ptrsz userJson;
    if (!rd->get(userId, &userJson))  // читаем
        return un_ptr<blob>(mp);      // не нашли

    return new_blob(mp, userJson);    // возвращаем сохраненную копию
}

Как вы прекрасно понимаете, мы не можем просто вернуть userJson, т.к. его указатель инвалидируется вместе с объектом mem_db::reader.

Не понимаете?

Тогда еще раз. В mem_db реализована snapshot isolation, т.е. в момент создания объекта mem_db::reader (или mem_db::transaction) вся база для нас "замирает". Но!

Для всех остальных жизнь продолжается, и ключ userId уже мог быть несколько раз перезаписан и/или удален. Тем самым, сразу же после деструкции mem_db::reader, найденный нами userJson может указывать неизвестно куда!

Так что мы сразу копируем найденное значение (т.е. последовательность байт, на которую указывает userJson) с помощью класса ders::blob и можем больше не беспокоиться о времени жизни транзакции.


3.2. Прямая запись

А теперь про Удивительную Фишку!

Т.е. про возможность прямого доступа к mem_db, аналогичную синхронизированной хеш-таблице.

Да-да, я сейчас расскажу про колбэк без транзакции -- вызов функции write():

struct UpdateUserData {
    strsz userId;
    strsz userJson;
};

void updateUserCallBack(mem_db::writer* wr, void* arg)
{
    auto ud=(UpdateUserData*)arg;        // наш указатель на данные
    wr->set(ud->userId, ud->userJson);   // обновляем
}

void updateUser2(mem_pool* mp, mem_db* mdb, UpdateUserData* ud)
{
    mdb->write(updateUserCallBack, ud);  // прямая запись
}

Все очень просто: единственный вызов write(updateUserCallBack, ud) вместо пары start_tr(mp) и commit(updateUserCallBack, ud). К тому же это быстрее!

И даже более.

Если вы перевозите легаси с хеш-таблиц на mem_db, то практически ВСЕ операции должны превратиться в вызовы write()! А транзакции оставьте лишь для редкостей, что тянутся долгими вечемиллисекундами.

И еще одна вещь... Но это уже Фантастика!

Вы когда-нибудь ездили ночью по городу? Те привычные перекрестки, неожиданно ставшие нерегулируемыми??

И знаете что, нерегулируемые перекрестки -- это наш optimistic locking! Потому что нет смысла стоять на красный, когда нет никого больше рядом.

А вот днем... А вот днем то же самое место забито машинами со всех направлений и без светофора уже никак!

Одно место в двух разных режимах. Сечете фишку??

Колбэки mem_db позволяют оперативно разруливать приключения одного и того же места!

Обратите внимание: у нас есть интерфейс mem_db::writer, так что код не придется дублировать!

4. Алгоритмы

Про любовь к алгоритмам. Со слайдами.

Что же скрыто от взоров общественности? Даже не удивлюсь, если mem_db использует внутри себя какую-то самописную хеш-таблицу. Не удивляйтесь и вы, если окажется, что это ders::blob_map.

Но никто же не будет впустую писать хеш-таблицу?

Фишка blob_map в копировании! Это просто memcpy()...

Падает челюсть!

У всех, кто когда-то писал сам деревья и списки.

НЕ МОЖЕТ БЫТЬ!!!

Может, если правильно использовать ders::off_pool -- распределитель памяти, возвращающий смещения вместо указателей:

derslib\inc\ders\off_pool.hpp
enum offset : unsigned { OFF_NULL };

class off_pool {
 public:
    enum how { decrease, exact, increase };

    off_pool(unsigned rsrv);
    off_pool(const off_pool* op, how h);
    ~off_pool();

    offset alloc(unsigned sz);
    void free(offset off, unsigned sz);
    void free_all();

    // ...
};

Ну, грубо говоря, off_pool -- это такой хитрый кусок памяти, четыре гигабайта максимум. В 32 бита уже больше не лезет, я проверял. И если живущие в нем структуры используют (относительные) смещения вместо (абсолютных) указателей, то копирование куска памяти никак не нарушит их связность.

Слайды давай!

Просто представьте, что мы имеем дело с файлом на диске. А в нем записаны разные там структуры, ссылающиеся друг на друга с помощью 32-битных смещений. Тогда файл можно смело копировать, и все будет работать!

Слайды! Слайды!

И насколько непросто иметь дело с blob_map? Ну, писать его было непросто! А вот использовать -- сплошное удовольствие:

derslib\inc\ders\blob_map.hpp
struct blob_map : deletable {
    virtual un_ptr<blob_map> copy(mem_pool* mp, bool shrink) const=0;

    virtual bool insert(ptrsz key, ptrsz val, int tag=0)=0;
    virtual bool upsert(ptrsz key, ptrsz val, int tag=0)=0;
    virtual bool erase(ptrsz key)=0;

    virtual unsigned find(ptrsz key) const=0;
    virtual ptrsz key(unsigned pos) const=0;
    virtual ptrsz val(unsigned pos) const=0;
    virtual int tag(unsigned pos) const=0;

    // ...
};

Почти стандартные операции, плюс наш любимый ptrsz.

А вот теперь будут слайды!

В общем и целом, устройство mem_db можно себе представить как совокупность blob_map-ов, ссылающихся друг на друга:

Когда в самом начале Client создает transaction 1, картина выглядит следующим образом:

Мы видим что:

Но не будем забывать, что клиентов первого поколения Clients 1 может быть много, и все они одновременно читают из base 1:

И теперь, если Client коммитит свою transaction 1, мы просто переносим over 1 в mem_db:

Наличие over поверх base в mem_db -- это нормальная ситуация. В данный момент мы не можем смержить over в base, т.к. Clients 1 продолжают ее читать.

А когда появятся клиенты второго поколения Clients 2, они уже будут читать из over 1:

И вторая транзакция сделает то же самое:

ОК, а что делать если Client закоммитит transaction 2? А это уже интересно!

  1. Мержим over 1 в over 2 -- получается over 3.
  2. Выбрасываем over 1 из mem_db, т.к. его читают Clients 2 и изменить его невозможно.
  3. И ставим свой новый over 3 на его место:

Да, интересно. А клиенты третьего поколения?

Думаю, вы уже догадались:

Хорошо. Но у нас постоянно крутятся разные Clients и не дают изменить base. А если все время подмерживать в over, то размер оверлея неизбежно превысит размер самой базы! Что делать?

Ай, то же самое: просто смержить все в новую базу, а старую выбросить:

Неплохо! Но есть и вопросы:


5. Производительность

Не хвались идучи на рать,
Хвались идучи с рати!

Как вы уже поняли, контейнеры C++ снова уйдут с рати. Но хвалиться они не будут.

Конечно, мои преданные читатели уже много раз видели существенный прирост производительности и ожидают нечто подобное и сейчас.

Но!

Вы правда уверены, что неизбежно выполняющая множество дополнительных шагов mem_db способна потягаться с тривиальной хеш-таблицей?

Увы, конечно не способна.

Т.к. для чтения/записи пары ключей вполне достаточно обычной хеш-таблицы с пресловутым mutex-ом. И далеко не факт, что read-write lock улучшит ситуацию!

Вообще говоря, транзакции начинают сиять, когда время выполнения составляет несколько миллисекунд (и более). Например:

В подобных случаях таблица с mutex-ом надолго заблокирует конкурентные изменения, в то время как транзакции продолжат свой прогресс!

Именно поэтому в тест производительности были введены параметры-задержки, позволяющие экспериментировать с длительностью операции. Смелее!


5.1. perf5: производительность blob_map

Как давно уже всем известно, ders::blob_map была специально создана для реализации mem_db, и скорость копирования -- ее главная фишка!

Тем не менее, интересно взглянуть на обычные вставки и удаления, т.к. их производительность не должна быть существенно хуже стандартных решений. Например unordered_map<string, string>:

perf5/main.cpp
void start_std(latch* l1, latch* l2)
{
    l1->arrive_and_wait();
    for (int i=0; i<N; i++) {
        unordered_map<string, string> m1;
        for (int j=0; j<M; j++) {
            m1.emplace(ks[j], vs[j]);
        }

        unordered_map<string, string> m2(m1);
        for (int j=0; j<M; j++) {
            m2.erase(ks[j]);
        }
    }
    l2->arrive_and_wait();
}

void start_ders(latch* l1, latch* l2)
{
    mem_pool mpo, *mp=&mpo;

    l1->arrive_and_wait();
    for (int i=0; i<N; i++) {
        auto bm1=new_blob_map(mp, 10, false, mem_hash, mem_equal);
        for (int j=0; j<M; j++) {
            bm1->insert({ks[j].data(), int(ks[j].size())}, {vs[j].data(), int(vs[j].size())});
        }

        auto bm2=bm1->copy(mp, false);
        for (int j=0; j<M; j++) {
            bm2->erase({ks[j].data(), int(ks[j].size())});
        }
    }
    l2->arrive_and_wait();
}

Красота!

Ubuntu 16, g++ 5.4.0
num_threads 1 2 3 4 8 16 32 64
std/ders 5.5 5.8 5.9 7.1 7.0 6.9 7.3 14.1

Даже для одного-единственного потока blob_map в пять раз быстрее unordered_map<string, string>! А на 64 потоках аж в целых четырнадцать раз!!

В общем, смело душите unordered_map, пока маленькие.


5.2. perf6: производительность mem_db

А вот и Главное Блюдо сезона: ders::mem_db!

Гмм, как я вижу, unordered_map еще с вами... Ну, ОК. Продолжим избиение мластарцев.

А для этого нам понадобится что-то похожее на Базу Данных, плюс операции, нуждающиеся в транзакциях.

Так. Что там показывают уже взрослым детям? Правильно! Детям сразу показывают перевод денег со счета на счет и объясняют про атомарность.

А чем мы хуже? Давайте перекидывать деньжата с помощью простейшего класса ClientRec:

perf6\mics.hpp
class ClientRec : public deletable {
 public:
    int getNumCred() const;
    void setNumCred(int v);

    int getNumDeb() const;
    void setNumDeb(int v);

    int getAmt() const;
    void setAmt(int v);

    // ...
};

inline un_text clientKey(mem_pool* mp, int n)
{
    return (tx_buf(mp)+"client"+n).release();
}

Где:

Как вы уже догадались, в конечном итоге количество всех списаний у всех клиентов должно соответствовать количеству всех зачислений. А общая сумма всех денег не измениться.

Но мы доверяем своим клиентам! Потому что все эти числа тщательно проверяются: и до старта и после торговли. Хорошая, кстати, практика. Очень рекомендую!

Ну, вроде все ясно. Пора торговать.

Хотя надо бы сразу отметить, что выглядят функции угрожающе! Но это на первый взгляд.

Для понимания сути процесса, сразу ищите константу WRITE_PROB, задающую вероятность записи. Дело в том, что в Диком Мире баз данных операции чтения заметно преобладают над записью, так что нет смысла тестировать голую скорость записи.

Ну и, тем более, не идут они друг за другом! Поэтому perf6 принимает параметры командной строки sleep_in и sleep_out -- время задержки внутри и между транзакциями в миллисекундах.

И вот так они выглядят:

perf6/main.cpp
void std_check()
{
    mem_pool mpo, *mp=&mpo;

    int sumCred=0, sumDeb=0, sumAmt=0;
    for (auto it=stdDB.begin(), end=stdDB.end(); it!=end; ++it) {
        auto cr=newClientRec(mp, to_ptrsz(&it->second));

        int cred=cr->getNumCred(), deb=cr->getNumDeb(), amt=cr->getAmt();
        perm_assert(cred>=0);
        perm_assert(deb>=0);
        perm_assert(amt>=0 && amt<=MAX_AMT);

        sumCred+=cred;
        sumDeb+=deb;
        sumAmt+=amt;
    }

    perm_assert(sumCred==sumDeb);
    perm_assert(sumAmt==totalAmt);
}

void std_run(latch* l1, latch* l2)
{
    mem_pool mpo, *mp=&mpo;

    l1->arrive_and_wait();
    for (int i=1; i<=M; i++) {
        for (bool done=false; !done; ) {
            int n1=randMnMx(1, N), n2=randMnMx(1, N);
            if (n1==n2) continue;

            auto key1=clientKey(mp, n1), key2=clientKey(mp, n2);
            auto sk1=to_string(key1->str()), sk2=to_string(key2->str());

            if (n1<=N*WRITE_PROB) {
                unique_lock<shared_timed_mutex> lock(stdShMtx);

                auto it1=stdDB.find(sk1), it2=stdDB.find(sk2);
                perm_assert(it1!=stdDB.end()); perm_assert(it2!=stdDB.end());

                auto cr1=newClientRec(mp, to_ptrsz(&it1->second)), cr2=newClientRec(mp, to_ptrsz(&it2->second));

                int amt1=cr1->getAmt(), amt2=cr2->getAmt();

                int amt=amt1 / 2;
                if (amt==0 || amt2==MAX_AMT) continue;
                if (amt2+amt>MAX_AMT) amt=MAX_AMT-amt2;

                cr1->setAmt(amt1-amt);
                cr1->setNumDeb(cr1->getNumDeb()+1);

                cr2->setAmt(amt2+amt);
                cr2->setNumCred(cr2->getNumCred()+1);

                randSleep(sleep_in);

                stdDB[sk1]=to_string(cr1->get()); stdDB[sk2]=to_string(cr2->get());
            }
            else {
                shared_lock<shared_timed_mutex> lock(stdShMtx);

                auto it1=stdDB.find(sk1), it2=stdDB.find(sk2);
                perm_assert(it1!=stdDB.end()); perm_assert(it2!=stdDB.end());

                auto cr1=newClientRec(mp, to_ptrsz(&it1->second)), cr2=newClientRec(mp, to_ptrsz(&it2->second));

                randSleep(sleep_in);
            }

            done=true;
        }

        randSleep(sleep_out);
    }
    l2->arrive_and_wait();
}

А, ну еще интересный момент!

Т.к. чтение преобладает над записью, и все операции длятся заметное время (никто не забыл про sleep_in?), то вместо обычного mutex-а мы здесь используем read-write lock (т.е. unique_lock<shared_timed_mutex> и shared_lock<shared_timed_mutex>).

А сейчас вариант ders2, который понятнее смотрится и демонстрирует транзакции в деле:

perf6/main.cpp
void ders_check()
{
    mem_pool mpo, *mp=&mpo;
    auto r=dersDB->start_rd(mp);

    vector<ptrsz> vals;
    r->all(0, &vals);

    int sumCred=0, sumDeb=0, sumAmt=0;
    for (int i=0, end=vals.size(); i<end; i++) {
        auto cr=newClientRec(mp, vals[i]);

        int cred=cr->getNumCred(), deb=cr->getNumDeb(), amt=cr->getAmt();
        perm_assert(cred>=0);
        perm_assert(deb>=0);
        perm_assert(amt>=0 && amt<=MAX_AMT);

        sumCred+=cred;
        sumDeb+=deb;
        sumAmt+=amt;
    }

    perm_assert(sumCred==sumDeb);
    perm_assert(sumAmt==totalAmt);
}

void ders_run2(latch* l1, latch* l2)
{
    mem_pool mpo, *mp=&mpo;

    l1->arrive_and_wait();
    for (int i=1; i<=M; i++) {
        for (bool done=false; !done; ) {
            int n1=randMnMx(1, N), n2=randMnMx(1, N);
            if (n1==n2) continue;

            auto key1=clientKey(mp, n1), key2=clientKey(mp, n2);
            ptrsz pk1=key1->str(), pk2=key2->str();

            if (n1<=N*WRITE_PROB) {
                auto t=dersDB->start_tr(mp);

                ptrsz val1, val2;
                bool fnd1=t->get(pk1, &val1), fnd2=t->get(pk2, &val2);
                perm_assert(fnd1); perm_assert(fnd2);

                auto cr1=newClientRec(mp, val1), cr2=newClientRec(mp, val2);

                int amt1=cr1->getAmt(), amt2=cr2->getAmt();

                int amt=amt1 / 2;
                if (amt==0 || amt2==MAX_AMT) continue;
                if (amt2+amt>MAX_AMT) amt=MAX_AMT-amt2;

                cr1->setAmt(amt1-amt);
                cr1->setNumDeb(cr1->getNumDeb()+1);

                cr2->setAmt(amt2+amt);
                cr2->setNumCred(cr2->getNumCred()+1);

                t->depend_val(pk1, val1); t->depend_val(pk2, val2);
                t->set(pk1, cr1->get()); t->set(pk2, cr2->get());

                randSleep(sleep_in);

                auto err=t->commit();
                if (err) continue;
            }
            else {
                auto r=dersDB->start_rd(mp);

                ptrsz val1, val2;
                bool fnd1=r->get(pk1, &val1), fnd2=r->get(pk2, &val2);
                perm_assert(fnd1); perm_assert(fnd2);

                auto cr1=newClientRec(mp, val1), cr2=newClientRec(mp, val2);

                randSleep(sleep_in);
            }

            done=true;
        }

        randSleep(sleep_out);
    }
    l2->arrive_and_wait();
}

В принципе, все то же самое. Только для чтения мы вызываем функцию dersDB->start_rd(mp) и, соответственно, dersDB->start_tr(mp) для чтения/записи.

Но! Обратите внимание на вызовы depend_val(): мы так сообщаем транзакции, что зависим от val1 и val2. Тем самым она завершится ошибкой, если кто-то успел изменить наши записи между start_tr() и commit().

А вот и вариант ders1, с его мутным call_back-ом:

perf6/main.cpp
void ders_run1_cb(mem_db::writer* wr, void* arg)
{
    auto mp=(mem_pool*)arg;

    for (bool done=false; !done; ) {
        int n1=randMnMx(1, N), n2=randMnMx(1, N);
        if (n1==n2) continue;

        auto key1=clientKey(mp, n1), key2=clientKey(mp, n2);
        ptrsz pk1=key1->str(), pk2=key2->str();

        ptrsz val1, val2;
        bool fnd1=wr->get(pk1, &val1), fnd2=wr->get(pk2, &val2);
        perm_assert(fnd1); perm_assert(fnd2);

        auto cr1=newClientRec(mp, val1), cr2=newClientRec(mp, val2);

        int amt1=cr1->getAmt(), amt2=cr2->getAmt();

        int amt=amt1 / 2;
        if (amt==0 || amt2==MAX_AMT) continue;
        if (amt2+amt>MAX_AMT) amt=MAX_AMT-amt2;

        cr1->setAmt(amt1-amt);
        cr1->setNumDeb(cr1->getNumDeb()+1);

        cr2->setAmt(amt2+amt);
        cr2->setNumCred(cr2->getNumCred()+1);

        wr->set(pk1, cr1->get()); wr->set(pk2, cr2->get());

        randSleep(sleep_in);
        done=true;
    }
}

void ders_run1(latch* l1, latch* l2)
{
    mem_pool mpo, *mp=&mpo;

    l1->arrive_and_wait();
    for (int i=1; i<=M; i++) {
        if (randMnMx(1, N)<=N*WRITE_PROB) {
            dersDB->write(ders_run1_cb, mp);
        }
        else {
            for (bool done=false; !done; ) {
                int n1=randMnMx(1, N), n2=randMnMx(1, N);
                if (n1==n2) continue;

                auto key1=clientKey(mp, n1), key2=clientKey(mp, n2);
                ptrsz pk1=key1->str(), pk2=key2->str();

                auto r=dersDB->start_rd(mp);

                ptrsz val1, val2;
                bool fnd1=r->get(pk1, &val1), fnd2=r->get(pk2, &val2);
                perm_assert(fnd1); perm_assert(fnd2);

                auto cr1=newClientRec(mp, val1), cr2=newClientRec(mp, val2);

                randSleep(sleep_in);
                done=true;
            }
        }

        randSleep(sleep_out);
    }
    l2->arrive_and_wait();
}

Хотя почему сразу мутным? Просто нужно понять концепцию.

А идея проста и удобна: с помощью вызова dersDB->write() мы производим прямую запись! Т.е. нет двух отдельных start_tr() и commit(), между которыми кто-то вклинится.

И обратите внимание, что мы блокируем mem_db, не давая другим начинать/завершать транзакции! Т.е. это по сути аналог синхронизированного контейнера. А раз так, то сравним его с std_run()!

Итак, первая табличка -- результаты работы perf6 cо sleep_in и sleep_out равными одной миллисекунде:

1ms 1ms Ubuntu 16, g++ 5.4.0
num_threads 1 2 3 4 8 16 32 64 128
std/ders1 1.0 1.1 1.1 1.2 1.6 2.0 2.1 2.1 2.0
std/ders2 1.0 1.2 1.3 1.5 2.6 4.9 9.7 17.9 33.7

Песня!

Хотя ders1 и ders2 выполняют ГОРАЗДО больше работы, они не отстали вначале! И стабильно ушли в отрыв, достигнув тридцатикратной разницы!!

Для варианта ders2, конечно. Для которого все и писалось. Но и ders1 не отстал! И стабильно ушел на два раза. Получи старик под мапу!

А теперь увеличим в пять раз задержки:

5ms 5ms Ubuntu 16, g++ 5.4.0
num_threads 1 2 3 4 8 16 32 64 128
std/ders1 1.0 1.0 1.0 1.1 1.4 1.7 1.7 1.8 1.7
std/ders2 1.0 1.0 1.2 1.3 2.1 3.9 7.5 15.0 27.6

И получим все то же самое, плюс-минус лапоть... Нет, все-таки ВДУМАЙТЕСЬ!

Тридцать раз разницы при серьезной нагрузке!! И чем больше работы, тем лучше.

Где вы на халяву такое видели?


6. Заключение

Хеш-таблица с транзакциями? Безоговорочно! Просто бери и используй.

И в чем подвох? Вот так вот тридцать раз на ровном месте??

Ну, кто читал Горизонтальное масштабирование уже знает, что ускорение отдельного компонента почти никогда не влияет на Бизнес. Нет смысла стараться нигде, кроме узкого места!

У меня получилось. Увенчается и у вас!
Copyright © С. Деревяго, 2024

Никакая часть данного материала не может быть использована в коммерческих целях без письменного разрешения автора.