/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.syscall;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import org.apache.activemq.syscall.jni.PosixAIO;
import org.apache.activemq.syscall.jni.IO;
import org.apache.activemq.syscall.jni.PosixAIO.aiocb;
import org.apache.activemq.syscall.jni.IO.iovec;
import static org.apache.activemq.syscall.jni.PosixAIO.*;
import static org.apache.activemq.syscall.jni.CLibrary.*;
import static org.apache.activemq.syscall.jni.IO.*;
import static org.apache.activemq.syscall.jni.IO.iovec.*;
/**
* Used to access a file descriptor.
*
* @author Hiram Chirino
*/
public class FileDescriptor {

    /** The native file descriptor number wrapped by this object. */
    private final int fd;

    /**
     * True while {@link #dispose()} / {@link #close()} should still close
     * {@link #fd}. Set by the {@code open} factories and cleared by the first
     * dispose.
     */
    boolean opened;

    /** Agent used to watch AIO requests; resolved lazily by {@link #agent()}. */
    private AioPollAgent aioPollAgent;

    /**
     * Wraps an existing descriptor number. The constructor leaves
     * {@link #opened} false, so disposing such an instance does not close the
     * descriptor unless the flag is set elsewhere.
     *
     * @param fd the native file descriptor
     */
    public FileDescriptor(int fd) {
        this.fd = fd;
    }

    /**
     * Opens {@code path} with the given flags and a mode of 0.
     *
     * @throws IOException if the native open fails
     */
    public static FileDescriptor open(String path, int oflags) throws IOException {
        return open(path, oflags, 0);
    }

    /**
     * Opens the file's path with the given flags and a mode of 0.
     *
     * @throws IOException if the native open fails
     */
    public static FileDescriptor open(File file, int oflags) throws IOException {
        return open(file.getPath(), oflags, 0);
    }

    /**
     * Opens the file's path with the given flags and mode.
     *
     * @throws IOException if the native open fails
     */
    public static FileDescriptor open(File file, int oflags, int mode) throws IOException {
        return open(file.getPath(), oflags, mode);
    }

    /**
     * Opens {@code path} through {@code IO.open} and returns a descriptor
     * that is marked as opened, so that {@link #close()} will close it.
     *
     * @param path   path handed to the native open call
     * @param oflags open flags handed to the native open call
     * @param mode   mode handed to the native open call
     * @return the opened descriptor
     * @throws IOException if the native open returns -1
     */
    public static FileDescriptor open(String path, int oflags, int mode) throws IOException {
        int fd = IO.open(path, oflags, mode);
        if (fd == -1) {
            throw error();
        }
        FileDescriptor rc = new FileDescriptor(fd);
        rc.opened = true;
        return rc;
    }

    /**
     * Closes the descriptor if it is still marked as opened.
     *
     * @return the result of {@code IO.close}, or 0 when nothing was closed
     */
    public int dispose() {
        if (closeCheck()) {
            return IO.close(fd);
        }
        return 0;
    }

    /**
     * Same as {@link #dispose()} but reports a failed close as an exception.
     *
     * @throws IOException if the native close returns -1
     */
    public void close() throws IOException {
        if (dispose() == -1) {
            throw error();
        }
    }

    /**
     * Clears the opened flag.
     *
     * @return true only for the call that found the flag set
     */
    private boolean closeCheck() {
        if (opened) {
            opened = false;
            return true;
        }
        return false;
    }

    /** @return the wrapped native descriptor number */
    int getFD() {
        return fd;
    }

    /**
     * Repositions the file offset using {@code SEEK_SET}.
     *
     * @return the value returned by the native lseek
     * @throws IOException if the native call returns -1
     */
    public long seek(long offset) throws IOException {
        return seek(offset, SEEK_SET);
    }

    /**
     * Repositions the file offset.
     *
     * @param offset offset handed to the native lseek
     * @param whence whence constant handed to the native lseek
     * @return the value returned by the native lseek
     * @throws IOException if the native call returns -1
     */
    public long seek(long offset, int whence) throws IOException {
        return checked(IO.lseek(fd, offset, whence));
    }

    /**
     * Writes the buffer with the native write call.
     *
     * @return the value returned by the native call
     * @throws IOException if the native call returns -1
     */
    public long write(NativeAllocation buffer) throws IOException {
        return checked(IO.write(fd, buffer.pointer(), buffer.length()));
    }

    /**
     * Reads into the buffer with the native read call.
     *
     * @return the value returned by the native call
     * @throws IOException if the native call returns -1
     */
    public long read(NativeAllocation buffer) throws IOException {
        return checked(IO.read(fd, buffer.pointer(), buffer.length()));
    }

    /**
     * Writes the buffer at the given file offset with the native pwrite call.
     *
     * @return the value returned by the native call
     * @throws IOException if the native call returns -1
     */
    public long write(long offset, NativeAllocation buffer) throws IOException {
        return checked(IO.pwrite(fd, buffer.pointer(), buffer.length(), offset));
    }

    /**
     * Reads into the buffer from the given file offset with the native pread
     * call.
     *
     * @return the value returned by the native call
     * @throws IOException if the native call returns -1
     */
    public long read(long offset, NativeAllocation buffer) throws IOException {
        return checked(IO.pread(fd, buffer.pointer(), buffer.length(), offset));
    }

    /**
     * Writes all the buffers with a single native writev call.
     *
     * @param buffers the buffers to write, in iteration order
     * @return the value returned by the native call, or 0 for an empty
     *         collection
     * @throws IOException if the native call returns -1
     * @throws OutOfMemoryError if the iovec array cannot be allocated
     */
    public long write(Collection<? extends NativeAllocation> buffers) throws IOException {
        return vectoredIO(buffers, true);
    }

    /**
     * Reads into all the buffers with a single native readv call.
     *
     * @param buffers the buffers to fill, in iteration order
     * @return the value returned by the native call, or 0 for an empty
     *         collection
     * @throws IOException if the native call returns -1
     * @throws OutOfMemoryError if the iovec array cannot be allocated
     */
    public long read(Collection<? extends NativeAllocation> buffers) throws IOException {
        return vectoredIO(buffers, false);
    }

    /**
     * Builds a temporary native iovec array describing the buffers, runs
     * writev or readv against it and always frees the array afterwards.
     */
    private long vectoredIO(Collection<? extends NativeAllocation> buffers, boolean write) throws IOException {
        int count = buffers.size();
        if (count == 0) {
            // malloc(0) is allowed to return NULL, which would otherwise be
            // misreported as an out of memory condition.
            return 0;
        }
        long iovecp = malloc(iovec.SIZEOF * count);
        if (iovecp == NULL) {
            throw new OutOfMemoryError();
        }
        try {
            long cur = iovecp;
            iovec v = new iovec();
            for (NativeAllocation buffer : buffers) {
                v.iov_base = buffer.pointer();
                v.iov_len = buffer.length();
                // copy the populated struct into the current array slot
                memmove(cur, v, iovec.SIZEOF);
                cur = iovec_add(cur, 1);
            }
            long rc = write ? IO.writev(fd, iovecp, count) : IO.readv(fd, iovecp, count);
            return checked(rc);
        } finally {
            free(iovecp);
        }
    }

    /** @return the value of {@code PosixAIO.SUPPORTED} */
    public boolean isAsyncIOSupported() {
        return PosixAIO.SUPPORTED;
    }

    /**
     * Performs a non blocking write, the callback gets executed once the write
     * completes. The buffer should not be modified until the operation
     * completes.
     *
     * @param offset   file offset stored in the AIO control block
     * @param buffer   the data to write
     * @param callback handed to the poll agent together with the request
     * @throws IOException if the request cannot be submitted
     */
    public void write(long offset, NativeAllocation buffer, Callback callback) throws IOException {
        long aiocbp = block(offset, buffer);
        if (aio_write(aiocbp) == -1) {
            throw submitFailed(aiocbp);
        }
        agent().watch(aiocbp, callback);
    }

    /**
     * Performs a non blocking read, the callback gets executed once the read
     * completes. The buffer should not be accessed until the operation
     * completes.
     *
     * @param offset   file offset stored in the AIO control block
     * @param buffer   the buffer to read into
     * @param callback handed to the poll agent together with the request
     * @throws IOException if the request cannot be submitted
     */
    public void read(long offset, NativeAllocation buffer, Callback callback) throws IOException {
        long aiocbp = block(offset, buffer);
        if (aio_read(aiocbp) == -1) {
            throw submitFailed(aiocbp);
        }
        agent().watch(aiocbp, callback);
    }

    /**
     * Releases the control block of a request that could not be submitted
     * and returns the exception describing the failure.
     */
    private static IOException submitFailed(long aiocbp) {
        // Build the exception first: free() may overwrite errno.
        IOException failure = error();
        free(aiocbp);
        return failure;
    }

    /** Creates an exception whose message is the strerror text for the current errno. */
    private static IOException error() {
        return new IOException(string(strerror(errno())));
    }

    /**
     * Passes a native return code through, converting -1 into an exception.
     *
     * @throws IOException if {@code rc} is -1
     */
    private static long checked(long rc) throws IOException {
        if (rc == -1) {
            throw error();
        }
        return rc;
    }

    /**
     * Calls the native fsync on the descriptor.
     *
     * @throws IOException if the native call returns -1
     */
    public void sync() throws IOException {
        checked(IO.fsync(fd));
    }

    /** @return true when the {@code F_FULLFSYNC} constant is non-zero */
    public boolean isfullSyncSupported() {
        return F_FULLFSYNC != 0;
    }

    /**
     * Issues the {@code F_FULLFSYNC} fcntl command on the descriptor.
     *
     * @throws IOException if the native call returns -1
     * @throws UnsupportedOperationException if {@link #isfullSyncSupported()} is false
     */
    public void fullSync() throws IOException, UnsupportedOperationException {
        if (!isfullSyncSupported()) {
            throw new UnsupportedOperationException();
        }
        checked(fcntl(fd, F_FULLFSYNC));
    }

    /**
     * @return true when fcntl is available and either {@code F_NOCACHE} or
     *         {@code O_DIRECT} is non-zero
     */
    public boolean isDirectIOSupported() {
        if (!HAVE_FCNTL_FUNCTION) {
            return false;
        }
        return F_NOCACHE != 0 || O_DIRECT != 0;
    }

    /**
     * Turns on direct IO for the descriptor, using {@code F_NOCACHE} when it
     * is defined and otherwise adding {@code O_DIRECT} to the status flags.
     *
     * @throws IOException if a native fcntl call returns -1
     * @throws UnsupportedOperationException if {@link #isDirectIOSupported()} is false
     */
    public void enableDirectIO() throws IOException, UnsupportedOperationException {
        if (!isDirectIOSupported()) {
            throw new UnsupportedOperationException();
        }
        if (F_NOCACHE != 0) {
            // F_NOCACHE takes an argument: a non-zero value turns caching off.
            checked(fcntl(fd, F_NOCACHE, 1));
        } else {
            // Only O_DIRECT is left; add it to the current status flags.
            int flags = fcntl(fd, F_GETFL);
            checked(flags);
            checked(fcntl(fd, F_SETFL, flags | O_DIRECT));
        }
    }

    /**
     * Allocates an initialized aiocb structure on the native heap using the
     * given parameters.
     *
     * @return pointer to the structure
     * @throws OutOfMemoryError if the native allocation fails
     */
    private long block(long offset, NativeAllocation buffer) throws OutOfMemoryError {
        aiocb cb = new aiocb();
        cb.aio_fildes = fd;
        cb.aio_offset = offset;
        cb.aio_buf = buffer.pointer();
        cb.aio_nbytes = buffer.length();
        long aiocbp = malloc(aiocb.SIZEOF);
        if (aiocbp == NULL) {
            throw new OutOfMemoryError();
        }
        aiocb.memmove(aiocbp, cb, aiocb.SIZEOF);
        return aiocbp;
    }

    /**
     * Gets the poll agent that will be used to watch for completion of AIO
     * requests, falling back to {@code AioPollAgent.getMainPollAgent()} when
     * none has been set.
     *
     * @return the agent to use
     */
    private AioPollAgent agent() {
        if (aioPollAgent == null) {
            aioPollAgent = AioPollAgent.getMainPollAgent();
        }
        return aioPollAgent;
    }

    /**
     * Sets the poll agent used for subsequent AIO requests.
     *
     * @param aioPollAgent the agent, or null to fall back to the main agent
     */
    public void setAioPollAgent(AioPollAgent aioPollAgent) {
        this.aioPollAgent = aioPollAgent;
    }
}