Thursday, February 7, 2013

Streaming file protocol IX for Go, inheriting from 9p.

Finally got a protocol as simple as 9p but capable of streaming. It's called IX.
We'll publish it soon.

This is the documentation for the Go binding of ix plus a few examples.


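// Stream a remote file into a local one through an io.Reader.
gr := ns.GetReader("/foo/dockingframescore.pdf")
of, err := os.Create("/tmp/file")
if err != nil {
	t.Fatal(err)
}
b := make([]byte, 8192) // one buffer, reused for every read
for {
	n, err := gr.Read(b)
	if n > 0 {
		if _, werr := of.Write(b[:n]); werr != nil {
			t.Fatal(werr)
		}
	}
	if err != nil || n == 0 {
		break // io.EOF or a read error ends the copy
	}
}
of.Close()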

And as a result of running this...

2013/02/07 21:00:34 -> Tfid[7] fid 1
2013/02/07 21:00:34 -> Tclone[7] newfid 7
2013/02/07 21:00:34 -> Tclunkon[7] err
2013/02/07 21:00:34 -> Twalk[7] name 'dockingframescore.pdf'
2013/02/07 21:00:34 -> Topen[7] r
2013/02/07 21:00:34 -> Tread[7] off 0 count 18446744073709551615
2013/02/07 21:00:34 -> Tend[7]
2013/02/07 21:00:34 <- Rok[7]
2013/02/07 21:00:34 <- Rok[7]
2013/02/07 21:00:34 <- Rok[7]
2013/02/07 21:00:34 <- Rok[7]
2013/02/07 21:00:34 <- Rok[7]
2013/02/07 21:00:34 <- Rread[7] off 0 data[8192]
2013/02/07 21:00:34 <- Rread[7] off 8192 data[8192]
2013/02/07 21:00:34 <- Rread[7] off 16384 data[8192]
2013/02/07 21:00:34 <- Rread[7] off 24576 data[8192]
2013/02/07 21:00:34 <- Rread[7] off 32768 data[8192]
2013/02/07 21:00:34 <- Rread[7] off 40960 data[8192]
2013/02/07 21:00:34 <- Rread[7] off 49152 data[8192]
2013/02/07 21:00:34 <- Rread[7] off 57344 data[8192]


And the stream goes on and on.

This is an example of how to use the protocol directly from Go:

// Read a directory
r := c.NewRpc()
r.Wc <- &Call{Kind: Tfid, Fid: 1}
r.Wc <- &Call{Kind: Tclone, Newfid: 2}
r.Wc <- &Call{Kind: Tclunkon, When: End}
r.Wc <- &Call{Kind: Twalk, Name: "foo"}
r.Wc <- &Call{Kind: Topen, Mode: Read}
r.Wc <- &Call{Kind: Tread, Off: 0, Count: ^uint64(0)}
r.Wc <- nil // nil ends the list of requests
var b []byte
for rep := range r.Rc {
	fmt.Printf("got rep %s\n", rep)
	if rep.Kind == Rread {
		// Accumulate the streamed directory data.
		b = append(b, rep.Data...)
	}
}
r.Close()

Now the promised go doc. There are two packages, ix (the raw protocol interface) and ns (a canned interface more convenient for client code).

package ix
   import "lsub.org/ix"

   IX protocol messages.

   This implements pack, unpack, and printing of IX messages.

   Authentication and encryption are left out of the protocol. The
   underlying transport must be secured by the caller before using the
   protocol.

   The underlying transport is assumed to be a reliable stream (e.g. TCP).
   Messages are exchanged by sending the message length (4 bytes in big
   endian) and then the message data. The resulting transport thus
   preserves message boundaries.
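
The framing just described can be sketched with the standard library alone. This is not the ix package's ReadMsg/WriteMsg code; writeFrame, readFrame, and roundTrip are hypothetical names, and the sketch assumes that the length counts only the bytes that follow it.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame sends length[4] in big endian, then the message bytes.
// Assumption: the length counts only the bytes that follow it.
func writeFrame(w io.Writer, msg []byte) error {
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(msg)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(msg)
	return err
}

// readFrame reads one length-prefixed message from a byte stream.
func readFrame(r io.Reader) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	msg := make([]byte, binary.BigEndian.Uint32(hdr[:]))
	if _, err := io.ReadFull(r, msg); err != nil {
		return nil, err
	}
	return msg, nil
}

// roundTrip writes msgs to an in-memory stream (standing in for a TCP
// connection) and reads them back one frame at a time.
func roundTrip(msgs ...string) []string {
	var conn bytes.Buffer
	for _, m := range msgs {
		if err := writeFrame(&conn, []byte(m)); err != nil {
			panic(err)
		}
	}
	var out []string
	for {
		msg, err := readFrame(&conn)
		if err != nil {
			break // io.EOF once the stream is drained
		}
		out = append(out, string(msg))
	}
	return out
}

func main() {
	for _, m := range roundTrip("first", "second message") {
		fmt.Printf("%d bytes: %q\n", len(m), m)
	}
}
```

A real reader would probably also cap the length it accepts before allocating, since the peer controls that value.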

   All integers are sent in big endian format, using either 4 bytes or 8
   bytes. These are noted as [4] and [8] in what follows. All strings use
   UTF-8. They are sent by sending the string size[4] in bytes and then so
   many bytes in UTF-8. A string on the wire is noted str[s]. Messages that
   carry raw data send such data as the last field, without including the
   field length in the message. The total message size indicates how many
   data bytes are included. Such data fields are noted data[].
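
As an illustration of the str[s] encoding, here is a minimal sketch that packs and unpacks strings in a byte slice. The names putString and getString are hypothetical; the package itself exposes WriteStringTo and ReadStringFrom, whose implementation is not shown here.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// putString appends str[s] to buf: size[4] in big endian, then the UTF-8 bytes.
func putString(buf []byte, s string) []byte {
	var n [4]byte
	binary.BigEndian.PutUint32(n[:], uint32(len(s)))
	buf = append(buf, n[:]...)
	return append(buf, s...)
}

// getString decodes one str[s] from the front of buf and returns the
// string together with the bytes that follow it.
func getString(buf []byte) (string, []byte, error) {
	if len(buf) < 4 {
		return "", nil, errors.New("short string size")
	}
	n := binary.BigEndian.Uint32(buf)
	buf = buf[4:]
	if uint64(n) > uint64(len(buf)) {
		return "", nil, errors.New("short string data")
	}
	return string(buf[:n]), buf[n:], nil
}

func main() {
	// Two strings back to back, like the uname[s] tname[s] fields of tattach.
	b := putString(nil, "nemo")
	b = putString(b, "main")
	uname, rest, _ := getString(b)
	tname, rest, _ := getString(rest)
	fmt.Printf("uname=%q tname=%q left=%d\n", uname, tname, len(rest))
}
```

Calling getString in a loop until no bytes remain is also one way to decode a run of consecutive name[s] values.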

   Messages sent on the wire have the format:

length[4] type[4] tag[4] ...

   Here, type identifies the message type.

   Requests belong to a group (also known as a channel), which is
   identified by a tag. A group is a series of requests with the same tag,
   and finishes with a Tend request. It is ok to send messages for
   different groups at the same time (one message at a time, of course).

   A server processes requests for a group one at a time, in order.
   Requests in a group are terminated as soon as one fails or the final
   Tend request is reached. Clients are expected to send the Tend request
   even if the group has already failed, for simplicity.

   Each group operates on a fid that is implicit for the group. The first
   request usually initializes the implicit fid. Requests like Tclone,
   which allocates a new fid, make that fid implicit for the rest of the
   group. We use "implicit file" to refer to the file pointed to by the
   implicit fid.

   All requests have replies, which carry the tag found in the request. Two
   generic replies are used almost everywhere: Rok indicates that the
   request was performed fine; Rerror indicates that it failed and conveys
   the error message. Most requests use these as their replies; some
   require specific reply messages.

   These are the messages in the protocol. The length, type, and tag are
   not shown; instead, a symbolic name for each message type is shown.

tattach fid[4] afid[4] uname[s] tname[s] msize[4]
rattach msize[4] afid[4]

   Attaches the fid to the tree with name tname for the user with name
   uname. The fid becomes implicit for the rest of the group. All data[]
   fields in the conversation will be no larger than msize bytes. Auth is
   left out of the protocol, thus it's up to the client and the server to
   authenticate in a way that makes the server trust the client on behalf
   of uname, and vice-versa. Afid is kept as a provision to add auth fids
   similar to those of 9p.

   Negotiation of the protocol version is also left out of the protocol.

twalk name[s]

   walks the implicit fid to the given name. Name is a single path element,
   not a full path. Servers are free to refuse walks to ".." and ".".

tclone newfid[4]

   allocates a new fid and makes it implicit for the rest of the group.

tclunkon end|err

   makes Tend and/or Rerror get rid of the implicit fid. Otherwise, the fid
   remains allocated after the entire group is processed. Clunking on end
   implies clunking also on errors.

topen mode[1]

   opens the implicit fid for I/O according to mode.

   Directories can only be opened for reading.

tcreate type[4] perm[4] name[s]

   creates a file (or directory, depending on type) named "name" under the
   implicit file, with the given permissions. Note that the implicit fid
   still refers to the directory after this request, and not to the new
   file. Also, the new file is not open. This request only creates a file,
   nothing else. The values for type should be the character used in UNIX
   to list the file type, by convention.

tremove

   removes the implicit file.

trattr name[s]
rrattr name[s] data[]

   Reads an attribute of the implicit file. All attributes have UTF-8 names
   and have data[] as values. Values are usually encoded as UTF-8 strings,
   which is useful for other requests shown later.

   The special name "?" may be used as an attribute name to obtain the list
   of attribute names for the implicit file.

   The special name "*" may be used to obtain one reply per existing
   attribute, followed by one reply with empty name and data.

   By convention, the following attributes are always available:

id: file id or path
name: file name
length: length in bytes for files; number of children for directories.
mode: file mode; conventional rwxrwxrwx bits are expected at least.
uid: user id
mtime: last modified, seconds since epoch
type: file type: "d" for directories, "-" for regular files

twattr name[s] data[]

   writes a new value for the named attribute.

tmove tofid[4] name[s]

   moves the implicit file to the directory implied by tofid[4], using
   name[s] as the new name for the file in the destination directory.

tfid fid[4]

   sets the implicit fid.

tread off[8] count[8]
rread off[8] data[]

   asks for replies conveying up to count[8] bytes from the implicit file
   (starting at the indicated offset). Here, count may be larger than the
   maximum data[] size negotiated. The server will send one or more
   replies until either count bytes are returned or EOF is reached in the
   file (signaled as a reply with 9 bytes). Note that this is equivalent
   to readn, not to read.

   To issue a single read request to the server, count should be 0, which
   means that up to the negotiated msize bytes might be sent.

   The result from reading a directory is a series of names, encoded as
   name[s], for each of the files contained in the directory by the time of
   the open.
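
To make the readn-like behaviour concrete, the following sketch shows how a server might split one Tread into several Rread replies. It is only a model: rread and serveRead are hypothetical names, the file is an in-memory byte slice, and EOF is represented as a final reply with empty data, which is an assumption of this sketch rather than something taken from the implementation.

```go
package main

import "fmt"

// rread models an Rread reply: the offset and the data it carries.
type rread struct {
	off  uint64
	data []byte
}

// serveRead splits a Tread into replies of at most msize bytes each.
// count == 0 asks for a single reply of up to msize bytes.
// Hitting EOF before count bytes were sent is modeled as a final
// reply with empty data (an assumption of this sketch).
func serveRead(file []byte, off, count uint64, msize int) []rread {
	if msize <= 0 {
		return nil
	}
	single := count == 0
	if single {
		count = uint64(msize)
	}
	var out []rread
	for count > 0 {
		if off >= uint64(len(file)) {
			out = append(out, rread{off: off}) // EOF marker
			break
		}
		n := uint64(msize)
		if n > count {
			n = count
		}
		if rem := uint64(len(file)) - off; n > rem {
			n = rem
		}
		out = append(out, rread{off: off, data: file[off : off+n]})
		off += n
		count -= n
		if single {
			break
		}
	}
	return out
}

func main() {
	file := make([]byte, 20000)
	// Same request as in the trace: offset 0, count ^uint64(0).
	for _, r := range serveRead(file, 0, ^uint64(0), 8192) {
		fmt.Printf("Rread off %d data[%d]\n", r.off, len(r.data))
	}
}
```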

treplace off0[8] off1[8] data[]
rreplace off[8] count[4]

   replaces the range off0:off1 in the implicit file with new data.
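
One possible reading of the range replacement, sketched on an in-memory file. The sketch assumes that off0:off1 is a half-open range like a Go slice expression and that the file may grow or shrink as a result; the text above does not say either way. replaceRange is a hypothetical helper, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// replaceRange returns a copy of file with the bytes in [off0, off1)
// replaced by data. Assumption: the range is half-open and the result
// may have a different length than the original.
func replaceRange(file []byte, off0, off1 uint64, data []byte) ([]byte, error) {
	if off0 > off1 || off1 > uint64(len(file)) {
		return nil, errors.New("bad range")
	}
	out := make([]byte, 0, len(file)-int(off1-off0)+len(data))
	out = append(out, file[:off0]...)
	out = append(out, data...)
	return append(out, file[off1:]...), nil
}

func main() {
	file := []byte("hello world")
	file, _ = replaceRange(file, 6, 11, []byte("IX"))
	fmt.Printf("%s\n", file)
}
```

Under this reading, an empty range (off0 equal to off1) inserts data and empty data deletes the range.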

tcond op[1] name[s] data[]

   applies the condition op (LT,LE,EQ,GE,GT,NE) to the named attribute
   using the supplied data as the second operand. If the condition holds,
   Rok is replied; Rerror otherwise. If the attribute value is not a valid
   UTF-8 string, the request fails. Otherwise, it is compared as a string
   unless both strings are valid as printed integers, in which case they
   are compared as integer values.
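
The comparison rule can be sketched as follows. The function cond is a hypothetical stand-in for whatever the server does, and it assumes that "printed integers" means decimal text that strconv.ParseInt accepts.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode/utf8"
)

// Condition operators, in the same order as the ix constants.
const (
	LT = iota
	LE
	EQ
	GE
	GT
	NE
)

// cond reports whether "attr op operand" holds. Values are compared as
// integers when both parse as decimal integers, and as strings otherwise.
func cond(op int, attr, operand []byte) (bool, error) {
	if !utf8.Valid(attr) {
		return false, fmt.Errorf("attribute value is not valid UTF-8")
	}
	a, b := string(attr), string(operand)
	c := strings.Compare(a, b)
	ai, aerr := strconv.ParseInt(a, 10, 64)
	bi, berr := strconv.ParseInt(b, 10, 64)
	if aerr == nil && berr == nil {
		switch {
		case ai < bi:
			c = -1
		case ai > bi:
			c = 1
		default:
			c = 0
		}
	}
	switch op {
	case LT:
		return c < 0, nil
	case LE:
		return c <= 0, nil
	case EQ:
		return c == 0, nil
	case GE:
		return c >= 0, nil
	case GT:
		return c > 0, nil
	case NE:
		return c != 0, nil
	}
	return false, fmt.Errorf("unknown op %d", op)
}

func main() {
	ok, _ := cond(GT, []byte("1300"), []byte("1234"))
	fmt.Println("mtime 1300 GT 1234:", ok)
	ok, _ = cond(LT, []byte("9"), []byte("10"))
	fmt.Println("9 LT 10:", ok)
}
```

The integer case matters for attributes such as mtime and length: compared as strings, "9" would sort after "10".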

tforall rec[1]
rforall data[] // one sent per file, carrying the file id. then one with data[0]

   This request applies the rest of the requests in the group to each one
   of the files contained in the implicit file (which must be a directory).
   If rec is non-zero, the request recurses into the entire file tree rooted
   at the implicit file. If rec is >1, then the requests are applied using
   tail recursion.

   This is useful to remove entire trees, to retrieve entire trees, etc.
   See the examples.

   Before applying the rest of the group to each one of the files, an
   Rforall is sent back, carrying the value of the "id" attribute for the
   file. Then, the rest of the group is applied to a temporary implicit fid
   that is set to point to the file (in each case). If the group fails,
   processing continues on the next file.
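
The effect of rec on the order in which files are visited can be illustrated with an in-memory tree. This sketch borrows the constant names Dontrec, Topdown, and Bottomup from the package, but node, forall, order, and sample are hypothetical. It assumes that only the files below the implicit file are visited (not the root itself) and that rec > 1 means a directory is handled after its contents; neither detail is confirmed by the text above.

```go
package main

import (
	"fmt"
	"strings"
)

// Values for rec, named after the ix constants.
const (
	Dontrec = iota
	Topdown
	Bottomup
)

// node is a file; directories have children.
type node struct {
	name string
	kids []*node
}

// forall calls visit with the path of each file contained in dir.
// Topdown visits a directory before its contents, Bottomup after them.
func forall(dir *node, prefix string, rec int, visit func(path string)) {
	for _, k := range dir.kids {
		p := prefix + "/" + k.name
		if rec != Bottomup {
			visit(p)
		}
		if rec != Dontrec {
			forall(k, p, rec, visit)
		}
		if rec == Bottomup {
			visit(p)
		}
	}
}

// order returns the visited paths, in order, separated by spaces.
func order(root *node, rec int) string {
	var paths []string
	forall(root, "", rec, func(p string) { paths = append(paths, p) })
	return strings.Join(paths, " ")
}

// sample builds a directory holding b (a directory with file c) and d.
func sample() *node {
	return &node{name: "a", kids: []*node{
		{name: "b", kids: []*node{{name: "c"}}},
		{name: "d"},
	}}
}

func main() {
	fmt.Println("Dontrec: ", order(sample(), Dontrec))
	fmt.Println("Topdown: ", order(sample(), Topdown))
	fmt.Println("Bottomup:", order(sample(), Bottomup))
}
```

A bottom-up order is what makes removing a tree work: each directory is already empty by the time it is removed.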

tend
rend

   terminates the group.

rok

   is a generic reply to requests that do not need specific replies.

rerror err[s]

   is a generic reply for a request that failed.

tflush

   flushes the group. It is ok to send a flush after Tend. Flush requests
   are always answered with an Rerror containing "flushed", as are flushed
   requests.

   Example, remove a tree:

Tfid 13, set the fid 13
Tforall 2
Tremove
Tend

   Example, retrieve everything about a file:

Tfid 13
Tclone
Tclunkon end
Twalk a
Twalk b
Trattr *
Topen OREAD
Tread 0 ~0 8192
Tend

   Example, retrieve an entire tree:

Tfid 13
Tclone
Tclunkon end
Twalk a
Twalk b
Tforall 1
Trattr *
Topen OREAD
Tread 0 ~0 8192
Tend

   Example, retrieve all metadata for a tree:

Tfid 13
Tclone
Tclunkon end
Twalk a
Twalk b
Tforall 1
Trattr *
Tend

   Example, retrieve all metadata and data for files newer than 1234:

Tfid 13
Tclone
Tclunkon end
Twalk a
Twalk b
Tforall 1
Tcond GT mtime 1234
Trattr *
Topen OREAD
Tread 0 ~0 8192
Tend

CONSTANTS

const (
   None = CallKind(iota)
   Tattach
   Twalk
   Tclone
   Tclunkon
   Topen
   Tcreate
   Tremove
   Trattr
   Twattr
   Tmove
   Tfid
   Tread
   Tcond
   Tforall
   Tflush
   Tend
   Rattach
   Rrattr
   Rread
   Treplace
   Rreplace
   Rok
   Rforall
   Rerror
   Rend
)
   Message types.

const (
   Read  = 1
   Write = 2
   Trunc = 4
)
   Known modes for Call.Mode.

const (
   End = 0
   Err = 1
)
   Known flags for Tclunkon.When.

const (
   Tdir  = uint8('d')
   Tfile = uint8('-')
)
   Known file types for Tcreate.Ftype.

const (
   LT = iota
   LE
   EQ
   GE
   GT
   NE
)
   Known ops for Tcond.Op.

const (
   Dontrec = iota
   Topdown
   Bottomup
)
   Known flag values for Tforall.Rec.


VARIABLES

var (
   // PEM and Key files used.
   DefaultPem = "certs/client.pem"
   DefaultKey = "certs/client.key"
)


FUNCTIONS

func AddWalks(calls []*Call, path string) []*Call
   Add enough Twalk requests to calls to walk to the given path.

func DirEnts(b []byte) ([]string, error)
   Convert data read from a directory read into directory entries

func NetDialTLS(network, addr string, cfg *tls.Config) (io.ReadWriteCloser, error)
   Dial a server using tls and return a rwc for it. You can create a config
   given paths to pem and key files using MakeTLSCfg(pem, key). If cfg is
   nil, default files with pem and key are used.

func NextElem(path string) (string, string)
   Return the first path element in path and the rest of it.
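
Since Twalk takes a single path element, a full path has to be split before it can be walked. The sketch below shows one way to do that; nextElem and walkNames are hypothetical re-implementations written for illustration, and the real NextElem and AddWalks may treat corner cases (empty paths, repeated slashes) differently.

```go
package main

import (
	"fmt"
	"strings"
)

// nextElem returns the first element of path and the rest of it.
// Leading slashes are skipped; an empty element means nothing is left.
func nextElem(path string) (string, string) {
	path = strings.TrimLeft(path, "/")
	if i := strings.IndexByte(path, '/'); i >= 0 {
		return path[:i], path[i+1:]
	}
	return path, ""
}

// walkNames returns the names that would go in successive Twalk requests.
func walkNames(path string) []string {
	var names []string
	for {
		var el string
		el, path = nextElem(path)
		if el == "" {
			break
		}
		names = append(names, el)
	}
	return names
}

func main() {
	for _, n := range walkNames("/foo/dockingframescore.pdf") {
		fmt.Printf("Twalk name '%s'\n", n)
	}
}
```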

func ReadMsg(r io.Reader) ([]byte, error)
   Read a message from the given reader, allocating a buffer to hold the
   message bytes. Use ReadMsgBuf if you want to use your own buffer.

func ReadMsgBuf(r io.Reader, buf []byte) ([]byte, error)
   Read a message from the given reader using the given buffer. The buffer
   should have enough capacity for the largest expected message.

func ReadStringFrom(r io.Reader) (string, error)
   Read a string from r using the IX string encoding.

func WriteMsg(w io.Writer, buf []byte) error
   Write a packed message to the given writer.

func WriteStringTo(w io.Writer, s string) error
   Write a string into w using the IX string encoding.


TYPES

type Call struct {
   Kind   CallKind // all
   Tag    uint32   // all
   Fid    uint32   // Tattach, Tfid
   Afid   uint32   // Tattach, Rattach
   Uname  string   // Tattach
   Tname  string   // Tattach
   Msize  uint32   // Tattach, Rattach
   Name   string   // Twalk, Tcreate, Trattr, Twattr, Tmove, Tcond, Rrattr
   Newfid uint32   // Tclone
   Mode   uint8    // Topen
   Ftype  uint8    // Tcreate
   Perm   uint32   // Tcreate
   Data   []byte   // Twattr, Tcond, Rrattr, Rread, Treplace, Rforall
   Tofid  uint32   // Tmove
   Off    uint64   // Tread, Rread, Rreplace
   Count  uint64   // Tread, Rreplace (uint32 on the wire for Rreplace)
   Op     uint8    // Tcond
   Rec    uint8    // Tforall
   Off0   uint64   // Treplace
   Off1   uint64   // Treplace
   Err    string   // Rerror
   When   uint8    // Tclunkon
   // nothing: Tremove, Tflush, Tend, Rok, Rend

}
   Unpacked IX message operation (or reply).

func Unpack(buf []byte) (*Call, error)
   Unpack a message from buf. The buffer must contain exactly one message.

func (m *Call) Pack(buf []byte) ([]byte, error)
   Pack the message in the given buffer. The packed message does not
   include the message length. That should be written prior to writing the
   buffer, e.g., using WriteMsg.

func (m *Call) PackedSize() (int, error)
   Return the size of the packed message by writing it into a memory buffer
   and returning its length.

func (m *Call) String() string
   Return a string representation for the message, for debugging.

func (m *Call) WriteTo(w io.Writer) (int64, error)
   Write the message to the given writer by packing the message into a
   newly allocated buffer and writing the message length and then the
   message bytes to w. Use Pack and WriteMsg if you want to provide your
   own buffer.

type CallKind uint32
   Message kind type.

func (k CallKind) ReplyKind() CallKind
   Return the expected reply type (besides Rerror) or None if there is no
   reply.

func (k CallKind) String() string
   Return the name for the message type.

type Cli struct {
   sync.Mutex
   // contains filtered or unexported fields
}
   A client connection to an IX server. There should be only one per open
   connection. This is useful to keep track of fid and tag allocation and
   to allocate Rpcs to be sent to a server.

func Dial(network, addr string) (*Cli, error)
   Dial a server and return a client for it.

func DialTLS(network, addr string, cfg *tls.Config) (*Cli, error)
   Dial a server using tls and return a client for it. You can create a
   config given paths to pem and key files using MakeTLSCfg(pem, key). If
   cfg is nil, default files with pem and key are used.

func NewCli(rw io.ReadWriteCloser) *Cli
   Build a new Cli for the given connection.

func (c *Cli) Close()
   Hang up the client and wait until hung up.

func (c *Cli) NewFid() uint32
   Get a new unique fid number.

func (c *Cli) NewRpc() *Rpc
   Return an ongoing call to an IX server, made out of different
   operations.

func (c *Cli) Rpc(ts []*Call) ([]*Call, error)
   Convenience method to issue a series of requests in a new RPC and return
   the replies.

type Rpc struct {

   // Channel to write requests within this RPC.
   // Send nil and close to terminate the RPC.
   Wc chan<- *Call

   // Channel to read replies from.
   Rc <-chan *Call
   // contains filtered or unexported fields
}
   An ongoing rpc to an IX server.

func (r *Rpc) Close()
   Terminate an RPC request by sending nil through r.Wc and closing it, and
   then wait for it to actually terminate.

func (r *Rpc) Discard(n int) (int, error)
   Receive and discard up to n calls, reporting any error received and how
   many calls were received up to the error, if any.

func (r *Rpc) Flush()
   Flush this RPC.

func (r *Rpc) Send(calls []*Call)
   Send the given requests, terminating the Rpc after doing that.


SUBDIRECTORIES

certs
ns


package ns
   import "lsub.org/ix/ns"

   A name space implemented out of connections to IX servers.

CONSTANTS

const (
   Read  = ix.Read
   Write = ix.Write
   Trunc = ix.Trunc
)
   Modes for Open.

const (
   // Maximum message (data) size
   Msize = 8 * 1024
)


FUNCTIONS

func Attr2Int(b []byte) int64
   Return the int value for the given attribute value.

func Int2Attr(i int64) []byte
   Return the attribute value for the given int value.


TYPES

type Dir map[string][]byte
   A directory entry: named attribute values.

func NewDir() Dir
   Make a new, empty, directory entry.

func (d Dir) Gid() string

func (d Dir) Id() string

func (d Dir) Length() int64

func (d Dir) Mode() int

func (d Dir) Mtime() int64

func (d Dir) Name() string

func (d Dir) SetGid(s string)

func (d Dir) SetId(id string)

func (d Dir) SetLength(l int64)

func (d Dir) SetMode(m int)

func (d Dir) SetMtime(mt int64)

func (d Dir) SetName(n string)

func (d Dir) SetType(s string)

func (d Dir) SetUid(s string)

func (d Dir) Type() string

func (d Dir) Uid() string

type File struct {
   // contains filtered or unexported fields
}
   Represents a file in use.

func (f *File) Close() error
   Close a file in use.

func (f *File) Get() (<-chan Dir, <-chan []byte, <-chan error)
   Get everything about this file, its directory entry and all data. The
   file must be open for reading. Caller must receive from the Dir channel
   to retrieve the Dir, and from the []byte channel until a null slice is
   received. Upon reception of a null slice, the error channel should be
   checked out to discover any error (none if nil).

func (f *File) GetData(off, count uint64) (<-chan []byte, <-chan error)
   Stream data from the file. Caller must receive from the channel of
   []byte to get the data until the total count is received or a null slice
   is received. Upon reception of a null slice, the error channel should be
   checked out to discover any error. The file must be open for reading.

func (f *File) GetReader() *GetReader
   Return a reader to read the entire file.

func (f *File) Pread(count uint32, off uint64) ([]byte, error)
   Pread a file, issuing a single IX read request.

func (f *File) Pwrite(data []byte, off uint64) (uint32, uint64, error)
   Pwrite a file. May issue multiple IX write requests.

func (f *File) Rattr(name string) ([]byte, error)
   Read the value of an attribute.

func (f *File) Read(buf []byte) (int, error)

func (f *File) ReadDir() ([]Dir, error)
   Read all directory entries of files contained in a directory, open for
   reading. Should not be used on files that are not directories.

func (f *File) ReadDirNames() ([]string, error)
   Read all names of files contained in a directory (open for reading).
   Should not be used on files that are not directories.

func (f *File) Remove() error
   Remove this file.

func (f *File) Stat() (Dir, error)
   Stat a file and return its Dir entry.

func (f *File) String() string
   Return a printable version of File, for debugging.

func (f *File) Wattr(name string, value []byte) error
   Write the value of an attribute.

func (f *File) Write(data []byte) (int, error)

func (f *File) Wstat(m Dir) ([]string, error)
   Wstat a file. The names returned correspond to attributes not updated
   due to errors.

type GetReader struct {
   // contains filtered or unexported fields
}
   To be used as an io.Reader with Ns.GetData and Ns.Get.

func NewGetReader(c <-chan []byte, ec <-chan error) *GetReader
   Return an io.Reader to retrieve data resulting from Get or GetData calls.

func (r *GetReader) Close() error
   Close the reader for the Get of the file. Beware that this would still
   receive all the data streamed in the background. The implementation
   should flush the request instead.

func (r *GetReader) Read(data []byte) (int, error)
   Read data retrieved and report any error condition.
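
The idea behind GetReader, turning a channel of data slices plus an error channel into an io.Reader, can be sketched like this. It is not the package's implementation: chanReader, stream, and drain are hypothetical names, and the sketch assumes that the producer sends a nil slice when the data ends and then either sends one value on the error channel or closes it.

```go
package main

import (
	"fmt"
	"io"
)

// chanReader adapts a channel of data chunks plus an error channel
// to the io.Reader interface.
type chanReader struct {
	c   <-chan []byte
	ec  <-chan error
	buf []byte // unread remainder of the last chunk
	err error  // sticky result once the stream has ended
}

func (r *chanReader) Read(p []byte) (int, error) {
	for len(r.buf) == 0 {
		if r.err != nil {
			return 0, r.err
		}
		chunk, ok := <-r.c
		if !ok || len(chunk) == 0 {
			// End of data: check the error channel; nil means clean EOF.
			r.err = io.EOF
			if err := <-r.ec; err != nil {
				r.err = err
			}
			continue
		}
		r.buf = chunk
	}
	n := copy(p, r.buf)
	r.buf = r.buf[n:]
	return n, nil
}

// stream plays the role of the streaming side: it sends the chunks,
// then a nil slice, then a nil error.
func stream(chunks ...string) *chanReader {
	c := make(chan []byte)
	ec := make(chan error, 1)
	go func() {
		for _, s := range chunks {
			c <- []byte(s)
		}
		c <- nil
		ec <- nil
	}()
	return &chanReader{c: c, ec: ec}
}

// drain reads a whole stream through the io.Reader interface.
func drain(chunks ...string) (string, error) {
	b, err := io.ReadAll(stream(chunks...))
	return string(b), err
}

func main() {
	s, err := drain("streamed ", "in ", "pieces")
	fmt.Printf("%q %v\n", s, err)
}
```

Keeping the unread remainder in buf lets callers use buffers smaller than the chunks that arrive on the channel.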

type Ns struct {
   sync.RWMutex // don't use this; implementation only.
   // contains filtered or unexported fields
}
   Represents a name space attaching to IX servers, implemented by a prefix
   mount table.

func NewNs(uname string) *Ns
   Make a new name space for the given user name.

func (ns *Ns) Chdir(p string) error
   Change the current working directory to this named file.

func (ns *Ns) Close()
   Dismantle the Ns, closing all connections held. Each client connection
   is closed once, even if it's shared among multiple mount point entries.

func (ns *Ns) Create(p string, perm uint32) (*File, error)
   Create a new file in the name space and return a fid for it, open in RW
   mode.

func (ns *Ns) CreateDir(p string, perm uint32) error
   Create a new directory in the name space.

func (ns *Ns) Cwd() string
   Return the absolute path of the current working directory.

func (ns *Ns) Get(file string) (<-chan Dir, <-chan []byte, <-chan error)
   Get everything about this file, its directory entry and all data. Caller
   must receive from the Dir channel to retrieve the Dir, and from the
   []byte channel until a null slice is received. Upon reception of a null
   slice, the error channel should be checked out to discover any error
   (none if nil).

func (ns *Ns) GetData(file string, off, count uint64) (<-chan []byte, <-chan error)
   Stream data from the file. Caller must receive from the channel of
   []byte to get the data until the total count is received or a null slice
   is received. Upon reception of a null slice, the error channel should be
   checked out to discover any error (none if nil).

func (ns *Ns) GetReader(path string) *GetReader
   Return a reader to read the entire file.

func (ns *Ns) GetTree(file string, tcond *ix.Call) (<-chan Dir, <-chan []byte, <-chan error)
   Retrieve an entire subtree, including metadata. If tcond is not nil,
   then it's placed as a Tcond filter such that only files matching are
   retrieved, otherwise everything is retrieved. The caller should wait for
   a dir to be posted on the Dir channel, then read the file data until
   eof, receive another Dir, etc.

func (ns *Ns) List() []string
   Return the set of mount points.

func (ns *Ns) Mount(fd io.ReadWriteCloser, spec string, mnt string) error
   Mount an ix server reached through fd into ns. Spec indicates the tree
   and path to mount and mnt is the mount point. The first path element in
   spec identifies the tree name. Mounting on the current working directory
   updates it so it sees the mount. Mounting a suffix of the cwd is always
   seen, because it's a prefix mount table. Mounting on a prefix of the cwd
   has no effect (behaving as if an entry for dot was always present).
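
A prefix mount table can be pictured as a list of (mount point, server) pairs searched for the entry that covers a path. The sketch below resolves a path to the longest matching prefix; mount, covers, resolve, and sampleTable are hypothetical, server names stand in for client connections, and the longest-prefix rule itself is an assumption about how Ns picks an entry.

```go
package main

import (
	"fmt"
	"strings"
)

// mount is one entry of the table; server stands in for a connection.
type mount struct {
	prefix string
	server string
}

// covers reports whether path is prefix itself or lies below it.
func covers(prefix, path string) bool {
	if prefix == "/" {
		return strings.HasPrefix(path, "/")
	}
	return path == prefix || strings.HasPrefix(path, prefix+"/")
}

// resolve picks the entry with the longest prefix covering path and
// returns its server and the path relative to that mount point.
func resolve(tab []mount, path string) (server, rel string, ok bool) {
	best := -1
	for i, m := range tab {
		if covers(m.prefix, path) && (best < 0 || len(m.prefix) > len(tab[best].prefix)) {
			best = i
		}
	}
	if best < 0 {
		return "", "", false
	}
	rest := strings.TrimPrefix(path, tab[best].prefix)
	return tab[best].server, "/" + strings.TrimLeft(rest, "/"), true
}

// sampleTable returns a small table with nested mount points.
func sampleTable() []mount {
	return []mount{
		{prefix: "/", server: "root"},
		{prefix: "/n", server: "names"},
		{prefix: "/n/ix", server: "ixsrv"},
	}
}

func main() {
	tab := sampleTable()
	for _, p := range []string{"/n/ix/foo", "/n/ixx", "/tmp"} {
		srv, rel, _ := resolve(tab, p)
		fmt.Printf("%s -> %s %s\n", p, srv, rel)
	}
}
```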

func (ns *Ns) NewPutWriter(file string, d Dir) *PutWriter
   Do an ns.Put() for the given arguments, returning an io.WriteCloser to
   stream data writes to the new file.

func (ns *Ns) Open(path string, mode uint8) (*File, error)
   Open a file in the name space.

func (ns *Ns) Put(file string, d Dir, data <-chan []byte) <-chan error
   Create a file, update its attributes to those in Dir (if given), and
   its data to be that received from the data channel. The data ends when
   nil is sent through the data channel. The error channel is sent the
   final error condition once the data channel has been closed and the
   streaming process completes. It is also posted an error condition if an
   error reply is noticed while streaming data, in which case all further
   data updates are discarded. Caller might check the error channel
   non-blocking during streaming to see if it should stop sending data due
   to errors.

func (ns *Ns) Remove(p string) error
   Remove a file in the name space.

func (ns *Ns) Stat(path string) (Dir, error)
   Stat a file and return its Dir entry.

func (ns *Ns) Unmount(mnt string) error
   Remove an entry in the mount table. The prefix must be exactly the given
   path. If dot is unmounted, it is left as it was. You could Chdir(Cwd())
   to resolve dot again in the ns after the unmount has taken effect.

func (ns *Ns) Wstat(path string, m Dir) ([]string, error)
   Wstat a file. The names returned correspond to attributes not updated
   due to errors.

type PutWriter struct {
   // contains filtered or unexported fields
}
   To be used as an io.WriteCloser with Ns.PutData and Ns.Put.

func (w *PutWriter) Close() error

func (w *PutWriter) Write(data []byte) (int, error)
   The count returned does not actually indicate that the bytes were
   written, because of streaming.

