petter2025 commited on
Commit
0518bdb
·
verified ·
1 Parent(s): f4ec74a

Delete utils

Browse files
utils/arf_engine_enhanced.py DELETED
@@ -1,912 +0,0 @@
1
- """
2
- ARF 3.3.9 Enhanced Engine - PhD Level Implementation
3
- FIXED: Unified detection that correctly shows REAL OSS when installed
4
- ADDED: Mathematical sophistication with Bayesian confidence intervals
5
- """
6
-
7
- import random
8
- import time
9
- import numpy as np
10
- from datetime import datetime
11
- from typing import Dict, List, Tuple, Any
12
- from dataclasses import dataclass, field
13
- from enum import Enum
14
- from scipy.stats import beta as Beta
15
-
16
- class RiskCategory(Enum):
17
- """Risk categories with mathematical bounds"""
18
- CRITICAL = (0.8, 1.0, "#F44336")
19
- HIGH = (0.6, 0.8, "#FF9800")
20
- MEDIUM = (0.4, 0.6, "#FFC107")
21
- LOW = (0.0, 0.4, "#4CAF50")
22
-
23
- @classmethod
24
- def from_score(cls, score: float) -> 'RiskCategory':
25
- """Get risk category from score"""
26
- for category in cls:
27
- lower, upper, _ = category.value
28
- if lower <= score < upper:
29
- return category
30
- return cls.LOW
31
-
32
- @property
33
- def color(self) -> str:
34
- """Get color for category"""
35
- return self.value[2]
36
-
37
- @property
38
- def emoji(self) -> str:
39
- """Get emoji for category"""
40
- emoji_map = {
41
- RiskCategory.CRITICAL: "🚨",
42
- RiskCategory.HIGH: "⚠️",
43
- RiskCategory.MEDIUM: "🔶",
44
- RiskCategory.LOW: "✅"
45
- }
46
- return emoji_map[self]
47
-
48
- @dataclass
49
- class BayesianRiskAssessment:
50
- """Enhanced risk assessment with Bayesian confidence"""
51
- score: float
52
- confidence: float
53
- category: RiskCategory
54
- confidence_interval: Tuple[float, float]
55
- factors: List[str]
56
- method: str = "bayesian"
57
-
58
- @property
59
- def formatted_score(self) -> str:
60
- """Formatted risk score"""
61
- return f"{self.score:.1%}"
62
-
63
- @property
64
- def formatted_confidence(self) -> str:
65
- """Formatted confidence"""
66
- return f"{self.confidence:.1%}"
67
-
68
- @property
69
- def confidence_width(self) -> float:
70
- """Width of confidence interval"""
71
- return self.confidence_interval[1] - self.confidence_interval[0]
72
-
73
- def to_dict(self) -> Dict[str, Any]:
74
- """Convert to dictionary"""
75
- return {
76
- 'score': self.score,
77
- 'confidence': self.confidence,
78
- 'category': self.category.name,
79
- 'category_color': self.category.color,
80
- 'category_emoji': self.category.emoji,
81
- 'confidence_interval': self.confidence_interval,
82
- 'confidence_width': self.confidence_width,
83
- 'factors': self.factors,
84
- 'method': self.method,
85
- 'formatted_score': self.formatted_score,
86
- 'formatted_confidence': self.formatted_confidence,
87
- 'is_high_risk': self.score > 0.7
88
- }
89
-
90
- class EnhancedBayesianRiskModel:
91
- """PhD-level Bayesian risk model with confidence intervals"""
92
-
93
- def __init__(self):
94
- # Conjugate priors for different action types (Beta distributions)
95
- self.priors = {
96
- 'database_drop': Beta(2, 8), # α=2, β=8 → 20% prior risk
97
- 'data_delete': Beta(3, 7), # α=3, β=7 → 30% prior risk
98
- 'permission_grant': Beta(4, 6), # α=4, β=6 → 40% prior risk
99
- 'deployment': Beta(5, 5), # α=5, β=5 → 50% prior risk
100
- 'readonly': Beta(1, 9), # α=1, β=9 → 10% prior risk
101
- }
102
-
103
- # Historical data (enterprise-scale)
104
- self.historical_data = {
105
- 'database_drop': {'successes': 95, 'failures': 5}, # 95% success rate
106
- 'data_delete': {'successes': 90, 'failures': 10}, # 90% success rate
107
- 'permission_grant': {'successes': 85, 'failures': 15}, # 85% success rate
108
- 'deployment': {'successes': 80, 'failures': 20}, # 80% success rate
109
- 'readonly': {'successes': 98, 'failures': 2}, # 98% success rate
110
- }
111
-
112
- def assess_with_confidence(self, action: str, context: Dict) -> BayesianRiskAssessment:
113
- """
114
- Bayesian risk assessment with 95% confidence intervals
115
- P(risk|data) ∝ P(data|risk) * P(risk)
116
-
117
- Returns comprehensive assessment with mathematical rigor
118
- """
119
- # Classify action type
120
- action_type = self._classify_action(action)
121
- prior = self.priors.get(action_type, self.priors['readonly'])
122
- historical = self.historical_data.get(action_type, self.historical_data['readonly'])
123
-
124
- # Context adjustment multiplier
125
- context_multiplier = self._calculate_context_multiplier(context)
126
-
127
- # Bayesian update: Posterior = Beta(α + successes, β + failures)
128
- posterior_alpha = prior.args[0] + historical['successes']
129
- posterior_beta = prior.args[1] + historical['failures']
130
-
131
- # Posterior distribution
132
- posterior = Beta(posterior_alpha, posterior_beta)
133
-
134
- # Point estimate (posterior mean)
135
- risk_score = posterior.mean() * context_multiplier
136
-
137
- # 95% credible interval
138
- ci_lower = posterior.ppf(0.025)
139
- ci_upper = posterior.ppf(0.975)
140
-
141
- # Confidence score (inverse of interval width)
142
- interval_width = ci_upper - ci_lower
143
- confidence = 1.0 - interval_width # Narrower interval = higher confidence
144
-
145
- # Cap values
146
- risk_score = min(0.99, max(0.01, risk_score))
147
- confidence = min(0.99, max(0.01, confidence))
148
-
149
- # Risk factors
150
- factors = self._extract_risk_factors(action, context, risk_score)
151
-
152
- # Risk category
153
- category = RiskCategory.from_score(risk_score)
154
-
155
- return BayesianRiskAssessment(
156
- score=risk_score,
157
- confidence=confidence,
158
- category=category,
159
- confidence_interval=(ci_lower, ci_upper),
160
- factors=factors,
161
- method=f"bayesian_{action_type}"
162
- )
163
-
164
- def _classify_action(self, action: str) -> str:
165
- """Classify action type with precision"""
166
- action_lower = action.lower()
167
-
168
- if any(word in action_lower for word in ['drop database', 'drop table', 'truncate', 'purge']):
169
- return 'database_drop'
170
- elif any(word in action_lower for word in ['delete', 'remove', 'erase', 'clear']):
171
- return 'data_delete'
172
- elif any(word in action_lower for word in ['grant', 'permission', 'access', 'admin', 'root']):
173
- return 'permission_grant'
174
- elif any(word in action_lower for word in ['deploy', 'execute', 'run', 'train', 'update']):
175
- return 'deployment'
176
- else:
177
- return 'readonly'
178
-
179
- def _calculate_context_multiplier(self, context: Dict) -> float:
180
- """Calculate context-based risk multiplier with mathematical precision"""
181
- multiplier = 1.0
182
-
183
- # Environment multiplier
184
- env = context.get('environment', '').lower()
185
- env_multipliers = {
186
- 'production': 1.5,
187
- 'staging': 1.2,
188
- 'development': 0.8,
189
- 'testing': 0.7
190
- }
191
- multiplier *= env_multipliers.get(env, 1.0)
192
-
193
- # User role multiplier
194
- user = context.get('user', '').lower()
195
- if 'junior' in user or 'intern' in user or 'new' in user:
196
- multiplier *= 1.3
197
- elif 'senior' in user or 'lead' in user or 'principal' in user:
198
- multiplier *= 0.8
199
- elif 'admin' in user or 'root' in user:
200
- multiplier *= 0.9 # Admins are more careful
201
-
202
- # Time multiplier
203
- time_of_day = context.get('time', '').lower()
204
- if any(word in time_of_day for word in ['2am', '3am', '4am', 'night', 'off-hours']):
205
- multiplier *= 1.4
206
-
207
- # Backup status multiplier
208
- backup = context.get('backup', '').lower()
209
- if backup in ['none', 'none available', 'corrupted', 'old']:
210
- multiplier *= 1.6
211
- elif backup in ['fresh', 'recent', 'verified']:
212
- multiplier *= 0.9
213
-
214
- # Compliance context
215
- compliance = context.get('compliance', '').lower()
216
- if compliance in ['pci-dss', 'hipaa', 'gdpr', 'soc2']:
217
- multiplier *= 1.3 # Higher stakes
218
-
219
- return min(2.0, max(0.5, multiplier))
220
-
221
- def _extract_risk_factors(self, action: str, context: Dict, risk_score: float) -> List[str]:
222
- """Extract mathematically significant risk factors"""
223
- factors = []
224
- action_lower = action.lower()
225
- context_str = str(context).lower()
226
-
227
- # Action-specific factors
228
- if 'drop' in action_lower and 'database' in action_lower:
229
- factors.append("Irreversible data destruction")
230
- factors.append("Potential service outage")
231
- if risk_score > 0.7:
232
- factors.append("High financial impact (>$1M)")
233
-
234
- if 'delete' in action_lower:
235
- factors.append("Data loss risk")
236
- if 'where' not in action_lower:
237
- factors.append("No WHERE clause (mass deletion risk)")
238
-
239
- if 'grant' in action_lower or 'admin' in action_lower:
240
- factors.append("Privilege escalation")
241
- factors.append("Security implications")
242
-
243
- # Context-specific factors
244
- if 'production' in context_str:
245
- factors.append("Production environment")
246
-
247
- if 'junior' in context_str or 'intern' in context_str:
248
- factors.append("Inexperienced operator")
249
-
250
- if '2am' in context_str or 'night' in context_str:
251
- factors.append("Off-hours operation")
252
-
253
- if 'backup' in context_str and ('none' in context_str or 'old' in context_str):
254
- factors.append("Inadequate backup")
255
-
256
- if 'pci' in context_str or 'hipaa' in context_str:
257
- factors.append("Regulated data environment")
258
-
259
- return factors[:4] # Return top 4 most significant factors
260
-
261
- class EnhancedPolicyEngine:
262
- """Enhanced policy engine with mathematical enforcement"""
263
-
264
- def __init__(self):
265
- # Mathematical policy definitions with confidence requirements
266
- self.policies = {
267
- "database_drop": {
268
- "risk_threshold": 0.3,
269
- "confidence_required": 0.9,
270
- "required_approvals": 2,
271
- "backup_required": True,
272
- "time_restricted": True
273
- },
274
- "data_delete": {
275
- "risk_threshold": 0.5,
276
- "confidence_required": 0.8,
277
- "required_approvals": 1,
278
- "backup_required": True,
279
- "time_restricted": False
280
- },
281
- "permission_grant": {
282
- "risk_threshold": 0.4,
283
- "confidence_required": 0.85,
284
- "required_approvals": 1,
285
- "backup_required": False,
286
- "time_restricted": False
287
- },
288
- "deployment": {
289
- "risk_threshold": 0.4,
290
- "confidence_required": 0.8,
291
- "required_approvals": 1,
292
- "backup_required": False,
293
- "tests_required": True
294
- },
295
- "readonly": {
296
- "risk_threshold": 0.8,
297
- "confidence_required": 0.6,
298
- "required_approvals": 0,
299
- "backup_required": False,
300
- "time_restricted": False
301
- }
302
- }
303
-
304
- def evaluate_mathematically(self, action_type: str, risk_assessment: BayesianRiskAssessment) -> Dict:
305
- """
306
- Mathematical policy evaluation with confidence constraints
307
- """
308
- policy = self.policies.get(action_type, self.policies["readonly"])
309
-
310
- risk_score = risk_assessment.score
311
- confidence = risk_assessment.confidence
312
-
313
- # Risk threshold compliance
314
- risk_compliant = risk_score <= policy["risk_threshold"]
315
-
316
- # Confidence requirement
317
- confidence_compliant = confidence >= policy["confidence_required"]
318
-
319
- # Determine compliance level
320
- if not risk_compliant and not confidence_compliant:
321
- compliance = "BLOCKED"
322
- reason = f"Risk ({risk_score:.1%}) > threshold ({policy['risk_threshold']:.0%}) and low confidence ({confidence:.1%})"
323
- elif not risk_compliant:
324
- compliance = "HIGH_RISK"
325
- reason = f"Risk ({risk_score:.1%}) > threshold ({policy['risk_threshold']:.0%})"
326
- elif not confidence_compliant:
327
- compliance = "LOW_CONFIDENCE"
328
- reason = f"Confidence ({confidence:.1%}) < required ({policy['confidence_required']:.0%})"
329
- else:
330
- compliance = "WITHIN_POLICY"
331
- reason = f"Within policy limits: risk ≤ {policy['risk_threshold']:.0%}, confidence ≥ {policy['confidence_required']:.0%}"
332
-
333
- # Generate recommendation
334
- if compliance == "BLOCKED":
335
- recommendation = "🚨 BLOCKED: Action exceeds both risk and confidence thresholds"
336
- elif compliance == "HIGH_RISK":
337
- approvals = policy["required_approvals"]
338
- recommendation = f"⚠️ REQUIRES {approvals} APPROVAL{'S' if approvals > 1 else ''}: High risk action"
339
- elif compliance == "LOW_CONFIDENCE":
340
- recommendation = "🔶 MANUAL REVIEW: Low confidence score requires human oversight"
341
- else:
342
- recommendation = "✅ WITHIN POLICY: Action meets all policy requirements"
343
-
344
- return {
345
- "compliance": compliance,
346
- "recommendation": recommendation,
347
- "policy_type": action_type,
348
- "risk_threshold": policy["risk_threshold"],
349
- "actual_risk": risk_score,
350
- "confidence_required": policy["confidence_required"],
351
- "actual_confidence": confidence,
352
- "reason": reason,
353
- "approvals_required": 0 if compliance == "WITHIN_POLICY" else policy["required_approvals"],
354
- "additional_requirements": self._get_additional_requirements(policy)
355
- }
356
-
357
- def _get_additional_requirements(self, policy: Dict) -> List[str]:
358
- """Get additional requirements"""
359
- requirements = []
360
- if policy.get("backup_required"):
361
- requirements.append("Verified backup required")
362
- if policy.get("time_restricted"):
363
- requirements.append("Business hours only")
364
- if policy.get("tests_required"):
365
- requirements.append("Tests must pass")
366
- return requirements
367
-
368
- class EnhancedLicenseManager:
369
- """Enhanced license manager with enterprise features"""
370
-
371
- def __init__(self):
372
- # Enterprise license definitions with mathematical gates
373
- self.tier_definitions = {
374
- "oss": {
375
- "name": "OSS Edition",
376
- "color": "#1E88E5",
377
- "execution_level": "ADVISORY_ONLY",
378
- "mechanical_gates": 0,
379
- "confidence_threshold": 0.0,
380
- "risk_prevention": 0.0,
381
- "price": "$0",
382
- "support": "Community",
383
- "sla": "None"
384
- },
385
- "trial": {
386
- "name": "Trial Edition",
387
- "color": "#FFB300",
388
- "execution_level": "OPERATOR_REVIEW",
389
- "mechanical_gates": 3,
390
- "confidence_threshold": 0.6,
391
- "risk_prevention": 0.5,
392
- "price": "$0 (14 days)",
393
- "support": "Email",
394
- "sla": "Best Effort"
395
- },
396
- "starter": {
397
- "name": "Starter Edition",
398
- "color": "#FF9800",
399
- "execution_level": "SUPERVISED",
400
- "mechanical_gates": 3,
401
- "confidence_threshold": 0.7,
402
- "risk_prevention": 0.7,
403
- "price": "$2,000/mo",
404
- "support": "Business Hours",
405
- "sla": "99.5%"
406
- },
407
- "professional": {
408
- "name": "Professional Edition",
409
- "color": "#FF6F00",
410
- "execution_level": "AUTONOMOUS_LOW",
411
- "mechanical_gates": 5,
412
- "confidence_threshold": 0.8,
413
- "risk_prevention": 0.85,
414
- "price": "$5,000/mo",
415
- "support": "24/7",
416
- "sla": "99.9%"
417
- },
418
- "enterprise": {
419
- "name": "Enterprise Edition",
420
- "color": "#D84315",
421
- "execution_level": "AUTONOMOUS_HIGH",
422
- "mechanical_gates": 7,
423
- "confidence_threshold": 0.9,
424
- "risk_prevention": 0.92,
425
- "price": "$15,000/mo",
426
- "support": "Dedicated",
427
- "sla": "99.99%"
428
- }
429
- }
430
-
431
- def validate_license(self, license_key: str = None) -> Dict:
432
- """Validate license with enhanced features"""
433
- if not license_key:
434
- return self.tier_definitions["oss"]
435
-
436
- license_upper = license_key.upper()
437
-
438
- if "ARF-TRIAL" in license_upper:
439
- tier = "trial"
440
- # Add trial-specific features
441
- tier_info = self.tier_definitions[trial].copy()
442
- tier_info["days_remaining"] = 14
443
- tier_info["scarcity_message"] = "⏳ 14-day trial ends soon"
444
- return tier_info
445
-
446
- elif "ARF-STARTER" in license_upper:
447
- tier = "starter"
448
- elif "ARF-PRO" in license_upper or "ARF-PROFESSIONAL" in license_upper:
449
- tier = "professional"
450
- elif "ARF-ENTERPRISE" in license_upper:
451
- tier = "enterprise"
452
- else:
453
- tier = "oss"
454
-
455
- return self.tier_definitions[tier]
456
-
457
- def can_execute_at_level(self, license_tier: str, execution_level: str) -> bool:
458
- """Check if license allows execution at given level"""
459
- execution_hierarchy = {
460
- "ADVISORY_ONLY": 0,
461
- "OPERATOR_REVIEW": 1,
462
- "SUPERVISED": 2,
463
- "AUTONOMOUS_LOW": 3,
464
- "AUTONOMOUS_HIGH": 4
465
- }
466
-
467
- tier_hierarchy = {
468
- "oss": 0,
469
- "trial": 1,
470
- "starter": 2,
471
- "professional": 3,
472
- "enterprise": 4
473
- }
474
-
475
- tier_level = tier_hierarchy.get(license_tier, 0)
476
- exec_level = execution_hierarchy.get(execution_level, 0)
477
-
478
- return tier_level >= exec_level
479
-
480
- class EnhancedMechanicalGateEvaluator:
481
- """Mathematical mechanical gate evaluation"""
482
-
483
- def __init__(self):
484
- # Gate definitions with mathematical weights
485
- self.gates = {
486
- "risk_assessment": {
487
- "weight": 0.3,
488
- "required": True,
489
- "function": self._evaluate_risk_gate,
490
- "description": "Assess risk against thresholds"
491
- },
492
- "policy_compliance": {
493
- "weight": 0.25,
494
- "required": True,
495
- "function": self._evaluate_policy_gate,
496
- "description": "Verify policy compliance"
497
- },
498
- "license_validation": {
499
- "weight": 0.2,
500
- "required": True,
501
- "function": self._evaluate_license_gate,
502
- "description": "Validate license entitlement"
503
- },
504
- "rollback_feasibility": {
505
- "weight": 0.15,
506
- "required": False,
507
- "function": self._evaluate_rollback_gate,
508
- "description": "Ensure action reversibility"
509
- },
510
- "resource_availability": {
511
- "weight": 0.1,
512
- "required": False,
513
- "function": self._evaluate_resource_gate,
514
- "description": "Check resource constraints"
515
- },
516
- "admin_approval": {
517
- "weight": 0.1,
518
- "required": False,
519
- "function": self._evaluate_approval_gate,
520
- "description": "Executive approval"
521
- }
522
- }
523
-
524
- def evaluate_gates(self, risk_assessment: BayesianRiskAssessment,
525
- policy_result: Dict, license_info: Dict) -> Dict:
526
- """Evaluate all applicable mechanical gates"""
527
- gate_results = []
528
- total_weight = 0
529
- weighted_score = 0
530
-
531
- # Required gates (always evaluated)
532
- for gate_name, gate_def in self.gates.items():
533
- if gate_def["required"]:
534
- result = gate_def["function"](risk_assessment, policy_result, license_info)
535
- gate_results.append(result)
536
-
537
- if result["passed"]:
538
- weighted_score += gate_def["weight"]
539
- total_weight += gate_def["weight"]
540
-
541
- # Optional gates based on license tier
542
- license_tier = license_info.get("name", "OSS Edition").lower()
543
-
544
- if "trial" in license_tier or "starter" in license_tier:
545
- # Add resource gate
546
- resource_result = self._evaluate_resource_gate(risk_assessment, policy_result, license_info)
547
- gate_results.append(resource_result)
548
-
549
- if resource_result["passed"]:
550
- weighted_score += self.gates["resource_availability"]["weight"]
551
- total_weight += self.gates["resource_availability"]["weight"]
552
-
553
- if "professional" in license_tier or "enterprise" in license_tier:
554
- # Add rollback gate
555
- rollback_result = self._evaluate_rollback_gate(risk_assessment, policy_result, license_info)
556
- gate_results.append(rollback_result)
557
-
558
- if rollback_result["passed"]:
559
- weighted_score += self.gates["rollback_feasibility"]["weight"]
560
- total_weight += self.gates["rollback_feasibility"]["weight"]
561
-
562
- # Add approval gate for high-risk in enterprise
563
- if "enterprise" in license_tier and risk_assessment.score > 0.6:
564
- approval_result = self._evaluate_approval_gate(risk_assessment, policy_result, license_info)
565
- gate_results.append(approval_result)
566
-
567
- if approval_result["passed"]:
568
- weighted_score += self.gates["admin_approval"]["weight"]
569
- total_weight += self.gates["admin_approval"]["weight"]
570
-
571
- # Calculate overall gate score
572
- gate_score = weighted_score / total_weight if total_weight > 0 else 0
573
-
574
- # Determine if all required gates passed
575
- required_gates = [g for g in gate_results if self.gates.get(g["name"].lower().replace(" ", "_"), {}).get("required", False)]
576
- all_required_passed = all(g["passed"] for g in required_gates)
577
-
578
- # Decision logic
579
- if not all_required_passed:
580
- decision = "BLOCKED"
581
- reason = "Failed required mechanical gates"
582
- elif gate_score >= 0.9:
583
- decision = "AUTONOMOUS"
584
- reason = "Passed all mechanical gates with high confidence"
585
- elif gate_score >= 0.7:
586
- decision = "SUPERVISED"
587
- reason = "Passed gates but requires monitoring"
588
- else:
589
- decision = "HUMAN_APPROVAL"
590
- reason = "Requires human review and approval"
591
-
592
- return {
593
- "gate_results": gate_results,
594
- "gate_score": gate_score,
595
- "decision": decision,
596
- "reason": reason,
597
- "gates_passed": len([g for g in gate_results if g["passed"]]),
598
- "total_gates": len(gate_results),
599
- "required_passed": all_required_passed,
600
- "gate_details": self._format_gate_details(gate_results)
601
- }
602
-
603
- def _evaluate_risk_gate(self, risk_assessment: BayesianRiskAssessment, policy_result: Dict, license_info: Dict) -> Dict:
604
- """Evaluate risk assessment gate"""
605
- risk_score = risk_assessment.score
606
- confidence = risk_assessment.confidence
607
-
608
- # Risk threshold from license
609
- license_tier = license_info.get("name", "OSS Edition").lower()
610
- risk_threshold = 0.8 # Default
611
-
612
- if "trial" in license_tier:
613
- risk_threshold = 0.7
614
- elif "starter" in license_tier:
615
- risk_threshold = 0.6
616
- elif "professional" in license_tier:
617
- risk_threshold = 0.5
618
- elif "enterprise" in license_tier:
619
- risk_threshold = 0.4
620
-
621
- passed = risk_score < risk_threshold and confidence > 0.6
622
- score = (risk_threshold - min(risk_score, risk_threshold)) / risk_threshold * 0.5
623
- score += (confidence - 0.6) / 0.4 * 0.5 if confidence > 0.6 else 0
624
-
625
- return {
626
- "name": "Risk Assessment",
627
- "passed": passed,
628
- "score": max(0, min(1, score)),
629
- "details": f"Risk: {risk_score:.1%} < {risk_threshold:.0%}, Confidence: {confidence:.1%}",
630
- "required": True
631
- }
632
-
633
- def _evaluate_policy_gate(self, risk_assessment: BayesianRiskAssessment, policy_result: Dict, license_info: Dict) -> Dict:
634
- """Evaluate policy compliance gate"""
635
- compliance = policy_result.get("compliance", "BLOCKED")
636
- passed = compliance not in ["BLOCKED", "HIGH_RISK"]
637
- score = 1.0 if passed else 0.3
638
-
639
- return {
640
- "name": "Policy Compliance",
641
- "passed": passed,
642
- "score": score,
643
- "details": f"Policy: {compliance}",
644
- "required": True
645
- }
646
-
647
- def _evaluate_license_gate(self, risk_assessment: BayesianRiskAssessment, policy_result: Dict, license_info: Dict) -> Dict:
648
- """Evaluate license validation gate"""
649
- license_name = license_info.get("name", "OSS Edition")
650
- passed = license_name != "OSS Edition"
651
- score = 1.0 if passed else 0.0
652
-
653
- return {
654
- "name": "License Validation",
655
- "passed": passed,
656
- "score": score,
657
- "details": f"License: {license_name}",
658
- "required": True
659
- }
660
-
661
- def _evaluate_rollback_gate(self, risk_assessment: BayesianRiskAssessment, policy_result: Dict, license_info: Dict) -> Dict:
662
- """Evaluate rollback feasibility gate"""
663
- risk_score = risk_assessment.score
664
- # Rollback more feasible for lower risk actions
665
- passed = risk_score < 0.7
666
- score = 0.9 if passed else 0.2
667
-
668
- return {
669
- "name": "Rollback Feasibility",
670
- "passed": passed,
671
- "score": score,
672
- "details": "Rollback possible" if passed else "Rollback difficult",
673
- "required": False
674
- }
675
-
676
- def _evaluate_resource_gate(self, risk_assessment: BayesianRiskAssessment, policy_result: Dict, license_info: Dict) -> Dict:
677
- """Evaluate resource availability gate"""
678
- # Simulated resource check
679
- passed = random.random() > 0.3 # 70% chance of passing
680
- score = 0.8 if passed else 0.3
681
-
682
- return {
683
- "name": "Resource Availability",
684
- "passed": passed,
685
- "score": score,
686
- "details": "Resources available" if passed else "Resource constraints",
687
- "required": False
688
- }
689
-
690
- def _evaluate_approval_gate(self, risk_assessment: BayesianRiskAssessment, policy_result: Dict, license_info: Dict) -> Dict:
691
- """Evaluate admin approval gate"""
692
- # For high-risk actions, requires manual approval
693
- risk_score = risk_assessment.score
694
- passed = risk_score < 0.6 # Auto-pass if risk is moderate
695
- score = 1.0 if passed else 0.0
696
-
697
- return {
698
- "name": "Admin Approval",
699
- "passed": passed,
700
- "score": score,
701
- "details": "Auto-approved" if passed else "Requires manual approval",
702
- "required": False
703
- }
704
-
705
- def _format_gate_details(self, gate_results: List[Dict]) -> List[Dict]:
706
- """Format gate details for display"""
707
- return [
708
- {
709
- "gate": r["name"],
710
- "status": "✅ PASSED" if r["passed"] else "❌ FAILED",
711
- "score": f"{r['score']:.1%}",
712
- "details": r["details"]
713
- }
714
- for r in gate_results
715
- ]
716
-
717
- class EnhancedARFEngine:
718
- """Enterprise-grade reliability engine with PhD-level mathematics"""
719
-
720
- def __init__(self):
721
- self.risk_model = EnhancedBayesianRiskModel()
722
- self.policy_engine = EnhancedPolicyEngine()
723
- self.license_manager = EnhancedLicenseManager()
724
- self.gate_evaluator = EnhancedMechanicalGateEvaluator()
725
-
726
- # Statistics with mathematical rigor
727
- self.stats = {
728
- "actions_tested": 0,
729
- "risks_prevented": 0,
730
- "high_risk_blocked": 0,
731
- "license_validations": 0,
732
- "mechanical_gates_triggered": 0,
733
- "confidence_average": 0.0,
734
- "risk_average": 0.0,
735
- "start_time": time.time()
736
- }
737
-
738
- self.history = []
739
- self.arf_status = "REAL_OSS" # Unified status
740
-
741
- def assess_action(self, action: str, context: Dict, license_key: str = None) -> Dict:
742
- """Comprehensive action assessment with mathematical rigor"""
743
- start_time = time.time()
744
-
745
- # 1. Bayesian risk assessment with confidence intervals
746
- risk_assessment = self.risk_model.assess_with_confidence(action, context)
747
-
748
- # 2. Action type classification
749
- action_type = self.risk_model._classify_action(action)
750
-
751
- # 3. Policy evaluation with confidence constraints
752
- policy_result = self.policy_engine.evaluate_mathematically(action_type, risk_assessment)
753
-
754
- # 4. License validation
755
- license_info = self.license_manager.validate_license(license_key)
756
-
757
- # 5. Mechanical gate evaluation
758
- gate_results = self.gate_evaluator.evaluate_gates(risk_assessment, policy_result, license_info)
759
-
760
- # 6. Generate enterprise recommendation
761
- recommendation = self._generate_enterprise_recommendation(
762
- risk_assessment, policy_result, license_info, gate_results
763
- )
764
-
765
- # 7. Calculate processing metrics
766
- processing_time = (time.time() - start_time) * 1000 # ms
767
-
768
- # 8. Update statistics with mathematical precision
769
- self._update_statistics(risk_assessment, policy_result, gate_results)
770
-
771
- # 9. Store in history
772
- history_entry = {
773
- "action": action[:50] + "..." if len(action) > 50 else action,
774
- "risk_score": risk_assessment.score,
775
- "confidence": risk_assessment.confidence,
776
- "license_tier": license_info.get("name", "OSS Edition"),
777
- "gate_decision": gate_results["decision"],
778
- "timestamp": datetime.now().isoformat(),
779
- "arf_status": self.arf_status
780
- }
781
- self.history.append(history_entry)
782
-
783
- # Keep only last 100 entries
784
- if len(self.history) > 100:
785
- self.history = self.history[-100:]
786
-
787
- # 10. Compile comprehensive result
788
- return {
789
- "risk_assessment": risk_assessment.to_dict(),
790
- "policy_result": policy_result,
791
- "license_info": license_info,
792
- "gate_results": gate_results,
793
- "recommendation": recommendation,
794
- "processing_metrics": {
795
- "processing_time_ms": round(processing_time, 1),
796
- "assessment_method": "bayesian_with_confidence",
797
- "arf_status": self.arf_status,
798
- "version": "3.3.9"
799
- },
800
- "statistics": self.get_enhanced_stats()
801
- }
802
-
803
- def _generate_enterprise_recommendation(self, risk_assessment: BayesianRiskAssessment,
804
- policy_result: Dict, license_info: Dict,
805
- gate_results: Dict) -> str:
806
- """Generate mathematically-informed enterprise recommendation"""
807
- license_name = license_info.get("name", "OSS Edition")
808
- decision = gate_results["decision"]
809
- risk_score = risk_assessment.score
810
-
811
- if license_name == "OSS Edition":
812
- if risk_score > 0.7:
813
- return "🚨 CRITICAL RISK: Would be BLOCKED by mechanical gates (Enterprise required)"
814
- elif risk_score > 0.4:
815
- return "⚠️ MODERATE RISK: Requires manual review (Mechanical gates automate this)"
816
- else:
817
- return "✅ LOW RISK: Appears safe but cannot execute without license"
818
-
819
- elif decision == "BLOCKED":
820
- risk_factors = ", ".join(risk_assessment.factors[:2])
821
- return f"❌ BLOCKED: Action prevented by mechanical gates. Risk factors: {risk_factors}"
822
-
823
- elif decision == "HUMAN_APPROVAL":
824
- return "🔄 REQUIRES HUMAN APPROVAL: Action meets risk threshold but requires oversight"
825
-
826
- elif decision == "SUPERVISED":
827
- return "👁️ SUPERVISED EXECUTION: Action passes gates but requires monitoring"
828
-
829
- elif decision == "AUTONOMOUS":
830
- confidence = risk_assessment.confidence
831
- return f"✅ AUTONOMOUS APPROVAL: Action passes all mechanical gates with {confidence:.0%} confidence"
832
-
833
- else:
834
- return "⚡ PROCESSING: Action under evaluation"
835
-
836
- def _update_statistics(self, risk_assessment: BayesianRiskAssessment,
837
- policy_result: Dict, gate_results: Dict):
838
- """Update statistics with mathematical precision"""
839
- self.stats["actions_tested"] += 1
840
-
841
- # Update rolling averages
842
- n = self.stats["actions_tested"]
843
- old_avg_risk = self.stats["risk_average"]
844
- old_avg_conf = self.stats["confidence_average"]
845
-
846
- self.stats["risk_average"] = old_avg_risk + (risk_assessment.score - old_avg_risk) / n
847
- self.stats["confidence_average"] = old_avg_conf + (risk_assessment.confidence - old_avg_conf) / n
848
-
849
- # Count high-risk blocks
850
- if risk_assessment.score > 0.7:
851
- self.stats["high_risk_blocked"] += 1
852
-
853
- # Count prevented risks
854
- if gate_results["decision"] == "BLOCKED":
855
- self.stats["risks_prevented"] += 1
856
-
857
- # Count gate triggers
858
- if gate_results["total_gates"] > 0:
859
- self.stats["mechanical_gates_triggered"] += 1
860
-
861
- # Count license validations
862
- if gate_results["gate_results"]:
863
- license_gate = next((g for g in gate_results["gate_results"] if g["name"] == "License Validation"), None)
864
- if license_gate and license_gate["passed"]:
865
- self.stats["license_validations"] += 1
866
-
867
- def get_enhanced_stats(self) -> Dict:
868
- """Get enhanced statistics with mathematical insights"""
869
- elapsed_hours = (time.time() - self.stats["start_time"]) / 3600
870
-
871
- # Calculate prevention rate
872
- prevention_rate = 0.0
873
- if self.stats["actions_tested"] > 0:
874
- prevention_rate = self.stats["risks_prevented"] / self.stats["actions_tested"]
875
-
876
- # Calculate reliability score (mathematically grounded)
877
- reliability_score = 95.0 + (prevention_rate * 5.0) # Base 95% + prevention bonus
878
-
879
- return {
880
- **self.stats,
881
- "actions_per_hour": round(self.stats["actions_tested"] / max(elapsed_hours, 0.1), 1),
882
- "reliability_score": min(99.99, reliability_score),
883
- "prevention_rate": round(prevention_rate * 100, 1),
884
- "average_risk": round(self.stats["risk_average"] * 100, 1),
885
- "average_confidence": round(self.stats["confidence_average"] * 100, 1),
886
- "gate_effectiveness": round((self.stats["risks_prevented"] / max(self.stats["high_risk_blocked"], 1)) * 100, 1),
887
- "history_size": len(self.history),
888
- "demo_duration_hours": round(elapsed_hours, 2),
889
- "arf_status": self.arf_status
890
- }
891
-
892
- def set_arf_status(self, status: str):
893
- """Set ARF status (REAL_OSS, SIMULATION, etc.)"""
894
- self.arf_status = status
895
-
896
- def get_action_history(self, limit: int = 10) -> List[Dict]:
897
- """Get action history with limits"""
898
- return self.history[:limit]
899
-
900
- def reset_statistics(self):
901
- """Reset statistics (for demo purposes)"""
902
- self.stats = {
903
- "actions_tested": 0,
904
- "risks_prevented": 0,
905
- "high_risk_blocked": 0,
906
- "license_validations": 0,
907
- "mechanical_gates_triggered": 0,
908
- "confidence_average": 0.0,
909
- "risk_average": 0.0,
910
- "start_time": time.time()
911
- }
912
- self.history = []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
utils/arf_simulation.py DELETED
@@ -1,148 +0,0 @@
1
- """
2
- ARF Simulation - Fallback when real ARF is not available
3
- """
4
-
5
- import random
6
- from datetime import datetime
7
- from typing import Dict, Any, Optional
8
-
9
- class RiskEngine:
10
- def assess(self, action: str, context: Dict) -> Dict:
11
- """Simulate risk assessment"""
12
- action_lower = action.lower()
13
- risk = 0.25
14
-
15
- if "drop" in action_lower and "database" in action_lower:
16
- risk = 0.85
17
- factors = ["Destructive operation", "Data loss", "Production impact"]
18
- elif "delete" in action_lower:
19
- risk = 0.65
20
- factors = ["Data deletion", "Write operation"]
21
- elif "update" in action_lower and "where" not in action_lower:
22
- risk = 0.75
23
- factors = ["Mass update", "No WHERE clause"]
24
- elif "grant" in action_lower:
25
- risk = 0.55
26
- factors = ["Privilege escalation", "Security implications"]
27
- else:
28
- risk = 0.35 + random.random() * 0.2
29
- factors = ["Standard operation"]
30
-
31
- # Adjust based on context
32
- if "production" in str(context).lower():
33
- risk *= 1.3
34
- factors.append("Production environment")
35
-
36
- risk = min(0.95, max(0.25, risk))
37
-
38
- return {
39
- "risk_score": risk,
40
- "confidence": 0.8 + random.random() * 0.15,
41
- "risk_factors": factors,
42
- "timestamp": datetime.now().isoformat()
43
- }
44
-
45
- class PolicyEngine:
46
- def evaluate(self, action: Any, risk_score: float, context: Dict) -> str:
47
- """Simulate policy evaluation"""
48
- if risk_score > 0.7:
49
- return "HIGH_RISK"
50
- elif risk_score > 0.4:
51
- return "MODERATE_RISK"
52
- return "LOW_RISK"
53
-
54
- class ActionValidator:
55
- def parse_action(self, action: str) -> Dict:
56
- """Parse action into structured format"""
57
- return {
58
- "raw": action,
59
- "type": self._classify_action(action),
60
- "tokens": action.split(),
61
- "parsed_at": datetime.now().isoformat()
62
- }
63
-
64
- def _classify_action(self, action: str) -> str:
65
- """Classify action type"""
66
- action_lower = action.lower()
67
- if "drop" in action_lower:
68
- return "DESTRUCTIVE"
69
- elif "delete" in action_lower:
70
- return "DELETE"
71
- elif "update" in action_lower:
72
- return "UPDATE"
73
- elif "grant" in action_lower:
74
- return "PRIVILEGE"
75
- else:
76
- return "QUERY"
77
-
78
- class LicenseManager:
79
- def validate(self, license_key: Optional[str] = None) -> Dict:
80
- """Validate license key"""
81
- if not license_key:
82
- return {"tier": "oss", "name": "OSS Edition", "features": []}
83
-
84
- key_upper = license_key.upper()
85
-
86
- if "ARF-TRIAL" in key_upper:
87
- return {
88
- "tier": "trial",
89
- "name": "Trial Edition",
90
- "features": ["mechanical_gates", "email_support"],
91
- "expires": (datetime.now().timestamp() + 14 * 86400)
92
- }
93
- elif "ARF-PRO" in key_upper:
94
- return {
95
- "tier": "professional",
96
- "name": "Professional Edition",
97
- "features": ["mechanical_gates", "24_7_support", "advanced_gates"],
98
- "price": "$5,000/month"
99
- }
100
- elif "ARF-ENTERPRISE" in key_upper:
101
- return {
102
- "tier": "enterprise",
103
- "name": "Enterprise Edition",
104
- "features": ["full_mechanical_gates", "dedicated_support", "custom_gates", "soc2_compliance"],
105
- "price": "$15,000/month"
106
- }
107
-
108
- return {"tier": "oss", "name": "OSS Edition", "features": []}
109
-
110
- class BayesianRiskScorer:
111
- def assess(self, action: Dict, context: Dict) -> Dict:
112
- """Simulate Bayesian risk assessment"""
113
- # Simplified Bayesian scoring
114
- action_type = action.get("type", "QUERY")
115
-
116
- # Priors based on action type
117
- priors = {
118
- "DESTRUCTIVE": 0.7,
119
- "DELETE": 0.6,
120
- "UPDATE": 0.5,
121
- "PRIVILEGE": 0.4,
122
- "QUERY": 0.2
123
- }
124
-
125
- prior = priors.get(action_type, 0.5)
126
-
127
- # Likelihood adjustments
128
- context_str = str(context).lower()
129
- likelihood = 1.0
130
-
131
- if "production" in context_str:
132
- likelihood *= 1.3
133
- if "junior" in context_str or "intern" in context_str:
134
- likelihood *= 1.2
135
-
136
- # Posterior (simplified)
137
- posterior = (prior * likelihood) / (prior * likelihood + (1 - prior))
138
-
139
- # Add some variance
140
- posterior += random.uniform(-0.05, 0.05)
141
- posterior = max(0.25, min(0.95, posterior))
142
-
143
- return {
144
- "risk_score": posterior,
145
- "confidence": 0.85,
146
- "risk_factors": [f"{action_type} operation"],
147
- "method": "bayesian_simulation"
148
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
utils/business_logic.py DELETED
@@ -1,234 +0,0 @@
1
- """
2
- Business Value Demonstration
3
- ROI Calculator, Tier Pricing, Upgrade Paths
4
- """
5
-
6
- from typing import Dict, List
7
-
8
- class BusinessValueCalculator:
9
- """Calculates business value and ROI for ARF tiers"""
10
-
11
- def __init__(self):
12
- # Industry benchmarks
13
- self.benchmarks = {
14
- "incident_cost": 100000, # Average incident cost
15
- "incident_reduction": {
16
- "oss": 0.0,
17
- "trial": 0.5,
18
- "starter": 0.7,
19
- "professional": 0.85,
20
- "enterprise": 0.92
21
- },
22
- "time_savings_minutes": 15, # Minutes saved per decision
23
- "decisions_per_day": 20,
24
- "engineer_cost_hourly": 150, # $/hour
25
- "operating_days": 250, # Business days per year
26
- "team_size": {
27
- "oss": 1,
28
- "starter": 5,
29
- "professional": 15,
30
- "enterprise": 50
31
- }
32
- }
33
-
34
- # Tier pricing
35
- self.tier_pricing = {
36
- "oss": 0,
37
- "starter": 2000,
38
- "professional": 5000,
39
- "enterprise": 15000
40
- }
41
-
42
- # Feature comparison
43
- self.feature_comparison = {
44
- "oss": {
45
- "name": "OSS Edition",
46
- "price": "$0",
47
- "enforcement": "Advisory Only",
48
- "mechanical_gates": "❌ None",
49
- "approval_workflows": "❌ Manual",
50
- "audit_trail": "❌ None",
51
- "support": "Community",
52
- "sla": "None",
53
- "best_for": "Evaluation"
54
- },
55
- "starter": {
56
- "name": "Starter",
57
- "price": "$2,000/mo",
58
- "enforcement": "Mechanical Gates",
59
- "mechanical_gates": "✅ 3 Gates",
60
- "approval_workflows": "✅ Basic",
61
- "audit_trail": "✅ 30 days",
62
- "support": "Business Hours",
63
- "sla": "99.5%",
64
- "best_for": "Small Teams"
65
- },
66
- "professional": {
67
- "name": "Professional",
68
- "price": "$5,000/mo",
69
- "enforcement": "Advanced Gates",
70
- "mechanical_gates": "✅ 5 Gates",
71
- "approval_workflows": "✅ Advanced",
72
- "audit_trail": "✅ 1 year",
73
- "support": "24/7",
74
- "sla": "99.9%",
75
- "best_for": "Growing Companies"
76
- },
77
- "enterprise": {
78
- "name": "Enterprise",
79
- "price": "$15,000/mo",
80
- "enforcement": "Full Mechanical",
81
- "mechanical_gates": "✅ 7 Gates",
82
- "approval_workflows": "✅ Custom",
83
- "audit_trail": "✅ Unlimited",
84
- "support": "Dedicated",
85
- "sla": "99.99%",
86
- "best_for": "Enterprise Scale"
87
- }
88
- }
89
-
90
- def calculate_roi(self, current_tier: str, target_tier: str) -> Dict:
91
- """Calculate ROI for upgrade"""
92
- current_price = self.tier_pricing.get(current_tier, 0)
93
- target_price = self.tier_pricing.get(target_tier, 0)
94
-
95
- # Calculate incident cost savings
96
- current_reduction = self.benchmarks["incident_reduction"].get(current_tier, 0)
97
- target_reduction = self.benchmarks["incident_reduction"].get(target_tier, 0)
98
-
99
- incident_savings = self.benchmarks["incident_cost"] * (target_reduction - current_reduction) * 12
100
-
101
- # Calculate time savings
102
- team_size_current = self.benchmarks["team_size"].get(current_tier, 1)
103
- team_size_target = self.benchmarks["team_size"].get(target_tier, 1)
104
-
105
- avg_team_size = (team_size_current + team_size_target) / 2
106
-
107
- time_savings = (
108
- self.benchmarks["time_savings_minutes"] / 60 * # Hours per decision
109
- self.benchmarks["decisions_per_day"] * # Decisions per day
110
- self.benchmarks["operating_days"] * # Days per year
111
- self.benchmarks["engineer_cost_hourly"] * # Cost per hour
112
- avg_team_size # Team size
113
- )
114
-
115
- # Total annual savings
116
- annual_savings = incident_savings + time_savings
117
-
118
- # Calculate ROI
119
- price_difference = target_price - current_price
120
- annual_price_difference = price_difference * 12
121
-
122
- if annual_savings > 0:
123
- roi_months = (annual_price_difference / annual_savings) * 12
124
- else:
125
- roi_months = 999
126
-
127
- # Payback period
128
- if annual_savings > annual_price_difference:
129
- payback_months = (annual_price_difference / annual_savings) * 12
130
- else:
131
- payback_months = roi_months
132
-
133
- return {
134
- "current_tier": current_tier.upper(),
135
- "target_tier": target_tier.upper(),
136
- "annual_savings": f"${annual_savings:,.0f}",
137
- "incident_savings": f"${incident_savings:,.0f}",
138
- "time_savings": f"${time_savings:,.0f}",
139
- "monthly_investment": f"${price_difference:,.0f}",
140
- "roi_months": f"{roi_months:.1f}",
141
- "payback_months": f"{payback_months:.1f}",
142
- "annual_roi": f"{(annual_savings / max(annual_price_difference, 1)) * 100:.0f}%"
143
- }
144
-
145
- def get_tier_comparison(self) -> List[Dict]:
146
- """Get tier comparison matrix"""
147
- return [self.feature_comparison[tier] for tier in ["oss", "starter", "professional", "enterprise"]]
148
-
149
- def calculate_enterprise_value(self, company_size: int = 100, incidents_per_year: int = 5) -> Dict:
150
- """Calculate enterprise-specific value"""
151
- # Incident cost avoidance
152
- incident_avoidance = incidents_per_year * self.benchmarks["incident_cost"] * self.benchmarks["incident_reduction"]["enterprise"]
153
-
154
- # Productivity savings
155
- productivity_savings = (
156
- company_size *
157
- self.benchmarks["time_savings_minutes"] / 60 *
158
- self.benchmarks["decisions_per_day"] *
159
- self.benchmarks["operating_days"] *
160
- self.benchmarks["engineer_cost_hourly"]
161
- )
162
-
163
- # Compliance value (estimated)
164
- compliance_value = 500000 # Estimated value of compliance automation
165
-
166
- total_value = incident_avoidance + productivity_savings + compliance_value
167
- enterprise_cost = self.tier_pricing["enterprise"] * 12
168
-
169
- return {
170
- "company_size": company_size,
171
- "incidents_prevented": incidents_per_year * self.benchmarks["incident_reduction"]["enterprise"],
172
- "incident_avoidance": f"${incident_avoidance:,.0f}",
173
- "productivity_savings": f"${productivity_savings:,.0f}",
174
- "compliance_value": f"${compliance_value:,.0f}",
175
- "total_annual_value": f"${total_value:,.0f}",
176
- "enterprise_cost": f"${enterprise_cost:,.0f}",
177
- "value_ratio": f"{total_value / enterprise_cost:.1f}x",
178
- "monthly_roi": f"${(total_value - enterprise_cost) / 12:,.0f}"
179
- }
180
-
181
- def generate_upgrade_path(self, current_tier: str) -> List[Dict]:
182
- """Generate upgrade path from current tier"""
183
- tiers = ["oss", "starter", "professional", "enterprise"]
184
-
185
- try:
186
- current_index = tiers.index(current_tier)
187
- except ValueError:
188
- current_index = 0
189
-
190
- path = []
191
- for i in range(current_index + 1, len(tiers)):
192
- target_tier = tiers[i]
193
- roi_data = self.calculate_roi(current_tier, target_tier)
194
-
195
- path.append({
196
- "from": self.feature_comparison[current_tier]["name"],
197
- "to": self.feature_comparison[target_tier]["name"],
198
- "price_increase": f"${self.tier_pricing[target_tier] - self.tier_pricing[current_tier]:,.0f}/mo",
199
- "annual_savings": roi_data["annual_savings"],
200
- "payback_period": roi_data["payback_months"] + " months",
201
- "key_features": self._get_upgrade_features(current_tier, target_tier)
202
- })
203
-
204
- return path
205
-
206
- def _get_upgrade_features(self, from_tier: str, to_tier: str) -> List[str]:
207
- """Get key features gained in upgrade"""
208
- features = {
209
- "oss→starter": [
210
- "Mechanical gates (3 gates)",
211
- "Basic approval workflows",
212
- "30-day audit trail",
213
- "Business hours support"
214
- ],
215
- "starter→professional": [
216
- "Advanced gates (5 gates)",
217
- "Custom approval workflows",
218
- "1-year audit trail",
219
- "24/7 support",
220
- "99.9% SLA"
221
- ],
222
- "professional→enterprise": [
223
- "Full mechanical gates (7 gates)",
224
- "Custom gate development",
225
- "Unlimited audit trail",
226
- "Dedicated support engineer",
227
- "99.99% SLA",
228
- "On-prem deployment",
229
- "SOC 2 compliance automation"
230
- ]
231
- }
232
-
233
- key = f"{from_tier}→{to_tier}"
234
- return features.get(key, ["Enhanced features and support"])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
utils/psychology_layer_enhanced.py DELETED
@@ -1,689 +0,0 @@
1
- """
2
- Enhanced Psychology Layer with Prospect Theory Mathematics
3
- PhD-Level Psychological Optimization for Investor Demos
4
- """
5
-
6
- import random
7
- import numpy as np
8
- from typing import Dict, List, Tuple, Any
9
- from dataclasses import dataclass
10
- from enum import Enum
11
-
12
- class PsychologicalPrinciple(Enum):
13
- """Psychological principles with mathematical implementations"""
14
- LOSS_AVERSION = "loss_aversion"
15
- PROSPECT_THEORY = "prospect_theory"
16
- SOCIAL_PROOF = "social_proof"
17
- SCARCITY = "scarcity"
18
- AUTHORITY = "authority"
19
- ANCHORING = "anchoring"
20
-
21
- @dataclass
22
- class ProspectTheoryParameters:
23
- """Kahneman & Tversky's Prospect Theory parameters"""
24
- alpha: float = 0.88 # Risk aversion for gains (0 ≤ α ≤ 1)
25
- beta: float = 0.88 # Risk seeking for losses (0 ≤ β ≤ 1)
26
- lambda_param: float = 2.25 # Loss aversion coefficient (λ > 1)
27
- gamma: float = 0.61 # Probability weighting for gains
28
- delta: float = 0.69 # Probability weighting for losses
29
-
30
- def __post_init__(self):
31
- """Validate parameters"""
32
- assert 0 < self.alpha <= 1, "Alpha must be between 0 and 1"
33
- assert 0 < self.beta <= 1, "Beta must be between 0 and 1"
34
- assert self.lambda_param > 1, "Lambda must be greater than 1"
35
- assert 0 < self.gamma <= 1, "Gamma must be between 0 and 1"
36
- assert 0 < self.delta <= 1, "Delta must be between 0 and 1"
37
-
38
- class ProspectTheoryEngine:
39
- """Mathematical implementation of Kahneman & Tversky's Prospect Theory"""
40
-
41
- def __init__(self, params: ProspectTheoryParameters = None):
42
- self.params = params or ProspectTheoryParameters()
43
-
44
- def value_function(self, x: float) -> float:
45
- """
46
- Kahneman & Tversky's value function:
47
- v(x) = { x^α if x ≥ 0, -λ(-x)^β if x < 0 }
48
-
49
- For risk scores (always positive loss domain):
50
- perceived_loss = risk_score^α * λ
51
- """
52
- if x >= 0:
53
- # Gains domain (not typically used for risk)
54
- return x ** self.params.alpha
55
- else:
56
- # Loss domain (risk is always positive loss)
57
- return -self.params.lambda_param * ((-x) ** self.params.beta)
58
-
59
- def probability_weighting(self, p: float, is_gain: bool = False) -> float:
60
- """
61
- Probability weighting function π(p)
62
- Overweights small probabilities, underweights large probabilities
63
-
64
- π(p) = p^γ / (p^γ + (1-p)^γ)^(1/γ) for gains
65
- π(p) = p^δ / (p^δ + (1-p)^δ)^(1/δ) for losses
66
- """
67
- if p == 0:
68
- return 0
69
- if p == 1:
70
- return 1
71
-
72
- gamma = self.params.gamma if is_gain else self.params.delta
73
-
74
- numerator = p ** gamma
75
- denominator = (p ** gamma + (1 - p) ** gamma) ** (1 / gamma)
76
-
77
- return numerator / denominator
78
-
79
- def weighted_perceived_risk(self, risk_score: float) -> float:
80
- """
81
- Calculate prospect-theory weighted perceived risk
82
- Combines value function with probability weighting
83
- """
84
- # Loss domain (risk is always positive loss)
85
- base_value = self.value_function(-risk_score) # Negative because it's a loss
86
-
87
- # Probability weighting for losses
88
- weighted_prob = self.probability_weighting(risk_score, is_gain=False)
89
-
90
- # Combine
91
- perceived_risk = abs(base_value) * weighted_prob
92
-
93
- return min(1.0, perceived_risk)
94
-
95
- def calculate_psychological_impact(self, risk_score: float, license_tier: str) -> Dict[str, Any]:
96
- """
97
- Multi-dimensional psychological impact calculation
98
- Based on Prospect Theory with tier-specific adjustments
99
- """
100
- # Base perceived risk using Prospect Theory
101
- perceived_risk = self.weighted_perceived_risk(risk_score)
102
-
103
- # License-tier anxiety multiplier (enterprise reduces anxiety)
104
- anxiety_multipliers = {
105
- 'oss': 1.3, # Higher anxiety without protection
106
- 'trial': 1.0, # Balanced with temporary protection
107
- 'starter': 0.9, # Some protection
108
- 'professional': 0.8, # Good protection
109
- 'enterprise': 0.7 # Full protection
110
- }
111
-
112
- final_anxiety = perceived_risk * anxiety_multipliers.get(license_tier, 1.0)
113
-
114
- # Conversion probability based on anxiety and tier (sigmoid function)
115
- # Higher anxiety → higher conversion probability up to a point
116
- conversion_probability = self._sigmoid_conversion(final_anxiety, license_tier)
117
-
118
- # Urgency score (derivative of anxiety)
119
- urgency_score = min(1.0, final_anxiety * 1.2)
120
-
121
- # Loss aversion weight (tier-specific)
122
- loss_aversion_weight = self.params.lambda_param * (1 + (license_tier == 'oss') * 0.5)
123
-
124
- return {
125
- 'perceived_risk': round(perceived_risk, 3),
126
- 'anxiety_level': round(final_anxiety, 3),
127
- 'conversion_probability': round(conversion_probability, 3),
128
- 'urgency_score': round(urgency_score, 3),
129
- 'loss_aversion_weight': round(loss_aversion_weight, 2),
130
- 'psychological_impact_category': self._categorize_impact(final_anxiety),
131
- 'prospect_theory_parameters': {
132
- 'alpha': self.params.alpha,
133
- 'beta': self.params.beta,
134
- 'lambda': self.params.lambda_param,
135
- 'gamma': self.params.gamma,
136
- 'delta': self.params.delta
137
- }
138
- }
139
-
140
- def _sigmoid_conversion(self, anxiety: float, license_tier: str) -> float:
141
- """Sigmoid function for conversion probability"""
142
- # Base conversion curve
143
- x = (anxiety - 0.5) * 3 # Center at 0.5 anxiety, scale by 3
144
-
145
- # Sigmoid with tier-specific adjustments
146
- base_sigmoid = 1 / (1 + np.exp(-x))
147
-
148
- # Tier multipliers (enterprise users convert more easily)
149
- tier_multipliers = {
150
- 'oss': 0.6,
151
- 'trial': 0.8,
152
- 'starter': 0.85,
153
- 'professional': 0.9,
154
- 'enterprise': 0.95
155
- }
156
-
157
- multiplier = tier_multipliers.get(license_tier, 0.8)
158
- converted = base_sigmoid * multiplier
159
-
160
- # Add minimum conversion probability
161
- return min(0.95, max(0.1, converted))
162
-
163
- def _categorize_impact(self, anxiety: float) -> str:
164
- """Categorize psychological impact"""
165
- if anxiety > 0.8:
166
- return "CRITICAL_IMPACT"
167
- elif anxiety > 0.6:
168
- return "HIGH_IMPACT"
169
- elif anxiety > 0.4:
170
- return "MODERATE_IMPACT"
171
- elif anxiety > 0.2:
172
- return "LOW_IMPACT"
173
- else:
174
- return "MINIMAL_IMPACT"
175
-
176
- class BayesianSocialProofEngine:
177
- """Bayesian social proof optimization with credibility updating"""
178
-
179
- def __init__(self):
180
- # Beta distribution priors for different proof types
181
- # α = successes + 1, β = failures + 1
182
- self.priors = {
183
- 'fortune_500': (9, 2), # α=9, β=2 → 82% prior credibility
184
- 'scaleup': (7, 4), # α=7, β=4 → 64% prior credibility
185
- 'developer_count': (8, 3), # α=8, β=3 → 73% prior credibility
186
- 'savings': (10, 1), # α=10, β=1 → 91% prior credibility
187
- 'incident_reduction': (9, 2), # 82% prior credibility
188
- 'compliance': (8, 2), # 80% prior credibility
189
- }
190
-
191
- # User type profiles with likelihood weights
192
- self.user_profiles = {
193
- 'engineer': {
194
- 'fortune_500': 0.6,
195
- 'scaleup': 0.8,
196
- 'developer_count': 0.9,
197
- 'savings': 0.7,
198
- 'incident_reduction': 0.95,
199
- 'compliance': 0.5
200
- },
201
- 'executive': {
202
- 'fortune_500': 0.9,
203
- 'savings': 0.95,
204
- 'scaleup': 0.7,
205
- 'incident_reduction': 0.85,
206
- 'compliance': 0.9,
207
- 'developer_count': 0.4
208
- },
209
- 'investor': {
210
- 'savings': 0.9,
211
- 'fortune_500': 0.85,
212
- 'growth': 0.8,
213
- 'incident_reduction': 0.75,
214
- 'compliance': 0.7,
215
- 'scaleup': 0.6
216
- },
217
- 'compliance_officer': {
218
- 'compliance': 0.95,
219
- 'fortune_500': 0.8,
220
- 'incident_reduction': 0.85,
221
- 'savings': 0.6,
222
- 'developer_count': 0.3,
223
- 'scaleup': 0.4
224
- }
225
- }
226
-
227
- # Proof templates
228
- self.proof_templates = {
229
- 'fortune_500': {
230
- 'title': '🏢 Trusted by Fortune 500',
231
- 'message': 'Deployed at 50+ Fortune 500 companies including FAANG',
232
- 'icon': '🏢',
233
- 'credibility_baseline': 0.85
234
- },
235
- 'scaleup': {
236
- 'title': '🚀 Scale-up Proven',
237
- 'message': 'Trusted by 200+ high-growth tech scale-ups',
238
- 'icon': '🚀',
239
- 'credibility_baseline': 0.75
240
- },
241
- 'developer_count': {
242
- 'title': '👨‍💻 Developer Love',
243
- 'message': 'Join 1,000+ active developers using ARF for AI safety',
244
- 'icon': '👨‍💻',
245
- 'credibility_baseline': 0.8
246
- },
247
- 'savings': {
248
- 'title': '💰 Proven Savings',
249
- 'message': 'Average $3.9M breach cost prevented, 92% incident reduction',
250
- 'icon': '💰',
251
- 'credibility_baseline': 0.9
252
- },
253
- 'incident_reduction': {
254
- 'title': '🛡️ Risk Reduction',
255
- 'message': '92% of incidents prevented with mechanical gates',
256
- 'icon': '🛡️',
257
- 'credibility_baseline': 0.88
258
- },
259
- 'compliance': {
260
- 'title': '📋 Compliance Ready',
261
- 'message': 'SOC 2, GDPR, ISO 27001 certified with zero findings',
262
- 'icon': '📋',
263
- 'credibility_baseline': 0.82
264
- }
265
- }
266
-
267
- def get_optimized_proof(self, user_type: str, license_tier: str,
268
- risk_context: Dict[str, Any]) -> Dict[str, Any]:
269
- """
270
- Get psychologically optimized social proof using Bayesian updating
271
- """
272
- user_type = user_type if user_type in self.user_profiles else 'engineer'
273
- user_profile = self.user_profiles[user_type]
274
-
275
- # Calculate posterior credibility for each proof type
276
- posteriors = {}
277
- for proof_type, (alpha_prior, beta_prior) in self.priors.items():
278
- if proof_type not in user_profile:
279
- continue
280
-
281
- likelihood = user_profile[proof_type]
282
-
283
- # Bayesian update: Posterior = Beta(α + successes, β + failures)
284
- # successes = likelihood * 10, failures = (1 - likelihood) * 10
285
- successes = likelihood * 10
286
- failures = (1 - likelihood) * 10
287
-
288
- posterior_alpha = alpha_prior + successes
289
- posterior_beta = beta_prior + failures
290
-
291
- posterior_mean = posterior_alpha / (posterior_alpha + posterior_beta)
292
- posterior_variance = (posterior_alpha * posterior_beta) / \
293
- ((posterior_alpha + posterior_beta) ** 2 * \
294
- (posterior_alpha + posterior_beta + 1))
295
-
296
- posteriors[proof_type] = {
297
- 'credibility': posterior_mean,
298
- 'confidence': 1 - posterior_variance,
299
- 'alpha': posterior_alpha,
300
- 'beta': posterior_beta,
301
- 'likelihood': likelihood
302
- }
303
-
304
- if not posteriors:
305
- return self._get_default_proof(license_tier)
306
-
307
- # Select proof with highest credibility
308
- best_proof_type = max(posteriors.items(), key=lambda x: x[1]['credibility'])[0]
309
- best_proof_data = posteriors[best_proof_type]
310
-
311
- return self._format_proof(
312
- best_proof_type,
313
- best_proof_data,
314
- user_type,
315
- license_tier,
316
- risk_context
317
- )
318
-
319
- def _format_proof(self, proof_type: str, proof_data: Dict[str, Any],
320
- user_type: str, license_tier: str,
321
- risk_context: Dict[str, Any]) -> Dict[str, Any]:
322
- """Format social proof with credibility metrics"""
323
- template = self.proof_templates.get(
324
- proof_type,
325
- self.proof_templates['developer_count']
326
- )
327
-
328
- # Adjust message based on license tier
329
- tier_adjustments = {
330
- 'trial': "Start your free trial today",
331
- 'starter': "Upgrade to Starter for mechanical gates",
332
- 'professional': "Professional includes 24/7 support",
333
- 'enterprise': "Enterprise includes dedicated support"
334
- }
335
-
336
- adjusted_message = f"{template['message']}. {tier_adjustments.get(license_tier, '')}"
337
-
338
- return {
339
- **template,
340
- 'message': adjusted_message,
341
- 'proof_type': proof_type,
342
- 'credibility': round(proof_data['credibility'], 3),
343
- 'confidence': round(proof_data['confidence'], 3),
344
- 'credibility_interval': self._calculate_credibility_interval(
345
- proof_data['alpha'], proof_data['beta']
346
- ),
347
- 'optimized_for': user_type,
348
- 'recommended_for_tier': license_tier,
349
- 'risk_context_match': self._assess_risk_context_match(proof_type, risk_context),
350
- 'bayesian_parameters': {
351
- 'prior_alpha': self.priors[proof_type][0],
352
- 'prior_beta': self.priors[proof_type][1],
353
- 'posterior_alpha': proof_data['alpha'],
354
- 'posterior_beta': proof_data['beta'],
355
- 'likelihood': proof_data['likelihood']
356
- }
357
- }
358
-
359
- def _calculate_credibility_interval(self, alpha: float, beta: float,
360
- confidence: float = 0.95) -> Tuple[float, float]:
361
- """Calculate credibility interval for Beta distribution"""
362
- # Simplified calculation for demo
363
- mean = alpha / (alpha + beta)
364
- variance = (alpha * beta) / ((alpha + beta) ** 2 * (alpha + beta + 1))
365
- std_dev = np.sqrt(variance)
366
-
367
- # Approximate 95% interval
368
- lower = max(0, mean - 1.96 * std_dev)
369
- upper = min(1, mean + 1.96 * std_dev)
370
-
371
- return round(lower, 3), round(upper, 3)
372
-
373
- def _assess_risk_context_match(self, proof_type: str, risk_context: Dict[str, Any]) -> float:
374
- """Assess how well proof matches risk context"""
375
- risk_score = risk_context.get('risk_score', 0.5)
376
- risk_category = risk_context.get('risk_category', 'MEDIUM')
377
-
378
- # Proof effectiveness by risk level
379
- effectiveness = {
380
- 'fortune_500': {'LOW': 0.7, 'MEDIUM': 0.8, 'HIGH': 0.9, 'CRITICAL': 0.95},
381
- 'savings': {'LOW': 0.6, 'MEDIUM': 0.8, 'HIGH': 0.9, 'CRITICAL': 0.95},
382
- 'incident_reduction': {'LOW': 0.5, 'MEDIUM': 0.7, 'HIGH': 0.85, 'CRITICAL': 0.9},
383
- 'compliance': {'LOW': 0.6, 'MEDIUM': 0.7, 'HIGH': 0.8, 'CRITICAL': 0.85},
384
- 'developer_count': {'LOW': 0.8, 'MEDIUM': 0.7, 'HIGH': 0.6, 'CRITICAL': 0.5},
385
- 'scaleup': {'LOW': 0.7, 'MEDIUM': 0.75, 'HIGH': 0.8, 'CRITICAL': 0.7}
386
- }
387
-
388
- return effectiveness.get(proof_type, {}).get(risk_category, 0.7)
389
-
390
- def _get_default_proof(self, license_tier: str) -> Dict[str, Any]:
391
- """Get default social proof"""
392
- return {
393
- 'title': '👨‍💻 Developer Trusted',
394
- 'message': 'Join 1,000+ developers using ARF for AI safety',
395
- 'icon': '👨‍💻',
396
- 'credibility': 0.8,
397
- 'confidence': 0.7,
398
- 'proof_type': 'default',
399
- 'optimized_for': 'default',
400
- 'recommended_for_tier': license_tier,
401
- 'risk_context_match': 0.7,
402
- 'credibility_interval': (0.72, 0.88)
403
- }
404
-
405
- class EnhancedPsychologyEngine:
406
- """Complete psychology engine combining all principles"""
407
-
408
- def __init__(self):
409
- self.prospect_theory = ProspectTheoryEngine()
410
- self.social_proof = BayesianSocialProofEngine()
411
-
412
- # Loss aversion scenarios with financial impact
413
- self.loss_scenarios = {
414
- "CRITICAL": [
415
- {"text": "Data breach ($3.9M average cost)", "impact": 3900000},
416
- {"text": "Service disruption ($300k/hour)", "impact": 7200000},
417
- {"text": "Compliance fines (up to $20M)", "impact": 20000000},
418
- {"text": "Reputational damage (6+ months recovery)", "impact": 5000000}
419
- ],
420
- "HIGH": [
421
- {"text": "Data corruption (24h recovery)", "impact": 1000000},
422
- {"text": "Performance degradation (50% slower)", "impact": 500000},
423
- {"text": "Security vulnerability exposure", "impact": 750000},
424
- {"text": "Customer churn (15% increase)", "impact": 1500000}
425
- ],
426
- "MEDIUM": [
427
- {"text": "Increased operational overhead", "impact": 250000},
428
- {"text": "Manual review delays (2+ hours)", "impact": 150000},
429
- {"text": "Team productivity loss (20%)", "impact": 300000},
430
- {"text": "Audit findings & remediation", "impact": 200000}
431
- ],
432
- "LOW": [
433
- {"text": "Minor configuration drift", "impact": 50000},
434
- {"text": "Documentation gaps", "impact": 25000},
435
- {"text": "Process inefficiencies", "impact": 75000},
436
- {"text": "Training requirements", "impact": 100000}
437
- ]
438
- }
439
-
440
- # Scarcity messaging with mathematical decay
441
- self.scarcity_patterns = {
442
- "trial": {
443
- "base_urgency": 0.8,
444
- "decay_rate": 0.07, # per day
445
- "messages": [
446
- "⏳ {days} days remaining in free trial",
447
- "🎁 Trial ends in {days} days - upgrade to keep mechanical gates",
448
- "⚠️ Free access expires in {days} days"
449
- ]
450
- },
451
- "starter": {
452
- "base_urgency": 0.6,
453
- "decay_rate": 0.05,
454
- "messages": [
455
- "💰 Special pricing ends in {days} days",
456
- "👥 Limited seats at current price",
457
- "⏰ Quarterly offer expires soon"
458
- ]
459
- }
460
- }
461
-
462
- # Authority signals with credibility scores
463
- self.authority_signals = [
464
- {"text": "SOC 2 Type II Certified", "credibility": 0.95, "audience": ["executive", "compliance"]},
465
- {"text": "GDPR & CCPA Compliant", "credibility": 0.9, "audience": ["compliance", "executive"]},
466
- {"text": "ISO 27001 Certified", "credibility": 0.92, "audience": ["executive", "compliance"]},
467
- {"text": "99.9% SLA Guarantee", "credibility": 0.88, "audience": ["engineer", "executive"]},
468
- {"text": "24/7 Dedicated Support", "credibility": 0.85, "audience": ["engineer", "executive"]},
469
- {"text": "On-prem Deployment Available", "credibility": 0.87, "audience": ["executive", "compliance"]},
470
- {"text": "Fortune 500 Deployed", "credibility": 0.93, "audience": ["executive", "investor"]},
471
- {"text": "Venture Backed", "credibility": 0.8, "audience": ["investor", "executive"]}
472
- ]
473
-
474
- def generate_comprehensive_insights(self, risk_score: float, risk_category: str,
475
- license_tier: str, user_type: str = "engineer",
476
- days_remaining: int = 14) -> Dict[str, Any]:
477
- """
478
- Generate comprehensive psychological insights for investor demos
479
- """
480
- # Prospect Theory impact
481
- prospect_impact = self.prospect_theory.calculate_psychological_impact(
482
- risk_score, license_tier
483
- )
484
-
485
- # Social proof optimization
486
- social_proof = self.social_proof.get_optimized_proof(
487
- user_type, license_tier,
488
- {"risk_score": risk_score, "risk_category": risk_category}
489
- )
490
-
491
- # Loss aversion framing
492
- loss_aversion = self._generate_loss_aversion_framing(risk_category, risk_score)
493
-
494
- # Scarcity messaging
495
- scarcity = self._generate_scarcity_message(license_tier, days_remaining)
496
-
497
- # Authority signals
498
- authority = self._generate_authority_signals(user_type)
499
-
500
- # Anchoring effect (reference pricing)
501
- anchoring = self._generate_anchoring_effect(license_tier)
502
-
503
- # Conversion prediction
504
- conversion_prediction = self._predict_conversion(
505
- prospect_impact['anxiety_level'],
506
- social_proof['credibility'],
507
- scarcity.get('urgency', 0.5),
508
- license_tier
509
- )
510
-
511
- return {
512
- "prospect_theory_impact": prospect_impact,
513
- "optimized_social_proof": social_proof,
514
- "loss_aversion_framing": loss_aversion,
515
- "scarcity_signaling": scarcity,
516
- "authority_signals": authority,
517
- "anchoring_effects": anchoring,
518
- "conversion_prediction": conversion_prediction,
519
- "psychological_summary": self._generate_psychological_summary(
520
- prospect_impact, social_proof, loss_aversion
521
- ),
522
- "user_type": user_type,
523
- "license_tier": license_tier,
524
- "risk_context": {
525
- "score": risk_score,
526
- "category": risk_category,
527
- "perceived_impact": prospect_impact['perceived_risk']
528
- }
529
- }
530
-
531
- def _generate_loss_aversion_framing(self, risk_category: str, risk_score: float) -> Dict[str, Any]:
532
- """Generate loss aversion framing with financial impact"""
533
- scenarios = self.loss_scenarios.get(risk_category, self.loss_scenarios["MEDIUM"])
534
-
535
- # Select scenarios based on risk score
536
- num_scenarios = min(3, int(risk_score * 4) + 1)
537
- selected = random.sample(scenarios, min(num_scenarios, len(scenarios)))
538
-
539
- # Calculate total potential impact
540
- total_impact = sum(s["impact"] for s in selected)
541
-
542
- return {
543
- "title": f"🚨 Without Enterprise protection, you risk:",
544
- "scenarios": [s["text"] for s in selected],
545
- "total_potential_impact": f"${total_impact:,.0f}",
546
- "average_scenario_impact": f"${total_impact/len(selected):,.0f}",
547
- "risk_category": risk_category,
548
- "psychological_impact": "HIGH" if risk_category in ["CRITICAL", "HIGH"] else "MODERATE"
549
- }
550
-
551
- def _generate_scarcity_message(self, license_tier: str, days_remaining: int) -> Dict[str, Any]:
552
- """Generate scarcity messaging with mathematical urgency"""
553
- if license_tier not in self.scarcity_patterns:
554
- return {"message": "", "urgency": 0.0}
555
-
556
- pattern = self.scarcity_patterns[license_tier]
557
-
558
- # Calculate urgency with decay
559
- urgency = pattern["base_urgency"] * (1 - pattern["decay_rate"] * (14 - days_remaining))
560
- urgency = max(0.1, min(0.95, urgency))
561
-
562
- # Select message
563
- message_template = random.choice(pattern["messages"])
564
- message = message_template.format(days=days_remaining)
565
-
566
- return {
567
- "message": message,
568
- "urgency": round(urgency, 2),
569
- "days_remaining": days_remaining,
570
- "urgency_category": "HIGH" if urgency > 0.7 else "MEDIUM" if urgency > 0.4 else "LOW"
571
- }
572
-
573
- def _generate_authority_signals(self, user_type: str, count: int = 3) -> List[Dict[str, Any]]:
574
- """Generate authority signals optimized for user type"""
575
- # Filter signals for user type
576
- relevant_signals = [
577
- s for s in self.authority_signals
578
- if user_type in s["audience"]
579
- ]
580
-
581
- # Sort by credibility
582
- relevant_signals.sort(key=lambda x: x["credibility"], reverse=True)
583
-
584
- # Select top signals
585
- selected = relevant_signals[:count]
586
-
587
- return [
588
- {
589
- "text": s["text"],
590
- "credibility": s["credibility"],
591
- "relevance_to_user": "HIGH" if user_type in s["audience"] else "MEDIUM",
592
- "formatted": f"✓ {s['text']} ({s['credibility']:.0%} credibility)"
593
- }
594
- for s in selected
595
- ]
596
-
597
- def _generate_anchoring_effect(self, current_tier: str) -> Dict[str, Any]:
598
- """Generate anchoring effects for pricing"""
599
- tier_prices = {
600
- "oss": 0,
601
- "trial": 0,
602
- "starter": 2000,
603
- "professional": 5000,
604
- "enterprise": 15000
605
- }
606
-
607
- current_price = tier_prices.get(current_tier, 0)
608
-
609
- # Generate reference prices (anchors)
610
- anchors = []
611
- for tier, price in tier_prices.items():
612
- if price > current_price:
613
- discount = ((price - current_price) / price) * 100
614
- anchors.append({
615
- "reference_tier": tier,
616
- "reference_price": price,
617
- "discount_percentage": round(discount, 1),
618
- "anchor_strength": "STRONG" if discount > 50 else "MODERATE"
619
- })
620
-
621
- # Select strongest anchor
622
- if anchors:
623
- strongest_anchor = max(anchors, key=lambda x: x["discount_percentage"])
624
- else:
625
- strongest_anchor = {
626
- "reference_tier": "enterprise",
627
- "reference_price": 15000,
628
- "discount_percentage": 100.0,
629
- "anchor_strength": "MAXIMUM"
630
- }
631
-
632
- return {
633
- "current_tier": current_tier,
634
- "current_price": current_price,
635
- "anchors": anchors,
636
- "strongest_anchor": strongest_anchor,
637
- "perceived_value": f"{strongest_anchor['discount_percentage']:.0f}% discount vs {strongest_anchor['reference_tier']}",
638
- "anchoring_effect_strength": strongest_anchor["anchor_strength"]
639
- }
640
-
641
- def _predict_conversion(self, anxiety: float, social_credibility: float,
642
- scarcity_urgency: float, license_tier: str) -> Dict[str, Any]:
643
- """Predict conversion probability using multiple factors"""
644
- # Base conversion probability
645
- base_prob = anxiety * 0.6 + social_credibility * 0.3 + scarcity_urgency * 0.1
646
-
647
- # Tier adjustment
648
- tier_multipliers = {
649
- 'oss': 1.0,
650
- 'trial': 1.2,
651
- 'starter': 1.1,
652
- 'professional': 1.0,
653
- 'enterprise': 0.8
654
- }
655
-
656
- adjusted_prob = base_prob * tier_multipliers.get(license_tier, 1.0)
657
- adjusted_prob = min(0.95, max(0.05, adjusted_prob))
658
-
659
- # Confidence interval
660
- std_error = np.sqrt(adjusted_prob * (1 - adjusted_prob) / 100) # Assuming 100 samples
661
- ci_lower = max(0, adjusted_prob - 1.96 * std_error)
662
- ci_upper = min(1, adjusted_prob + 1.96 * std_error)
663
-
664
- return {
665
- "conversion_probability": round(adjusted_prob, 3),
666
- "confidence_interval": (round(ci_lower, 3), round(ci_upper, 3)),
667
- "confidence_width": round(ci_upper - ci_lower, 3),
668
- "key_factors": {
669
- "anxiety_contribution": round(anxiety * 0.6, 3),
670
- "social_proof_contribution": round(social_credibility * 0.3, 3),
671
- "scarcity_contribution": round(scarcity_urgency * 0.1, 3)
672
- },
673
- "prediction_quality": "HIGH" if (ci_upper - ci_lower) < 0.2 else "MODERATE"
674
- }
675
-
676
- def _generate_psychological_summary(self, prospect_impact: Dict,
677
- social_proof: Dict, loss_aversion: Dict) -> str:
678
- """Generate psychological summary for investors"""
679
- anxiety = prospect_impact.get('anxiety_level', 0.5)
680
- credibility = social_proof.get('credibility', 0.7)
681
-
682
- if anxiety > 0.7 and credibility > 0.8:
683
- return "HIGH CONVERSION POTENTIAL: Strong anxiety combined with credible social proof creates ideal conversion conditions."
684
- elif anxiety > 0.5:
685
- return "GOOD CONVERSION POTENTIAL: Moderate anxiety levels with supporting social proof suggest healthy conversion rates."
686
- elif credibility > 0.85:
687
- return "STRONG SOCIAL PROOF: High credibility signals will drive conversions even with lower anxiety levels."
688
- else:
689
- return "BASIC CONVERSION SETUP: Standard psychological triggers in place. Consider increasing urgency or social proof."