icefiredb-ipfs-log #
Project introduction #
icefiredb-ipfs-log is a distributed immutable, operation-based conflict-free replication data structure that relies on ipfs to store data and merges each peer node data based on pubsub conflict-free. You can easily implement custom data structures such as kv, event, nosql, etc. based on icefiredb-ipfs-log.
Conflict-free log replication model
Log A Log B
| |
logA.append("one") logB.append("hello")
| |
v v
+-----+ +-------+
|"one"| |"hello"|
+-----+ +-------+
| |
logA.append("two") logB.append("world")
| |
v v
+-----------+ +---------------+
|"one","two"| |"hello","world"|
+-----------+ +---------------+
| |
| |
logA.join(logB) <----------+
|
v
+---------------------------+
|"one","hello","two","world"|
+---------------------------+
Features #
- Easy access to P2P && ipfs-log data consistency function
- Stable decentralized networking function
- Friendly program access interface
Installing #
go get -u github.com/IceFireDB/icefiredb-ipfs-log
Example #
Example of building a key-value database using icefiredb-ipfs-log #
- memory key-value:memory-kv
- leveldb kv :leveldb-kv
Use of key-value databases #
Detailed usage example reference
func main() {
ctx := context.TODO()
// disk cache directory
rootPath := "./kvdb"
node, api, err := iflog.CreateNode(ctx, rootPath)
if err != nil {
panic(err)
}
hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", node.PeerHost.ID().Pretty()))
for _, a := range node.PeerHost.Addrs() {
fmt.Println(a.Encapsulate(hostAddr).String())
}
log := zap.NewNop()
dbname := "iflog-event-kv"
ev, err := iflog.NewIpfsLog(ctx, api, dbname, &iflog.EventOptions{
Directory: rootPath,
Logger: log,
})
if err != nil {
panic(err)
}
if err := ev.AnnounceConnect(ctx, node); err != nil {
panic(err)
}
kvdb, err := kv.NewKeyValueDB(ctx, ev, log)
if err != nil {
panic(err)
}
// Load old data from disk
if err := ev.LoadDisk(ctx); err != nil {
panic(err)
}
kvdb.Put(ctx, "one", "one")
kvdb.Get("one")
kvdb.Delete(ctx, "one")
}
package main
import (
"bufio"
"context"
"fmt"
icefiredb_crdt_kv "github.com/IceFireDB/icefiredb-crdt-kv/kv"
badger2 "github.com/dgraph-io/badger"
"github.com/ipfs/go-datastore/query"
"github.com/sirupsen/logrus"
"os"
"strings"
)
func main() {
ctx := context.TODO()
log := logrus.New()
db, err := icefiredb_crdt_kv.NewCRDTKeyValueDB(ctx, icefiredb_crdt_kv.Config{
NodeServiceName: "icefiredb-crdt-kv",
DataSyncChannel: "icefiredb-crdt-kv-data",
NetDiscoveryChannel: "icefiredb-crdt-kv-net",
Namespace: "test",
Logger: log,
})
if err != nil {
panic(err)
}
defer db.Close()
fmt.Printf("> ")
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
text := scanner.Text()
fields := strings.Fields(text)
if len(fields) == 0 {
fmt.Printf("> ")
continue
}
cmd := fields[0]
switch cmd {
case "exit", "quit":
return
case "get":
if len(fields) < 2 {
printVal("missing key")
continue
}
val, err := db.Get(ctx, []byte(fields[1]))
if err != nil {
printVal(err)
continue
}
printVal(string(val))
case "put":
if len(fields) < 3 {
printVal("Missing parameters")
continue
}
printVal(db.Put(ctx, []byte(fields[1]), []byte(fields[2])))
case "delete":
if len(fields) < 2 {
printVal("missing key")
continue
}
printVal(db.Delete(ctx, []byte(fields[1])))
case "has":
if len(fields) < 2 {
printVal("missing key")
continue
}
is, err := db.Has(ctx, []byte(fields[1]))
if err != nil {
printVal(err)
continue
}
printVal(is)
case "list":
result, err := db.Query(ctx, query.Query{})
if err != nil {
printVal(err)
continue
}
for val := range result.Next() {
fmt.Printf(fmt.Sprintf("%s => %v\n", val.Key, string(val.Value)))
}
fmt.Print("> ")
case "query":
if len(fields) < 2 {
printVal("missing query condition")
continue
}
//fmt.Println(fields[1], len(fields[1]))
q := query.Query{
//Prefix: fields[1],
Filters: []query.Filter{
query.FilterKeyPrefix{
Prefix: fields[1],
},
},
}
result, err := db.Query(ctx, q)
if err != nil {
printVal(err)
continue
}
//time.Sleep(time.Second)
for val := range result.Next() {
fmt.Printf(fmt.Sprintf("%s => %v\n", val.Key, string(val.Value)))
}
fmt.Print("> ")
case "connect": // 主动连接
if len(fields) < 2 {
printVal("Missing connection address")
continue
}
err = db.Connect(fields[1])
if err == nil {
printVal("connection succeeded!")
} else {
printVal(err)
}
case "slist":
result, err := db.Store().Query(ctx, query.Query{})
if err != nil {
printVal(err)
continue
}
for val := range result.Next() {
fmt.Printf(fmt.Sprintf("%s => %v\n", val.Key, string(val.Value)))
}
fmt.Print("> ")
case "bquery":
if len(fields) < 2 {
printVal("missing query condition")
continue
}
db.DB().View(func(txn *badger2.Txn) error {
opts := badger2.DefaultIteratorOptions
opts.PrefetchSize = 10
it := txn.NewIterator(opts)
defer it.Close()
prefix := []byte(fields[1])
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
k := item.Key()
err := item.Value(func(v []byte) error {
fmt.Printf("key=%s, value=%s\n", k, v)
return nil
})
if err != nil {
return err
}
}
return nil
})
case "blist":
db.DB().View(func(txn *badger2.Txn) error {
opts := badger2.DefaultIteratorOptions
opts.PrefetchSize = 10
it := txn.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
k := item.Key()
err := item.Value(func(v []byte) error {
fmt.Printf("key=%s, value=%s\n", k, v)
return nil
})
if err != nil {
return err
}
}
return nil
})
default:
printVal("")
}
}
}
func printVal(v interface{}) {
fmt.Printf("%v\n> ", v)
}
Some code reference sources #
License #
icefiredb-ipfs-log is under the Apache 2.0 license. See the LICENSE directory for details.