File size: 2,751 Bytes
4b82ab5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from transformers import RobertaTokenizer
from torch.utils.data import Dataset, DataLoader
import torch
import json
from pathlib import Path


class VulnerabilityDataset(Dataset):
    """PyTorch dataset for binary vulnerability detection.

    Loads a JSON-Lines file where each record carries the source code under
    "func" and a 0/1 label under "target". Tokenization happens lazily, per
    sample, in ``__getitem__``.
    """

    def __init__(self, data_path, tokenizer, max_length=512):
        """
        Args:
            data_path: Path (str or Path) to a ``.jsonl`` file, one JSON
                object per non-empty line.
            tokenizer: HuggingFace-style callable tokenizer accepting
                ``truncation``/``padding``/``max_length``/``return_tensors``.
            max_length: Token sequence length; shorter inputs are padded,
                longer ones truncated.

        Raises:
            FileNotFoundError: If ``data_path`` does not exist.
            ValueError: If a non-empty line is not valid JSON (the error
                names the file and line number).
        """
        self.tokenizer = tokenizer
        self.max_length = max_length

        data_path = Path(data_path)
        if not data_path.exists():
            raise FileNotFoundError(f"Dataset file not found: {data_path}")

        self.data = []
        with open(data_path, "r", encoding="utf-8") as f:
            for line_no, raw in enumerate(f, start=1):
                line = raw.strip()
                if not line:
                    continue  # tolerate blank lines between records
                try:
                    self.data.append(json.loads(line))
                except json.JSONDecodeError as err:
                    # Surface file/line context instead of a bare decode
                    # error; ValueError keeps existing handlers working
                    # (JSONDecodeError subclasses ValueError).
                    raise ValueError(
                        f"Invalid JSON on line {line_no} of {data_path}: {err}"
                    ) from err

        print(f"{data_path.name}: {len(self.data)} samples")

    def __len__(self):
        """Return the number of loaded samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Tokenize sample ``idx`` and return model-ready tensors.

        Returns:
            dict with ``input_ids`` and ``attention_mask`` of shape
            ``(max_length,)`` and a scalar long ``labels`` tensor.
        """
        sample = self.data[idx]

        code = sample["func"]          # confirmed correct
        label = sample["target"]       # confirmed correct (0/1)

        encoding = self.tokenizer(
            code,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt"
        )

        # Tokenizer returns a (1, max_length) batch dimension; squeeze it so
        # the DataLoader's default collation stacks samples into (B, L).
        return {
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "labels": torch.tensor(label, dtype=torch.long)
        }


def load_tokenizer(model_name="Salesforce/codet5-base"):
    """Fetch the RoBERTa-style tokenizer for *model_name* from the hub."""
    print(f"Tokenizer: {model_name}")
    tokenizer = RobertaTokenizer.from_pretrained(model_name)
    return tokenizer


def create_dataloader(
    train_path,
    valid_path,
    test_path,
    tokenizer,
    batch_size=8,
    max_length=512,
    num_workers=2,
):
    """Build train/valid/test DataLoaders over VulnerabilityDataset files.

    Args:
        train_path, valid_path, test_path: JSONL dataset files.
        tokenizer: Tokenizer passed through to VulnerabilityDataset.
        batch_size: Samples per batch for all three loaders.
        max_length: Max token length passed to the datasets.
        num_workers: Worker processes per DataLoader; 0 means in-process
            loading.

    Returns:
        (train_loader, valid_loader, test_loader) — only the train loader
        shuffles.

    Raises:
        FileNotFoundError: If any dataset file is missing.
        RuntimeError: If the training set is empty.
    """
    train_dataset = VulnerabilityDataset(train_path, tokenizer, max_length)
    valid_dataset = VulnerabilityDataset(valid_path, tokenizer, max_length)
    test_dataset = VulnerabilityDataset(test_path, tokenizer, max_length)

    if len(train_dataset) == 0:
        raise RuntimeError(f"No samples found in {train_path}")

    # persistent_workers=True is only legal with num_workers > 0; DataLoader
    # raises ValueError otherwise, so gate it on the actual worker count.
    persistent = num_workers > 0

    def _make_loader(dataset, shuffle):
        # Single place for the configuration shared by all three loaders.
        return DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=num_workers,
            pin_memory=True,
            persistent_workers=persistent,
        )

    train_loader = _make_loader(train_dataset, shuffle=True)
    valid_loader = _make_loader(valid_dataset, shuffle=False)
    test_loader = _make_loader(test_dataset, shuffle=False)

    return train_loader, valid_loader, test_loader