Rx с Go Generics

TLDR;

Трудно писать масштабируемые параллельные программы с большим количеством композиций async-операций, корректно обрабатывающих композиции Lifecycle с надлежащей поддержкой распространения ошибок и разрыва. Мы покажем эти ограничения на официальных асинхронных строительных блоках Go, затем обсудим неофициальные решения, и в конце концов представим экспериментальную реализацию Rx (gopkg.in/rx.v0), которая обеспечивает окончательную абстракцию.


Введение

Потенциал Go для работы с большим параллелизмом просто поразителен. Особенно он предлагает три интересные возможности — Go routine, каналы и Context. Я любил и ненавидел работу с ними, но есть и другие части языка, которые делают написание масштабируемого параллельного кода обременительным. В частности, из-за отсутствия generics я обнаружил, что пишу много похожего кода с просто разными типами данных.

Параллельные программы состоят из множества асинхронных операций. Масштабируемая система программирования для параллельных приложений должна уметь легко компоновать, оркестровать и планировать асинхронные операции.

Представление жизненного цикла

Мне нравится абстрагировать асинхронные операции с помощью жизненных циклов. Представление жизненного цикла асинхронной операции состоит из одного начала, одного конца, с потенциально от нуля до многих ценных входов и выходов, и нуля или одной точки прерывания либо ошибкой, либо отменой. Никаких ценных входов или выходов не может произойти до начала, после прерывания или после окончания.

Если говорят, что дочерний Жизненный цикл ограничен родительским Жизненным циклом, это означает:

  • дочерний Жизненный цикл может начаться не раньше начала и не позже окончания родительского Жизненного цикла.
  • дочерний Жизненный цикл может закончиться не позднее окончания родительского Жизненного цикла.
  • дочерний Жизненный цикл должен быть прерван вместе с родительским Жизненным циклом.

Прежде чем мы сможем правильно говорить о композиции асинхронных операций, нам нужно понять, как мы можем представить асинхронные операции и их Lifecycle в коде, в Golang.

Для наглядности начнем с синхронной функции:

func plusOne(x int) int {
    return x + 1
}
Вход в полноэкранный режим Выйти из полноэкранного режима

Эта операция начинается с входа в функциональный блок. В начале она принимает входной сигнал x, производит с ним вычисления и выводит результат. Затем операция заканчивается на выходе из функционального блока. Мы видим, что у нее есть все признаки жизненного цикла, кроме прерывания. Поскольку это синхронная операция, мы никак не можем ее прервать. Но мы должны иметь возможность прервать асинхронную операцию.

Теперь давайте введем прерывание, преобразовав описанную выше операцию в асинхронную версию:

func plusOne(ctx context.Context, input <-chan int) chan<- int {
    output := make(chan int)
    go func() {
        for {
            select {
            case x := <-input:
                select {
                case output <- x + 1:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return output
}
Войти в полноэкранный режим Выход из полноэкранного режима

На этот раз мы использовали три интересные и мощные возможности Golang, которые являются основными строительными блоками для асинхронных операций: Go routine, каналы и Context.

Goroutines

Goroutines, вероятно, самая простая из этих трех функций. go f() — это специальный синтаксис запуска Goroutines. Он означает запуск нового «потока», и выполнение функции f() на этом «потоке». Таким образом, строки после нее могут продолжать выполняться, не дожидаясь завершения f(). А f() может продолжать выполняться до своего завершения даже после того, как функция, запускающая процедуру Go, вернулась. Итак, в этом примере мы создали выходную переменную, запустили Goroutine, чтобы сделать что-то в другом «потоке», и просто вернули выходную переменную.

Я взял «поток» в кавычки, потому что это не тот системный поток, который мы знали раньше, а нечто гораздо более легковесное. Мы не будем углубляться в детали этого. Вы можете просто концептуально считать их потоком для пользы этой статьи.

Каналы

С другой стороны, каналы — это то, что выглядит просто, но с большим количеством подвохов. Когда вы добавляете префикс chan перед другим типом, вы получаете новый тип, который является каналом для этого типа. Вы создаете канал с помощью встроенной функции make(chan int). Каналы могут иметь внутренний размер буфера, но мы не будем рассматривать это в данной статье. К каналу можно применить три операции.

Первая — это передача значения через канал. Мы пишем c <- x, чтобы обозначить отправку значения переменной x через канал c.

Второй — получение значения из канала. Мы пишем y = <- c для обозначения получения значения из канала c и установки его в переменную y.

Третье — вы можете вызвать close(c) на канале. Каждый канал имеет состояние либо открыт, либо закрыт. close(c) изменит состояние канала c с открытого на закрытое. Закрытый канал не может быть открыт снова, а вызов close(c) на уже закрытом канале приведет к панике.

Когда канал открыт, операция отправки будет блокироваться до тех пор, пока операция получения не будет выполнена на другом конце, в другом «потоке»; а операция получения будет блокироваться до тех пор, пока операция отправки не будет выполнена на другом конце, в другом «потоке». Однако, если канал закрыт, все усложняется: операция отправки будет немедленно завершена с информацией о панике: panic: send on closed channel; а операция получения будет немедленно завершена с возвратом пустого значения. Например, x = <-c, где c — закрытый канал chan int, немедленно завершится с x, установленным в пустое значение int 0. Чтобы отличить полученное значение из открытого канала от закрытого, можно использовать x, ok = <-c, чтобы одновременно получить значение канала и булево состояние ok, где true, если канал открыт, и false, если канал закрыт.

Часто для создания одного сигнала переключения необходимо сначала создать канал, а затем закрыть его для переключения. Таким образом, до переключения любая операция получения на канале будет блокироваться, а после переключения канал станет закрытым, и все операции получения завершатся с пустым значением. И будущая операция получения также будет немедленно завершена пустым значением. Практическим примером такого одиночного сигнала переключения является сигнал прерывания, например, отмена.

Для небуферизованных каналов (а именно их мы используем в этой статье) отправка данных будет блокироваться вечно, если никто не получает данные, и наоборот. Успешная операция отправки должна совпадать с успешной операцией получения, иначе избыточная операционная сторона будет блокироваться (за некоторыми исключениями, описанными ниже).

Заявление select.

Однако существует способ избежать блокировки при выполнении операций отправки и получения каналов:

select {
case c <- x:
default:
}

select {
case y = <- c:
default:
}
Войдите в полноэкранный режим Выйти из полноэкранного режима

Таким образом, операция отправки или получения будет пропущена, если ее нельзя завершить немедленно.

Оператор select с несколькими case каналами ввода/вывода будет блокироваться до тех пор, пока ни один из этих case не сможет завершиться. Если несколько case могут завершиться одновременно, select случайным образом выберет и выполнит один из них. Оператор select с предложением default попытается проверить, может ли какой-либо из case быть завершен немедленно, и выполнит предложение default, если таковое отсутствует.

Это означает, что мы можем использовать select для блокировки нескольких операций ввода/вывода по каналу вместе, и позволить выполнить первую из них:

select {
case output <- x + 1:
case <-ctx.Done():
    return
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Эта часть кода будет блокировать операцию записи на канале output, и операцию чтения на канале ctx.Done(). Он будет работать до тех пор, пока ни один из этих двух случаев не завершится, и продолжит выполнение этого случая. В этом конкретном примере канал ctx.Done() является источником прерывания. Значение, полученное от этого канала, означает, что операция Lifecycle была отменена.

Мы используем этот оператор select как для получения значения из входного канала, так и для отправки результата в выходной канал, поскольку обе эти операции являются блокирующими, которые мы хотим прервать, если Жизненный цикл будет прерван.

Контекст Go

Сейчас самое время объяснить, что такое Go Context. Go Context — это попытка Golang представить некоторую часть жизненного цикла в качестве аргумента функции, обычно передаваемого в качестве первого параметра, каждой функции, выполняющей асинхронные операции. Контекст может регистрировать различные виды сигналов отмены и порождать при этом дочерние Контексты.

Например, родительский Контекст может породить дочерний Контекст с явной функцией отмены:

func parent(parentCtx context.Context) {
    childCtx, cancel := context.WithCancel(parentCtx)
    go func() {
        <-time.NewTimer(time.Second).C
        cancel()
    }()
    child(childCtx)
}
Войти в полноэкранный режим Выход из полноэкранного режима

Жизненный цикл дочернего Контекста ограничен жизненным циклом его родительского Контекста. Это означает, что если жизненный цикл родительского контекста прерывается или завершается, жизненный цикл дочернего контекста перейдет в то же состояние. Кроме того, поскольку мы создали явный сигнал отмены — функцию cancel, жизненный цикл дочернего контекста также может быть прерван вызовом cancel(), и это прервет жизненный цикл дочернего контекста сигналом отмены — канал childCtx.Done() будет закрыт, и все операции получения на этом канале завершатся с пустым значением. В данном примере мы порождаем Goroutine для отсчета 1 секунды, а затем вызываем cancel(), чтобы прервать жизненный цикл дочернего контекста сигналом отмены.

Поскольку приведенный выше пример является распространенным случаем использования, в пакет Go Context включена еще одна вспомогательная функция для регистрации тайм-аута порожденного дочернего контекста:

childCtx, cancel := context.WithTimeout(parentCtx, time.Second)
Войти в полноэкранный режим Выход из полноэкранного режима

Объяснив эти основные строительные блоки операций async, мы можем, наконец, обратиться к нашему предыдущему примеру кода async:

func plusOne(ctx context.Context, input <-chan int) chan<- int {
    output := make(chan int)
    go func() {
        for {
            select {
            case x := <-input:
                select {
                case output <- x + 1:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return output
}
Вход в полноэкранный режим Выход из полноэкранного режима

Жизненный цикл начинается при входе в функциональный блок, его дочерний жизненный цикл ограничен родительским жизненным циклом, начиная с входа в функциональный блок Go routine. Жизненный цикл потребляет входные значения и выдает выходные значения с помощью канальных операций на каналах input и output. Он получает прерывание отмены от получения канала ctx.Done(), и окончательно завершается после возврата из подпрограммы Go.

Композиция асинхронных операций

Теперь мы наконец-то можем должным образом обсудить композицию асинхронных операций. Композиция двух асинхронных операций должна выполнять две задачи:

  1. Она должна правильно направить входные и выходные значения между двумя операциями, обычно одна из них является источником, а другая — получателем.
  2. Она должна управлять жизненными циклами двух операций и в результате генерировать составленный жизненный цикл.

Первую часть легко понять. Это то же самое, что и составление синхронных операций. Например:

func plusOne(x int) int {
    return x + 1
}

func double(x int) int {
    return x * 2
}

func plusOneThenDouble(x int) int {
    return double(plusOne(x))
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Здесь мы подключаем вход составленной функции к входу plusOne. Затем подключаем выход из plusOne к входу double. Наконец, подключите выход double в качестве выхода составленной функции.

Теперь давайте превратим plusOne и double в асинхронные операции. Мы уже написали plusOne выше, теперь сделаем то же самое для double:

func plusOne(ctx context.Context, input <-chan int) <-chan int {
    output := make(chan int)
    go func() {
        defer close(output)
        for {
            select {
            case x, ok := <-input:
                if !ok {
                    return
                }
                select {
                case output <- x + 1:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return output
}

func double(ctx context.Context, input <-chan int) <-chan int {
    output := make(chan int)
    go func() {
        defer close(output)
        for {
            select {
            case x, ok := <-input:
                if !ok {
                    return
                }
                select {
                case output <- x * 2:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return output
}
Войти в полноэкранный режим Выйти из полноэкранного режима

На этот раз мы также рассмотрим случай, когда каналы input закрываются. Процедура Go завершится, если либо канал input будет закрыт, либо если Context будет отменен. Таким образом, завершение операции может происходить в двух направлениях — операция источника, производящая входы для текущей операции, может завершить текущую операцию, а отмена Context, предоставленного потребителем, также может прервать и завершить текущую операцию. Мы также добавили defer close(output) в процедуру Go, чтобы она всегда закрывала свой output канал непосредственно перед завершением.

Теперь давайте попробуем сделать композицию из этих двух операций, удовлетворяющую двум нашим требованиям композиции асинхронных операций.

func plusOneThenDouble(ctx context.Context, input <-chan int) <-chan int {
    return double(ctx, plusOne(ctx, input))
}
Вход в полноэкранный режим Выйти из полноэкранного режима

Как мы видим, для простой композиции асинхронных операций Go Context позволяет сделать это очень просто. Мы можем просто передать Context дочерним операциям, и их жизненный цикл будет ограничен переданным Context.

До сих пор все выглядело красиво и хорошо. Однако сейчас я покажу вам темную сторону медали. Речь идет в основном о двух вещах, которые обычно рассматриваются вместе — распространении ошибок и разрыве после прерывания.

Теперь рассмотрим другую композицию функций, которая связана с ошибкой:

func f(x, y int) (int, error) {
    x = double(x)
    y = plusOne(y)
    if y == 0 {
        err = errors.New("divide by zero")
    }
    return x / y, nil
}
Вход в полноэкранный режим Выход из полноэкранного режима

Пытаться написать асинхронную версию этой композиции было бы муторно. Во-первых, мы не можем вернуть ошибку вместе с каналом output, потому что реальная работа происходит в другом потоке, и мы не можем узнать об ошибке до того, как закончится вся работа в другом потоке. Если мы подождем, пока вся работа завершится, получим ошибку от другого потока и вернем ее, это будет означать, что вызывающая сторона не сможет начать получать результат до завершения всех работ. Это противоречит цели асинхронной работы — мы должны обеспечить способ сигнализировать и распространять ошибки во время фактической работы вне потока f().

// NOT A GOOD PATTERN, DON'T USE!
func f(ctx context.Context, x, y <-chan int) (<-chan int, <-chan error) {
    output := make(chan int)
    errorC := make(chan error)
    x = double(ctx, x)
    y = plusOne(ctx, y)
    ctx, cancel := context.WithCancel(ctx)
    go func() {
        var err error
        defer func() {
            cancel()
            close(output)
            errorC <- err
            close(errorC)
        }()

        for {
            var ok bool
            var a, b int
            select {
            case a, ok = <-x:
                if !ok {
                    return
                }
            case <-ctx.Done():
                return
            }
            select {
            case b, ok = <-y:
                if !ok {
                    return
                }
            case <-ctx.Done():
                return
            }
            if y == 0 {
                err = errors.New("divide by zero")
                return
            }
            select {
            case output <- a / b:
            case <-ctx.Done():
                return
            }
        }
    }()
    return output, errorC
}
Вход в полноэкранный режим Выход из полноэкранного режима

Чтобы использовать эту составленную функцию, вызывающая сторона должна использовать возвращаемый канал ошибок:

// NOT A GOOD PATTERN, DON'T USE!
output, errorC := f(ctx, x, y)
for n := range output {
    fmt.Println(n)
}
if err := <-errorC; err != nil {
    panic(err)
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Эта реализация сложна, давайте вникнем, чтобы понять, что мы сделали. Во-первых, возвращается два канала, один для вывода значения, а другой для вывода ошибки. Вывод значения может выдавать нулевое или несколько значений и закрываться. Выход ошибки будет выдавать одно значение и закрываться. Потребитель должен осушить оба канала, иначе асинхронная операция может заблокироваться навсегда без завершения.

Затем порождается дочерний Context с явным сигналом отмены. Дочерний Context отменяется в конце процедуры Go (в операторе defer). Это означает, что он прервет дочерние асинхронные операции (double и plusOne), когда текущая операция завершится.

В теле основной функции есть цикл for, который считывает пары значений из выходных каналов double и plusOne. Он получает каждое значение, проверяя прерывание родительского Контекста, и завершает свою работу в случае прерывания.

Затем он проверяет наличие ошибки деления на ноль, устанавливает ошибку и выходит из программы, если она обнаружена. В случае такой ошибки операторы defer выполнят следующее: явно отменят дочерние жизненные циклы контекста, позволяя double и plusOne завершиться; закроют канал output, чтобы потребитель больше не получал из него значения success; отправят значение err в канал errorC. Поскольку мы требуем, чтобы потребитель осушил оба канала output и errorC, эта операция отправки будет завершена. Наконец, он закрывает канал errorC, чтобы потребитель больше не мог получать значения успеха.

Если ошибки нет, он отправит результат деления в канал output, с возможным прерыванием Контекста.

Однако, это зашло слишком далеко и слишком сложно, чтобы отслеживать все, что нужно сделать, чтобы написать работающую и корректную композицию асинхронных операций с использованием этого паттерна. Более того, этот паттерн не масштабируется — если вы добавите канал ошибок к возвратам функций double и plusOne, и попытаетесь передать ошибки от дочерних операций к родительским, это будет кошмар. Я имею в виду, не пробуйте, но если вы это сделали, я хотел бы знать, как это прошло для вас.

Мы видели отмену, которая распространяется путем закрытия канала ctx.Done(), но мы не видим, как ошибки могут распространяться на дочерние Жизненные циклы — лучшее, что мы можем сделать выше, это вручную cancel() дочерних операций, и это не делает различия между ошибкой и отменой — они обе завершают дочерние Жизненные циклы. Но если мы хотим получить такое различие в дочерних жизненных циклах, то нет простого способа сделать это с помощью приведенного выше шаблона. Таким образом, это подытожило проблему распространения ошибок. Затем у нас есть проблема разрыва после прерывания. Если ваша асинхронная операция должна выполнить важную работу после прерывания, например, освободить некоторые важные ресурсы, до того, как она завершится, вы можете зарегистрировать их с помощью оператора defer, но родительская операция не сможет определить, когда вы закончили. Если родитель хочет подождать, пока все дочерние операции освободят все ресурсы и завершатся, пока он не продолжит выполнение, нет масштабируемого способа сделать это с помощью приведенного выше шаблона. Лучшее, что мы можем сделать, это передать WaitGroup и позволить каждой дочерней операции зарегистрировать в ней свою Go рутину, а в каждой дочерней Go рутине зарегистрировать wg.Done() с оператором defer после завершения работы. Затем родительская операция может вызвать wg.Wait(), чтобы подождать, пока все дочерние операции завершат снос. Этот подход не масштабируется, поскольку WaitGroup не может быть вложенной или разветвленной, что делает ее пригодной только на одном конкретном уровне операций для объединения всех дочерних операций. Любая операция дочернего уровня не имеет возможности подключиться и сделать то же самое.

Пакет Tomb

Очень хорошим решением этих проблем наивных асинхронных шаблонов в Go Context является пакет Tomb, написанный Густаво Нимейером. Объяснение и введение пакета можно найти в его блоге здесь. Или TLDR:

Модель проста: Tomb отслеживает, жива ли одна или несколько goroutines, умирает или мертва, а также причину смерти.

Эта модель идеально соответствует тому, что я описал как Lifecycle:

  • жив — значит, после начала Жизненного цикла.
  • умирающий означает, что Жизненный цикл был просто прерван, или если все управляемые им Go-процедуры завершены.
  • мертвый означает, что Жизненный цикл был завершен без прерывания или закончил свое завершение после смерти и завершился.
  • death reason означает ошибку, если таковая возникла во время выполнения операции, или распространяемую ошибку, если она была прервана.

Tomb — это улучшенная альтернатива Go Context. Она имеет интуитивно понятный интерфейс и комплексные инструменты управления жизненным циклом. Используя его, мы можем легче разрабатывать асинхронные операции с надлежащим управлением жизненным циклом и композицией. Давайте рассмотрим адаптированный пример:

func plusOne(t *tomb.Tomb, input <-chan int) <-chan int {
    output := make(chan int)
    t.Go(func() (err error) {
        defer close(output)
        for {
            select {
            case x, ok := <-input:
                if !ok {
                    return
                }
                select {
                case output <- x + 1:
                case <-t.Dying():
                    return
                }
            case <-t.Dying():
                return
            }
        }
    })
    return output
}

func double(t *tomb.Tomb, input <-chan int) <-chan int {
    output := make(chan int)
    t.Go(func() (err error) {
        defer close(output)
        for {
            select {
            case x, ok := <-input:
                if !ok {
                    return
                }
                select {
                case output <- x * 2:
                case <-t.Dying():
                    return
                }
            case <-t.Dying():
                return
            }
        }
    })
    return output
}

func f(t *tomb.Tomb, x, y <-chan int) <-chan int {
    x = double(t, x)
    y = plusOne(t, y)
    output := make(chan int)
    t.Go(func() (err error) {
        defer close(output)
        for {
            var ok bool
            var a, b int
            select {
            case a, ok = <-x:
                if !ok {
                    return
                }
            case <-t.Dying():
                return
            }
            select {
            case b, ok = <-y:
                if !ok {
                    return
                }
            case <-t.Dying():
                return
            }
            if y == 0 {
                err = errors.New("divide by zero")
                return
            }
            select {
            case output <- a / b:
            case <-t.Dying():
                return
            }
        }
    })
    return output
}
Вход в полноэкранный режим Выход из полноэкранного режима

Он похож на наш предыдущий пример, но если присмотреться, то все три асинхронные операции имеют одну и ту же сигнатуру функции, где t *tomb.Tomb является первым параметром. С заменой Go Context мы поступаем несколько иначе:

  1. Вместо использования оператора языка go f() для запуска процедуры Go, мы используем t.Go(func() error { ... }) вместо этого. Это магическим образом свяжет жизненный цикл порожденной процедуры Go с жизненным циклом, представленным объектом Tomb t. Как это делается, мы объясним позже.

  2. Вместо использования <-ctx.Done(), мы используем <-t.Dying() для прослушивания прерывания. Как мы кратко объяснили выше, умирание имеет специфическую семантику в Tomb, которая явно означает, что прерывание только что произошло, в отличие от умирания, которое означает, когда операция завершилась, либо нормально завершилась, либо завершилась после прерывания. В то время как в Go Context у нас нет такого различия.

  3. В комбинированной функции f мы больше не используем отдельный канал errorC для распространения ошибок. Объект Tomb заботится о распространении ошибок, поскольку это часть жизненного цикла операции. Когда вы порождаете завернутую в Tomb Go-рутину с помощью t.Go(func() error { ... }), ошибка, возвращаемая из функции рутины, будет проверена Tomb. Если она не равна nil, она будет рассматривать ее как ошибку и выдаст прерывание с этой ошибкой в жизненном цикле Tomb.

Когда Tomb будет прерван, он закроет свой канал t.Dying(), делая все приемники канала завершенными. Гороутины, зарегистрированные в объекте Tomb, должны прослушивать этот канал t.Dying() и переходить к завершению работы, как только получат сигнал о прерывании.

Tomb также можно прервать явно, вызвав t.Kill(err). При этом не будет проверяться, является ли переданная ошибка не nil. Он всегда будет прерывать жизненный цикл Tomb с переданным значением ошибки. Здесь принято считать, что если ошибка прерывания равна nil, то мы должны считать ее отменой, а если она не равна nil, то мы должны считать ее фактической ошибкой, распространяемой на этот жизненный цикл. Если прерывание происходит более одного раза, либо потому что несколько Goroutines вернули ошибки non-nil, либо потому что t.Kill(err) вызывается несколько раз, только первое прерывание будет подтверждено, а остальные будут проигнорированы.

Объект Tomb сохраняет значение ошибки прерывания, т.е. причину смерти, и может быть запрошен командой t.Err().

На этом распространение ошибок в Tomb завершено. Теперь как Tomb узнает, что асинхронная операция, включая все ее дочерние Lifecycle, завершилась? Поскольку в данном шаблоне каждая порожденная Goroutine запускает свое завершение после получения сигнала о прерывании, Goroutine вернется только тогда, когда все ее завершение будет закончено. А поскольку мы регистрируем Go-рутину в обертке Tomb, Tomb отслеживает каждую Go-рутину, которую он помог породить, и когда все эти Go-рутины вернулись, объект Tomb может быть уверен, что больше нет никаких асинхронных операций или срывов, он переведет свое состояние в состояние dead и закроет свой канал t.Dead().

Tomb также предоставляет функцию err = t.Wait(), которая будет блокироваться, пока t не перейдет в состояние dead, и вернет причину смерти.

Используя Tomb, мы можем составлять асинхронные операции масштабируемым способом, особенно потому, что он имеет универсальную систему распространения ошибок, а также инструменты для отслеживания разрыва и реальной смерти дочерних операций.

Давайте немного оттолкнемся от этого, чтобы попробовать что-то более продвинутое и посмотреть, как Tomb может с этим справиться.

Fan-in и Fan-out

Давайте напишем асинхронную операцию, которая принимает на вход имена пользователей Twitter и для каждого пользователя получает первые 10 твитов и загружает изображения в этих твитах. У нас может быть 5 параллельных рабочих, загружающих твиты пользователей, и еще 10 параллельных рабочих, загружающих изображения. Мы проигнорируем соответствующую интеграцию с API Twitter и немного упростим ситуацию, предположив, что у нас уже есть пара рабочих функций, работающих с API Twitter.

func BindTomb(parent *tomb.Tomb, child *tomb.Tomb, teardown func() error) {
    parent.Go(func() (err error) {
        select {
        case <-child.Dying():
            err = child.Wait()
        case <-t.Dying():
            child.Kill(t.Err())
        }
        if teardown != nil {
            parent.Go(teardown)
        }
        return
    })
}

func DownloadUserTop10TweetsImages(t *tomb.Tomb, users <-chan string) <-chan *Image {
    tweets := make(chan *Tweet)
    images := make(chan *Image)

    tweetT := new(tomb.Tomb)
    imageT := new(tomb.Tomb)

    for i := 0; i < 5; i++ {
        tweetT.Go(func() (err error) {
            for {
                var ok bool
                var user string
                var tweet *Tweet
                select {
                case user, ok = <-users:
                    if !ok {
                        return
                    }
                case tweetT.Dying():
                    return
                }
                if tweet, err = FetchUserTop10Tweets(tweetT, user); err != nil {
                    return
                }
                select {
                case tweets <- tweet:
                case tweetT.Dying():
                    return
                }
            }
        })
    }

    for i := 0; i < 10; i++ {
        imageT.Go(func() (err error) {
            for {
                var ok bool
                var tweet *Tweet
                var image *Image
                select {
                case tweet, ok = <-tweets:
                    if !ok {
                        return
                    }
                case imageT.Dying():
                    return
                }
                if image, err = DownloadTweetImage(imageT, tweet); err != nil {
                    return
                }
                select {
                case images <- image:
                case imageT.Dying():
                    return
                }
            }
        })
    }

    BnidTomb(t, tweetT, func() (err error) {
        close(tweets)
        return
    })

    BnidTomb(t, imageT, func() (err error) {
        close(images)
        return
    })

    return images
}
Вход в полноэкранный режим Выход из полноэкранного режима

Поскольку мы порождаем две группы параллельных рабочих, мы хотим иметь тонкий контроль над их жизненным циклом как двух разных групп — мы хотим закрыть их выходной канал после завершения работы каждой группы (смерти). Чтобы обеспечить такой уровень контроля, мы создаем два объекта Tomb, tweetT и imageT, для управления жизненными циклами двух рабочих групп.

Затем мы написали магическую функцию BindTomb для связывания дочернего объекта Tomb с родительским объектом Tomb, чтобы жизненный цикл дочернего Tomb был ограничен жизненным циклом родительского Tomb. Эта функция делает 4 вещи:

  1. Распространяет причину смерти дочернего Lifecycle на родительский Lifecycle.
  2. Она передает прерывание из родительского Lifecycle в дочерний Lifecycle.
  3. Он продлевает окончание родительского жизненного цикла до окончания дочернего жизненного цикла.
  4. Она дает возможность выполнить дополнительное завершение после окончания дочернего Жизненного цикла.

Мы используем эту функцию для привязки жизненного цикла томбов tweetT и imageT к жизненному циклу родительского томба t, и регистрируем дополнительное разрушение, которое закрывает их выходной канал после окончания жизненного цикла их рабочей группы.

Несмотря на то, что теперь мы можем писать корректные и масштабируемые композиции операций async для таких сложных случаев, как этот, мы все еще сталкиваемся с тем, что снова и снова переписываем множество шаблонных кодов. И из-за отсутствия generics многие из этих кодов не могут быть абстрагированы с помощью вызовов функций.

Generics

Недавно, после выхода стабильного релиза Go 1.18, Go наконец-то получил возможность делать дженерики. Давайте попробуем упростить шаблонные коды в приведенном выше примере:

func Receive[T any](t *tomb.Tomb, c <-chan T) (value T, ok bool) {
    select {
    case value, ok = <-c:
    case t.Dying():
    }
    return
}

func Send[T any](t *tomb.Tomb, c chan<- T, value T) bool {
    select {
    case c <- value:
        return true
    case t.Dying():
    }
    return false
}

func MakeChildTombAndChan[T any](t *tomb.Tomb) (childT *tomb.Tomb, childC chan T) {
    childC = make(chan T)
    childT = new(tomb.Tomb)
    t.Go(func() (err error) {
        select {
        case <-childT.Dead():
            err = childT.Err()
        case <-t.Dying():
            childT.Kill(t.Err())
        }
        close(childC)
        return
    })
}

func FanOut[T any, S any](t *tomb.Tomb, workers int, inputs <-chan T, outputs chan<- S, project func(*tomb.Tomb, T) (S, error)) {
    for i := 0; i < 5; i++ {
        t.Go(func() (err error) {
            for {
                var ok bool
                var input T
                var output S
                if input, ok = Receive(t, inputs); !ok {
                    return
                }
                if output, err = project(t, input); err != nil {
                    return
                }
                if !Send(t, outputs, output) {
                    return
                }
            }
        })
    }
}

func DownloadUserTop10TweetsImages(t *tomb.Tomb, users <-chan string) <-chan *Image {
    tweetT, tweets := MakeChildTombAndChan[*Tweet](t)
    imageT, images := MakeChildTombAndChan[*Image](t)

    FanOut(tweetT, 5, users, tweets, FetchUserTop10Tweets)
    FanOut(imageT, 10, tweets, images, DownloadTweetImage)

    return images
}
Вход в полноэкранный режим Выход из полноэкранного режима

В итоге мы получим нечто гораздо более простое. Все абстрактные вспомогательные функции могут быть повторно использованы для того же шаблона, а в основной функции compose осталось всего несколько строк кода.

ReactiveX (Rx)

Однако мы делаем это только для паттерна Fan-in & Fan-out. Но существует множество других паттернов асинхронной композиции. Если мы хотим написать множество других паттернов в нескольких строках кода, нам понадобится библиотека для абстрагирования этих шаблонов для нас, паттерн за паттерном.

К счастью, существует библиотека ReactiveX, или просто Rx, не зависящая от языка, в которой есть не только абстракция для Lifecycle, но и коллекция общих композиций асинхронных операций, которые имеют семантические определения того, как компоновать их Lifecycle.

Поскольку большинство реализаций Rx сильно зависят от использования generics, существующие реализации Rx в Golang либо не очень хорошо используют систему типов Golang (повсеместно используя interface{}), либо используют устаревший или неофициальный дизайн generics, который несовместим с сегодняшним релизом Go 1.18 generics.

Поэтому, почему бы нам не попробовать построить нашу собственную реализацию Rx с использованием дженериков Go 1.18?

Если вы хотите посмотреть на конечный результат, перейдите по адресу https://github.com/go-rx/rx. Но имейте в виду, что это экспериментальный проект, который я просто собрал за выходные.

ReactiveX (Rx) — это модель программирования Observable. Она определяет Observable как операцию, которая выводит поток значений. Жизненный цикл Observable начинается, когда некий «подписчик» вызывает его метод Subscribe(). Операция может посылать выходные значения подписчику, может сигнализировать подписчику об ошибке, может слушать прерывание от подписчика, а после прерывания она завершается.

Мы можем определить Observable как интерфейс, на который можно «подписаться»:

type Observable[T any] interface {
    Subscribe(subscriber Writer[T])
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Здесь мы используем общий параметр типа T для указания типа значения, которое будет выводить Observable. Мы видим, что subscriber имеет тип Writer[T]. Давайте посмотрим определение типа для Write[T] и сопутствующего ему типа Reader[T]:

type Writer[T any] interface {
    Lifecycle
    Write(T) bool
}

type Reader[T any] interface {
    Lifecycle
    Read() (T, bool)
}

type Lifecycle interface {
    Kill(error)
    Dead() <-chan struct{}
    Dying() <-chan struct{}
    Wait() error
    Go(func() error)
    Err() error
    Alive() bool
    Context(context.Context) context.Context
}
Вход в полноэкранный режим Выход из полноэкранного режима

Типы Writer[T] и Reader[T] представляют собой комбинацию канала Go и средств управления жизненным циклом, заимствованных из семантики Tomb. Несмотря на то, что базовая реализация может использовать канал Go, мы определили функции Read() и Write вместо того, чтобы напрямую открывать канал Go по нескольким причинам:

  1. Мы не хотим закрывать каналы. Трудно правильно закрыть канал Go, особенно когда в нем могут происходить операции отправки и получения. Если мы закроем канал в то время, когда кто-то отправляет в него данные, вся программа Go запаникует. Поэтому мы не хотим использовать закрытие канала как индикатор завершения потока.
  2. Мы хотим, чтобы основной канал был ограничен своим жизненным циклом. Это означает, что ok := Writer[T].Write(value) и value, ok := Reader[T].Read() завершатся с ok == true, если значение прошло через нижележащий канал, или с ok == false, если их Lifecycle умирает.
  3. Мы хотим, чтобы пользователи библиотеки думали в терминах жизненных циклов, а не основных строительных блоков async. Такая абстракция скрывает эти детали.

Этот дизайн интерфейса немного отличается от обычных реализаций Rx, но давайте посмотрим, как он может помочь в построении паттернов композиции асинхронных операций.

Давайте начнем с чего-то простого — напишем временной тикер как Observable.

func Ticker(period time.Duration) Observable[int] {
    return Func(func(subscriber Writer[int]) (err error) {
        if period == 0 {
            return
        }
        count := 0
        ticker := time.NewTicker(period)
        for {
            select {
            case <-ticker.C:
                if !subscriber.Write(count) {
                    return
                }
                count++
            case <-subscriber.Dying():
                ticker.Stop()
                return
            }
        }
    })
}
Вход в полноэкранный режим Выход из полноэкранного режима

В этом примере мы хотим продемонстрировать помощник Func. Он берет функцию и превращает ее в Observable. Реализация довольно проста:

func Func[T any](f func(subscriber Writer[T]) error) Observable[T] {
    return functionObservable[T](f)
}

type functionObservable[T any] func(subscriber Writer[T]) error

func (o functionObservable[T]) Subscribe(subscriber Writer[T]) {
    subscriber.Go(func() error {
        return o(subscriber)
    })
}
Вход в полноэкранный режим Выход из полноэкранного режима

Когда возвращаемая Observable подписана, предоставленная функция будет запущена на Go-рутине, которая привязана к Lifecycle подписчика. Это означает, что все блокирующие операции во внутренней функции будут асинхронными по отношению к подписчику.

В приведенном выше примере Ticker есть внутренняя функция, которая блокирует Ticker и отправляет значения подписчику. Это базовый пример создания Observables.

Затем мы можем написать общие шаблоны комбинации асинхронных операций в виде операторов Observable, которые принимают одну или несколько Observable и выполняют комбинацию и преобразование над ними, в результате чего получается новая Observable. Давайте рассмотрим, как можно написать простой оператор Observable Map:

func Map[T any, S any](source Observable[T], project func(T) (S, error)) Observable[S] {
    return Func(func(subscriber Writer[S]) (err error) {
        writer, reader := Pipe(PipeWithParentLifecycle[T](subscriber))
        source.Subscribe(writer)
        for {
            if input, ok := reader.Read(); ok {
                var output S
                if output, err = project(input); err != nil {
                    return
                }
                if !subscriber.Write(output) {
                    return
                }
            } else {
                return
            }
        }
    })
}
Войти в полноэкранный режим Выход из полноэкранного режима

Здесь мы представим еще одну служебную функцию Pipe, которая создаст пару writer и reader, которые разделяют один и тот же жизненный цикл и лежащий в основе канал значений. Это означает, что если вы записываете в писатель, вы можете читать значение из читателя. В данном случае опция PipeWithParentLifecycle[T](subscriber) сделала новый Жизненный цикл трубы дочерним Жизненным циклом родительского Жизненного цикла — subscriber. Это означает, что жизненный цикл писателя и читателя ограничен жизненным циклом подписчика.

Нет, давайте посмотрим на нечто более продвинутое — SwitchMap:

func SwitchMap[T any, S any](source Observable[T], project func(T) Observable[S]) Observable[S] {
    return Func(func(subscriber Writer[S]) (err error) {
        outerWriter, outerReader := Pipe(PipeWithParentLifecycle[T](subscriber))
        source.Subscribe(outerWriter)
        var innerLifecycle Lifecycle
        for {
            var ok bool
            var outerValue T
            var innerObservable Observable[S]

            if outerValue, ok = outerReader.Read(); !ok {
                if innerLifecycle != nil {
                    innerLifecycle.Wait()
                }
                return
            }

            if innerLifecycle != nil {
                innerLifecycle.Kill(nil)
            }

            innerObservable = project(outerValue)
            innerWriter, innerReader := Pipe(PipeWithParentLifecycle[S](subscriber))
            subscriber.Go(func() (err error) {
                for {
                    if innerValue, ok := innerReader.Read(); ok {
                        // forward inner observable value to outter subscriber
                        if !subscriber.Write(innerValue) {
                            return
                        }
                    } else {
                        return
                    }
                }
            })

            innerLifecycle = innerReader
            innerObservable.Subscribe(innerWriter)
        }
    })
}
Вход в полноэкранный режим Выход из полноэкранного режима

SwitchMap будет управлять innerLifecycle, который является подпиской на результат функции project. Каждый раз, когда он получает значение от внешней Observable источника, он project это значение во внутреннюю Observable и Subscribe на нее, пересылая значения из внутренней Observable во внешнюю subscriber. Если в то время, когда у нас была внутренняя подписка, из внешней исходной Observable поступит какое-либо последующее значение, мы уничтожим внутреннюю подписку и подпишемся на новую внутреннюю Observable, спроецированную на новое внешнее значение.

Давайте воспользуемся этой библиотекой Rx и напишем что-нибудь более практичное.

package main

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "io"
    "net/http"
    "time"

    "github.com/google/go-github/v43/github"
    "golang.org/x/time/rate"
    "gopkg.in/rx.v0"
)

func main() {
    client := github.NewClient(nil)
    rl := rate.NewLimiter(rate.Every(time.Second), 1)

    fetchTopContributors := func(owner string, repository string) rx.Observable[*github.Contributor] {
        return rx.Defer(func(subscriber rx.Lifecycle) rx.Observable[*github.Contributor] {
            ctx := subscriber.Context(nil)
            rl.Wait(ctx)
            contribs, _, err := client.Repositories.ListContributors(ctx, owner, repository, nil)
            if err != nil {
                return rx.Error[*github.Contributor](err)
            }
            return rx.List(contribs)
        })
    }

    fetchUserFollowings := func(contrib *github.Contributor) rx.Observable[*github.User] {
        return rx.Func(func(subscriber rx.Writer[*github.User]) (err error) {
            if contrib.Login == nil {
                return
            }
            ctx := subscriber.Context(nil)
            rl.Wait(ctx)
            followings, _, err := client.Users.ListFollowing(ctx, *contrib.Login, nil)
            if err != nil {
                return
            }
            for _, following := range followings {
                if !subscriber.Write(following) {
                    return
                }
            }
            return
        })
    }

    downloadUserProfileImages := func(user *github.User) rx.Observable[[]byte] {
        return rx.Func(func(subscriber rx.Writer[[]byte]) (err error) {
            if user.AvatarURL == nil {
                return
            }
            resp, err := http.Get(*user.AvatarURL)
            if err != nil {
                return
            }
            defer resp.Body.Close()

            image, err := io.ReadAll(resp.Body)
            if err != nil {
                return
            }
            if !subscriber.Write(image) {
                return
            }
            return
        })
    }

    contribs := fetchTopContributors("ReactiveX", "rxjs")
    followings := rx.MergeMap(contribs, fetchUserFollowings, rx.MergeMapWithStandbyConcurrency(5))
    images := rx.MergeMap(followings, downloadUserProfileImages, rx.MergeMapWithStandbyConcurrency(10))

    writer, reader := rx.Pipe[[]byte]()
    images.Subscribe(writer)

    for {
        if image, ok := reader.Read(); ok {
            fmt.Printf("Image Size: %d Hash: %sn", len(image), hex.EncodeToString(SHA256(image)))
        } else {
            break
        }
    }

    if err := reader.Wait(); err != nil {
        panic(err)
    }
}

func SHA256(b []byte) []byte {
    m := sha256.Sum256(b)
    return m[:]
}
Вход в полноэкранный режим Выход из полноэкранного режима

Это целый пример программы, которую можно запустить (смотрите на GitHub). В этом примере мы используем функцию одновременного разворачивания оператора MergeMap для создания 5 рабочих и 10 рабочих соответственно для получения данных о пользователях GitHub, загрузки и хэширования изображений профилей пользователей.

Заключение

Мы объяснили, как мышление в терминах жизненного цикла может помочь нам писать масштабируемые и правильные асинхронные программы. Сегодня с помощью пакета Tomb и дженериков Go мы наконец-то можем получить библиотеку, которая поможет нам легко управлять и составлять жизненные циклы.

Библиотека все еще находится в ранней альфа-фазе, многие операторы еще предстоит реализовать. Вклад и предложения приветствуются!

Оставьте комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *