add ceph backing store plugin

This commit is contained in:
Le Zhang
2018-02-16 14:48:55 +08:00
parent b4b2b735bc
commit a969203cba
25 changed files with 4498 additions and 4 deletions

View File

@@ -1,32 +1,45 @@
sudo: required
dist: trusty
env:
- TARGET=iqn.2016-09.com.gotgt.gostor:example_tgt_0
global:
- TARGET=iqn.2016-09.com.gotgt.gostor:example_tgt_0
matrix:
- TGT_CFG='{"storages":[{"deviceID":1000,"path":"file:/var/tmp/disk.img","online":true}],"iscsiportals":[{"id":0,"portal":"127.0.0.1:3260"}],"iscsitargets":{"iqn.2016-09.com.gotgt.gostor:example_tgt_0":{"tpgts":{"1":[0]},"luns":{"0":1000}}}}'
- TGT_CFG='{"storages":[{"deviceID":1000,"path":"ceph-rbd:iscsi_pool/lun0","online":true}],"iscsiportals":[{"id":0,"portal":"127.0.0.1:3260"}],"iscsitargets":{"iqn.2016-09.com.gotgt.gostor:example_tgt_0":{"tpgts":{"1":[0]},"luns":{"0":1000}}}}'
language: go
go:
- 1.6
- 1.7
- 1.8
install:
- true
before_script:
- go get github.com/kr/godep
- echo ${TGT_CFG}
- echo ${TARGET}
- sudo apt-get update
- ci/ceph_install.sh
- bash ci/ceph_micro-osd.sh /tmp/micro-ceph
- export CEPH_CONF=/tmp/micro-ceph/ceph.conf
- ceph status
- go get github.com/kr/godep
- sudo apt-get install -y libcunit1 libcunit1-doc libcunit1-dev
- sudo apt-get install -y open-iscsi
script:
- cd ${TRAVIS_BUILD_DIR}
- ./autogen.sh
- ./configure
- ./configure
- make
- hack/verify-gofmt.sh
- export GOPATH=`pwd`/Godeps/_workspace/:$GOPATH
- go test -v ./pkg/...
- dd if=/dev/zero of=/var/tmp/disk.img bs=1024 count=10240
- mkdir ${HOME}/.gotgt
- echo '{"storages":[{"deviceID":1000,"path":"file:/var/tmp/disk.img","online":true}],"iscsiportals":[{"id":0,"portal":"127.0.0.1:3260"}],"iscsitargets":{"iqn.2016-09.com.gotgt.gostor:example_tgt_0":{"tpgts":{"1":[0]},"luns":{"0":1000}}}}' > ${HOME}/.gotgt/config.json
- echo ${TGT_CFG} > ${HOME}/.gotgt/config.json
- ./gotgt daemon --log debug 1>/dev/null 2>&1 &
# libiscsi test
- mkdir ${HOME}/libiscsi

5
Godeps/Godeps.json generated
View File

@@ -58,6 +58,11 @@
{
"ImportPath": "golang.org/x/net/proxy",
"Rev": "d9558e5c97f85372afee28cf2b6059d7d3818919"
},
{
"ImportPath": "github.com/ceph/go-ceph",
"Rev": "bd5bc6d4cb3e3d3441f2ec4e9f89899178edfc71"
}
]
}

View File

@@ -0,0 +1,26 @@
dist: trusty
sudo: required
language: go
branches:
except:
- gh-pages
matrix:
include:
- env: CEPH_RELEASE=jewel
- env: CEPH_RELEASE=kraken
before_install:
- sudo apt-get update
- ci/before_install.sh
- bash ci/micro-osd.sh /tmp/micro-ceph
- export CEPH_CONF=/tmp/micro-ceph/ceph.conf
- ceph status
script:
- go get -t -v ./...
- go list ./...
- go test -v $(go list ./... | grep -v cephfs)
- go fmt ./...

View File

@@ -0,0 +1,26 @@
FROM golang:1.7.1
MAINTAINER Abhishek Lekshmanan "abhishek.lekshmanan@gmail.com"
ENV CEPH_VERSION jewel
RUN echo deb http://download.ceph.com/debian-$CEPH_VERSION/ jessie main | tee /etc/apt/sources.list.d/ceph-$CEPH_VERSION.list
# Running wget with no certificate checks, alternatively ssl-cert package should be installed
RUN wget --no-check-certificate -q -O- 'https://ceph.com/git/?p=ceph.git;a=blob_plain;f=keys/release.asc' | apt-key add - \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
ceph \
ceph-mds \
librados-dev \
librbd-dev \
libcephfs-dev \
uuid-runtime \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean
VOLUME /go/src/github.com/ceph/go-ceph
COPY ./ci/entrypoint.sh /tmp/entrypoint.sh
ENTRYPOINT ["/tmp/entrypoint.sh", "/tmp/micro-ceph"]

21
Godeps/_workspace/src/github.com/ceph/go-ceph/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2014 Noah Watkins
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

107
Godeps/_workspace/src/github.com/ceph/go-ceph/README.md generated vendored Normal file
View File

@@ -0,0 +1,107 @@
# go-ceph - Go bindings for Ceph APIs
[![Build Status](https://travis-ci.org/ceph/go-ceph.svg)](https://travis-ci.org/ceph/go-ceph) [![Godoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/ceph/go-ceph) [![license](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](https://raw.githubusercontent.com/ceph/go-ceph/master/LICENSE)
## Installation
go get github.com/ceph/go-ceph
The native RADOS library and development headers are expected to be installed.
## Documentation
Detailed documentation is available at
<http://godoc.org/github.com/ceph/go-ceph>.
### Connecting to a cluster
Connect to a Ceph cluster using a configuration file located in the default
search paths.
```go
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
```
A connection can be shutdown by calling the `Shutdown` method on the
connection object (e.g. `conn.Shutdown()`). There are also other methods for
configuring the connection. Specific configuration options can be set:
```go
conn.SetConfigOption("log_file", "/dev/null")
```
and command line options can also be used using the `ParseCmdLineArgs` method.
```go
args := []string{ "--mon-host", "1.1.1.1" }
err := conn.ParseCmdLineArgs(args)
```
For other configuration options see the full documentation.
### Object I/O
Object in RADOS can be written to and read from with through an interface very
similar to a standard file I/O interface:
```go
// open a pool handle
ioctx, err := conn.OpenIOContext("mypool")
// write some data
bytes_in := []byte("input data")
err = ioctx.Write("obj", bytes_in, 0)
// read the data back out
bytes_out := make([]byte, len(bytes_in))
n_out, err := ioctx.Read("obj", bytes_out, 0)
if bytes_in != bytes_out {
fmt.Println("Output is not input!")
}
```
### Pool maintenance
The list of pools in a cluster can be retreived using the `ListPools` method
on the connection object. On a new cluster the following code snippet:
```go
pools, _ := conn.ListPools()
fmt.Println(pools)
```
will produce the output `[data metadata rbd]`, along with any other pools that
might exist in your cluster. Pools can also be created and destroyed. The
following creates a new, empty pool with default settings.
```go
conn.MakePool("new_pool")
```
Deleting a pool is also easy. Call `DeletePool(name string)` on a connection object to
delete a pool with the given name. The following will delete the pool named
`new_pool` and remove all of the pool's data.
```go
conn.DeletePool("new_pool")
```
## Contributing
Contributions are welcome & greatly appreciated, every little bit helps. Make code changes via Github pull requests:
- Fork the repo and create a topic branch for every feature/fix. Avoid
making changes directly on master branch.
- All incoming features should be accompanied with tests.
- Make sure that you run `go fmt` before submitting a change
set. Alternatively the Makefile has a flag for this, so you can call
`make fmt` as well.
- The integration tests can be run in a docker container, for this run:
```
make test-docker
```

View File

@@ -0,0 +1,89 @@
package cephfs
/*
#cgo LDFLAGS: -lcephfs
#cgo CPPFLAGS: -D_FILE_OFFSET_BITS=64
#include <stdlib.h>
#include <cephfs/libcephfs.h>
*/
import "C"
import "fmt"
import "unsafe"
//
type CephError int
func (e CephError) Error() string {
return fmt.Sprintf("cephfs: ret=%d", e)
}
//
type MountInfo struct {
mount *C.struct_ceph_mount_info
}
func CreateMount() (*MountInfo, error) {
mount := &MountInfo{}
ret := C.ceph_create(&mount.mount, nil)
if ret == 0 {
return mount, nil
} else {
return nil, CephError(ret)
}
}
func (mount *MountInfo) ReadDefaultConfigFile() error {
ret := C.ceph_conf_read_file(mount.mount, nil)
if ret == 0 {
return nil
} else {
return CephError(ret)
}
}
func (mount *MountInfo) Mount() error {
ret := C.ceph_mount(mount.mount, nil)
if ret == 0 {
return nil
} else {
return CephError(ret)
}
}
func (mount *MountInfo) SyncFs() error {
ret := C.ceph_sync_fs(mount.mount)
if ret == 0 {
return nil
} else {
return CephError(ret)
}
}
func (mount *MountInfo) CurrentDir() string {
c_dir := C.ceph_getcwd(mount.mount)
return C.GoString(c_dir)
}
func (mount *MountInfo) ChangeDir(path string) error {
c_path := C.CString(path)
defer C.free(unsafe.Pointer(c_path))
ret := C.ceph_chdir(mount.mount, c_path)
if ret == 0 {
return nil
} else {
return CephError(ret)
}
}
func (mount *MountInfo) MakeDir(path string, mode uint32) error {
c_path := C.CString(path)
defer C.free(unsafe.Pointer(c_path))
ret := C.ceph_mkdir(mount.mount, c_path, C.mode_t(mode))
if ret == 0 {
return nil
} else {
return CephError(ret)
}
}

View File

@@ -0,0 +1,66 @@
package cephfs_test
import "testing"
import "github.com/ceph/go-ceph/cephfs"
import "github.com/stretchr/testify/assert"
func TestCreateMount(t *testing.T) {
mount, err := cephfs.CreateMount()
assert.NoError(t, err)
assert.NotNil(t, mount)
}
func TestMountRoot(t *testing.T) {
mount, err := cephfs.CreateMount()
assert.NoError(t, err)
assert.NotNil(t, mount)
err = mount.ReadDefaultConfigFile()
assert.NoError(t, err)
err = mount.Mount()
assert.NoError(t, err)
}
func TestSyncFs(t *testing.T) {
mount, err := cephfs.CreateMount()
assert.NoError(t, err)
assert.NotNil(t, mount)
err = mount.ReadDefaultConfigFile()
assert.NoError(t, err)
err = mount.Mount()
assert.NoError(t, err)
err = mount.SyncFs()
assert.NoError(t, err)
}
func TestChangeDir(t *testing.T) {
mount, err := cephfs.CreateMount()
assert.NoError(t, err)
assert.NotNil(t, mount)
err = mount.ReadDefaultConfigFile()
assert.NoError(t, err)
err = mount.Mount()
assert.NoError(t, err)
dir1 := mount.CurrentDir()
assert.NotNil(t, dir1)
err = mount.MakeDir("/asdf", 0755)
assert.NoError(t, err)
err = mount.ChangeDir("/asdf")
assert.NoError(t, err)
dir2 := mount.CurrentDir()
assert.NotNil(t, dir2)
assert.NotEqual(t, dir1, dir2)
assert.Equal(t, dir1, "/")
assert.Equal(t, dir2, "/asdf")
}

View File

@@ -0,0 +1,27 @@
#!/bin/bash
set -e
set -x
sudo apt-get install -y python-virtualenv
# ceph-deploy and ceph
WORKDIR=$HOME/workdir
mkdir $WORKDIR
pushd $WORKDIR
ssh-keygen -f $HOME/.ssh/id_rsa -t rsa -N ''
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys
git clone git://github.com/ceph/ceph-deploy
pushd ceph-deploy
./bootstrap
./ceph-deploy install --release ${CEPH_RELEASE} `hostname`
./ceph-deploy pkg --install librados-dev `hostname`
./ceph-deploy pkg --install librbd-dev `hostname`
./ceph-deploy pkg --install libcephfs-dev `hostname`
popd # ceph-deploy
popd # workdir

View File

@@ -0,0 +1,114 @@
#!/bin/bash
#
# Copyright (C) 2013,2014 Loic Dachary <loic@dachary.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
set -e
set -u
DIR=$1
#if ! dpkg -l ceph ; then
# wget -q -O- 'https://ceph.com/git/?p=ceph.git;a=blob_plain;f=keys/release.asc' | sudo apt-key add -
# echo deb http://ceph.com/debian-dumpling/ $(lsb_release -sc) main | sudo tee /etc/apt/sources.list.d/ceph.list
# sudo apt-get update
# sudo apt-get --yes install ceph ceph-common
#fi
# get rid of process and directories leftovers
pkill ceph-mon || true
pkill ceph-osd || true
rm -fr $DIR
# cluster wide parameters
mkdir -p ${DIR}/log
cat >> $DIR/ceph.conf <<EOF
[global]
fsid = $(uuidgen)
osd crush chooseleaf type = 0
run dir = ${DIR}/run
auth cluster required = none
auth service required = none
auth client required = none
osd pool default size = 1
EOF
export CEPH_ARGS="--conf ${DIR}/ceph.conf"
# single monitor
MON_DATA=${DIR}/mon
mkdir -p $MON_DATA
cat >> $DIR/ceph.conf <<EOF
[mon.0]
log file = ${DIR}/log/mon.log
chdir = ""
mon cluster log file = ${DIR}/log/mon-cluster.log
mon data = ${MON_DATA}
mon addr = 127.0.0.1
EOF
ceph-mon --id 0 --mkfs --keyring /dev/null
touch ${MON_DATA}/keyring
ceph-mon --id 0
# single osd
OSD_DATA=${DIR}/osd
mkdir ${OSD_DATA}
cat >> $DIR/ceph.conf <<EOF
[osd.0]
log file = ${DIR}/log/osd.log
chdir = ""
osd data = ${OSD_DATA}
osd journal = ${OSD_DATA}.journal
osd journal size = 100
osd objectstore = memstore
EOF
OSD_ID=$(ceph osd create)
ceph osd crush add osd.${OSD_ID} 1 root=default host=localhost
ceph-osd --id ${OSD_ID} --mkjournal --mkfs
ceph-osd --id ${OSD_ID}
# single mds
MDS_DATA=${DIR}/mds.a
mkdir ${MDS_DATA}
cat >> $DIR/ceph.conf <<EOF
[mds.a]
mds data = ${MDS_DATA}
mds log max segments = 2
mds cache size = 10000
host = localhost
EOF
ceph-authtool --create-keyring --gen-key --name=mds.a ${MDS_DATA}/keyring
ceph -i ${MDS_DATA}/keyring auth add mds.a mon 'allow profile mds' osd 'allow *' mds 'allow'
ceph osd pool create cephfs_data 8
ceph osd pool create cephfs_metadata 8
ceph fs new cephfs cephfs_metadata cephfs_data
ceph-mds -i a
# check that it works
rados --pool rbd put group /etc/group
rados --pool rbd get group ${DIR}/group
diff /etc/group ${DIR}/group
ceph osd tree
export CEPH_CONF="${DIR}/ceph.conf"
go get github.com/stretchr/testify/assert
cd /go/src/github.com/ceph/go-ceph
exec go test -v ./...

View File

@@ -0,0 +1,119 @@
#
# Copyright (C) 2013,2014 Loic Dachary <loic@dachary.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
set -e
set -u
DIR=$1
#if ! dpkg -l ceph ; then
# wget -q -O- 'https://ceph.com/git/?p=ceph.git;a=blob_plain;f=keys/release.asc' | sudo apt-key add -
# echo deb http://ceph.com/debian-dumpling/ $(lsb_release -sc) main | sudo tee /etc/apt/sources.list.d/ceph.list
# sudo apt-get update
# sudo apt-get --yes install ceph ceph-common
#fi
# get rid of process and directories leftovers
pkill ceph-mon || true
pkill ceph-osd || true
rm -fr $DIR
# cluster wide parameters
mkdir -p ${DIR}/log
cat >> $DIR/ceph.conf <<EOF
[global]
fsid = $(uuidgen)
osd crush chooseleaf type = 0
run dir = ${DIR}/run
auth cluster required = none
auth service required = none
auth client required = none
osd pool default size = 1
mon allow pool delete = true
EOF
export CEPH_ARGS="--conf ${DIR}/ceph.conf"
# single monitor
MON_DATA=${DIR}/mon
mkdir -p $MON_DATA
cat >> $DIR/ceph.conf <<EOF
[mon.0]
log file = ${DIR}/log/mon.log
chdir = ""
mon cluster log file = ${DIR}/log/mon-cluster.log
mon data = ${MON_DATA}
mon addr = 127.0.0.1
EOF
ceph-mon --id 0 --mkfs --keyring /dev/null
touch ${MON_DATA}/keyring
ceph-mon --id 0
# single osd
OSD_DATA=${DIR}/osd
mkdir ${OSD_DATA}
cat >> $DIR/ceph.conf <<EOF
[osd.0]
log file = ${DIR}/log/osd.log
chdir = ""
osd data = ${OSD_DATA}
osd journal = ${OSD_DATA}.journal
osd journal size = 100
osd objectstore = memstore
EOF
OSD_ID=$(ceph osd create)
ceph osd crush add osd.${OSD_ID} 1 root=default host=localhost
ceph-osd --id ${OSD_ID} --mkjournal --mkfs
ceph-osd --id ${OSD_ID}
# single mds
MDS_DATA=${DIR}/mds.a
mkdir ${MDS_DATA}
cat >> $DIR/ceph.conf <<EOF
[mds.a]
mds data = ${MDS_DATA}
mds log max segments = 2
mds cache size = 10000
host = localhost
EOF
ceph-authtool --create-keyring --gen-key --name=mds.a ${MDS_DATA}/keyring
ceph -i ${MDS_DATA}/keyring auth add mds.a mon 'allow profile mds' osd 'allow *' mds 'allow'
ceph osd pool create cephfs_data 8
ceph osd pool create cephfs_metadata 8
ceph fs new cephfs cephfs_metadata cephfs_data
ceph-mds -i a
export CEPH_CONF="${DIR}/ceph.conf"
while true; do
if ceph status | tee /dev/tty | grep -q HEALTH_OK; then
if ! ceph status | grep -q creating &> /dev/null; then
break
fi
fi
sleep 1
done
# check that it works
rados --pool rbd put group /etc/group
rados --pool rbd get group ${DIR}/group
diff /etc/group ${DIR}/group
ceph osd tree

9
Godeps/_workspace/src/github.com/ceph/go-ceph/doc.go generated vendored Normal file
View File

@@ -0,0 +1,9 @@
/*
Set of wrappers around Ceph APIs.
*/
package rados
import (
_ "github.com/ceph/go-ceph/rados"
_ "github.com/ceph/go-ceph/rbd"
)

View File

@@ -0,0 +1,12 @@
package rados
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestImports(t *testing.T) {
if assert.Equal(t, 1, 1) != true {
t.Error("Something is wrong.")
}
}

View File

@@ -0,0 +1,308 @@
package rados
// #cgo LDFLAGS: -lrados
// #include <stdlib.h>
// #include <rados/librados.h>
import "C"
import "unsafe"
import "bytes"
// ClusterStat represents Ceph cluster statistics.
type ClusterStat struct {
Kb uint64
Kb_used uint64
Kb_avail uint64
Num_objects uint64
}
// Conn is a connection handle to a Ceph cluster.
type Conn struct {
cluster C.rados_t
}
// PingMonitor sends a ping to a monitor and returns the reply.
func (c *Conn) PingMonitor(id string) (string, error) {
c_id := C.CString(id)
defer C.free(unsafe.Pointer(c_id))
var strlen C.size_t
var strout *C.char
ret := C.rados_ping_monitor(c.cluster, c_id, &strout, &strlen)
defer C.rados_buffer_free(strout)
if ret == 0 {
reply := C.GoStringN(strout, (C.int)(strlen))
return reply, nil
} else {
return "", RadosError(int(ret))
}
}
// Connect establishes a connection to a RADOS cluster. It returns an error,
// if any.
func (c *Conn) Connect() error {
ret := C.rados_connect(c.cluster)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// Shutdown disconnects from the cluster.
func (c *Conn) Shutdown() {
C.rados_shutdown(c.cluster)
}
// ReadConfigFile configures the connection using a Ceph configuration file.
func (c *Conn) ReadConfigFile(path string) error {
c_path := C.CString(path)
defer C.free(unsafe.Pointer(c_path))
ret := C.rados_conf_read_file(c.cluster, c_path)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// ReadDefaultConfigFile configures the connection using a Ceph configuration
// file located at default locations.
func (c *Conn) ReadDefaultConfigFile() error {
ret := C.rados_conf_read_file(c.cluster, nil)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
func (c *Conn) OpenIOContext(pool string) (*IOContext, error) {
c_pool := C.CString(pool)
defer C.free(unsafe.Pointer(c_pool))
ioctx := &IOContext{}
ret := C.rados_ioctx_create(c.cluster, c_pool, &ioctx.ioctx)
if ret == 0 {
return ioctx, nil
} else {
return nil, RadosError(int(ret))
}
}
// ListPools returns the names of all existing pools.
func (c *Conn) ListPools() (names []string, err error) {
buf := make([]byte, 4096)
for {
ret := int(C.rados_pool_list(c.cluster,
(*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf))))
if ret < 0 {
return nil, RadosError(int(ret))
}
if ret > len(buf) {
buf = make([]byte, ret)
continue
}
tmp := bytes.SplitAfter(buf[:ret-1], []byte{0})
for _, s := range tmp {
if len(s) > 0 {
name := C.GoString((*C.char)(unsafe.Pointer(&s[0])))
names = append(names, name)
}
}
return names, nil
}
}
// SetConfigOption sets the value of the configuration option identified by
// the given name.
func (c *Conn) SetConfigOption(option, value string) error {
c_opt, c_val := C.CString(option), C.CString(value)
defer C.free(unsafe.Pointer(c_opt))
defer C.free(unsafe.Pointer(c_val))
ret := C.rados_conf_set(c.cluster, c_opt, c_val)
if ret < 0 {
return RadosError(int(ret))
} else {
return nil
}
}
// GetConfigOption returns the value of the Ceph configuration option
// identified by the given name.
func (c *Conn) GetConfigOption(name string) (value string, err error) {
buf := make([]byte, 4096)
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_name))
ret := int(C.rados_conf_get(c.cluster, c_name,
(*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf))))
// FIXME: ret may be -ENAMETOOLONG if the buffer is not large enough. We
// can handle this case, but we need a reliable way to test for
// -ENAMETOOLONG constant. Will the syscall/Errno stuff in Go help?
if ret == 0 {
value = C.GoString((*C.char)(unsafe.Pointer(&buf[0])))
return value, nil
} else {
return "", RadosError(ret)
}
}
// WaitForLatestOSDMap blocks the caller until the latest OSD map has been
// retrieved.
func (c *Conn) WaitForLatestOSDMap() error {
ret := C.rados_wait_for_latest_osdmap(c.cluster)
if ret < 0 {
return RadosError(int(ret))
} else {
return nil
}
}
// GetClusterStat returns statistics about the cluster associated with the
// connection.
func (c *Conn) GetClusterStats() (stat ClusterStat, err error) {
c_stat := C.struct_rados_cluster_stat_t{}
ret := C.rados_cluster_stat(c.cluster, &c_stat)
if ret < 0 {
return ClusterStat{}, RadosError(int(ret))
} else {
return ClusterStat{
Kb: uint64(c_stat.kb),
Kb_used: uint64(c_stat.kb_used),
Kb_avail: uint64(c_stat.kb_avail),
Num_objects: uint64(c_stat.num_objects),
}, nil
}
}
// ParseCmdLineArgs configures the connection from command line arguments.
func (c *Conn) ParseCmdLineArgs(args []string) error {
// add an empty element 0 -- Ceph treats the array as the actual contents
// of argv and skips the first element (the executable name)
argc := C.int(len(args) + 1)
argv := make([]*C.char, argc)
// make the first element a string just in case it is ever examined
argv[0] = C.CString("placeholder")
defer C.free(unsafe.Pointer(argv[0]))
for i, arg := range args {
argv[i+1] = C.CString(arg)
defer C.free(unsafe.Pointer(argv[i+1]))
}
ret := C.rados_conf_parse_argv(c.cluster, argc, &argv[0])
if ret < 0 {
return RadosError(int(ret))
} else {
return nil
}
}
// ParseDefaultConfigEnv configures the connection from the default Ceph
// environment variable(s).
func (c *Conn) ParseDefaultConfigEnv() error {
ret := C.rados_conf_parse_env(c.cluster, nil)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// GetFSID returns the fsid of the cluster as a hexadecimal string. The fsid
// is a unique identifier of an entire Ceph cluster.
func (c *Conn) GetFSID() (fsid string, err error) {
buf := make([]byte, 37)
ret := int(C.rados_cluster_fsid(c.cluster,
(*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf))))
// FIXME: the success case isn't documented correctly in librados.h
if ret == 36 {
fsid = C.GoString((*C.char)(unsafe.Pointer(&buf[0])))
return fsid, nil
} else {
return "", RadosError(int(ret))
}
}
// GetInstanceID returns a globally unique identifier for the cluster
// connection instance.
func (c *Conn) GetInstanceID() uint64 {
// FIXME: are there any error cases for this?
return uint64(C.rados_get_instance_id(c.cluster))
}
// MakePool creates a new pool with default settings.
func (c *Conn) MakePool(name string) error {
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_name))
ret := int(C.rados_pool_create(c.cluster, c_name))
if ret == 0 {
return nil
} else {
return RadosError(ret)
}
}
// DeletePool deletes a pool and all the data inside the pool.
func (c *Conn) DeletePool(name string) error {
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_name))
ret := int(C.rados_pool_delete(c.cluster, c_name))
if ret == 0 {
return nil
} else {
return RadosError(ret)
}
}
// MonCommand sends a command to one of the monitors
func (c *Conn) MonCommand(args []byte) (buffer []byte, info string, err error) {
return c.monCommand(args, nil)
}
// MonCommand sends a command to one of the monitors, with an input buffer
func (c *Conn) MonCommandWithInputBuffer(args, inputBuffer []byte) (buffer []byte, info string, err error) {
return c.monCommand(args, inputBuffer)
}
func (c *Conn) monCommand(args, inputBuffer []byte) (buffer []byte, info string, err error) {
argv := C.CString(string(args))
defer C.free(unsafe.Pointer(argv))
var (
outs, outbuf *C.char
outslen, outbuflen C.size_t
)
inbuf := C.CString(string(inputBuffer))
inbufLen := len(inputBuffer)
defer C.free(unsafe.Pointer(inbuf))
ret := C.rados_mon_command(c.cluster,
&argv, 1,
inbuf, // bulk input (e.g. crush map)
C.size_t(inbufLen), // length inbuf
&outbuf, // buffer
&outbuflen, // buffer length
&outs, // status string
&outslen)
if outslen > 0 {
info = C.GoStringN(outs, C.int(outslen))
C.free(unsafe.Pointer(outs))
}
if outbuflen > 0 {
buffer = C.GoBytes(unsafe.Pointer(outbuf), C.int(outbuflen))
C.free(unsafe.Pointer(outbuf))
}
if ret != 0 {
err = RadosError(int(ret))
return nil, info, err
}
return
}

View File

@@ -0,0 +1,4 @@
/*
Set of wrappers around librados API.
*/
package rados

View File

@@ -0,0 +1,872 @@
package rados
// #cgo LDFLAGS: -lrados
// #include <errno.h>
// #include <stdlib.h>
// #include <rados/librados.h>
//
// char* nextChunk(char **idx) {
// char *copy;
// copy = strdup(*idx);
// *idx += strlen(*idx) + 1;
// return copy;
// }
//
// #if __APPLE__
// #define ceph_time_t __darwin_time_t
// #define ceph_suseconds_t __darwin_suseconds_t
// #elif __GLIBC__
// #define ceph_time_t __time_t
// #define ceph_suseconds_t __suseconds_t
// #else
// #define ceph_time_t time_t
// #define ceph_suseconds_t suseconds_t
// #endif
import "C"
import (
"syscall"
"time"
"unsafe"
)
// PoolStat represents Ceph pool statistics.
type PoolStat struct {
// space used in bytes
Num_bytes uint64
// space used in KB
Num_kb uint64
// number of objects in the pool
Num_objects uint64
// number of clones of objects
Num_object_clones uint64
// num_objects * num_replicas
Num_object_copies uint64
Num_objects_missing_on_primary uint64
// number of objects found on no OSDs
Num_objects_unfound uint64
// number of objects replicated fewer times than they should be
// (but found on at least one OSD)
Num_objects_degraded uint64
Num_rd uint64
Num_rd_kb uint64
Num_wr uint64
Num_wr_kb uint64
}
// ObjectStat represents an object stat information
type ObjectStat struct {
// current length in bytes
Size uint64
// last modification time
ModTime time.Time
}
// LockInfo represents information on a current Ceph lock
type LockInfo struct {
NumLockers int
Exclusive bool
Tag string
Clients []string
Cookies []string
Addrs []string
}
// IOContext represents a context for performing I/O within a pool.
type IOContext struct {
ioctx C.rados_ioctx_t
}
// Pointer returns a uintptr representation of the IOContext.
func (ioctx *IOContext) Pointer() uintptr {
return uintptr(ioctx.ioctx)
}
// SetNamespace sets the namespace for objects within this IO context (pool).
// Setting namespace to a empty or zero length string sets the pool to the default namespace.
func (ioctx *IOContext) SetNamespace(namespace string) {
var c_ns *C.char
if len(namespace) > 0 {
c_ns = C.CString(namespace)
defer C.free(unsafe.Pointer(c_ns))
}
C.rados_ioctx_set_namespace(ioctx.ioctx, c_ns)
}
// Write writes len(data) bytes to the object with key oid starting at byte
// offset offset. It returns an error, if any.
func (ioctx *IOContext) Write(oid string, data []byte, offset uint64) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
ret := C.rados_write(ioctx.ioctx, c_oid,
(*C.char)(unsafe.Pointer(&data[0])),
(C.size_t)(len(data)),
(C.uint64_t)(offset))
return GetRadosError(int(ret))
}
// WriteFull writes len(data) bytes to the object with key oid.
// The object is filled with the provided data. If the object exists,
// it is atomically truncated and then written. It returns an error, if any.
func (ioctx *IOContext) WriteFull(oid string, data []byte) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
ret := C.rados_write_full(ioctx.ioctx, c_oid,
(*C.char)(unsafe.Pointer(&data[0])),
(C.size_t)(len(data)))
return GetRadosError(int(ret))
}
// Append appends len(data) bytes to the object with key oid.
// The object is appended with the provided data. If the object exists,
// it is atomically appended to. It returns an error, if any.
func (ioctx *IOContext) Append(oid string, data []byte) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
ret := C.rados_append(ioctx.ioctx, c_oid,
(*C.char)(unsafe.Pointer(&data[0])),
(C.size_t)(len(data)))
return GetRadosError(int(ret))
}
// Read reads up to len(data) bytes from the object with key oid starting at byte
// offset offset. It returns the number of bytes read and an error, if any.
func (ioctx *IOContext) Read(oid string, data []byte, offset uint64) (int, error) {
if len(data) == 0 {
return 0, nil
}
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
ret := C.rados_read(
ioctx.ioctx,
c_oid,
(*C.char)(unsafe.Pointer(&data[0])),
(C.size_t)(len(data)),
(C.uint64_t)(offset))
if ret >= 0 {
return int(ret), nil
} else {
return 0, GetRadosError(int(ret))
}
}
// Delete deletes the object with key oid. It returns an error, if any.
func (ioctx *IOContext) Delete(oid string) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
return GetRadosError(int(C.rados_remove(ioctx.ioctx, c_oid)))
}
// Truncate resizes the object with key oid to size size. If the operation
// enlarges the object, the new area is logically filled with zeroes. If the
// operation shrinks the object, the excess data is removed. It returns an
// error, if any.
func (ioctx *IOContext) Truncate(oid string, size uint64) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
return GetRadosError(int(C.rados_trunc(ioctx.ioctx, c_oid, (C.uint64_t)(size))))
}
// Destroy informs librados that the I/O context is no longer in use.
// Resources associated with the context may not be freed immediately, and the
// context should not be used again after calling this method.
func (ioctx *IOContext) Destroy() {
C.rados_ioctx_destroy(ioctx.ioctx)
}
// Stat returns a set of statistics about the pool associated with this I/O
// context.
func (ioctx *IOContext) GetPoolStats() (stat PoolStat, err error) {
c_stat := C.struct_rados_pool_stat_t{}
ret := C.rados_ioctx_pool_stat(ioctx.ioctx, &c_stat)
if ret < 0 {
return PoolStat{}, GetRadosError(int(ret))
} else {
return PoolStat{
Num_bytes: uint64(c_stat.num_bytes),
Num_kb: uint64(c_stat.num_kb),
Num_objects: uint64(c_stat.num_objects),
Num_object_clones: uint64(c_stat.num_object_clones),
Num_object_copies: uint64(c_stat.num_object_copies),
Num_objects_missing_on_primary: uint64(c_stat.num_objects_missing_on_primary),
Num_objects_unfound: uint64(c_stat.num_objects_unfound),
Num_objects_degraded: uint64(c_stat.num_objects_degraded),
Num_rd: uint64(c_stat.num_rd),
Num_rd_kb: uint64(c_stat.num_rd_kb),
Num_wr: uint64(c_stat.num_wr),
Num_wr_kb: uint64(c_stat.num_wr_kb),
}, nil
}
}
// GetPoolName returns the name of the pool associated with the I/O context.
func (ioctx *IOContext) GetPoolName() (name string, err error) {
buf := make([]byte, 128)
for {
ret := C.rados_ioctx_get_pool_name(ioctx.ioctx,
(*C.char)(unsafe.Pointer(&buf[0])), C.unsigned(len(buf)))
if ret == -34 { // FIXME
buf = make([]byte, len(buf)*2)
continue
} else if ret < 0 {
return "", GetRadosError(int(ret))
}
name = C.GoStringN((*C.char)(unsafe.Pointer(&buf[0])), ret)
return name, nil
}
}
// ObjectListFunc is the type of the function called for each object visited
// by ListObjects.
type ObjectListFunc func(oid string)
// ListObjects lists all of the objects in the pool associated with the I/O
// context, and called the provided listFn function for each object, passing
// to the function the name of the object.
func (ioctx *IOContext) ListObjects(listFn ObjectListFunc) error {
var ctx C.rados_list_ctx_t
ret := C.rados_nobjects_list_open(ioctx.ioctx, &ctx)
if ret < 0 {
return GetRadosError(int(ret))
}
defer func() { C.rados_nobjects_list_close(ctx) }()
for {
var c_entry *C.char
ret := C.rados_nobjects_list_next(ctx, &c_entry, nil, nil)
if ret == -2 { // FIXME
return nil
} else if ret < 0 {
return GetRadosError(int(ret))
}
listFn(C.GoString(c_entry))
}
panic("invalid state")
}
// Stat returns the size of the object and its last modification time
func (ioctx *IOContext) Stat(object string) (stat ObjectStat, err error) {
var c_psize C.uint64_t
var c_pmtime C.time_t
c_object := C.CString(object)
defer C.free(unsafe.Pointer(c_object))
ret := C.rados_stat(
ioctx.ioctx,
c_object,
&c_psize,
&c_pmtime)
if ret < 0 {
return ObjectStat{}, GetRadosError(int(ret))
} else {
return ObjectStat{
Size: uint64(c_psize),
ModTime: time.Unix(int64(c_pmtime), 0),
}, nil
}
}
// GetXattr gets an xattr with key `name`, it returns the length of
// the key read or an error if not successful
func (ioctx *IOContext) GetXattr(object string, name string, data []byte) (int, error) {
c_object := C.CString(object)
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_object))
defer C.free(unsafe.Pointer(c_name))
ret := C.rados_getxattr(
ioctx.ioctx,
c_object,
c_name,
(*C.char)(unsafe.Pointer(&data[0])),
(C.size_t)(len(data)))
if ret >= 0 {
return int(ret), nil
} else {
return 0, GetRadosError(int(ret))
}
}
// Sets an xattr for an object with key `name` with value as `data`
func (ioctx *IOContext) SetXattr(object string, name string, data []byte) error {
c_object := C.CString(object)
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_object))
defer C.free(unsafe.Pointer(c_name))
ret := C.rados_setxattr(
ioctx.ioctx,
c_object,
c_name,
(*C.char)(unsafe.Pointer(&data[0])),
(C.size_t)(len(data)))
return GetRadosError(int(ret))
}
// function that lists all the xattrs for an object, since xattrs are
// a k-v pair, this function returns a map of k-v pairs on
// success, error code on failure
func (ioctx *IOContext) ListXattrs(oid string) (map[string][]byte, error) {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
var it C.rados_xattrs_iter_t
ret := C.rados_getxattrs(ioctx.ioctx, c_oid, &it)
if ret < 0 {
return nil, GetRadosError(int(ret))
}
defer func() { C.rados_getxattrs_end(it) }()
m := make(map[string][]byte)
for {
var c_name, c_val *C.char
var c_len C.size_t
defer C.free(unsafe.Pointer(c_name))
defer C.free(unsafe.Pointer(c_val))
ret := C.rados_getxattrs_next(it, &c_name, &c_val, &c_len)
if ret < 0 {
return nil, GetRadosError(int(ret))
}
// rados api returns a null name,val & 0-length upon
// end of iteration
if c_name == nil {
return m, nil // stop iteration
}
m[C.GoString(c_name)] = C.GoBytes(unsafe.Pointer(c_val), (C.int)(c_len))
}
}
// Remove an xattr with key `name` from object `oid`
func (ioctx *IOContext) RmXattr(oid string, name string) error {
c_oid := C.CString(oid)
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_oid))
defer C.free(unsafe.Pointer(c_name))
ret := C.rados_rmxattr(
ioctx.ioctx,
c_oid,
c_name)
return GetRadosError(int(ret))
}
// Append the map `pairs` to the omap `oid`
func (ioctx *IOContext) SetOmap(oid string, pairs map[string][]byte) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
var s C.size_t
var c *C.char
ptrSize := unsafe.Sizeof(c)
c_keys := C.malloc(C.size_t(len(pairs)) * C.size_t(ptrSize))
c_values := C.malloc(C.size_t(len(pairs)) * C.size_t(ptrSize))
c_lengths := C.malloc(C.size_t(len(pairs)) * C.size_t(unsafe.Sizeof(s)))
defer C.free(unsafe.Pointer(c_keys))
defer C.free(unsafe.Pointer(c_values))
defer C.free(unsafe.Pointer(c_lengths))
i := 0
for key, value := range pairs {
// key
c_key_ptr := (**C.char)(unsafe.Pointer(uintptr(c_keys) + uintptr(i)*ptrSize))
*c_key_ptr = C.CString(key)
defer C.free(unsafe.Pointer(*c_key_ptr))
// value and its length
c_value_ptr := (**C.char)(unsafe.Pointer(uintptr(c_values) + uintptr(i)*ptrSize))
var c_length C.size_t
if len(value) > 0 {
*c_value_ptr = (*C.char)(unsafe.Pointer(&value[0]))
c_length = C.size_t(len(value))
} else {
*c_value_ptr = nil
c_length = C.size_t(0)
}
c_length_ptr := (*C.size_t)(unsafe.Pointer(uintptr(c_lengths) + uintptr(i)*ptrSize))
*c_length_ptr = c_length
i++
}
op := C.rados_create_write_op()
C.rados_write_op_omap_set(
op,
(**C.char)(c_keys),
(**C.char)(c_values),
(*C.size_t)(c_lengths),
C.size_t(len(pairs)))
ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0)
C.rados_release_write_op(op)
return GetRadosError(int(ret))
}
// OmapListFunc is the type of the function called for each omap key
// visited by ListOmapValues
type OmapListFunc func(key string, value []byte)
// Iterate on a set of keys and their values from an omap
// `startAfter`: iterate only on the keys after this specified one
// `filterPrefix`: iterate only on the keys beginning with this prefix
// `maxReturn`: iterate no more than `maxReturn` key/value pairs
// `listFn`: the function called at each iteration
func (ioctx *IOContext) ListOmapValues(oid string, startAfter string, filterPrefix string, maxReturn int64, listFn OmapListFunc) error {
c_oid := C.CString(oid)
c_start_after := C.CString(startAfter)
c_filter_prefix := C.CString(filterPrefix)
c_max_return := C.uint64_t(maxReturn)
defer C.free(unsafe.Pointer(c_oid))
defer C.free(unsafe.Pointer(c_start_after))
defer C.free(unsafe.Pointer(c_filter_prefix))
op := C.rados_create_read_op()
var c_iter C.rados_omap_iter_t
var c_prval C.int
C.rados_read_op_omap_get_vals(
op,
c_start_after,
c_filter_prefix,
c_max_return,
&c_iter,
&c_prval,
)
ret := C.rados_read_op_operate(op, ioctx.ioctx, c_oid, 0)
if int(ret) != 0 {
return GetRadosError(int(ret))
} else if int(c_prval) != 0 {
return RadosError(int(c_prval))
}
for {
var c_key *C.char
var c_val *C.char
var c_len C.size_t
ret = C.rados_omap_get_next(c_iter, &c_key, &c_val, &c_len)
if int(ret) != 0 {
return GetRadosError(int(ret))
}
if c_key == nil {
break
}
listFn(C.GoString(c_key), C.GoBytes(unsafe.Pointer(c_val), C.int(c_len)))
}
C.rados_omap_get_end(c_iter)
C.rados_release_read_op(op)
return nil
}
// Fetch a set of keys and their values from an omap and returns then as a map
// `startAfter`: retrieve only the keys after this specified one
// `filterPrefix`: retrieve only the keys beginning with this prefix
// `maxReturn`: retrieve no more than `maxReturn` key/value pairs
func (ioctx *IOContext) GetOmapValues(oid string, startAfter string, filterPrefix string, maxReturn int64) (map[string][]byte, error) {
omap := map[string][]byte{}
err := ioctx.ListOmapValues(
oid, startAfter, filterPrefix, maxReturn,
func(key string, value []byte) {
omap[key] = value
},
)
return omap, err
}
// Fetch all the keys and their values from an omap and returns then as a map
// `startAfter`: retrieve only the keys after this specified one
// `filterPrefix`: retrieve only the keys beginning with this prefix
// `iteratorSize`: internal number of keys to fetch during a read operation
func (ioctx *IOContext) GetAllOmapValues(oid string, startAfter string, filterPrefix string, iteratorSize int64) (map[string][]byte, error) {
omap := map[string][]byte{}
omapSize := 0
for {
err := ioctx.ListOmapValues(
oid, startAfter, filterPrefix, iteratorSize,
func(key string, value []byte) {
omap[key] = value
startAfter = key
},
)
if err != nil {
return omap, err
}
// End of omap
if len(omap) == omapSize {
break
}
omapSize = len(omap)
}
return omap, nil
}
// Remove the specified `keys` from the omap `oid`
func (ioctx *IOContext) RmOmapKeys(oid string, keys []string) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
var c *C.char
ptrSize := unsafe.Sizeof(c)
c_keys := C.malloc(C.size_t(len(keys)) * C.size_t(ptrSize))
defer C.free(unsafe.Pointer(c_keys))
i := 0
for _, key := range keys {
c_key_ptr := (**C.char)(unsafe.Pointer(uintptr(c_keys) + uintptr(i)*ptrSize))
*c_key_ptr = C.CString(key)
defer C.free(unsafe.Pointer(*c_key_ptr))
i++
}
op := C.rados_create_write_op()
C.rados_write_op_omap_rm_keys(
op,
(**C.char)(c_keys),
C.size_t(len(keys)))
ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0)
C.rados_release_write_op(op)
return GetRadosError(int(ret))
}
// Clear the omap `oid`
func (ioctx *IOContext) CleanOmap(oid string) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
op := C.rados_create_write_op()
C.rados_write_op_omap_clear(op)
ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0)
C.rados_release_write_op(op)
return GetRadosError(int(ret))
}
type Iter struct {
ctx C.rados_list_ctx_t
err error
entry string
}
type IterToken uint32
// Return a Iterator object that can be used to list the object names in the current pool
func (ioctx *IOContext) Iter() (*Iter, error) {
iter := Iter{}
if cerr := C.rados_nobjects_list_open(ioctx.ioctx, &iter.ctx); cerr < 0 {
return nil, GetRadosError(int(cerr))
}
return &iter, nil
}
// Returns a token marking the current position of the iterator. To be used in combination with Iter.Seek()
func (iter *Iter) Token() IterToken {
return IterToken(C.rados_nobjects_list_get_pg_hash_position(iter.ctx))
}
func (iter *Iter) Seek(token IterToken) {
C.rados_nobjects_list_seek(iter.ctx, C.uint32_t(token))
}
// Next retrieves the next object name in the pool/namespace iterator.
// Upon a successful invocation (return value of true), the Value method should
// be used to obtain the name of the retrieved object name. When the iterator is
// exhausted, Next returns false. The Err method should used to verify whether the
// end of the iterator was reached, or the iterator received an error.
//
// Example:
// iter := pool.Iter()
// defer iter.Close()
// for iter.Next() {
// fmt.Printf("%v\n", iter.Value())
// }
// return iter.Err()
//
func (iter *Iter) Next() bool {
var c_entry *C.char
if cerr := C.rados_nobjects_list_next(iter.ctx, &c_entry, nil, nil); cerr < 0 {
iter.err = GetRadosError(int(cerr))
return false
}
iter.entry = C.GoString(c_entry)
return true
}
// Returns the current value of the iterator (object name), after a successful call to Next.
func (iter *Iter) Value() string {
if iter.err != nil {
return ""
}
return iter.entry
}
// Checks whether the iterator has encountered an error.
func (iter *Iter) Err() error {
if iter.err == RadosErrorNotFound {
return nil
}
return iter.err
}
// Closes the iterator cursor on the server. Be aware that iterators are not closed automatically
// at the end of iteration.
func (iter *Iter) Close() {
C.rados_nobjects_list_close(iter.ctx)
}
// Take an exclusive lock on an object.
func (ioctx *IOContext) LockExclusive(oid, name, cookie, desc string, duration time.Duration, flags *byte) (int, error) {
c_oid := C.CString(oid)
c_name := C.CString(name)
c_cookie := C.CString(cookie)
c_desc := C.CString(desc)
var c_duration C.struct_timeval
if duration != 0 {
tv := syscall.NsecToTimeval(duration.Nanoseconds())
c_duration = C.struct_timeval{tv_sec: C.ceph_time_t(tv.Sec), tv_usec: C.ceph_suseconds_t(tv.Usec)}
}
var c_flags C.uint8_t
if flags != nil {
c_flags = C.uint8_t(*flags)
}
defer C.free(unsafe.Pointer(c_oid))
defer C.free(unsafe.Pointer(c_name))
defer C.free(unsafe.Pointer(c_cookie))
defer C.free(unsafe.Pointer(c_desc))
ret := C.rados_lock_exclusive(
ioctx.ioctx,
c_oid,
c_name,
c_cookie,
c_desc,
&c_duration,
c_flags)
// 0 on success, negative error code on failure
// -EBUSY if the lock is already held by another (client, cookie) pair
// -EEXIST if the lock is already held by the same (client, cookie) pair
switch ret {
case 0:
return int(ret), nil
case -16: // EBUSY
return int(ret), nil
case -17: // EEXIST
return int(ret), nil
default:
return int(ret), RadosError(int(ret))
}
}
// Take a shared lock on an object.
func (ioctx *IOContext) LockShared(oid, name, cookie, tag, desc string, duration time.Duration, flags *byte) (int, error) {
c_oid := C.CString(oid)
c_name := C.CString(name)
c_cookie := C.CString(cookie)
c_tag := C.CString(tag)
c_desc := C.CString(desc)
var c_duration C.struct_timeval
if duration != 0 {
tv := syscall.NsecToTimeval(duration.Nanoseconds())
c_duration = C.struct_timeval{tv_sec: C.ceph_time_t(tv.Sec), tv_usec: C.ceph_suseconds_t(tv.Usec)}
}
var c_flags C.uint8_t
if flags != nil {
c_flags = C.uint8_t(*flags)
}
defer C.free(unsafe.Pointer(c_oid))
defer C.free(unsafe.Pointer(c_name))
defer C.free(unsafe.Pointer(c_cookie))
defer C.free(unsafe.Pointer(c_tag))
defer C.free(unsafe.Pointer(c_desc))
ret := C.rados_lock_shared(
ioctx.ioctx,
c_oid,
c_name,
c_cookie,
c_tag,
c_desc,
&c_duration,
c_flags)
// 0 on success, negative error code on failure
// -EBUSY if the lock is already held by another (client, cookie) pair
// -EEXIST if the lock is already held by the same (client, cookie) pair
switch ret {
case 0:
return int(ret), nil
case -16: // EBUSY
return int(ret), nil
case -17: // EEXIST
return int(ret), nil
default:
return int(ret), RadosError(int(ret))
}
}
// Release a shared or exclusive lock on an object.
func (ioctx *IOContext) Unlock(oid, name, cookie string) (int, error) {
c_oid := C.CString(oid)
c_name := C.CString(name)
c_cookie := C.CString(cookie)
defer C.free(unsafe.Pointer(c_oid))
defer C.free(unsafe.Pointer(c_name))
defer C.free(unsafe.Pointer(c_cookie))
// 0 on success, negative error code on failure
// -ENOENT if the lock is not held by the specified (client, cookie) pair
ret := C.rados_unlock(
ioctx.ioctx,
c_oid,
c_name,
c_cookie)
switch ret {
case 0:
return int(ret), nil
case -2: // -ENOENT
return int(ret), nil
default:
return int(ret), RadosError(int(ret))
}
}
// List clients that have locked the named object lock and information about the lock.
// The number of bytes required in each buffer is put in the corresponding size out parameter.
// If any of the provided buffers are too short, -ERANGE is returned after these sizes are filled in.
func (ioctx *IOContext) ListLockers(oid, name string) (*LockInfo, error) {
c_oid := C.CString(oid)
c_name := C.CString(name)
c_tag := (*C.char)(C.malloc(C.size_t(1024)))
c_clients := (*C.char)(C.malloc(C.size_t(1024)))
c_cookies := (*C.char)(C.malloc(C.size_t(1024)))
c_addrs := (*C.char)(C.malloc(C.size_t(1024)))
var c_exclusive C.int
c_tag_len := C.size_t(1024)
c_clients_len := C.size_t(1024)
c_cookies_len := C.size_t(1024)
c_addrs_len := C.size_t(1024)
defer C.free(unsafe.Pointer(c_oid))
defer C.free(unsafe.Pointer(c_name))
defer C.free(unsafe.Pointer(c_tag))
defer C.free(unsafe.Pointer(c_clients))
defer C.free(unsafe.Pointer(c_cookies))
defer C.free(unsafe.Pointer(c_addrs))
ret := C.rados_list_lockers(
ioctx.ioctx,
c_oid,
c_name,
&c_exclusive,
c_tag,
&c_tag_len,
c_clients,
&c_clients_len,
c_cookies,
&c_cookies_len,
c_addrs,
&c_addrs_len)
splitCString := func(items *C.char, itemsLen C.size_t) []string {
currLen := 0
clients := []string{}
for currLen < int(itemsLen) {
client := C.GoString(C.nextChunk(&items))
clients = append(clients, client)
currLen += len(client) + 1
}
return clients
}
if ret < 0 {
return nil, RadosError(int(ret))
} else {
return &LockInfo{int(ret), c_exclusive == 1, C.GoString(c_tag), splitCString(c_clients, c_clients_len), splitCString(c_cookies, c_cookies_len), splitCString(c_addrs, c_addrs_len)}, nil
}
}
// Releases a shared or exclusive lock on an object, which was taken by the specified client.
func (ioctx *IOContext) BreakLock(oid, name, client, cookie string) (int, error) {
c_oid := C.CString(oid)
c_name := C.CString(name)
c_client := C.CString(client)
c_cookie := C.CString(cookie)
defer C.free(unsafe.Pointer(c_oid))
defer C.free(unsafe.Pointer(c_name))
defer C.free(unsafe.Pointer(c_client))
defer C.free(unsafe.Pointer(c_cookie))
// 0 on success, negative error code on failure
// -ENOENT if the lock is not held by the specified (client, cookie) pair
// -EINVAL if the client cannot be parsed
ret := C.rados_break_lock(
ioctx.ioctx,
c_oid,
c_name,
c_client,
c_cookie)
switch ret {
case 0:
return int(ret), nil
case -2: // -ENOENT
return int(ret), nil
case -22: // -EINVAL
return int(ret), nil
default:
return int(ret), RadosError(int(ret))
}
}

View File

@@ -0,0 +1,83 @@
package rados
// #cgo LDFLAGS: -lrados
// #include <errno.h>
// #include <stdlib.h>
// #include <rados/librados.h>
import "C"
import (
"fmt"
"unsafe"
)
type RadosError int
func (e RadosError) Error() string {
return fmt.Sprintf("rados: %s", C.GoString(C.strerror(C.int(-e))))
}
var RadosErrorNotFound = RadosError(-C.ENOENT)
var RadosErrorPermissionDenied = RadosError(-C.EPERM)
func GetRadosError(err int) error {
if err == 0 {
return nil
}
return RadosError(err)
}
// Version returns the major, minor, and patch components of the version of
// the RADOS library linked against.
func Version() (int, int, int) {
var c_major, c_minor, c_patch C.int
C.rados_version(&c_major, &c_minor, &c_patch)
return int(c_major), int(c_minor), int(c_patch)
}
// NewConn creates a new connection object. It returns the connection and an
// error, if any.
func NewConn() (*Conn, error) {
conn := &Conn{}
ret := C.rados_create(&conn.cluster, nil)
if ret == 0 {
return conn, nil
} else {
return nil, RadosError(int(ret))
}
}
// NewConnWithUser creates a new connection object with a custom username.
// It returns the connection and an error, if any.
func NewConnWithUser(user string) (*Conn, error) {
c_user := C.CString(user)
defer C.free(unsafe.Pointer(c_user))
conn := &Conn{}
ret := C.rados_create(&conn.cluster, c_user)
if ret == 0 {
return conn, nil
} else {
return nil, RadosError(int(ret))
}
}
// NewConnWithClusterAndUser creates a new connection object for a specific cluster and username.
// It returns the connection and an error, if any.
func NewConnWithClusterAndUser(clusterName string, userName string) (*Conn, error) {
c_cluster_name := C.CString(clusterName)
defer C.free(unsafe.Pointer(c_cluster_name))
c_name := C.CString(userName)
defer C.free(unsafe.Pointer(c_name))
conn := &Conn{}
ret := C.rados_create2(&conn.cluster, c_cluster_name, c_name, 0)
if ret == 0 {
return conn, nil
} else {
return nil, RadosError(int(ret))
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,4 @@
/*
Wrappers around librbd.
*/
package rbd

View File

@@ -0,0 +1,874 @@
package rbd
// #cgo LDFLAGS: -lrbd
// #include <errno.h>
// #include <stdlib.h>
// #include <rados/librados.h>
// #include <rbd/librbd.h>
import "C"
import (
"bytes"
"errors"
"fmt"
"github.com/ceph/go-ceph/rados"
"io"
"unsafe"
)
//
type RBDError int
var RbdErrorImageNotOpen = errors.New("RBD image not open")
var RbdErrorNotFound = errors.New("RBD image not found")
//Rdb feature
var RbdFeatureLayering = uint64(1 << 0)
var RbdFeatureStripingV2 = uint64(1 << 1)
//
type ImageInfo struct {
Size uint64
Obj_size uint64
Num_objs uint64
Order int
Block_name_prefix string
Parent_pool int64
Parent_name string
}
//
type SnapInfo struct {
Id uint64
Size uint64
Name string
}
//
type Locker struct {
Client string
Cookie string
Addr string
}
//
type Image struct {
io.Reader
io.Writer
io.Seeker
io.ReaderAt
io.WriterAt
name string
offset int64
ioctx *rados.IOContext
image C.rbd_image_t
}
//
type Snapshot struct {
image *Image
name string
}
//
func split(buf []byte) (values []string) {
tmp := bytes.Split(buf[:len(buf)-1], []byte{0})
for _, s := range tmp {
if len(s) > 0 {
go_s := C.GoString((*C.char)(unsafe.Pointer(&s[0])))
values = append(values, go_s)
}
}
return values
}
//
func (e RBDError) Error() string {
return fmt.Sprintf("rbd: ret=%d", e)
}
//
func GetError(err C.int) error {
if err != 0 {
if err == -C.ENOENT {
return RbdErrorNotFound
}
return RBDError(err)
} else {
return nil
}
}
//
func Version() (int, int, int) {
var c_major, c_minor, c_patch C.int
C.rbd_version(&c_major, &c_minor, &c_patch)
return int(c_major), int(c_minor), int(c_patch)
}
// GetImageNames returns the list of current RBD images.
func GetImageNames(ioctx *rados.IOContext) (names []string, err error) {
buf := make([]byte, 4096)
for {
size := C.size_t(len(buf))
ret := C.rbd_list(C.rados_ioctx_t(ioctx.Pointer()),
(*C.char)(unsafe.Pointer(&buf[0])), &size)
if ret == -34 { // FIXME
buf = make([]byte, size)
continue
} else if ret < 0 {
return nil, RBDError(ret)
}
tmp := bytes.Split(buf[:size-1], []byte{0})
for _, s := range tmp {
if len(s) > 0 {
name := C.GoString((*C.char)(unsafe.Pointer(&s[0])))
names = append(names, name)
}
}
return names, nil
}
}
//
func GetImage(ioctx *rados.IOContext, name string) *Image {
return &Image{
ioctx: ioctx,
name: name,
}
}
// int rbd_create(rados_ioctx_t io, const char *name, uint64_t size, int *order);
// int rbd_create2(rados_ioctx_t io, const char *name, uint64_t size,
// uint64_t features, int *order);
// int rbd_create3(rados_ioctx_t io, const char *name, uint64_t size,
// uint64_t features, int *order,
// uint64_t stripe_unit, uint64_t stripe_count);
func Create(ioctx *rados.IOContext, name string, size uint64, order int,
args ...uint64) (image *Image, err error) {
var ret C.int
var c_order C.int = C.int(order)
var c_name *C.char = C.CString(name)
defer C.free(unsafe.Pointer(c_name))
switch len(args) {
case 2:
ret = C.rbd_create3(C.rados_ioctx_t(ioctx.Pointer()),
c_name, C.uint64_t(size),
C.uint64_t(args[0]), &c_order,
C.uint64_t(args[1]), C.uint64_t(args[2]))
case 1:
ret = C.rbd_create2(C.rados_ioctx_t(ioctx.Pointer()),
c_name, C.uint64_t(size),
C.uint64_t(args[0]), &c_order)
case 0:
ret = C.rbd_create(C.rados_ioctx_t(ioctx.Pointer()),
c_name, C.uint64_t(size), &c_order)
default:
return nil, errors.New("Wrong number of argument")
}
if ret < 0 {
return nil, RBDError(int(ret))
}
return &Image{
ioctx: ioctx,
name: name,
}, nil
}
// int rbd_clone(rados_ioctx_t p_ioctx, const char *p_name,
// const char *p_snapname, rados_ioctx_t c_ioctx,
// const char *c_name, uint64_t features, int *c_order);
// int rbd_clone2(rados_ioctx_t p_ioctx, const char *p_name,
// const char *p_snapname, rados_ioctx_t c_ioctx,
// const char *c_name, uint64_t features, int *c_order,
// uint64_t stripe_unit, int stripe_count);
func (image *Image) Clone(snapname string, c_ioctx *rados.IOContext, c_name string, features uint64, order int) (*Image, error) {
var c_order C.int = C.int(order)
var c_p_name *C.char = C.CString(image.name)
var c_p_snapname *C.char = C.CString(snapname)
var c_c_name *C.char = C.CString(c_name)
defer C.free(unsafe.Pointer(c_p_name))
defer C.free(unsafe.Pointer(c_p_snapname))
defer C.free(unsafe.Pointer(c_c_name))
ret := C.rbd_clone(C.rados_ioctx_t(image.ioctx.Pointer()),
c_p_name, c_p_snapname,
C.rados_ioctx_t(c_ioctx.Pointer()),
c_c_name, C.uint64_t(features), &c_order)
if ret < 0 {
return nil, RBDError(int(ret))
}
return &Image{
ioctx: c_ioctx,
name: c_name,
}, nil
}
// int rbd_remove(rados_ioctx_t io, const char *name);
// int rbd_remove_with_progress(rados_ioctx_t io, const char *name,
// librbd_progress_fn_t cb, void *cbdata);
func (image *Image) Remove() error {
var c_name *C.char = C.CString(image.name)
defer C.free(unsafe.Pointer(c_name))
return GetError(C.rbd_remove(C.rados_ioctx_t(image.ioctx.Pointer()), c_name))
}
// int rbd_rename(rados_ioctx_t src_io_ctx, const char *srcname, const char *destname);
func (image *Image) Rename(destname string) error {
var c_srcname *C.char = C.CString(image.name)
var c_destname *C.char = C.CString(destname)
defer C.free(unsafe.Pointer(c_srcname))
defer C.free(unsafe.Pointer(c_destname))
err := RBDError(C.rbd_rename(C.rados_ioctx_t(image.ioctx.Pointer()),
c_srcname, c_destname))
if err == 0 {
image.name = destname
return nil
}
return err
}
// int rbd_open(rados_ioctx_t io, const char *name, rbd_image_t *image, const char *snap_name);
// int rbd_open_read_only(rados_ioctx_t io, const char *name, rbd_image_t *image,
// const char *snap_name);
func (image *Image) Open(args ...interface{}) error {
var c_image C.rbd_image_t
var c_name *C.char = C.CString(image.name)
var c_snap_name *C.char
var ret C.int
var read_only bool = false
defer C.free(unsafe.Pointer(c_name))
for _, arg := range args {
switch t := arg.(type) {
case string:
if t != "" {
c_snap_name = C.CString(t)
defer C.free(unsafe.Pointer(c_snap_name))
}
case bool:
read_only = t
default:
return errors.New("Unexpected argument")
}
}
if read_only {
ret = C.rbd_open_read_only(C.rados_ioctx_t(image.ioctx.Pointer()), c_name,
&c_image, c_snap_name)
} else {
ret = C.rbd_open(C.rados_ioctx_t(image.ioctx.Pointer()), c_name,
&c_image, c_snap_name)
}
image.image = c_image
return GetError(ret)
}
// int rbd_close(rbd_image_t image);
func (image *Image) Close() error {
if image.image == nil {
return RbdErrorImageNotOpen
}
ret := C.rbd_close(image.image)
if ret != 0 {
return RBDError(ret)
}
image.image = nil
return nil
}
// int rbd_resize(rbd_image_t image, uint64_t size);
func (image *Image) Resize(size uint64) error {
if image.image == nil {
return RbdErrorImageNotOpen
}
return GetError(C.rbd_resize(image.image, C.uint64_t(size)))
}
// int rbd_stat(rbd_image_t image, rbd_image_info_t *info, size_t infosize);
func (image *Image) Stat() (info *ImageInfo, err error) {
if image.image == nil {
return nil, RbdErrorImageNotOpen
}
var c_stat C.rbd_image_info_t
ret := C.rbd_stat(image.image,
&c_stat, C.size_t(unsafe.Sizeof(info)))
if ret < 0 {
return info, RBDError(int(ret))
}
return &ImageInfo{
Size: uint64(c_stat.size),
Obj_size: uint64(c_stat.obj_size),
Num_objs: uint64(c_stat.num_objs),
Order: int(c_stat.order),
Block_name_prefix: C.GoString((*C.char)(&c_stat.block_name_prefix[0])),
Parent_pool: int64(c_stat.parent_pool),
Parent_name: C.GoString((*C.char)(&c_stat.parent_name[0]))}, nil
}
// int rbd_get_old_format(rbd_image_t image, uint8_t *old);
func (image *Image) IsOldFormat() (old_format bool, err error) {
if image.image == nil {
return false, RbdErrorImageNotOpen
}
var c_old_format C.uint8_t
ret := C.rbd_get_old_format(image.image,
&c_old_format)
if ret < 0 {
return false, RBDError(int(ret))
}
return c_old_format != 0, nil
}
// int rbd_size(rbd_image_t image, uint64_t *size);
func (image *Image) GetSize() (size uint64, err error) {
if image.image == nil {
return 0, RbdErrorImageNotOpen
}
ret := C.rbd_get_size(image.image,
(*C.uint64_t)(&size))
if ret < 0 {
return 0, RBDError(int(ret))
}
return size, nil
}
// int rbd_get_features(rbd_image_t image, uint64_t *features);
func (image *Image) GetFeatures() (features uint64, err error) {
if image.image == nil {
return 0, RbdErrorImageNotOpen
}
ret := C.rbd_get_features(image.image,
(*C.uint64_t)(&features))
if ret < 0 {
return 0, RBDError(int(ret))
}
return features, nil
}
// int rbd_get_stripe_unit(rbd_image_t image, uint64_t *stripe_unit);
func (image *Image) GetStripeUnit() (stripe_unit uint64, err error) {
if image.image == nil {
return 0, RbdErrorImageNotOpen
}
ret := C.rbd_get_stripe_unit(image.image, (*C.uint64_t)(&stripe_unit))
if ret < 0 {
return 0, RBDError(int(ret))
}
return stripe_unit, nil
}
// int rbd_get_stripe_count(rbd_image_t image, uint64_t *stripe_count);
func (image *Image) GetStripeCount() (stripe_count uint64, err error) {
if image.image == nil {
return 0, RbdErrorImageNotOpen
}
ret := C.rbd_get_stripe_count(image.image, (*C.uint64_t)(&stripe_count))
if ret < 0 {
return 0, RBDError(int(ret))
}
return stripe_count, nil
}
// int rbd_get_overlap(rbd_image_t image, uint64_t *overlap);
func (image *Image) GetOverlap() (overlap uint64, err error) {
if image.image == nil {
return 0, RbdErrorImageNotOpen
}
ret := C.rbd_get_overlap(image.image, (*C.uint64_t)(&overlap))
if ret < 0 {
return overlap, RBDError(int(ret))
}
return overlap, nil
}
// int rbd_copy(rbd_image_t image, rados_ioctx_t dest_io_ctx, const char *destname);
// int rbd_copy2(rbd_image_t src, rbd_image_t dest);
// int rbd_copy_with_progress(rbd_image_t image, rados_ioctx_t dest_p, const char *destname,
// librbd_progress_fn_t cb, void *cbdata);
// int rbd_copy_with_progress2(rbd_image_t src, rbd_image_t dest,
// librbd_progress_fn_t cb, void *cbdata);
func (image *Image) Copy(args ...interface{}) error {
if image.image == nil {
return RbdErrorImageNotOpen
}
switch t := args[0].(type) {
case rados.IOContext:
switch t2 := args[1].(type) {
case string:
var c_destname *C.char = C.CString(t2)
defer C.free(unsafe.Pointer(c_destname))
return RBDError(C.rbd_copy(image.image,
C.rados_ioctx_t(t.Pointer()),
c_destname))
default:
return errors.New("Must specify destname")
}
case Image:
var dest Image = t
if dest.image == nil {
return errors.New(fmt.Sprintf("RBD image %s is not open", dest.name))
}
return GetError(C.rbd_copy2(image.image,
dest.image))
default:
return errors.New("Must specify either destination pool " +
"or destination image")
}
}
// int rbd_flatten(rbd_image_t image);
func (image *Image) Flatten() error {
if image.image == nil {
return errors.New(fmt.Sprintf("RBD image %s is not open", image.name))
}
return GetError(C.rbd_flatten(image.image))
}
// ssize_t rbd_list_children(rbd_image_t image, char *pools, size_t *pools_len,
// char *images, size_t *images_len);
func (image *Image) ListChildren() (pools []string, images []string, err error) {
if image.image == nil {
return nil, nil, RbdErrorImageNotOpen
}
var c_pools_len, c_images_len C.size_t
ret := C.rbd_list_children(image.image,
nil, &c_pools_len,
nil, &c_images_len)
if ret == 0 {
return nil, nil, nil
}
if ret < 0 && ret != -C.ERANGE {
return nil, nil, RBDError(int(ret))
}
pools_buf := make([]byte, c_pools_len)
images_buf := make([]byte, c_images_len)
ret = C.rbd_list_children(image.image,
(*C.char)(unsafe.Pointer(&pools_buf[0])),
&c_pools_len,
(*C.char)(unsafe.Pointer(&images_buf[0])),
&c_images_len)
if ret < 0 {
return nil, nil, RBDError(int(ret))
}
tmp := bytes.Split(pools_buf[:c_pools_len-1], []byte{0})
for _, s := range tmp {
if len(s) > 0 {
name := C.GoString((*C.char)(unsafe.Pointer(&s[0])))
pools = append(pools, name)
}
}
tmp = bytes.Split(images_buf[:c_images_len-1], []byte{0})
for _, s := range tmp {
if len(s) > 0 {
name := C.GoString((*C.char)(unsafe.Pointer(&s[0])))
images = append(images, name)
}
}
return pools, images, nil
}
// ssize_t rbd_list_lockers(rbd_image_t image, int *exclusive,
// char *tag, size_t *tag_len,
// char *clients, size_t *clients_len,
// char *cookies, size_t *cookies_len,
// char *addrs, size_t *addrs_len);
func (image *Image) ListLockers() (tag string, lockers []Locker, err error) {
if image.image == nil {
return "", nil, RbdErrorImageNotOpen
}
var c_exclusive C.int
var c_tag_len, c_clients_len, c_cookies_len, c_addrs_len C.size_t
var c_locker_cnt C.ssize_t
C.rbd_list_lockers(image.image, &c_exclusive,
nil, (*C.size_t)(&c_tag_len),
nil, (*C.size_t)(&c_clients_len),
nil, (*C.size_t)(&c_cookies_len),
nil, (*C.size_t)(&c_addrs_len))
// no locker held on rbd image when either c_clients_len,
// c_cookies_len or c_addrs_len is *0*, so just quickly returned
if int(c_clients_len) == 0 || int(c_cookies_len) == 0 ||
int(c_addrs_len) ==0 {
lockers = make([]Locker, 0)
return "", lockers, nil
}
tag_buf := make([]byte, c_tag_len)
clients_buf := make([]byte, c_clients_len)
cookies_buf := make([]byte, c_cookies_len)
addrs_buf := make([]byte, c_addrs_len)
c_locker_cnt = C.rbd_list_lockers(image.image, &c_exclusive,
(*C.char)(unsafe.Pointer(&tag_buf[0])), (*C.size_t)(&c_tag_len),
(*C.char)(unsafe.Pointer(&clients_buf[0])), (*C.size_t)(&c_clients_len),
(*C.char)(unsafe.Pointer(&cookies_buf[0])), (*C.size_t)(&c_cookies_len),
(*C.char)(unsafe.Pointer(&addrs_buf[0])), (*C.size_t)(&c_addrs_len))
// rbd_list_lockers returns negative value for errors
// and *0* means no locker held on rbd image.
// but *0* is unexpected here because first rbd_list_lockers already
// dealt with no locker case
if int(c_locker_cnt) <= 0 {
return "", nil, RBDError(int(c_locker_cnt))
}
clients := split(clients_buf)
cookies := split(cookies_buf)
addrs := split(addrs_buf)
lockers = make([]Locker, c_locker_cnt)
for i := 0; i < int(c_locker_cnt); i++ {
lockers[i] = Locker{Client: clients[i],
Cookie: cookies[i],
Addr: addrs[i]}
}
return string(tag_buf), lockers, nil
}
// int rbd_lock_exclusive(rbd_image_t image, const char *cookie);
func (image *Image) LockExclusive(cookie string) error {
if image.image == nil {
return RbdErrorImageNotOpen
}
var c_cookie *C.char = C.CString(cookie)
defer C.free(unsafe.Pointer(c_cookie))
return GetError(C.rbd_lock_exclusive(image.image, c_cookie))
}
// int rbd_lock_shared(rbd_image_t image, const char *cookie, const char *tag);
func (image *Image) LockShared(cookie string, tag string) error {
if image.image == nil {
return RbdErrorImageNotOpen
}
var c_cookie *C.char = C.CString(cookie)
var c_tag *C.char = C.CString(tag)
defer C.free(unsafe.Pointer(c_cookie))
defer C.free(unsafe.Pointer(c_tag))
return GetError(C.rbd_lock_shared(image.image, c_cookie, c_tag))
}
// int rbd_lock_shared(rbd_image_t image, const char *cookie, const char *tag);
func (image *Image) Unlock(cookie string) error {
if image.image == nil {
return RbdErrorImageNotOpen
}
var c_cookie *C.char = C.CString(cookie)
defer C.free(unsafe.Pointer(c_cookie))
return GetError(C.rbd_unlock(image.image, c_cookie))
}
// int rbd_break_lock(rbd_image_t image, const char *client, const char *cookie);
func (image *Image) BreakLock(client string, cookie string) error {
if image.image == nil {
return RbdErrorImageNotOpen
}
var c_client *C.char = C.CString(client)
var c_cookie *C.char = C.CString(cookie)
defer C.free(unsafe.Pointer(c_client))
defer C.free(unsafe.Pointer(c_cookie))
return GetError(C.rbd_break_lock(image.image, c_client, c_cookie))
}
// ssize_t rbd_read(rbd_image_t image, uint64_t ofs, size_t len, char *buf);
// TODO: int64_t rbd_read_iterate(rbd_image_t image, uint64_t ofs, size_t len,
// int (*cb)(uint64_t, size_t, const char *, void *), void *arg);
// TODO: int rbd_read_iterate2(rbd_image_t image, uint64_t ofs, uint64_t len,
// int (*cb)(uint64_t, size_t, const char *, void *), void *arg);
// TODO: int rbd_diff_iterate(rbd_image_t image,
// const char *fromsnapname,
// uint64_t ofs, uint64_t len,
// int (*cb)(uint64_t, size_t, int, void *), void *arg);
func (image *Image) Read(data []byte) (n int, err error) {
if image.image == nil {
return 0, RbdErrorImageNotOpen
}
if len(data) == 0 {
return 0, nil
}
ret := int(C.rbd_read(
image.image,
(C.uint64_t)(image.offset),
(C.size_t)(len(data)),
(*C.char)(unsafe.Pointer(&data[0]))))
if ret < 0 {
return 0, RBDError(ret)
}
image.offset += int64(ret)
if ret < n {
return ret, io.EOF
}
return ret, nil
}
// ssize_t rbd_write(rbd_image_t image, uint64_t ofs, size_t len, const char *buf);
func (image *Image) Write(data []byte) (n int, err error) {
ret := int(C.rbd_write(image.image, C.uint64_t(image.offset),
C.size_t(len(data)), (*C.char)(unsafe.Pointer(&data[0]))))
if ret >= 0 {
image.offset += int64(ret)
}
if ret != len(data) {
err = RBDError(-1)
}
return ret, err
}
func (image *Image) Seek(offset int64, whence int) (int64, error) {
switch whence {
case 0:
image.offset = offset
case 1:
image.offset += offset
case 2:
stats, err := image.Stat()
if err != nil {
return 0, err
}
image.offset = int64(stats.Size) - offset
default:
return 0, errors.New("Wrong value for whence")
}
return image.offset, nil
}
// int rbd_discard(rbd_image_t image, uint64_t ofs, uint64_t len);
func (image *Image) Discard(ofs uint64, length uint64) error {
return RBDError(C.rbd_discard(image.image, C.uint64_t(ofs),
C.uint64_t(length)))
}
func (image *Image) ReadAt(data []byte, off int64) (n int, err error) {
if image.image == nil {
return 0, RbdErrorImageNotOpen
}
if len(data) == 0 {
return 0, nil
}
ret := int(C.rbd_read(
image.image,
(C.uint64_t)(off),
(C.size_t)(len(data)),
(*C.char)(unsafe.Pointer(&data[0]))))
if ret < 0 {
return 0, RBDError(ret)
}
if ret < n {
return ret, io.EOF
}
return ret, nil
}
func (image *Image) WriteAt(data []byte, off int64) (n int, err error) {
if image.image == nil {
return 0, RbdErrorImageNotOpen
}
if len(data) == 0 {
return 0, nil
}
ret := int(C.rbd_write(image.image, C.uint64_t(off),
C.size_t(len(data)), (*C.char)(unsafe.Pointer(&data[0]))))
if ret != len(data) {
err = RBDError(-1)
}
return ret, err
}
// int rbd_flush(rbd_image_t image);
func (image *Image) Flush() error {
return GetError(C.rbd_flush(image.image))
}
// int rbd_snap_list(rbd_image_t image, rbd_snap_info_t *snaps, int *max_snaps);
// void rbd_snap_list_end(rbd_snap_info_t *snaps);
func (image *Image) GetSnapshotNames() (snaps []SnapInfo, err error) {
if image.image == nil {
return nil, RbdErrorImageNotOpen
}
var c_max_snaps C.int = 0
ret := C.rbd_snap_list(image.image, nil, &c_max_snaps)
c_snaps := make([]C.rbd_snap_info_t, c_max_snaps)
snaps = make([]SnapInfo, c_max_snaps)
ret = C.rbd_snap_list(image.image,
&c_snaps[0], &c_max_snaps)
if ret < 0 {
return nil, RBDError(int(ret))
}
for i, s := range c_snaps {
snaps[i] = SnapInfo{Id: uint64(s.id),
Size: uint64(s.size),
Name: C.GoString(s.name)}
}
C.rbd_snap_list_end(&c_snaps[0])
return snaps[:len(snaps)-1], nil
}
// int rbd_snap_create(rbd_image_t image, const char *snapname);
func (image *Image) CreateSnapshot(snapname string) (*Snapshot, error) {
if image.image == nil {
return nil, RbdErrorImageNotOpen
}
var c_snapname *C.char = C.CString(snapname)
defer C.free(unsafe.Pointer(c_snapname))
ret := C.rbd_snap_create(image.image, c_snapname)
if ret < 0 {
return nil, RBDError(int(ret))
}
return &Snapshot{
image: image,
name: snapname,
}, nil
}
//
func (image *Image) GetSnapshot(snapname string) *Snapshot {
return &Snapshot{
image: image,
name: snapname,
}
}
// int rbd_get_parent_info(rbd_image_t image,
// char *parent_pool_name, size_t ppool_namelen, char *parent_name,
// size_t pnamelen, char *parent_snap_name, size_t psnap_namelen)
func (image *Image) GetParentInfo(p_pool, p_name, p_snapname []byte) error {
ret := C.rbd_get_parent_info(
image.image,
(*C.char)(unsafe.Pointer(&p_pool[0])),
(C.size_t)(len(p_pool)),
(*C.char)(unsafe.Pointer(&p_name[0])),
(C.size_t)(len(p_name)),
(*C.char)(unsafe.Pointer(&p_snapname[0])),
(C.size_t)(len(p_snapname)))
if ret == 0 {
return nil
} else {
return RBDError(int(ret))
}
}
// int rbd_snap_remove(rbd_image_t image, const char *snapname);
func (snapshot *Snapshot) Remove() error {
var c_snapname *C.char = C.CString(snapshot.name)
defer C.free(unsafe.Pointer(c_snapname))
return GetError(C.rbd_snap_remove(snapshot.image.image, c_snapname))
}
// int rbd_snap_rollback(rbd_image_t image, const char *snapname);
// int rbd_snap_rollback_with_progress(rbd_image_t image, const char *snapname,
// librbd_progress_fn_t cb, void *cbdata);
func (snapshot *Snapshot) Rollback() error {
var c_snapname *C.char = C.CString(snapshot.name)
defer C.free(unsafe.Pointer(c_snapname))
return GetError(C.rbd_snap_rollback(snapshot.image.image, c_snapname))
}
// int rbd_snap_protect(rbd_image_t image, const char *snap_name);
func (snapshot *Snapshot) Protect() error {
var c_snapname *C.char = C.CString(snapshot.name)
defer C.free(unsafe.Pointer(c_snapname))
return GetError(C.rbd_snap_protect(snapshot.image.image, c_snapname))
}
// int rbd_snap_unprotect(rbd_image_t image, const char *snap_name);
func (snapshot *Snapshot) Unprotect() error {
var c_snapname *C.char = C.CString(snapshot.name)
defer C.free(unsafe.Pointer(c_snapname))
return GetError(C.rbd_snap_unprotect(snapshot.image.image, c_snapname))
}
// int rbd_snap_is_protected(rbd_image_t image, const char *snap_name,
// int *is_protected);
func (snapshot *Snapshot) IsProtected() (bool, error) {
var c_is_protected C.int
var c_snapname *C.char = C.CString(snapshot.name)
defer C.free(unsafe.Pointer(c_snapname))
ret := C.rbd_snap_is_protected(snapshot.image.image, c_snapname,
&c_is_protected)
if ret < 0 {
return false, RBDError(int(ret))
}
return c_is_protected != 0, nil
}
// int rbd_snap_set(rbd_image_t image, const char *snapname);
func (snapshot *Snapshot) Set() error {
var c_snapname *C.char = C.CString(snapshot.name)
defer C.free(unsafe.Pointer(c_snapname))
return GetError(C.rbd_snap_set(snapshot.image.image, c_snapname))
}

View File

@@ -0,0 +1,277 @@
package rbd_test
import (
"bytes"
"encoding/json"
"github.com/ceph/go-ceph/rados"
"github.com/ceph/go-ceph/rbd"
"github.com/stretchr/testify/assert"
"os/exec"
"sort"
"testing"
)
func GetUUID() string {
out, _ := exec.Command("uuidgen").Output()
return string(out[:36])
}
func TestVersion(t *testing.T) {
var major, minor, patch = rbd.Version()
assert.False(t, major < 0 || major > 1000, "invalid major")
assert.False(t, minor < 0 || minor > 1000, "invalid minor")
assert.False(t, patch < 0 || patch > 1000, "invalid patch")
}
func TestGetImageNames(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
poolname := GetUUID()
err := conn.MakePool(poolname)
assert.NoError(t, err)
ioctx, err := conn.OpenIOContext(poolname)
assert.NoError(t, err)
createdList := []string{}
for i := 0; i < 10; i++ {
name := GetUUID()
_, err := rbd.Create(ioctx, name, 1<<22, 22)
assert.NoError(t, err)
createdList = append(createdList, name)
}
imageNames, err := rbd.GetImageNames(ioctx)
assert.NoError(t, err)
sort.Strings(createdList)
sort.Strings(imageNames)
assert.Equal(t, createdList, imageNames)
for _, name := range createdList {
img := rbd.GetImage(ioctx, name)
err := img.Remove()
assert.NoError(t, err)
}
ioctx.Destroy()
conn.DeletePool(poolname)
conn.Shutdown()
}
func TestIOReaderWriter(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
poolname := GetUUID()
err := conn.MakePool(poolname)
assert.NoError(t, err)
ioctx, err := conn.OpenIOContext(poolname)
assert.NoError(t, err)
name := GetUUID()
img, err := rbd.Create(ioctx, name, 1<<22, 22)
assert.NoError(t, err)
err = img.Open()
assert.NoError(t, err)
stats, err := img.Stat()
assert.NoError(t, err)
encoder := json.NewEncoder(img)
encoder.Encode(stats)
err = img.Flush()
assert.NoError(t, err)
_, err = img.Seek(0, 0)
assert.NoError(t, err)
var stats2 *rbd.ImageInfo
decoder := json.NewDecoder(img)
decoder.Decode(&stats2)
assert.Equal(t, &stats, &stats2)
_, err = img.Seek(0, 0)
bytes_in := []byte("input data")
_, err = img.Write(bytes_in)
assert.NoError(t, err)
_, err = img.Seek(0, 0)
assert.NoError(t, err)
bytes_out := make([]byte, len(bytes_in))
n_out, err := img.Read(bytes_out)
assert.Equal(t, n_out, len(bytes_in))
assert.Equal(t, bytes_in, bytes_out)
err = img.Close()
assert.NoError(t, err)
img.Remove()
assert.NoError(t, err)
ioctx.Destroy()
conn.DeletePool(poolname)
conn.Shutdown()
}
func TestCreateSnapshot(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
poolname := GetUUID()
err := conn.MakePool(poolname)
assert.NoError(t, err)
ioctx, err := conn.OpenIOContext(poolname)
assert.NoError(t, err)
name := GetUUID()
img, err := rbd.Create(ioctx, name, 1<<22, 22)
assert.NoError(t, err)
err = img.Open()
assert.NoError(t, err)
snapshot, err := img.CreateSnapshot("mysnap")
assert.NoError(t, err)
err = img.Close()
err = img.Open("mysnap")
assert.NoError(t, err)
snapshot.Remove()
assert.NoError(t, err)
err = img.Close()
assert.NoError(t, err)
img.Remove()
assert.NoError(t, err)
ioctx.Destroy()
conn.DeletePool(poolname)
conn.Shutdown()
}
func TestParentInfo(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
poolname := GetUUID()
err := conn.MakePool(poolname)
assert.NoError(t, err)
ioctx, err := conn.OpenIOContext(poolname)
assert.NoError(t, err)
name := "parent"
img, err := rbd.Create(ioctx, name, 1<<22, 22, 1)
assert.NoError(t, err)
err = img.Open()
assert.NoError(t, err)
snapshot, err := img.CreateSnapshot("mysnap")
assert.NoError(t, err)
err = snapshot.Protect()
assert.NoError(t, err)
// create an image context with the parent+snapshot
snapImg := rbd.GetImage(ioctx, "parent")
err = snapImg.Open("mysnap")
assert.NoError(t, err)
// ensure no children prior to clone
pools, images, err := snapImg.ListChildren()
assert.NoError(t, err)
assert.Equal(t, len(pools), 0, "pools equal")
assert.Equal(t, len(images), 0, "children length equal")
imgNew, err := img.Clone("mysnap", ioctx, "child", 1, 22)
assert.NoError(t, err)
err = imgNew.Open()
assert.NoError(t, err)
parentPool := make([]byte, 128)
parentName := make([]byte, 128)
parentSnapname := make([]byte, 128)
err = imgNew.GetParentInfo(parentPool, parentName, parentSnapname)
assert.NoError(t, err)
n := bytes.Index(parentName, []byte{0})
pName := string(parentName[:n])
n = bytes.Index(parentSnapname, []byte{0})
pSnapname := string(parentSnapname[:n])
assert.Equal(t, pName, "parent", "they should be equal")
assert.Equal(t, pSnapname, "mysnap", "they should be equal")
pools, images, err = snapImg.ListChildren()
assert.NoError(t, err)
assert.Equal(t, len(pools), 1, "pools equal")
assert.Equal(t, len(images), 1, "children length equal")
err = imgNew.Close()
assert.NoError(t, err)
err = imgNew.Remove()
assert.NoError(t, err)
err = snapshot.Unprotect()
assert.NoError(t, err)
err = snapshot.Remove()
assert.NoError(t, err)
err = img.Close()
assert.NoError(t, err)
err = snapImg.Close()
assert.NoError(t, err)
err = img.Remove()
assert.NoError(t, err)
ioctx.Destroy()
conn.DeletePool(poolname)
conn.Shutdown()
}
func TestNotFound(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
poolname := GetUUID()
err := conn.MakePool(poolname)
assert.NoError(t, err)
ioctx, err := conn.OpenIOContext(poolname)
assert.NoError(t, err)
name := GetUUID()
img := rbd.GetImage(ioctx, name)
err = img.Open()
assert.Equal(t, err, rbd.RbdErrorNotFound)
img.Remove()
assert.Equal(t, err, rbd.RbdErrorNotFound)
ioctx.Destroy()
conn.DeletePool(poolname)
conn.Shutdown()
}

114
ci/ceph_entrypoint.sh Executable file
View File

@@ -0,0 +1,114 @@
#!/bin/bash
#
# Copyright (C) 2013,2014 Loic Dachary <loic@dachary.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
set -e
set -u
DIR=$1
#if ! dpkg -l ceph ; then
# wget -q -O- 'https://ceph.com/git/?p=ceph.git;a=blob_plain;f=keys/release.asc' | sudo apt-key add -
# echo deb http://ceph.com/debian-dumpling/ $(lsb_release -sc) main | sudo tee /etc/apt/sources.list.d/ceph.list
# sudo apt-get update
# sudo apt-get --yes install ceph ceph-common
#fi
# get rid of process and directories leftovers
pkill ceph-mon || true
pkill ceph-osd || true
rm -fr $DIR
# cluster wide parameters
mkdir -p ${DIR}/log
cat >> $DIR/ceph.conf <<EOF
[global]
fsid = $(uuidgen)
osd crush chooseleaf type = 0
run dir = ${DIR}/run
auth cluster required = none
auth service required = none
auth client required = none
osd pool default size = 1
EOF
export CEPH_ARGS="--conf ${DIR}/ceph.conf"
# single monitor
MON_DATA=${DIR}/mon
mkdir -p $MON_DATA
cat >> $DIR/ceph.conf <<EOF
[mon.0]
log file = ${DIR}/log/mon.log
chdir = ""
mon cluster log file = ${DIR}/log/mon-cluster.log
mon data = ${MON_DATA}
mon addr = 127.0.0.1
EOF
ceph-mon --id 0 --mkfs --keyring /dev/null
touch ${MON_DATA}/keyring
ceph-mon --id 0
# single osd
OSD_DATA=${DIR}/osd
mkdir ${OSD_DATA}
cat >> $DIR/ceph.conf <<EOF
[osd.0]
log file = ${DIR}/log/osd.log
chdir = ""
osd data = ${OSD_DATA}
osd journal = ${OSD_DATA}.journal
osd journal size = 100
osd objectstore = memstore
EOF
OSD_ID=$(ceph osd create)
ceph osd crush add osd.${OSD_ID} 1 root=default host=localhost
ceph-osd --id ${OSD_ID} --mkjournal --mkfs
ceph-osd --id ${OSD_ID}
# single mds
MDS_DATA=${DIR}/mds.a
mkdir ${MDS_DATA}
cat >> $DIR/ceph.conf <<EOF
[mds.a]
mds data = ${MDS_DATA}
mds log max segments = 2
mds cache size = 10000
host = localhost
EOF
ceph-authtool --create-keyring --gen-key --name=mds.a ${MDS_DATA}/keyring
ceph -i ${MDS_DATA}/keyring auth add mds.a mon 'allow profile mds' osd 'allow *' mds 'allow'
ceph osd pool create cephfs_data 8
ceph osd pool create cephfs_metadata 8
ceph fs new cephfs cephfs_metadata cephfs_data
ceph-mds -i a
# check that it works
rados --pool rbd put group /etc/group
rados --pool rbd get group ${DIR}/group
diff /etc/group ${DIR}/group
ceph osd tree
export CEPH_CONF="${DIR}/ceph.conf"
go get github.com/stretchr/testify/assert
cd /go/src/github.com/ceph/go-ceph
exec go test -v ./...

28
ci/ceph_install.sh Executable file
View File

@@ -0,0 +1,28 @@
#!/bin/bash
set -e
set -x
sudo apt-get install -y python-virtualenv
# ceph-deploy and ceph
WORKDIR=$HOME/workdir
CEPH_RELEASE=jewel
mkdir $WORKDIR
pushd $WORKDIR
ssh-keygen -f $HOME/.ssh/id_rsa -t rsa -N ''
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys
git clone git://github.com/ceph/ceph-deploy
pushd ceph-deploy
./bootstrap
./ceph-deploy install --release ${CEPH_RELEASE} `hostname`
./ceph-deploy pkg --install librados-dev `hostname`
./ceph-deploy pkg --install librbd-dev `hostname`
./ceph-deploy pkg --install libcephfs-dev `hostname`
popd # ceph-deploy
popd # workdir

123
ci/ceph_micro-osd.sh Normal file
View File

@@ -0,0 +1,123 @@
#
# Copyright (C) 2013,2014 Loic Dachary <loic@dachary.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
set -e
set -u
DIR=$1
#if ! dpkg -l ceph ; then
# wget -q -O- 'https://ceph.com/git/?p=ceph.git;a=blob_plain;f=keys/release.asc' | sudo apt-key add -
# echo deb http://ceph.com/debian-dumpling/ $(lsb_release -sc) main | sudo tee /etc/apt/sources.list.d/ceph.list
# sudo apt-get update
# sudo apt-get --yes install ceph ceph-common
#fi
# get rid of process and directories leftovers
pkill ceph-mon || true
pkill ceph-osd || true
rm -fr $DIR
# cluster wide parameters
mkdir -p ${DIR}/log
cat >> $DIR/ceph.conf <<EOF
[global]
fsid = $(uuidgen)
osd crush chooseleaf type = 0
run dir = ${DIR}/run
auth cluster required = none
auth service required = none
auth client required = none
osd pool default size = 1
mon allow pool delete = true
EOF
export CEPH_ARGS="--conf ${DIR}/ceph.conf"
# single monitor
MON_DATA=${DIR}/mon
mkdir -p $MON_DATA
cat >> $DIR/ceph.conf <<EOF
[mon.0]
log file = ${DIR}/log/mon.log
chdir = ""
mon cluster log file = ${DIR}/log/mon-cluster.log
mon data = ${MON_DATA}
mon addr = 127.0.0.1
EOF
ceph-mon --id 0 --mkfs --keyring /dev/null
touch ${MON_DATA}/keyring
ceph-mon --id 0
# single osd
OSD_DATA=${DIR}/osd
mkdir ${OSD_DATA}
cat >> $DIR/ceph.conf <<EOF
[osd.0]
log file = ${DIR}/log/osd.log
chdir = ""
osd data = ${OSD_DATA}
osd journal = ${OSD_DATA}.journal
osd journal size = 100
osd objectstore = memstore
EOF
OSD_ID=$(ceph osd create)
ceph osd crush add osd.${OSD_ID} 1 root=default host=localhost
ceph-osd --id ${OSD_ID} --mkjournal --mkfs
ceph-osd --id ${OSD_ID}
# single mds
MDS_DATA=${DIR}/mds.a
mkdir ${MDS_DATA}
cat >> $DIR/ceph.conf <<EOF
[mds.a]
mds data = ${MDS_DATA}
mds log max segments = 2
mds cache size = 10000
host = localhost
EOF
ceph-authtool --create-keyring --gen-key --name=mds.a ${MDS_DATA}/keyring
ceph -i ${MDS_DATA}/keyring auth add mds.a mon 'allow profile mds' osd 'allow *' mds 'allow'
ceph osd pool create cephfs_data 8
ceph osd pool create cephfs_metadata 8
ceph fs new cephfs cephfs_metadata cephfs_data
ceph-mds -i a
# create iscsi pool and a 10Mb rbd
ceph osd pool create iscsi_pool 8
rbd create lun0 --size 10 --pool iscsi_pool
export CEPH_CONF="${DIR}/ceph.conf"
while true; do
if ceph status | tee /dev/tty | grep -q HEALTH_OK; then
if ! ceph status | grep -q creating &> /dev/null; then
break
fi
fi
sleep 1
done
# check that it works
rados --pool rbd put group /etc/group
rados --pool rbd get group ${DIR}/group
diff /etc/group ${DIR}/group
ceph osd tree

View File

@@ -0,0 +1,151 @@
/*
Copyright 2018 The GoStor Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package backingstore
import (
"fmt"
"strings"
"github.com/gostor/gotgt/pkg/api"
"github.com/gostor/gotgt/pkg/scsi"
log "github.com/Sirupsen/logrus"
"github.com/ceph/go-ceph/rados"
"github.com/ceph/go-ceph/rbd"
)
// This ceph-rbd plugin is only for linux
// path format ceph-rbd:poolname/imagename
const (
CephBackingStorage = "ceph-rbd"
)
func init() {
scsi.RegisterBackingStore(CephBackingStorage, newCeph)
}
type CephBackingStore struct {
scsi.BaseBackingStore
poolName string
imageName string
conn *rados.Conn
ioctx *rados.IOContext
image *rbd.Image
}
func newCeph() (api.BackingStore, error) {
return &CephBackingStore{
BaseBackingStore: scsi.BaseBackingStore{
Name: CephBackingStorage,
DataSize: 0,
OflagsSupported: 0,
},
}, nil
}
func (bs *CephBackingStore) Open(dev *api.SCSILu, path string) error {
pathinfo := strings.SplitN(path, "/", 2)
if len(pathinfo) != 2 {
return fmt.Errorf("invalid device path string:%s", path)
}
poolName := pathinfo[0]
imageName := pathinfo[1]
log.Debugf("ceph path = %s", path)
if conn, err := rados.NewConn(); err != nil {
log.Error(err)
return err
} else {
bs.conn = conn
}
if err := bs.conn.ReadDefaultConfigFile(); err != nil {
log.Error(err)
return err
}
if err := bs.conn.Connect(); err != nil {
log.Error(err)
return err
}
if ioctx, err := bs.conn.OpenIOContext(poolName); err != nil {
bs.conn.Shutdown()
log.Error(err)
return err
} else {
bs.ioctx = ioctx
}
if image := rbd.GetImage(bs.ioctx, imageName); image == nil {
err := fmt.Errorf("rbdGetImage failed:poolName:%s,imageName:%s",
poolName, imageName)
log.Error(err)
} else {
bs.image = image
}
if err := bs.image.Open(); err != nil {
log.Error(err)
return err
}
if dataSize, err := bs.image.GetSize(); err != nil {
log.Error(err)
return err
} else {
bs.DataSize = dataSize
}
return nil
}
func (bs *CephBackingStore) Close(dev *api.SCSILu) error {
err := bs.image.Close()
bs.ioctx.Destroy()
bs.conn.Shutdown()
return err
}
func (bs *CephBackingStore) Init(dev *api.SCSILu, Opts string) error {
return nil
}
func (bs *CephBackingStore) Exit(dev *api.SCSILu) error {
return nil
}
func (bs *CephBackingStore) Size(dev *api.SCSILu) uint64 {
return bs.DataSize
}
func (bs *CephBackingStore) Read(offset, tl int64) ([]byte, error) {
tmpbuf := make([]byte, tl)
_, err := bs.image.ReadAt(tmpbuf, offset)
return tmpbuf, err
}
func (bs *CephBackingStore) Write(wbuf []byte, offset int64) error {
_, err := bs.image.WriteAt(wbuf, offset)
return err
}
func (bs *CephBackingStore) DataSync() error {
err := bs.image.Flush()
return err
}
func (bs *CephBackingStore) DataAdvise(offset, length int64, advise uint32) error {
return nil
}