my_openplace/scripts/scrape_regions.py
#!/usr/bin/env python3
"""
Script to scrape region data from wplace.live API and generate a CSV mapping.
This creates a tile-to-region mapping by sampling one pixel per tile.
The region is determined by tile coordinates, not individual pixels.
"""
import csv
import time
import json
import os
from typing import Optional, Dict
import sys
try:
    import cloudscraper
    print("✓ Using cloudscraper to bypass Cloudflare protection")
except ImportError:
    print("⚠️ cloudscraper not found. Install it with: pip install cloudscraper")
    print("   This is required to bypass Cloudflare protection on wplace.live")
    sys.exit(1)
try:
    # PySocks is imported only to verify SOCKS5 support is installed;
    # requests/cloudscraper uses it internally for socks5:// proxy URLs.
    import socks  # noqa: F401
    print("✓ SOCKS proxy support available")
except ImportError:
    print("⚠️ PySocks not found. Install it with: pip install pysocks")
    print("   This is required to use SOCKS5 proxies")
    sys.exit(1)
# Configuration
BASE_URL = "https://backend.wplace.live/s0/pixel"
OUTPUT_CSV = "tile_region_mapping.csv"
OUTPUT_REGIONS_CSV = "regions.csv"
# Sample density - adjust based on how detailed you want the mapping
# 1 = sample all tiles, 2 = sample every other tile, etc.
TILE_SAMPLE_STEP = 1 # Sample every Nth tile
# Tile range - adjust based on the canvas size
TILE_X_MIN, TILE_X_MAX = 0, 2047
TILE_Y_MIN, TILE_Y_MAX = 0, 2047
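# Rough scale check (assuming the full grid and TILE_SAMPLE_STEP = 1):
# 2048 * 2048 = 4,194,304 tiles. At ~0.2 s per request (0.1 s base delay
# plus network latency) a full pass is on the order of 230 hours, so
# raising the step or resuming across sessions is expected for large runs.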
# Proxy rotation - helps avoid rate limits and Cloudflare blocks
PROXIES = [
    # Credentials redacted; keep real values out of version control
    # (e.g. load them from environment variables or an untracked config).
    "socks5://USERNAME:PASSWORD@isp.decodo.com:10001",
    "socks5://USERNAME:PASSWORD@gate.decodo.com:10001",
    "socks5://USERNAME:PASSWORD@isp.decodo.com:10010"
]
# Track current proxy index for rotation
current_proxy_index = 0
# Rate limit backoff settings
rate_limit_backoff_seconds = 0 # Exponential backoff for rate limits
consecutive_rate_limits = 0 # Track consecutive rate limit errors
# Create a cloudscraper session that can bypass Cloudflare
scraper = cloudscraper.create_scraper(
    browser={
        'browser': 'chrome',
        'platform': 'windows',
        'mobile': False
    }
)
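# Note: cloudscraper solves Cloudflare's JavaScript challenge and presents a
# consistent browser fingerprint (here: desktop Chrome on Windows), which is
# what should let plain JSON requests through where a bare requests session
# would only get challenge pages.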
def get_next_proxy() -> Dict[str, str]:
    """Get the next proxy in rotation."""
    global current_proxy_index
    proxy_url = PROXIES[current_proxy_index]
    current_proxy_index = (current_proxy_index + 1) % len(PROXIES)
    return {
        'http': proxy_url,
        'https': proxy_url
    }
def detect_cloudflare_rate_limit(response) -> bool:
    """Detect whether a response is a Cloudflare 1015 rate limit error."""
    # A plain 429 is always treated as a rate limit
    if response.status_code == 429:
        return True
    # Check for the Cloudflare rate limit page (error 1015)
    if 'text/html' in response.headers.get('Content-Type', ''):
        if b'error 1015' in response.content.lower() or b'rate limited' in response.content.lower():
            return True
    # A CF-RAY header on a 403/429/503 means Cloudflare itself answered
    cf_ray = response.headers.get('CF-RAY', '')
    if cf_ray and response.status_code in [403, 429, 503]:
        return True
    return False
def handle_rate_limit_backoff():
    """Handle exponential backoff when rate limited."""
    global rate_limit_backoff_seconds, consecutive_rate_limits
    consecutive_rate_limits += 1
    # Exponential backoff: 5s, 10s, 20s, 40s, then capped at 60s
    if consecutive_rate_limits == 1:
        rate_limit_backoff_seconds = 5
    elif consecutive_rate_limits == 2:
        rate_limit_backoff_seconds = 10
    elif consecutive_rate_limits == 3:
        rate_limit_backoff_seconds = 20
    elif consecutive_rate_limits == 4:
        rate_limit_backoff_seconds = 40
    else:
        rate_limit_backoff_seconds = 60
    print("\n🛑 CLOUDFLARE RATE LIMIT DETECTED (Error 1015)")
    print(f"   Consecutive rate limits: {consecutive_rate_limits}")
    print(f"   Backing off for {rate_limit_backoff_seconds} seconds...")
    time.sleep(rate_limit_backoff_seconds)
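# Note: the ladder above is equivalent to the closed form
#   min(5 * 2 ** (consecutive_rate_limits - 1), 60)
# and is kept as an explicit if/elif chain for readability.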
def reset_rate_limit_backoff():
    """Reset backoff when we get successful responses."""
    global rate_limit_backoff_seconds, consecutive_rate_limits
    consecutive_rate_limits = 0
    rate_limit_backoff_seconds = 0
def fetch_tile_region(tile_x: int, tile_y: int) -> Optional[Dict]:
    """Fetch region info for a tile from the wplace.live API.

    Since region is determined by tile, checking one pixel per tile is
    enough. Coordinates (1, 1) are used as the sample point.
    """
    url = f"{BASE_URL}/{tile_x}/{tile_y}?x=1&y=1"
    proxies = get_next_proxy()
    try:
        response = scraper.get(url, proxies=proxies, timeout=15)
        # Check for Cloudflare rate limiting (error 1015)
        if detect_cloudflare_rate_limit(response):
            handle_rate_limit_backoff()
            # Retry with the same proxy after backing off
            response = scraper.get(url, proxies=proxies, timeout=15)
            if detect_cloudflare_rate_limit(response):
                print(f"   Still rate limited after backoff. Skipping tile ({tile_x}, {tile_y})")
                return None
        # Check if we got HTML (Cloudflare challenge) instead of JSON
        content_type = response.headers.get('Content-Type', '')
        if 'text/html' in content_type and not detect_cloudflare_rate_limit(response):
            print(f"\n⚠️ Received HTML instead of JSON for tile ({tile_x}, {tile_y})")
            print("   This might be a Cloudflare challenge page. Waiting 5 seconds...")
            time.sleep(5)
            # Retry once
            response = scraper.get(url, proxies=proxies, timeout=15)
            content_type = response.headers.get('Content-Type', '')
            if 'text/html' in content_type:
                print("   Still getting HTML. Skipping this tile.")
                return None
        if response.status_code == 200:
            # Success! Reset the rate limit backoff.
            reset_rate_limit_backoff()
            try:
                data = response.json()
                # Extract the region if it exists
                if 'region' in data:
                    return data['region']
                return None
            except json.JSONDecodeError:
                print(f"\n⚠️ Failed to parse JSON for tile ({tile_x}, {tile_y})")
                return None
        elif response.status_code == 404:
            # No pixel painted at (1, 1); try the center of the tile
            url_center = f"{BASE_URL}/{tile_x}/{tile_y}?x=500&y=500"
            response = scraper.get(url_center, proxies=proxies, timeout=15)
            # Check for rate limiting on the retry
            if detect_cloudflare_rate_limit(response):
                handle_rate_limit_backoff()
                return None
            if response.status_code == 200:
                try:
                    data = response.json()
                    if 'region' in data:
                        reset_rate_limit_backoff()
                        return data['region']
                except json.JSONDecodeError:
                    pass
            return None
        elif response.status_code == 403:
            print(f"\n⚠️ Got 403 Forbidden for tile ({tile_x}, {tile_y})")
            print("   You may be rate limited. Consider increasing the delay.")
            return None
        elif response.status_code == 429:
            print(f"\n⚠️ Got 429 Too Many Requests for tile ({tile_x}, {tile_y})")
            handle_rate_limit_backoff()
            return None
        else:
            print(f"\n⚠️ Got status {response.status_code} for tile ({tile_x}, {tile_y})")
            return None
    except Exception as e:
        print(f"\n⚠️ Error fetching tile ({tile_x}, {tile_y}): {e}")
        return None
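# Quick smoke test from a REPL (assumes the API and at least one working
# proxy are reachable; expect None for tiles with no painted pixels):
#   >>> fetch_tile_region(1000, 1000)
#   {'id': ..., 'cityId': ..., 'name': ..., ...}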
def load_already_scraped_tiles() -> set:
    """Load tiles that have already been scraped from the CSV file."""
    scraped_tiles = set()
    if os.path.exists(OUTPUT_CSV):
        print(f"📂 Found existing {OUTPUT_CSV}, loading already scraped tiles...")
        try:
            with open(OUTPUT_CSV, 'r', newline='', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    tile_x = int(row['tile_x'])
                    tile_y = int(row['tile_y'])
                    scraped_tiles.add((tile_x, tile_y))
            print(f"   ✓ Loaded {len(scraped_tiles)} already scraped tiles")
        except Exception as e:
            print(f"   ⚠️ Error reading existing CSV: {e}")
            print("   Starting fresh...")
            scraped_tiles.clear()
    return scraped_tiles
def load_unique_regions() -> dict:
    """Load unique regions from the regions CSV file."""
    unique_regions = {}
    if os.path.exists(OUTPUT_REGIONS_CSV):
        print(f"📂 Found existing {OUTPUT_REGIONS_CSV}, loading unique regions...")
        try:
            with open(OUTPUT_REGIONS_CSV, 'r', newline='', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    region_key = (int(row['id']), int(row['city_id']))
                    unique_regions[region_key] = row
            print(f"   ✓ Loaded {len(unique_regions)} unique regions")
        except Exception as e:
            print(f"   ⚠️ Error reading regions CSV: {e}")
    return unique_regions
def append_tile_to_csv(tile_data: dict):
    """Append a single tile to the CSV file."""
    file_exists = os.path.exists(OUTPUT_CSV)
    with open(OUTPUT_CSV, 'a', newline='', encoding='utf-8') as f:
        fieldnames = ['tile_x', 'tile_y', 'region_id', 'city_id',
                      'region_name', 'region_number', 'country_id', 'flag_id']
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        # Write the header if the file is new
        if not file_exists:
            writer.writeheader()
        writer.writerow(tile_data)
def update_regions_csv(unique_regions: dict):
    """Update the regions CSV file with all unique regions."""
    with open(OUTPUT_REGIONS_CSV, 'w', newline='', encoding='utf-8') as f:
        fieldnames = ['id', 'city_id', 'name', 'number', 'country_id', 'flag_id']
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(unique_regions.values())
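# Design note: regions.csv is rewritten wholesale rather than appended to, so
# duplicate rows can never accumulate. The rewrite runs once per successfully
# scraped tile (see main), which stays cheap as long as the set of unique
# regions remains small.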
def main():
    print("Starting wplace.live region data scraping...")
    print(f"Using {len(PROXIES)} rotating proxies for requests")
    print(f"Tile range: X({TILE_X_MIN}-{TILE_X_MAX}), Y({TILE_Y_MIN}-{TILE_Y_MAX})")
    print(f"Sampling every {TILE_SAMPLE_STEP} tile(s)")
    # len(range(...)) counts the sampled tiles exactly, even when the step
    # does not divide the range evenly
    tiles_x = len(range(TILE_X_MIN, TILE_X_MAX + 1, TILE_SAMPLE_STEP))
    tiles_y = len(range(TILE_Y_MIN, TILE_Y_MAX + 1, TILE_SAMPLE_STEP))
    total_to_sample = tiles_x * tiles_y
    print(f"Total tiles to sample: {total_to_sample}")
    print()
    # Load already scraped tiles for resume support
    already_scraped = load_already_scraped_tiles()
    # Load unique regions
    unique_regions = load_unique_regions()
    total_tiles = 0
    successful_tiles = 0
    skipped_tiles = len(already_scraped)
    last_success_time = time.time()
    # Sample across tiles (one sample per tile is enough since region is tile-based)
    for tile_x in range(TILE_X_MIN, TILE_X_MAX + 1, TILE_SAMPLE_STEP):
        for tile_y in range(TILE_Y_MIN, TILE_Y_MAX + 1, TILE_SAMPLE_STEP):
            total_tiles += 1
            # Skip if already scraped
            if (tile_x, tile_y) in already_scraped:
                continue
            print(f"Sampling tile ({tile_x}, {tile_y})... "
                  f"[{successful_tiles} new, {skipped_tiles} skipped/cached]", end="\r")
            region = fetch_tile_region(tile_x, tile_y)
            if region:
                # Prepare the tile data row
                tile_data = {
                    'tile_x': tile_x,
                    'tile_y': tile_y,
                    'region_id': region.get('id'),
                    'city_id': region.get('cityId'),
                    'region_name': region.get('name'),
                    'region_number': region.get('number'),
                    'country_id': region.get('countryId'),
                    'flag_id': region.get('flagId')
                }
                # Immediately write to CSV (for resume support)
                append_tile_to_csv(tile_data)
                # Store the region if it is new
                region_key = (region.get('id'), region.get('cityId'))
                if region_key not in unique_regions:
                    unique_regions[region_key] = {
                        'id': region.get('id'),
                        'city_id': region.get('cityId'),
                        'name': region.get('name'),
                        'number': region.get('number'),
                        'country_id': region.get('countryId'),
                        'flag_id': region.get('flagId')
                    }
                # Update the regions CSV periodically
                update_regions_csv(unique_regions)
                successful_tiles += 1
                last_success_time = time.time()
            else:
                skipped_tiles += 1
                # If nothing has succeeded for a while, warn the user
                if time.time() - last_success_time > 60:
                    print("\n⚠️ No successful requests in the last 60 seconds.")
                    print("   You may be blocked by Cloudflare. Consider:")
                    print("   1. Increasing TILE_SAMPLE_STEP to reduce the request rate")
                    print("   2. Taking a break and trying again later")
                    print("   3. Using a VPN or a different IP address")
                    print("   Progress is saved! You can resume by running this script again.")
                    user_input = input("Continue? (y/n): ")
                    if user_input.lower() != 'y':
                        break
                    last_success_time = time.time()
            # Rate limiting - be nice to their server.
            # Add extra delay if we have been rate limited recently.
            base_delay = 0.1
            if consecutive_rate_limits > 0:
                # Slow down while rate limits are still being hit
                base_delay = 0.5
            time.sleep(base_delay)
        else:
            # Inner loop finished without a user-requested break
            continue
        # Propagate the user's break out of the outer loop too
        break
    print()
    print("\n✅ Scanning complete!")
    print(f"Total tiles processed: {total_tiles}")
    print(f"New tiles scraped: {successful_tiles}")
    print(f"Skipped/cached tiles: {skipped_tiles}")
    print(f"Unique regions found: {len(unique_regions)}")
    # Final update of the regions CSV
    if unique_regions:
        update_regions_csv(unique_regions)
        print(f"✓ Final regions list saved to {OUTPUT_REGIONS_CSV}")
    print(f"✓ Tile-to-region mappings in {OUTPUT_CSV}")
    print("\n💡 Tip: You can resume this script anytime - progress is automatically saved!")
    print("   Done! You can now import this data into your database.")
if __name__ == "__main__":
    main()