Design simplu al Kotlin Flow

Flux de Grant Tarrant

Într-o poveste anterioară „Fluxuri reci, canale fierbinți”, am definit fluxurile de date reci și fierbinți și am arătat un caz de utilizare pentru Kotlin Flows - fluxuri asincrone reci. Acum aruncăm o privire sub capotă, examinăm designul lor și vedem cum o combinație de caracteristici de limbaj și o bibliotecă permite o abstractizare puternică cu un design simplu.

Un flux în Kotlin este reprezentat de o interfață²:

interfață Flux  {
    suspend fun collection (colecționar: FlowCollector )
}

Tot ce există la un flux este o singură funcție de colectare care acceptă o instanță a interfeței FlowCollector cu o singură metodă de emisie:

interfață FlowCollector <în T> {
    suspenda emisia distractivă (valoare: T)
}

Un nume de emisiune ar trebui să sune familiar unui cititor al „fluxurilor reci, canalelor fierbinți”. Într-adevăr, am arătat un exemplu al următoarei definiții a fluxului:

valuri int: flux  = flux {
    pentru (eu în 1..10) {
        întârziere (100)
        emite (i) // <- emite se numește aici
    }
}

Semnătura constructorului de flux folosește, de asemenea, o interfață FlowCollector ca receptor, astfel încât să putem emite direct din corpul lambda corespunzător:

flux distractiv  (bloc: suspendare FlowCollector . () -> Unit): Flux 

Pentru o utilizare simplă a unui flux, atunci când fluxul este colectat, astfel:

ints.collect {println (it)} // durează 1 secundă, tipărește 10 int

ceea ce se întâmplă este că o instanță a FlowCollector este creată pe baza lambda trecută pentru a colecta funcția {...} și tocmai această instanță este trecută la fluxul {...} body⁴.

Astfel, o interacțiune între un emițător de flux și un colector de flux este cel al unui apel funcțional simplu - un apel al funcției de emisie. Dacă încordăm mental acest apel funcțional, putem înțelege imediat ce se întâmplă atunci când rulăm acest cod⁵ - va fi echivalent cu:

pentru (eu în 1..10) {
    întârziere (100)
    println (i) // <- emit a fost apelat aici
}

operatorii

Un constructor de flux și un operator de terminal de colectare este tot ce trebuie să știm pentru a începe să scrieți operatori care transformă fluxurile într-o varietate de moduri. De exemplu, un operator de bază de hartă care aplică o transformare specificată la fiecare valoare emisă poate fi implementat astfel:

distracție  Flux  .map (transformare: suspendare (valoare: T) -> R) = flux {
    collect {emit (transform (it))}
}

Folosind acest operator putem face acum ints.map {it * it} pentru a defini un flux cu pătrate ale numărului întreg original. Elementele curg în continuare de la emițător la colector prin apeluri funcționale. Există pur și simplu o altă funcție între acum.

De fapt, biblioteca kotlinx.coroutines definește deja harta și o serie de alți operatori de scop general ca extensii pe tipul Flow, urmând abordarea de design orientată pe extensie Ceea ce este important în acest proiect, este faptul că este destul de ușor de definit operatori specifici domeniului. Nu există nicio distincție între operatorii „integrați” și „definiți de utilizator” - toți operatorii sunt de primă clasă.

Înapoi presiune

Contrapresiunea în inginerie software este definită ca capacitatea unui consumator de date care nu poate ține pasul cu datele de intrare de a trimite un semnal către producătorul de date pentru a încetini rata elementelor de date.

Proiectarea fluxurilor reactive tradiționale implică un canal înapoi pentru a solicita mai multe date de la producători, după cum este necesar. Gestionarea acestui protocol de solicitare duce la implementări notoriu dificile, chiar și pentru operatori simpli. Nu vedem nicio complexitate în proiectarea fluxurilor Kotlin și nici în implementarea operatorilor pentru aceștia, cu toate acestea fluxurile Kotlin susțin contrapresiunea. Cum se face?

Gestionarea transparentă a contrapresiunii se realizează în fluxurile Kotlin prin utilizarea funcțiilor de suspendare Kotlin. Este posibil să fi observat că toate funcțiile și tipurile funcționale în proiectarea fluxului Kotlin sunt marcate cu modificator de suspendare - aceste funcții au o super-putere pentru a suspenda executarea apelantului fără a bloca un thread⁹. Deci, atunci când colectorul fluxului este copleșit, poate pur și simplu suspenda emițătorul și îl poate relua ulterior, atunci când este gata să accepte mai multe elemente.

Acest lucru este destul de similar cu gestionarea contrapresiunii în conductele de date sincrone tradiționale bazate pe fir, unde un consumator lent aplică automat contrapresiunea asupra producătorului, în virtutea blocării firului producătorului. Funcțiile de suspendare o duc dincolo de un singur fir și pe tărâmul programării asincrone, prin gestionarea transparentă a contrapresiunii pe fire, fără a le bloca. Dar asta trebuie spus într-o altă poveste.

Citire ulterioară și note de subsol

  1. ^ Fluxuri reci, canale calde
  2. ^ Fluxul și tipurile și funcțiile conexe sunt încă în previzualizare începând cu versiunea 1.2.1 a bibliotecii kotlinx.coroutines. Citiți mai multe aici.
  3. ^ Tipuri de funcții în Kotlin
  4. ^ Aceasta este o ușoară simplificare. Nu ține cont de verificări suplimentare pentru a asigura păstrarea contextului, dar acest subiect nu se încadrează în sfera acestei povești. Mai multe detalii în contextul de execuție al fluxurilor Kotlin.
  5. ^ Puteți rula acest cod prin Kotlin Playground aici.
  6. ^ Proiectare orientată către extensie
  7. ^ Curentele reactive
  8. ^ Implementarea operatorilor pentru [RxJava] 2.0
  9. ^ Blocarea firelor, suspendarea coroutinelor