Skip to content

Commit 0b43992

Browse files
committed
Fixes and test file added.
1 parent 2bbf944 commit 0b43992

File tree

6 files changed

+20
-85
lines changed

6 files changed

+20
-85
lines changed

.idea/modules/stream-m_test.iml

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/main/java/org/czentral/minirtmp/RTMPStreamProcessor.java

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ class RTMPStreamProcessor implements Processor {
2828

2929
protected boolean finished = false;
3030

31-
protected Map<Integer, MessageInfo> lastMessages = new HashMap<Integer, MessageInfo>();
32-
3331
private final int DEFAULT_CHUNK_SIZE = 128;
3432

3533
protected ResourceLimit limit;
@@ -39,6 +37,7 @@ class RTMPStreamProcessor implements Processor {
3937
protected RtmpReader reader = new RtmpReader(DEFAULT_CHUNK_SIZE);
4038

4139
protected RtmpPacket lastPacket = null;
40+
protected int chunkOffset = 0;
4241

4342
public RTMPStreamProcessor(ResourceLimit limit, ChunkProcessor chunkProcessor) {
4443
this.limit = limit;
@@ -90,29 +89,21 @@ public int processPacket(byte[] buffer, int offset, int length) {
9089
int processed = 0;
9190

9291
ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length);
93-
if (lastPacket == null || lastPacket.chunkOffset >= reader.getChunkSize() || lastPacket.messageOffset >= lastPacket.messageSize) {
92+
if (lastPacket == null || chunkOffset >= reader.getChunkSize() || lastPacket.messageOffset + chunkOffset >= lastPacket.messageSize) {
9493
try {
9594
Optional<RtmpPacket> op = reader.read(bb);
9695
if (!op.isPresent()) {
9796
return 0;
9897
}
9998
lastPacket = op.get();
99+
chunkOffset = 0;
100100
processed += lastPacket.headerLength;
101-
//System.out.println(op.get());
102101
} catch (RtmpException e) {
103102
e.printStackTrace();
104103
throw new RuntimeException(e);
105-
//return 0;
106104
}
107105
}
108-
//if (bb.remaining() < lastPacket.payloadLegth) {
109-
// return 0;
110-
//}
111-
112-
//System.err.println("code: " + code + ", sid: " + sid + ", type: " + type + ", length: " + payloadLength);
113-
//System.err.println(HexDump.prettyPrintHex(buffer, bufferOffset, sidLength + headLength + payloadLength));
114-
//System.err.println(HexDump.prettyPrintHex(buffer, bufferOffset, Math.min(16, sidLength + headLength + payloadLength)));
115-
106+
116107
// change chunk size command processed
117108
if (lastPacket.messageType == 0x01) {
118109
if (bb.remaining() < 4) {
@@ -125,30 +116,24 @@ public int processPacket(byte[] buffer, int offset, int length) {
125116
reader.setChunkSize(newChunkSize);
126117
}
127118

128-
//chunkProcessor.processChunk(lastMessage, buffer, readOffset, payloadLength);
129-
//MessageInfo messageInfo = buildInfo(p);
130-
int chunkRemaining = (int)Math.min(lastPacket.messageSize-lastPacket.messageOffset, reader.getChunkSize() - lastPacket.chunkOffset);
119+
int chunkRemaining = (int)Math.min(lastPacket.messageSize-(lastPacket.messageOffset+chunkOffset), reader.getChunkSize() - chunkOffset);
131120
int byteCount = Math.min(chunkRemaining, bb.remaining());
132121
if (byteCount > 0) {
133-
chunkProcessor.processChunk(buildInfo(lastPacket), bb.array(), bb.position(), byteCount);
122+
chunkProcessor.processChunk(buildInfo(lastPacket, chunkOffset), bb.array(), bb.position(), byteCount);
134123

135124
finished = !chunkProcessor.alive();
136125

137-
lastPacket = lastPacket.transposed(byteCount);
126+
chunkOffset += byteCount;
138127

139128
processed += byteCount;
140129
}
141130

142-
//System.out.printf("len: %d, newLen: %d%n", sidLength + headLength + payloadLength, p.messageSize + p.headerLength);
143-
//return sidLength + headLength + payloadLength;
144-
//System.out.printf("len: %d, newLen: %d (%d + %d)%n", sidLength + headLength + payloadLength, p.headerLength + p.payloadLegth, p.headerLength, p.payloadLegth);
145131
return processed;
146132
}
147133

148-
private MessageInfo buildInfo(RtmpPacket p) {
134+
private static MessageInfo buildInfo(RtmpPacket p, int chunkOffset) {
149135
MessageInfo mi = new MessageInfo(p.sid, p.messageType, (int)p.messageSize);
150-
mi.offset = (int)p.messageOffset;
151-
mi.length = (int)p.messageSize;
136+
mi.offset = (int)p.messageOffset + chunkOffset;
152137
mi.calculatedTimestamp = p.absoluteTimestamp;
153138
return mi;
154139
}

src/main/java/org/czentral/minirtmp/RtmpPacket.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ public class RtmpPacket {
3232

3333
long absoluteTimestamp;
3434
long messageOffset;
35-
long chunkOffset;
3635
long headerLength;
3736

3837
protected RtmpPacket() {
@@ -76,31 +75,6 @@ public boolean hasExtendedTimestamp() {
7675
return packetTimestamp == 0xffffff;
7776
}
7877

79-
private RtmpPacket(RtmpPacket other) {
80-
this.chunkType = other.chunkType;
81-
this.sid = other.sid;
82-
this.packetTimestamp = other.packetTimestamp;
83-
this.messageSize = other.messageSize;
84-
this.messageType = other.messageType;
85-
this.messageStreamId = other.messageStreamId;
86-
this.absoluteTimestamp = other.absoluteTimestamp;
87-
this.messageOffset = other.messageOffset;
88-
this.chunkOffset = other.chunkOffset;
89-
this.headerLength = other.headerLength;
90-
}
91-
92-
/**
93-
* Accounts part of the packet being processed by creating a new packet with modified messageOffset and payloadLength.
94-
* @param byteCount
95-
* @return
96-
*/
97-
public RtmpPacket transposed(int byteCount) {
98-
RtmpPacket result = new RtmpPacket(this);
99-
result.messageOffset += byteCount;
100-
result.chunkOffset += byteCount;
101-
return result;
102-
}
103-
10478
@Override
10579
public String toString() {
10680
return "RtmpPacket{" +
@@ -112,7 +86,6 @@ public String toString() {
11286
", messageStreamId=" + messageStreamId +
11387
", absoluteTimestamp=" + absoluteTimestamp +
11488
", messageOffset=" + messageOffset +
115-
", chunkOffset=" + chunkOffset +
11689
", headerLength=" + headerLength +
11790
'}';
11891
}

src/main/java/org/czentral/minirtmp/RtmpReader.java

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.czentral.minirtmp;
1919

20-
import org.czentral.incubator.streamm.HexDump;
21-
2220
import java.nio.ByteBuffer;
2321
import java.util.HashMap;
2422
import java.util.Map;
@@ -29,10 +27,6 @@ public class RtmpReader {
2927

3028
private long chunkSize;
3129

32-
private byte[] lastBytes = new byte[16 * 1024];
33-
private ByteBuffer lastBuffer = ByteBuffer.wrap(lastBytes);
34-
private RtmpPacket lastParsedPacket = null;
35-
3630
public RtmpReader(long chunkSize) {
3731
this.chunkSize = chunkSize;
3832
}
@@ -48,7 +42,11 @@ public Optional<RtmpPacket> read(ByteBuffer buffer) throws RtmpException {
4842
RtmpPacket lastPacket = lastPackets.getOrDefault(p.sid, null);
4943

5044
boolean newMessage = lastPacket == null ||
51-
lastPacket.messageOffset == lastPacket.messageSize;
45+
lastPacket.messageOffset + chunkSize >= lastPacket.messageSize;
46+
47+
if (!newMessage) {
48+
p.messageOffset = lastPacket.messageOffset + chunkSize;
49+
}
5250

5351
if (lastPacket != null) {
5452
p.absoluteTimestamp = lastPacket.absoluteTimestamp;
@@ -68,15 +66,6 @@ public Optional<RtmpPacket> read(ByteBuffer buffer) throws RtmpException {
6866

6967
} else {
7068
if (p.chunkType > 0) {
71-
//System.err.println("Last good packet --------------------------");
72-
//System.err.println(lastParsedPacket);
73-
//System.err.print(HexDump.prettyPrintHex(lastBytes, 0, lastBuffer.limit()));
74-
byte[] b = new byte[1024];
75-
ByteBuffer temp = buffer.duplicate();
76-
temp.position(originalOffset);
77-
int length = Math.min(b.length, temp.remaining());
78-
temp.get(b, 0, length);
79-
System.err.print(HexDump.prettyPrintHex(b, 0, length));
8069
throw new RtmpException(String.format("Missing previous chunk (chunkId: %d, type: %d)", p.sid, p.chunkType));
8170
}
8271
}
@@ -97,19 +86,10 @@ public Optional<RtmpPacket> read(ByteBuffer buffer) throws RtmpException {
9786
}
9887
}
9988

100-
if (!newMessage) {
101-
p.messageOffset = lastPacket.messageOffset;
102-
}
10389
p.headerLength = buffer.position() - originalOffset;
10490

10591
lastPackets.put(p.sid, p);
10692

107-
ByteBuffer temp = buffer.duplicate();
108-
temp.position(originalOffset);
109-
lastBuffer.limit(Math.min(temp.limit()-temp.position(), lastBytes.length));
110-
temp.get(lastBytes, 0, lastBuffer.limit());
111-
lastParsedPacket = p;
112-
11393
return Optional.of(p);
11494
}
11595

src/test/java/org/czentral/minirtmp/RTMPStreamProcessorTest.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,11 @@
1717

1818
package org.czentral.minirtmp;
1919

20-
import org.czentral.minihttp.ChunkedInputStream;
2120
import org.junit.Test;
2221

2322
import java.io.File;
2423
import java.io.FileInputStream;
2524
import java.io.IOException;
26-
import java.nio.file.Files;
27-
28-
import static org.junit.Assert.*;
2925

3026
public class RTMPStreamProcessorTest {
3127

@@ -36,14 +32,12 @@ public void process() throws Exception {
3632
limit.assemblyBufferSize = 200 * 1024;
3733
limit.chunkStreamCount = 100;
3834

39-
String pathname = "httpresp.pcap";
35+
String pathname = "rtmp-in.pcap";
4036
byte[] buffer = loadFile(pathname);
41-
//int start = 0xc938;
4237
int start = 0xc5f;
43-
//int end = 0xe938;
4438
int end = 0x16c7 + 0x28;
4539

46-
for (int i=0xc6b; i<end; i++) {
40+
for (int i=start; i<end; i++) {
4741
TestChunkProcessor cp = new TestChunkProcessor();
4842
RTMPStreamProcessor o = new RTMPStreamProcessor(limit, cp);
4943
try {
@@ -60,7 +54,9 @@ public void process() throws Exception {
6054
}
6155

6256
private byte[] loadFile(String pathname) throws IOException {
63-
File file = new File(pathname);
57+
//File file = new File(pathname);
58+
ClassLoader classLoader = getClass().getClassLoader();
59+
File file = new File(classLoader.getResource(pathname).getFile());
6460
FileInputStream fis = new FileInputStream(file);
6561
byte[] buffer = new byte[(int)file.length()];
6662
int offset = 0;

src/test/resources/rtmp-in.pcap

5.73 KB
Binary file not shown.

0 commit comments

Comments
 (0)