]>
Commit | Line | Data |
---|---|---|
bae9f6d2 JC |
1 | package aws |
2 | ||
3 | import ( | |
4 | "io" | |
5 | "sync" | |
15c0b25d AP |
6 | |
7 | "github.com/aws/aws-sdk-go/internal/sdkio" | |
bae9f6d2 JC |
8 | ) |
9 | ||
863486a6 AG |
10 | // ReadSeekCloser wraps a io.Reader returning a ReaderSeekerCloser. Allows the |
11 | // SDK to accept an io.Reader that is not also an io.Seeker for unsigned | |
12 | // streaming payload API operations. | |
bae9f6d2 | 13 | // |
863486a6 AG |
14 | // A ReadSeekCloser wrapping an nonseekable io.Reader used in an API |
15 | // operation's input will prevent that operation being retried in the case of | |
16 | // network errors, and cause operation requests to fail if the operation | |
17 | // requires payload signing. | |
18 | // | |
19 | // Note: If using With S3 PutObject to stream an object upload The SDK's S3 | |
20 | // Upload manager (s3manager.Uploader) provides support for streaming with the | |
21 | // ability to retry network errors. | |
bae9f6d2 JC |
22 | func ReadSeekCloser(r io.Reader) ReaderSeekerCloser { |
23 | return ReaderSeekerCloser{r} | |
24 | } | |
25 | ||
// ReaderSeekerCloser wraps an io.Reader and forwards io.Seeker and io.Closer
// calls to the wrapped value when it also implements those interfaces.
type ReaderSeekerCloser struct {
	// r is the wrapped reader that Read, Seek, and Close delegate to.
	r io.Reader
}
31 | ||
15c0b25d AP |
32 | // IsReaderSeekable returns if the underlying reader type can be seeked. A |
33 | // io.Reader might not actually be seekable if it is the ReaderSeekerCloser | |
34 | // type. | |
35 | func IsReaderSeekable(r io.Reader) bool { | |
36 | switch v := r.(type) { | |
37 | case ReaderSeekerCloser: | |
38 | return v.IsSeeker() | |
39 | case *ReaderSeekerCloser: | |
40 | return v.IsSeeker() | |
41 | case io.ReadSeeker: | |
42 | return true | |
43 | default: | |
44 | return false | |
45 | } | |
46 | } | |
47 | ||
bae9f6d2 JC |
48 | // Read reads from the reader up to size of p. The number of bytes read, and |
49 | // error if it occurred will be returned. | |
50 | // | |
863486a6 AG |
51 | // If the reader is not an io.Reader zero bytes read, and nil error will be |
52 | // returned. | |
bae9f6d2 JC |
53 | // |
54 | // Performs the same functionality as io.Reader Read | |
55 | func (r ReaderSeekerCloser) Read(p []byte) (int, error) { | |
56 | switch t := r.r.(type) { | |
57 | case io.Reader: | |
58 | return t.Read(p) | |
59 | } | |
60 | return 0, nil | |
61 | } | |
62 | ||
63 | // Seek sets the offset for the next Read to offset, interpreted according to | |
64 | // whence: 0 means relative to the origin of the file, 1 means relative to the | |
65 | // current offset, and 2 means relative to the end. Seek returns the new offset | |
66 | // and an error, if any. | |
67 | // | |
68 | // If the ReaderSeekerCloser is not an io.Seeker nothing will be done. | |
69 | func (r ReaderSeekerCloser) Seek(offset int64, whence int) (int64, error) { | |
70 | switch t := r.r.(type) { | |
71 | case io.Seeker: | |
72 | return t.Seek(offset, whence) | |
73 | } | |
74 | return int64(0), nil | |
75 | } | |
76 | ||
77 | // IsSeeker returns if the underlying reader is also a seeker. | |
78 | func (r ReaderSeekerCloser) IsSeeker() bool { | |
79 | _, ok := r.r.(io.Seeker) | |
80 | return ok | |
81 | } | |
82 | ||
15c0b25d AP |
83 | // HasLen returns the length of the underlying reader if the value implements |
84 | // the Len() int method. | |
85 | func (r ReaderSeekerCloser) HasLen() (int, bool) { | |
86 | type lenner interface { | |
87 | Len() int | |
88 | } | |
89 | ||
90 | if lr, ok := r.r.(lenner); ok { | |
91 | return lr.Len(), true | |
92 | } | |
93 | ||
94 | return 0, false | |
95 | } | |
96 | ||
97 | // GetLen returns the length of the bytes remaining in the underlying reader. | |
98 | // Checks first for Len(), then io.Seeker to determine the size of the | |
99 | // underlying reader. | |
100 | // | |
101 | // Will return -1 if the length cannot be determined. | |
102 | func (r ReaderSeekerCloser) GetLen() (int64, error) { | |
103 | if l, ok := r.HasLen(); ok { | |
104 | return int64(l), nil | |
105 | } | |
106 | ||
107 | if s, ok := r.r.(io.Seeker); ok { | |
108 | return seekerLen(s) | |
109 | } | |
110 | ||
111 | return -1, nil | |
112 | } | |
113 | ||
114 | // SeekerLen attempts to get the number of bytes remaining at the seeker's | |
115 | // current position. Returns the number of bytes remaining or error. | |
116 | func SeekerLen(s io.Seeker) (int64, error) { | |
117 | // Determine if the seeker is actually seekable. ReaderSeekerCloser | |
118 | // hides the fact that a io.Readers might not actually be seekable. | |
119 | switch v := s.(type) { | |
120 | case ReaderSeekerCloser: | |
121 | return v.GetLen() | |
122 | case *ReaderSeekerCloser: | |
123 | return v.GetLen() | |
124 | } | |
125 | ||
126 | return seekerLen(s) | |
127 | } | |
128 | ||
129 | func seekerLen(s io.Seeker) (int64, error) { | |
130 | curOffset, err := s.Seek(0, sdkio.SeekCurrent) | |
131 | if err != nil { | |
132 | return 0, err | |
133 | } | |
134 | ||
135 | endOffset, err := s.Seek(0, sdkio.SeekEnd) | |
136 | if err != nil { | |
137 | return 0, err | |
138 | } | |
139 | ||
140 | _, err = s.Seek(curOffset, sdkio.SeekStart) | |
141 | if err != nil { | |
142 | return 0, err | |
143 | } | |
144 | ||
145 | return endOffset - curOffset, nil | |
146 | } | |
147 | ||
bae9f6d2 JC |
148 | // Close closes the ReaderSeekerCloser. |
149 | // | |
150 | // If the ReaderSeekerCloser is not an io.Closer nothing will be done. | |
151 | func (r ReaderSeekerCloser) Close() error { | |
152 | switch t := r.r.(type) { | |
153 | case io.Closer: | |
154 | return t.Close() | |
155 | } | |
156 | return nil | |
157 | } | |
158 | ||
// A WriteAtBuffer provides an in-memory buffer supporting the io.WriterAt
// interface. It can be used with the s3manager.Downloader to download content
// to a buffer in memory. Safe to use concurrently.
type WriteAtBuffer struct {
	// buf holds the bytes written so far.
	buf []byte
	// m guards access to buf.
	m sync.Mutex

	// GrowthCoeff defines the growth rate of the internal buffer. By
	// default, the growth rate is 1, where expanding the internal
	// buffer will allocate only enough capacity to fit the new expected
	// length.
	GrowthCoeff float64
}
172 | ||
173 | // NewWriteAtBuffer creates a WriteAtBuffer with an internal buffer | |
174 | // provided by buf. | |
175 | func NewWriteAtBuffer(buf []byte) *WriteAtBuffer { | |
176 | return &WriteAtBuffer{buf: buf} | |
177 | } | |
178 | ||
179 | // WriteAt writes a slice of bytes to a buffer starting at the position provided | |
180 | // The number of bytes written will be returned, or error. Can overwrite previous | |
181 | // written slices if the write ats overlap. | |
182 | func (b *WriteAtBuffer) WriteAt(p []byte, pos int64) (n int, err error) { | |
183 | pLen := len(p) | |
184 | expLen := pos + int64(pLen) | |
185 | b.m.Lock() | |
186 | defer b.m.Unlock() | |
187 | if int64(len(b.buf)) < expLen { | |
188 | if int64(cap(b.buf)) < expLen { | |
189 | if b.GrowthCoeff < 1 { | |
190 | b.GrowthCoeff = 1 | |
191 | } | |
192 | newBuf := make([]byte, expLen, int64(b.GrowthCoeff*float64(expLen))) | |
193 | copy(newBuf, b.buf) | |
194 | b.buf = newBuf | |
195 | } | |
196 | b.buf = b.buf[:expLen] | |
197 | } | |
198 | copy(b.buf[pos:], p) | |
199 | return pLen, nil | |
200 | } | |
201 | ||
202 | // Bytes returns a slice of bytes written to the buffer. | |
203 | func (b *WriteAtBuffer) Bytes() []byte { | |
204 | b.m.Lock() | |
205 | defer b.m.Unlock() | |
206 | return b.buf | |
207 | } |