icefiredb-ipfs-log

icefiredb-ipfs-log #

Project introduction #

icefiredb-ipfs-log is a distributed immutable, operation-based conflict-free replication data structure that relies on ipfs to store data and merges each peer node data based on pubsub conflict-free. You can easily implement custom data structures such as kv, event, nosql, etc. based on icefiredb-ipfs-log.

Conflict-free log replication model

           Log A                Log B
             |                    |
     logA.append("one")   logB.append("hello")
             |                    |
             v                    v
          +-----+             +-------+
          |"one"|             |"hello"|
          +-----+             +-------+
             |                    |
     logA.append("two")   logB.append("world")
             |                    |
             v                    v
       +-----------+       +---------------+
       |"one","two"|       |"hello","world"|
       +-----------+       +---------------+
             |                    |
             |                    |
       logA.join(logB) <----------+
             |
             v
+---------------------------+
|"one","hello","two","world"|
+---------------------------+

Features #

  1. Easy access to P2P && ipfs-log data consistency function
  2. Stable decentralized networking function
  3. Friendly program access interface

Installing #

go get -u github.com/IceFireDB/icefiredb-ipfs-log

Example #

Example of building a key-value database using icefiredb-ipfs-log #

Use of key-value databases #

Detailed usage example reference

func main() {
    ctx := context.TODO()
    // disk cache directory
    rootPath := "./kvdb"
    node, api, err := iflog.CreateNode(ctx, rootPath)
    if err != nil {
        panic(err)
    }
	
    hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", node.PeerHost.ID().Pretty()))
    for _, a := range node.PeerHost.Addrs() {
        fmt.Println(a.Encapsulate(hostAddr).String())
    }
    
    log := zap.NewNop()
    dbname := "iflog-event-kv"
    ev, err := iflog.NewIpfsLog(ctx, api, dbname, &iflog.EventOptions{
        Directory: rootPath,
        Logger:    log,
    })
    if err != nil {
        panic(err)
    }
	
    if err := ev.AnnounceConnect(ctx, node); err != nil {
        panic(err)
    }
    kvdb, err := kv.NewKeyValueDB(ctx, ev, log)
    if err != nil {
        panic(err)
    }
    // Load old data from disk
    if err := ev.LoadDisk(ctx); err != nil {
        panic(err)
    }
    kvdb.Put(ctx, "one", "one")
    kvdb.Get("one")
    kvdb.Delete(ctx, "one")
}
package main

import (
	"bufio"
	"context"
	"fmt"
	icefiredb_crdt_kv "github.com/IceFireDB/icefiredb-crdt-kv/kv"
	badger2 "github.com/dgraph-io/badger"
	"github.com/ipfs/go-datastore/query"
	"github.com/sirupsen/logrus"
	"os"
	"strings"
)

func main() {
	ctx := context.TODO()
	log := logrus.New()
	db, err := icefiredb_crdt_kv.NewCRDTKeyValueDB(ctx, icefiredb_crdt_kv.Config{
		NodeServiceName:     "icefiredb-crdt-kv",
		DataSyncChannel:     "icefiredb-crdt-kv-data",
		NetDiscoveryChannel: "icefiredb-crdt-kv-net",
		Namespace:           "test",
		Logger:              log,
	})
	if err != nil {
		panic(err)
	}

	defer db.Close()

	fmt.Printf("> ")
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		text := scanner.Text()
		fields := strings.Fields(text)
		if len(fields) == 0 {
			fmt.Printf("> ")
			continue
		}

		cmd := fields[0]
		switch cmd {
		case "exit", "quit":
			return
		case "get":
			if len(fields) < 2 {
				printVal("missing key")
				continue
			}
			val, err := db.Get(ctx, []byte(fields[1]))
			if err != nil {
				printVal(err)
				continue
			}
			printVal(string(val))
		case "put":
			if len(fields) < 3 {
				printVal("Missing parameters")
				continue
			}

			printVal(db.Put(ctx, []byte(fields[1]), []byte(fields[2])))
		case "delete":
			if len(fields) < 2 {
				printVal("missing key")
				continue
			}
			printVal(db.Delete(ctx, []byte(fields[1])))
		case "has":
			if len(fields) < 2 {
				printVal("missing key")
				continue
			}
			is, err := db.Has(ctx, []byte(fields[1]))
			if err != nil {
				printVal(err)
				continue
			}
			printVal(is)
		case "list":
			result, err := db.Query(ctx, query.Query{})
			if err != nil {
				printVal(err)
				continue
			}
			for val := range result.Next() {
				fmt.Printf(fmt.Sprintf("%s => %v\n", val.Key, string(val.Value)))
			}
			fmt.Print("> ")
		case "query":
			if len(fields) < 2 {
				printVal("missing query condition")
				continue
			}
			//fmt.Println(fields[1], len(fields[1]))
			q := query.Query{
				//Prefix: fields[1],
				Filters: []query.Filter{
					query.FilterKeyPrefix{
						Prefix: fields[1],
					},
				},
			}
			result, err := db.Query(ctx, q)
			if err != nil {
				printVal(err)
				continue
			}
			//time.Sleep(time.Second)
			for val := range result.Next() {
				fmt.Printf(fmt.Sprintf("%s => %v\n", val.Key, string(val.Value)))
			}
			fmt.Print("> ")

		case "connect": // 主动连接
			if len(fields) < 2 {
				printVal("Missing connection address")
				continue
			}
			err = db.Connect(fields[1])
			if err == nil {
				printVal("connection succeeded!")
			} else {
				printVal(err)
			}
		case "slist":
			result, err := db.Store().Query(ctx, query.Query{})
			if err != nil {
				printVal(err)
				continue
			}
			for val := range result.Next() {
				fmt.Printf(fmt.Sprintf("%s => %v\n", val.Key, string(val.Value)))
			}
			fmt.Print("> ")
		case "bquery":
			if len(fields) < 2 {
				printVal("missing query condition")
				continue
			}
			db.DB().View(func(txn *badger2.Txn) error {
				opts := badger2.DefaultIteratorOptions
				opts.PrefetchSize = 10
				it := txn.NewIterator(opts)
				defer it.Close()
				prefix := []byte(fields[1])
				for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
					item := it.Item()
					k := item.Key()
					err := item.Value(func(v []byte) error {
						fmt.Printf("key=%s, value=%s\n", k, v)
						return nil
					})
					if err != nil {
						return err
					}
				}
				return nil
			})

		case "blist":
			db.DB().View(func(txn *badger2.Txn) error {
				opts := badger2.DefaultIteratorOptions
				opts.PrefetchSize = 10
				it := txn.NewIterator(opts)
				defer it.Close()
				for it.Rewind(); it.Valid(); it.Next() {
					item := it.Item()
					k := item.Key()
					err := item.Value(func(v []byte) error {
						fmt.Printf("key=%s, value=%s\n", k, v)
						return nil
					})
					if err != nil {
						return err
					}
				}
				return nil
			})
		default:
			printVal("")
		}
	}
}

func printVal(v interface{}) {
	fmt.Printf("%v\n> ", v)
}

Some code reference sources #

License #

icefiredb-ipfs-log is under the Apache 2.0 license. See the LICENSE directory for details.