Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add remote IP to the source object #55

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,8 @@
import com.illposed.osc.argument.handler.StringArgumentHandler;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;

/**
* Dispatches {@link OSCPacket}s to registered listeners (<i>Method</i>s).
Expand All @@ -41,8 +35,8 @@ public class OSCPacketDispatcher implements OSCPacketListener {
private final ByteBuffer argumentTypesBuffer;
private final OSCSerializer serializer;
private final Charset typeTagsCharset;
private final List<SelectiveMessageListener> selectiveMessageListeners;
private final List<OSCBadDataListener> badDataListeners;
private final Queue<SelectiveMessageListener> selectiveMessageListeners;
private final Queue<OSCBadDataListener> badDataListeners;
private boolean metaInfoRequired;
/**
* Whether to disregard bundle time-stamps for dispatch-scheduling.
Expand Down Expand Up @@ -149,8 +143,8 @@ public OSCPacketDispatcher(
this.typeTagsCharset = (propertiesCharset == null)
? Charset.defaultCharset()
: propertiesCharset;
this.selectiveMessageListeners = new ArrayList<>();
this.badDataListeners = new ArrayList<>();
this.selectiveMessageListeners = new ConcurrentLinkedQueue<>();
this.badDataListeners = new ConcurrentLinkedQueue<>();
this.metaInfoRequired = false;
this.alwaysDispatchingImmediately = false;
this.dispatchScheduler = dispatchScheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,140 +37,166 @@
*/
public class OSCDatagramChannel extends SelectableChannel {

private final DatagramChannel underlyingChannel;
private final OSCParser parser;
private final OSCSerializerAndParserBuilder serializerBuilder;

public OSCDatagramChannel(
final DatagramChannel underlyingChannel,
final OSCSerializerAndParserBuilder serializerAndParserBuilder
)
{
this.underlyingChannel = underlyingChannel;
OSCParser tmpParser = null;
if (serializerAndParserBuilder != null) {
tmpParser = serializerAndParserBuilder.buildParser();
}
this.parser = tmpParser;
this.serializerBuilder = serializerAndParserBuilder;
}

public OSCPacket read(final ByteBuffer buffer) throws IOException, OSCParseException {

boolean completed = false;
OSCPacket oscPacket;
try {
begin();
private final DatagramChannel underlyingChannel;
private final OSCParser parser;
private final OSCSerializerAndParserBuilder serializerBuilder;

public static class OSCPacketWithSource {
private OSCPacket packet;
private SocketAddress source;

public OSCPacketWithSource(OSCPacket packet, SocketAddress source) {
this.packet = packet;
this.source = source;
}

public OSCPacket getPacket() {
return packet;
}

public SocketAddress getSource() {
return source;
}

@Override
public boolean equals(Object obj) {
return ((obj instanceof OSCPacketWithSource))
&& (((OSCPacketWithSource) obj).packet == packet
&& ((OSCPacketWithSource) obj).source == source);
}
}

public OSCDatagramChannel(
final DatagramChannel underlyingChannel,
final OSCSerializerAndParserBuilder serializerAndParserBuilder
) {
this.underlyingChannel = underlyingChannel;
OSCParser tmpParser = null;
if (serializerAndParserBuilder != null) {
tmpParser = serializerAndParserBuilder.buildParser();
}
this.parser = tmpParser;
this.serializerBuilder = serializerAndParserBuilder;
}

public OSCPacketWithSource read(final ByteBuffer buffer) throws IOException, OSCParseException {
boolean completed = false;
OSCPacket oscPacket;
SocketAddress peer;
try {
begin();

buffer.clear();
// NOTE From the doc of `read()` and `receive()`:
// "If there are fewer bytes remaining in the buffer
// than are required to hold the datagram
// then the remainder of the datagram is silently discarded."
if (underlyingChannel.isConnected()) {
peer = underlyingChannel.getRemoteAddress();
underlyingChannel.read(buffer);
} else {
underlyingChannel.receive(buffer);
peer = underlyingChannel.receive(buffer);
}
// final int readBytes = buffer.position();
// if (readBytes == buffer.capacity()) {
// // TODO In this case it is very likely that the buffer was actually too small, and the remainder of the datagram/packet was silently discarded. We might want to give a warning, like throw an exception in this case, but whether this happens should probably be user configurable.
// }
buffer.flip();
if (buffer.limit() == 0) {
throw new OSCParseException("Received a packet without any data");
} else {
oscPacket = parser.convert(buffer);
completed = true;
}
} finally {
end(completed);
}

return oscPacket;
}

public void send(final ByteBuffer buffer, final OSCPacket packet, final SocketAddress remoteAddress) throws IOException, OSCSerializeException {

boolean completed = false;
try {
begin();

final OSCSerializer serializer = serializerBuilder.buildSerializer(buffer);
buffer.rewind();
serializer.write(packet);
buffer.flip();
if (underlyingChannel.isConnected()) {
underlyingChannel.write(buffer);
} else if (remoteAddress == null) {
throw new IllegalStateException("Not connected and no remote address is given");
} else {
underlyingChannel.send(buffer, remoteAddress);
}
completed = true;
} finally {
end(completed);
}
}

public void write(final ByteBuffer buffer, final OSCPacket packet) throws IOException, OSCSerializeException {

boolean completed = false;
try {
begin();
if (!underlyingChannel.isConnected()) {
throw new IllegalStateException("Either connect the channel or use write()");
}
send(buffer, packet, null);
completed = true;
} finally {
end(completed);
}
}

@Override
public SelectorProvider provider() {
return underlyingChannel.provider();
}

@Override
public boolean isRegistered() {
return underlyingChannel.isRegistered();
}

@Override
public SelectionKey keyFor(final Selector sel) {
return underlyingChannel.keyFor(sel);
}

@Override
public SelectionKey register(final Selector sel, final int ops, final Object att) throws ClosedChannelException {
return underlyingChannel.register(sel, ops, att);
}

@Override
public SelectableChannel configureBlocking(final boolean block) throws IOException {
return underlyingChannel.configureBlocking(block);
}

@Override
public boolean isBlocking() {
return underlyingChannel.isBlocking();
}

@Override
public Object blockingLock() {
return underlyingChannel.blockingLock();
}

@Override
protected void implCloseChannel() throws IOException {
// XXX is this ok?
underlyingChannel.close();
}

@Override
public int validOps() {
return underlyingChannel.validOps();
}
buffer.flip();
if (buffer.limit() == 0) {
throw new OSCParseException("Received a packet without any data");
} else {
oscPacket = parser.convert(buffer);
completed = true;
}
} finally {
end(completed);
}

return new OSCPacketWithSource(oscPacket, peer);
}


public void send(final ByteBuffer buffer, final OSCPacket packet, final SocketAddress remoteAddress) throws IOException, OSCSerializeException {

boolean completed = false;
try {
begin();

final OSCSerializer serializer = serializerBuilder.buildSerializer(buffer);
buffer.rewind();
serializer.write(packet);
buffer.flip();
if (underlyingChannel.isConnected()) {
underlyingChannel.write(buffer);
} else if (remoteAddress == null) {
throw new IllegalStateException("Not connected and no remote address is given");
} else {
underlyingChannel.send(buffer, remoteAddress);
}
completed = true;
} finally {
end(completed);
}
}

public void write(final ByteBuffer buffer, final OSCPacket packet) throws IOException, OSCSerializeException {

boolean completed = false;
try {
begin();
if (!underlyingChannel.isConnected()) {
throw new IllegalStateException("Either connect the channel or use write()");
}
send(buffer, packet, null);
completed = true;
} finally {
end(completed);
}
}

@Override
public SelectorProvider provider() {
return underlyingChannel.provider();
}

@Override
public boolean isRegistered() {
return underlyingChannel.isRegistered();
}

@Override
public SelectionKey keyFor(final Selector sel) {
return underlyingChannel.keyFor(sel);
}

@Override
public SelectionKey register(final Selector sel, final int ops, final Object att) throws ClosedChannelException {
return underlyingChannel.register(sel, ops, att);
}

@Override
public SelectableChannel configureBlocking(final boolean block) throws IOException {
return underlyingChannel.configureBlocking(block);
}

@Override
public boolean isBlocking() {
return underlyingChannel.isBlocking();
}

@Override
public Object blockingLock() {
return underlyingChannel.blockingLock();
}

@Override
protected void implCloseChannel() throws IOException {
// XXX is this ok?
underlyingChannel.close();
}

@Override
public int validOps() {
return underlyingChannel.validOps();
}
}
Loading