TLDR;
Трудно писать масштабируемые параллельные программы с большим количеством композиций async-операций, корректно обрабатывающих композиции Lifecycle с надлежащей поддержкой распространения ошибок и разрыва. Мы покажем эти ограничения на официальных асинхронных строительных блоках Go, затем обсудим неофициальные решения, и в конце концов представим экспериментальную реализацию Rx (gopkg.in/rx.v0), которая обеспечивает окончательную абстракцию.
Введение
Потенциал Go для работы с большим параллелизмом просто поразителен. Особенно он предлагает три интересные возможности — Go routine, каналы и Context. Я любил и ненавидел работу с ними, но есть и другие части языка, которые делают написание масштабируемого параллельного кода обременительным. В частности, из-за отсутствия generics я обнаружил, что пишу много похожего кода с просто разными типами данных.
Параллельные программы состоят из множества асинхронных операций. Масштабируемая система программирования для параллельных приложений должна уметь легко компоновать, оркестровать и планировать асинхронные операции.
Представление жизненного цикла
Мне нравится абстрагировать асинхронные операции с помощью жизненных циклов. Представление жизненного цикла асинхронной операции состоит из одного начала, одного конца, с потенциально от нуля до многих ценных входов и выходов, и нуля или одной точки прерывания либо ошибкой, либо отменой. Никаких ценных входов или выходов не может произойти до начала, после прерывания или после окончания.
Если говорят, что дочерний Жизненный цикл ограничен родительским Жизненным циклом, это означает:
- дочерний Жизненный цикл может начаться не раньше начала и не позже окончания родительского Жизненного цикла.
- дочерний Жизненный цикл может закончиться не позднее окончания родительского Жизненного цикла.
- дочерний Жизненный цикл должен быть прерван вместе с родительским Жизненным циклом.
Прежде чем мы сможем правильно говорить о композиции асинхронных операций, нам нужно понять, как мы можем представить асинхронные операции и их Lifecycle в коде, в Golang.
Для наглядности начнем с синхронной функции:
func plusOne(x int) int {
return x + 1
}
Эта операция начинается с входа в функциональный блок. В начале она принимает входной сигнал x
, производит с ним вычисления и выводит результат. Затем операция заканчивается на выходе из функционального блока. Мы видим, что у нее есть все признаки жизненного цикла, кроме прерывания. Поскольку это синхронная операция, мы никак не можем ее прервать. Но мы должны иметь возможность прервать асинхронную операцию.
Теперь давайте введем прерывание, преобразовав описанную выше операцию в асинхронную версию:
func plusOne(ctx context.Context, input <-chan int) chan<- int {
output := make(chan int)
go func() {
for {
select {
case x := <-input:
select {
case output <- x + 1:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return output
}
На этот раз мы использовали три интересные и мощные возможности Golang, которые являются основными строительными блоками для асинхронных операций: Go routine, каналы и Context.
Goroutines
Goroutines, вероятно, самая простая из этих трех функций. go f()
— это специальный синтаксис запуска Goroutines. Он означает запуск нового «потока», и выполнение функции f()
на этом «потоке». Таким образом, строки после нее могут продолжать выполняться, не дожидаясь завершения f()
. А f()
может продолжать выполняться до своего завершения даже после того, как функция, запускающая процедуру Go, вернулась. Итак, в этом примере мы создали выходную переменную, запустили Goroutine, чтобы сделать что-то в другом «потоке», и просто вернули выходную переменную.
Я взял «поток» в кавычки, потому что это не тот системный поток, который мы знали раньше, а нечто гораздо более легковесное. Мы не будем углубляться в детали этого. Вы можете просто концептуально считать их потоком для пользы этой статьи.
Каналы
С другой стороны, каналы — это то, что выглядит просто, но с большим количеством подвохов. Когда вы добавляете префикс chan
перед другим типом, вы получаете новый тип, который является каналом для этого типа. Вы создаете канал с помощью встроенной функции make(chan int)
. Каналы могут иметь внутренний размер буфера, но мы не будем рассматривать это в данной статье. К каналу можно применить три операции.
Первая — это передача значения через канал. Мы пишем c <- x
, чтобы обозначить отправку значения переменной x
через канал c
.
Второй — получение значения из канала. Мы пишем y = <- c
для обозначения получения значения из канала c
и установки его в переменную y
.
Третье — вы можете вызвать close(c)
на канале. Каждый канал имеет состояние либо открыт, либо закрыт. close(c)
изменит состояние канала c
с открытого на закрытое. Закрытый канал не может быть открыт снова, а вызов close(c)
на уже закрытом канале приведет к панике.
Когда канал открыт, операция отправки будет блокироваться до тех пор, пока операция получения не будет выполнена на другом конце, в другом «потоке»; а операция получения будет блокироваться до тех пор, пока операция отправки не будет выполнена на другом конце, в другом «потоке». Однако, если канал закрыт, все усложняется: операция отправки будет немедленно завершена с информацией о панике: panic: send on closed channel
; а операция получения будет немедленно завершена с возвратом пустого значения. Например, x = <-c
, где c
— закрытый канал chan int
, немедленно завершится с x
, установленным в пустое значение int 0
. Чтобы отличить полученное значение из открытого канала от закрытого, можно использовать x, ok = <-c
, чтобы одновременно получить значение канала и булево состояние ok
, где true
, если канал открыт, и false
, если канал закрыт.
Часто для создания одного сигнала переключения необходимо сначала создать канал, а затем закрыть его для переключения. Таким образом, до переключения любая операция получения на канале будет блокироваться, а после переключения канал станет закрытым, и все операции получения завершатся с пустым значением. И будущая операция получения также будет немедленно завершена пустым значением. Практическим примером такого одиночного сигнала переключения является сигнал прерывания, например, отмена.
Для небуферизованных каналов (а именно их мы используем в этой статье) отправка данных будет блокироваться вечно, если никто не получает данные, и наоборот. Успешная операция отправки должна совпадать с успешной операцией получения, иначе избыточная операционная сторона будет блокироваться (за некоторыми исключениями, описанными ниже).
Заявление select
.
Однако существует способ избежать блокировки при выполнении операций отправки и получения каналов:
select {
case c <- x:
default:
}
select {
case y = <- c:
default:
}
Таким образом, операция отправки или получения будет пропущена, если ее нельзя завершить немедленно.
Оператор select
с несколькими case
каналами ввода/вывода будет блокироваться до тех пор, пока ни один из этих case
не сможет завершиться. Если несколько case
могут завершиться одновременно, select
случайным образом выберет и выполнит один из них. Оператор select
с предложением default
попытается проверить, может ли какой-либо из case
быть завершен немедленно, и выполнит предложение default
, если таковое отсутствует.
Это означает, что мы можем использовать select
для блокировки нескольких операций ввода/вывода по каналу вместе, и позволить выполнить первую из них:
select {
case output <- x + 1:
case <-ctx.Done():
return
}
Эта часть кода будет блокировать операцию записи на канале output
, и операцию чтения на канале ctx.Done()
. Он будет работать до тех пор, пока ни один из этих двух случаев не завершится, и продолжит выполнение этого случая. В этом конкретном примере канал ctx.Done()
является источником прерывания. Значение, полученное от этого канала, означает, что операция Lifecycle была отменена.
Мы используем этот оператор select
как для получения значения из входного канала, так и для отправки результата в выходной канал, поскольку обе эти операции являются блокирующими, которые мы хотим прервать, если Жизненный цикл будет прерван.
Контекст Go
Сейчас самое время объяснить, что такое Go Context. Go Context — это попытка Golang представить некоторую часть жизненного цикла в качестве аргумента функции, обычно передаваемого в качестве первого параметра, каждой функции, выполняющей асинхронные операции. Контекст может регистрировать различные виды сигналов отмены и порождать при этом дочерние Контексты.
Например, родительский Контекст может породить дочерний Контекст с явной функцией отмены:
func parent(parentCtx context.Context) {
childCtx, cancel := context.WithCancel(parentCtx)
go func() {
<-time.NewTimer(time.Second).C
cancel()
}()
child(childCtx)
}
Жизненный цикл дочернего Контекста ограничен жизненным циклом его родительского Контекста. Это означает, что если жизненный цикл родительского контекста прерывается или завершается, жизненный цикл дочернего контекста перейдет в то же состояние. Кроме того, поскольку мы создали явный сигнал отмены — функцию cancel
, жизненный цикл дочернего контекста также может быть прерван вызовом cancel()
, и это прервет жизненный цикл дочернего контекста сигналом отмены — канал childCtx.Done()
будет закрыт, и все операции получения на этом канале завершатся с пустым значением. В данном примере мы порождаем Goroutine для отсчета 1 секунды, а затем вызываем cancel()
, чтобы прервать жизненный цикл дочернего контекста сигналом отмены.
Поскольку приведенный выше пример является распространенным случаем использования, в пакет Go Context включена еще одна вспомогательная функция для регистрации тайм-аута порожденного дочернего контекста:
childCtx, cancel := context.WithTimeout(parentCtx, time.Second)
Объяснив эти основные строительные блоки операций async, мы можем, наконец, обратиться к нашему предыдущему примеру кода async:
func plusOne(ctx context.Context, input <-chan int) chan<- int {
output := make(chan int)
go func() {
for {
select {
case x := <-input:
select {
case output <- x + 1:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return output
}
Жизненный цикл начинается при входе в функциональный блок, его дочерний жизненный цикл ограничен родительским жизненным циклом, начиная с входа в функциональный блок Go routine. Жизненный цикл потребляет входные значения и выдает выходные значения с помощью канальных операций на каналах input
и output
. Он получает прерывание отмены от получения канала ctx.Done()
, и окончательно завершается после возврата из подпрограммы Go.
Композиция асинхронных операций
Теперь мы наконец-то можем должным образом обсудить композицию асинхронных операций. Композиция двух асинхронных операций должна выполнять две задачи:
- Она должна правильно направить входные и выходные значения между двумя операциями, обычно одна из них является источником, а другая — получателем.
- Она должна управлять жизненными циклами двух операций и в результате генерировать составленный жизненный цикл.
Первую часть легко понять. Это то же самое, что и составление синхронных операций. Например:
func plusOne(x int) int {
return x + 1
}
func double(x int) int {
return x * 2
}
func plusOneThenDouble(x int) int {
return double(plusOne(x))
}
Здесь мы подключаем вход составленной функции к входу plusOne
. Затем подключаем выход из plusOne
к входу double
. Наконец, подключите выход double
в качестве выхода составленной функции.
Теперь давайте превратим plusOne
и double
в асинхронные операции. Мы уже написали plusOne
выше, теперь сделаем то же самое для double
:
func plusOne(ctx context.Context, input <-chan int) <-chan int {
output := make(chan int)
go func() {
defer close(output)
for {
select {
case x, ok := <-input:
if !ok {
return
}
select {
case output <- x + 1:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return output
}
func double(ctx context.Context, input <-chan int) <-chan int {
output := make(chan int)
go func() {
defer close(output)
for {
select {
case x, ok := <-input:
if !ok {
return
}
select {
case output <- x * 2:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return output
}
На этот раз мы также рассмотрим случай, когда каналы input
закрываются. Процедура Go завершится, если либо канал input
будет закрыт, либо если Context будет отменен. Таким образом, завершение операции может происходить в двух направлениях — операция источника, производящая входы для текущей операции, может завершить текущую операцию, а отмена Context, предоставленного потребителем, также может прервать и завершить текущую операцию. Мы также добавили defer close(output)
в процедуру Go, чтобы она всегда закрывала свой output
канал непосредственно перед завершением.
Теперь давайте попробуем сделать композицию из этих двух операций, удовлетворяющую двум нашим требованиям композиции асинхронных операций.
func plusOneThenDouble(ctx context.Context, input <-chan int) <-chan int {
return double(ctx, plusOne(ctx, input))
}
Как мы видим, для простой композиции асинхронных операций Go Context позволяет сделать это очень просто. Мы можем просто передать Context дочерним операциям, и их жизненный цикл будет ограничен переданным Context.
До сих пор все выглядело красиво и хорошо. Однако сейчас я покажу вам темную сторону медали. Речь идет в основном о двух вещах, которые обычно рассматриваются вместе — распространении ошибок и разрыве после прерывания.
Теперь рассмотрим другую композицию функций, которая связана с ошибкой:
func f(x, y int) (int, error) {
x = double(x)
y = plusOne(y)
if y == 0 {
err = errors.New("divide by zero")
}
return x / y, nil
}
Пытаться написать асинхронную версию этой композиции было бы муторно. Во-первых, мы не можем вернуть ошибку вместе с каналом output
, потому что реальная работа происходит в другом потоке, и мы не можем узнать об ошибке до того, как закончится вся работа в другом потоке. Если мы подождем, пока вся работа завершится, получим ошибку от другого потока и вернем ее, это будет означать, что вызывающая сторона не сможет начать получать результат до завершения всех работ. Это противоречит цели асинхронной работы — мы должны обеспечить способ сигнализировать и распространять ошибки во время фактической работы вне потока f()
.
// NOT A GOOD PATTERN, DON'T USE!
func f(ctx context.Context, x, y <-chan int) (<-chan int, <-chan error) {
output := make(chan int)
errorC := make(chan error)
x = double(ctx, x)
y = plusOne(ctx, y)
ctx, cancel := context.WithCancel(ctx)
go func() {
var err error
defer func() {
cancel()
close(output)
errorC <- err
close(errorC)
}()
for {
var ok bool
var a, b int
select {
case a, ok = <-x:
if !ok {
return
}
case <-ctx.Done():
return
}
select {
case b, ok = <-y:
if !ok {
return
}
case <-ctx.Done():
return
}
if y == 0 {
err = errors.New("divide by zero")
return
}
select {
case output <- a / b:
case <-ctx.Done():
return
}
}
}()
return output, errorC
}
Чтобы использовать эту составленную функцию, вызывающая сторона должна использовать возвращаемый канал ошибок:
// NOT A GOOD PATTERN, DON'T USE!
output, errorC := f(ctx, x, y)
for n := range output {
fmt.Println(n)
}
if err := <-errorC; err != nil {
panic(err)
}
Эта реализация сложна, давайте вникнем, чтобы понять, что мы сделали. Во-первых, возвращается два канала, один для вывода значения, а другой для вывода ошибки. Вывод значения может выдавать нулевое или несколько значений и закрываться. Выход ошибки будет выдавать одно значение и закрываться. Потребитель должен осушить оба канала, иначе асинхронная операция может заблокироваться навсегда без завершения.
Затем порождается дочерний Context с явным сигналом отмены. Дочерний Context отменяется в конце процедуры Go (в операторе defer
). Это означает, что он прервет дочерние асинхронные операции (double
и plusOne
), когда текущая операция завершится.
В теле основной функции есть цикл for
, который считывает пары значений из выходных каналов double
и plusOne
. Он получает каждое значение, проверяя прерывание родительского Контекста, и завершает свою работу в случае прерывания.
Затем он проверяет наличие ошибки деления на ноль, устанавливает ошибку и выходит из программы, если она обнаружена. В случае такой ошибки операторы defer
выполнят следующее: явно отменят дочерние жизненные циклы контекста, позволяя double
и plusOne
завершиться; закроют канал output
, чтобы потребитель больше не получал из него значения success; отправят значение err
в канал errorC
. Поскольку мы требуем, чтобы потребитель осушил оба канала output
и errorC
, эта операция отправки будет завершена. Наконец, он закрывает канал errorC
, чтобы потребитель больше не мог получать значения успеха.
Если ошибки нет, он отправит результат деления в канал output
, с возможным прерыванием Контекста.
Однако, это зашло слишком далеко и слишком сложно, чтобы отслеживать все, что нужно сделать, чтобы написать работающую и корректную композицию асинхронных операций с использованием этого паттерна. Более того, этот паттерн не масштабируется — если вы добавите канал ошибок к возвратам функций double
и plusOne
, и попытаетесь передать ошибки от дочерних операций к родительским, это будет кошмар. Я имею в виду, не пробуйте, но если вы это сделали, я хотел бы знать, как это прошло для вас.
Мы видели отмену, которая распространяется путем закрытия канала ctx.Done()
, но мы не видим, как ошибки могут распространяться на дочерние Жизненные циклы — лучшее, что мы можем сделать выше, это вручную cancel()
дочерних операций, и это не делает различия между ошибкой и отменой — они обе завершают дочерние Жизненные циклы. Но если мы хотим получить такое различие в дочерних жизненных циклах, то нет простого способа сделать это с помощью приведенного выше шаблона. Таким образом, это подытожило проблему распространения ошибок. Затем у нас есть проблема разрыва после прерывания. Если ваша асинхронная операция должна выполнить важную работу после прерывания, например, освободить некоторые важные ресурсы, до того, как она завершится, вы можете зарегистрировать их с помощью оператора defer
, но родительская операция не сможет определить, когда вы закончили. Если родитель хочет подождать, пока все дочерние операции освободят все ресурсы и завершатся, пока он не продолжит выполнение, нет масштабируемого способа сделать это с помощью приведенного выше шаблона. Лучшее, что мы можем сделать, это передать WaitGroup
и позволить каждой дочерней операции зарегистрировать в ней свою Go рутину, а в каждой дочерней Go рутине зарегистрировать wg.Done()
с оператором defer
после завершения работы. Затем родительская операция может вызвать wg.Wait()
, чтобы подождать, пока все дочерние операции завершат снос. Этот подход не масштабируется, поскольку WaitGroup
не может быть вложенной или разветвленной, что делает ее пригодной только на одном конкретном уровне операций для объединения всех дочерних операций. Любая операция дочернего уровня не имеет возможности подключиться и сделать то же самое.
Пакет Tomb
Очень хорошим решением этих проблем наивных асинхронных шаблонов в Go Context является пакет Tomb, написанный Густаво Нимейером. Объяснение и введение пакета можно найти в его блоге здесь. Или TLDR:
Модель проста: Tomb отслеживает, жива ли одна или несколько goroutines, умирает или мертва, а также причину смерти.
Эта модель идеально соответствует тому, что я описал как Lifecycle:
- жив — значит, после начала Жизненного цикла.
- умирающий означает, что Жизненный цикл был просто прерван, или если все управляемые им Go-процедуры завершены.
- мертвый означает, что Жизненный цикл был завершен без прерывания или закончил свое завершение после смерти и завершился.
- death reason означает ошибку, если таковая возникла во время выполнения операции, или распространяемую ошибку, если она была прервана.
Tomb — это улучшенная альтернатива Go Context. Она имеет интуитивно понятный интерфейс и комплексные инструменты управления жизненным циклом. Используя его, мы можем легче разрабатывать асинхронные операции с надлежащим управлением жизненным циклом и композицией. Давайте рассмотрим адаптированный пример:
func plusOne(t *tomb.Tomb, input <-chan int) <-chan int {
output := make(chan int)
t.Go(func() (err error) {
defer close(output)
for {
select {
case x, ok := <-input:
if !ok {
return
}
select {
case output <- x + 1:
case <-t.Dying():
return
}
case <-t.Dying():
return
}
}
})
return output
}
func double(t *tomb.Tomb, input <-chan int) <-chan int {
output := make(chan int)
t.Go(func() (err error) {
defer close(output)
for {
select {
case x, ok := <-input:
if !ok {
return
}
select {
case output <- x * 2:
case <-t.Dying():
return
}
case <-t.Dying():
return
}
}
})
return output
}
func f(t *tomb.Tomb, x, y <-chan int) <-chan int {
x = double(t, x)
y = plusOne(t, y)
output := make(chan int)
t.Go(func() (err error) {
defer close(output)
for {
var ok bool
var a, b int
select {
case a, ok = <-x:
if !ok {
return
}
case <-t.Dying():
return
}
select {
case b, ok = <-y:
if !ok {
return
}
case <-t.Dying():
return
}
if y == 0 {
err = errors.New("divide by zero")
return
}
select {
case output <- a / b:
case <-t.Dying():
return
}
}
})
return output
}
Он похож на наш предыдущий пример, но если присмотреться, то все три асинхронные операции имеют одну и ту же сигнатуру функции, где t *tomb.Tomb
является первым параметром. С заменой Go Context мы поступаем несколько иначе:
-
Вместо использования оператора языка
go f()
для запуска процедуры Go, мы используемt.Go(func() error { ... })
вместо этого. Это магическим образом свяжет жизненный цикл порожденной процедуры Go с жизненным циклом, представленным объектом Tombt
. Как это делается, мы объясним позже. -
Вместо использования
<-ctx.Done()
, мы используем<-t.Dying()
для прослушивания прерывания. Как мы кратко объяснили выше, умирание имеет специфическую семантику в Tomb, которая явно означает, что прерывание только что произошло, в отличие от умирания, которое означает, когда операция завершилась, либо нормально завершилась, либо завершилась после прерывания. В то время как в Go Context у нас нет такого различия. -
В комбинированной функции
f
мы больше не используем отдельный каналerrorC
для распространения ошибок. Объект Tomb заботится о распространении ошибок, поскольку это часть жизненного цикла операции. Когда вы порождаете завернутую в Tomb Go-рутину с помощьюt.Go(func() error { ... })
, ошибка, возвращаемая из функции рутины, будет проверена Tomb. Если она не равна nil, она будет рассматривать ее как ошибку и выдаст прерывание с этой ошибкой в жизненном цикле Tomb.
Когда Tomb будет прерван, он закроет свой канал t.Dying()
, делая все приемники канала завершенными. Гороутины, зарегистрированные в объекте Tomb, должны прослушивать этот канал t.Dying()
и переходить к завершению работы, как только получат сигнал о прерывании.
Tomb также можно прервать явно, вызвав t.Kill(err)
. При этом не будет проверяться, является ли переданная ошибка не nil. Он всегда будет прерывать жизненный цикл Tomb с переданным значением ошибки. Здесь принято считать, что если ошибка прерывания равна nil, то мы должны считать ее отменой, а если она не равна nil, то мы должны считать ее фактической ошибкой, распространяемой на этот жизненный цикл. Если прерывание происходит более одного раза, либо потому что несколько Goroutines вернули ошибки non-nil, либо потому что t.Kill(err)
вызывается несколько раз, только первое прерывание будет подтверждено, а остальные будут проигнорированы.
Объект Tomb сохраняет значение ошибки прерывания, т.е. причину смерти, и может быть запрошен командой t.Err()
.
На этом распространение ошибок в Tomb завершено. Теперь как Tomb узнает, что асинхронная операция, включая все ее дочерние Lifecycle, завершилась? Поскольку в данном шаблоне каждая порожденная Goroutine запускает свое завершение после получения сигнала о прерывании, Goroutine вернется только тогда, когда все ее завершение будет закончено. А поскольку мы регистрируем Go-рутину в обертке Tomb, Tomb отслеживает каждую Go-рутину, которую он помог породить, и когда все эти Go-рутины вернулись, объект Tomb может быть уверен, что больше нет никаких асинхронных операций или срывов, он переведет свое состояние в состояние dead и закроет свой канал t.Dead()
.
Tomb также предоставляет функцию err = t.Wait()
, которая будет блокироваться, пока t
не перейдет в состояние dead, и вернет причину смерти.
Используя Tomb, мы можем составлять асинхронные операции масштабируемым способом, особенно потому, что он имеет универсальную систему распространения ошибок, а также инструменты для отслеживания разрыва и реальной смерти дочерних операций.
Давайте немного оттолкнемся от этого, чтобы попробовать что-то более продвинутое и посмотреть, как Tomb может с этим справиться.
Fan-in и Fan-out
Давайте напишем асинхронную операцию, которая принимает на вход имена пользователей Twitter и для каждого пользователя получает первые 10 твитов и загружает изображения в этих твитах. У нас может быть 5 параллельных рабочих, загружающих твиты пользователей, и еще 10 параллельных рабочих, загружающих изображения. Мы проигнорируем соответствующую интеграцию с API Twitter и немного упростим ситуацию, предположив, что у нас уже есть пара рабочих функций, работающих с API Twitter.
func BindTomb(parent *tomb.Tomb, child *tomb.Tomb, teardown func() error) {
parent.Go(func() (err error) {
select {
case <-child.Dying():
err = child.Wait()
case <-t.Dying():
child.Kill(t.Err())
}
if teardown != nil {
parent.Go(teardown)
}
return
})
}
func DownloadUserTop10TweetsImages(t *tomb.Tomb, users <-chan string) <-chan *Image {
tweets := make(chan *Tweet)
images := make(chan *Image)
tweetT := new(tomb.Tomb)
imageT := new(tomb.Tomb)
for i := 0; i < 5; i++ {
tweetT.Go(func() (err error) {
for {
var ok bool
var user string
var tweet *Tweet
select {
case user, ok = <-users:
if !ok {
return
}
case tweetT.Dying():
return
}
if tweet, err = FetchUserTop10Tweets(tweetT, user); err != nil {
return
}
select {
case tweets <- tweet:
case tweetT.Dying():
return
}
}
})
}
for i := 0; i < 10; i++ {
imageT.Go(func() (err error) {
for {
var ok bool
var tweet *Tweet
var image *Image
select {
case tweet, ok = <-tweets:
if !ok {
return
}
case imageT.Dying():
return
}
if image, err = DownloadTweetImage(imageT, tweet); err != nil {
return
}
select {
case images <- image:
case imageT.Dying():
return
}
}
})
}
BnidTomb(t, tweetT, func() (err error) {
close(tweets)
return
})
BnidTomb(t, imageT, func() (err error) {
close(images)
return
})
return images
}
Поскольку мы порождаем две группы параллельных рабочих, мы хотим иметь тонкий контроль над их жизненным циклом как двух разных групп — мы хотим закрыть их выходной канал после завершения работы каждой группы (смерти). Чтобы обеспечить такой уровень контроля, мы создаем два объекта Tomb, tweetT
и imageT
, для управления жизненными циклами двух рабочих групп.
Затем мы написали магическую функцию BindTomb
для связывания дочернего объекта Tomb с родительским объектом Tomb, чтобы жизненный цикл дочернего Tomb был ограничен жизненным циклом родительского Tomb. Эта функция делает 4 вещи:
- Распространяет причину смерти дочернего Lifecycle на родительский Lifecycle.
- Она передает прерывание из родительского Lifecycle в дочерний Lifecycle.
- Он продлевает окончание родительского жизненного цикла до окончания дочернего жизненного цикла.
- Она дает возможность выполнить дополнительное завершение после окончания дочернего Жизненного цикла.
Мы используем эту функцию для привязки жизненного цикла томбов tweetT
и imageT
к жизненному циклу родительского томба t
, и регистрируем дополнительное разрушение, которое закрывает их выходной канал после окончания жизненного цикла их рабочей группы.
Несмотря на то, что теперь мы можем писать корректные и масштабируемые композиции операций async для таких сложных случаев, как этот, мы все еще сталкиваемся с тем, что снова и снова переписываем множество шаблонных кодов. И из-за отсутствия generics многие из этих кодов не могут быть абстрагированы с помощью вызовов функций.
Generics
Недавно, после выхода стабильного релиза Go 1.18, Go наконец-то получил возможность делать дженерики. Давайте попробуем упростить шаблонные коды в приведенном выше примере:
func Receive[T any](t *tomb.Tomb, c <-chan T) (value T, ok bool) {
select {
case value, ok = <-c:
case t.Dying():
}
return
}
func Send[T any](t *tomb.Tomb, c chan<- T, value T) bool {
select {
case c <- value:
return true
case t.Dying():
}
return false
}
func MakeChildTombAndChan[T any](t *tomb.Tomb) (childT *tomb.Tomb, childC chan T) {
childC = make(chan T)
childT = new(tomb.Tomb)
t.Go(func() (err error) {
select {
case <-childT.Dead():
err = childT.Err()
case <-t.Dying():
childT.Kill(t.Err())
}
close(childC)
return
})
}
func FanOut[T any, S any](t *tomb.Tomb, workers int, inputs <-chan T, outputs chan<- S, project func(*tomb.Tomb, T) (S, error)) {
for i := 0; i < 5; i++ {
t.Go(func() (err error) {
for {
var ok bool
var input T
var output S
if input, ok = Receive(t, inputs); !ok {
return
}
if output, err = project(t, input); err != nil {
return
}
if !Send(t, outputs, output) {
return
}
}
})
}
}
func DownloadUserTop10TweetsImages(t *tomb.Tomb, users <-chan string) <-chan *Image {
tweetT, tweets := MakeChildTombAndChan[*Tweet](t)
imageT, images := MakeChildTombAndChan[*Image](t)
FanOut(tweetT, 5, users, tweets, FetchUserTop10Tweets)
FanOut(imageT, 10, tweets, images, DownloadTweetImage)
return images
}
В итоге мы получим нечто гораздо более простое. Все абстрактные вспомогательные функции могут быть повторно использованы для того же шаблона, а в основной функции compose осталось всего несколько строк кода.
ReactiveX (Rx)
Однако мы делаем это только для паттерна Fan-in & Fan-out. Но существует множество других паттернов асинхронной композиции. Если мы хотим написать множество других паттернов в нескольких строках кода, нам понадобится библиотека для абстрагирования этих шаблонов для нас, паттерн за паттерном.
К счастью, существует библиотека ReactiveX, или просто Rx, не зависящая от языка, в которой есть не только абстракция для Lifecycle, но и коллекция общих композиций асинхронных операций, которые имеют семантические определения того, как компоновать их Lifecycle.
Поскольку большинство реализаций Rx сильно зависят от использования generics, существующие реализации Rx в Golang либо не очень хорошо используют систему типов Golang (повсеместно используя interface{}
), либо используют устаревший или неофициальный дизайн generics, который несовместим с сегодняшним релизом Go 1.18 generics.
Поэтому, почему бы нам не попробовать построить нашу собственную реализацию Rx с использованием дженериков Go 1.18?
Если вы хотите посмотреть на конечный результат, перейдите по адресу https://github.com/go-rx/rx. Но имейте в виду, что это экспериментальный проект, который я просто собрал за выходные.
ReactiveX (Rx) — это модель программирования Observable. Она определяет Observable как операцию, которая выводит поток значений. Жизненный цикл Observable начинается, когда некий «подписчик» вызывает его метод Subscribe()
. Операция может посылать выходные значения подписчику, может сигнализировать подписчику об ошибке, может слушать прерывание от подписчика, а после прерывания она завершается.
Мы можем определить Observable как интерфейс, на который можно «подписаться»:
type Observable[T any] interface {
Subscribe(subscriber Writer[T])
}
Здесь мы используем общий параметр типа T
для указания типа значения, которое будет выводить Observable. Мы видим, что subscriber
имеет тип Writer[T]
. Давайте посмотрим определение типа для Write[T]
и сопутствующего ему типа Reader[T]
:
type Writer[T any] interface {
Lifecycle
Write(T) bool
}
type Reader[T any] interface {
Lifecycle
Read() (T, bool)
}
type Lifecycle interface {
Kill(error)
Dead() <-chan struct{}
Dying() <-chan struct{}
Wait() error
Go(func() error)
Err() error
Alive() bool
Context(context.Context) context.Context
}
Типы Writer[T]
и Reader[T]
представляют собой комбинацию канала Go и средств управления жизненным циклом, заимствованных из семантики Tomb. Несмотря на то, что базовая реализация может использовать канал Go, мы определили функции Read()
и Write
вместо того, чтобы напрямую открывать канал Go по нескольким причинам:
- Мы не хотим закрывать каналы. Трудно правильно закрыть канал Go, особенно когда в нем могут происходить операции отправки и получения. Если мы закроем канал в то время, когда кто-то отправляет в него данные, вся программа Go запаникует. Поэтому мы не хотим использовать закрытие канала как индикатор завершения потока.
- Мы хотим, чтобы основной канал был ограничен своим жизненным циклом. Это означает, что
ok := Writer[T].Write(value)
иvalue, ok := Reader[T].Read()
завершатся сok == true
, если значение прошло через нижележащий канал, или сok == false
, если их Lifecycle умирает. - Мы хотим, чтобы пользователи библиотеки думали в терминах жизненных циклов, а не основных строительных блоков async. Такая абстракция скрывает эти детали.
Этот дизайн интерфейса немного отличается от обычных реализаций Rx, но давайте посмотрим, как он может помочь в построении паттернов композиции асинхронных операций.
Давайте начнем с чего-то простого — напишем временной тикер как Observable
.
func Ticker(period time.Duration) Observable[int] {
return Func(func(subscriber Writer[int]) (err error) {
if period == 0 {
return
}
count := 0
ticker := time.NewTicker(period)
for {
select {
case <-ticker.C:
if !subscriber.Write(count) {
return
}
count++
case <-subscriber.Dying():
ticker.Stop()
return
}
}
})
}
В этом примере мы хотим продемонстрировать помощник Func
. Он берет функцию и превращает ее в Observable. Реализация довольно проста:
func Func[T any](f func(subscriber Writer[T]) error) Observable[T] {
return functionObservable[T](f)
}
type functionObservable[T any] func(subscriber Writer[T]) error
func (o functionObservable[T]) Subscribe(subscriber Writer[T]) {
subscriber.Go(func() error {
return o(subscriber)
})
}
Когда возвращаемая Observable подписана, предоставленная функция будет запущена на Go-рутине, которая привязана к Lifecycle подписчика. Это означает, что все блокирующие операции во внутренней функции будут асинхронными по отношению к подписчику.
В приведенном выше примере Ticker есть внутренняя функция, которая блокирует Ticker и отправляет значения подписчику. Это базовый пример создания Observables.
Затем мы можем написать общие шаблоны комбинации асинхронных операций в виде операторов Observable, которые принимают одну или несколько Observable и выполняют комбинацию и преобразование над ними, в результате чего получается новая Observable. Давайте рассмотрим, как можно написать простой оператор Observable Map
:
func Map[T any, S any](source Observable[T], project func(T) (S, error)) Observable[S] {
return Func(func(subscriber Writer[S]) (err error) {
writer, reader := Pipe(PipeWithParentLifecycle[T](subscriber))
source.Subscribe(writer)
for {
if input, ok := reader.Read(); ok {
var output S
if output, err = project(input); err != nil {
return
}
if !subscriber.Write(output) {
return
}
} else {
return
}
}
})
}
Здесь мы представим еще одну служебную функцию Pipe
, которая создаст пару writer
и reader
, которые разделяют один и тот же жизненный цикл и лежащий в основе канал значений. Это означает, что если вы записываете
в писатель
, вы можете читать
значение из читателя
. В данном случае опция PipeWithParentLifecycle[T](subscriber)
сделала новый Жизненный цикл трубы дочерним Жизненным циклом родительского Жизненного цикла — subscriber
. Это означает, что жизненный цикл писателя
и читателя
ограничен жизненным циклом подписчика
.
Нет, давайте посмотрим на нечто более продвинутое — SwitchMap
:
func SwitchMap[T any, S any](source Observable[T], project func(T) Observable[S]) Observable[S] {
return Func(func(subscriber Writer[S]) (err error) {
outerWriter, outerReader := Pipe(PipeWithParentLifecycle[T](subscriber))
source.Subscribe(outerWriter)
var innerLifecycle Lifecycle
for {
var ok bool
var outerValue T
var innerObservable Observable[S]
if outerValue, ok = outerReader.Read(); !ok {
if innerLifecycle != nil {
innerLifecycle.Wait()
}
return
}
if innerLifecycle != nil {
innerLifecycle.Kill(nil)
}
innerObservable = project(outerValue)
innerWriter, innerReader := Pipe(PipeWithParentLifecycle[S](subscriber))
subscriber.Go(func() (err error) {
for {
if innerValue, ok := innerReader.Read(); ok {
// forward inner observable value to outter subscriber
if !subscriber.Write(innerValue) {
return
}
} else {
return
}
}
})
innerLifecycle = innerReader
innerObservable.Subscribe(innerWriter)
}
})
}
SwitchMap будет управлять innerLifecycle
, который является подпиской на результат функции project
. Каждый раз, когда он получает значение от внешней Observable источника, он project
это значение во внутреннюю Observable и Subscribe на нее, пересылая значения из внутренней Observable во внешнюю subscriber
. Если в то время, когда у нас была внутренняя подписка, из внешней исходной Observable поступит какое-либо последующее значение, мы уничтожим внутреннюю подписку и подпишемся на новую внутреннюю Observable, спроецированную на новое внешнее значение.
Давайте воспользуемся этой библиотекой Rx и напишем что-нибудь более практичное.
package main
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"net/http"
"time"
"github.com/google/go-github/v43/github"
"golang.org/x/time/rate"
"gopkg.in/rx.v0"
)
func main() {
client := github.NewClient(nil)
rl := rate.NewLimiter(rate.Every(time.Second), 1)
fetchTopContributors := func(owner string, repository string) rx.Observable[*github.Contributor] {
return rx.Defer(func(subscriber rx.Lifecycle) rx.Observable[*github.Contributor] {
ctx := subscriber.Context(nil)
rl.Wait(ctx)
contribs, _, err := client.Repositories.ListContributors(ctx, owner, repository, nil)
if err != nil {
return rx.Error[*github.Contributor](err)
}
return rx.List(contribs)
})
}
fetchUserFollowings := func(contrib *github.Contributor) rx.Observable[*github.User] {
return rx.Func(func(subscriber rx.Writer[*github.User]) (err error) {
if contrib.Login == nil {
return
}
ctx := subscriber.Context(nil)
rl.Wait(ctx)
followings, _, err := client.Users.ListFollowing(ctx, *contrib.Login, nil)
if err != nil {
return
}
for _, following := range followings {
if !subscriber.Write(following) {
return
}
}
return
})
}
downloadUserProfileImages := func(user *github.User) rx.Observable[[]byte] {
return rx.Func(func(subscriber rx.Writer[[]byte]) (err error) {
if user.AvatarURL == nil {
return
}
resp, err := http.Get(*user.AvatarURL)
if err != nil {
return
}
defer resp.Body.Close()
image, err := io.ReadAll(resp.Body)
if err != nil {
return
}
if !subscriber.Write(image) {
return
}
return
})
}
contribs := fetchTopContributors("ReactiveX", "rxjs")
followings := rx.MergeMap(contribs, fetchUserFollowings, rx.MergeMapWithStandbyConcurrency(5))
images := rx.MergeMap(followings, downloadUserProfileImages, rx.MergeMapWithStandbyConcurrency(10))
writer, reader := rx.Pipe[[]byte]()
images.Subscribe(writer)
for {
if image, ok := reader.Read(); ok {
fmt.Printf("Image Size: %d Hash: %sn", len(image), hex.EncodeToString(SHA256(image)))
} else {
break
}
}
if err := reader.Wait(); err != nil {
panic(err)
}
}
func SHA256(b []byte) []byte {
m := sha256.Sum256(b)
return m[:]
}
Это целый пример программы, которую можно запустить (смотрите на GitHub). В этом примере мы используем функцию одновременного разворачивания оператора MergeMap для создания 5 рабочих и 10 рабочих соответственно для получения данных о пользователях GitHub, загрузки и хэширования изображений профилей пользователей.
Заключение
Мы объяснили, как мышление в терминах жизненного цикла может помочь нам писать масштабируемые и правильные асинхронные программы. Сегодня с помощью пакета Tomb и дженериков Go мы наконец-то можем получить библиотеку, которая поможет нам легко управлять и составлять жизненные циклы.
Библиотека все еще находится в ранней альфа-фазе, многие операторы еще предстоит реализовать. Вклад и предложения приветствуются!