1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.codecs;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.FileChannel;
33 import java.nio.channels.WritableByteChannel;
34
35 import org.apache.http.impl.io.HttpTransportMetricsImpl;
36 import org.apache.http.nio.FileContentEncoder;
37 import org.apache.http.nio.reactor.SessionOutputBuffer;
38 import org.apache.http.util.Args;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public class LengthDelimitedEncoder extends AbstractContentEncoder
54 implements FileContentEncoder {
55
56 private final long contentLength;
57 private final int fragHint;
58
59 private long remaining;
60
61
62
63
64
65
66
67
68
69
70
71
72 public LengthDelimitedEncoder(
73 final WritableByteChannel channel,
74 final SessionOutputBuffer buffer,
75 final HttpTransportMetricsImpl metrics,
76 final long contentLength,
77 final int fragementSizeHint) {
78 super(channel, buffer, metrics);
79 Args.notNegative(contentLength, "Content length");
80 this.contentLength = contentLength;
81 this.fragHint = fragementSizeHint > 0 ? fragementSizeHint : 0;
82 this.remaining = contentLength;
83 }
84
85 public LengthDelimitedEncoder(
86 final WritableByteChannel channel,
87 final SessionOutputBuffer buffer,
88 final HttpTransportMetricsImpl metrics,
89 final long contentLength) {
90 this(channel, buffer, metrics, contentLength, 0);
91 }
92
93 private int nextChunk(final ByteBuffer src) {
94 return (int) Math.min(Math.min(this.remaining, Integer.MAX_VALUE), src.remaining());
95 }
96
97 @Override
98 public int write(final ByteBuffer src) throws IOException {
99 if (src == null) {
100 return 0;
101 }
102 assertNotCompleted();
103
104 int total = 0;
105 while (src.hasRemaining() && this.remaining > 0) {
106 if (this.buffer.hasData() || this.fragHint > 0) {
107 final int chunk = nextChunk(src);
108 if (chunk <= this.fragHint) {
109 final int capacity = this.fragHint - this.buffer.length();
110 if (capacity > 0) {
111 final int limit = Math.min(capacity, chunk);
112 final int bytesWritten = writeToBuffer(src, limit);
113 this.remaining -= bytesWritten;
114 total += bytesWritten;
115 }
116 }
117 }
118 if (this.buffer.hasData()) {
119 final int chunk = nextChunk(src);
120 if (this.buffer.length() >= this.fragHint || chunk > 0) {
121 final int bytesWritten = flushToChannel();
122 if (bytesWritten == 0) {
123 break;
124 }
125 }
126 }
127 if (!this.buffer.hasData()) {
128 final int chunk = nextChunk(src);
129 if (chunk > this.fragHint) {
130 final int bytesWritten = writeToChannel(src, chunk);
131 this.remaining -= bytesWritten;
132 total += bytesWritten;
133 if (bytesWritten == 0) {
134 break;
135 }
136 }
137 }
138 }
139 if (this.remaining <= 0) {
140 super.complete();
141 }
142 return total;
143 }
144
145 @Override
146 public long transfer(
147 final FileChannel src,
148 final long position,
149 final long count) throws IOException {
150
151 if (src == null) {
152 return 0;
153 }
154 assertNotCompleted();
155
156 flushToChannel();
157 if (this.buffer.hasData()) {
158 return 0;
159 }
160
161 final long chunk = Math.min(this.remaining, count);
162 final long bytesWritten = src.transferTo(position, chunk, this.channel);
163 if (bytesWritten > 0) {
164 this.metrics.incrementBytesTransferred(bytesWritten);
165 }
166 this.remaining -= bytesWritten;
167 if (this.remaining <= 0) {
168 super.complete();
169 }
170 return bytesWritten;
171 }
172
173 @Override
174 public String toString() {
175 final StringBuilder sb = new StringBuilder();
176 sb.append("[content length: ");
177 sb.append(this.contentLength);
178 sb.append("; pos: ");
179 sb.append(this.contentLength - this.remaining);
180 sb.append("; completed: ");
181 sb.append(isCompleted());
182 sb.append("]");
183 return sb.toString();
184 }
185
186 }