Ξεκινάμε εισάγοντας μερικές βιβλιοθήκες και ένα μυστικό κλειδί API YouTube. Εάν δεν έχετε κλειδί API, μπορείτε να δημιουργήσετε έναν ακόλουθο αυτόν τον οδηγό.
import requests
import json
import polars as pl
from my_sk import my_keyfrom youtube_transcript_api import YouTubeTranscriptApi
Στη συνέχεια, θα ορίσουμε μεταβλητές που θα μας βοηθήσουν να εξάγουμε δεδομένα βίντεο από το API του YouTube. Εδώ, καθορίζω το αναγνωριστικό του καναλιού μου στο YouTube και τη διεύθυνση URL του API, αρχικοποίηση page_tokenκαι δημιουργήστε μια λίστα για την αποθήκευση δεδομένων βίντεο.
# define channel ID
channel_id = 'UCa9gErQ9AE5jT2DZLjXBIdA'# define url for API
url = 'https://www.googleapis.com/youtube/v3/search'
# initialize page token
page_token = None
# intialize list to store video data
video_record_list = []
Το επόμενο κομμάτι κώδικα μπορεί να είναι τρομακτικό, οπότε θα εξηγήσω πρώτα τι συμβαίνει. Θα εκτελέσουμε αιτήματα GET στο API αναζήτησης του YouTube. Αυτό είναι ακριβώς όπως η αναζήτηση βίντεο στο YouTube, αλλά αντί να χρησιμοποιούμε τη διεπαφή χρήστη, θα πραγματοποιούμε αναζητήσεις μέσω προγραμματισμού.
Δεδομένου ότι τα αποτελέσματα αναζήτησης περιορίζονται στα 50 ανά σελίδα, πρέπει να πραγματοποιούμε αναδρομικά αναζητήσεις για να επιστρέψουμε κάθε βίντεο που αντιστοιχεί στα κριτήρια αναζήτησης. Δείτε πώς φαίνεται στον κώδικα Python.
# extract video data across multiple search result pageswhile page_token != 0:
# define parameters for API call
params = {'key': my_key, 'channelId': channel_id,
'part': ["snippet","id"], 'order': "date",
'maxResults':50, 'pageToken': page_token}
# make get request
response = requests.get(url, params=params)
# append video data from page results to list
video_record_list += getVideoRecords(response)
try:
# grab next page token
page_token = json.loads(response.text)['nextPageToken']
except:
# if no next page token kill while loop
page_token = 0
getVideoRecords() είναι μια συνάρτηση που ορίζεται από το χρήστη που εξάγει τις σχετικές πληροφορίες από μια απάντηση API.
# extract video data from single search result pagedef getVideoRecords(response: requests.models.Response) -> list:
"""
Function to extract YouTube video data from GET request response
"""
# initialize list to store video data from page results
video_record_list = []
for raw_item in json.loads(response.text)['items']:
# only execute for youtube videos
if raw_item['id']['kind'] != "youtube#video":
continue
# extract relevant data
video_record = {}
video_record['video_id'] = raw_item['id']['videoId']
video_record['datetime'] = raw_item['snippet']['publishedAt']
video_record['title'] = raw_item['snippet']['title']
# append record to list
video_record_list.append(video_record)
return video_record_list
Τώρα που έχουμε πληροφορίες για όλα τα βίντεό μου στο YouTube, ας εξαγάγουμε τους υπότιτλους που δημιουργούνται αυτόματα. Για να διευκολύνω την πρόσβαση στα αναγνωριστικά βίντεο, θα αποθηκεύσω τα δεδομένα βίντεο σε ένα πλαίσιο δεδομένων Polars.
# store data in polars dataframe
df = pl.DataFrame(video_record_list)
print(df.head())
Για να τραβήξω τους λεζάντες του βίντεο, θα χρησιμοποιήσω το youtube_transcript_api Βιβλιοθήκη Python. Θα κάνω κύκλο σε κάθε αναγνωριστικό βίντεο στο πλαίσιο δεδομένων και θα εξαγάγω τη σχετική μεταγραφή.
# intialize list to store video captions
transcript_text_list = []# loop through each row of dataframe
for i in range(len(df)):
# try to extract captions
try:
# get transcript
transcript = YouTubeTranscriptApi.get_transcript(df['video_id'][i])
# extract text transcript
transcript_text = extract_text(transcript)
# if not captions available set as n/a
except:
transcript_text = "n/a"
# append transcript text to list
transcript_text_list.append(transcript_text)
Και πάλι, χρησιμοποιώ μια συνάρτηση που ορίζεται από το χρήστη που ονομάζεται extract_text() για να εξαγάγετε τις απαραίτητες πληροφορίες από το API.
def extract_text(transcript: list) -> str:
"""
Function to extract text from transcript dictionary
"""text_list = [transcript[i]['text'] for i in range(len(transcript))]
return ' '.join(text_list)
Στη συνέχεια, μπορούμε να προσθέσουμε τις μεταγραφές για κάθε βίντεο στο πλαίσιο δεδομένων.
# add transcripts to dataframe
df = df.with_columns(pl.Series(name="transcript", values=transcript_text_list))
print(df.head())
Με τα δεδομένα που εξάγονται, μπορούμε να τα μετατρέψουμε ώστε να είναι έτοιμο για την περίπτωση χρήσης κατάντη. Αυτό απαιτεί κάποια διερευνητική ανάλυση δεδομένων (EDA).
Παράδοση διπλότυπων
Ένα καλό σημείο εκκίνησης για το EDA είναι να εξετάσει τον αριθμό των μοναδικών σειρών και στοιχείων σε κάθε στήλη. Εδώ, περιμέναμε κάθε σειρά να προσδιορίζεται μοναδικά από το video_id. Επιπλέον, κάθε στήλη δεν πρέπει να έχει επαναλαμβανόμενα στοιχεία εκτός από βίντεο για τα οποία δεν υπήρχε μεταγραφή, τα οποία ορίσαμε ως “α/α“.
Εδώ είναι κάποιος κώδικας για να διερευνήσετε αυτές τις πληροφορίες. Μπορούμε να δούμε από την έξοδο ότι τα δεδομένα ταιριάζουν με τις προσδοκίες μας.
# shape + unique values
print("shape:", df.shape)
print("n unique rows:", df.n_unique())
for j in range(df.shape[1]):
print("n unique elements (" + df.columns[j] + "):", df[:,j].n_unique())### output
# shape: (84, 4)
# n unique rows: 84
# n unique elements (video_id): 84
# n unique elements (datetime): 84
# n unique elements (title): 84
# n unique elements (transcript): 82
Ελέγξτε τους τύπους d
Στη συνέχεια, μπορούμε να εξετάσουμε τους τύπους δεδομένων κάθε στήλης. Στην παραπάνω εικόνα, είδαμε ότι όλες οι στήλες είναι συμβολοσειρές.
Ενώ αυτό είναι κατάλληλο για την video_id, τίτλοςκαι αντίγραφοαυτή δεν είναι καλή επιλογή για το ημερομηνία ώρα στήλη. Μπορούμε να αλλάξουμε αυτόν τον τύπο με τον ακόλουθο τρόπο.
# change datetime to Datetime dtype
df = df.with_columns(pl.col('datetime').cast(pl.Datetime))
print(df.head())
Χειρισμός ειδικών χαρακτήρων
Δεδομένου ότι εργαζόμαστε με δεδομένα κειμένου, είναι σημαντικό να προσέχουμε για ειδικές συμβολοσειρές χαρακτήρων. Αυτό απαιτεί λίγο χειροκίνητο skimming του κειμένου, αλλά μετά από λίγα λεπτά, βρήκα 2 ειδικές περιπτώσεις: ‘ → ' και & → &
Στον παρακάτω κώδικα, αντικατέστησα αυτές τις συμβολοσειρές με τους κατάλληλους χαρακτήρες και άλλαξα “sha” προς την “Shaw“.
# list all special strings and their replacements
special_strings = [''', '&', 'sha ']
special_string_replacements = ["'", "&", "Shaw "]# replace each special string appearing in title and transcript columns
for i in range(len(special_strings)):
df = df.with_columns(df['title'].str.replace(special_strings[i],
special_string_replacements[i]).alias('title'))
df = df.with_columns(df['transcript'].str.replace(special_strings[i],
special_string_replacements[i]).alias('transcript'))
Δεδομένου ότι το σύνολο δεδομένων εδώ είναι πολύ μικρό (84 σειρές και 4 στήλες, ~900k χαρακτήρες), μπορούμε να αποθηκεύσουμε τα δεδομένα απευθείας στον κατάλογο του έργου. Αυτό μπορεί να γίνει σε μία γραμμή κώδικα χρησιμοποιώντας το write_parquet() μέθοδος στα Polars. Το τελικό μέγεθος αρχείου είναι 341 KB.
# write data to file
df.write_parquet('data/video-transcripts.parquet')
Εδώ, συζητήσαμε τα βασικά της δημιουργίας αγωγών δεδομένων στο πλαίσιο της Επιστήμης Δεδομένων Full Stack και περιηγηθήκαμε σε ένα συγκεκριμένο παράδειγμα χρησιμοποιώντας δεδομένα πραγματικού κόσμου.
Στο επόμενο άρθρο αυτής της σειράς, θα συνεχίσουμε να κατεβαίνουμε τη στοίβα τεχνολογίας της επιστήμης δεδομένων και θα συζητήσουμε πώς μπορούμε να χρησιμοποιήσουμε αυτήν τη γραμμή δεδομένων για να αναπτύξουμε ένα σύστημα σημασιολογικής αναζήτησης για τα βίντεό μου στο YouTube.
Περισσότερα σε αυτή τη σειρά 👇