前面我們提過:FunDA就像一個管道(PipeLine)。管道內(nèi)流動著一串數(shù)據(jù)(Data)或者運算指令(Action)。管道的源頭就是能產(chǎn)生純數(shù)據(jù)的數(shù)據(jù)源(Source),跟著在管道的中間會有一些節(jié)點(WorkNode),我們可以在這些節(jié)點施用(apply)用戶提供的功能函數(shù)(Task)。用戶功能函數(shù)可以截取并使用管道中流動的數(shù)據(jù)或者指令,然后利用一種水龍頭開關(guān)機制(Valve)來影響流動元素:可以截住、直接傳送、傳送修改版本、插入新數(shù)據(jù)。作為FunDA的用戶,需要掌握用戶功能函數(shù)編寫模式。我們先從一個簡單的用戶函數(shù)開始介紹:
//定義一個用戶作業(yè)函數(shù):列印數(shù)據(jù),完全不影響數(shù)據(jù)流 def PRintAlbums: FDATask[FDAROW] = row => { row match { case album: Album => println("____________________") println(s"品名:${album.title}") println(s"演唱:${album.artist}") println(s"年份:${album.year}") println(s"發(fā)行:${album.publisher}")//原封不動直接傳下去 fda_next(album) case r@ _ => fda_next(r) } }上面這個用戶函數(shù)的類型是FDATask[FDAROW],這是一個函數(shù)類型: //作業(yè)類型 type FDATask[ROW] = ROW => Option[List[ROW]]所以我們用lambda來代表函數(shù)內(nèi)容:row => {函數(shù)功能}。lambda為用戶函數(shù)提供了當前元素。我們用下面方式調(diào)用這個用戶函數(shù): val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _) val albumStream = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()//定義一個用戶作業(yè)函數(shù):列印數(shù)據(jù),完全不影響數(shù)據(jù)流 def printAlbums: FDATask[FDAROW] = row => { row match { case album: Album => println("____________________") println(s"品名:${album.title}") println(s"演唱:${album.artist}") println(s"年份:${album.year}") println(s"發(fā)行:${album.publisher}")//原封不動直接傳下去 fda_next(album) case r@ _ => fda_next(r) } } albumStream.appendTask(printAlbums).startRun我們把用戶函數(shù)printAlbums傳入appendTask來對數(shù)據(jù)流進行施用。我們可以在appendTask后面再接一個用戶函數(shù),這個用戶函數(shù)截取到的數(shù)據(jù)流元素是原裝的數(shù)據(jù)源,因為在任何情況下printAlbums都會原封不動地把截獲的元素用fda_next()傳下去。運行一下下面這個就清楚了: albumStream.appendTask(printAlbums).appendTask(printAlbums).startRun相反情況我們只需要做下面的修改把fda_next替換成fda_skip就可以證實了://原封不動直接傳下去 fda_skip// fda_next(album)我們也可以根據(jù)當前元素情況生成一條FDAActionROW,它的定義是這樣的: type FDAAction = DBIO[Int] case class FDAActionRow(action: FDAAction) extends FDAROW def fda_mkActionRow(action: FDAAction): FDAActionRow = FDAActionRow(action) class FDAActionRunner(slickProfile: JdbcProfile) { import slickProfile.api._ def fda_execAction(action: FDAAction)(slickDB: Database): Int = Await.result(slickDB.run(action), Duration.Inf) } object FDAActionRunner { def apply(slickProfile: JdbcProfile): FDAActionRunner = new FDAActionRunner(slickProfile) }我們可以把一條FDAActionRow傳下去: def updateYear: FDATask[FDAROW] = row => { row match { case album: Album => { val updateAction = albums.filter(r => r.title === album.title) .map(_.year) .update(Some(2017)) fda_next(FDAActionRow(updateAction)) } case others@ _ => fda_next(others) } }我們也可以把原數(shù)據(jù)同時傳下去: def updateYear: FDATask[FDAROW] = row => { row match { case album: Album => { val updateAction = albums.filter(r => r.title === album.title) .map(_.year) .update(Some(2017)) fda_next(FDAActionRow(updateAction)) fda_next(album) } case others@ _ => fda_next(others) } }我們需要FDAActionRunner來運算action:val runner = FDAActionRunner(slick.driver.H2Driver) def runActions: FDATask[FDAROW] = row => { row match { case FDAActionRow(action) => runner.fda_execAction(action)(db) fda_skip case others@ _ => fda_next(others) } }現(xiàn)在試試運轉(zhuǎn)這個管道: albumStream.appendTask(updateYear).appendTask(runActions).appendTask(printAlbums).startRun實際上updateYear和runActions可以一步完成。但細化拆分功能就是函數(shù)式編程的一個特點,因為能夠更自由的進行組合,這其中就包括了并行運算組合。下面是這篇討論的示范源代碼:
package com.bayakala.funda.fdasources.examplesimport slick.driver.H2Driver.api._import com.bayakala.funda.fdasources.FDADataStream._import com.bayakala.funda.samples._import com.bayakala.funda.fdarows._import com.bayakala.funda.fdapipes._import FDAValves._import com.bayakala.funda.fdarows.FDARowTypes._import scala.concurrent.duration._object Example2 extends App { val albums = SlickModels.albums val companies = SlickModels.companies//數(shù)據(jù)源query val albumsInfo = for { (a,c) <- albums join companies on (_.company === _.id) } yield (a.title,a.artist,a.year,c.name)//query結(jié)果強類型(用戶提供) case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW//轉(zhuǎn)換函數(shù)(用戶提供) def toTypedRow(row: (String, String, Option[Int], String)): Album = Album(row._1, row._2, row._3.getOrElse(2000), row._4) val db = Database.forConfig("h2db") val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _) val albumStream = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()//定義一個用戶作業(yè)函數(shù):列印數(shù)據(jù),完全不影響數(shù)據(jù)流 def printAlbums: FDATask[FDAROW] = row => { row match { case album: Album => println("____________________") println(s"品名:${album.title}") println(s"演唱:${album.artist}") println(s"年份:${album.year}") println(s"發(fā)行:${album.publisher}")//原封不動直接傳下去// fda_skip fda_next(album) case r@ _ => fda_next(r) } }// albumStream.appendTask(printAlbums).appendTask(printAlbums).startRun def updateYear: FDATask[FDAROW] = row => { row match { case album: Album => { val updateAction = albums.filter(r => r.title === album.title) .map(_.year) .update(Some(2017)) fda_next(FDAActionRow(updateAction)) fda_next(album) } case others@ _ => fda_next(others) } } val runner = FDAActionRunner(slick.driver.H2Driver) def runActions: FDATask[FDAROW] = row => { row match { case FDAActionRow(action) => runner.fda_execAction(action)(db) fda_skip case others@ _ => fda_next(others) } } albumStream.appendTask(updateYear).appendTask(runActions).appendTask(printAlbums).startRun}
新聞熱點
疑難解答