mirror of
https://github.com/kevinveenbirkenbach/baserow-ifto.git
synced 2024-11-25 20:31:04 +01:00
Refactored code
This commit is contained in:
parent
28eb9677e8
commit
a7bf8f3708
@ -1,41 +1,45 @@
|
|||||||
import requests
|
import requests
|
||||||
import argparse
|
import argparse
|
||||||
|
|
||||||
def get_all_rows_from_table(base_url, api_key, table_id):
|
def create_headers(api_key):
|
||||||
headers = {
|
"""Create headers for API requests."""
|
||||||
|
return {
|
||||||
"Authorization": f"Token {api_key}",
|
"Authorization": f"Token {api_key}",
|
||||||
"Content-Type": "application/json"
|
"Content-Type": "application/json"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def handle_api_response(response):
|
||||||
|
"""Handle API response, check for errors and decode JSON."""
|
||||||
|
if response.status_code != 200:
|
||||||
|
print(f"Error: Received status code {response.status_code} from Baserow API.")
|
||||||
|
print("Response content:", response.content.decode())
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
return response.json()
|
||||||
|
except requests.RequestsJSONDecodeError:
|
||||||
|
print("Error: Failed to decode the response as JSON.")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_all_rows_from_table(base_url, api_key, table_id):
|
||||||
|
headers = create_headers(api_key)
|
||||||
rows = []
|
rows = []
|
||||||
next_url = f"{base_url}database/rows/table/{table_id}/"
|
next_url = f"{base_url}database/rows/table/{table_id}/"
|
||||||
|
|
||||||
while next_url:
|
while next_url:
|
||||||
response = requests.get(next_url, headers=headers)
|
response = requests.get(next_url, headers=headers)
|
||||||
data = response.json()
|
data = handle_api_response(response)
|
||||||
|
if not data:
|
||||||
|
break
|
||||||
rows.extend(data['results'])
|
rows.extend(data['results'])
|
||||||
next_url = data['next']
|
next_url = data['next']
|
||||||
|
|
||||||
return rows
|
return rows
|
||||||
|
|
||||||
def get_all_tables_from_database(base_url, api_key, database_id):
|
def get_all_tables_from_database(base_url, api_key, database_id):
|
||||||
headers = {
|
headers = create_headers(api_key)
|
||||||
"Authorization": f"Token {api_key}",
|
|
||||||
"Content-Type": "application/json"
|
|
||||||
}
|
|
||||||
response = requests.get(f"{base_url}database/tables/database/{database_id}/", headers=headers)
|
response = requests.get(f"{base_url}database/tables/database/{database_id}/", headers=headers)
|
||||||
|
return handle_api_response(response) or []
|
||||||
# Check if the response status code indicates success
|
|
||||||
if response.status_code != 200:
|
|
||||||
print(f"Error: Received status code {response.status_code} from Baserow API.")
|
|
||||||
print("Response content:", response.content.decode())
|
|
||||||
return []
|
|
||||||
|
|
||||||
try:
|
|
||||||
tables = response.json()
|
|
||||||
return tables
|
|
||||||
except requests.RequestsJSONDecodeError:
|
|
||||||
print("Error: Failed to decode the response as JSON.")
|
|
||||||
return []
|
|
||||||
|
|
||||||
def get_all_data_from_database(base_url, api_key, database_id):
|
def get_all_data_from_database(base_url, api_key, database_id):
|
||||||
tables = get_all_tables_from_database(base_url, api_key, database_id)
|
tables = get_all_tables_from_database(base_url, api_key, database_id)
|
||||||
|
Loading…
Reference in New Issue
Block a user