Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 增加视频转码相关命令 #12

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.jetlinks.sdk.server.media.transcode;

public interface AudioCodecs {

String AAC = "aac";
String PCMA = "pcm_alaw";
String PCMU = "pcm_mulaw";
String OPUS = "opus";
String FLAC = "flac";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.jetlinks.sdk.server.media.transcode;

import io.netty.buffer.ByteBuf;
import org.jetlinks.core.command.StreamCommand;
import org.jetlinks.sdk.server.utils.CastUtils;
import reactor.core.publisher.Flux;

import javax.annotation.Nonnull;

public class MediaStreamingTranscodeCommand extends TranscodeCommand<MediaStreamingTranscodeCommand> implements StreamCommand<ByteBuf, ByteBuf> {


private Flux<ByteBuf> stream;

@Nonnull
@Override
public Flux<ByteBuf> stream() {
return stream == null ? Flux.empty() : stream;
}

@Override
public void withStream(@Nonnull Flux<ByteBuf> stream) {
this.stream = stream;
}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.jetlinks.sdk.server.media.transcode;


public class MediaTranscodeCommand extends TranscodeCommand<MediaTranscodeCommand> {



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.jetlinks.sdk.server.media.transcode;

import io.netty.buffer.ByteBuf;
import org.jetlinks.core.command.AbstractCommand;
import org.jetlinks.core.command.AbstractStreamCommand;
import org.jetlinks.sdk.server.utils.CastUtils;
import org.jetlinks.sdk.server.utils.ConverterUtils;
import reactor.core.publisher.Mono;

public abstract class StreamingTranscodeCommand<Self extends StreamingTranscodeCommand<Self>>
extends AbstractStreamCommand<ByteBuf, ByteBuf, Self> {

public boolean isIgnoreResult() {
return CastUtils.castBoolean(
readable()
.getOrDefault("ignoreResult", false));
}

public Self setIgnoreResult(boolean ignoreResult) {
return with("ignoreResult", ignoreResult);
}

public String getSourceFormat() {
return getOrNull("sourceFormat", String.class);
}

public Self setSourceFormat(String sourceFormat) {
return with("sourceFormat", sourceFormat);
}

public String getTargetFormat() {
return getOrNull("targetFormat", String.class);
}

public Self setTargetFormat(String targetFormat) {
return with("targetFormat", targetFormat);
}

public String getTarget() {
return getOrNull("target", String.class);
}

public Self setTarget(String target) {
return with("target", target);
}

@Override
public ByteBuf convertStreamValue(Object value) {
return ConverterUtils.convertNettyBuffer(value);
}

@Override
public Object createResponseData(Object value) {
return ConverterUtils.convertNettyBuffer(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.jetlinks.sdk.server.media.transcode;

import io.netty.buffer.ByteBuf;
import org.jetlinks.core.command.AbstractCommand;
import org.jetlinks.sdk.server.utils.CastUtils;
import org.jetlinks.sdk.server.utils.ConverterUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public abstract class TranscodeCommand<Self extends TranscodeCommand<Self>> extends AbstractCommand<Flux<ByteBuf>, Self> {

public String getSource() {
return getOrNull("source", String.class);
}

public Self setSource(String source) {
return with("source", source);
}

public String getTarget() {
return getOrNull("target", String.class);
}

public Self setTarget(String target) {
return with("target", target);
}

public String getSourceFormat() {
return getOrNull("sourceFormat", String.class);
}

public Self setSourceFormat(String sourceFormat) {
return with("sourceFormat", sourceFormat);
}

public String getTargetFormat() {
return getOrNull("targetFormat", String.class);
}

public Self setTargetFormat(String targetFormat) {
return with("targetFormat", targetFormat);
}

public boolean isIgnoreAudio() {
return CastUtils.castBoolean(
readable()
.getOrDefault("ignoreAudio", false));
}

public Self setIgnoreAudio(boolean onlyAudio) {
return with("ignoreAudio", onlyAudio);
}


public boolean isOnlyAudio() {
return CastUtils.castBoolean(
readable()
.getOrDefault("onlyAudio", false));
}

public Self setOnlyAudio(boolean onlyAudio) {
return with("onlyAudio", onlyAudio);
}

public boolean isAwait() {
return CastUtils.castBoolean(
readable()
.getOrDefault("await", false));
}

public Self setAwait(boolean await) {
return with("await", await);
}

/**
* @see AudioCodecs
*/
public String getAudioCodec() {
return getOrNull("audioCodec", String.class);
}

/**
* @see AudioCodecs
*/
public Self setAudioCodec(String codec) {
return with("audioCodec", codec);
}

/**
* @see VideoCodecs
*/
public String getVideoCodec() {
return getOrNull("videoCodec", String.class);
}

/**
* @see VideoCodecs
*/
public Self setVideoCodec(String codec) {
return with("videoCodec", codec);
}


@Override
public Object createResponseData(Object value) {
return ConverterUtils.convertNettyBuffer(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.jetlinks.sdk.server.media.transcode;

public interface VideoCodecs {
String copy = "copy";
String H264 = "h264";
String H265 = "h265";
String MPEG4 = "mpeg4";

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import org.springframework.core.io.buffer.NettyDataBufferFactory;

import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.nio.*;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -201,8 +204,13 @@ public static ByteBuf convertNettyBuffer(Object obj) {
return Unpooled.wrappedBuffer(((DataBuffer) obj).asByteBuffer());
}

if (obj instanceof ByteBuffer) {
return Unpooled.wrappedBuffer(((ByteBuffer) obj));
// if (obj instanceof ByteBuffer) {
// return Unpooled.wrappedBuffer(((ByteBuffer) obj));
// }
if (obj instanceof Buffer) {
return convertNioBufferToNettyBuf(((Buffer) obj),
ignore -> {
});
}

if (obj instanceof String) {
Expand Down Expand Up @@ -235,5 +243,82 @@ public static ByteBuf convertNettyBuffer(Object obj) {
return Unpooled.wrappedBuffer(String.valueOf(obj).getBytes());
}

private static <T extends Buffer> ByteBuf convertNioBufferToNettyBuf0(T nioBuffer,
int bit,
Consumer<ByteBuffer> init,
BiConsumer<ByteBuffer, T> consumer) {
nioBuffer.mark();

ByteBuffer buffer = nioBuffer.isDirect()
? ByteBuffer.allocateDirect(nioBuffer.remaining() * bit)
: ByteBuffer.allocate(nioBuffer.remaining() * bit);

init.accept(buffer);

consumer.accept(buffer, nioBuffer);

nioBuffer.reset();

return Unpooled.wrappedBuffer(buffer);
}

public static ByteBuf convertNioBufferToNettyBuf(Buffer buffer, Consumer<ByteBuffer> init) {
if (buffer instanceof ByteBuffer) {
init.accept((ByteBuffer) buffer);
return Unpooled.wrappedBuffer((ByteBuffer) buffer);
}

if (buffer instanceof ShortBuffer) {

return convertNioBufferToNettyBuf0(
(ShortBuffer) buffer,
2,
init,
(buf, nioBuffer) -> buf.asShortBuffer().put(nioBuffer));
}

if (buffer instanceof IntBuffer) {
return convertNioBufferToNettyBuf0(
(IntBuffer) buffer,
4,
init,
(buf, nioBuffer) -> buf.asIntBuffer().put(nioBuffer));
}

if (buffer instanceof LongBuffer) {
return convertNioBufferToNettyBuf0(
(LongBuffer) buffer,
8,
init,
(buf, nioBuffer) -> buf.asLongBuffer().put(nioBuffer));
}

if (buffer instanceof FloatBuffer) {
return convertNioBufferToNettyBuf0(
(FloatBuffer) buffer,
4,
init,
(buf, nioBuffer) -> buf.asFloatBuffer().put(nioBuffer));
}

if (buffer instanceof DoubleBuffer) {
return convertNioBufferToNettyBuf0(
(DoubleBuffer) buffer,
8,
init,
(buf, nioBuffer) -> buf.asDoubleBuffer().put(nioBuffer));
}

if (buffer instanceof CharBuffer) {
return convertNioBufferToNettyBuf0(
(CharBuffer) buffer,
2,
init,
(buf, nioBuffer) -> buf.asCharBuffer().put(nioBuffer));
}


throw new UnsupportedOperationException("unsupported buffer type:" + buffer.getClass());
}

}