"""Google Calendar helpers: load credentials, create events, list events.

All public functions report failure through their return value (``None``,
``[]``, an error string, or a dict with ``success=False``) rather than by
raising, and log progress with ``print``.
"""

import base64
import json
import os
import tempfile  # unused here; kept so existing importers of this name still work
import traceback
from datetime import datetime, timedelta, timezone

try:
    from google.oauth2 import service_account
    from googleapiclient.discovery import build
except ImportError:
    # The Google client libraries are optional at import time: without them
    # every calendar call reports an error instead of crashing the importer.
    service_account = None
    build = None

# Google Calendar API setup
SCOPES = ['https://www.googleapis.com/auth/calendar']
SERVICE_ACCOUNT_FILE = 'credentials.json'

# Timezone name used when the TIMEZONE environment variable is not set
DEFAULT_TIMEZONE = 'Asia/Kolkata'

# Environment variables searched for credentials, in priority order
_CREDENTIAL_ENV_VARS = (
    'GOOGLE_CREDENTIALS_BASE64',
    'GOOGLE_CREDENTIALS_JSON',
    'GOOGLE_APPLICATION_CREDENTIALS',
)

# Date layouts tried in order. NOTE: "%m/%d/%Y" is tried before "%d/%m/%Y",
# so an ambiguous value such as 05/06/2024 is read as May 6th.
_DATE_FORMATS = (
    "%Y-%m-%d",   # 2024-12-25
    "%d-%m-%Y",   # 25-12-2024
    "%m/%d/%Y",   # 12/25/2024
    "%d/%m/%Y",   # 25/12/2024
    "%B %d, %Y",  # December 25, 2024
    "%B %d %Y",   # December 25 2024
    "%d %B %Y",   # 25 December 2024
    "%b %d, %Y",  # Dec 25, 2024
    "%b %d %Y",   # Dec 25 2024
    "%Y/%m/%d",   # 2024/12/25
)

# Time layouts tried in order
_TIME_FORMATS = (
    "%H:%M",     # 14:30
    "%I:%M %p",  # 2:30 PM / 02:30 PM
    "%I:%M%p",   # 2:30PM
    "%H.%M",     # 14.30
    "%I %p",     # 2 PM
    "%I%p",      # 2PM
    "%H",        # 14
)

# Global calendar service
calendar_service = None


def _load_from_dict(info):
    """Build service-account credentials from a parsed JSON dict, or None."""
    try:
        return service_account.Credentials.from_service_account_info(
            info, scopes=SCOPES
        )
    except Exception as e:
        print(f"❌ Error creating credentials from dict: {e}")
        return None


def _load_from_file(path, source_label):
    """Build service-account credentials from a JSON key file, or None."""
    try:
        return service_account.Credentials.from_service_account_file(
            path, scopes=SCOPES
        )
    except Exception as e:
        print(f" Error loading file from {source_label}: {e}")
        return None


def _credentials_from_env_value(var_name, content):
    """Interpret one environment value as raw JSON, a file path, or Base64.

    Returns credentials, or None when the value cannot be used so the caller
    can move on to the next source.
    """
    # A. Raw JSON string
    if content.strip().startswith('{'):
        print(" Type: JSON String")
        try:
            info = json.loads(content)
        except ValueError as e:
            print(f" ❌ Invalid JSON in {var_name}: {e}")
            return None
        return _load_from_dict(info)

    # B. Path to a key file
    if os.path.exists(content):
        print(f" Type: File Path -> {content}")
        return _load_from_file(content, var_name)

    # C. Base64-encoded JSON
    print(" Type: Potential Base64")
    try:
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses
        decoded = base64.b64decode(content).decode('utf-8')
    except ValueError as e:
        print(f" Failed to process as Base64/Path/JSON: {e}")
        return None
    if not decoded.strip().startswith('{'):
        print(" Decoded content is not JSON (skipping)")
        return None
    try:
        info = json.loads(decoded)
    except ValueError as e:
        print(f" Failed to process as Base64/Path/JSON: {e}")
        return None
    print(" ✅ Successfully decoded Base64 to JSON")
    return _load_from_dict(info)


def get_credentials():
    """
    Get Google Calendar credentials from various sources with fallback:
    1. credentials.json file
    2. GOOGLE_CREDENTIALS_BASE64
    3. GOOGLE_CREDENTIALS_JSON
    4. GOOGLE_APPLICATION_CREDENTIALS (Path, JSON, or Base64)

    A source that exists but cannot be turned into credentials is logged and
    skipped, so a later source still gets a chance.

    Returns:
        Credentials scoped to SCOPES, or None if no source is usable
    """
    if service_account is None:
        print("❌ Google auth library is not installed; cannot load credentials")
        return None

    # 1. Try local file
    if os.path.exists(SERVICE_ACCOUNT_FILE):
        print(f"Loading credentials from {SERVICE_ACCOUNT_FILE}")
        credentials = _load_from_file(SERVICE_ACCOUNT_FILE, SERVICE_ACCOUNT_FILE)
        if credentials is not None:
            return credentials

    # 2. Try each environment variable in priority order
    for var_name in _CREDENTIAL_ENV_VARS:
        content = os.getenv(var_name)
        if not content:
            continue
        print(f"Checking {var_name}...")
        credentials = _credentials_from_env_value(var_name, content)
        if credentials is not None:
            return credentials

    print("No valid credentials found in files or environment variables!")
    return None


def initialize_calendar():
    """Initialize the global Google Calendar service.

    Returns:
        The calendar service, or None if the client library or credentials
        are unavailable or building the service fails
    """
    global calendar_service

    if build is None:
        print("Error initializing calendar service: Google API client library is not installed")
        return None

    try:
        credentials = get_credentials()
        if credentials is None:
            return None

        # Build the calendar service
        calendar_service = build('calendar', 'v3', credentials=credentials)
        print("Google Calendar service initialized successfully!")
        return calendar_service
    except Exception as e:
        print(f"Error initializing calendar service: {str(e)}")
        traceback.print_exc()
        return None


def _parse_with_formats(value, formats):
    """Return (datetime, format) for the first matching format, else (None, None)."""
    for fmt in formats:
        try:
            return datetime.strptime(value, fmt), fmt
        except ValueError:
            continue
    return None, None


def parse_datetime(date_str, time_str):
    """
    Parse date and time strings into a datetime object.
    Handles multiple formats for user convenience.

    Args:
        date_str: Date in one of the supported layouts (see _DATE_FORMATS)
        time_str: Time in one of the supported layouts (see _TIME_FORMATS)

    Returns:
        Naive datetime (seconds and microseconds zeroed), or None if either
        value is not a string or cannot be parsed
    """
    if not isinstance(date_str, str) or not isinstance(time_str, str):
        print("❌ Error parsing date/time: date and time must be strings")
        return None

    # Clean up input strings
    date_str = date_str.strip()
    time_str = time_str.strip()
    print(f"🕐 Parsing date: '{date_str}' and time: '{time_str}'")

    parsed_date, date_fmt = _parse_with_formats(date_str, _DATE_FORMATS)
    if parsed_date is None:
        print(f"❌ Could not parse date: {date_str}")
        return None
    print(f"✅ Date parsed with format: {date_fmt}")

    parsed_time, time_fmt = _parse_with_formats(time_str, _TIME_FORMATS)
    if parsed_time is None:
        print(f"❌ Could not parse time: {time_str}")
        return None
    print(f"✅ Time parsed with format: {time_fmt}")

    # Combine date and time
    final_datetime = parsed_date.replace(
        hour=parsed_time.hour,
        minute=parsed_time.minute,
        second=0,
        microsecond=0,
    )
    print(f"✅ Final datetime: {final_datetime}")
    return final_datetime


def _failure(error_msg):
    """Build the error result returned by create_calendar_event."""
    return {
        "success": False,
        "error": error_msg,
        "event_link": None
    }


def create_calendar_event(name, date, time, title="Meeting", duration_hours=1):
    """
    Create a Google Calendar event on the 'primary' calendar.

    Args:
        name: Name of the person
        date: Date string (flexible format)
        time: Time string (flexible format)
        title: Meeting title (default: "Meeting")
        duration_hours: Duration in hours, must be positive (default: 1)

    Returns:
        dict with "success", "error" and "event_link"; on success it also
        carries "event_id"
    """
    global calendar_service

    print("\n📅 Creating calendar event...")
    print(f" Name: {name}")
    print(f" Date: {date}")
    print(f" Time: {time}")
    print(f" Title: {title}")

    # Initialize service if not already done
    if calendar_service is None:
        print("🔄 Calendar service not initialized, initializing now...")
        calendar_service = initialize_calendar()

    if calendar_service is None:
        error_msg = "Calendar service not initialized. Check credentials."
        print(f"❌ {error_msg}")
        return _failure(error_msg)

    try:
        # Parse the date and time
        start_datetime = parse_datetime(date, time)
        if start_datetime is None:
            error_msg = f"Could not parse date '{date}' and time '{time}'. Please use formats like 'December 25, 2024' and '2:30 PM'"
            print(f"❌ {error_msg}")
            return _failure(error_msg)

        # Reject an event that would end at or before its start
        duration = timedelta(hours=duration_hours)
        if duration <= timedelta(0):
            error_msg = "Event duration must be greater than zero hours."
            print(f"❌ {error_msg}")
            return _failure(error_msg)

        # Calculate end time
        end_datetime = start_datetime + duration

        # Get timezone from environment or use default
        tz_name = os.getenv('TIMEZONE', DEFAULT_TIMEZONE)

        # Create event body; the naive local times are qualified by timeZone
        event = {
            'summary': title,
            'description': f'Meeting scheduled with {name} via Voice Scheduling Agent',
            'start': {
                'dateTime': start_datetime.isoformat(),
                'timeZone': tz_name,
            },
            'end': {
                'dateTime': end_datetime.isoformat(),
                'timeZone': tz_name,
            },
            'reminders': {
                'useDefault': False,
                'overrides': [
                    {'method': 'email', 'minutes': 24 * 60},  # 1 day before
                    {'method': 'popup', 'minutes': 30},       # 30 min before
                ],
            },
        }

        print("Sending event to Google Calendar...")

        # Insert the event
        created_event = calendar_service.events().insert(
            calendarId='primary',
            body=event
        ).execute()

        event_link = created_event.get('htmlLink')
        event_id = created_event.get('id')

        print("Event created successfully!")
        print(f" Event ID: {event_id}")
        print(f" Link: {event_link}")

        return {
            "success": True,
            "error": None,
            "event_link": event_link,
            "event_id": event_id
        }
    except Exception as e:
        error_msg = f"Failed to create calendar event: {str(e)}"
        print(f" {error_msg}")
        traceback.print_exc()
        return _failure(error_msg)


def _to_rfc3339_utc(moment):
    """Format an aware datetime as a UTC RFC 3339 timestamp ending in 'Z'."""
    return moment.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')


def list_upcoming_events(max_results=10):
    """
    List upcoming events on the 'primary' calendar.

    Args:
        max_results: Maximum number of events to return

    Returns:
        List of event dicts ordered by start time; [] on any failure
    """
    global calendar_service

    if calendar_service is None:
        calendar_service = initialize_calendar()

    if calendar_service is None:
        return []

    try:
        now = _to_rfc3339_utc(datetime.now(timezone.utc))
        print("📋 Fetching upcoming events...")

        events_result = calendar_service.events().list(
            calendarId='primary',
            timeMin=now,
            maxResults=max_results,
            singleEvents=True,
            orderBy='startTime'
        ).execute()

        events = events_result.get('items', [])
        print(f"✅ Found {len(events)} upcoming events")
        return events
    except Exception as e:
        print(f"❌ Error fetching events: {str(e)}")
        return []


def _resolve_timezone(tz_name):
    """Return a tzinfo for an IANA timezone name, falling back to UTC.

    The fallback covers interpreters without ``zoneinfo`` and systems with no
    timezone database for the requested name.
    """
    try:
        from zoneinfo import ZoneInfo  # Python 3.9+
        return ZoneInfo(tz_name)
    except (ImportError, KeyError, ValueError, OSError) as e:
        print(f" Could not load timezone '{tz_name}' ({e}); using UTC")
        return timezone.utc


def _day_bounds_utc(day, tz):
    """Return (start, end) UTC timestamps for the local calendar day of *day*.

    *end* is the following local midnight, to be used as an exclusive bound.
    """
    start_local = day.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=tz)
    end_local = start_local + timedelta(days=1)
    return _to_rfc3339_utc(start_local), _to_rfc3339_utc(end_local)


def _format_clock(value, tz):
    """Render an ISO 8601 timestamp as a 12-hour clock time in *tz*."""
    moment = datetime.fromisoformat(value.replace('Z', '+00:00'))
    if moment.tzinfo is not None:
        moment = moment.astimezone(tz)
    return moment.strftime("%I:%M %p")


def _format_busy_slot(event, tz):
    """Describe one calendar event as a bullet line."""
    start = event['start'].get('dateTime', event['start'].get('date'))
    end = event['end'].get('dateTime', event['end'].get('date'))
    summary = event.get('summary', 'Busy')

    # Timed events carry a 'dateTime' (contains 'T'); all-day events only a 'date'
    if start and end and 'T' in start:
        return f"- {_format_clock(start, tz)} to {_format_clock(end, tz)} ({summary})"
    return f"- All Day ({summary})"


def get_busy_slots(date_str):
    """
    Get busy slots for a specific date.

    The day is interpreted in the timezone named by the TIMEZONE environment
    variable (default DEFAULT_TIMEZONE), and event times are shown in it.

    Args:
        date_str: Date string (e.g., "2024-12-25" or "December 25, 2024")

    Returns:
        String listing the busy slots, a "free" message when there are none,
        or an error message
    """
    global calendar_service

    print(f"\n🔍 Checking availability for: {date_str}")

    if calendar_service is None:
        calendar_service = initialize_calendar()

    if calendar_service is None:
        return "Error: Calendar service not available"

    try:
        # A dummy time lets the shared parser handle every supported date layout
        day = parse_datetime(date_str, "12:00 PM")
        if not day:
            return f"Error: Could not parse date '{date_str}'"

        # Define start and end of the requested day in the configured timezone
        tz = _resolve_timezone(os.getenv('TIMEZONE', DEFAULT_TIMEZONE))
        start_of_day, end_of_day = _day_bounds_utc(day, tz)
        print(f" Querying from {start_of_day} to {end_of_day}")

        # Query events
        events_result = calendar_service.events().list(
            calendarId='primary',
            timeMin=start_of_day,
            timeMax=end_of_day,
            singleEvents=True,
            orderBy='startTime'
        ).execute()

        events = events_result.get('items', [])
        if not events:
            print("✅ No events found for this date")
            return "I'm completely free on that day!"

        busy_slots = [_format_busy_slot(event, tz) for event in events]
        response_text = f"Here are my busy times for {date_str}:\n" + "\n".join(busy_slots)
        print(f"✅ Found {len(events)} busy slots")
        return response_text
    except Exception as e:
        print(f"❌ Error getting busy slots: {str(e)}")
        return "I'm having trouble checking my calendar right now."


def main():
    """Manual smoke test for local development: create one test event."""
    print("=" * 60)
    print("Testing Calendar Integration")
    print("=" * 60)

    # Initialize
    print("\n1. Initializing calendar service...")
    service = initialize_calendar()

    if not service:
        print("\n❌ Calendar service initialization failed!")
        print("Check your credentials configuration.")
        return

    print("\n2. Testing event creation...")
    result = create_calendar_event(
        name="Test User",
        date="2024-12-20",
        time="14:30",
        title="Test Meeting from Script"
    )

    print("\n3. Result:")
    print(json.dumps(result, indent=2))

    if result["success"]:
        print(f"\n✅ Success! View event at: {result['event_link']}")
    else:
        print(f"\n❌ Failed: {result['error']}")


# Test function for local development
if __name__ == "__main__":
    main()