Tuesday 10 October 2017

Rx Moving Average


I finansdomenet trenger vi vanligvis å beregne samlet verdi for flyttingsvinduet fra en strøm av tidsseriedata, bruk glidende gjennomsnitt som et eksempel, si at vi har følgende datastrøm T er tidsstempel og V er den faktiske vlauen. For å beregne et glidende gjennomsnitt 3 fra strømmen vi får. For å beregne det bevegelige gjennomsnittet, virker det som om vi kunne gjøre det ved å bygge en observerbar fra den opprinnelige strømmen. bygge en observerbar fra den opprinnelige strømmen ved å aggregere verdiene i grupper. bruke aggregatoperatøren å beregne de endelige resultatene fra Observable i trinn 2.Step 1 og 3 er trivielt å implementere, men for trinn 2 virker det som om nåværende RxJava ikke har innebygd operatør til å produsere flyttbare vinduer grupper, ser vinduet gruppen ikke til operatør passe i dette tilfellet, og jeg fant ikke en enkel måte å komponere en løsning fra eksisterende operatører på. Kan noen foreslå hvordan dette gjøres i RxJava på en elegant måte. Skrevet 28. desember kl. 13 6. Jeg gjør det slik. Jeg bruker vindu som utsender observables, hvilke onl du bruker en konstant mengde minne og ikke buffer som avgir Lister, som bruker minne for hver av elementet. Dette er et eksempel på hvordan du kan bruke kombinatoroperatører i stedet for å skrive egne looper, noe som du alltid bør vurdere når du bruker Observables. Update Hvis du vil filtrere ut vinduene på slutten av strømmen som har mindre enn n elementer, kan du gjøre det slik. Jeg valgte Scala fordi det er kortere å skrive, men i Java kan du gjøre det samme, bare vær oppmerksom på at Scala s foldLeft kalles redusere i Java. answered 4. januar kl. 22 20.Praktisk Rx-opplæring London 6-7 oktober 2015. Presentert av forfatteren av. PART 1 - Komme i gang. Brukerne forventer realtidsdata De vil ha sine tweets nå Deres ordre er bekreftet nå De trenger priser nøyaktige fra nå. De online spillene må være responsive. Som utvikler krever du brann-og - glem meldinger Du vil ikke bli blokkert og venter på et resultat. Du vil ha resultatet trykket til deg når det er klart. Enda bedre, når du arbeider med resultatsett, vil du motta individuelle resultater når de er klare. Du vil ikke vent på at hele settet skal behandles før du ser den første raden Verden har flyttet for å presse brukere venter på oss for å fange opp Utviklere har verktøy for å presse data, dette er enkelt Utviklere trenger verktøy for å reagere på push data. Welcome to Reactive Extensions for Rx Denne boken er rettet mot enhver utvikling r nysgjerrig på IObservable T og IObserver T grensesnittene som har dukket opp i 4 De reaktive utvidelsesbiblioteker fra Microsoft er implementeringen av disse grensesnittene som raskt plukker opp trekkraft med både server, klient og web-utviklere Rx er et kraftig produktivt utviklingsverktøy Rx gjør det mulig for utviklere å løse problemer i en elegant, kjent og deklarativ stil, ofte avgjørende med mindre kode enn det var mulig uten Rx. Ved å utnytte LINQ, får Rx å skryte av de vanlige fordelene ved en LINQ-implementering. 1.Integrated LINQ er integrert i C-språket Unitive Using LINQ gir deg mulighet til å utnytte dine eksisterende ferdigheter for å spørre data i ro LINQ til SQL, LINQ til XML eller LINQ til objekter for å spørre data i bevegelse Du kan tenke på Rx som LINQ til hendelser LINQ lar deg overgå fra andre paradigmer til et felles paradigme For eksempel kan du overføre en standardhendelse, et asynkront metallsamtal, en oppgave eller en tredjeparts mellomvare-API til et enkelt samarbeid mmon Rx-paradigme Ved å utnytte vårt eksisterende språk og bruke kjente operatører som Velg Where GroupBy etc, kan utviklere rationalisere og formidle design eller kode i en felles form Extensible Du kan utvide Rx med dine egne tilpassede spørringsoperatørutvidelsesmetoder. Deklarativ LINQ tillater koden din å lese som en erklæring om hva koden din gjør og etterlater hvordan implementeringen av operatørene Komponenter som LINQ-funksjoner, for eksempel utvidelsesmetoder, lambda-syntaks og spørringsbegrepssyntax, gir en flytende API for utviklere å konsumere. Spørsmål kan bygges med mange operatører Spørsmål kan da bli sammensatt for å videreutvikle sammensatte spørringer. Transformative spørringer kan omdanne data fra en type til en annen En spørring kan oversette en enkelt verdi til en annen verdi, samlet fra en sekvens av verdier til en enkelt gjennomsnittsverdi eller utvide en enkelt data Verdien inn i en sekvens av verdier. Når er Rx passende. Rx tilbyr et naturlig paradigme for håndtere sekvenser av hendelser En sekvens kan inneholde null eller flere hendelser Rx viser seg å være mest verdifulle når man komponerer sekvenser av hendelser. Skal bruke Rx. Managing hendelser som disse er hva Rx ble bygget for. UI hendelser som musen flytte, klikk click. Domain hendelser som eiendom endret, samling oppdatert, ordre fylt, registrering akseptert etc. Infrastructure hendelser som fra fil watcher, system og WMI events. Integration hendelser som en kringkasting fra en meldingsbuss eller en push hendelse fra WebSockets API eller annen lav latency middleware som Nirvana. Integrasjon med en CEP-motor som StreamInsight eller StreamBase. Interessant Microsofts CEP-produkt StreamInsight som er en del av SQL Server-familien, bruker også LINQ til å bygge spørringer over streaming hendelser av data. Rx er også godt egnet for å introdusere og administrere samtidighet for Formålet med å laste ut Det er å utføre et gitt sett med arbeid samtidig for å frigjøre gjeldende tråd. En svært populær bruk av dette er å opprettholde et svar ve UI. You bør vurdere å bruke Rx hvis du har en eksisterende IEnumerable T som forsøker å modellere data i bevegelse Mens IEnumerable T kan modellere data i bevegelse ved å bruke lat evaluering som avkastningsavkastning, har det sannsynligvis vunnet t-skala Iterating over en IEnumerable T vil forbruker blokkere en tråd Du bør enten favorisere den ikke-blokkerende naturen til Rx via enten IObservable T eller vurdere async-funksjonene i 4 5.Kan bruke Rx. Rx kan også brukes til asynkrone samtaler. Dette er effektive sekvenser av en hendelse. Resultat av en oppgave eller oppgave T. Resultat av en APM-metallsamtale som FileStream BeginRead EndRead. You kan finne det brukbare TPL-, Dataflow - eller async-søkeordet 4 5 viser seg å være en mer naturlig måte å komponere asynkron metoder på Mens Rx kan definitivt hjelpe med disse scenariene, hvis det finnes andre mer hensiktsmessige rammer til din disposisjon, bør du vurdere dem først. Rx kan brukes, men er mindre egnet for å introdusere og administrere samtidighet for å skalere eller utføre parallell comp utgaver Andre dedikerte rammer som TPL Oppgave Parallel Library eller C AMP er mer hensiktsmessige for å utføre parallell beregning intensivt arbeid. Rx og spesielt IObservable T er ikke en erstatning for IEnumerable TI vil ikke anbefale å prøve å ta noe som er naturlig trekkbasert og tvinge det til å være push based. Translating eksisterende IEnumerable T verdier til IObservable T bare slik at koden base kan være mer Rx. Message køer Køer som i MSMQ eller en JMS implementering har generelt transactionality og er per definisjon sekvensiell Jeg føler IEnumerable T er en naturlig passform for her. Ved å velge det beste verktøyet for jobben, bør koden din være enklere å vedlikeholde, gi bedre ytelse, og du får sannsynligvis bedre støtte. Rx i action. Adopting and learning Rx kan være en iterativ tilnærming der du sakte kan bruke den til din infrastruktur og domene På kort tid bør du kunne ha ferdigheter til å produsere kode, eller redusere eksisterende kode, til spørsmål som består av enkle opp erators For eksempel er denne enkle ViewModel alt jeg trengte å kode for å integrere et søk som skal utføres som en bruker types. Public class MemberSearchViewModel INotifyPropertyChanged. While denne kodebiten er ganske liten, støtter den følgende krav. Opprettholder en responsiv UI. Supports timeouts. Knows når søket er complete. Allows resultater å komme tilbake en om gangen. Handler feil. Det er enhet testbar, selv med concurrency concerns. If en bruker endrer søket, avbryter nåværende søk og utføre nytt søk med ny tekst. For å produsere denne prøven er det nesten et tilfelle å komponere operatørene som samsvarer med kravene i en enkelt spørring. Spørringen er liten, vedlikeholdsbar, deklarativ og langt mindre kode enn å rulle egen. Det er den ekstra fordelen ved å gjenbruke en velprøvd API Den mindre koden du må skrive, jo mindre kode du må teste, feilsøke og vedlikeholde Opprette andre spørsmål som følgende er simple. calculating et glidende gjennomsnitt av en rekke verdier, for eksempel service lev el-avtaler for gjennomsnittlige ventetider eller downtimebining hendelsesdata fra flere kilder, for eksempel søkeresultater fra Bing, Google og Yahoo, eller sensordata fra Accelerometer, Gyro, Magnetometer eller temperature. grouping data, for eksempel tweets etter emne eller bruker, eller aksjekurser ved delta eller likviditet. filtrering data, for eksempel online spill servere i en region, for et bestemt spill eller med et minimum antall deltakere. Push er her Arming deg selv med Rx er en effektiv måte å møte brukerens forventninger til en push verden ved å forstå og komponere de bestanddeler av Rx vil du være i stand til å gjøre kort arbeid med kompleksitet ved behandling av innkommende hendelser. Rx er satt til å bli en daglig del av kodingsopplevelsen. Ytterligere anbefalt lesing. Le Campbell Introduksjon til Rx Kindle-utgaven 2012. Sekvenser av tilfeldighet. av delen av denne boken har nå tatt opp et ganske uavhengig konsept Nøkkeltyper, Livstidsstyring, Anamorfisme, Katamorfisme, Binding, Flow Control, Planlegging og Testing I N tidligere kapitler har vi vurdert å kombinere flere sekvenser med operatører som SelectMear Merge Zip Concat osv. Disse operatørene er litt enklere enn operatørene i dette kapittelet. Dette siste kapitlet ser ut til å kombinere mange av disse emnene, slik at vi kan være i stand til å manipulere samtidige sekvenser, eller tilfeller av tilfeldighet. Vi kan forstå hendelser som har tid til å være vinduer. Eksempler er at serveren er opp, en person er i et rom, knappen er presset og ikke utgitt. Disse kan omformuleres som for dette tidsvinduet, serveren var oppe Når vinduene overlapper, sies de å falle sammen i tid. Buffer revisted. Buffer er ikke en ny operatør til oss, men det kan nå bli konseptuelt gruppert med vinduets operatører. Disse operatørene gjør alt sammen med en sekvens og et vindu av tid Hver operatør vil åpne et vindu når kildesekvensen gir en verdi Måten vinduet er stengt og hvilke verdier som er utsatt, er hovedforskjellen mellom hver operatør La vi bare gjenskape den interne arbeidet til den gamle vennbufferen, og se hvordan dette kartlegger konseptet med tidsvinduer. Buffer vil opprette et vindu når den første verdien blir produsert. Det vil da sette verdien inn i en intern cache. Vinduet vil bli Åpne til verdien av verdier er nådd Hver av disse verdiene vil bli cachet Nå som tellingen er nådd, lukkes vinduet og cachen blir OnNext utgitt som en IList T Når den neste verdien produseres fra kilden, cachen blir ryddet og vi starter igjen Dette betyr at buffer vil ta en IObservable T og returnere en IObservable IList T. Example Buffer med teller på 3. I dette marmorediagrammet har jeg representert listen over verdier som blir returnert på et tidspunkt som en kolonne med data, dvs. verdiene 0, 1 2 returneres alle i den første bufferen. Utenfor dette er det ikke mye av et sprang for å forstå Buffer med tiden I stedet for å bestå en telle passerer vi TimeSpan Lukkingen av vinduet og derfor buffer s cache er nå diktert av tid i stedet for tellingen av verdier som produseres. Dette er stadig mer komplisert ettersom vi nå har innført en slags planlegging. For å produsere IList T på riktig tidspunkt, trenger vi en planlegger tildelt for å utføre timingen Dette gjør også testing av disse tingene mye lettere. Eksempelbuffer med tid på 5 enheter. Eksempel på Revisted. IObservable TSource Eksempel på denne IObservable TSource, IObservable TTick. Vinduets operatører ligner veldig mye på Buffer-operatørene, de er bare veldig forskjellige etter deres returtype Hvor buffer vil ta en IObservable T og returnere en IObservable IList T, returnerer Windows-operatørene en IObservable IObservable T Det er også verdt å merke seg at bufferoperatørene ikke vil gi bufferene til vinduet lukkes. Her kan vi se de enkle overlastene til vinduet der er en overraskende symmetri med vindu og buffer overloads. public static IObservable IObservable TSource Window TSource denne IObservable TSource kilde, int count. public statisk IObservable IObservable TSource Window TSource denne IObservable TSource kilden, int count, int skip. public statisk IObservable IObservable TSource Window TSource denne IObservable TSource kilden, TimeSpan timeSpan. public statisk IObservable IObservable TSource Window TSource denne IObservable TSource kilde, TimeSpan timeSpan, int count. offentlig statisk IObservable IObservable TSource Window TSource denne IObservable TSource kilden, TimeSpan timeSpan, TimeSpan timeShift. public statisk IObservable IObservable TSource Window TSource denne IObservable TSource kilden, TimeSpan timeSpan, IScheduler scheduler. public statisk IObservable IObservable TSource Window TSource denne IObservable TSource kilden, TimeSpan timeSpan , TimeSpan timeShift, IScheduler scheduler. public statisk IObservable IObservable TSource Window TSource denne IObservable TSource kilden, TimeSpan timeSpan, int telle, IScheduler scheduler. Eksempel på Window med en telling av 3. Eksempel på Window med tid på 5 units. A major differenc e vi ser her er vinduet operatører kan varsle deg om verdier fra kilden så snart de er produsert bufferen operatørene må vente til vinduet lukkes før verdiene kan bli varslet som en hel liste. Flattening et vindu operasjon. Jeg tror det Det er verdt å merke seg, i hvert fall fra et faglig punkt, at vindusoperatørene produserer IObservable IObservable T Vi har utforsket konseptet med nestede observerbare objekter i det tidligere kapittelet om Aggregation Concat Merge and Switch hver har en overbelastning som tar en IObservable IObservable T og returnerer en IObservable T Da vinduets operatører sikrer at windows-barnesekvensene ikke overlapper, kan vi bruke enten Concat Switch eller Merge-operatører til å slå en windowed-sekvens tilbake i sin opprinnelige rekkefølge. er det samme som. var switchedWindow Observable TimeSpan. WindowWithTime TimeSpan. Customizing Windows. Ovennevnte overbelastninger gir enkle måter å bryte en sekvens inn på mindre nestede vinduer ved hjelp av en telling og eller et tidsrom. Nå vil vi se på overbelastningene som gir deg mer fleksibilitet over hvordan vinduene dine styres. Prosjekterer hvert element av en observerbar sekvens i sammenhengende ikke-overlappende vinduer. windowClosingSelector En funksjon som er påkalt for å definere grensene til de produserte vinduene Et nytt vindu startes når den forrige er lukket. offentlig statisk IObservable IObservable TSource Window TSource, TWindowClosing denne IObservable TSource-kilden, Func IObservable TWindowClosing windowClosingSelector. Den første av disse komplekse overlastene tillater oss til å kontrollere når vinduene lukkes WindowClosingSelector-funksjonen kalles hver gang et vindu er opprettet. Windows er opprettet ved abonnement og umiddelbart etter at et vindu lukker Windows lukkes når sekvensen fra vinduetClosingSelector produserer en verdi. Verdien ignoreres, så det spiller ingen rolle hva Typen av sekvensverdiene er Faktisk kan du til og med bare fullføre sekvensen fra windowClosingSelector for å lukke vinduet. I dette eksempelet oppretter vi et vindu med en lukkevelger for vinduet. Vi returnerer det samme emnet fra velgeren hver gang og melder fra emnet Når en bruker trykker, skriver du inn fra console. var windo wIdx 0.var kilde Observable TimeSpan. var nærmere ny Subject Unit. nærmere. Den mest komplekse overbelastningen av Window lar oss lage potensielt overlappende vinduer. Prosjekterer hvert element av en observerbar sekvens i null eller flere vinduer. windowOpenings Observerbar sekvens hvis elementer angir opprettelsen av nye vinduer. windowClosingSelector En funksjon som påberopes for å definere lukkingen av hver produsert window. public statisk IObservable IObservable TSource Window TSource, TWindowOpening, TWindowClosing denne IObservable TSource kilden, IObservable TWindowOpening windowOpenings, Func TWindowOpening, IObservable TWindowClosing windowClosingSelector. Denne overbelastningen tar 3 argumenter. Kilde sekvensen. En sekvens som indikerer når et nytt vindu skal åpnes. En funksjon som tar en vinduesåpningsverdi, og returnerer en vinduesavslutningssekvens. Denne overbelastningen gir stor fleksibilitet i måten vinduene åpnes og lukkes. Windows kan være stort sett uavhengig av hverandre. De kan overlappe, variere i størrelse og til og med hoppe over verdier fra kilden. For å lette vei inn i denne mer komplekse overbelastningen, la oss først prøve å bruke den til å gjenskape en enklere versjon av Window overbelastningen som tar en telle. For å gjenopprette dette trenger vi å åpne et vindu på det opprinnelige abonnementet og hver gang kilden har produsert, spesifiserer du tellingen T han vinduet må lukke hver gang kilden produserer den angitte tellingen For å oppnå dette trenger vi bare kildesekvensen Vi deler den ved å bruke Publiseringsmetoden og deretter levere visninger av kilden som hver av arguments. public statisk IObservable IObservable T MyWindow T denne IObservable T-kilden, int count. var windowEdge shared. It kan også være av interesse for å vurdere hvordan du kan implementere andre tidsforskyvingsmetoder som Eksempel eller Throttle med Window. The Join-operatør lar deg logisk bli med to sekvenser Hvor Zip-operatøren vil bli med to sekvenser sammen av indeksen, kan du delta i sekvenser ved å krysse vinduer. Som det siste vinduets overbelastning vi nettopp så på, kan du spesifisere når et vindu lukkes fra en funksjon som tar en åpningsverdi og returnerer en observerbar sekvens som angir når vinduet skal lukke Operatør-operatøren har to slike funksjoner, en for den første kildesekvensen og en for den andre kildesekvensen. Som Zip operato r må vi også gi en selector-funksjon for å produsere resultatobjektet fra pair of values. public statisk IObservable TResult Bli med TLeft, TRight, TLeftDuration, TRightDuration, TResult. this IObservable TLeft left. Observable TRight right. Func TLeft, IObservable TLeftDuration leftDurationSelector. Func TRight, IObservable TRightDuration rightDurationSelector. Func TLeft, TRight, TResult resultSelector. Now dette er en ganske hårete signatur for å prøve å forstå på en gang, så la oss ta det en parameter om gangen. Observerbar TLeft igjen er kildesekvens som definerer når et vindu starter Dette er akkurat som bufferen og vinduet operatørene, bortsett fra at hver verdi publisert fra denne kilden åpner et nytt vindu I noen buffer og vinduet falt noen verdier bare inn i et eksisterende vindu. Jeg liker å tenke IObservable TRight like vinduverdien sekvens Mens den venstre sekvensen styrer åpning av vinduene, vil den riktige sekvensen forsøke å para sammen med en verdi fra den venstre sekvensen. La oss forestille oss at vår venstre sekvens produserer en verdi som skaper et nytt vindu Hvis den riktige sekvensen gir en verdi mens vinduet er åpent, kalles resultatselgeringsfunksjonen med de to verdiene. Dette er kjernepunktet i å bli sammenkoblet, sammenkobling av to verdier fra en sekvens som oppstår med i samme vindu Dette fører oss til vårt neste spørsmål Når lukker vinduet Svaret på dette spørsmålet er både kraften og kompleksiteten til Join-operatøren. Når venstre produserer en verdi, åpnes et vindu. Denne verdien sendes også til funksjonen leftDurationSelector Resultatet av denne funksjonen er en IObservable TLeftDuration Når den sekvensen produserer en verdi eller fullfører, er vinduet for den verdien lukket. Merk at det er irrelevant hva typen TLeftDuration er. Dette forlot meg først med følelsen av at IObservable TLeftDuration var alt litt over drepe som du effektivt trenger bare en slags hendelse å si Lukket Men ved å la deg bruke IObservable T kan du gjøre noen smarte ting som vi w dårlig se senere. Så la oss først forestille seg et scenario der vi har den venstre sekvensen som produserer verdier dobbelt så fort som den riktige sekvensen. Forestill oss at vi aldri lukker vinduene. Vi kunne gjøre dette ved å alltid returnere Unit fra den venstreDurationSelector-funksjonen. Dette ville resultere i de følgende parene blir produsert. Som du kan se, er de venstre verdiene bufret og gjentatt hver gang den høyre produserer en verdi. Nå virker det ganske åpenbart at hvis jeg umiddelbart lukket vinduet ved å returnere Enhet eller kanskje vil vinduene aldri bli åpnet så nei par ville noensinne bli produsert. Men hva kunne jeg gjøre for å sikre at disse vinduene ikke overlapper slik at en gang en sekund verdi ble produsert, ville jeg ikke lenger se den første verdien Vel, hvis vi returnerte venstre sekvens fra venstreDurationSelector som kunne gjøre det Men vent, når vi kommer tilbake fra venstreDurationSelector, ville det prøve å opprette et nytt abonnement, og det kan innføre bivirkninger. Det raske svaret er at Publisere og RefCount den venstre sekvensen Hvis vi gjør det, ser resultatene ut som dette. Det siste eksempelet ligner veldig på CombineLatest bortsett fra at det bare produserer et par når den riktige sekvensen endrer. Vi kan bruke Bli med for å lage vår egen versjon av CombineLatest Hvis verdiene fra venstre sekvens utløper når neste verdi fra venstre ble varslet så jeg ville være bra på vei Men jeg trenger det samme som skal skje til høyre Heldigvis gir samarbeidspartneren oss også en rightDurationSelector som fungerer akkurat som venstreDurationSelector Dette er enkelt å implementere, er alt jeg trenger å gjøre å returnere en referanse til samme venstre sekvens når en venstre verdi blir produsert og deretter den samme til høyre. Koden ser ut som denne. public static..Overvinner TRIGHT right. While koden ovenfor ikke er produksjonskvalitet, vil det trenge å ha noen porter på plass for å redusere løpevilkårene, det viser oss kraften vi kunne kom med Bli med, vi kan faktisk bruke den til å opprette andre operatører. Når samarbeidspartneren parrer opp verdier som faller sammen i et vindu, vil det alltid produsere bare venstreverdien og den riktige verdien til resultatetSelektor. Operatøren GroupJoin tar dette et skritt lenger av sender den venstre verdien umiddelbart til resultatetSelgeren med en sekvens av de riktige verdiene som forekommer i vinduet. Det er signaturen som ligner på Bli med, men merk forskjellen i resultatet Selector Func. public statisk IObservable TResult GroupJoin TLeft, TRight, TLeftDuration, TRightDuration, TResult. this IObservable TLeft left. Observable TRight right. Func TLeft, IObservable TLeftDuration leftDurationSelector. Func TRight, IObservable TRightDuration rightDurationSelector. Func TLeft, IObservable TRight, TResult resultSelector. If vi gikk tilbake til vår første Join eksempel hvor vi hadde. verdier dobbelt så fort som høyre. venstre igjen aldri utløper. høyre umiddelbart utløper. dette er hva resultatet er kan se ut. Nå kan vi bytte rundt og få det at venstre utløp umiddelbart og høyre aldri utgå, resultatet kan se slik ut. Dette begynner å gjøre ting interessant. Skarpe lesere har kanskje lagt merke til at med GroupJoin kan du effektivt opprette din egen Bli med ved å gjøre noe som dette. public. Observable TResult MyJoin TLeft, TRight, TLeftDuration, TRightDuration, TResult. Observable TLeft left. Observable TRight right. Func TLeft, IObservable TLeftDuration leftDurationSelector. Func TRight, IObservable TRightDuration rightDurationSelector. Func TLeft, TRight, ResultatresultatSelektor.

No comments:

Post a Comment