STREAM

INTRO

The STREAM module provides chain persistence, accumulation and traversal.

  • writer/1 — creates writer cursor for chain.
  • reader/1 — creates reader cursor for chain.
  • save/1 — stores cursor to db.
  • load_reader/1 — loads reader cursor.
  • top/1 — returns top of the reader.
  • bot/1 — returns bottom of the reader.
  • next/1 — moves reader next.
  • prev/1 — moves reader prev.
  • take/1 — takes N elements from reader.
  • drop/1 — skips N elements from reader.
  • add/1 — adds element to writer.
  • append/2 — appends element to feed.
  • remove/2 — removes element from feed.
  • cut/2 — cleans up the feed up to the key.

You can grab kvs_stream and use it in your applications without importing the synrc/kvs dependency, as this module is self-contained. Possible applications include public and private feeds, FIFO queues, unread messages, chat applications, and blockchains.

WRITER

A writer cursor represents a unique append-list chain together with some cached values: the element count, the first element of the chain, a cache field holding the previously written message, and an args field for passing arguments in fixpoint style.

-record(writer, { id    = [] :: term(),
                  count = 0  :: integer(),
                  cache = [] :: [] | tuple(),
                  args  = [] :: term(),
                  first = [] :: [] | tuple() } ).

To add data to storage, create a writer cursor and set its args field to the record to be added:

> require KVS
KVS
> KVS.writer
{:writer, [], 0, [], [], []}

writer(list()) -> #writer{}.

Creates a writer cursor. Normally you then call save/1 to persist the cursor. The parameter is the cursor name, given as a list.

> w = :kvs.writer '/cursors/cur1'
{:writer, '/cursors/cur1', 0, [], [], []}

save(#writer{}) -> #writer{}.

Flushes the writer cursor to the database.

> :kvs.save w
{:writer, '/cursors/cur1', 0, [], [], []}
> :kvs.get :writer, '/cursors/cur1'
{:ok, {:writer, '/cursors/cur1', 0, [], [], []}}

add(#writer{}) -> #writer{}.

Adds an element to the persistent chain identified by the writer cursor. Each added element increments the count field.

> :lists.map fn _ ->
    {:ok,w} = :kvs.get(:writer,'/cursors/cur1')
    :kvs.save(:kvs.add(KVS.writer(w, args: {:"$msg", [], [], [], [], []})))
  end, :lists.seq(1, 5)
[
  {:writer, '/cursors/cur1', 1, {:"$msg", '1313544188019000'}, [], []},
  {:writer, '/cursors/cur1', 2, {:"$msg", '1313544189127000'}, [], []},
  {:writer, '/cursors/cur1', 3, {:"$msg", '1313544189869000'}, [], []},
  {:writer, '/cursors/cur1', 4, {:"$msg", '1313544190519000'}, [], []},
  {:writer, '/cursors/cur1', 5, {:"$msg", '1313544191134000'}, [], []}
]
> :kvs.get :writer, '/cursors/cur1'
{:ok, {:writer, '/cursors/cur1', 5, {:"$msg", '1314009332950000'}, [], []}}
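The effect of add/1 on the cursor fields can be pictured with a small in-memory sketch. It is not the kvs_stream code and it does no persistence; the WriterSketch module and the {:"$msg", n} payload are hypothetical. It only mirrors what the session output shows: count goes up by one, cache holds the written message, and args is cleared.

```elixir
defmodule WriterSketch do
  # In-memory illustration only; the real add/1 also persists the element.
  # Writer tuple mirrors #writer: {:writer, id, count, cache, args, first}.
  def add({:writer, id, count, _cache, msg, first}) do
    # Count goes up by one, the written message is cached, args is cleared.
    {:writer, id, count + 1, msg, [], first}
  end
end

w0 = {:writer, ~c"/cursors/cur1", 0, [], [], []}

w5 =
  Enum.reduce(1..5, w0, fn n, w ->
    # Position 4 of the tuple is the args field; the payload is made up.
    w |> put_elem(4, {:"$msg", n}) |> WriterSketch.add()
  end)

IO.inspect(w5)
```

Passing the element through args keeps add/1 a function from cursor to cursor, which is what lets calls be chained as in :kvs.save(:kvs.add(...)).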

READER

A reader cursor represents a user's reading pointer into the writer feed. The #reader.feed field is linked to #writer.id. The pos field holds the current position in the feed. The dir field selects the direction used by take and drop.

Reader Cursor
-record(reader, { id    = [] :: integer(),
                  pos   = 0  :: [] | integer(),
                  cache = [] :: [] | integer(),
                  args  = [] :: term(),
                  feed  = [] :: term(),
                  dir   = 0  :: 0 | 1 } ).

reader(term()) -> #reader{}.

Creates a reader cursor for the given writer id.

load_reader(term()) -> #reader{}.

Loads reader cursor from database.

save(#reader{}) -> #reader{}.

Flushes reader cursor to database.

bot(#reader{}) -> #reader{dir = 0}.

Changes the direction of the cursor to take forward (normal).

top(#reader{}) -> #reader{dir = 1}.

Changes the direction of the cursor to take backwards.

ITER

KVS Stream Iterator
-record(iter, { id   = [] :: [] | integer(),
                next = [] :: [] | integer(),
                prev = [] :: [] | integer() } ).
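The iter record describes a doubly linked element: each entry knows the id of its next and prev neighbour, with [] at the ends. A minimal in-memory sketch of such a chain follows. The ChainSketch module is hypothetical, and it assumes next points to the element appended later; the real module may orient the links differently.

```elixir
defmodule ChainSketch do
  # In-memory illustration only; this is not the kvs_stream implementation.
  # chain: %{id => {:iter, id, next, prev}}, where [] marks a missing neighbour.

  # First element: no neighbours on either side.
  def append(chain, [], id), do: Map.put(chain, id, {:iter, id, [], []})

  # Later elements: link the old tail forward and the new tail backward.
  def append(chain, last, id) do
    {:iter, ^last, [], prev} = Map.fetch!(chain, last)

    chain
    |> Map.put(last, {:iter, last, id, prev})
    |> Map.put(id, {:iter, id, [], last})
  end

  # Collects ids starting at id, following either the :next or the :prev link.
  def walk(_chain, [], _dir), do: []

  def walk(chain, id, dir) do
    {:iter, ^id, next, prev} = Map.fetch!(chain, id)
    following = if dir == :next, do: next, else: prev
    [id | walk(chain, following, dir)]
  end
end

{chain, _last} =
  Enum.reduce(1..4, {%{}, []}, fn id, {chain, last} ->
    {ChainSketch.append(chain, last, id), id}
  end)

IO.inspect(ChainSketch.walk(chain, 1, :next))
IO.inspect(ChainSketch.walk(chain, 4, :prev))
```

Because every element carries both links, the same chain can be traversed from either end without a separate index.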

next(#reader{}) -> #reader{}.

Moves the cursor to the next element, consuming data downward from the top. Returns an error if the list is empty; otherwise returns the next element, or the last one.

prev(#reader{}) -> #reader{}.

Moves the cursor to the previous element, consuming data upward from the bottom. Returns an error if the list is empty; otherwise returns the element moved to, or the last one.

drop(#reader{}) -> #reader{}.

Skips N elements, starting from the reader's current position.

take(#reader{}) -> #reader{}.

Tries to consume N records from the stream, using the reader's current position and direction. Returns the consumed data. Usually you seek to some position and then consume some data.
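How pos and dir interact in take and drop can be sketched with an in-memory model. This is not the kvs_stream code. The ReaderSketch module and the feeds map are hypothetical, pos is assumed to count the elements already passed, and N is assumed to travel in through args with the consumed data coming back in args, which fits the take(#reader{}) -> #reader{} signature but is not confirmed by the text above.

```elixir
defmodule ReaderSketch do
  # In-memory illustration only; this is not the kvs_stream implementation.
  # Reader tuple mirrors #reader: {:reader, id, pos, cache, args, feed, dir}.
  # feeds is a hypothetical store: %{feed_id => [element, ...]}.

  # take/2: N comes in through args; the consumed elements go back out in args.
  def take(feeds, {:reader, id, pos, cache, n, feed, dir}) do
    items = Map.fetch!(feeds, feed)

    taken =
      case dir do
        # dir = 0: forward from the current position.
        0 -> items |> Enum.drop(pos) |> Enum.take(n)
        # dir = 1: backward from the current position.
        1 -> items |> Enum.take(pos) |> Enum.reverse() |> Enum.take(n)
      end

    moved = length(taken)
    new_pos = if dir == 0, do: pos + moved, else: pos - moved
    {:reader, id, new_pos, cache, taken, feed, dir}
  end

  # drop/2: same movement as take/2, but the skipped elements are discarded.
  def drop(feeds, reader), do: feeds |> take(reader) |> put_elem(4, [])
end

feeds = %{~c"/cursors/cur1" => [:a, :b, :c, :d, :e]}
r0 = {:reader, 1, 0, [], 2, ~c"/cursors/cur1", 0}

# Take two forward, skip one, then turn around and take two backward.
r1 = ReaderSketch.take(feeds, r0)
r2 = ReaderSketch.drop(feeds, put_elem(r1, 4, 1))
r3 = ReaderSketch.take(feeds, r2 |> put_elem(4, 2) |> put_elem(6, 1))

IO.inspect([r1, r2, r3])
```

In this model, asking for more elements than remain simply yields a shorter list, which matches the "tries to consume" wording.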

append(tuple(), list()) -> term().

Adds record to feed.

remove(tuple(), list()) -> term().

Removes record from feed.

cut(list(), term()) -> #ok{} | #error{}.

Cuts the feed up to a given key.

This module may refer to: kvs, kvs_st.