# tap_google_sheets/sync.py
import time
import math
import json
import re
import urllib.parse
from datetime import datetime, timedelta
import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from singer.messages import RecordMessage
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
        LOGGER.info('Writing schema for: {}'.format(stream_name))
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted, version=None):
    try:
        if version:
            singer.messages.write_message(
                RecordMessage(
                    stream=stream_name,
                    record=record,
                    version=version,
                    time_extracted=time_extracted))
        else:
            singer.messages.write_record(
                stream_name=stream_name,
                record=record,
                time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)
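
# A minimal sketch of the state shape these helpers read and write
# (hypothetical values; sheet streams bookmark an integer version,
# file_metadata bookmarks a modifiedTime datetime string):
# {
#     "currently_syncing": "Sheet1",
#     "bookmarks": {
#         "file_metadata": "2019-06-11T13:37:51Z",
#         "Sheet1": 1560260271000
#     }
# }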


def drop_date_on_time(schema, record):
    # `time` fields can come back from Google like `1 day, H:M:S` or
    # `X days, H:M:S`; keep only the time portion after the comma.
    for field, field_schema in schema['properties'].items():
        value = record.get(field)
        if field_schema.get('format') == 'time' and isinstance(value, str) \
                and 'day' in value and ',' in value:
            record[field] = value.split(',')[1].strip()
    return record
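
# e.g. (hypothetical field) a time column Google serialized past 24 hours:
#   drop_date_on_time(schema, {'elapsed': '1 day, 2:30:00'})
#   -> {'elapsed': '2:30:00'}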


# Transform/validate a batch of records with the schema and send to the target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted,
                    version=None):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                try:
                    edited_record = drop_date_on_time(schema, record)
                    transformed_record = transformer.transform(
                        edited_record,
                        schema,
                        stream_metadata)
                except Exception as err:
                    LOGGER.error('{}'.format(err))
                    raise RuntimeError(err)
                write_record(
                    stream_name=stream_name,
                    record=transformed_record,
                    time_extracted=time_extracted,
                    version=version)
                counter.increment()
        return counter.value


def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    # Sync the stream only if it is selected
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        selected_fields = get_selected_fields(catalog, stream_name)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
        write_schema(catalog, stream_name)
        if not time_extracted:
            time_extracted = utils.now()
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=time_extracted)
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from the stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields
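
# Each entry from metadata.to_list looks roughly like (hypothetical column):
#   {'breadcrumb': ('properties', 'Column A'), 'metadata': {'selected': True}}
# Stream-level entries have an empty breadcrumb, raise IndexError above,
# and are skipped.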


def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    # Replace {placeholder} variables in the path
    # Encode stream_name: fixes issues with special characters in the sheet name
    stream_name_escaped = re.escape(stream_name)
    stream_name_encoded = urllib.parse.quote_plus(stream_name)
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name_encoded).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    # Build the querystring by hand and replace {placeholder} variables;
    # parameter values are deliberately not URL-encoded, since encoding
    # them causes API errors
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name_encoded)
    LOGGER.info('URL: {}/{}?{}'.format(client.base_url, path, querystring))
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name_escaped)
    return data, time_extracted
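
# e.g. (hypothetical endpoint_config) a path template of
#   'spreadsheets/{spreadsheet_id}/values/{sheet_title}!{range_rows}'
# with sheet title 'Q1 Data' and range_rows 'A2:J201' becomes
#   'spreadsheets/abc123/values/Q1+Data!A2:J201'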


# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Deep copy via JSON round-trip
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr
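
# e.g. (hypothetical values):
#   {'id': 'abc123', 'lastModifyingUser': {'displayName': 'Jane Doe',
#    'photoLink': 'https://example.com/p.jpg', 'me': False, 'permissionId': '42'}}
#   -> [{'id': 'abc123', 'lastModifyingUser': {'displayName': 'Jane Doe'}}]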


# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Deep copy via JSON round-trip
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert properties to dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf


# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # 1970-01-01T00:00:00Z, Lotus Notes serial number for the epoch start date
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str
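
# e.g. excel_to_dttm_str(44197.5) -> '2021-01-01T12:00:00.000000Z'
# (serial 44197 is 18628 days after serial 25569 = 1970-01-01)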


# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # If empty row, SKIP
        if row == []:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            sheet_data_row_tf = {}
            # Add spreadsheet_id, sheet_id, and row
            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
            sheet_data_row_tf['__sdc_row'] = row_num
            col_num = 1
            for value in row:
                # Select column metadata based on column index
                col = cols[col_num - 1]
                col_skipped = col.get('columnSkipped')
                if not col_skipped:
                    # Get column metadata
                    col_name = col.get('columnName')
                    col_type = col.get('columnType')
                    col_letter = col.get('columnLetter')

                    # NULL values
                    if value is None or value == '':
                        col_val = None

                    # Convert dates/times from Lotus Notes Serial Numbers
                    # DATE-TIME
                    elif col_type == 'numberType.DATE_TIME':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # DATE
                    elif col_type == 'numberType.DATE':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)[:10]
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # TIME ONLY (NO DATE)
                    elif col_type == 'numberType.TIME':
                        if isinstance(value, (int, float)):
                            try:
                                total_secs = value * 86400  # seconds in a day
                                # Create string formatted like HH:MM:SS
                                col_val = str(timedelta(seconds=total_secs))
                            except ValueError:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                    # NUMBER (INTEGER AND FLOAT)
                    elif col_type == 'numberType':
                        if isinstance(value, int):
                            col_val = int(value)
                        elif isinstance(value, float):
                            # Determine float decimal digits by locating the '.'
                            # from the right end of the string
                            decimal_digits = str(value)[::-1].find('.')
                            if decimal_digits > 15:
                                try:
                                    # ROUND to multipleOf: 1e-15
                                    col_val = float(round(value, 15))
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                            else:  # decimal_digits <= 15, no rounding
                                try:
                                    col_val = float(value)
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # STRING
                    elif col_type == 'stringValue':
                        col_val = str(value)
                    # BOOLEAN
                    elif col_type == 'boolValue':
                        if isinstance(value, bool):
                            col_val = value
                        elif isinstance(value, str):
                            if value.lower() in ('true', 't', 'yes', 'y'):
                                col_val = True
                            elif value.lower() in ('false', 'f', 'no', 'n'):
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        elif isinstance(value, int):
                            if value in (1, -1):
                                col_val = True
                            elif value == 0:
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            # Fallback for any other type: convert to string and warn
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # OTHER: Convert everything else to a string
                    else:
                        col_val = str(value)
                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                            sheet_title, col_name, col_letter, row_num, col_type, value))
                    sheet_data_row_tf[col_name] = col_val
                col_num = col_num + 1
            # APPEND non-empty row
            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num
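
# e.g. a one-column sheet (hypothetical values):
#   transform_sheet_data('abc123', 12345, 'Sheet1', from_row=2,
#       columns=[{'columnIndex': 1, 'columnLetter': 'A',
#                 'columnName': 'Name', 'columnType': 'stringValue'}],
#       sheet_data_rows=[['Alice']])
#   -> ([{'__sdc_spreadsheet_id': 'abc123', '__sdc_sheet_id': 12345,
#         '__sdc_row': 2, 'Name': 'Alice'}], 3)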


def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = stream being synced when a previous load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    file_metadata = {}
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check whether the file has changed; if not, exit (return to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        # Update file_metadata bookmark
        write_bookmark(state, 'file_metadata', strftime(this_datetime))
        return
    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    # file_metadata bookmark is updated at the end of sync

    # SPREADSHEET_METADATA
    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf,
                ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            # LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # SKIP empty sheets (where sheet_schema and columns are None)
            if not sheet_schema or not columns:
                LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
            else:
                # Transform sheet_metadata
                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
                # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
                sheet_metadata.append(sheet_metadata_tf)

                # SHEET_DATA
                # Should this worksheet tab be synced?
                if sheet_title in selected_streams:
                    LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                    update_currently_syncing(state, sheet_title)
                    selected_fields = get_selected_fields(catalog, sheet_title)
                    LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                    write_schema(catalog, sheet_title)

                    # Emit a Singer ACTIVATE_VERSION message before the initial sync
                    # (but not before subsequent syncs) and again after every sheet
                    # sync completes. This forces hard deletes on the data downstream
                    # if fewer records are sent.
                    # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                    last_integer = int(get_bookmark(state, sheet_title, 0))
                    activate_version = int(time.time() * 1000)
                    activate_version_message = singer.ActivateVersionMessage(
                        stream=sheet_title,
                        version=activate_version)
                    if last_integer == 0:
                        # Initial load: send activate_version before AND after the data sync
                        singer.write_message(activate_version_message)
                        LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
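
                    # Resulting message sequence, sketched with hypothetical
                    # version numbers (RECORDs carry the same version):
                    #   initial sync:    ACTIVATE_VERSION(1560260271000),
                    #                    RECORD..., ACTIVATE_VERSION(1560260271000)
                    #   subsequent sync: RECORD..., ACTIVATE_VERSION(1560260350000)
                    # so targets can hard-delete rows absent from the new version.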

                    # Determine max range of columns and rows for "paging" through the data
                    sheet_last_col_index = 1
                    sheet_last_col_letter = 'A'
                    for col in columns:
                        col_index = col.get('columnIndex')
                        col_letter = col.get('columnLetter')
                        if col_index > sheet_last_col_index:
                            sheet_last_col_index = col_index
                            sheet_last_col_letter = col_letter
                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                    # Initialize paging for 1st batch
                    is_last_row = False
                    batch_rows = 200
                    from_row = 2
                    if sheet_max_row < batch_rows:
                        to_row = sheet_max_row
                    else:
                        to_row = batch_rows

                    # Loop thru batches (each with up to 200 rows of data)
                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
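                        # e.g. from_row=2, sheet_last_col_letter='J', to_row=201
                        # yields the A1-notation range 'A2:J201'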

                        # GET sheet_data for a worksheet tab
                        sheet_data, time_extracted = get_data(
                            stream_name=sheet_title,
                            endpoint_config=sheets_loaded_config,
                            client=client,
                            spreadsheet_id=spreadsheet_id,
                            range_rows=range_rows)
                        # Data is returned as a list of arrays, an array of values for each row
                        sheet_data_rows = sheet_data.get('values', [])

                        # Transform batch of rows to JSON with keys for each column
                        sheet_data_tf, row_num = transform_sheet_data(
                            spreadsheet_id=spreadsheet_id,
                            sheet_id=sheet_id,
                            sheet_title=sheet_title,
                            from_row=from_row,
                            columns=columns,
                            sheet_data_rows=sheet_data_rows)
                        if row_num < to_row:
                            is_last_row = True

                        # Process records, send batch of records to target
                        record_count = process_records(
                            catalog=catalog,
                            stream_name=sheet_title,
                            records=sheet_data_tf,
                            time_extracted=ss_time_extracted,
                            version=activate_version)
                        LOGGER.info('Sheet: {}, records processed: {}'.format(
                            sheet_title, record_count))

                        # Update paging from/to_row for next batch
                        from_row = to_row + 1
                        if to_row + batch_rows > sheet_max_row:
                            to_row = sheet_max_row
                        else:
                            to_row = to_row + batch_rows

                    # End of Stream: Send Activate Version and update State
                    singer.write_message(activate_version_message)
                    write_bookmark(state, sheet_title, activate_version)
                    LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
                    LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                        sheet_title, row_num - 2))  # exclude the header row; row_num ends 1 past the last data row
                    update_currently_syncing(state, None)

                    # SHEETS_LOADED
                    # Add sheet to sheets_loaded
                    sheet_loaded = {}
                    sheet_loaded['spreadsheetId'] = spreadsheet_id
                    sheet_loaded['sheetId'] = sheet_id
                    sheet_loaded['title'] = sheet_title
                    sheet_loaded['loadDate'] = strftime(utils.now())
                    sheet_loaded['lastRowNumber'] = row_num
                    sheets_loaded.append(sheet_loaded)

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheets_loaded if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)

    # Update file_metadata bookmark
    write_bookmark(state, 'file_metadata', strftime(this_datetime))

    return