Merge pull request #14 from wayofthepie/watch-update

Add watch client
This commit is contained in:
Brendan Burns
2018-04-07 07:59:39 -07:00
committed by GitHub
4 changed files with 182 additions and 0 deletions

View File

@@ -0,0 +1,71 @@
# kubernetes-watch-client
Client for streaming events from watch enabled endpoints.
## Example
Following is a simple example which
just streams to stdout. First some setup - this assumes kubernetes is accessible
at http://localhost:8001, e.g. after running `kubectl proxy`:
```haskell
> import qualified Data.ByteString.Streaming.Char8 as Q
> manager <- newManager defaultManagerSettings
> defaultConfig <- newConfig
> config = defaultConfig { configHost = "http://localhost:8001", configValidateAuthMethods = False }
> request = listEndpointsForAllNamespaces (Accept MimeJSON)
```
Launching 'dispatchWatch' with the above we get a stream of endpoints data:
```haskell
> dispatchWatch manager config request Q.stdout
{"type":\"ADDED\","object":{"kind":\"Endpoints\","apiVersion":"v1","metadata":{"name":"heapster" ....
```
A more complex example involving some ggprocessing of the stream, the following
prints out the event types of each event. First, define functions to allow us apply
a parser to a stream:
```haskell
import Data.Aeson
import qualified Data.ByteString.Streaming.Char8 as Q
import Data.JsonStream.Parser
import qualified Streaming.Prelude as S
-- | Parse the stream using the given parser.
streamParse ::
FromJSON a =>
Parser a
-> Q.ByteString IO r
-> Stream (Of [a]) IO r
streamParse parser byteStream = do
byteStream & Q.lines & parseEvent parser
-- | Parse a single event from the stream.
parseEvent ::
(FromJSON a, Monad m) =>
Parser a
-> Stream (Q.ByteString m) m r
-> Stream (Of [a]) m r
parseEvent parser byteStream = S.map (parseByteString parser) (S.mapped Q.toStrict byteStream)
```
Next, define the parser and apply it to the stream:
```haskell
> eventParser = value :: Parser (WatchEvent V1Endpoints)
> withResponseBody body = streamParse eventParser body & S.map (map eventType)
> dispatchWatch manager config request (S.print . withResponseBody)
[\"ADDED\"]
[\"ADDED\"]
[\"MODIFIED\"]
...
```
Packages in this example:
* Data.Aeson -- from [aeson](https://hackage.haskell.org/package/aeson)
* Data.ByteString.Streaming.Char8 from [streaming-bytestring](https://hackage.haskell.org/package/streaming-bytestring-0.1.5/docs/Data-ByteString-Streaming-Char8.html)
* Data.JsonStream.Parser from [json-stream](https://hackage.haskell.org/package/json-stream-0.4.1.5/docs/Data-JsonStream-Parser.html)
* Streaming.Prelude from [streaming](https://hackage.haskell.org/package/streaming-0.2.0.0/docs/Streaming-Prelude.html)

View File

@@ -0,0 +1,15 @@
name: kubernetes-watch
version: 0.1.0.0
library:
source-dirs: src
dependencies:
- base >=4.7 && <5.0
- aeson >=1.0 && <2.0
- bytestring >=0.10.0 && <0.11
- http-client >=0.5 && <0.6
- mtl >=2.2.1
- streaming-bytestring >= 0.1.5 && < 0.2.0
- text >=0.11 && <1.3
- kubernetes == 0.1.0.0

View File

@@ -0,0 +1,95 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
module Kubernetes.Watch.Client
( WatchEvent
, eventType
, eventObject
, dispatchWatch
) where
import Control.Monad
import Control.Monad.Trans (lift)
import Data.Aeson
import qualified Data.ByteString as B
import qualified Data.ByteString.Streaming.Char8 as Q
import qualified Data.Text as T
import Kubernetes.Core
import Kubernetes.Client
import Kubernetes.MimeTypes
import Kubernetes.Model (Watch(..))
import Network.HTTP.Client
data WatchEvent a = WatchEvent
{ _eventType :: T.Text
, _eventObject :: a
} deriving (Eq, Show)
instance FromJSON a => FromJSON (WatchEvent a) where
parseJSON (Object x) = WatchEvent <$> x .: "type" <*> x .: "object"
parseJSON _ = fail "Expected an object"
instance ToJSON a => ToJSON (WatchEvent a) where
toJSON x = object
[ "type" .= _eventType x
, "object" .= _eventObject x
]
-- | Type of the 'WatchEvent'.
eventType :: WatchEvent a -> T.Text
eventType = _eventType
-- | Object within the 'WatchEvent'.
eventObject :: WatchEvent a -> a
eventObject = _eventObject
{-| Dispatch a request setting watch to true. Takes a consumer function
which consumes the 'Q.ByteString' stream. Following is a simple example which
just streams to stdout. First some setup - this assumes kubernetes is accessible
at http://localhost:8001, e.g. after running /kubectl proxy/:
@
import qualified Data.ByteString.Streaming.Char8 as Q
manager <- newManager defaultManagerSettings
defaultConfig <- newConfig
config = defaultConfig { configHost = "http://localhost:8001", configValidateAuthMethods = False }
request = listEndpointsForAllNamespaces (Accept MimeJSON)
@
Launching 'dispatchWatch' with the above we get a stream of endpoints data:
@
> dispatchWatch manager config request Q.stdout
{"type":\"ADDED\","object":{"kind":\"Endpoints\","apiVersion":"v1","metadata":{"name":"heapster" ....
@
-}
dispatchWatch ::
(HasOptionalParam req Watch, MimeType accept, MimeType contentType) =>
Manager
-> KubernetesConfig
-> KubernetesRequest req contentType resp accept
-> (Q.ByteString IO () -> IO a)
-> IO a
dispatchWatch manager config request apply = do
let watchRequest = applyOptionalParam request (Watch True)
(InitRequest req) <- _toInitRequest config watchRequest
withHTTP req manager $ \resp -> apply $ responseBody resp
withHTTP ::
Request
-> Manager
-> (Response (Q.ByteString IO ()) -> IO a)
-> IO a
withHTTP request manager f = withResponse request manager f'
where
f' resp = do
let p = (from . brRead . responseBody) resp
f (resp {responseBody = p})
from :: IO B.ByteString -> Q.ByteString IO ()
from io = go
where
go = do
bs <- lift io
unless (B.null bs) $ do
Q.chunk bs
go

View File

@@ -3,3 +3,4 @@ extra-deps:
packages:
- kubernetes
- kubernetes-client-helper
- kubernetes-watch