]>
Commit | Line | Data |
---|---|---|
1 | // Copyright 2016 Google LLC | |
2 | // | |
3 | // Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | // you may not use this file except in compliance with the License. | |
5 | // You may obtain a copy of the License at | |
6 | // | |
7 | // http://www.apache.org/licenses/LICENSE-2.0 | |
8 | // | |
9 | // Unless required by applicable law or agreed to in writing, software | |
10 | // distributed under the License is distributed on an "AS IS" BASIS, | |
11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | // See the License for the specific language governing permissions and | |
13 | // limitations under the License. | |
14 | ||
15 | package storage | |
16 | ||
17 | import ( | |
18 | "context" | |
19 | "errors" | |
20 | "fmt" | |
21 | ||
22 | "cloud.google.com/go/internal/trace" | |
23 | raw "google.golang.org/api/storage/v1" | |
24 | ) | |
25 | ||
26 | // CopierFrom creates a Copier that can copy src to dst. | |
27 | // You can immediately call Run on the returned Copier, or | |
28 | // you can configure it first. | |
29 | // | |
30 | // For Requester Pays buckets, the user project of dst is billed, unless it is empty, | |
31 | // in which case the user project of src is billed. | |
32 | func (dst *ObjectHandle) CopierFrom(src *ObjectHandle) *Copier { | |
33 | return &Copier{dst: dst, src: src} | |
34 | } | |
35 | ||
36 | // A Copier copies a source object to a destination. | |
37 | type Copier struct { | |
38 | // ObjectAttrs are optional attributes to set on the destination object. | |
39 | // Any attributes must be initialized before any calls on the Copier. Nil | |
40 | // or zero-valued attributes are ignored. | |
41 | ObjectAttrs | |
42 | ||
43 | // RewriteToken can be set before calling Run to resume a copy | |
44 | // operation. After Run returns a non-nil error, RewriteToken will | |
45 | // have been updated to contain the value needed to resume the copy. | |
46 | RewriteToken string | |
47 | ||
48 | // ProgressFunc can be used to monitor the progress of a multi-RPC copy | |
49 | // operation. If ProgressFunc is not nil and copying requires multiple | |
50 | // calls to the underlying service (see | |
51 | // https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite), then | |
52 | // ProgressFunc will be invoked after each call with the number of bytes of | |
53 | // content copied so far and the total size in bytes of the source object. | |
54 | // | |
55 | // ProgressFunc is intended to make upload progress available to the | |
56 | // application. For example, the implementation of ProgressFunc may update | |
57 | // a progress bar in the application's UI, or log the result of | |
58 | // float64(copiedBytes)/float64(totalBytes). | |
59 | // | |
60 | // ProgressFunc should return quickly without blocking. | |
61 | ProgressFunc func(copiedBytes, totalBytes uint64) | |
62 | ||
63 | // The Cloud KMS key, in the form projects/P/locations/L/keyRings/R/cryptoKeys/K, | |
64 | // that will be used to encrypt the object. Overrides the object's KMSKeyName, if | |
65 | // any. | |
66 | // | |
67 | // Providing both a DestinationKMSKeyName and a customer-supplied encryption key | |
68 | // (via ObjectHandle.Key) on the destination object will result in an error when | |
69 | // Run is called. | |
70 | DestinationKMSKeyName string | |
71 | ||
72 | dst, src *ObjectHandle | |
73 | } | |
74 | ||
75 | // Run performs the copy. | |
76 | func (c *Copier) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { | |
77 | ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Copier.Run") | |
78 | defer func() { trace.EndSpan(ctx, err) }() | |
79 | ||
80 | if err := c.src.validate(); err != nil { | |
81 | return nil, err | |
82 | } | |
83 | if err := c.dst.validate(); err != nil { | |
84 | return nil, err | |
85 | } | |
86 | if c.DestinationKMSKeyName != "" && c.dst.encryptionKey != nil { | |
87 | return nil, errors.New("storage: cannot use DestinationKMSKeyName with a customer-supplied encryption key") | |
88 | } | |
89 | // Convert destination attributes to raw form, omitting the bucket. | |
90 | // If the bucket is included but name or content-type aren't, the service | |
91 | // returns a 400 with "Required" as the only message. Omitting the bucket | |
92 | // does not cause any problems. | |
93 | rawObject := c.ObjectAttrs.toRawObject("") | |
94 | for { | |
95 | res, err := c.callRewrite(ctx, rawObject) | |
96 | if err != nil { | |
97 | return nil, err | |
98 | } | |
99 | if c.ProgressFunc != nil { | |
100 | c.ProgressFunc(uint64(res.TotalBytesRewritten), uint64(res.ObjectSize)) | |
101 | } | |
102 | if res.Done { // Finished successfully. | |
103 | return newObject(res.Resource), nil | |
104 | } | |
105 | } | |
106 | } | |
107 | ||
108 | func (c *Copier) callRewrite(ctx context.Context, rawObj *raw.Object) (*raw.RewriteResponse, error) { | |
109 | call := c.dst.c.raw.Objects.Rewrite(c.src.bucket, c.src.object, c.dst.bucket, c.dst.object, rawObj) | |
110 | ||
111 | call.Context(ctx).Projection("full") | |
112 | if c.RewriteToken != "" { | |
113 | call.RewriteToken(c.RewriteToken) | |
114 | } | |
115 | if c.DestinationKMSKeyName != "" { | |
116 | call.DestinationKmsKeyName(c.DestinationKMSKeyName) | |
117 | } | |
118 | if c.PredefinedACL != "" { | |
119 | call.DestinationPredefinedAcl(c.PredefinedACL) | |
120 | } | |
121 | if err := applyConds("Copy destination", c.dst.gen, c.dst.conds, call); err != nil { | |
122 | return nil, err | |
123 | } | |
124 | if c.dst.userProject != "" { | |
125 | call.UserProject(c.dst.userProject) | |
126 | } else if c.src.userProject != "" { | |
127 | call.UserProject(c.src.userProject) | |
128 | } | |
129 | if err := applySourceConds(c.src.gen, c.src.conds, call); err != nil { | |
130 | return nil, err | |
131 | } | |
132 | if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil { | |
133 | return nil, err | |
134 | } | |
135 | if err := setEncryptionHeaders(call.Header(), c.src.encryptionKey, true); err != nil { | |
136 | return nil, err | |
137 | } | |
138 | var res *raw.RewriteResponse | |
139 | var err error | |
140 | setClientHeader(call.Header()) | |
141 | err = runWithRetry(ctx, func() error { res, err = call.Do(); return err }) | |
142 | if err != nil { | |
143 | return nil, err | |
144 | } | |
145 | c.RewriteToken = res.RewriteToken | |
146 | return res, nil | |
147 | } | |
148 | ||
149 | // ComposerFrom creates a Composer that can compose srcs into dst. | |
150 | // You can immediately call Run on the returned Composer, or you can | |
151 | // configure it first. | |
152 | // | |
153 | // The encryption key for the destination object will be used to decrypt all | |
154 | // source objects and encrypt the destination object. It is an error | |
155 | // to specify an encryption key for any of the source objects. | |
156 | func (dst *ObjectHandle) ComposerFrom(srcs ...*ObjectHandle) *Composer { | |
157 | return &Composer{dst: dst, srcs: srcs} | |
158 | } | |
159 | ||
160 | // A Composer composes source objects into a destination object. | |
161 | // | |
162 | // For Requester Pays buckets, the user project of dst is billed. | |
163 | type Composer struct { | |
164 | // ObjectAttrs are optional attributes to set on the destination object. | |
165 | // Any attributes must be initialized before any calls on the Composer. Nil | |
166 | // or zero-valued attributes are ignored. | |
167 | ObjectAttrs | |
168 | ||
169 | dst *ObjectHandle | |
170 | srcs []*ObjectHandle | |
171 | } | |
172 | ||
173 | // Run performs the compose operation. | |
174 | func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { | |
175 | ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Composer.Run") | |
176 | defer func() { trace.EndSpan(ctx, err) }() | |
177 | ||
178 | if err := c.dst.validate(); err != nil { | |
179 | return nil, err | |
180 | } | |
181 | if len(c.srcs) == 0 { | |
182 | return nil, errors.New("storage: at least one source object must be specified") | |
183 | } | |
184 | ||
185 | req := &raw.ComposeRequest{} | |
186 | // Compose requires a non-empty Destination, so we always set it, | |
187 | // even if the caller-provided ObjectAttrs is the zero value. | |
188 | req.Destination = c.ObjectAttrs.toRawObject(c.dst.bucket) | |
189 | for _, src := range c.srcs { | |
190 | if err := src.validate(); err != nil { | |
191 | return nil, err | |
192 | } | |
193 | if src.bucket != c.dst.bucket { | |
194 | return nil, fmt.Errorf("storage: all source objects must be in bucket %q, found %q", c.dst.bucket, src.bucket) | |
195 | } | |
196 | if src.encryptionKey != nil { | |
197 | return nil, fmt.Errorf("storage: compose source %s.%s must not have encryption key", src.bucket, src.object) | |
198 | } | |
199 | srcObj := &raw.ComposeRequestSourceObjects{ | |
200 | Name: src.object, | |
201 | } | |
202 | if err := applyConds("ComposeFrom source", src.gen, src.conds, composeSourceObj{srcObj}); err != nil { | |
203 | return nil, err | |
204 | } | |
205 | req.SourceObjects = append(req.SourceObjects, srcObj) | |
206 | } | |
207 | ||
208 | call := c.dst.c.raw.Objects.Compose(c.dst.bucket, c.dst.object, req).Context(ctx) | |
209 | if err := applyConds("ComposeFrom destination", c.dst.gen, c.dst.conds, call); err != nil { | |
210 | return nil, err | |
211 | } | |
212 | if c.dst.userProject != "" { | |
213 | call.UserProject(c.dst.userProject) | |
214 | } | |
215 | if c.PredefinedACL != "" { | |
216 | call.DestinationPredefinedAcl(c.PredefinedACL) | |
217 | } | |
218 | if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil { | |
219 | return nil, err | |
220 | } | |
221 | var obj *raw.Object | |
222 | setClientHeader(call.Header()) | |
223 | err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err }) | |
224 | if err != nil { | |
225 | return nil, err | |
226 | } | |
227 | return newObject(obj), nil | |
228 | } |