11#if ENABLE_S3
2- #if __linux__
32
3+ #include " s3.hh"
44#include " s3-binary-cache-store.hh"
55#include " nar-info.hh"
66#include " nar-info-disk-cache.hh"
1818
1919namespace nix {
2020
21- struct istringstream_nocopy : public std ::stringstream
22- {
23- istringstream_nocopy (const std::string & s)
24- {
25- rdbuf ()->pubsetbuf (
26- (char *) s.data (), s.size ());
27- }
28- };
29-
3021struct S3Error : public Error
3122{
3223 Aws::S3::S3Errors err;
@@ -60,21 +51,81 @@ static void initAWS()
6051 });
6152}
6253
54+ S3Helper::S3Helper ()
55+ : config(makeConfig())
56+ , client(make_ref<Aws::S3::S3Client>(*config))
57+ {
58+ }
59+
60+ ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig ()
61+ {
62+ initAWS ();
63+ auto res = make_ref<Aws::Client::ClientConfiguration>();
64+ res->region = Aws::Region::US_EAST_1; // FIXME: make configurable
65+ res->requestTimeoutMs = 600 * 1000 ;
66+ return res;
67+ }
68+
69+ S3Helper::DownloadResult S3Helper::getObject (
70+ const std::string & bucketName, const std::string & key)
71+ {
72+ debug (" fetching ‘s3://%s/%s’..." , bucketName, key);
73+
74+ auto request =
75+ Aws::S3::Model::GetObjectRequest ()
76+ .WithBucket (bucketName)
77+ .WithKey (key);
78+
79+ request.SetResponseStreamFactory ([&]() {
80+ return Aws::New<std::stringstream>(" STRINGSTREAM" );
81+ });
82+
83+ DownloadResult res;
84+
85+ auto now1 = std::chrono::steady_clock::now ();
86+
87+ try {
88+
89+ auto result = checkAws (fmt (" AWS error fetching ‘%s’" , key),
90+ client->GetObject (request));
91+
92+ res.data = std::make_shared<std::string>(
93+ dynamic_cast <std::stringstream &>(result.GetBody ()).str ());
94+
95+ } catch (S3Error & e) {
96+ if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw ;
97+ }
98+
99+ auto now2 = std::chrono::steady_clock::now ();
100+
101+ res.durationMs = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count ();
102+
103+ return res;
104+ }
105+
106+ #if __linux__
107+
108+ struct istringstream_nocopy : public std ::stringstream
109+ {
110+ istringstream_nocopy (const std::string & s)
111+ {
112+ rdbuf ()->pubsetbuf (
113+ (char *) s.data (), s.size ());
114+ }
115+ };
116+
63117struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
64118{
65119 std::string bucketName;
66120
67- ref<Aws::Client::ClientConfiguration> config;
68- ref<Aws::S3::S3Client> client;
69-
70121 Stats stats;
71122
123+ S3Helper s3Helper;
124+
72125 S3BinaryCacheStoreImpl (
73126 const Params & params, const std::string & bucketName)
74127 : S3BinaryCacheStore(params)
75128 , bucketName(bucketName)
76- , config(makeConfig())
77- , client(make_ref<Aws::S3::S3Client>(*config))
78129 {
79130 diskCache = getNarInfoDiskCache ();
80131 }
@@ -84,31 +135,22 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
84135 return " s3://" + bucketName;
85136 }
86137
87- ref<Aws::Client::ClientConfiguration> makeConfig ()
88- {
89- initAWS ();
90- auto res = make_ref<Aws::Client::ClientConfiguration>();
91- res->region = Aws::Region::US_EAST_1; // FIXME: make configurable
92- res->requestTimeoutMs = 600 * 1000 ;
93- return res;
94- }
95-
96138 void init () override
97139 {
98140 if (!diskCache->cacheExists (getUri (), wantMassQuery_, priority)) {
99141
100142 /* Create the bucket if it doesn't already exists. */
101143 // FIXME: HeadBucket would be more appropriate, but doesn't return
102144 // an easily parsed 404 message.
103- auto res = client->GetBucketLocation (
145+ auto res = s3Helper. client ->GetBucketLocation (
104146 Aws::S3::Model::GetBucketLocationRequest ().WithBucket (bucketName));
105147
106148 if (!res.IsSuccess ()) {
107149 if (res.GetError ().GetErrorType () != Aws::S3::S3Errors::NO_SUCH_BUCKET)
108150 throw Error (format (" AWS error checking bucket ‘%s’: %s" ) % bucketName % res.GetError ().GetMessage ());
109151
110152 checkAws (format (" AWS error creating bucket ‘%s’" ) % bucketName,
111- client->CreateBucket (
153+ s3Helper. client ->CreateBucket (
112154 Aws::S3::Model::CreateBucketRequest ()
113155 .WithBucket (bucketName)
114156 .WithCreateBucketConfiguration (
@@ -146,7 +188,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
146188 {
147189 stats.head ++;
148190
149- auto res = client->HeadObject (
191+ auto res = s3Helper. client ->HeadObject (
150192 Aws::S3::Model::HeadObjectRequest ()
151193 .WithBucket (bucketName)
152194 .WithKey (path));
@@ -179,7 +221,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
179221 auto now1 = std::chrono::steady_clock::now ();
180222
181223 auto result = checkAws (format (" AWS error uploading ‘%s’" ) % path,
182- client->PutObject (request));
224+ s3Helper. client ->PutObject (request));
183225
184226 auto now2 = std::chrono::steady_clock::now ();
185227
@@ -198,42 +240,18 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
198240 sync2async<std::shared_ptr<std::string>>(success, failure, [&]() {
199241 debug (format (" fetching ‘s3://%1%/%2%’..." ) % bucketName % path);
200242
201- auto request =
202- Aws::S3::Model::GetObjectRequest ()
203- .WithBucket (bucketName)
204- .WithKey (path);
205-
206- request.SetResponseStreamFactory ([&]() {
207- return Aws::New<std::stringstream>(" STRINGSTREAM" );
208- });
209-
210243 stats.get ++;
211244
212- try {
213-
214- auto now1 = std::chrono::steady_clock::now ();
215-
216- auto result = checkAws (format (" AWS error fetching ‘%s’" ) % path,
217- client->GetObject (request));
218-
219- auto now2 = std::chrono::steady_clock::now ();
245+ auto res = s3Helper.getObject (bucketName, path);
220246
221- auto res = dynamic_cast <std::stringstream &>(result.GetBody ()).str ();
247+ stats.getBytes += res.data ? res.data ->size () : 0 ;
248+ stats.getTimeMs += res.durationMs ;
222249
223- auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count ();
250+ if (res.data )
251+ printTalkative (" downloaded ‘s3://%s/%s’ (%d bytes) in %d ms" ,
252+ bucketName, path, res.data ->size (), res.durationMs );
224253
225- printMsg (lvlTalkative, format (" downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms" )
226- % bucketName % path % res.size () % duration);
227-
228- stats.getBytes += res.size ();
229- stats.getTimeMs += duration;
230-
231- return std::make_shared<std::string>(res);
232-
233- } catch (S3Error & e) {
234- if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>();
235- throw ;
236- }
254+ return res.data ;
237255 });
238256 }
239257
@@ -246,7 +264,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
246264 debug (format (" listing bucket ‘s3://%s’ from key ‘%s’..." ) % bucketName % marker);
247265
248266 auto res = checkAws (format (" AWS error listing bucket ‘%s’" ) % bucketName,
249- client->ListObjects (
267+ s3Helper. client ->ListObjects (
250268 Aws::S3::Model::ListObjectsRequest ()
251269 .WithBucket (bucketName)
252270 .WithDelimiter (" /" )
@@ -281,7 +299,8 @@ static RegisterStoreImplementation regStore([](
281299 return store;
282300});
283301
302+ #endif
303+
284304}
285305
286306#endif
287- #endif
0 commit comments