1 Google Meet Recording Processing Pipeline

1.1 Overview

Automated system to process Google Meet recordings for client calls through video-to-audio extraction, Workers AI transcription, context-aware analysis, and integrated storage with client onboarding data.

1.2 Technical Architecture

graph TB
    A[Google Meet Recording in Drive] --> B[CF Worker Detects New Recording]
    B --> C[Download Video from Drive]
    C --> D[Extract Audio Track]
    D --> E[Workers AI Whisper Transcription]
    E --> F[Context-Aware Processing]
    F --> G[Compress Original Video]
    G --> H[Store on Hetzner S3]
    H --> I[Link to Client Notes Doc]
    I --> J[Update Client Record]

1.3 Core Processing Pipeline

1.3.1 API Endpoint: /api/recordings/process-meet-recording

export default {
  /**
   * Cloudflare Worker entry point for the Meet-recording pipeline.
   *
   * POST /api/recordings/process-meet-recording with JSON body
   * { driveFileId, clientEmail }: downloads the recording from Drive,
   * transcribes it with Workers AI, enriches the transcript with the
   * client's onboarding context, archives all artifacts to Hetzner S3,
   * links the results into the client's notes doc, and updates the
   * client record. Responds with the stored URLs and total elapsed time.
   */
  async fetch(request, env) {
    // Any other path is not handled by this Worker. (Previously the
    // function fell through and returned undefined, which is invalid.)
    if (!request.url.includes('/api/recordings/process-meet-recording')) {
      return new Response('Not found', { status: 404 });
    }

    // Fix: startTime was referenced in the success response but never defined,
    // which made every successful run throw a ReferenceError at the end.
    const startTime = Date.now();

    const { driveFileId, clientEmail } = await request.json();

    try {
      // 1. Download video from Google Drive
      const videoFile = await googleDrive.downloadFile(driveFileId, env.GOOGLE_DRIVE_TOKEN);

      // 2. Extract audio track for transcription
      const audioBuffer = await extractAudioFromVideo(videoFile.buffer);

      // 3. Workers AI transcription (verbose_json keeps per-segment timing info)
      const transcript = await env.AI.run('@cf/openai/whisper', {
        audio: audioBuffer,
        language: 'en',
        response_format: 'verbose_json'
      });

      // 4. Get client context for enhanced processing
      const clientData = await getClientOnboardingData(clientEmail, env);

      // 5. Process transcript with client context
      const processedTranscript = await enhanceTranscript(transcript, clientData, env);

      // 6. Compress original video for storage
      const compressedVideo = await compressVideo(videoFile.buffer);

      // 7. Store everything on Hetzner S3
      const storage = await storeOnHetzner({
        originalVideo: compressedVideo,
        audioExtract: audioBuffer,
        transcript: transcript,
        processedInsights: processedTranscript,
        metadata: {
          clientEmail,
          recordingDate: videoFile.createdTime,
          duration: transcript.duration,
          originalSize: videoFile.size,
          compressedSize: compressedVideo.length
        }
      }, env);

      // 8. Link to client's Google Doc notes
      await linkToClientNotes(clientEmail, storage, env);

      // 9. Update client record in system
      await updateClientRecord(clientEmail, {
        callRecorded: true,
        transcriptUrl: storage.transcriptUrl,
        recordingUrl: storage.videoUrl,
        processedDate: new Date().toISOString()
      }, env);

      return new Response(JSON.stringify({
        success: true,
        transcriptUrl: storage.transcriptUrl,
        videoUrl: storage.videoUrl,
        processingTime: Date.now() - startTime
      }), { headers: { 'Content-Type': 'application/json' } });

    } catch (error) {
      // Surface the failure to the caller; detailed logging/KV capture is
      // handled by handleProcessingError when invoked from the queue path.
      return new Response(JSON.stringify({
        error: error.message
      }), { status: 500, headers: { 'Content-Type': 'application/json' } });
    }
  }
};

// Audio extraction using FFmpeg WASM
async function extractAudioFromVideo(videoBuffer) {
  const ffmpeg = new FFmpeg();
  await ffmpeg.load();

  // Write video buffer to virtual filesystem
  await ffmpeg.writeFile('input.mp4', new Uint8Array(videoBuffer));

  // Extract audio optimized for Whisper
  await ffmpeg.exec([
    '-i', 'input.mp4',
    '-vn', // No video track
    '-ar', '16000', // 16kHz sample rate for Whisper
    '-ac', '1', // Mono channel
    '-c:a', 'pcm_s16le', // PCM format
    'output.wav'
  ]);

  // Read extracted audio
  const audioData = await ffmpeg.readFile('output.wav');
  return audioData.buffer;
}

// Video compression for efficient storage
// Re-encode the original recording with H.264/AAC for cheaper long-term
// storage (CRF 28 trades some quality for a much smaller file).
// Returns an ArrayBuffer with the compressed MP4.
async function compressVideo(videoBuffer) {
  const ffmpeg = new FFmpeg();
  await ffmpeg.load();

  const inputName = 'input.mp4';
  const outputName = 'compressed.mp4';

  await ffmpeg.writeFile(inputName, new Uint8Array(videoBuffer));

  const args = [
    '-i', inputName,
    '-c:v', 'libx264',
    '-crf', '28', // Good quality/size balance
    '-preset', 'fast',
    '-c:a', 'aac',
    '-b:a', '128k',
    outputName
  ];
  await ffmpeg.exec(args);

  const result = await ffmpeg.readFile(outputName);
  return result.buffer;
}

1.4 Context-Aware Transcript Enhancement

1.4.1 Enhanced Processing Function

// Run the raw Whisper transcript through a Workers AI chat model with the
// client's onboarding context as the system prompt, then carve the model's
// response into the seven named analysis sections for advisor review.
async function enhanceTranscript(rawTranscript, clientData, env) {
  // Build context from onboarding form
  const contextPrompt = `
  You are analyzing a client onboarding call transcript for an ethical investment advisory firm.

  Client Context from Onboarding Form:
  - Name: ${clientData.name}
  - Ethical Priorities: ${clientData.ethical_considerations?.join(', ') || 'Not specified'}
  - Divestment Alignments: ${clientData.divestment_alignments?.join(', ') || 'Not specified'}
  - Investment Experience: ${clientData.investment_experience || 'Not specified'}
  - Risk Tolerance Indicators:
    * Sleep/Stress Patterns: ${clientData.behavioral_responses?.q4_stress_sleep_patterns || 'Not specified'}
    * General Risk Avoidance: ${clientData.behavioral_responses?.q3_general_risk_avoidance || 'Not specified'}
    * Ethics vs Volatility: ${clientData.behavioral_responses?.q7_volatility_vs_ethics || 'Not specified'}
  - Financial Context: Net Worth ~${clientData.net_worth_estimate}, Investable: ${clientData.investable_net_worth}
  - Emergency Liquidity: ${clientData.emergency_liquidity}

  Novel Ethical Concerns: ${clientData.novel_ethical_concerns || 'None specified'}

  Extract and analyze:
  1. ANXIETY INDICATORS: Signs of investment anxiety, stress about volatility, concerns about losses
  2. VALUES VALIDATION: How well client's ethical priorities align with firm's approach
  3. STRATEGY PREFERENCES: Client preferences for Growth/Income/Diversification mix
  4. FINANCIAL PREPAREDNESS: Discussion of emergency funds, debt, cash flow needs
  5. ACTION ITEMS: Specific next steps mentioned in conversation
  6. RED FLAGS: Any concerns about client suitability or special considerations needed
  7. GREEN LIGHTS: Positive indicators for proceeding with investment relationship

  Provide structured analysis for advisor review.
  `;

  const enhanced = await env.AI.run('@cf/meta/llama-2-7b-chat-int8', {
    messages: [
      { role: 'system', content: contextPrompt },
      { role: 'user', content: `TRANSCRIPT: ${rawTranscript.text}` }
    ]
  });

  // Map each output field to its heading in the prompt above, then pull
  // every section out of the model response in one pass.
  const sectionHeadings = {
    anxietyIndicators: 'ANXIETY INDICATORS',
    valuesValidation: 'VALUES VALIDATION',
    strategyPreferences: 'STRATEGY PREFERENCES',
    financialPreparedness: 'FINANCIAL PREPAREDNESS',
    actionItems: 'ACTION ITEMS',
    redFlags: 'RED FLAGS',
    greenLights: 'GREEN LIGHTS'
  };
  const analysis = Object.fromEntries(
    Object.entries(sectionHeadings).map(
      ([field, heading]) => [field, extractSection(enhanced.response, heading)]
    )
  );

  return {
    rawTranscript: rawTranscript,
    clientContext: {
      name: clientData.name,
      email: clientData.email,
      ethicalPriorities: clientData.ethical_considerations,
      riskIndicators: clientData.behavioral_responses
    },
    analysis,
    speakerSegments: rawTranscript.segments || [],
    processingTimestamp: new Date().toISOString()
  };
}

/**
 * Extract the text of one named section from an LLM analysis response.
 * Captures everything after "NAME:" up to the next numbered heading
 * (e.g. "\n2." or "\n10.") or the end of the text.
 *
 * @param {string} text - Full model response to search.
 * @param {string} sectionName - Section heading, e.g. 'ANXIETY INDICATORS'.
 * @returns {string} Trimmed section body, or 'Not found in analysis'.
 */
function extractSection(text, sectionName) {
  // Fix: the name was interpolated raw into the pattern, so any regex
  // metacharacter in a section name ('+', '(', '.') corrupted the match.
  const escaped = sectionName.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  // \d+ (was \d) also tolerates multi-digit numbered headings.
  const regex = new RegExp(`${escaped}:(.*?)(?=\\n\\d+\\.|$)`, 'is');
  const match = text.match(regex);
  return match ? match[1].trim() : 'Not found in analysis';
}

1.5 Hetzner S3 Storage Integration

1.5.1 Storage Function

/**
 * Persist every artifact of a processed recording (compressed video, WAV
 * audio, raw transcript, AI insights, metadata) to Hetzner S3-compatible
 * object storage under a per-client, per-call folder.
 *
 * @param {object} data - { originalVideo, audioExtract, transcript,
 *   processedInsights, metadata: { clientEmail, recordingDate, duration, ... } }
 * @param {object} env - Worker env with HETZNER_* credentials/config.
 * @returns {Promise<object>} Public URLs for each artifact plus storagePath.
 */
async function storeOnHetzner(data, env) {
  const s3Client = new S3Client({
    // NOTE(review): 'us-east-1' is a placeholder — Hetzner routes via the
    // custom endpoint, not an AWS region. Confirm against storage config.
    region: 'us-east-1',
    endpoint: env.HETZNER_S3_ENDPOINT,
    credentials: {
      accessKeyId: env.HETZNER_ACCESS_KEY,
      secretAccessKey: env.HETZNER_SECRET_KEY
    }
  });

  const timestamp = new Date().toISOString().split('T')[0];
  const clientFolder = data.metadata.clientEmail.replace('@', '_at_');

  // Folder structure: client-recordings/<year>/<month>/<client>/<call-id>/
  const basePath = `client-recordings/${timestamp.slice(0, 4)}/${timestamp.slice(5, 7)}/${clientFolder}/${generateCallId(data.metadata)}`;

  const videoKey = `${basePath}/recording-compressed.mp4`;
  const audioKey = `${basePath}/extracted-audio.wav`;
  const transcriptKey = `${basePath}/raw-transcript.json`;
  const insightsKey = `${basePath}/processed-insights.json`;
  const metadataKey = `${basePath}/metadata.json`;

  const uploads = [
    new PutObjectCommand({
      Bucket: env.HETZNER_BUCKET_NAME,
      Key: videoKey,
      Body: data.originalVideo,
      ContentType: 'video/mp4',
      Metadata: {
        clientEmail: data.metadata.clientEmail,
        recordingDate: data.metadata.recordingDate,
        // S3 object metadata values must be strings; String() also avoids
        // a TypeError if duration is ever missing from the transcript.
        duration: String(data.metadata.duration)
      }
    }),
    new PutObjectCommand({
      Bucket: env.HETZNER_BUCKET_NAME,
      Key: audioKey,
      Body: data.audioExtract,
      ContentType: 'audio/wav'
    }),
    new PutObjectCommand({
      Bucket: env.HETZNER_BUCKET_NAME,
      Key: transcriptKey,
      Body: JSON.stringify(data.transcript, null, 2),
      ContentType: 'application/json'
    }),
    new PutObjectCommand({
      Bucket: env.HETZNER_BUCKET_NAME,
      Key: insightsKey,
      Body: JSON.stringify(data.processedInsights, null, 2),
      ContentType: 'application/json'
    }),
    new PutObjectCommand({
      Bucket: env.HETZNER_BUCKET_NAME,
      Key: metadataKey,
      Body: JSON.stringify(data.metadata, null, 2),
      ContentType: 'application/json'
    })
  ];

  // The five objects are independent — upload them in parallel instead of
  // serially; Promise.all still rejects on the first failure.
  await Promise.all(uploads.map((cmd) => s3Client.send(cmd)));

  const publicUrl = (key) => `${env.HETZNER_S3_ENDPOINT}/${env.HETZNER_BUCKET_NAME}/${key}`;

  return {
    videoUrl: publicUrl(videoKey),
    audioUrl: publicUrl(audioKey),
    transcriptUrl: publicUrl(transcriptKey),
    insightsUrl: publicUrl(insightsKey),
    metadataUrl: publicUrl(metadataKey),
    storagePath: basePath
  };
}

// Build a stable identifier for a call from its recording metadata:
// "<callType>-<YYYY-MM-DD>". Recordings shorter than 30 minutes are
// classified as discovery calls; everything else as onboarding calls.
function generateCallId(metadata) {
  const DISCOVERY_MAX_SECONDS = 1800; // 30 minutes
  const dateStamp = new Date(metadata.recordingDate).toISOString().slice(0, 10);
  const callType = metadata.duration < DISCOVERY_MAX_SECONDS ? 'discovery' : 'onboarding';
  return `${callType}-${dateStamp}`;
}

1.6 Google Drive Webhook Detection

1.6.1 Webhook Handler

// Google Drive push-notification handler: when a new file looks like a
// Meet recording and the client can be identified, enqueue a processing
// job; otherwise report the notification as skipped.
async function handleDriveWebhook(request, env) {
  const notification = await request.json();

  const isVideo = notification.mimeType?.includes('video');
  const looksLikeMeetRecording =
    notification.name?.includes('Meet Recording') ||
    notification.name?.includes('Google Meet');

  if (!isVideo || !looksLikeMeetRecording) {
    return new Response(JSON.stringify({ skipped: true }));
  }

  // Work out which client this recording belongs to (filename or calendar).
  const clientEmail = await identifyClientFromRecording(notification, env);
  if (!clientEmail) {
    return new Response(JSON.stringify({ skipped: true }));
  }

  // Hand off to the queue so heavy processing happens outside the webhook.
  await env.RECORDING_QUEUE.send({
    fileId: notification.id,
    clientEmail: clientEmail,
    recordingDate: notification.createdTime || new Date().toISOString(),
    fileName: notification.name
  });

  return new Response(JSON.stringify({ queued: true }));
}

// Work out which client a Drive recording belongs to.
// Strategy 1: the filename sometimes embeds the client's email address.
// Strategy 2: match the recording time against recent calendar events and
// take the first attendee who is not on the firm's domain.
// Returns the client email, or null if no match was found.
async function identifyClientFromRecording(notification, env) {
  const emailRegex = /([a-zA-Z0-9._-]+@[a-zA-Z0-9._-]+\.[a-zA-Z0-9_-]+)/;
  const fromFilename = notification.name?.match(emailRegex);
  if (fromFilename) {
    return fromFilename[1];
  }

  const recentMeetings = await getRecentCalendarEvents(env.GOOGLE_CALENDAR_TOKEN);
  const TWO_HOURS_MS = 2 * 60 * 60 * 1000;
  const recordedAt = new Date(notification.createdTime);

  for (const meeting of recentMeetings) {
    const startedAt = new Date(meeting.start.dateTime);
    // Skip meetings that didn't happen near the recording time.
    if (Math.abs(recordedAt - startedAt) >= TWO_HOURS_MS) {
      continue;
    }
    // The external (non-firm) attendee is assumed to be the client.
    const externalAttendee = meeting.attendees?.find(
      (attendee) => !attendee.email.includes('@ethicic.com')
    );
    if (externalAttendee) {
      return externalAttendee.email;
    }
  }

  return null; // Unable to identify client
}

1.7 Integration with Client Notes

1.7.1 Google Docs Integration

// Append a "Call Recording & Transcript" section (URLs plus AI insights)
// to the client's existing Google Doc notes, if one exists.
// Returns { linked } indicating whether a notes doc was found and updated.
//
// NOTE(review): this reads `storage.metadata` and `storage.processedInsights`,
// but storeOnHetzner() returns only URL fields and `storagePath` — so those
// optional chains always fall back to the defaults ('Processing in
// progress...'). Confirm the intended shape of `storage` with the caller.
async function linkToClientNotes(clientEmail, storage, env) {
  // Find client's existing notes document
  const clientNotesDoc = await findClientNotesDoc(clientEmail, env);

  if (clientNotesDoc) {
    // Append transcript section to existing doc
    const transcriptSection = `
---

## Call Recording & Transcript
**Recording Date:** ${storage.metadata?.recordingDate || new Date().toISOString()}
**Recording URL:** ${storage.videoUrl}
**Transcript URL:** ${storage.transcriptUrl}
**Processed Analysis:** ${storage.insightsUrl}

### Key Insights from AI Analysis
${storage.processedInsights?.analysis ?
  Object.entries(storage.processedInsights.analysis)
    .map(([key, value]) => `**${key}:** ${value}`)
    .join('\n\n')
  : 'Processing in progress...'}

### Full Transcript Available
[Click here to view full transcript](${storage.transcriptUrl})

---
    `;

    await appendToGoogleDoc(clientNotesDoc.id, transcriptSection, env);
  }

  return { linked: !!clientNotesDoc };
}

1.8 Error Handling and Monitoring

1.8.1 Processing Status Tracking

// Record the pipeline's current step for a client in KV so the processing
// status can be polled later.
async function updateProcessingStatus(clientEmail, status, env) {
  const statusRecord = {
    status, // 'processing', 'completed', 'error'
    timestamp: new Date().toISOString(),
    lastStep: status
  };
  await env.KV_STORAGE.put(`recording_status_${clientEmail}`, JSON.stringify(statusRecord));
}

/**
 * Log a recording-processing failure and persist its details to KV for
 * later inspection, then attempt to notify an admin.
 *
 * @param {Error} error - The failure that aborted processing.
 * @param {object} context - Processing context; must include clientEmail.
 * @param {object} env - Worker env with KV_STORAGE binding.
 */
async function handleProcessingError(error, context, env) {
  console.error('Recording processing error:', error);

  // Store error details
  await env.KV_STORAGE.put(`recording_error_${context.clientEmail}`, JSON.stringify({
    error: error.message,
    context: context,
    timestamp: new Date().toISOString()
  }));

  // Fix: the notification is best-effort — if it throws, the error handler
  // itself would fail and mask the original error. Swallow and log instead.
  try {
    await sendErrorNotification(error, context, env);
  } catch (notifyError) {
    console.error('Failed to send error notification:', notifyError);
  }
}

1.9 Environment Variables Required

# Google Drive API
GOOGLE_DRIVE_TOKEN=
GOOGLE_CALENDAR_TOKEN=

# Hetzner S3 Storage
HETZNER_S3_ENDPOINT=
HETZNER_ACCESS_KEY=
HETZNER_SECRET_KEY=
HETZNER_BUCKET_NAME=

# Processing Queue
RECORDING_QUEUE=

# Storage
KV_STORAGE=

This pipeline provides end-to-end automation from Google Meet recording detection through transcription, context-aware analysis, efficient storage, and integration with your client evaluation workflow.