22/08: F# wrapper for the Rx framework
According to Jafar Husain, people now have access to the awesome Rx framework library.
This is great, except that you cannot easily use it from F#.
With this library wrapper you can access the observable methods through the module OSeq and build observable collections using the oseq monad builder.
The following is the interesting part of an example that shows both; it encapsulates a triple-click:
("win" is a System.Windows.Window)
open Microsoft.FSharp.Collections
let mousedown = OSeq.from_event win.MouseDown
let mouseup = OSeq.from_event win.MouseUp
// Yields "click" whenever three consecutive press/release cycles finish within
// 5,000,000 DateTime ticks of the first one, then restarts itself either way.
let rec trippleclick = oseq {
    // Builds a fresh "one press followed by a release" observable for each step.
    let click () = mousedown |> OSeq.waitUntil mouseup |> OSeq.take 1
    for _ in click () do
        let started = DateTime.Now.Ticks
        for _ in click () do
            for _ in click () do
                let elapsed = DateTime.Now.Ticks - started
                // Both branches recurse explicitly: an implicit `else` would go through
                // the builder's Zero, which is Observable.Never.
                if elapsed >= 5000000L then
                    return! trippleclick
                else
                    yield "click"
                    return! trippleclick
}
The following library does not presently wrap every observable function, but it is already quite useful.
module Microsoft.FSharp.Collections
open System
open System.Linq
open System.Collections.Generic
open System.Linq.Expressions
module OSeq =
/// Computation-expression builder behind `oseq { ... }`.
/// Each member maps one computation-expression construct onto an Rx `Observable` call.
type ObservableBuilder() =
    /// `let!` : delegates to Observable.SelectMany.
    member this.Bind(a, f) = Observable.SelectMany(a, Func<'a, 'b IObservable>(f))
    /// `return` : delegates to Observable.Return.
    member this.Return x = Observable.Return x
    /// Wraps the body of the expression in Observable.Defer.
    member this.Delay f = Observable.Defer(Func<'a IObservable>(f))
    /// `yield` : same implementation as Return.
    member this.Yield x = Observable.Return x
    /// `for ... in ... do` : same implementation as Bind (Observable.SelectMany).
    member this.For(a, f) = Observable.SelectMany(a, Func<'a, 'b IObservable>(f))
    //member this.While : (unit -> bool) * M<'a> -> M<'a>
    /// Sequencing of two parts of an expression: delegates to Observable.Concat.
    member this.Combine(left, right) = Observable.Concat(left, right)
    /// `try ... finally` : forwards every notification of `a` and runs `f` exactly once,
    /// when the source errors, completes, or the subscription is disposed, whichever
    /// happens first. Previously `f` was skipped when a subscriber disposed early,
    /// so resources bound with `use` were never released in that case.
    member this.TryFinally(a : 'a IObservable, f) =
        {new 'a IObservable with
            member this.Subscribe(o) =
                let gate = new Object()
                let ran = ref false
                // Runs the finalizer the first time it is called; later calls do nothing.
                let runOnce () =
                    let shouldRun =
                        lock gate (fun () ->
                            let firstCall = not (!ran)
                            ran := true
                            firstCall)
                    if shouldRun then f()
                let inner =
                    a.Subscribe(
                        {new 'a IObserver with
                            member this.OnNext x = o.OnNext x
                            member this.OnError e = runOnce(); o.OnError e
                            member this.OnCompleted () = runOnce(); o.OnCompleted()
                        }
                    )
                {new IDisposable with
                    member this.Dispose() =
                        // Stop the source first, then finalize if no terminal notification did.
                        inner.Dispose()
                        runOnce()
                }
        }
    /// `try ... with` : delegates to Observable.Catch.
    member this.TryWith(a : 'a IObservable, f : Exception -> 'a IObservable) = Observable.Catch(a, Func<Exception, 'a IObservable>(f))
    /// `use` : builds the body with `f a` and disposes `a` through TryFinally.
    member this.Using(a : #IDisposable, f : #IDisposable -> 'a IObservable) = this.TryFinally(f a, fun () -> a.Dispose() )
    /// Implicit empty branch (e.g. `if` without `else`): Observable.Never.
    // NOTE(review): with Never, an expression that reaches Zero does not complete, and
    // anything combined after it presumably never runs; confirm whether an empty,
    // completing observable was intended.
    member this.Zero () = Observable.Never()
/// The builder instance used for `oseq { ... }` expressions.
let oseq = new ObservableBuilder()
/// Subscribes to `o`, calling `f` for every value.
/// Completion is ignored; an error notification is re-raised as an exception.
/// Returns the value produced by `o.Subscribe`.
let listen f (o : 'a IObservable) =
    let observer =
        {new IObserver<'a> with
            member this.OnNext value = f value
            member this.OnError error = raise error
            member this.OnCompleted () = ()
        }
    o.Subscribe observer
/// Combines `a` and `b` into an observable of tuples by delegating to Observable.Zip.
/// The previous implementation nested two `for` loops (SelectMany), which emitted a
/// tuple for every combination of an `a` value with each `b` value seen afterwards
/// instead of pairing values one-to-one; `pairwise` relies on the one-to-one pairing.
let zip a b = Observable.Zip(a, b, Func<'a, 'b, 'a * 'b>(fun x y -> (x, y)))
/// Converts a .NET event into an IObservable.
/// .NET events are exposed as IEvent<'a, 'b>, where 'a is the delegate type and 'b is
/// the type of the event arguments. Each subscription adds a handler to the event and
/// removes it again when the returned IDisposable is disposed.
let from_event (ev : IEvent<'a, 'b>) =
    {new 'b Event IObservable with
        member this.Subscribe o =
            // Parameters of the delegate generated below: (sender, arguments).
            let sender = Expression.Parameter(typeof<obj>, "sender")
            let args = Expression.Parameter(typeof<'b>, "arguments")
            // Pushes each event occurrence to the observer, wrapped in an Event value.
            let forward = Action<obj, 'b>(fun source a -> o.OnNext(new 'b Event(source, a)))
            // The delegate type 'a is only known as a type parameter, so a delegate of
            // that exact type is compiled from an expression tree that calls `forward`.
            let call = Expression.Call(Expression.Constant(forward.Target), forward.Method, [| (sender :> Expression); (args :> Expression) |])
            let handler = Expression.Lambda<'a>(call, [| sender; args |]).Compile()
            ev.AddHandler(handler)
            {new IDisposable with
                member this.Dispose() = ev.RemoveHandler(handler)
            }
    }
/// Converts a native F# event into an IObservable.
/// Native F# events have type IEvent with a single type argument, the event argument type.
/// Each subscription adds a handler; disposing the result removes it.
let from_ievent<'a, 'T> (event : IEvent<'a>) =
    {new IObservable<'a> with
        member this.Subscribe observer =
            let onFire = Handler(fun _ args -> observer.OnNext args)
            event.AddHandler onFire
            {new IDisposable with
                member this.Dispose() = event.RemoveHandler onFire
            }
    }
// Curried wrappers over the static Rx `Observable` methods. Where a wrapper takes a
// source observable named `o`, it is the last parameter so it can be supplied with `|>`.
let merge left right = Observable.Merge(left, right)
let skip n o = Observable.Skip(o, n)
// Zips the observable with itself skipped by one element.
let pairwise o = zip o (skip 1 o)
let map f o = Observable.Select(o, Func<'a, 'b>(f))
let filter f o = Observable.Where(o, Func<'a, bool>(f))
// Note the swapped argument order in the next two: `b` (the piped value) is passed first.
let until a b = Observable.Until(b, a)
let waitUntil a b = Observable.WaitUntil(b, a)
let take n o = Observable.Take(o, n)
let takeWhile f o = Observable.TakeWhile(o, Func<'a, bool>(f))
let selectMany f o = Observable.SelectMany(o, Func<'a, 'b IObservable>(f))
let holdUntilChanged o = Observable.HoldUntilChanged o
// Arguments are swapped here too: `b |> concat a` calls Observable.Concat(b, a).
let concat a b = Observable.Concat(b, a)
// `s` is the seed passed to Observable.Aggregate.
let fold f s o = Observable.Aggregate(o, s, Func<'a, 'b, 'a>(f))
let reduce f o = Observable.Aggregate(o, Func<'a, 'a, 'a>(f))
let first = Observable.First
let last = Observable.Last
// Unlike `fold`, the accumulator and element types are constrained to be the same here.
let scan f s o = Observable.Scan(o, s, Func<'a, 'a, 'a>(f))
let skipWhile f o = Observable.SkipWhile(o, Func<'a, bool>(f))
let flatten = Observable.Flatten
let cons a o = Observable.Cons(a, o)
let range s c = Observable.Range(s, c)
let interval i = Observable.Interval(i)
let repeat () = Observable.Repeat()
let Let f o = Observable.Let(o, Func<'a IObservable, 'b IObservable>(f))
let start f = Observable.Start(Func<'a>(f))
let post context o = Observable.Post(o, context)
// Upcasts every value to obj and then downcasts it to 'b; the downcast is unchecked (`:?>`).
let cast<'a, 'b> (e : 'a IObservable) = e |> map (fun evt -> (evt :> obj) :?> 'b)
/// Keeps only the values of `e` whose runtime type is 'b and returns them typed as 'b.
/// The type test is made against the target type 'b; testing against the source type 'a
/// (as before) let every non-null value through, so the final cast could fail.
let of_type<'a, 'b> e = e |> cast<'a, obj> |> filter (function :? 'b -> true | _ -> false) |> cast<obj, 'b>
/// Converts the observable `o` by delegating to Observable.ToEnumerable.
let to_seq o = Observable.ToEnumerable o
/// Returns a new observable that forwards values of `ob` only while its state is true.
/// The state starts as false; it is set to true whenever `start` fires and to false
/// whenever `stop` fires.
/// The flag and the `start`/`stop` subscriptions are created per subscriber. Previously
/// they were created once and shared, yet each subscriber's Dispose tore them down,
/// which broke the gating for every other (and every later) subscriber.
let state start stop ob =
    {new 'a IObservable with
        member this.Subscribe o =
            let set = ref false
            let regstart = start |> listen (fun e -> set := true)
            let regstop = stop |> listen (fun e -> set := false)
            let regob = ob |> listen (fun x -> if !set then o.OnNext x)
            {new IDisposable with
                member this.Dispose () =
                    regstart.Dispose()
                    regstop.Dispose()
                    regob.Dispose()
            }
    }
/// Turns an observable into an async computation.
/// Every value is collected into a List; the async succeeds with that list when the
/// observable completes and fails with the exception if the observable errors.
/// The cancellation continuation is not used, and the subscription is not kept.
let to_async (x : 'a IObservable) =
    Async.Primitive(
        fun (onSuccess, onFailure, _) ->
            let collected = List()
            let collector =
                {new 'a IObserver with
                    member this.OnNext item = collected.Add item
                    member this.OnError failure = onFailure failure
                    member this.OnCompleted () = onSuccess collected
                }
            x.Subscribe collector |> ignore )
/// Turns an async computation into an observable.
/// Each subscription starts `asynchronous` with Async.Start; its result is delivered
/// as a single OnNext followed by OnCompleted, and an exception is delivered as OnError.
/// Disposing the subscription does not cancel the computation; it only suppresses
/// further notifications. This now includes OnError, which was previously still
/// delivered after disposal.
let from_async asynchronous =
    {new 'a IObservable with
        member this.Subscribe(observer) =
            // Cleared by Dispose; checked before every notification.
            let update = ref true
            async {
                try
                    let! value = asynchronous
                    if !update then
                        observer.OnNext value
                        observer.OnCompleted()
                with | e -> if !update then observer.OnError e
            } |> Async.Start
            {new IDisposable with
                member this.Dispose() =
                    update := false
            }
    }
//________________________________________________________________________________________________________
module Seq =
    /// Converts an ordinary sequence to an observable via the `ToObservable` extension method.
    let to_oseq (x : seq<'a>) =
        x.ToObservable()
/// Alias of the OSeq builder instance, so `oseq { ... }` can be written without the `OSeq.` prefix.
let oseq = OSeq.oseq
This is great, except that you cannot easily use it from F#.
With this library wrapper you can access the observable methods through the module OSeq and build observable collections using the oseq monad builder.
The following is the interesting part of an example that shows both; it encapsulates a triple-click:
("win" is a System.Windows.Window)
open Microsoft.FSharp.Collections
let mousedown = OSeq.from_event win.MouseDown
let mouseup = OSeq.from_event win.MouseUp
// Yields "click" whenever three consecutive press/release cycles finish within
// 5,000,000 DateTime ticks of the first one, then restarts itself either way.
let rec trippleclick = oseq {
    // Builds a fresh "one press followed by a release" observable for each step.
    let click () = mousedown |> OSeq.waitUntil mouseup |> OSeq.take 1
    for _ in click () do
        let started = DateTime.Now.Ticks
        for _ in click () do
            for _ in click () do
                let elapsed = DateTime.Now.Ticks - started
                // Both branches recurse explicitly: an implicit `else` would go through
                // the builder's Zero, which is Observable.Never.
                if elapsed >= 5000000L then
                    return! trippleclick
                else
                    yield "click"
                    return! trippleclick
}
The following library does not presently wrap every observable function, but it is already quite useful.
module Microsoft.FSharp.Collections
open System
open System.Linq
open System.Collections.Generic
open System.Linq.Expressions
module OSeq =
/// Computation-expression builder behind `oseq { ... }`.
/// Each member maps one computation-expression construct onto an Rx `Observable` call.
type ObservableBuilder() =
    /// `let!` : delegates to Observable.SelectMany.
    member this.Bind(a, f) = Observable.SelectMany(a, Func<'a, 'b IObservable>(f))
    /// `return` : delegates to Observable.Return.
    member this.Return x = Observable.Return x
    /// Wraps the body of the expression in Observable.Defer.
    member this.Delay f = Observable.Defer(Func<'a IObservable>(f))
    /// `yield` : same implementation as Return.
    member this.Yield x = Observable.Return x
    /// `for ... in ... do` : same implementation as Bind (Observable.SelectMany).
    member this.For(a, f) = Observable.SelectMany(a, Func<'a, 'b IObservable>(f))
    //member this.While : (unit -> bool) * M<'a> -> M<'a>
    /// Sequencing of two parts of an expression: delegates to Observable.Concat.
    member this.Combine(left, right) = Observable.Concat(left, right)
    /// `try ... finally` : forwards every notification of `a` and runs `f` exactly once,
    /// when the source errors, completes, or the subscription is disposed, whichever
    /// happens first. Previously `f` was skipped when a subscriber disposed early,
    /// so resources bound with `use` were never released in that case.
    member this.TryFinally(a : 'a IObservable, f) =
        {new 'a IObservable with
            member this.Subscribe(o) =
                let gate = new Object()
                let ran = ref false
                // Runs the finalizer the first time it is called; later calls do nothing.
                let runOnce () =
                    let shouldRun =
                        lock gate (fun () ->
                            let firstCall = not (!ran)
                            ran := true
                            firstCall)
                    if shouldRun then f()
                let inner =
                    a.Subscribe(
                        {new 'a IObserver with
                            member this.OnNext x = o.OnNext x
                            member this.OnError e = runOnce(); o.OnError e
                            member this.OnCompleted () = runOnce(); o.OnCompleted()
                        }
                    )
                {new IDisposable with
                    member this.Dispose() =
                        // Stop the source first, then finalize if no terminal notification did.
                        inner.Dispose()
                        runOnce()
                }
        }
    /// `try ... with` : delegates to Observable.Catch.
    member this.TryWith(a : 'a IObservable, f : Exception -> 'a IObservable) = Observable.Catch(a, Func<Exception, 'a IObservable>(f))
    /// `use` : builds the body with `f a` and disposes `a` through TryFinally.
    member this.Using(a : #IDisposable, f : #IDisposable -> 'a IObservable) = this.TryFinally(f a, fun () -> a.Dispose() )
    /// Implicit empty branch (e.g. `if` without `else`): Observable.Never.
    // NOTE(review): with Never, an expression that reaches Zero does not complete, and
    // anything combined after it presumably never runs; confirm whether an empty,
    // completing observable was intended.
    member this.Zero () = Observable.Never()
/// The builder instance used for `oseq { ... }` expressions.
let oseq = new ObservableBuilder()
/// Subscribes to `o`, calling `f` for every value.
/// Completion is ignored; an error notification is re-raised as an exception.
/// Returns the value produced by `o.Subscribe`.
let listen f (o : 'a IObservable) =
    let observer =
        {new IObserver<'a> with
            member this.OnNext value = f value
            member this.OnError error = raise error
            member this.OnCompleted () = ()
        }
    o.Subscribe observer
/// Combines `a` and `b` into an observable of tuples by delegating to Observable.Zip.
/// The previous implementation nested two `for` loops (SelectMany), which emitted a
/// tuple for every combination of an `a` value with each `b` value seen afterwards
/// instead of pairing values one-to-one; `pairwise` relies on the one-to-one pairing.
let zip a b = Observable.Zip(a, b, Func<'a, 'b, 'a * 'b>(fun x y -> (x, y)))
/// Converts a .NET event into an IObservable.
/// .NET events are exposed as IEvent<'a, 'b>, where 'a is the delegate type and 'b is
/// the type of the event arguments. Each subscription adds a handler to the event and
/// removes it again when the returned IDisposable is disposed.
let from_event (ev : IEvent<'a, 'b>) =
    {new 'b Event IObservable with
        member this.Subscribe o =
            // Parameters of the delegate generated below: (sender, arguments).
            let sender = Expression.Parameter(typeof<obj>, "sender")
            let args = Expression.Parameter(typeof<'b>, "arguments")
            // Pushes each event occurrence to the observer, wrapped in an Event value.
            let forward = Action<obj, 'b>(fun source a -> o.OnNext(new 'b Event(source, a)))
            // The delegate type 'a is only known as a type parameter, so a delegate of
            // that exact type is compiled from an expression tree that calls `forward`.
            let call = Expression.Call(Expression.Constant(forward.Target), forward.Method, [| (sender :> Expression); (args :> Expression) |])
            let handler = Expression.Lambda<'a>(call, [| sender; args |]).Compile()
            ev.AddHandler(handler)
            {new IDisposable with
                member this.Dispose() = ev.RemoveHandler(handler)
            }
    }
/// Converts a native F# event into an IObservable.
/// Native F# events have type IEvent with a single type argument, the event argument type.
/// Each subscription adds a handler; disposing the result removes it.
let from_ievent<'a, 'T> (event : IEvent<'a>) =
    {new IObservable<'a> with
        member this.Subscribe observer =
            let onFire = Handler(fun _ args -> observer.OnNext args)
            event.AddHandler onFire
            {new IDisposable with
                member this.Dispose() = event.RemoveHandler onFire
            }
    }
// Curried wrappers over the static Rx `Observable` methods. Where a wrapper takes a
// source observable named `o`, it is the last parameter so it can be supplied with `|>`.
let merge left right = Observable.Merge(left, right)
let skip n o = Observable.Skip(o, n)
// Zips the observable with itself skipped by one element.
let pairwise o = zip o (skip 1 o)
let map f o = Observable.Select(o, Func<'a, 'b>(f))
let filter f o = Observable.Where(o, Func<'a, bool>(f))
// Note the swapped argument order in the next two: `b` (the piped value) is passed first.
let until a b = Observable.Until(b, a)
let waitUntil a b = Observable.WaitUntil(b, a)
let take n o = Observable.Take(o, n)
let takeWhile f o = Observable.TakeWhile(o, Func<'a, bool>(f))
let selectMany f o = Observable.SelectMany(o, Func<'a, 'b IObservable>(f))
let holdUntilChanged o = Observable.HoldUntilChanged o
// Arguments are swapped here too: `b |> concat a` calls Observable.Concat(b, a).
let concat a b = Observable.Concat(b, a)
// `s` is the seed passed to Observable.Aggregate.
let fold f s o = Observable.Aggregate(o, s, Func<'a, 'b, 'a>(f))
let reduce f o = Observable.Aggregate(o, Func<'a, 'a, 'a>(f))
let first = Observable.First
let last = Observable.Last
// Unlike `fold`, the accumulator and element types are constrained to be the same here.
let scan f s o = Observable.Scan(o, s, Func<'a, 'a, 'a>(f))
let skipWhile f o = Observable.SkipWhile(o, Func<'a, bool>(f))
let flatten = Observable.Flatten
let cons a o = Observable.Cons(a, o)
let range s c = Observable.Range(s, c)
let interval i = Observable.Interval(i)
let repeat () = Observable.Repeat()
let Let f o = Observable.Let(o, Func<'a IObservable, 'b IObservable>(f))
let start f = Observable.Start(Func<'a>(f))
let post context o = Observable.Post(o, context)
// Upcasts every value to obj and then downcasts it to 'b; the downcast is unchecked (`:?>`).
let cast<'a, 'b> (e : 'a IObservable) = e |> map (fun evt -> (evt :> obj) :?> 'b)
/// Keeps only the values of `e` whose runtime type is 'b and returns them typed as 'b.
/// The type test is made against the target type 'b; testing against the source type 'a
/// (as before) let every non-null value through, so the final cast could fail.
let of_type<'a, 'b> e = e |> cast<'a, obj> |> filter (function :? 'b -> true | _ -> false) |> cast<obj, 'b>
/// Converts the observable `o` by delegating to Observable.ToEnumerable.
let to_seq o = Observable.ToEnumerable o
/// Returns a new observable that forwards values of `ob` only while its state is true.
/// The state starts as false; it is set to true whenever `start` fires and to false
/// whenever `stop` fires.
/// The flag and the `start`/`stop` subscriptions are created per subscriber. Previously
/// they were created once and shared, yet each subscriber's Dispose tore them down,
/// which broke the gating for every other (and every later) subscriber.
let state start stop ob =
    {new 'a IObservable with
        member this.Subscribe o =
            let set = ref false
            let regstart = start |> listen (fun e -> set := true)
            let regstop = stop |> listen (fun e -> set := false)
            let regob = ob |> listen (fun x -> if !set then o.OnNext x)
            {new IDisposable with
                member this.Dispose () =
                    regstart.Dispose()
                    regstop.Dispose()
                    regob.Dispose()
            }
    }
/// Turns an observable into an async computation.
/// Every value is collected into a List; the async succeeds with that list when the
/// observable completes and fails with the exception if the observable errors.
/// The cancellation continuation is not used, and the subscription is not kept.
let to_async (x : 'a IObservable) =
    Async.Primitive(
        fun (onSuccess, onFailure, _) ->
            let collected = List()
            let collector =
                {new 'a IObserver with
                    member this.OnNext item = collected.Add item
                    member this.OnError failure = onFailure failure
                    member this.OnCompleted () = onSuccess collected
                }
            x.Subscribe collector |> ignore )
/// Turns an async computation into an observable.
/// Each subscription starts `asynchronous` with Async.Start; its result is delivered
/// as a single OnNext followed by OnCompleted, and an exception is delivered as OnError.
/// Disposing the subscription does not cancel the computation; it only suppresses
/// further notifications. This now includes OnError, which was previously still
/// delivered after disposal.
let from_async asynchronous =
    {new 'a IObservable with
        member this.Subscribe(observer) =
            // Cleared by Dispose; checked before every notification.
            let update = ref true
            async {
                try
                    let! value = asynchronous
                    if !update then
                        observer.OnNext value
                        observer.OnCompleted()
                with | e -> if !update then observer.OnError e
            } |> Async.Start
            {new IDisposable with
                member this.Dispose() =
                    update := false
            }
    }
//________________________________________________________________________________________________________
module Seq =
    /// Converts an ordinary sequence to an observable via the `ToObservable` extension method.
    let to_oseq (x : seq<'a>) =
        x.ToObservable()
/// Alias of the OSeq builder instance, so `oseq { ... }` can be written without the `OSeq.` prefix.
let oseq = OSeq.oseq