Merge pull request #127 from gostor/perf/reduce-alloc-and-copies
optimize performance: reduce allocations, buffered I/O, zero-copy reads
This commit is contained in:
@@ -312,8 +312,7 @@ func parseHeader(data []byte) (*ISCSICommand, error) {
|
||||
if len(data) != BHS_SIZE {
|
||||
return nil, fmt.Errorf("garbled header")
|
||||
}
|
||||
// TODO: sync.Pool
|
||||
m := &ISCSICommand{}
|
||||
m := getCommand()
|
||||
m.Immediate = 0x40&data[0] == 0x40
|
||||
m.OpCode = OpCode(data[0] & ISCSI_OPCODE_MASK)
|
||||
m.Final = 0x80&data[1] == 0x80
|
||||
|
||||
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package iscsit
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
"net"
|
||||
"sort"
|
||||
@@ -61,6 +62,7 @@ type iscsiConnection struct {
|
||||
txIOState int
|
||||
refcount int
|
||||
conn net.Conn
|
||||
writer *bufio.Writer
|
||||
|
||||
rxBuffer []byte
|
||||
txBuffer []byte
|
||||
@@ -117,6 +119,7 @@ func (c *iscsiConnection) init() {
|
||||
c.state = CONN_STATE_FREE
|
||||
c.refcount = 1
|
||||
c.readLock = new(sync.RWMutex)
|
||||
c.writer = bufio.NewWriterSize(c.conn, 256*1024)
|
||||
c.loginParam.sessionParam = []ISCSISessionParam{}
|
||||
c.loginParam.tgtCSG = LoginOperationalNegotiation
|
||||
c.loginParam.tgtNSG = LoginOperationalNegotiation
|
||||
@@ -136,10 +139,15 @@ func (c *iscsiConnection) readData(buf []byte) (int, error) {
|
||||
}
|
||||
|
||||
func (c *iscsiConnection) write(resp []byte) (int, error) {
|
||||
return c.conn.Write(resp)
|
||||
return c.writer.Write(resp)
|
||||
}
|
||||
|
||||
func (c *iscsiConnection) flush() error {
|
||||
return c.writer.Flush()
|
||||
}
|
||||
|
||||
func (c *iscsiConnection) close() {
|
||||
c.writer.Flush()
|
||||
c.conn.Close()
|
||||
}
|
||||
|
||||
|
||||
@@ -830,6 +830,13 @@ SendRemainingData:
|
||||
}
|
||||
}
|
||||
|
||||
// Flush buffered writes to the network
|
||||
if err := conn.flush(); err != nil {
|
||||
log.Errorf("failed to flush data to client: %v", err)
|
||||
conn.state = CONN_STATE_CLOSE
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("connection state: %v", conn.State())
|
||||
switch conn.state {
|
||||
case CONN_STATE_CLOSE, CONN_STATE_EXIT:
|
||||
|
||||
@@ -109,7 +109,6 @@ func bsPerformCommand(bs api.BackingStore, cmd *api.SCSICommand) (err error, key
|
||||
// TODO
|
||||
break
|
||||
case api.READ_6, api.READ_10, api.READ_12, api.READ_16:
|
||||
rbuf = make([]byte, int(tl))
|
||||
rbuf, err = bs.Read(int64(offset), tl)
|
||||
if err != nil && err != io.EOF {
|
||||
key = MEDIUM_ERROR
|
||||
@@ -117,9 +116,6 @@ func bsPerformCommand(bs api.BackingStore, cmd *api.SCSICommand) (err error, key
|
||||
break
|
||||
}
|
||||
length = len(rbuf)
|
||||
for i := 0; i < int(tl)-length; i++ {
|
||||
rbuf = append(rbuf, 0)
|
||||
}
|
||||
|
||||
if (opcode != api.READ_6) && (scb[1]&0x10 != 0) {
|
||||
bs.DataAdvise(int64(offset), int64(length), util.POSIX_FADV_NOREUSE)
|
||||
@@ -131,6 +127,12 @@ func bsPerformCommand(bs api.BackingStore, cmd *api.SCSICommand) (err error, key
|
||||
goto sense
|
||||
}
|
||||
copy(cmd.InSDBBuffer.Buffer, rbuf)
|
||||
// Zero-fill any remaining bytes if read was short
|
||||
if length < int(tl) {
|
||||
for i := length; i < int(tl) && i < len(cmd.InSDBBuffer.Buffer); i++ {
|
||||
cmd.InSDBBuffer.Buffer[i] = 0
|
||||
}
|
||||
}
|
||||
case api.PRE_FETCH_10, api.PRE_FETCH_16:
|
||||
err = bs.DataAdvise(int64(offset), tl, util.POSIX_FADV_WILLNEED)
|
||||
if err != nil {
|
||||
|
||||
@@ -159,11 +159,26 @@ func (bs *FileBackingStore) DataAdvise(offset, length int64, advise uint32) erro
|
||||
return util.Fadvise(bs.file, offset, length, advise)
|
||||
}
|
||||
|
||||
// unmapZeroBufSize is the size of the reusable zero buffer for unmap operations.
|
||||
const unmapZeroBufSize = 1 << 20 // 1MB
|
||||
|
||||
// unmapZeroBuf is a pre-allocated zero buffer shared across unmap calls.
|
||||
var unmapZeroBuf = make([]byte, unmapZeroBufSize)
|
||||
|
||||
func (bs *FileBackingStore) Unmap(descriptors []api.UnmapBlockDescriptor) error {
|
||||
for _, desc := range descriptors {
|
||||
zeros := make([]byte, desc.TL)
|
||||
if _, err := bs.file.WriteAt(zeros, int64(desc.Offset)); err != nil {
|
||||
return err
|
||||
remaining := desc.TL
|
||||
off := int64(desc.Offset)
|
||||
for remaining > 0 {
|
||||
writeLen := remaining
|
||||
if writeLen > unmapZeroBufSize {
|
||||
writeLen = unmapZeroBufSize
|
||||
}
|
||||
if _, err := bs.file.WriteAt(unmapZeroBuf[:writeLen], off); err != nil {
|
||||
return err
|
||||
}
|
||||
off += int64(writeLen)
|
||||
remaining -= writeLen
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user